MediaWiki  REL1_23
JobQueue.php
Go to the documentation of this file.
00001 <?php
00031 abstract class JobQueue {
00033     protected $wiki;
00034 
00036     protected $type;
00037 
00039     protected $order;
00040 
00042     protected $claimTTL;
00043 
00045     protected $maxTries;
00046 
00048     protected $checkDelay;
00049 
00051     protected $dupCache;
00052 
00053     const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
00054 
00055     const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
00056 
00061     protected function __construct( array $params ) {
00062         $this->wiki = $params['wiki'];
00063         $this->type = $params['type'];
00064         $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
00065         $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
00066         if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
00067             $this->order = $params['order'];
00068         } else {
00069             $this->order = $this->optimalOrder();
00070         }
00071         if ( !in_array( $this->order, $this->supportedOrders() ) ) {
00072             throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
00073         }
00074         $this->checkDelay = !empty( $params['checkDelay'] );
00075         if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
00076             throw new MWException( __CLASS__ . " does not support delayed jobs." );
00077         }
00078         $this->dupCache = wfGetCache( CACHE_ANYTHING );
00079     }
00080 
00112     final public static function factory( array $params ) {
00113         $class = $params['class'];
00114         if ( !class_exists( $class ) ) {
00115             throw new MWException( "Invalid job queue class '$class'." );
00116         }
00117         $obj = new $class( $params );
00118         if ( !( $obj instanceof self ) ) {
00119             throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
00120         }
00121 
00122         return $obj;
00123     }
00124 
00128     final public function getWiki() {
00129         return $this->wiki;
00130     }
00131 
00135     final public function getType() {
00136         return $this->type;
00137     }
00138 
00142     final public function getOrder() {
00143         return $this->order;
00144     }
00145 
00150     final public function delayedJobsEnabled() {
00151         return $this->checkDelay;
00152     }
00153 
00159     abstract protected function supportedOrders();
00160 
00166     abstract protected function optimalOrder();
00167 
00173     protected function supportsDelayedJobs() {
00174         return false; // not implemented
00175     }
00176 
00189     final public function isEmpty() {
00190         wfProfileIn( __METHOD__ );
00191         $res = $this->doIsEmpty();
00192         wfProfileOut( __METHOD__ );
00193 
00194         return $res;
00195     }
00196 
00201     abstract protected function doIsEmpty();
00202 
00212     final public function getSize() {
00213         wfProfileIn( __METHOD__ );
00214         $res = $this->doGetSize();
00215         wfProfileOut( __METHOD__ );
00216 
00217         return $res;
00218     }
00219 
00224     abstract protected function doGetSize();
00225 
00235     final public function getAcquiredCount() {
00236         wfProfileIn( __METHOD__ );
00237         $res = $this->doGetAcquiredCount();
00238         wfProfileOut( __METHOD__ );
00239 
00240         return $res;
00241     }
00242 
00247     abstract protected function doGetAcquiredCount();
00248 
00259     final public function getDelayedCount() {
00260         wfProfileIn( __METHOD__ );
00261         $res = $this->doGetDelayedCount();
00262         wfProfileOut( __METHOD__ );
00263 
00264         return $res;
00265     }
00266 
00271     protected function doGetDelayedCount() {
00272         return 0; // not implemented
00273     }
00274 
00284     final public function getAbandonedCount() {
00285         wfProfileIn( __METHOD__ );
00286         $res = $this->doGetAbandonedCount();
00287         wfProfileOut( __METHOD__ );
00288 
00289         return $res;
00290     }
00291 
00296     protected function doGetAbandonedCount() {
00297         return 0; // not implemented
00298     }
00299 
00310     final public function push( $jobs, $flags = 0 ) {
00311         return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
00312     }
00313 
00324     final public function batchPush( array $jobs, $flags = 0 ) {
00325         if ( !count( $jobs ) ) {
00326             return true; // nothing to do
00327         }
00328 
00329         foreach ( $jobs as $job ) {
00330             if ( $job->getType() !== $this->type ) {
00331                 throw new MWException(
00332                     "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
00333             } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
00334                 throw new MWException(
00335                     "Got delayed '{$job->getType()}' job; delays are not supported." );
00336             }
00337         }
00338 
00339         wfProfileIn( __METHOD__ );
00340         $ok = $this->doBatchPush( $jobs, $flags );
00341         wfProfileOut( __METHOD__ );
00342 
00343         return $ok;
00344     }
00345 
00352     abstract protected function doBatchPush( array $jobs, $flags );
00353 
00362     final public function pop() {
00363         global $wgJobClasses;
00364 
00365         if ( $this->wiki !== wfWikiID() ) {
00366             throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
00367         } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
00368             // Do not pop jobs if there is no class for the queue type
00369             throw new MWException( "Unrecognized job type '{$this->type}'." );
00370         }
00371 
00372         wfProfileIn( __METHOD__ );
00373         $job = $this->doPop();
00374         wfProfileOut( __METHOD__ );
00375 
00376         // Flag this job as an old duplicate based on its "root" job...
00377         try {
00378             if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
00379                 JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
00380                 $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
00381             }
00382         } catch ( MWException $e ) {
00383             // don't lose jobs over this
00384         }
00385 
00386         return $job;
00387     }
00388 
00393     abstract protected function doPop();
00394 
00405     final public function ack( Job $job ) {
00406         if ( $job->getType() !== $this->type ) {
00407             throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
00408         }
00409         wfProfileIn( __METHOD__ );
00410         $ok = $this->doAck( $job );
00411         wfProfileOut( __METHOD__ );
00412 
00413         return $ok;
00414     }
00415 
00421     abstract protected function doAck( Job $job );
00422 
00454     final public function deduplicateRootJob( Job $job ) {
00455         if ( $job->getType() !== $this->type ) {
00456             throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
00457         }
00458         wfProfileIn( __METHOD__ );
00459         $ok = $this->doDeduplicateRootJob( $job );
00460         wfProfileOut( __METHOD__ );
00461 
00462         return $ok;
00463     }
00464 
00471     protected function doDeduplicateRootJob( Job $job ) {
00472         if ( !$job->hasRootJobParams() ) {
00473             throw new MWException( "Cannot register root job; missing parameters." );
00474         }
00475         $params = $job->getRootJobParams();
00476 
00477         $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
00478         // Callers should call batchInsert() and then this function so that if the insert
00479         // fails, the de-duplication registration will be aborted. Since the insert is
00480         // deferred till "transaction idle", do the same here, so that the ordering is
00481         // maintained. Having only the de-duplication registration succeed would cause
00482         // jobs to become no-ops without any actual jobs that made them redundant.
00483         $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
00484         if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
00485             return true; // a newer version of this root job was enqueued
00486         }
00487 
00488         // Update the timestamp of the last root job started at the location...
00489         return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
00490     }
00491 
00499     final protected function isRootJobOldDuplicate( Job $job ) {
00500         if ( $job->getType() !== $this->type ) {
00501             throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
00502         }
00503         wfProfileIn( __METHOD__ );
00504         $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
00505         wfProfileOut( __METHOD__ );
00506 
00507         return $isDuplicate;
00508     }
00509 
00515     protected function doIsRootJobOldDuplicate( Job $job ) {
00516         if ( !$job->hasRootJobParams() ) {
00517             return false; // job has no de-deplication info
00518         }
00519         $params = $job->getRootJobParams();
00520 
00521         $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
00522         // Get the last time this root job was enqueued
00523         $timestamp = $this->dupCache->get( $key );
00524 
00525         // Check if a new root job was started at the location after this one's...
00526         return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
00527     }
00528 
00533     protected function getRootJobCacheKey( $signature ) {
00534         list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
00535 
00536         return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
00537     }
00538 
00546     final public function delete() {
00547         wfProfileIn( __METHOD__ );
00548         $res = $this->doDelete();
00549         wfProfileOut( __METHOD__ );
00550 
00551         return $res;
00552     }
00553 
00559     protected function doDelete() {
00560         throw new MWException( "This method is not implemented." );
00561     }
00562 
00571     final public function waitForBackups() {
00572         wfProfileIn( __METHOD__ );
00573         $this->doWaitForBackups();
00574         wfProfileOut( __METHOD__ );
00575     }
00576 
00581     protected function doWaitForBackups() {
00582     }
00583 
00596     final public function getPeriodicTasks() {
00597         $tasks = $this->doGetPeriodicTasks();
00598         foreach ( $tasks as $name => &$def ) {
00599             $def['name'] = $name;
00600         }
00601 
00602         return $tasks;
00603     }
00604 
00609     protected function doGetPeriodicTasks() {
00610         return array();
00611     }
00612 
00618     final public function flushCaches() {
00619         wfProfileIn( __METHOD__ );
00620         $this->doFlushCaches();
00621         wfProfileOut( __METHOD__ );
00622     }
00623 
00628     protected function doFlushCaches() {
00629     }
00630 
00639     abstract public function getAllQueuedJobs();
00640 
00649     public function getAllDelayedJobs() {
00650         return new ArrayIterator( array() ); // not implemented
00651     }
00652 
00659     public function getCoalesceLocationInternal() {
00660         return null;
00661     }
00662 
00672     final public function getSiblingQueuesWithJobs( array $types ) {
00673         $section = new ProfileSection( __METHOD__ );
00674 
00675         return $this->doGetSiblingQueuesWithJobs( $types );
00676     }
00677 
00683     protected function doGetSiblingQueuesWithJobs( array $types ) {
00684         return null; // not supported
00685     }
00686 
00697     final public function getSiblingQueueSizes( array $types ) {
00698         $section = new ProfileSection( __METHOD__ );
00699 
00700         return $this->doGetSiblingQueueSizes( $types );
00701     }
00702 
00708     protected function doGetSiblingQueueSizes( array $types ) {
00709         return null; // not supported
00710     }
00711 
00721     public static function incrStats( $key, $type, $delta = 1, $wiki = null ) {
00722         wfIncrStats( $key, $delta );
00723         wfIncrStats( "{$key}-{$type}", $delta );
00724         if ( $wiki !== null ) {
00725             wfIncrStats( "{$key}-{$type}-{$wiki}", $delta );
00726         }
00727     }
00728 
00736     public function setTestingPrefix( $key ) {
00737         throw new MWException( "Queue namespacing not supported for this queue type." );
00738     }
00739 }
00740 
00745 class JobQueueError extends MWException {
00746 }
00747 
00748 class JobQueueConnectionError extends JobQueueError {
00749 }