MediaWiki  REL1_24
JobQueue.php
Go to the documentation of this file.
00001 <?php
/**
 * Base class for a queue of jobs of a single type belonging to a single wiki.
 *
 * The public entry points are final: they validate their arguments, open a
 * profiling section and then delegate to a protected do*() method that a
 * concrete backend implements or overrides.
 */
abstract class JobQueue {
    /** @var string ID of the wiki this queue belongs to ('wiki' parameter) */
    protected $wiki;

    /** @var string Type of job this queue holds ('type' parameter) */
    protected $type;

    /** @var string Job ordering in effect; always one of supportedOrders() */
    protected $order;

    /**
     * @var int Value of the 'claimTTL' parameter (default 0). Not read by this
     *   class; presumably a claim expiry in seconds used by backends - TODO confirm.
     */
    protected $claimTTL;

    /**
     * @var int Value of the 'maxTries' parameter (default 3). Not read by this
     *   class; presumably an attempt limit used by backends - TODO confirm.
     */
    protected $maxTries;

    /** @var bool Whether jobs carrying a release timestamp are accepted */
    protected $checkDelay;

    /** @var object Cache returned by wfGetCache(); stores root job timestamps */
    protected $dupCache;

    const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

    const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

    /**
     * Initialize the queue from its configuration.
     *
     * Keys read from $params:
     *   - wiki       : wiki ID (required)
     *   - type       : job type (required)
     *   - claimTTL   : optional, defaults to 0
     *   - maxTries   : optional, defaults to 3
     *   - order      : optional; when absent or 'any', optimalOrder() is used
     *   - checkDelay : optional; any non-empty value enables delayed jobs
     *
     * @param array $params
     * @throws MWException If the order is not one of supportedOrders(), or if
     *   delayed jobs are requested but supportsDelayedJobs() is false
     */
    protected function __construct( array $params ) {
        $this->wiki = $params['wiki'];
        $this->type = $params['type'];
        $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
        $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;

        $hasExplicitOrder = isset( $params['order'] ) && $params['order'] !== 'any';
        $this->order = $hasExplicitOrder ? $params['order'] : $this->optimalOrder();
        // Strict comparison so that a non-string value cannot loosely match an order name.
        if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
            // get_class() names the concrete backend; __CLASS__ would always say "JobQueue".
            throw new MWException(
                get_class( $this ) . " does not support '{$this->order}' order." );
        }

        $this->checkDelay = !empty( $params['checkDelay'] );
        if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
            throw new MWException( get_class( $this ) . " does not support delayed jobs." );
        }

        $this->dupCache = wfGetCache( CACHE_ANYTHING );
    }

    /**
     * Create a queue object of the class named by $params['class'].
     *
     * The whole $params array is handed to that class' constructor, so it must
     * also hold the keys described on __construct().
     *
     * @param array $params
     * @return JobQueue
     * @throws MWException If the class does not exist or does not extend JobQueue
     */
    final public static function factory( array $params ) {
        $class = isset( $params['class'] ) ? $params['class'] : '';
        if ( !class_exists( $class ) ) {
            throw new MWException( "Invalid job queue class '$class'." );
        }
        // Verify the type before instantiating, so that the constructor of an
        // unrelated class is never run with the queue parameters.
        if ( !is_subclass_of( $class, __CLASS__ ) ) {
            throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
        }

        return new $class( $params );
    }

    /**
     * @return string Wiki ID given at construction
     */
    final public function getWiki() {
        return $this->wiki;
    }

    /**
     * @return string Job type given at construction
     */
    final public function getType() {
        return $this->type;
    }

    /**
     * @return string Job ordering in effect for this queue
     */
    final public function getOrder() {
        return $this->order;
    }

    /**
     * @return bool Whether this queue was configured to accept delayed jobs
     */
    final public function delayedJobsEnabled() {
        return $this->checkDelay;
    }

    /**
     * List the order names this backend accepts. The constructor rejects any
     * configured order that is not in this list.
     *
     * @return array
     */
    abstract protected function supportedOrders();

    /**
     * Name the order to use when the configuration gives none, or gives 'any'.
     *
     * @return string One of the values of supportedOrders()
     */
    abstract protected function optimalOrder();

    /**
     * Whether this backend can hold delayed jobs. Backends that can must override this.
     *
     * @return bool
     */
    protected function supportsDelayedJobs() {
        return false; // not implemented
    }

    /**
     * Report whether the queue is empty, as determined by doIsEmpty().
     *
     * @return bool Presumably; the value is passed through from doIsEmpty()
     */
    final public function isEmpty() {
        // Held only for its lifetime: the section closes when it goes out of
        // scope, including when the backend call throws.
        $section = new ProfileSection( __METHOD__ );

        return $this->doIsEmpty();
    }

    /**
     * Backend implementation of isEmpty().
     *
     * @see JobQueue::isEmpty()
     */
    abstract protected function doIsEmpty();

    /**
     * Get the size of the queue, as determined by doGetSize().
     *
     * @return int Presumably a job count; the value is passed through from doGetSize()
     */
    final public function getSize() {
        $section = new ProfileSection( __METHOD__ ); // closes on scope exit

        return $this->doGetSize();
    }

    /**
     * Backend implementation of getSize().
     *
     * @see JobQueue::getSize()
     */
    abstract protected function doGetSize();

    /**
     * Get the acquired-job count, as determined by doGetAcquiredCount().
     *
     * @return int Presumably; the value is passed through from doGetAcquiredCount()
     */
    final public function getAcquiredCount() {
        $section = new ProfileSection( __METHOD__ ); // closes on scope exit

        return $this->doGetAcquiredCount();
    }

    /**
     * Backend implementation of getAcquiredCount().
     *
     * @see JobQueue::getAcquiredCount()
     */
    abstract protected function doGetAcquiredCount();

    /**
     * Get the delayed-job count, as determined by doGetDelayedCount().
     *
     * @return int 0 unless the backend overrides doGetDelayedCount()
     */
    final public function getDelayedCount() {
        $section = new ProfileSection( __METHOD__ ); // closes on scope exit

        return $this->doGetDelayedCount();
    }

    /**
     * Backend implementation of getDelayedCount().
     *
     * @return int
     */
    protected function doGetDelayedCount() {
        return 0; // not implemented
    }

    /**
     * Get the abandoned-job count, as determined by doGetAbandonedCount().
     *
     * @return int 0 unless the backend overrides doGetAbandonedCount()
     */
    final public function getAbandonedCount() {
        $section = new ProfileSection( __METHOD__ ); // closes on scope exit

        return $this->doGetAbandonedCount();
    }

    /**
     * Backend implementation of getAbandonedCount().
     *
     * @return int
     */
    protected function doGetAbandonedCount() {
        return 0; // not implemented
    }

    /**
     * Push one job or a list of jobs into the queue.
     * A single job is wrapped in an array and passed to batchPush().
     *
     * @param Job|array $jobs A single job or an array of jobs
     * @param int $flags Passed through to batchPush()
     * @throws MWException See batchPush()
     */
    final public function push( $jobs, $flags = 0 ) {
        $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
    }

    /**
     * Push a list of jobs into the queue.
     *
     * All jobs are checked before any of them is handed to the backend, and an
     * empty list is a no-op.
     *
     * @param array $jobs List of jobs
     * @param int $flags Passed through to doBatchPush(); presumably a bitfield
     *   of the QOS_* constants - TODO confirm
     * @throws MWException If a job has a type other than this queue's, or has
     *   a release timestamp while delayed jobs are not enabled
     */
    final public function batchPush( array $jobs, $flags = 0 ) {
        if ( !count( $jobs ) ) {
            return; // nothing to do
        }

        foreach ( $jobs as $job ) {
            if ( $job->getType() !== $this->type ) {
                throw new MWException(
                    "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
            } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
                throw new MWException(
                    "Got delayed '{$job->getType()}' job; delays are not supported." );
            }
        }

        $section = new ProfileSection( __METHOD__ ); // closes on scope exit
        $this->doBatchPush( $jobs, $flags );
    }

    /**
     * Backend implementation of batchPush().
     *
     * @see JobQueue::batchPush()
     * @param array $jobs Non-empty list of jobs that passed the batchPush() checks
     * @param int $flags
     */
    abstract protected function doBatchPush( array $jobs, $flags );

    /**
     * Pop a job off the queue.
     *
     * When the popped job is found to be superseded by a newer root job, a
     * statistic is counted and DuplicateJob::newFromJob() is returned in its
     * place. A failure of that check never discards the popped job.
     *
     * @return Job|bool Result of doPop(), possibly replaced as described above.
     *   Presumably false when the queue is empty - TODO confirm against backends.
     * @throws MWException If this queue belongs to another wiki, or if
     *   $wgJobClasses has no entry for this job type
     */
    final public function pop() {
        global $wgJobClasses;

        if ( $this->wiki !== wfWikiID() ) {
            throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
        } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
            // Do not pop jobs if there is no class for the queue type
            throw new MWException( "Unrecognized job type '{$this->type}'." );
        }

        $section = new ProfileSection( __METHOD__ );
        $job = $this->doPop();
        // Close the section here so that it spans the backend call only;
        // the duplicate check below profiles itself.
        unset( $section );

        // Flag this job as an old duplicate based on its "root" job...
        try {
            if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
                self::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
                $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
            }
        } catch ( MWException $e ) {
            // Deliberately ignored: the job is already off the queue, so it is
            // returned as-is rather than lost over a failed duplicate check.
        }

        return $job;
    }

    /**
     * Backend implementation of pop().
     *
     * @see JobQueue::pop()
     * @return Job|bool
     */
    abstract protected function doPop();

    /**
     * Acknowledge a job through the backend's doAck().
     * Presumably called once a popped job has been run - TODO confirm.
     *
     * @param Job $job
     * @throws MWException If the job has a type other than this queue's
     */
    final public function ack( Job $job ) {
        if ( $job->getType() !== $this->type ) {
            throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
        }

        $section = new ProfileSection( __METHOD__ ); // closes on scope exit
        $this->doAck( $job );
    }

    /**
     * Backend implementation of ack().
     *
     * @see JobQueue::ack()
     * @param Job $job
     */
    abstract protected function doAck( Job $job );

    /**
     * Register the root job of $job so that jobs stemming from an older root
     * job with the same signature are recognized as duplicates when popped.
     *
     * @param Job $job Job carrying root job parameters
     * @return bool Result of doDeduplicateRootJob()
     * @throws MWException If the job has a type other than this queue's, or
     *   lacks root job parameters
     */
    final public function deduplicateRootJob( Job $job ) {
        if ( $job->getType() !== $this->type ) {
            throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
        }

        $section = new ProfileSection( __METHOD__ ); // closes on scope exit

        return $this->doDeduplicateRootJob( $job );
    }

    /**
     * Default implementation of deduplicateRootJob(): remember the newest root
     * job timestamp per signature in the cache for ROOTJOB_TTL seconds.
     *
     * @see JobQueue::deduplicateRootJob()
     * @param Job $job
     * @return bool True if an equal or newer timestamp was already registered;
     *   otherwise the result of the cache write
     * @throws MWException If the job lacks root job parameters
     */
    protected function doDeduplicateRootJob( Job $job ) {
        if ( !$job->hasRootJobParams() ) {
            throw new MWException( "Cannot register root job; missing parameters." );
        }
        $params = $job->getRootJobParams();

        $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
        // Callers should insert the jobs first and only then call this method,
        // so that a failed insert also aborts the registration. Having only the
        // registration succeed would cause jobs to become no-ops without any
        // actual jobs that made them redundant.
        $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
        if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
            return true; // a newer version of this root job was enqueued
        }

        // Update the timestamp of the last root job started at the location...
        // The TTL constant lives on this class, so no backend class is referenced.
        return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
    }

    /**
     * Check whether $job stems from a root job that has since been superseded.
     *
     * @param Job $job
     * @return bool Result of doIsRootJobOldDuplicate()
     * @throws MWException If the job has a type other than this queue's
     */
    final protected function isRootJobOldDuplicate( Job $job ) {
        if ( $job->getType() !== $this->type ) {
            throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
        }

        $section = new ProfileSection( __METHOD__ ); // closes on scope exit

        return $this->doIsRootJobOldDuplicate( $job );
    }

    /**
     * Default implementation of isRootJobOldDuplicate(): compare the job's
     * root job timestamp with the one last registered in the cache.
     *
     * @see JobQueue::isRootJobOldDuplicate()
     * @param Job $job
     * @return bool True only if a strictly newer timestamp is registered for
     *   the job's root job signature; false if the job has no root job parameters
     */
    protected function doIsRootJobOldDuplicate( Job $job ) {
        if ( !$job->hasRootJobParams() ) {
            return false; // job has no de-duplication info
        }
        $params = $job->getRootJobParams();

        $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
        // Get the last time this root job was enqueued
        $timestamp = $this->dupCache->get( $key );

        // Check if a new root job was started at the location after this one's...
        return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
    }

    /**
     * Build the cache key under which the timestamp for a root job signature
     * is stored. The key is specific to this queue's wiki and job type.
     *
     * @param string $signature Root job signature
     * @return string
     */
    protected function getRootJobCacheKey( $signature ) {
        list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

        return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
    }

    /**
     * Delete the queue contents through the backend's doDelete().
     *
     * @throws MWException If the backend does not override doDelete()
     */
    final public function delete() {
        $section = new ProfileSection( __METHOD__ ); // closes on scope exit
        $this->doDelete();
    }

    /**
     * Backend implementation of delete(). The default reports it as unsupported.
     *
     * @see JobQueue::delete()
     * @throws MWException Always, unless overridden
     */
    protected function doDelete() {
        throw new MWException( "This method is not implemented." );
    }

    /**
     * Run the backend's doWaitForBackups() hook, which does nothing by default.
     * Presumably blocks until backup/replica storage has caught up - TODO confirm.
     */
    final public function waitForBackups() {
        $section = new ProfileSection( __METHOD__ ); // closes on scope exit
        $this->doWaitForBackups();
    }

    /**
     * Backend implementation of waitForBackups(). No-op by default.
     *
     * @see JobQueue::waitForBackups()
     */
    protected function doWaitForBackups() {
    }

    /**
     * Get the periodic task definitions of the backend, keyed by task name.
     * Each definition is returned with an added 'name' entry equal to its key.
     *
     * @return array Map of (task name => definition array)
     */
    final public function getPeriodicTasks() {
        $tasks = $this->doGetPeriodicTasks();
        // Write through the key instead of iterating by reference, so that no
        // reference to the last definition lingers after the loop.
        foreach ( array_keys( $tasks ) as $name ) {
            $tasks[$name]['name'] = $name;
        }

        return $tasks;
    }

    /**
     * Backend implementation of getPeriodicTasks(). No tasks by default.
     *
     * @see JobQueue::getPeriodicTasks()
     * @return array Map of (task name => definition array)
     */
    protected function doGetPeriodicTasks() {
        return array();
    }

    /**
     * Run the backend's doFlushCaches() hook, which does nothing by default.
     */
    final public function flushCaches() {
        $section = new ProfileSection( __METHOD__ ); // closes on scope exit
        $this->doFlushCaches();
    }

    /**
     * Backend implementation of flushCaches(). No-op by default.
     *
     * @see JobQueue::flushCaches()
     */
    protected function doFlushCaches() {
    }

    /**
     * Get the jobs currently in the queue.
     *
     * @return Iterator Presumably, by analogy with getAllDelayedJobs() - TODO confirm
     */
    abstract public function getAllQueuedJobs();

    /**
     * Get the delayed jobs in the queue. Empty unless the backend overrides this.
     *
     * @return Iterator
     */
    public function getAllDelayedJobs() {
        return new ArrayIterator( array() ); // not implemented
    }

    /**
     * Null unless the backend overrides this. Presumably identifies the
     * storage location shared with sibling queues - TODO confirm.
     *
     * @return string|null
     */
    public function getCoalesceLocationInternal() {
        return null;
    }

    /**
     * Ask the backend about sibling queues of the given types via
     * doGetSiblingQueuesWithJobs().
     *
     * @param array $types List of job types
     * @return array|null Null when the backend does not support this
     */
    final public function getSiblingQueuesWithJobs( array $types ) {
        $section = new ProfileSection( __METHOD__ ); // closes on scope exit

        return $this->doGetSiblingQueuesWithJobs( $types );
    }

    /**
     * Backend implementation of getSiblingQueuesWithJobs().
     *
     * @see JobQueue::getSiblingQueuesWithJobs()
     * @param array $types List of job types
     * @return array|null
     */
    protected function doGetSiblingQueuesWithJobs( array $types ) {
        return null; // not supported
    }

    /**
     * Ask the backend about sibling queues of the given types via
     * doGetSiblingQueueSizes().
     *
     * @param array $types List of job types
     * @return array|null Null when the backend does not support this
     */
    final public function getSiblingQueueSizes( array $types ) {
        $section = new ProfileSection( __METHOD__ ); // closes on scope exit

        return $this->doGetSiblingQueueSizes( $types );
    }

    /**
     * Backend implementation of getSiblingQueueSizes().
     *
     * @see JobQueue::getSiblingQueueSizes()
     * @param array $types List of job types
     * @return array|null
     */
    protected function doGetSiblingQueueSizes( array $types ) {
        return null; // not supported
    }

    /**
     * Increment a statistic in total, per job type and, when a wiki is given,
     * per job type and wiki.
     *
     * @param string $key Base statistic key
     * @param string $type Job type
     * @param int $delta Amount to add
     * @param string|null $wiki Wiki ID, or null to skip the per-wiki counter
     */
    public static function incrStats( $key, $type, $delta = 1, $wiki = null ) {
        wfIncrStats( $key, $delta );
        wfIncrStats( "{$key}-{$type}", $delta );
        if ( $wiki !== null ) {
            wfIncrStats( "{$key}-{$type}-{$wiki}", $delta );
        }
    }

    /**
     * Namespace the queue with a key, for testing. Unsupported unless overridden.
     *
     * @param string $key
     * @throws MWException Always, unless overridden
     */
    public function setTestingPrefix( $key ) {
        throw new MWException( "Queue namespacing not supported for this queue type." );
    }
}
00731 
/**
 * Exception type for job queue failures.
 *
 * Adds nothing to MWException beyond a distinct class name to catch on.
 * The JobQueue base class in this file does not throw it; presumably it is
 * raised by queue backends (TODO confirm against the throw sites).
 */
class JobQueueError extends MWException {}
00738 
/**
 * Specialization of JobQueueError with no members of its own.
 *
 * Judging by the name it signals a failure to reach the queue's storage;
 * confirm against the throw sites, none of which are in this file.
 */
class JobQueueConnectionError extends JobQueueError {}