MediaWiki
REL1_21
|
00001 <?php 00031 abstract class JobQueue { 00032 protected $wiki; // string; wiki ID 00033 protected $type; // string; job type 00034 protected $order; // string; job priority for pop() 00035 protected $claimTTL; // integer; seconds 00036 protected $maxTries; // integer; maximum number of times to try a job 00037 00038 const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions 00039 00043 protected function __construct( array $params ) { 00044 $this->wiki = $params['wiki']; 00045 $this->type = $params['type']; 00046 $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0; 00047 $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3; 00048 if ( isset( $params['order'] ) && $params['order'] !== 'any' ) { 00049 $this->order = $params['order']; 00050 } else { 00051 $this->order = $this->optimalOrder(); 00052 } 00053 if ( !in_array( $this->order, $this->supportedOrders() ) ) { 00054 throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); 00055 } 00056 } 00057 00085 final public static function factory( array $params ) { 00086 $class = $params['class']; 00087 if ( !MWInit::classExists( $class ) ) { 00088 throw new MWException( "Invalid job queue class '$class'." ); 00089 } 00090 $obj = new $class( $params ); 00091 if ( !( $obj instanceof self ) ) { 00092 throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." ); 00093 } 00094 return $obj; 00095 } 00096 00100 final public function getWiki() { 00101 return $this->wiki; 00102 } 00103 00107 final public function getType() { 00108 return $this->type; 00109 } 00110 00114 final public function getOrder() { 00115 return $this->order; 00116 } 00117 00121 abstract protected function supportedOrders(); 00122 00126 abstract protected function optimalOrder(); 00127 00140 final public function isEmpty() { 00141 wfProfileIn( __METHOD__ ); 00142 $res = $this->doIsEmpty(); 00143 wfProfileOut( __METHOD__ ); 00144 return $res; 00145 } 00146 00151 abstract protected function doIsEmpty(); 00152 00162 final public function getSize() { 00163 wfProfileIn( __METHOD__ ); 00164 $res = $this->doGetSize(); 00165 wfProfileOut( __METHOD__ ); 00166 return $res; 00167 } 00168 00173 abstract protected function doGetSize(); 00174 00184 final public function getAcquiredCount() { 00185 wfProfileIn( __METHOD__ ); 00186 $res = $this->doGetAcquiredCount(); 00187 wfProfileOut( __METHOD__ ); 00188 return $res; 00189 } 00190 00195 abstract protected function doGetAcquiredCount(); 00196 00207 final public function push( $jobs, $flags = 0 ) { 00208 return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); 00209 } 00210 00221 final public function batchPush( array $jobs, $flags = 0 ) { 00222 if ( !count( $jobs ) ) { 00223 return true; // nothing to do 00224 } 00225 00226 foreach ( $jobs as $job ) { 00227 if ( $job->getType() !== $this->type ) { 00228 throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); 00229 } 00230 } 00231 00232 wfProfileIn( __METHOD__ ); 00233 $ok = $this->doBatchPush( $jobs, $flags ); 00234 wfProfileOut( __METHOD__ ); 00235 return $ok; 00236 } 00237 00242 abstract protected function doBatchPush( array $jobs, $flags ); 00243 00252 final public function pop() { 00253 global $wgJobClasses; 00254 00255 if ( $this->wiki !== wfWikiID() ) { 00256 throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." ); 00257 } elseif ( !isset( $wgJobClasses[$this->type] ) ) { 00258 // Do not pop jobs if there is no class for the queue type 00259 throw new MWException( "Unrecognized job type '{$this->type}'." ); 00260 } 00261 00262 wfProfileIn( __METHOD__ ); 00263 $job = $this->doPop(); 00264 wfProfileOut( __METHOD__ ); 00265 return $job; 00266 } 00267 00272 abstract protected function doPop(); 00273 00284 final public function ack( Job $job ) { 00285 if ( $job->getType() !== $this->type ) { 00286 throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); 00287 } 00288 wfProfileIn( __METHOD__ ); 00289 $ok = $this->doAck( $job ); 00290 wfProfileOut( __METHOD__ ); 00291 return $ok; 00292 } 00293 00298 abstract protected function doAck( Job $job ); 00299 00331 final public function deduplicateRootJob( Job $job ) { 00332 if ( $job->getType() !== $this->type ) { 00333 throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); 00334 } 00335 wfProfileIn( __METHOD__ ); 00336 $ok = $this->doDeduplicateRootJob( $job ); 00337 wfProfileOut( __METHOD__ ); 00338 return $ok; 00339 } 00340 00346 protected function doDeduplicateRootJob( Job $job ) { 00347 return true; 00348 } 00349 00358 final public function waitForBackups() { 00359 wfProfileIn( __METHOD__ ); 00360 $this->doWaitForBackups(); 00361 wfProfileOut( __METHOD__ ); 00362 } 00363 00368 protected function doWaitForBackups() {} 00369 00382 final public function getPeriodicTasks() { 00383 $tasks = $this->doGetPeriodicTasks(); 00384 foreach ( $tasks as $name => &$def ) { 00385 $def['name'] = $name; 00386 } 00387 return $tasks; 00388 } 00389 00394 protected function doGetPeriodicTasks() { 00395 return array(); 00396 } 00397 00403 final public function flushCaches() { 00404 wfProfileIn( __METHOD__ ); 00405 $this->doFlushCaches(); 00406 wfProfileOut( __METHOD__ ); 00407 } 00408 00413 protected function doFlushCaches() {} 00414 00423 abstract public function getAllQueuedJobs(); 00424 00432 public function setTestingPrefix( $key ) { 00433 throw new MWException( "Queue namespacing not supported for this queue type." ); 00434 } 00435 }