MediaWiki
REL1_22
|
<?php

/**
 * Base class for job queues that hold jobs of a single type for a single wiki.
 *
 * The public entry points are final: they validate their arguments, wrap the
 * call in profiling and delegate to a protected do*() method. Subclasses
 * implement the abstract do*() methods and may override the optional ones,
 * which default to "not implemented"/"not supported" results here.
 */
abstract class JobQueue {
	protected $wiki; // string; wiki ID
	protected $type; // string; job type
	protected $order; // string; job priority for pop()
	protected $claimTTL; // integer; seconds
	protected $maxTries; // integer; maximum number of times to try a job
	protected $checkDelay; // boolean; allow delayed jobs

	// Cache handle returned by wfGetCache( CACHE_ANYTHING ); holds root job timestamps
	protected $dupCache;

	const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * Set up the common queue state from a parameter map.
	 *
	 * Keys read here:
	 *   - wiki       : wiki ID (required)
	 *   - type       : job type (required)
	 *   - claimTTL   : stored as-is; defaults to 0. Not used by this base class.
	 *   - maxTries   : stored as-is; defaults to 3. Not used by this base class.
	 *   - order      : requested pop() order; when missing or 'any', the value
	 *                  of optimalOrder() is used instead.
	 *   - checkDelay : when non-empty, delayed jobs are allowed in this queue.
	 *
	 * @param array $params
	 * @throws MWException If the order is not one of supportedOrders(), or if
	 *   delayed jobs are requested but supportsDelayedJobs() returns false
	 */
	protected function __construct( array $params ) {
		$this->wiki = $params['wiki'];
		$this->type = $params['type'];
		$this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
		$this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict comparison: a loose in_array() would let non-string values
		// such as 0 or true match any of the supported order names.
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
		}
		$this->checkDelay = !empty( $params['checkDelay'] );
		if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
			throw new MWException( __CLASS__ . " does not support delayed jobs." );
		}
		$this->dupCache = wfGetCache( CACHE_ANYTHING );
	}

	/**
	 * Create a queue object of the class named by $params['class'].
	 *
	 * The whole $params array is passed on to that class' constructor, so it
	 * must also carry the keys the constructor reads (see __construct()).
	 *
	 * @param array $params
	 * @return JobQueue
	 * @throws MWException If the class does not exist or is not a JobQueue
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new MWException( "Invalid job queue class '$class'." );
		}
		$obj = new $class( $params );
		if ( !( $obj instanceof self ) ) {
			throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
		}
		return $obj;
	}

	/**
	 * @return string Wiki ID this queue was constructed with
	 */
	final public function getWiki() {
		return $this->wiki;
	}

	/**
	 * @return string Job type this queue was constructed with
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string Order resolved by the constructor (never 'any')
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * @return bool Whether delayed jobs are allowed in this queue
	 */
	final public function delayedJobsEnabled() {
		return $this->checkDelay;
	}

	/**
	 * Get the order names this queue class accepts.
	 *
	 * The constructor rejects any order that is not in this list.
	 *
	 * @return array
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the order to use when none was requested or 'any' was requested.
	 *
	 * @return string
	 */
	abstract protected function optimalOrder();

	/**
	 * Whether this queue class can hold delayed jobs.
	 *
	 * @return bool Always false here; subclasses with support override this
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * Profiled wrapper around doIsEmpty().
	 *
	 * @return mixed Whatever doIsEmpty() returns (presumably a boolean)
	 */
	final public function isEmpty() {
		wfProfileIn( __METHOD__ );
		$res = $this->doIsEmpty();
		wfProfileOut( __METHOD__ );
		return $res;
	}

	/**
	 * @see JobQueue::isEmpty()
	 */
	abstract protected function doIsEmpty();

	/**
	 * Profiled wrapper around doGetSize().
	 *
	 * @return mixed Whatever doGetSize() returns (presumably an integer count)
	 */
	final public function getSize() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetSize();
		wfProfileOut( __METHOD__ );
		return $res;
	}

	/**
	 * @see JobQueue::getSize()
	 */
	abstract protected function doGetSize();

	/**
	 * Profiled wrapper around doGetAcquiredCount().
	 *
	 * @return mixed Whatever doGetAcquiredCount() returns (presumably an integer count)
	 */
	final public function getAcquiredCount() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetAcquiredCount();
		wfProfileOut( __METHOD__ );
		return $res;
	}

	/**
	 * @see JobQueue::getAcquiredCount()
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Profiled wrapper around doGetDelayedCount().
	 *
	 * @return integer 0 unless the subclass overrides doGetDelayedCount()
	 */
	final public function getDelayedCount() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetDelayedCount();
		wfProfileOut( __METHOD__ );
		return $res;
	}

	/**
	 * @see JobQueue::getDelayedCount()
	 * @return integer
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Profiled wrapper around doGetAbandonedCount().
	 *
	 * @return integer 0 unless the subclass overrides doGetAbandonedCount()
	 */
	final public function getAbandonedCount() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetAbandonedCount();
		wfProfileOut( __METHOD__ );
		return $res;
	}

	/**
	 * @see JobQueue::getAbandonedCount()
	 * @return integer
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push a single job or an array of jobs; convenience form of batchPush().
	 *
	 * @param Job|array $jobs A single job or an array of jobs
	 * @param integer $flags Passed through to batchPush() (e.g. self::QOS_ATOMIC)
	 * @return mixed Result of batchPush()
	 * @throws MWException
	 */
	final public function push( $jobs, $flags = 0 ) {
		return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
	}

	/**
	 * Validate a batch of jobs and hand it to doBatchPush().
	 *
	 * All jobs are validated before anything is pushed, so a bad job in the
	 * batch prevents doBatchPush() from being called at all.
	 *
	 * @param array $jobs List of jobs
	 * @param integer $flags Passed through to doBatchPush() (e.g. self::QOS_ATOMIC)
	 * @return mixed True for an empty batch, otherwise the result of doBatchPush()
	 * @throws MWException If a job has the wrong type, or has a release
	 *   timestamp while delayed jobs are not enabled for this queue
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		if ( !count( $jobs ) ) {
			return true; // nothing to do
		}

		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new MWException(
					"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
			} elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
				throw new MWException(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		wfProfileIn( __METHOD__ );
		$ok = $this->doBatchPush( $jobs, $flags );
		wfProfileOut( __METHOD__ );
		return $ok;
	}

	/**
	 * @see JobQueue::batchPush()
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off the queue via doPop().
	 *
	 * If the popped job turns out to be an outdated duplicate according to its
	 * root job parameters, it is replaced by a DuplicateJob (a no-op) before
	 * being returned. A failure of that check never discards the job.
	 *
	 * @return mixed Whatever doPop() returned, or a DuplicateJob replacement
	 * @throws MWException If this queue belongs to another wiki or there is
	 *   no entry in $wgJobClasses for this queue's job type
	 */
	final public function pop() {
		global $wgJobClasses;

		if ( $this->wiki !== wfWikiID() ) {
			throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
		} elseif ( !isset( $wgJobClasses[$this->type] ) ) {
			// Do not pop jobs if there is no class for the queue type
			throw new MWException( "Unrecognized job type '{$this->type}'." );
		}

		wfProfileIn( __METHOD__ );
		$job = $this->doPop();
		wfProfileOut( __METHOD__ );

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				self::incrStats( 'job-pop-duplicate', $this->type );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( MWException $e ) {
			// Best effort only: don't lose jobs over this, but leave a trace
			// instead of silently discarding the error.
			wfDebugLog( 'JobQueue',
				"Root job duplicate check failed for '{$this->type}' job: " . $e->getMessage() );
		}

		return $job;
	}

	/**
	 * @see JobQueue::pop()
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge a job via doAck().
	 *
	 * @param Job $job
	 * @return mixed Result of doAck()
	 * @throws MWException If the job type does not match this queue
	 */
	final public function ack( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		wfProfileIn( __METHOD__ );
		$ok = $this->doAck( $job );
		wfProfileOut( __METHOD__ );
		return $ok;
	}

	/**
	 * @see JobQueue::ack()
	 */
	abstract protected function doAck( Job $job );

	/**
	 * Register the root job of $job so that older jobs sharing the same root
	 * job signature can be recognised as duplicates when they are popped.
	 *
	 * @param Job $job
	 * @return mixed Result of doDeduplicateRootJob()
	 * @throws MWException If the job type does not match this queue
	 */
	final public function deduplicateRootJob( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		wfProfileIn( __METHOD__ );
		$ok = $this->doDeduplicateRootJob( $job );
		wfProfileOut( __METHOD__ );
		return $ok;
	}

	/**
	 * Record the job's root job timestamp in the cache, unless an equal or
	 * newer timestamp is already stored for the same root job signature.
	 *
	 * @see JobQueue::deduplicateRootJob()
	 * @param Job $job
	 * @return mixed True if an equal or newer timestamp was already stored,
	 *   otherwise the result of the cache set() call
	 * @throws MWException If the job has no root job parameters
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new MWException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call batchPush() and then this function, so that a failed
		// push aborts the de-duplication registration. Having only the registration
		// succeed would cause jobs to become no-ops without any actual jobs that
		// made them redundant.
		$timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location.
		// The TTL constant lives on this class; no need to go through a subclass.
		return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check whether a newer root job than the one of $job has been registered.
	 *
	 * @param Job $job
	 * @return mixed Result of doIsRootJobOldDuplicate()
	 * @throws MWException If the job type does not match this queue
	 */
	final protected function isRootJobOldDuplicate( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		wfProfileIn( __METHOD__ );
		$isDuplicate = $this->doIsRootJobOldDuplicate( $job );
		wfProfileOut( __METHOD__ );
		return $isDuplicate;
	}

	/**
	 * @see JobQueue::isRootJobOldDuplicate()
	 * @param Job $job
	 * @return bool True only if the cache holds a timestamp strictly newer
	 *   than the job's own root job timestamp
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->dupCache->get( $key );

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the cache key under which a root job's last timestamp is stored.
	 *
	 * The key is made from this queue's wiki ID (split into DB name and
	 * prefix), the job type and the given signature.
	 *
	 * @param string $signature Root job signature
	 * @return string
	 */
	protected function getRootJobCacheKey( $signature ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
	}

	/**
	 * Profiled wrapper around doDelete().
	 *
	 * @return mixed Result of doDelete()
	 * @throws MWException If the subclass does not override doDelete()
	 */
	final public function delete() {
		wfProfileIn( __METHOD__ );
		$res = $this->doDelete();
		wfProfileOut( __METHOD__ );
		return $res;
	}

	/**
	 * @see JobQueue::delete()
	 * @throws MWException Always, in this default implementation
	 */
	protected function doDelete() {
		throw new MWException( "This method is not implemented." );
	}

	/**
	 * Profiled wrapper around doWaitForBackups().
	 *
	 * @return void
	 */
	final public function waitForBackups() {
		wfProfileIn( __METHOD__ );
		$this->doWaitForBackups();
		wfProfileOut( __METHOD__ );
	}

	/**
	 * @see JobQueue::waitForBackups()
	 * @return void Does nothing in this default implementation
	 */
	protected function doWaitForBackups() {}

	/**
	 * Get the task definitions from doGetPeriodicTasks(), with each
	 * definition's 'name' field set to its key in the returned map.
	 *
	 * @return array Map of (task name => task definition array)
	 */
	final public function getPeriodicTasks() {
		$tasks = $this->doGetPeriodicTasks();
		foreach ( $tasks as $name => &$def ) {
			$def['name'] = $name;
		}
		unset( $def ); // break the reference to the last element before returning
		return $tasks;
	}

	/**
	 * @see JobQueue::getPeriodicTasks()
	 * @return array Empty in this default implementation
	 */
	protected function doGetPeriodicTasks() {
		return array();
	}

	/**
	 * Profiled wrapper around doFlushCaches().
	 *
	 * @return void
	 */
	final public function flushCaches() {
		wfProfileIn( __METHOD__ );
		$this->doFlushCaches();
		wfProfileOut( __METHOD__ );
	}

	/**
	 * @see JobQueue::flushCaches()
	 * @return void Does nothing in this default implementation
	 */
	protected function doFlushCaches() {}

	/**
	 * Get the jobs currently in the queue.
	 *
	 * NOTE(review): the return type is not visible here; presumably an
	 * iterator, by analogy with getAllDelayedJobs() -- confirm in subclasses.
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get the delayed jobs currently in the queue.
	 *
	 * @return ArrayIterator Empty in this default implementation
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( array() ); // not implemented
	}

	/**
	 * @return null Always null in this default implementation
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Wrapper around doGetSiblingQueuesWithJobs().
	 *
	 * @param array $types List of queue types
	 * @return mixed Result of doGetSiblingQueuesWithJobs(); null when not supported
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		// Held in a local on purpose; presumably ProfileSection profiles
		// until it goes out of scope at return -- confirm against its class.
		$section = new ProfileSection( __METHOD__ );
		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueuesWithJobs()
	 * @param array $types List of queue types
	 * @return null Always null in this default implementation
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Wrapper around doGetSiblingQueueSizes().
	 *
	 * @param array $types List of queue types
	 * @return mixed Result of doGetSiblingQueueSizes(); null when not supported
	 */
	final public function getSiblingQueueSizes( array $types ) {
		// Held in a local on purpose; presumably ProfileSection profiles
		// until it goes out of scope at return -- confirm against its class.
		$section = new ProfileSection( __METHOD__ );
		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueueSizes()
	 * @param array $types List of queue types
	 * @return null Always null in this default implementation
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Increment two stats counters: the one named $key and the per-type
	 * one named "{$key}-{$type}".
	 *
	 * @param string $key Stats key
	 * @param string $type Job type
	 * @param integer $delta Amount to add to both counters
	 * @return void
	 */
	public static function incrStats( $key, $type, $delta = 1 ) {
		wfIncrStats( $key, $delta );
		wfIncrStats( "{$key}-{$type}", $delta );
	}

	/**
	 * Namespace the queue with a key, for testing purposes.
	 *
	 * @param string $key
	 * @throws MWException Always, in this default implementation
	 */
	public function setTestingPrefix( $key ) {
		throw new MWException( "Queue namespacing not supported for this queue type." );
	}
}

/**
 * Base class for job queue errors.
 */
class JobQueueError extends MWException {}

/**
 * Job queue error for connection failures (going by the class name; it adds
 * nothing to JobQueueError here).
 */
class JobQueueConnectionError extends JobQueueError {}