MediaWiki
REL1_19
|
00001 <?php 00009 if ( !defined( 'MEDIAWIKI' ) ) { 00010 die( "This file is part of MediaWiki, it is not a valid entry point\n" ); 00011 } 00012 00018 abstract class Job { 00019 00023 var $title; 00024 00025 var $command, 00026 $params, 00027 $id, 00028 $removeDuplicates, 00029 $error; 00030 00031 /*------------------------------------------------------------------------- 00032 * Abstract functions 00033 *------------------------------------------------------------------------*/ 00034 00039 abstract function run(); 00040 00041 /*------------------------------------------------------------------------- 00042 * Static functions 00043 *------------------------------------------------------------------------*/ 00044 00054 static function pop_type( $type ) { 00055 wfProfilein( __METHOD__ ); 00056 00057 $dbw = wfGetDB( DB_MASTER ); 00058 00059 $dbw->begin(); 00060 00061 $row = $dbw->selectRow( 00062 'job', 00063 '*', 00064 array( 'job_cmd' => $type ), 00065 __METHOD__, 00066 array( 'LIMIT' => 1, 'FOR UPDATE' ) 00067 ); 00068 00069 if ( $row === false ) { 00070 $dbw->commit(); 00071 wfProfileOut( __METHOD__ ); 00072 return false; 00073 } 00074 00075 /* Ensure we "own" this row */ 00076 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); 00077 $affected = $dbw->affectedRows(); 00078 $dbw->commit(); 00079 00080 if ( $affected == 0 ) { 00081 wfProfileOut( __METHOD__ ); 00082 return false; 00083 } 00084 00085 wfIncrStats( 'job-pop' ); 00086 $namespace = $row->job_namespace; 00087 $dbkey = $row->job_title; 00088 $title = Title::makeTitleSafe( $namespace, $dbkey ); 00089 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), 00090 $row->job_id ); 00091 00092 $job->removeDuplicates(); 00093 00094 wfProfileOut( __METHOD__ ); 00095 return $job; 00096 } 00097 00104 static function pop( $offset = 0 ) { 00105 global $wgJobTypesExcludedFromDefaultQueue; 00106 wfProfileIn( __METHOD__ ); 00107 00108 $dbr = wfGetDB( DB_SLAVE ); 00109 00110 /* Get a job from the slave, start with an offset, 00111 scan full set afterwards, avoid hitting purged rows 00112 00113 NB: If random fetch previously was used, offset 00114 will always be ahead of few entries 00115 */ 00116 $conditions = array(); 00117 if ( count( $wgJobTypesExcludedFromDefaultQueue ) != 0 ) { 00118 foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) { 00119 $conditions[] = "job_cmd != " . $dbr->addQuotes( $cmdType ); 00120 } 00121 } 00122 $offset = intval( $offset ); 00123 $options = array( 'ORDER BY' => 'job_id', 'USE INDEX' => 'PRIMARY' ); 00124 00125 $row = $dbr->selectRow( 'job', '*', 00126 array_merge( $conditions, array( "job_id >= $offset" ) ), 00127 __METHOD__, 00128 $options 00129 ); 00130 00131 // Refetching without offset is needed as some of job IDs could have had delayed commits 00132 // and have lower IDs than jobs already executed, blame concurrency :) 00133 // 00134 if ( $row === false ) { 00135 if ( $offset != 0 ) { 00136 $row = $dbr->selectRow( 'job', '*', $conditions, __METHOD__, $options ); 00137 } 00138 00139 if ( $row === false ) { 00140 wfProfileOut( __METHOD__ ); 00141 return false; 00142 } 00143 } 00144 00145 // Try to delete it from the master 00146 $dbw = wfGetDB( DB_MASTER ); 00147 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); 00148 $affected = $dbw->affectedRows(); 00149 $dbw->commit(); 00150 00151 if ( !$affected ) { 00152 // Failed, someone else beat us to it 00153 // Try getting a random row 00154 $row = $dbw->selectRow( 'job', array( 'MIN(job_id) as minjob', 00155 'MAX(job_id) as maxjob' ), '1=1', __METHOD__ ); 00156 if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) { 00157 // No jobs to get 00158 wfProfileOut( __METHOD__ ); 00159 return false; 00160 } 00161 // Get the random row 00162 $row = $dbw->selectRow( 'job', '*', 00163 'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ ); 00164 if ( $row === false ) { 00165 // Random job gone before we got the chance to select it 00166 // Give up 00167 wfProfileOut( __METHOD__ ); 00168 return false; 00169 } 00170 // Delete the random row 00171 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); 00172 $affected = $dbw->affectedRows(); 00173 $dbw->commit(); 00174 00175 if ( !$affected ) { 00176 // Random job gone before we exclusively deleted it 00177 // Give up 00178 wfProfileOut( __METHOD__ ); 00179 return false; 00180 } 00181 } 00182 00183 // If execution got to here, there's a row in $row that has been deleted from the database 00184 // by this thread. Hence the concurrent pop was successful. 00185 wfIncrStats( 'job-pop' ); 00186 $namespace = $row->job_namespace; 00187 $dbkey = $row->job_title; 00188 $title = Title::makeTitleSafe( $namespace, $dbkey ); 00189 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id ); 00190 00191 // Remove any duplicates it may have later in the queue 00192 $job->removeDuplicates(); 00193 00194 wfProfileOut( __METHOD__ ); 00195 return $job; 00196 } 00197 00207 static function factory( $command, $title, $params = false, $id = 0 ) { 00208 global $wgJobClasses; 00209 if( isset( $wgJobClasses[$command] ) ) { 00210 $class = $wgJobClasses[$command]; 00211 return new $class( $title, $params, $id ); 00212 } 00213 throw new MWException( "Invalid job command `{$command}`" ); 00214 } 00215 00220 static function makeBlob( $params ) { 00221 if ( $params !== false ) { 00222 return serialize( $params ); 00223 } else { 00224 return ''; 00225 } 00226 } 00227 00232 static function extractBlob( $blob ) { 00233 if ( (string)$blob !== '' ) { 00234 return unserialize( $blob ); 00235 } else { 00236 return false; 00237 } 00238 } 00239 00249 static function batchInsert( $jobs ) { 00250 if ( !count( $jobs ) ) { 00251 return; 00252 } 00253 $dbw = wfGetDB( DB_MASTER ); 00254 $rows = array(); 00255 00259 foreach ( $jobs as $job ) { 00260 $rows[] = $job->insertFields(); 00261 if ( count( $rows ) >= 50 ) { 00262 # Do a small transaction to avoid slave lag 00263 $dbw->begin(); 00264 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); 00265 $dbw->commit(); 00266 $rows = array(); 00267 } 00268 } 00269 if ( $rows ) { // last chunk 00270 $dbw->begin(); 00271 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); 00272 $dbw->commit(); 00273 } 00274 wfIncrStats( 'job-insert', count( $jobs ) ); 00275 } 00276 00286 static function safeBatchInsert( $jobs ) { 00287 if ( !count( $jobs ) ) { 00288 return; 00289 } 00290 $dbw = wfGetDB( DB_MASTER ); 00291 $rows = array(); 00292 foreach ( $jobs as $job ) { 00293 $rows[] = $job->insertFields(); 00294 if ( count( $rows ) >= 500 ) { 00295 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); 00296 $rows = array(); 00297 } 00298 } 00299 if ( $rows ) { // last chunk 00300 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); 00301 } 00302 wfIncrStats( 'job-insert', count( $jobs ) ); 00303 } 00304 00305 /*------------------------------------------------------------------------- 00306 * Non-static functions 00307 *------------------------------------------------------------------------*/ 00308 00315 function __construct( $command, $title, $params = false, $id = 0 ) { 00316 $this->command = $command; 00317 $this->title = $title; 00318 $this->params = $params; 00319 $this->id = $id; 00320 00321 // A bit of premature generalisation 00322 // Oh well, the whole class is premature generalisation really 00323 $this->removeDuplicates = true; 00324 } 00325 00330 function insert() { 00331 $fields = $this->insertFields(); 00332 00333 $dbw = wfGetDB( DB_MASTER ); 00334 00335 if ( $this->removeDuplicates ) { 00336 $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ ); 00337 if ( $dbw->numRows( $res ) ) { 00338 return true; 00339 } 00340 } 00341 wfIncrStats( 'job-insert' ); 00342 return $dbw->insert( 'job', $fields, __METHOD__ ); 00343 } 00344 00348 protected function insertFields() { 00349 $dbw = wfGetDB( DB_MASTER ); 00350 return array( 00351 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), 00352 'job_cmd' => $this->command, 00353 'job_namespace' => $this->title->getNamespace(), 00354 'job_title' => $this->title->getDBkey(), 00355 'job_timestamp' => $dbw->timestamp(), 00356 'job_params' => Job::makeBlob( $this->params ) 00357 ); 00358 } 00359 00364 function removeDuplicates() { 00365 if ( !$this->removeDuplicates ) { 00366 return; 00367 } 00368 00369 $fields = $this->insertFields(); 00370 unset( $fields['job_id'] ); 00371 $dbw = wfGetDB( DB_MASTER ); 00372 $dbw->begin(); 00373 $dbw->delete( 'job', $fields, __METHOD__ ); 00374 $affected = $dbw->affectedRows(); 00375 $dbw->commit(); 00376 if ( $affected ) { 00377 wfIncrStats( 'job-dup-delete', $affected ); 00378 } 00379 } 00380 00384 function toString() { 00385 $paramString = ''; 00386 if ( $this->params ) { 00387 foreach ( $this->params as $key => $value ) { 00388 if ( $paramString != '' ) { 00389 $paramString .= ' '; 00390 } 00391 $paramString .= "$key=$value"; 00392 } 00393 } 00394 00395 if ( is_object( $this->title ) ) { 00396 $s = "{$this->command} " . $this->title->getPrefixedDBkey(); 00397 if ( $paramString !== '' ) { 00398 $s .= ' ' . $paramString; 00399 } 00400 return $s; 00401 } else { 00402 return "{$this->command} $paramString"; 00403 } 00404 } 00405 00406 protected function setLastError( $error ) { 00407 $this->error = $error; 00408 } 00409 00410 function getLastError() { 00411 return $this->error; 00412 } 00413 }