MediaWiki  REL1_19
JobQueue.php
Go to the documentation of this file.
00001 <?php
// Bail out when this file is requested directly: the MEDIAWIKI constant is
// expected to have been defined by whatever included this file.
defined( 'MEDIAWIKI' ) or die( "This file is part of MediaWiki, it is not a valid entry point\n" );
00012 
00018 abstract class Job {
00019 
00023         var $title;
00024 
00025         var $command,
00026                 $params,
00027                 $id,
00028                 $removeDuplicates,
00029                 $error;
00030 
00031         /*-------------------------------------------------------------------------
00032          * Abstract functions
00033          *------------------------------------------------------------------------*/
00034 
00039         abstract function run();
00040 
00041         /*-------------------------------------------------------------------------
00042          * Static functions
00043          *------------------------------------------------------------------------*/
00044 
00054         static function pop_type( $type ) {
00055                 wfProfilein( __METHOD__ );
00056 
00057                 $dbw = wfGetDB( DB_MASTER );
00058 
00059                 $dbw->begin();
00060 
00061                 $row = $dbw->selectRow(
00062                         'job',
00063                         '*',
00064                         array( 'job_cmd' => $type ),
00065                         __METHOD__,
00066                         array( 'LIMIT' => 1, 'FOR UPDATE' )
00067                 );
00068 
00069                 if ( $row === false ) {
00070                         $dbw->commit();
00071                         wfProfileOut( __METHOD__ );
00072                         return false;
00073                 }
00074 
00075                 /* Ensure we "own" this row */
00076                 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
00077                 $affected = $dbw->affectedRows();
00078                 $dbw->commit();
00079 
00080                 if ( $affected == 0 ) {
00081                         wfProfileOut( __METHOD__ );
00082                         return false;
00083                 }
00084 
00085                 wfIncrStats( 'job-pop' );
00086                 $namespace = $row->job_namespace;
00087                 $dbkey = $row->job_title;
00088                 $title = Title::makeTitleSafe( $namespace, $dbkey );
00089                 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ),
00090                         $row->job_id );
00091 
00092                 $job->removeDuplicates();
00093 
00094                 wfProfileOut( __METHOD__ );
00095                 return $job;
00096         }
00097 
00104         static function pop( $offset = 0 ) {
00105                 global $wgJobTypesExcludedFromDefaultQueue;
00106                 wfProfileIn( __METHOD__ );
00107 
00108                 $dbr = wfGetDB( DB_SLAVE );
00109 
00110                 /* Get a job from the slave, start with an offset,
00111                         scan full set afterwards, avoid hitting purged rows
00112 
00113                         NB: If random fetch previously was used, offset
00114                                 will always be ahead of few entries
00115                 */
00116                 $conditions = array();
00117                 if ( count( $wgJobTypesExcludedFromDefaultQueue ) != 0 ) {
00118                         foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) {
00119                                 $conditions[] = "job_cmd != " . $dbr->addQuotes( $cmdType );
00120                         }
00121                 }
00122                 $offset = intval( $offset );
00123                 $options = array( 'ORDER BY' => 'job_id', 'USE INDEX' => 'PRIMARY' );
00124 
00125                 $row = $dbr->selectRow( 'job', '*',
00126                         array_merge( $conditions, array( "job_id >= $offset" ) ),
00127                         __METHOD__,
00128                         $options
00129                 );
00130 
00131                 // Refetching without offset is needed as some of job IDs could have had delayed commits
00132                 // and have lower IDs than jobs already executed, blame concurrency :)
00133                 //
00134                 if ( $row === false ) {
00135                         if ( $offset != 0 ) {
00136                                 $row = $dbr->selectRow( 'job', '*', $conditions, __METHOD__, $options );
00137                         }
00138 
00139                         if ( $row === false ) {
00140                                 wfProfileOut( __METHOD__ );
00141                                 return false;
00142                         }
00143                 }
00144 
00145                 // Try to delete it from the master
00146                 $dbw = wfGetDB( DB_MASTER );
00147                 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
00148                 $affected = $dbw->affectedRows();
00149                 $dbw->commit();
00150 
00151                 if ( !$affected ) {
00152                         // Failed, someone else beat us to it
00153                         // Try getting a random row
00154                         $row = $dbw->selectRow( 'job', array( 'MIN(job_id) as minjob',
00155                                 'MAX(job_id) as maxjob' ), '1=1', __METHOD__ );
00156                         if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) {
00157                                 // No jobs to get
00158                                 wfProfileOut( __METHOD__ );
00159                                 return false;
00160                         }
00161                         // Get the random row
00162                         $row = $dbw->selectRow( 'job', '*',
00163                                 'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ );
00164                         if ( $row === false ) {
00165                                 // Random job gone before we got the chance to select it
00166                                 // Give up
00167                                 wfProfileOut( __METHOD__ );
00168                                 return false;
00169                         }
00170                         // Delete the random row
00171                         $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
00172                         $affected = $dbw->affectedRows();
00173                         $dbw->commit();
00174 
00175                         if ( !$affected ) {
00176                                 // Random job gone before we exclusively deleted it
00177                                 // Give up
00178                                 wfProfileOut( __METHOD__ );
00179                                 return false;
00180                         }
00181                 }
00182 
00183                 // If execution got to here, there's a row in $row that has been deleted from the database
00184                 // by this thread. Hence the concurrent pop was successful.
00185                 wfIncrStats( 'job-pop' );
00186                 $namespace = $row->job_namespace;
00187                 $dbkey = $row->job_title;
00188                 $title = Title::makeTitleSafe( $namespace, $dbkey );
00189                 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id );
00190 
00191                 // Remove any duplicates it may have later in the queue
00192                 $job->removeDuplicates();
00193 
00194                 wfProfileOut( __METHOD__ );
00195                 return $job;
00196         }
00197 
00207         static function factory( $command, $title, $params = false, $id = 0 ) {
00208                 global $wgJobClasses;
00209                 if( isset( $wgJobClasses[$command] ) ) {
00210                         $class = $wgJobClasses[$command];
00211                         return new $class( $title, $params, $id );
00212                 }
00213                 throw new MWException( "Invalid job command `{$command}`" );
00214         }
00215 
00220         static function makeBlob( $params ) {
00221                 if ( $params !== false ) {
00222                         return serialize( $params );
00223                 } else {
00224                         return '';
00225                 }
00226         }
00227 
00232         static function extractBlob( $blob ) {
00233                 if ( (string)$blob !== '' ) {
00234                         return unserialize( $blob );
00235                 } else {
00236                         return false;
00237                 }
00238         }
00239 
00249         static function batchInsert( $jobs ) {
00250                 if ( !count( $jobs ) ) {
00251                         return;
00252                 }
00253                 $dbw = wfGetDB( DB_MASTER );
00254                 $rows = array();
00255 
00259                 foreach ( $jobs as $job ) {
00260                         $rows[] = $job->insertFields();
00261                         if ( count( $rows ) >= 50 ) {
00262                                 # Do a small transaction to avoid slave lag
00263                                 $dbw->begin();
00264                                 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
00265                                 $dbw->commit();
00266                                 $rows = array();
00267                         }
00268                 }
00269                 if ( $rows ) { // last chunk
00270                         $dbw->begin();
00271                         $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
00272                         $dbw->commit();
00273                 }
00274                 wfIncrStats( 'job-insert', count( $jobs ) );
00275         }
00276 
00286         static function safeBatchInsert( $jobs ) {
00287                 if ( !count( $jobs ) ) {
00288                         return;
00289                 }
00290                 $dbw = wfGetDB( DB_MASTER );
00291                 $rows = array();
00292                 foreach ( $jobs as $job ) {
00293                         $rows[] = $job->insertFields();
00294                         if ( count( $rows ) >= 500 ) {
00295                                 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
00296                                 $rows = array();
00297                         }
00298                 }
00299                 if ( $rows ) { // last chunk
00300                         $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
00301                 }
00302                 wfIncrStats( 'job-insert', count( $jobs ) );
00303         }
00304 
00305         /*-------------------------------------------------------------------------
00306          * Non-static functions
00307          *------------------------------------------------------------------------*/
00308 
00315         function __construct( $command, $title, $params = false, $id = 0 ) {
00316                 $this->command = $command;
00317                 $this->title = $title;
00318                 $this->params = $params;
00319                 $this->id = $id;
00320 
00321                 // A bit of premature generalisation
00322                 // Oh well, the whole class is premature generalisation really
00323                 $this->removeDuplicates = true;
00324         }
00325 
00330         function insert() {
00331                 $fields = $this->insertFields();
00332 
00333                 $dbw = wfGetDB( DB_MASTER );
00334 
00335                 if ( $this->removeDuplicates ) {
00336                         $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ );
00337                         if ( $dbw->numRows( $res ) ) {
00338                                 return true;
00339                         }
00340                 }
00341                 wfIncrStats( 'job-insert' );
00342                 return $dbw->insert( 'job', $fields, __METHOD__ );
00343         }
00344 
00348         protected function insertFields() {
00349                 $dbw = wfGetDB( DB_MASTER );
00350                 return array(
00351                         'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
00352                         'job_cmd' => $this->command,
00353                         'job_namespace' => $this->title->getNamespace(),
00354                         'job_title' => $this->title->getDBkey(),
00355                         'job_timestamp' => $dbw->timestamp(),
00356                         'job_params' => Job::makeBlob( $this->params )
00357                 );
00358         }
00359 
00364         function removeDuplicates() {
00365                 if ( !$this->removeDuplicates ) {
00366                         return;
00367                 }
00368 
00369                 $fields = $this->insertFields();
00370                 unset( $fields['job_id'] );
00371                 $dbw = wfGetDB( DB_MASTER );
00372                 $dbw->begin();
00373                 $dbw->delete( 'job', $fields, __METHOD__ );
00374                 $affected = $dbw->affectedRows();
00375                 $dbw->commit();
00376                 if ( $affected ) {
00377                         wfIncrStats( 'job-dup-delete', $affected );
00378                 }
00379         }
00380 
00384         function toString() {
00385                 $paramString = '';
00386                 if ( $this->params ) {
00387                         foreach ( $this->params as $key => $value ) {
00388                                 if ( $paramString != '' ) {
00389                                         $paramString .= ' ';
00390                                 }
00391                                 $paramString .= "$key=$value";
00392                         }
00393                 }
00394 
00395                 if ( is_object( $this->title ) ) {
00396                         $s = "{$this->command} " . $this->title->getPrefixedDBkey();
00397                         if ( $paramString !== '' ) {
00398                                 $s .= ' ' . $paramString;
00399                         }
00400                         return $s;
00401                 } else {
00402                         return "{$this->command} $paramString";
00403                 }
00404         }
00405 
00406         protected function setLastError( $error ) {
00407                 $this->error = $error;
00408         }
00409 
00410         function getLastError() {
00411                 return $this->error;
00412         }
00413 }