MediaWiki
REL1_24
|
<?php
/**
 * Base class for a queue of jobs of a single type belonging to a single wiki.
 *
 * The public entry points are final wrappers that validate their arguments
 * and wrap the call in profiling; the real work is delegated to a protected
 * do*() hook which concrete queue classes implement or override.
 */
abstract class JobQueue {
	/** @var string Wiki ID (compared against wfWikiID() and split by wfSplitWikiID()) */
	protected $wiki;

	/** @var string Job type handled by this queue (compared against Job::getType()) */
	protected $type;

	/** @var string Job ordering in use; always one of supportedOrders() */
	protected $order;

	/**
	 * @var int Taken from $params['claimTTL'], 0 by default
	 * NOTE(review): presumably the number of seconds a claim on a job lasts; not used in this class
	 */
	protected $claimTTL;

	/**
	 * @var int Taken from $params['maxTries'], 3 by default
	 * NOTE(review): presumably the maximum number of attempts per job; not used in this class
	 */
	protected $maxTries;

	/** @var bool Whether delayed jobs are accepted by batchPush() */
	protected $checkDelay;

	/** @var mixed Cache returned by wfGetCache( CACHE_ANYTHING ); holds root job timestamps */
	protected $dupCache;

	const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * Set up the common queue state from the configuration array.
	 *
	 * Recognized keys of $params:
	 *   - wiki       : wiki ID (required)
	 *   - type       : job type (required)
	 *   - claimTTL   : stored as-is; defaults to 0
	 *   - maxTries   : stored as-is; defaults to 3
	 *   - order      : requested order; when absent or 'any', optimalOrder() is used
	 *   - checkDelay : any non-empty value enables delayed job support
	 *
	 * @param array $params
	 * @throws MWException If the order is not in supportedOrders(), or if delayed
	 *   jobs were requested but supportsDelayedJobs() is false
	 */
	protected function __construct( array $params ) {
		$this->wiki = $params['wiki'];
		$this->type = $params['type'];
		$this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
		$this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict check: a loose comparison would let non-string values such as
		// 0 or true match an arbitrary order name.
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
		}
		$this->checkDelay = !empty( $params['checkDelay'] );
		if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
			throw new MWException( __CLASS__ . " does not support delayed jobs." );
		}
		$this->dupCache = wfGetCache( CACHE_ANYTHING );
	}

	/**
	 * Instantiate the queue class named in $params['class'].
	 *
	 * The whole $params array is passed on to that class's constructor,
	 * see __construct() for the keys handled by this base class.
	 *
	 * @param array $params Must contain a 'class' key
	 * @return JobQueue
	 * @throws MWException If the class does not exist or is not a JobQueue
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new MWException( "Invalid job queue class '$class'." );
		}
		$obj = new $class( $params );
		if ( !( $obj instanceof self ) ) {
			throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return $obj;
	}

	/**
	 * @return string Wiki ID this queue was constructed with
	 */
	final public function getWiki() {
		return $this->wiki;
	}

	/**
	 * @return string Job type this queue was constructed with
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string Order in use (validated against supportedOrders() on construction)
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * @return bool Whether delayed jobs were enabled via the 'checkDelay' parameter
	 */
	final public function delayedJobsEnabled() {
		return $this->checkDelay;
	}

	/**
	 * List the orders this queue class can provide.
	 *
	 * @return array Order names; the constructor rejects any order not listed here
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the order used when the caller requested none, or requested 'any'.
	 *
	 * @return string One of the values of supportedOrders()
	 */
	abstract protected function optimalOrder();

	/**
	 * Tell whether this queue class can handle delayed jobs.
	 *
	 * @return bool False unless overridden
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * Profiled wrapper around doIsEmpty().
	 *
	 * @return mixed Whatever doIsEmpty() returns (expected to be a bool)
	 */
	final public function isEmpty() {
		wfProfileIn( __METHOD__ );
		$res = $this->doIsEmpty();
		wfProfileOut( __METHOD__ );

		return $res;
	}

	/**
	 * @see JobQueue::isEmpty()
	 * @return bool
	 */
	abstract protected function doIsEmpty();

	/**
	 * Profiled wrapper around doGetSize().
	 *
	 * @return mixed Whatever doGetSize() returns (expected to be an integer count)
	 */
	final public function getSize() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetSize();
		wfProfileOut( __METHOD__ );

		return $res;
	}

	/**
	 * @see JobQueue::getSize()
	 * @return int
	 */
	abstract protected function doGetSize();

	/**
	 * Profiled wrapper around doGetAcquiredCount().
	 *
	 * @return mixed Whatever doGetAcquiredCount() returns (expected to be an integer count)
	 */
	final public function getAcquiredCount() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetAcquiredCount();
		wfProfileOut( __METHOD__ );

		return $res;
	}

	/**
	 * @see JobQueue::getAcquiredCount()
	 * @return int
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Profiled wrapper around doGetDelayedCount().
	 *
	 * @return int 0 unless the queue class overrides doGetDelayedCount()
	 */
	final public function getDelayedCount() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetDelayedCount();
		wfProfileOut( __METHOD__ );

		return $res;
	}

	/**
	 * @see JobQueue::getDelayedCount()
	 * @return int
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Profiled wrapper around doGetAbandonedCount().
	 *
	 * @return int 0 unless the queue class overrides doGetAbandonedCount()
	 */
	final public function getAbandonedCount() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetAbandonedCount();
		wfProfileOut( __METHOD__ );

		return $res;
	}

	/**
	 * @see JobQueue::getAbandonedCount()
	 * @return int
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push a single job or an array of jobs into the queue.
	 *
	 * A non-array argument is wrapped in an array and handed to batchPush().
	 *
	 * @param Job|array $jobs A single job or an array of jobs
	 * @param int $flags Passed through to doBatchPush() (see the QOS_* constants)
	 * @throws MWException See batchPush()
	 */
	final public function push( $jobs, $flags = 0 ) {
		$this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
	}

	/**
	 * Validate a batch of jobs and push it into the queue.
	 *
	 * Nothing happens for an empty batch. The whole batch is validated
	 * before doBatchPush() is called, so one bad job rejects all of them.
	 *
	 * @param array $jobs List of jobs
	 * @param int $flags Passed through to doBatchPush() (see the QOS_* constants)
	 * @throws MWException If a job has a different type than this queue, or has a
	 *   release timestamp while delayed jobs are not enabled
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		if ( !count( $jobs ) ) {
			return; // nothing to do
		}

		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new MWException(
					"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
			} elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
				throw new MWException(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		wfProfileIn( __METHOD__ );
		$this->doBatchPush( $jobs, $flags );
		wfProfileOut( __METHOD__ );
	}

	/**
	 * @see JobQueue::batchPush()
	 * @param array $jobs Non-empty list of jobs already validated by batchPush()
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off the queue.
	 *
	 * If the popped job turns out to be superseded by a newer root job
	 * (see isRootJobOldDuplicate()), it is replaced by a DuplicateJob.
	 *
	 * @return mixed Whatever doPop() returns, or a DuplicateJob built from it
	 * @throws MWException If this queue belongs to another wiki than the current
	 *   one, or if $wgJobClasses has no entry for this queue's job type
	 */
	final public function pop() {
		global $wgJobClasses;

		if ( $this->wiki !== wfWikiID() ) {
			throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
		} elseif ( !isset( $wgJobClasses[$this->type] ) ) {
			// Do not pop jobs if there is no class for the queue type
			throw new MWException( "Unrecognized job type '{$this->type}'." );
		}

		wfProfileIn( __METHOD__ );
		$job = $this->doPop();
		wfProfileOut( __METHOD__ );

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				self::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( MWException $e ) {
			// Deliberately ignored: the job was already popped, so failing here
			// would lose it. Return the job as popped instead.
		}

		return $job;
	}

	/**
	 * @see JobQueue::pop()
	 * @return mixed A job, or a false-like value if there is none (pop() skips
	 *   the duplicate check for false-like results)
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge a job, after checking that it belongs to this queue's type.
	 *
	 * @param Job $job
	 * @throws MWException If the job type differs from this queue's type
	 */
	final public function ack( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		wfProfileIn( __METHOD__ );
		$this->doAck( $job );
		wfProfileOut( __METHOD__ );
	}

	/**
	 * @see JobQueue::ack()
	 * @param Job $job
	 */
	abstract protected function doAck( Job $job );

	/**
	 * Register the root job of $job as the most recent one for its signature.
	 *
	 * Jobs popped later whose root job timestamp is older than the registered
	 * one are turned into no-ops by pop().
	 *
	 * @param Job $job
	 * @return mixed Result of doDeduplicateRootJob()
	 * @throws MWException If the job type differs from this queue's type, or
	 *   (default implementation) if the job has no root job parameters
	 */
	final public function deduplicateRootJob( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		wfProfileIn( __METHOD__ );
		$ok = $this->doDeduplicateRootJob( $job );
		wfProfileOut( __METHOD__ );

		return $ok;
	}

	/**
	 * Default implementation: remember the root job timestamp in $this->dupCache.
	 *
	 * @see JobQueue::deduplicateRootJob()
	 * @param Job $job
	 * @return mixed True if an equal or newer timestamp is already registered,
	 *   otherwise the result of the cache set() call
	 * @throws MWException If the job has no root job parameters
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new MWException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call batchPush() and then this function, so that if the
		// push fails the de-duplication registration never happens. Having only
		// the registration succeed would cause jobs to become no-ops without any
		// actual jobs that made them redundant.
		// NOTE(review): the previous comment also claimed the registration is
		// deferred till "transaction idle"; nothing here defers, confirm whether
		// any backend relies on that.
		$timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		// The TTL constant lives on this class; do not reach for it via a subclass.
		return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check whether a newer root job than the one of $job has been registered.
	 *
	 * @param Job $job
	 * @return mixed Result of doIsRootJobOldDuplicate()
	 * @throws MWException If the job type differs from this queue's type
	 */
	final protected function isRootJobOldDuplicate( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		wfProfileIn( __METHOD__ );
		$isDuplicate = $this->doIsRootJobOldDuplicate( $job );
		wfProfileOut( __METHOD__ );

		return $isDuplicate;
	}

	/**
	 * Default implementation: compare against the timestamp in $this->dupCache.
	 *
	 * @see JobQueue::isRootJobOldDuplicate()
	 * @param Job $job
	 * @return bool False if the job has no root job parameters or nothing newer
	 *   is registered
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->dupCache->get( $key );

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the cache key under which the timestamp of a root job is stored.
	 *
	 * The key is scoped to this queue's wiki and job type.
	 *
	 * @param string $signature Root job signature
	 * @return string
	 */
	protected function getRootJobCacheKey( $signature ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
	}

	/**
	 * Profiled wrapper around doDelete().
	 *
	 * @throws MWException Unless the queue class overrides doDelete()
	 */
	final public function delete() {
		wfProfileIn( __METHOD__ );
		$this->doDelete();
		wfProfileOut( __METHOD__ );
	}

	/**
	 * @see JobQueue::delete()
	 * @throws MWException Always, in this default implementation
	 */
	protected function doDelete() {
		throw new MWException( "This method is not implemented." );
	}

	/**
	 * Profiled wrapper around doWaitForBackups().
	 *
	 * Does nothing unless the queue class overrides doWaitForBackups().
	 */
	final public function waitForBackups() {
		wfProfileIn( __METHOD__ );
		$this->doWaitForBackups();
		wfProfileOut( __METHOD__ );
	}

	/**
	 * @see JobQueue::waitForBackups()
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Get the periodic task definitions of this queue, keyed by task name.
	 *
	 * Each definition returned by doGetPeriodicTasks() additionally gets a
	 * 'name' entry holding its own array key.
	 *
	 * @return array Map of (task name => definition array)
	 */
	final public function getPeriodicTasks() {
		$tasks = $this->doGetPeriodicTasks();
		foreach ( $tasks as $name => &$def ) {
			$def['name'] = $name;
		}
		// Break the reference so that $def no longer aliases the last element
		unset( $def );

		return $tasks;
	}

	/**
	 * @see JobQueue::getPeriodicTasks()
	 * @return array Map of (task name => definition array); empty by default
	 */
	protected function doGetPeriodicTasks() {
		return array();
	}

	/**
	 * Profiled wrapper around doFlushCaches().
	 *
	 * Does nothing unless the queue class overrides doFlushCaches().
	 */
	final public function flushCaches() {
		wfProfileIn( __METHOD__ );
		$this->doFlushCaches();
		wfProfileOut( __METHOD__ );
	}

	/**
	 * @see JobQueue::flushCaches()
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get an iterator over the jobs in the queue.
	 *
	 * NOTE(review): whether claimed or delayed jobs are included is decided
	 * by the implementing class; it is not visible here.
	 *
	 * @return Iterator
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get an iterator over the delayed jobs in the queue.
	 *
	 * @return Iterator An empty iterator unless overridden
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( array() ); // not implemented
	}

	/**
	 * NOTE(review): presumably identifies the backend location so that callers
	 * can coalesce queries for queues that share one; confirm against callers.
	 *
	 * @return string|null Null unless overridden
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Profiled wrapper around doGetSiblingQueuesWithJobs().
	 *
	 * @param array $types List of job types to check
	 * @return array|null Null unless the queue class supports this
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		// Not read again; presumably ProfileSection profiles until it goes out of scope
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueuesWithJobs()
	 * @param array $types List of job types to check
	 * @return array|null
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Profiled wrapper around doGetSiblingQueueSizes().
	 *
	 * @param array $types List of job types to check
	 * @return array|null Null unless the queue class supports this
	 */
	final public function getSiblingQueueSizes( array $types ) {
		// Not read again; presumably ProfileSection profiles until it goes out of scope
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueueSizes()
	 * @param array $types List of job types to check
	 * @return array|null
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Increment a statistics counter at several levels of detail.
	 *
	 * Bumps "$key" and "$key-$type", plus "$key-$type-$wiki" when a wiki
	 * is given.
	 *
	 * @param string $key Counter name
	 * @param string $type Job type
	 * @param int $delta Amount to add to each counter
	 * @param string|null $wiki Wiki ID, or null to skip the per-wiki counter
	 */
	public static function incrStats( $key, $type, $delta = 1, $wiki = null ) {
		wfIncrStats( $key, $delta );
		wfIncrStats( "{$key}-{$type}", $delta );
		if ( $wiki !== null ) {
			wfIncrStats( "{$key}-{$type}-{$wiki}", $delta );
		}
	}

	/**
	 * Namespace the queue with a key, for queue classes that support it.
	 *
	 * @param string $key
	 * @throws MWException Always, in this default implementation
	 */
	public function setTestingPrefix( $key ) {
		throw new MWException( "Queue namespacing not supported for this queue type." );
	}
}

/**
 * Exception class for job queue errors.
 */
class JobQueueError extends MWException {
}

/**
 * Job queue error specific to backend connection problems.
 */
class JobQueueConnectionError extends JobQueueError {
}