MediaWiki  REL1_20
Job.php
Go to the documentation of this file.
00001 <?php
00029 abstract class Job {
00030 
00034         var $title;
00035 
00036         var $command,
00037                 $params,
00038                 $id,
00039                 $removeDuplicates,
00040                 $error;
00041 
00042         /*-------------------------------------------------------------------------
00043          * Abstract functions
00044          *------------------------------------------------------------------------*/
00045 
00050         abstract function run();
00051 
00052         /*-------------------------------------------------------------------------
00053          * Static functions
00054          *------------------------------------------------------------------------*/
00055 
00065         static function pop_type( $type ) {
00066                 wfProfilein( __METHOD__ );
00067 
00068                 $dbw = wfGetDB( DB_MASTER );
00069 
00070                 $dbw->begin( __METHOD__ );
00071 
00072                 $row = $dbw->selectRow(
00073                         'job',
00074                         '*',
00075                         array( 'job_cmd' => $type ),
00076                         __METHOD__,
00077                         array( 'LIMIT' => 1, 'FOR UPDATE' )
00078                 );
00079 
00080                 if ( $row === false ) {
00081                         $dbw->commit( __METHOD__ );
00082                         wfProfileOut( __METHOD__ );
00083                         return false;
00084                 }
00085 
00086                 /* Ensure we "own" this row */
00087                 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
00088                 $affected = $dbw->affectedRows();
00089                 $dbw->commit( __METHOD__ );
00090 
00091                 if ( $affected == 0 ) {
00092                         wfProfileOut( __METHOD__ );
00093                         return false;
00094                 }
00095 
00096                 wfIncrStats( 'job-pop' );
00097                 $namespace = $row->job_namespace;
00098                 $dbkey = $row->job_title;
00099                 $title = Title::makeTitleSafe( $namespace, $dbkey );
00100                 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ),
00101                         $row->job_id );
00102 
00103                 $job->removeDuplicates();
00104 
00105                 wfProfileOut( __METHOD__ );
00106                 return $job;
00107         }
00108 
00115         static function pop( $offset = 0 ) {
00116                 wfProfileIn( __METHOD__ );
00117 
00118                 $dbr = wfGetDB( DB_SLAVE );
00119 
00120                 /* Get a job from the slave, start with an offset,
00121                         scan full set afterwards, avoid hitting purged rows
00122 
00123                         NB: If random fetch previously was used, offset
00124                                 will always be ahead of few entries
00125                 */
00126 
00127                 $conditions = self::defaultQueueConditions();
00128 
00129                 $offset = intval( $offset );
00130                 $options = array( 'ORDER BY' => 'job_id', 'USE INDEX' => 'PRIMARY' );
00131 
00132                 $row = $dbr->selectRow( 'job', '*',
00133                         array_merge( $conditions, array( "job_id >= $offset" ) ),
00134                         __METHOD__,
00135                         $options
00136                 );
00137 
00138                 // Refetching without offset is needed as some of job IDs could have had delayed commits
00139                 // and have lower IDs than jobs already executed, blame concurrency :)
00140                 //
00141                 if ( $row === false ) {
00142                         if ( $offset != 0 ) {
00143                                 $row = $dbr->selectRow( 'job', '*', $conditions, __METHOD__, $options );
00144                         }
00145 
00146                         if ( $row === false ) {
00147                                 wfProfileOut( __METHOD__ );
00148                                 return false;
00149                         }
00150                 }
00151 
00152                 // Try to delete it from the master
00153                 $dbw = wfGetDB( DB_MASTER );
00154                 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
00155                 $affected = $dbw->affectedRows();
00156 
00157                 if ( !$affected ) {
00158                         // Failed, someone else beat us to it
00159                         // Try getting a random row
00160                         $row = $dbw->selectRow( 'job', array( 'minjob' => 'MIN(job_id)',
00161                                 'maxjob' => 'MAX(job_id)' ), '1=1', __METHOD__ );
00162                         if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) {
00163                                 // No jobs to get
00164                                 wfProfileOut( __METHOD__ );
00165                                 return false;
00166                         }
00167                         // Get the random row
00168                         $row = $dbw->selectRow( 'job', '*',
00169                                 'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ );
00170                         if ( $row === false ) {
00171                                 // Random job gone before we got the chance to select it
00172                                 // Give up
00173                                 wfProfileOut( __METHOD__ );
00174                                 return false;
00175                         }
00176                         // Delete the random row
00177                         $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
00178                         $affected = $dbw->affectedRows();
00179 
00180                         if ( !$affected ) {
00181                                 // Random job gone before we exclusively deleted it
00182                                 // Give up
00183                                 wfProfileOut( __METHOD__ );
00184                                 return false;
00185                         }
00186                 }
00187 
00188                 // If execution got to here, there's a row in $row that has been deleted from the database
00189                 // by this thread. Hence the concurrent pop was successful.
00190                 wfIncrStats( 'job-pop' );
00191                 $namespace = $row->job_namespace;
00192                 $dbkey = $row->job_title;
00193                 $title = Title::makeTitleSafe( $namespace, $dbkey );
00194 
00195                 if ( is_null( $title ) ) {
00196                         wfProfileOut( __METHOD__ );
00197                         return false;
00198                 }
00199 
00200                 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id );
00201 
00202                 // Remove any duplicates it may have later in the queue
00203                 $job->removeDuplicates();
00204 
00205                 wfProfileOut( __METHOD__ );
00206                 return $job;
00207         }
00208 
00219         static function factory( $command, Title $title, $params = false, $id = 0 ) {
00220                 global $wgJobClasses;
00221                 if( isset( $wgJobClasses[$command] ) ) {
00222                         $class = $wgJobClasses[$command];
00223                         return new $class( $title, $params, $id );
00224                 }
00225                 throw new MWException( "Invalid job command `{$command}`" );
00226         }
00227 
00232         static function makeBlob( $params ) {
00233                 if ( $params !== false ) {
00234                         return serialize( $params );
00235                 } else {
00236                         return '';
00237                 }
00238         }
00239 
00244         static function extractBlob( $blob ) {
00245                 if ( (string)$blob !== '' ) {
00246                         return unserialize( $blob );
00247                 } else {
00248                         return false;
00249                 }
00250         }
00251 
00261         static function batchInsert( $jobs ) {
00262                 if ( !count( $jobs ) ) {
00263                         return;
00264                 }
00265                 $dbw = wfGetDB( DB_MASTER );
00266                 $rows = array();
00267 
00271                 foreach ( $jobs as $job ) {
00272                         $rows[] = $job->insertFields();
00273                         if ( count( $rows ) >= 50 ) {
00274                                 # Do a small transaction to avoid slave lag
00275                                 $dbw->begin( __METHOD__ );
00276                                 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
00277                                 $dbw->commit( __METHOD__ );
00278                                 $rows = array();
00279                         }
00280                 }
00281                 if ( $rows ) { // last chunk
00282                         $dbw->begin( __METHOD__ );
00283                         $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
00284                         $dbw->commit( __METHOD__ );
00285                 }
00286                 wfIncrStats( 'job-insert', count( $jobs ) );
00287         }
00288 
00298         static function safeBatchInsert( $jobs ) {
00299                 if ( !count( $jobs ) ) {
00300                         return;
00301                 }
00302                 $dbw = wfGetDB( DB_MASTER );
00303                 $rows = array();
00304                 foreach ( $jobs as $job ) {
00305                         $rows[] = $job->insertFields();
00306                         if ( count( $rows ) >= 500 ) {
00307                                 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
00308                                 $rows = array();
00309                         }
00310                 }
00311                 if ( $rows ) { // last chunk
00312                         $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' );
00313                 }
00314                 wfIncrStats( 'job-insert', count( $jobs ) );
00315         }
00316 
00317 
00326         static function defaultQueueConditions( ) {
00327                 global $wgJobTypesExcludedFromDefaultQueue;
00328                 $conditions = array();
00329                 if ( count( $wgJobTypesExcludedFromDefaultQueue ) > 0 ) {
00330                         $dbr = wfGetDB( DB_SLAVE );
00331                         foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) {
00332                                 $conditions[] = "job_cmd != " . $dbr->addQuotes( $cmdType );
00333                         }
00334                 }
00335                 return $conditions;
00336         }
00337 
00338         /*-------------------------------------------------------------------------
00339          * Non-static functions
00340          *------------------------------------------------------------------------*/
00341 
00348         function __construct( $command, $title, $params = false, $id = 0 ) {
00349                 $this->command = $command;
00350                 $this->title = $title;
00351                 $this->params = $params;
00352                 $this->id = $id;
00353 
00354                 // A bit of premature generalisation
00355                 // Oh well, the whole class is premature generalisation really
00356                 $this->removeDuplicates = true;
00357         }
00358 
00363         function insert() {
00364                 $fields = $this->insertFields();
00365 
00366                 $dbw = wfGetDB( DB_MASTER );
00367 
00368                 if ( $this->removeDuplicates ) {
00369                         $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ );
00370                         if ( $dbw->numRows( $res ) ) {
00371                                 return true;
00372                         }
00373                 }
00374                 wfIncrStats( 'job-insert' );
00375                 return $dbw->insert( 'job', $fields, __METHOD__ );
00376         }
00377 
00381         protected function insertFields() {
00382                 $dbw = wfGetDB( DB_MASTER );
00383                 return array(
00384                         'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
00385                         'job_cmd' => $this->command,
00386                         'job_namespace' => $this->title->getNamespace(),
00387                         'job_title' => $this->title->getDBkey(),
00388                         'job_timestamp' => $dbw->timestamp(),
00389                         'job_params' => Job::makeBlob( $this->params )
00390                 );
00391         }
00392 
00397         function removeDuplicates() {
00398                 if ( !$this->removeDuplicates ) {
00399                         return;
00400                 }
00401 
00402                 $fields = $this->insertFields();
00403                 unset( $fields['job_id'] );
00404                 unset( $fields['job_timestamp'] );
00405                 $dbw = wfGetDB( DB_MASTER );
00406                 $dbw->begin( __METHOD__ );
00407                 $dbw->delete( 'job', $fields, __METHOD__ );
00408                 $affected = $dbw->affectedRows();
00409                 $dbw->commit( __METHOD__ );
00410                 if ( $affected ) {
00411                         wfIncrStats( 'job-dup-delete', $affected );
00412                 }
00413         }
00414 
00418         function toString() {
00419                 $paramString = '';
00420                 if ( $this->params ) {
00421                         foreach ( $this->params as $key => $value ) {
00422                                 if ( $paramString != '' ) {
00423                                         $paramString .= ' ';
00424                                 }
00425                                 $paramString .= "$key=$value";
00426                         }
00427                 }
00428 
00429                 if ( is_object( $this->title ) ) {
00430                         $s = "{$this->command} " . $this->title->getPrefixedDBkey();
00431                         if ( $paramString !== '' ) {
00432                                 $s .= ' ' . $paramString;
00433                         }
00434                         return $s;
00435                 } else {
00436                         return "{$this->command} $paramString";
00437                 }
00438         }
00439 
00440         protected function setLastError( $error ) {
00441                 $this->error = $error;
00442         }
00443 
00444         function getLastError() {
00445                 return $this->error;
00446         }
00447 }