MediaWiki  REL1_21
JobQueue.php
Go to the documentation of this file.
00001 <?php
/**
 * Base class for a queue that holds jobs of a single type for a single wiki.
 *
 * The public entry points are final: they validate their arguments, wrap the
 * work in wfProfileIn()/wfProfileOut() where applicable, and delegate to a
 * protected do*() hook. Concrete queue classes implement the abstract hooks
 * and may override the ones that have a default body.
 *
 * Instances are meant to be created through JobQueue::factory(); the
 * constructor is protected.
 */
abstract class JobQueue {
	/** @var string Wiki ID */
	protected $wiki;

	/** @var string Job type */
	protected $type;

	/** @var string Job priority for pop() */
	protected $order;

	/** @var integer Seconds */
	protected $claimTTL;

	/** @var integer Maximum number of times to try a job */
	protected $maxTries;

	/**
	 * Flag value: "all-or-nothing" job insertions.
	 * NOTE(review): presumably intended for the $flags argument of
	 * push()/batchPush(); nothing in this class interprets it.
	 */
	const QoS_Atomic = 1;

	/**
	 * Initialise the settings shared by all queue implementations.
	 *
	 * Keys of $params read here:
	 *   - wiki     : wiki ID (read unconditionally)
	 *   - type     : job type (read unconditionally)
	 *   - claimTTL : seconds; 0 when the key is not set
	 *   - maxTries : maximum number of times to try a job; 3 when not set
	 *   - order    : pop() ordering; when not set or 'any', the value of
	 *                optimalOrder() is used instead
	 *
	 * @param array $params
	 * @throws MWException If the resulting order is not in supportedOrders()
	 */
	protected function __construct( array $params ) {
		$this->wiki = $params['wiki'];
		$this->type = $params['type'];
		$this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
		$this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict comparison: a loose in_array() lets non-string values such as
		// true (or 0 on PHP < 8) match any order name and slip through.
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
		}
	}

	/**
	 * Create a queue object of the class named in $params['class'].
	 *
	 * The full $params array is handed to that class' constructor, so it must
	 * also carry whatever the constructor reads (see __construct()).
	 *
	 * @param array $params Must contain 'class', the name of a JobQueue subclass
	 * @return JobQueue
	 * @throws MWException If the class does not exist or is not a JobQueue subclass
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !MWInit::classExists( $class ) ) {
			throw new MWException( "Invalid job queue class '$class'." );
		}
		// Validate the class *before* instantiating it, so the constructor of
		// an unrelated class is never run with queue parameters.
		if ( !is_subclass_of( $class, __CLASS__ ) ) {
			throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
		}
		return new $class( $params );
	}

	/**
	 * @return string Wiki ID this queue was constructed with
	 */
	final public function getWiki() {
		return $this->wiki;
	}

	/**
	 * @return string Job type this queue was constructed with
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string The order in effect: the one requested at construction,
	 *   or optimalOrder() when none (or 'any') was requested
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * List the order values this queue class accepts; the constructor rejects
	 * any order that is not in this list.
	 *
	 * @return array
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the order used when the caller requested none, or 'any'.
	 *
	 * @return string
	 */
	abstract protected function optimalOrder();

	/**
	 * Profiled wrapper around doIsEmpty().
	 *
	 * @return mixed The value returned by doIsEmpty() (presumably a boolean)
	 */
	final public function isEmpty() {
		wfProfileIn( __METHOD__ );
		$res = $this->doIsEmpty();
		wfProfileOut( __METHOD__ );
		return $res;
	}

	/**
	 * Implementation hook for isEmpty().
	 *
	 * @see JobQueue::isEmpty()
	 */
	abstract protected function doIsEmpty();

	/**
	 * Profiled wrapper around doGetSize().
	 *
	 * @return mixed The value returned by doGetSize() (presumably an integer)
	 */
	final public function getSize() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetSize();
		wfProfileOut( __METHOD__ );
		return $res;
	}

	/**
	 * Implementation hook for getSize().
	 *
	 * @see JobQueue::getSize()
	 */
	abstract protected function doGetSize();

	/**
	 * Profiled wrapper around doGetAcquiredCount().
	 *
	 * @return mixed The value returned by doGetAcquiredCount() (presumably an integer)
	 */
	final public function getAcquiredCount() {
		wfProfileIn( __METHOD__ );
		$res = $this->doGetAcquiredCount();
		wfProfileOut( __METHOD__ );
		return $res;
	}

	/**
	 * Implementation hook for getAcquiredCount().
	 *
	 * @see JobQueue::getAcquiredCount()
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Push one job, or an array of jobs, into the queue.
	 *
	 * A non-array argument is wrapped in a one-element array; the work is then
	 * done by batchPush().
	 *
	 * @param Job|array $jobs A single job or an array of jobs
	 * @param integer $flags Passed through to batchPush()
	 * @return mixed The value returned by batchPush()
	 * @throws MWException See batchPush()
	 */
	final public function push( $jobs, $flags = 0 ) {
		return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
	}

	/**
	 * Push a batch of jobs into the queue.
	 *
	 * Every job's getType() must equal this queue's type. An empty batch is
	 * accepted and reported as success without calling doBatchPush().
	 *
	 * @param array $jobs List of jobs
	 * @param integer $flags Passed through to doBatchPush()
	 * @return mixed true for an empty batch, otherwise the value returned by
	 *   doBatchPush()
	 * @throws MWException If a job's type differs from the queue's type
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		if ( !count( $jobs ) ) {
			return true; // nothing to do
		}

		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
			}
		}

		wfProfileIn( __METHOD__ );
		$ok = $this->doBatchPush( $jobs, $flags );
		wfProfileOut( __METHOD__ );
		return $ok;
	}

	/**
	 * Implementation hook for batchPush(); only called with a non-empty batch
	 * whose jobs all match the queue's type.
	 *
	 * @see JobQueue::batchPush()
	 * @param array $jobs
	 * @param integer $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off the queue.
	 *
	 * Refuses to operate when the queue belongs to a wiki other than the
	 * current one (wfWikiID()), or when $wgJobClasses has no entry for the
	 * queue's job type.
	 *
	 * @return mixed The value returned by doPop()
	 * @throws MWException On a foreign-wiki queue or an unregistered job type
	 */
	final public function pop() {
		global $wgJobClasses;

		if ( $this->wiki !== wfWikiID() ) {
			throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
		} elseif ( !isset( $wgJobClasses[$this->type] ) ) {
			// Do not pop jobs if there is no class for the queue type
			throw new MWException( "Unrecognized job type '{$this->type}'." );
		}

		wfProfileIn( __METHOD__ );
		$job = $this->doPop();
		wfProfileOut( __METHOD__ );
		return $job;
	}

	/**
	 * Implementation hook for pop().
	 *
	 * @see JobQueue::pop()
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge a job; the actual work is done by doAck().
	 * NOTE(review): presumably called once a popped job has been run; confirm
	 * against the concrete queue classes.
	 *
	 * @param Job $job
	 * @return mixed The value returned by doAck()
	 * @throws MWException If the job's type differs from the queue's type
	 */
	final public function ack( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		wfProfileIn( __METHOD__ );
		$ok = $this->doAck( $job );
		wfProfileOut( __METHOD__ );
		return $ok;
	}

	/**
	 * Implementation hook for ack(); only called with a job of the queue's type.
	 *
	 * @see JobQueue::ack()
	 * @param Job $job
	 */
	abstract protected function doAck( Job $job );

	/**
	 * Run root-job de-duplication for $job; the actual work is done by
	 * doDeduplicateRootJob(), which does nothing unless a subclass overrides it.
	 *
	 * @param Job $job
	 * @return mixed The value returned by doDeduplicateRootJob()
	 * @throws MWException If the job's type differs from the queue's type
	 */
	final public function deduplicateRootJob( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		wfProfileIn( __METHOD__ );
		$ok = $this->doDeduplicateRootJob( $job );
		wfProfileOut( __METHOD__ );
		return $ok;
	}

	/**
	 * Overridable hook for deduplicateRootJob(). The default does no work and
	 * reports success.
	 *
	 * @see JobQueue::deduplicateRootJob()
	 * @param Job $job
	 * @return bool Always true in this base implementation
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		return true;
	}

	/**
	 * Profiled wrapper around doWaitForBackups(), which is a no-op unless a
	 * subclass overrides it.
	 *
	 * @return void
	 */
	final public function waitForBackups() {
		wfProfileIn( __METHOD__ );
		$this->doWaitForBackups();
		wfProfileOut( __METHOD__ );
	}

	/**
	 * Overridable hook for waitForBackups(); does nothing by default.
	 *
	 * @see JobQueue::waitForBackups()
	 * @return void
	 */
	protected function doWaitForBackups() {}

	/**
	 * Get the periodic task definitions of this queue.
	 *
	 * Takes the map returned by doGetPeriodicTasks() and stores each entry's
	 * key in that entry under 'name'.
	 *
	 * @return array Map of (task name => definition array including 'name')
	 */
	final public function getPeriodicTasks() {
		$tasks = $this->doGetPeriodicTasks();
		foreach ( $tasks as $name => &$def ) {
			$def['name'] = $name;
		}
		// Break the reference so the last element is returned as a plain value
		unset( $def );
		return $tasks;
	}

	/**
	 * Overridable hook for getPeriodicTasks(); no tasks by default.
	 *
	 * @see JobQueue::getPeriodicTasks()
	 * @return array Map of (task name => definition array)
	 */
	protected function doGetPeriodicTasks() {
		return array();
	}

	/**
	 * Profiled wrapper around doFlushCaches(), which is a no-op unless a
	 * subclass overrides it.
	 *
	 * @return void
	 */
	final public function flushCaches() {
		wfProfileIn( __METHOD__ );
		$this->doFlushCaches();
		wfProfileOut( __METHOD__ );
	}

	/**
	 * Overridable hook for flushCaches(); does nothing by default.
	 *
	 * @see JobQueue::flushCaches()
	 * @return void
	 */
	protected function doFlushCaches() {}

	/**
	 * Get the jobs currently in the queue.
	 * NOTE(review): the return type (array vs. iterator) is not visible from
	 * this class; confirm against the concrete queue classes.
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Set a testing prefix ("queue namespacing") for this queue.
	 *
	 * This base implementation always throws; queue classes that support
	 * namespacing are expected to override it.
	 *
	 * @param string $key
	 * @return void
	 * @throws MWException Always, unless overridden
	 */
	public function setTestingPrefix( $key ) {
		throw new MWException( "Queue namespacing not supported for this queue type." );
	}
}