MediaWiki
REL1_23
|
<?php
/**
 * Abstract base for job queue backends.
 *
 * A queue instance is bound to one wiki and one job type. The public entry
 * points are final: they validate their arguments, wrap the call in profiling,
 * and then delegate to a protected do*() hook that the concrete backend
 * implements or overrides.
 */
abstract class JobQueue {
	/** @var string Wiki ID this queue belongs to (compared against wfWikiID() in pop()) */
	protected $wiki;

	/** @var string Job type handled by this queue (compared against Job::getType()) */
	protected $type;

	/** @var string Job ordering; always one of the values from supportedOrders() */
	protected $order;

	/** @var int Taken from $params['claimTTL'], default 0 (presumably seconds; not used in this class) */
	protected $claimTTL;

	/** @var int Taken from $params['maxTries'], default 3 (presumably attempts per job; not used in this class) */
	protected $maxTries;

	/** @var bool Whether delayed jobs are accepted by this queue */
	protected $checkDelay;

	/** @var BagOStuff Cache holding the last timestamp seen for each root job signature */
	protected $dupCache;

	const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * Initialise the common queue state from a parameter map.
	 *
	 * Recognised keys, as read below:
	 *   - wiki       : wiki ID (required)
	 *   - type       : job type (required)
	 *   - claimTTL   : optional, defaults to 0
	 *   - maxTries   : optional, defaults to 3
	 *   - order      : optional; when absent or 'any', optimalOrder() is used
	 *   - checkDelay : optional; truthy to enable delayed jobs
	 *
	 * @param array $params
	 * @throws MWException If the order is not supported, or if delayed jobs
	 *   are requested but the backend does not support them
	 */
	protected function __construct( array $params ) {
		$this->wiki = $params['wiki'];
		$this->type = $params['type'];
		$this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
		$this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict comparison: with loose comparison a non-string value such as
		// boolean true would match any non-empty order name and slip through.
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
		}
		$this->checkDelay = !empty( $params['checkDelay'] );
		if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
			throw new MWException( __CLASS__ . " does not support delayed jobs." );
		}
		$this->dupCache = wfGetCache( CACHE_ANYTHING );
	}

	/**
	 * Build a queue object of the class named in $params['class'].
	 *
	 * The whole $params array is passed on to that class's constructor, so it
	 * must also carry the keys the constructor needs.
	 *
	 * @param array $params
	 * @return JobQueue
	 * @throws MWException If the class does not exist or is not a JobQueue
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new MWException( "Invalid job queue class '$class'." );
		}
		$obj = new $class( $params );
		if ( !( $obj instanceof self ) ) {
			throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return $obj;
	}

	/**
	 * @return string Wiki ID this queue was constructed for
	 */
	final public function getWiki() {
		return $this->wiki;
	}

	/**
	 * @return string Job type this queue was constructed for
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string Job ordering in effect for this queue
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * @return bool Whether delayed jobs were enabled at construction time
	 */
	final public function delayedJobsEnabled() {
		return $this->checkDelay;
	}

	/**
	 * List the orderings this backend can provide.
	 *
	 * @return array Order names; the constructor rejects any order not listed
	 */
	abstract protected function supportedOrders();

	/**
	 * Name the ordering to use when the caller gave none (or gave 'any').
	 *
	 * @return string One of the values from supportedOrders()
	 */
	abstract protected function optimalOrder();

	/**
	 * Whether this backend can hold delayed jobs.
	 *
	 * @return bool Always false here; backends with support override this
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * Profiled wrapper around doIsEmpty().
	 *
	 * @return mixed Result of doIsEmpty() (presumably bool)
	 */
	final public function isEmpty() {
		wfProfileIn( __METHOD__ );
		$res = $this->doIsEmpty();
		wfProfileOut( __METHOD__ );

		return $res;
	}

	/**
	 * Backend hook for isEmpty().
	 *
	 * @see JobQueue::isEmpty()
	 */
	abstract protected function doIsEmpty();

	/**
	 * Profiled wrapper around doGetSize().
	 *
	 * @return mixed Result of doGetSize() (presumably an integer count)
	 */
	final public function getSize() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetSize();
		wfProfileOut( __METHOD__ );

		return $res;
	}

	/**
	 * Backend hook for getSize().
	 *
	 * @see JobQueue::getSize()
	 */
	abstract protected function doGetSize();

	/**
	 * Profiled wrapper around doGetAcquiredCount().
	 *
	 * @return mixed Result of doGetAcquiredCount() (presumably an integer count)
	 */
	final public function getAcquiredCount() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetAcquiredCount();
		wfProfileOut( __METHOD__ );

		return $res;
	}

	/**
	 * Backend hook for getAcquiredCount().
	 *
	 * @see JobQueue::getAcquiredCount()
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Profiled wrapper around doGetDelayedCount().
	 *
	 * @return int Number of delayed jobs; 0 unless the backend overrides the hook
	 */
	final public function getDelayedCount() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetDelayedCount();
		wfProfileOut( __METHOD__ );

		return $res;
	}

	/**
	 * Backend hook for getDelayedCount().
	 *
	 * @see JobQueue::getDelayedCount()
	 * @return int Always 0 in this default implementation
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Profiled wrapper around doGetAbandonedCount().
	 *
	 * @return int Number of abandoned jobs; 0 unless the backend overrides the hook
	 */
	final public function getAbandonedCount() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetAbandonedCount();
		wfProfileOut( __METHOD__ );

		return $res;
	}

	/**
	 * Backend hook for getAbandonedCount().
	 *
	 * @see JobQueue::getAbandonedCount()
	 * @return int Always 0 in this default implementation
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push one job or a list of jobs; a single job is wrapped into an array
	 * and handed to batchPush().
	 *
	 * @param Job|array $jobs A single job or an array of jobs
	 * @param int $flags Passed through to batchPush() (see JobQueue::QOS_ATOMIC)
	 * @return mixed Result of batchPush()
	 * @throws MWException See batchPush()
	 */
	final public function push( $jobs, $flags = 0 ) {
		return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
	}

	/**
	 * Validate a batch of jobs and hand it to the backend.
	 *
	 * Every job must be of this queue's type, and a job with a release
	 * timestamp is only accepted when delayed jobs are enabled. Nothing is
	 * passed to the backend if any job fails validation.
	 *
	 * @param array $jobs List of jobs
	 * @param int $flags Passed through to doBatchPush() (see JobQueue::QOS_ATOMIC)
	 * @return mixed True for an empty batch, else the result of doBatchPush()
	 * @throws MWException On a job of the wrong type or an unsupported delayed job
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		if ( !count( $jobs ) ) {
			return true; // nothing to do
		}

		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new MWException(
					"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
			} elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
				throw new MWException(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		wfProfileIn( __METHOD__ );
		$ok = $this->doBatchPush( $jobs, $flags );
		wfProfileOut( __METHOD__ );

		return $ok;
	}

	/**
	 * Backend hook for batchPush(); receives an already validated, non-empty batch.
	 *
	 * @see JobQueue::batchPush()
	 * @param array $jobs
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Take a job off the queue.
	 *
	 * If the popped job carries root job parameters and a newer root job with
	 * the same signature has been registered since, the job is replaced by a
	 * DuplicateJob (a no-op) and the 'job-pop-duplicate' stat is incremented.
	 *
	 * @return mixed Whatever doPop() returned (a falsy value is returned as is),
	 *   or a DuplicateJob standing in for an outdated job
	 * @throws MWException If the queue belongs to another wiki, or if no class
	 *   is registered in $wgJobClasses for this queue's type
	 */
	final public function pop() {
		global $wgJobClasses;

		if ( $this->wiki !== wfWikiID() ) {
			throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
		} elseif ( !isset( $wgJobClasses[$this->type] ) ) {
			// Do not pop jobs if there is no class for the queue type
			throw new MWException( "Unrecognized job type '{$this->type}'." );
		}

		wfProfileIn( __METHOD__ );
		$job = $this->doPop();
		wfProfileOut( __METHOD__ );

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				self::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( MWException $e ) {
			// Deliberately ignored: the job is already off the queue, so a failed
			// duplicate check must not cause it to be lost. Return it unchanged.
		}

		return $job;
	}

	/**
	 * Backend hook for pop().
	 *
	 * @see JobQueue::pop()
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge a job via the backend's doAck().
	 *
	 * @param Job $job
	 * @return mixed Result of doAck()
	 * @throws MWException If the job is not of this queue's type
	 */
	final public function ack( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		wfProfileIn( __METHOD__ );
		$ok = $this->doAck( $job );
		wfProfileOut( __METHOD__ );

		return $ok;
	}

	/**
	 * Backend hook for ack().
	 *
	 * @see JobQueue::ack()
	 * @param Job $job
	 */
	abstract protected function doAck( Job $job );

	/**
	 * Register the root job of $job so that older jobs sharing the same root
	 * job signature are treated as duplicates when they are popped.
	 *
	 * @param Job $job
	 * @return mixed Result of doDeduplicateRootJob()
	 * @throws MWException If the job is of the wrong type or (in the default
	 *   implementation) has no root job parameters
	 */
	final public function deduplicateRootJob( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		wfProfileIn( __METHOD__ );
		$ok = $this->doDeduplicateRootJob( $job );
		wfProfileOut( __METHOD__ );

		return $ok;
	}

	/**
	 * Default implementation of deduplicateRootJob(): remember the root job
	 * timestamp in the duplicate cache unless an equal or newer one is there.
	 *
	 * @see JobQueue::deduplicateRootJob()
	 * @param Job $job
	 * @return bool True if an equal or newer timestamp was already recorded,
	 *   otherwise the result of the cache write
	 * @throws MWException If the job has no root job parameters
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new MWException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should push the jobs (batchPush()) first and only then call this
		// function, so that a failed insert also aborts the de-duplication
		// registration. Having only the registration succeed would cause jobs to
		// become no-ops without any actual jobs that made them redundant.
		// NOTE(review): this default implementation writes to the cache
		// immediately; a backend that defers its inserts should override this
		// method to keep the same ordering.
		$timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check whether a newer root job has superseded the one $job belongs to.
	 *
	 * @param Job $job
	 * @return bool
	 * @throws MWException If the job is not of this queue's type
	 */
	final protected function isRootJobOldDuplicate( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		wfProfileIn( __METHOD__ );
		$isDuplicate = $this->doIsRootJobOldDuplicate( $job );
		wfProfileOut( __METHOD__ );

		return $isDuplicate;
	}

	/**
	 * Default implementation of isRootJobOldDuplicate(): compare the job's
	 * root job timestamp with the one stored in the duplicate cache.
	 *
	 * @see JobQueue::isRootJobOldDuplicate()
	 * @param Job $job
	 * @return bool True only if a strictly newer timestamp is recorded
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->dupCache->get( $key );

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Build the cache key under which a root job's last timestamp is stored.
	 * The key is scoped by this queue's wiki and job type.
	 *
	 * @param string $signature Root job signature
	 * @return string
	 */
	protected function getRootJobCacheKey( $signature ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
	}

	/**
	 * Profiled wrapper around doDelete().
	 *
	 * @return mixed Result of doDelete()
	 * @throws MWException If the backend does not override doDelete()
	 */
	final public function delete() {
		wfProfileIn( __METHOD__ );
		$res = $this->doDelete();
		wfProfileOut( __METHOD__ );

		return $res;
	}

	/**
	 * Backend hook for delete(); not supported unless overridden.
	 *
	 * @see JobQueue::delete()
	 * @throws MWException Always, in this default implementation
	 */
	protected function doDelete() {
		throw new MWException( "This method is not implemented." );
	}

	/**
	 * Profiled wrapper around doWaitForBackups(), which does nothing unless
	 * the backend overrides it.
	 *
	 * @return void
	 */
	final public function waitForBackups() {
		wfProfileIn( __METHOD__ );
		$this->doWaitForBackups();
		wfProfileOut( __METHOD__ );
	}

	/**
	 * Backend hook for waitForBackups(); no-op by default.
	 *
	 * @see JobQueue::waitForBackups()
	 * @return void
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Get the backend's periodic task definitions, each annotated with its
	 * own map key under 'name'.
	 *
	 * @return array Map of (task name => definition array including 'name')
	 */
	final public function getPeriodicTasks() {
		$tasks = $this->doGetPeriodicTasks();
		foreach ( $tasks as $name => &$def ) {
			$def['name'] = $name;
		}
		unset( $def ); // drop the reference left behind by the by-reference loop

		return $tasks;
	}

	/**
	 * Backend hook for getPeriodicTasks().
	 *
	 * @see JobQueue::getPeriodicTasks()
	 * @return array Map of (task name => definition array); empty by default
	 */
	protected function doGetPeriodicTasks() {
		return array();
	}

	/**
	 * Profiled wrapper around doFlushCaches(), which does nothing unless the
	 * backend overrides it.
	 *
	 * @return void
	 */
	final public function flushCaches() {
		wfProfileIn( __METHOD__ );
		$this->doFlushCaches();
		wfProfileOut( __METHOD__ );
	}

	/**
	 * Backend hook for flushCaches(); no-op by default.
	 *
	 * @see JobQueue::flushCaches()
	 * @return void
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get all queued jobs.
	 *
	 * @return mixed Backend-defined collection of jobs (presumably an Iterator,
	 *   by analogy with getAllDelayedJobs())
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get all delayed jobs.
	 *
	 * @return Iterator An empty iterator unless the backend overrides this
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( array() ); // not implemented
	}

	/**
	 * Backend-specific location identifier; null unless overridden.
	 * NOTE(review): presumably used to group queues that share one backend
	 * location; confirm against callers.
	 *
	 * @return string|null
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Profiled wrapper around doGetSiblingQueuesWithJobs().
	 *
	 * @param array $types List of queue types
	 * @return array|null Backend result; null when the backend has no support
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		// Held only for its lifetime: profiles this method until it goes out of scope
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * Backend hook for getSiblingQueuesWithJobs().
	 *
	 * @see JobQueue::getSiblingQueuesWithJobs()
	 * @param array $types List of queue types
	 * @return array|null Always null in this default implementation
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Profiled wrapper around doGetSiblingQueueSizes().
	 *
	 * @param array $types List of queue types
	 * @return array|null Backend result; null when the backend has no support
	 */
	final public function getSiblingQueueSizes( array $types ) {
		// Held only for its lifetime: profiles this method until it goes out of scope
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * Backend hook for getSiblingQueueSizes().
	 *
	 * @see JobQueue::getSiblingQueueSizes()
	 * @param array $types List of queue types
	 * @return array|null Always null in this default implementation
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Increment a stats counter at up to three granularities: the bare key,
	 * "<key>-<type>", and (when a wiki is given) "<key>-<type>-<wiki>".
	 *
	 * @param string $key Base counter name
	 * @param string $type Job type
	 * @param int $delta Amount to add to each counter
	 * @param string|null $wiki Wiki ID, or null to skip the per-wiki counter
	 * @return void
	 */
	public static function incrStats( $key, $type, $delta = 1, $wiki = null ) {
		wfIncrStats( $key, $delta );
		wfIncrStats( "{$key}-{$type}", $delta );
		if ( $wiki !== null ) {
			wfIncrStats( "{$key}-{$type}-{$wiki}", $delta );
		}
	}

	/**
	 * Namespace the queue with a key, for use in tests; not supported unless
	 * the backend overrides this.
	 *
	 * @param string $key
	 * @throws MWException Always, in this default implementation
	 */
	public function setTestingPrefix( $key ) {
		throw new MWException( "Queue namespacing not supported for this queue type." );
	}
}

/**
 * Generic job queue error.
 */
class JobQueueError extends MWException {
}

/**
 * Job queue error specific to backend connection problems.
 * NOTE(review): purpose inferred from the class name; nothing in this file throws it.
 */
class JobQueueConnectionError extends JobQueueError {
}