MediaWiki  REL1_22
JobQueue.php
Go to the documentation of this file.
00001 <?php
00031 abstract class JobQueue {
00032     protected $wiki; // string; wiki ID
00033     protected $type; // string; job type
00034     protected $order; // string; job priority for pop()
00035     protected $claimTTL; // integer; seconds
00036     protected $maxTries; // integer; maximum number of times to try a job
00037     protected $checkDelay; // boolean; allow delayed jobs
00038 
00040     protected $dupCache;
00041 
00042     const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
00043 
00044     const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
00045 
00049     protected function __construct( array $params ) {
00050         $this->wiki = $params['wiki'];
00051         $this->type = $params['type'];
00052         $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
00053         $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
00054         if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
00055             $this->order = $params['order'];
00056         } else {
00057             $this->order = $this->optimalOrder();
00058         }
00059         if ( !in_array( $this->order, $this->supportedOrders() ) ) {
00060             throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
00061         }
00062         $this->checkDelay = !empty( $params['checkDelay'] );
00063         if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
00064             throw new MWException( __CLASS__ . " does not support delayed jobs." );
00065         }
00066         $this->dupCache = wfGetCache( CACHE_ANYTHING );
00067     }
00068 
00100     final public static function factory( array $params ) {
00101         $class = $params['class'];
00102         if ( !class_exists( $class ) ) {
00103             throw new MWException( "Invalid job queue class '$class'." );
00104         }
00105         $obj = new $class( $params );
00106         if ( !( $obj instanceof self ) ) {
00107             throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
00108         }
00109         return $obj;
00110     }
00111 
00115     final public function getWiki() {
00116         return $this->wiki;
00117     }
00118 
00122     final public function getType() {
00123         return $this->type;
00124     }
00125 
00129     final public function getOrder() {
00130         return $this->order;
00131     }
00132 
00137     final public function delayedJobsEnabled() {
00138         return $this->checkDelay;
00139     }
00140 
00146     abstract protected function supportedOrders();
00147 
00153     abstract protected function optimalOrder();
00154 
00160     protected function supportsDelayedJobs() {
00161         return false; // not implemented
00162     }
00163 
00176     final public function isEmpty() {
00177         wfProfileIn( __METHOD__ );
00178         $res = $this->doIsEmpty();
00179         wfProfileOut( __METHOD__ );
00180         return $res;
00181     }
00182 
00187     abstract protected function doIsEmpty();
00188 
00198     final public function getSize() {
00199         wfProfileIn( __METHOD__ );
00200         $res = $this->doGetSize();
00201         wfProfileOut( __METHOD__ );
00202         return $res;
00203     }
00204 
00209     abstract protected function doGetSize();
00210 
00220     final public function getAcquiredCount() {
00221         wfProfileIn( __METHOD__ );
00222         $res = $this->doGetAcquiredCount();
00223         wfProfileOut( __METHOD__ );
00224         return $res;
00225     }
00226 
00231     abstract protected function doGetAcquiredCount();
00232 
00243     final public function getDelayedCount() {
00244         wfProfileIn( __METHOD__ );
00245         $res = $this->doGetDelayedCount();
00246         wfProfileOut( __METHOD__ );
00247         return $res;
00248     }
00249 
00254     protected function doGetDelayedCount() {
00255         return 0; // not implemented
00256     }
00257 
00267     final public function getAbandonedCount() {
00268         wfProfileIn( __METHOD__ );
00269         $res = $this->doGetAbandonedCount();
00270         wfProfileOut( __METHOD__ );
00271         return $res;
00272     }
00273 
00278     protected function doGetAbandonedCount() {
00279         return 0; // not implemented
00280     }
00281 
00292     final public function push( $jobs, $flags = 0 ) {
00293         return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
00294     }
00295 
00306     final public function batchPush( array $jobs, $flags = 0 ) {
00307         if ( !count( $jobs ) ) {
00308             return true; // nothing to do
00309         }
00310 
00311         foreach ( $jobs as $job ) {
00312             if ( $job->getType() !== $this->type ) {
00313                 throw new MWException(
00314                     "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
00315             } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
00316                 throw new MWException(
00317                     "Got delayed '{$job->getType()}' job; delays are not supported." );
00318             }
00319         }
00320 
00321         wfProfileIn( __METHOD__ );
00322         $ok = $this->doBatchPush( $jobs, $flags );
00323         wfProfileOut( __METHOD__ );
00324         return $ok;
00325     }
00326 
00331     abstract protected function doBatchPush( array $jobs, $flags );
00332 
00341     final public function pop() {
00342         global $wgJobClasses;
00343 
00344         if ( $this->wiki !== wfWikiID() ) {
00345             throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
00346         } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
00347             // Do not pop jobs if there is no class for the queue type
00348             throw new MWException( "Unrecognized job type '{$this->type}'." );
00349         }
00350 
00351         wfProfileIn( __METHOD__ );
00352         $job = $this->doPop();
00353         wfProfileOut( __METHOD__ );
00354 
00355         // Flag this job as an old duplicate based on its "root" job...
00356         try {
00357             if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
00358                 JobQueue::incrStats( 'job-pop-duplicate', $this->type );
00359                 $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
00360             }
00361         } catch ( MWException $e ) {} // don't lose jobs over this
00362 
00363         return $job;
00364     }
00365 
00370     abstract protected function doPop();
00371 
00382     final public function ack( Job $job ) {
00383         if ( $job->getType() !== $this->type ) {
00384             throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
00385         }
00386         wfProfileIn( __METHOD__ );
00387         $ok = $this->doAck( $job );
00388         wfProfileOut( __METHOD__ );
00389         return $ok;
00390     }
00391 
00396     abstract protected function doAck( Job $job );
00397 
00429     final public function deduplicateRootJob( Job $job ) {
00430         if ( $job->getType() !== $this->type ) {
00431             throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
00432         }
00433         wfProfileIn( __METHOD__ );
00434         $ok = $this->doDeduplicateRootJob( $job );
00435         wfProfileOut( __METHOD__ );
00436         return $ok;
00437     }
00438 
00444     protected function doDeduplicateRootJob( Job $job ) {
00445         if ( !$job->hasRootJobParams() ) {
00446             throw new MWException( "Cannot register root job; missing parameters." );
00447         }
00448         $params = $job->getRootJobParams();
00449 
00450         $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
00451         // Callers should call batchInsert() and then this function so that if the insert
00452         // fails, the de-duplication registration will be aborted. Since the insert is
00453         // deferred till "transaction idle", do the same here, so that the ordering is
00454         // maintained. Having only the de-duplication registration succeed would cause
00455         // jobs to become no-ops without any actual jobs that made them redundant.
00456         $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
00457         if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
00458             return true; // a newer version of this root job was enqueued
00459         }
00460 
00461         // Update the timestamp of the last root job started at the location...
00462         return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
00463     }
00464 
00472     final protected function isRootJobOldDuplicate( Job $job ) {
00473         if ( $job->getType() !== $this->type ) {
00474             throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
00475         }
00476         wfProfileIn( __METHOD__ );
00477         $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
00478         wfProfileOut( __METHOD__ );
00479         return $isDuplicate;
00480     }
00481 
00487     protected function doIsRootJobOldDuplicate( Job $job ) {
00488         if ( !$job->hasRootJobParams() ) {
00489             return false; // job has no de-deplication info
00490         }
00491         $params = $job->getRootJobParams();
00492 
00493         $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
00494         // Get the last time this root job was enqueued
00495         $timestamp = $this->dupCache->get( $key );
00496 
00497         // Check if a new root job was started at the location after this one's...
00498         return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
00499     }
00500 
00505     protected function getRootJobCacheKey( $signature ) {
00506         list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
00507         return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
00508     }
00509 
00517     final public function delete() {
00518         wfProfileIn( __METHOD__ );
00519         $res = $this->doDelete();
00520         wfProfileOut( __METHOD__ );
00521         return $res;
00522     }
00523 
00528     protected function doDelete() {
00529         throw new MWException( "This method is not implemented." );
00530     }
00531 
00540     final public function waitForBackups() {
00541         wfProfileIn( __METHOD__ );
00542         $this->doWaitForBackups();
00543         wfProfileOut( __METHOD__ );
00544     }
00545 
00550     protected function doWaitForBackups() {}
00551 
00564     final public function getPeriodicTasks() {
00565         $tasks = $this->doGetPeriodicTasks();
00566         foreach ( $tasks as $name => &$def ) {
00567             $def['name'] = $name;
00568         }
00569         return $tasks;
00570     }
00571 
00576     protected function doGetPeriodicTasks() {
00577         return array();
00578     }
00579 
00585     final public function flushCaches() {
00586         wfProfileIn( __METHOD__ );
00587         $this->doFlushCaches();
00588         wfProfileOut( __METHOD__ );
00589     }
00590 
00595     protected function doFlushCaches() {}
00596 
00605     abstract public function getAllQueuedJobs();
00606 
00615     public function getAllDelayedJobs() {
00616         return new ArrayIterator( array() ); // not implemented
00617     }
00618 
00625     public function getCoalesceLocationInternal() {
00626         return null;
00627     }
00628 
00638     final public function getSiblingQueuesWithJobs( array $types ) {
00639         $section = new ProfileSection( __METHOD__ );
00640         return $this->doGetSiblingQueuesWithJobs( $types );
00641     }
00642 
00648     protected function doGetSiblingQueuesWithJobs( array $types ) {
00649         return null; // not supported
00650     }
00651 
00662     final public function getSiblingQueueSizes( array $types ) {
00663         $section = new ProfileSection( __METHOD__ );
00664         return $this->doGetSiblingQueueSizes( $types );
00665     }
00666 
00672     protected function doGetSiblingQueueSizes( array $types ) {
00673         return null; // not supported
00674     }
00675 
00684     public static function incrStats( $key, $type, $delta = 1 ) {
00685         wfIncrStats( $key, $delta );
00686         wfIncrStats( "{$key}-{$type}", $delta );
00687     }
00688 
00696     public function setTestingPrefix( $key ) {
00697         throw new MWException( "Queue namespacing not supported for this queue type." );
00698     }
00699 }
00700 
00705 class JobQueueError extends MWException {}
00706 class JobQueueConnectionError extends JobQueueError {}