MediaWiki
REL1_20
|
00001 <?php 00029 abstract class Job { 00030 00034 var $title; 00035 00036 var $command, 00037 $params, 00038 $id, 00039 $removeDuplicates, 00040 $error; 00041 00042 /*------------------------------------------------------------------------- 00043 * Abstract functions 00044 *------------------------------------------------------------------------*/ 00045 00050 abstract function run(); 00051 00052 /*------------------------------------------------------------------------- 00053 * Static functions 00054 *------------------------------------------------------------------------*/ 00055 00065 static function pop_type( $type ) { 00066 wfProfilein( __METHOD__ ); 00067 00068 $dbw = wfGetDB( DB_MASTER ); 00069 00070 $dbw->begin( __METHOD__ ); 00071 00072 $row = $dbw->selectRow( 00073 'job', 00074 '*', 00075 array( 'job_cmd' => $type ), 00076 __METHOD__, 00077 array( 'LIMIT' => 1, 'FOR UPDATE' ) 00078 ); 00079 00080 if ( $row === false ) { 00081 $dbw->commit( __METHOD__ ); 00082 wfProfileOut( __METHOD__ ); 00083 return false; 00084 } 00085 00086 /* Ensure we "own" this row */ 00087 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); 00088 $affected = $dbw->affectedRows(); 00089 $dbw->commit( __METHOD__ ); 00090 00091 if ( $affected == 0 ) { 00092 wfProfileOut( __METHOD__ ); 00093 return false; 00094 } 00095 00096 wfIncrStats( 'job-pop' ); 00097 $namespace = $row->job_namespace; 00098 $dbkey = $row->job_title; 00099 $title = Title::makeTitleSafe( $namespace, $dbkey ); 00100 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), 00101 $row->job_id ); 00102 00103 $job->removeDuplicates(); 00104 00105 wfProfileOut( __METHOD__ ); 00106 return $job; 00107 } 00108 00115 static function pop( $offset = 0 ) { 00116 wfProfileIn( __METHOD__ ); 00117 00118 $dbr = wfGetDB( DB_SLAVE ); 00119 00120 /* Get a job from the slave, start with an offset, 00121 scan full set afterwards, avoid hitting purged rows 00122 00123 NB: If random fetch previously was used, offset 00124 will always be ahead of few entries 00125 */ 00126 00127 $conditions = self::defaultQueueConditions(); 00128 00129 $offset = intval( $offset ); 00130 $options = array( 'ORDER BY' => 'job_id', 'USE INDEX' => 'PRIMARY' ); 00131 00132 $row = $dbr->selectRow( 'job', '*', 00133 array_merge( $conditions, array( "job_id >= $offset" ) ), 00134 __METHOD__, 00135 $options 00136 ); 00137 00138 // Refetching without offset is needed as some of job IDs could have had delayed commits 00139 // and have lower IDs than jobs already executed, blame concurrency :) 00140 // 00141 if ( $row === false ) { 00142 if ( $offset != 0 ) { 00143 $row = $dbr->selectRow( 'job', '*', $conditions, __METHOD__, $options ); 00144 } 00145 00146 if ( $row === false ) { 00147 wfProfileOut( __METHOD__ ); 00148 return false; 00149 } 00150 } 00151 00152 // Try to delete it from the master 00153 $dbw = wfGetDB( DB_MASTER ); 00154 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); 00155 $affected = $dbw->affectedRows(); 00156 00157 if ( !$affected ) { 00158 // Failed, someone else beat us to it 00159 // Try getting a random row 00160 $row = $dbw->selectRow( 'job', array( 'minjob' => 'MIN(job_id)', 00161 'maxjob' => 'MAX(job_id)' ), '1=1', __METHOD__ ); 00162 if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) { 00163 // No jobs to get 00164 wfProfileOut( __METHOD__ ); 00165 return false; 00166 } 00167 // Get the random row 00168 $row = $dbw->selectRow( 'job', '*', 00169 'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ ); 00170 if ( $row === false ) { 00171 // Random job gone before we got the chance to select it 00172 // Give up 00173 wfProfileOut( __METHOD__ ); 00174 return false; 00175 } 00176 // Delete the random row 00177 $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); 00178 $affected = $dbw->affectedRows(); 00179 00180 if ( !$affected ) { 00181 // Random job gone before we exclusively deleted it 00182 // Give up 00183 wfProfileOut( __METHOD__ ); 00184 return false; 00185 } 00186 } 00187 00188 // If execution got to here, there's a row in $row that has been deleted from the database 00189 // by this thread. Hence the concurrent pop was successful. 00190 wfIncrStats( 'job-pop' ); 00191 $namespace = $row->job_namespace; 00192 $dbkey = $row->job_title; 00193 $title = Title::makeTitleSafe( $namespace, $dbkey ); 00194 00195 if ( is_null( $title ) ) { 00196 wfProfileOut( __METHOD__ ); 00197 return false; 00198 } 00199 00200 $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id ); 00201 00202 // Remove any duplicates it may have later in the queue 00203 $job->removeDuplicates(); 00204 00205 wfProfileOut( __METHOD__ ); 00206 return $job; 00207 } 00208 00219 static function factory( $command, Title $title, $params = false, $id = 0 ) { 00220 global $wgJobClasses; 00221 if( isset( $wgJobClasses[$command] ) ) { 00222 $class = $wgJobClasses[$command]; 00223 return new $class( $title, $params, $id ); 00224 } 00225 throw new MWException( "Invalid job command `{$command}`" ); 00226 } 00227 00232 static function makeBlob( $params ) { 00233 if ( $params !== false ) { 00234 return serialize( $params ); 00235 } else { 00236 return ''; 00237 } 00238 } 00239 00244 static function extractBlob( $blob ) { 00245 if ( (string)$blob !== '' ) { 00246 return unserialize( $blob ); 00247 } else { 00248 return false; 00249 } 00250 } 00251 00261 static function batchInsert( $jobs ) { 00262 if ( !count( $jobs ) ) { 00263 return; 00264 } 00265 $dbw = wfGetDB( DB_MASTER ); 00266 $rows = array(); 00267 00271 foreach ( $jobs as $job ) { 00272 $rows[] = $job->insertFields(); 00273 if ( count( $rows ) >= 50 ) { 00274 # Do a small transaction to avoid slave lag 00275 $dbw->begin( __METHOD__ ); 00276 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); 00277 $dbw->commit( __METHOD__ ); 00278 $rows = array(); 00279 } 00280 } 00281 if ( $rows ) { // last chunk 00282 $dbw->begin( __METHOD__ ); 00283 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); 00284 $dbw->commit( __METHOD__ ); 00285 } 00286 wfIncrStats( 'job-insert', count( $jobs ) ); 00287 } 00288 00298 static function safeBatchInsert( $jobs ) { 00299 if ( !count( $jobs ) ) { 00300 return; 00301 } 00302 $dbw = wfGetDB( DB_MASTER ); 00303 $rows = array(); 00304 foreach ( $jobs as $job ) { 00305 $rows[] = $job->insertFields(); 00306 if ( count( $rows ) >= 500 ) { 00307 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); 00308 $rows = array(); 00309 } 00310 } 00311 if ( $rows ) { // last chunk 00312 $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); 00313 } 00314 wfIncrStats( 'job-insert', count( $jobs ) ); 00315 } 00316 00317 00326 static function defaultQueueConditions( ) { 00327 global $wgJobTypesExcludedFromDefaultQueue; 00328 $conditions = array(); 00329 if ( count( $wgJobTypesExcludedFromDefaultQueue ) > 0 ) { 00330 $dbr = wfGetDB( DB_SLAVE ); 00331 foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) { 00332 $conditions[] = "job_cmd != " . $dbr->addQuotes( $cmdType ); 00333 } 00334 } 00335 return $conditions; 00336 } 00337 00338 /*------------------------------------------------------------------------- 00339 * Non-static functions 00340 *------------------------------------------------------------------------*/ 00341 00348 function __construct( $command, $title, $params = false, $id = 0 ) { 00349 $this->command = $command; 00350 $this->title = $title; 00351 $this->params = $params; 00352 $this->id = $id; 00353 00354 // A bit of premature generalisation 00355 // Oh well, the whole class is premature generalisation really 00356 $this->removeDuplicates = true; 00357 } 00358 00363 function insert() { 00364 $fields = $this->insertFields(); 00365 00366 $dbw = wfGetDB( DB_MASTER ); 00367 00368 if ( $this->removeDuplicates ) { 00369 $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ ); 00370 if ( $dbw->numRows( $res ) ) { 00371 return true; 00372 } 00373 } 00374 wfIncrStats( 'job-insert' ); 00375 return $dbw->insert( 'job', $fields, __METHOD__ ); 00376 } 00377 00381 protected function insertFields() { 00382 $dbw = wfGetDB( DB_MASTER ); 00383 return array( 00384 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), 00385 'job_cmd' => $this->command, 00386 'job_namespace' => $this->title->getNamespace(), 00387 'job_title' => $this->title->getDBkey(), 00388 'job_timestamp' => $dbw->timestamp(), 00389 'job_params' => Job::makeBlob( $this->params ) 00390 ); 00391 } 00392 00397 function removeDuplicates() { 00398 if ( !$this->removeDuplicates ) { 00399 return; 00400 } 00401 00402 $fields = $this->insertFields(); 00403 unset( $fields['job_id'] ); 00404 unset( $fields['job_timestamp'] ); 00405 $dbw = wfGetDB( DB_MASTER ); 00406 $dbw->begin( __METHOD__ ); 00407 $dbw->delete( 'job', $fields, __METHOD__ ); 00408 $affected = $dbw->affectedRows(); 00409 $dbw->commit( __METHOD__ ); 00410 if ( $affected ) { 00411 wfIncrStats( 'job-dup-delete', $affected ); 00412 } 00413 } 00414 00418 function toString() { 00419 $paramString = ''; 00420 if ( $this->params ) { 00421 foreach ( $this->params as $key => $value ) { 00422 if ( $paramString != '' ) { 00423 $paramString .= ' '; 00424 } 00425 $paramString .= "$key=$value"; 00426 } 00427 } 00428 00429 if ( is_object( $this->title ) ) { 00430 $s = "{$this->command} " . $this->title->getPrefixedDBkey(); 00431 if ( $paramString !== '' ) { 00432 $s .= ' ' . $paramString; 00433 } 00434 return $s; 00435 } else { 00436 return "{$this->command} $paramString"; 00437 } 00438 } 00439 00440 protected function setLastError( $error ) { 00441 $this->error = $error; 00442 } 00443 00444 function getLastError() { 00445 return $this->error; 00446 } 00447 }