MediaWiki  master
JobQueue.php
Go to the documentation of this file.
1 <?php
31 abstract class JobQueue {
33  protected $wiki;
35  protected $type;
37  protected $order;
39  protected $claimTTL;
41  protected $maxTries;
43  protected $readOnlyReason;
44 
46  protected $dupCache;
48  protected $aggr;
49 
50  const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
51 
52  const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
53 
58  protected function __construct( array $params ) {
59  $this->wiki = $params['wiki'];
60  $this->type = $params['type'];
61  $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
62  $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
63  if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
64  $this->order = $params['order'];
65  } else {
66  $this->order = $this->optimalOrder();
67  }
68  if ( !in_array( $this->order, $this->supportedOrders() ) ) {
69  throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
70  }
71  $this->dupCache = wfGetCache( CACHE_ANYTHING );
72  $this->aggr = isset( $params['aggregator'] )
73  ? $params['aggregator']
74  : new JobQueueAggregatorNull( [] );
75  $this->readOnlyReason = isset( $params['readOnlyReason'] )
76  ? $params['readOnlyReason']
77  : false;
78  }
79 
108  final public static function factory( array $params ) {
109  $class = $params['class'];
110  if ( !class_exists( $class ) ) {
111  throw new MWException( "Invalid job queue class '$class'." );
112  }
113  $obj = new $class( $params );
114  if ( !( $obj instanceof self ) ) {
115  throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
116  }
117 
118  return $obj;
119  }
120 
124  final public function getWiki() {
125  return $this->wiki;
126  }
127 
131  final public function getType() {
132  return $this->type;
133  }
134 
138  final public function getOrder() {
139  return $this->order;
140  }
141 
147  abstract protected function supportedOrders();
148 
154  abstract protected function optimalOrder();
155 
161  protected function supportsDelayedJobs() {
162  return false; // not implemented
163  }
164 
169  final public function delayedJobsEnabled() {
170  return $this->supportsDelayedJobs();
171  }
172 
177  public function getReadOnlyReason() {
178  return $this->readOnlyReason;
179  }
180 
193  final public function isEmpty() {
194  $res = $this->doIsEmpty();
195 
196  return $res;
197  }
198 
203  abstract protected function doIsEmpty();
204 
214  final public function getSize() {
215  $res = $this->doGetSize();
216 
217  return $res;
218  }
219 
224  abstract protected function doGetSize();
225 
235  final public function getAcquiredCount() {
236  $res = $this->doGetAcquiredCount();
237 
238  return $res;
239  }
240 
245  abstract protected function doGetAcquiredCount();
246 
257  final public function getDelayedCount() {
258  $res = $this->doGetDelayedCount();
259 
260  return $res;
261  }
262 
267  protected function doGetDelayedCount() {
268  return 0; // not implemented
269  }
270 
280  final public function getAbandonedCount() {
281  $res = $this->doGetAbandonedCount();
282 
283  return $res;
284  }
285 
290  protected function doGetAbandonedCount() {
291  return 0; // not implemented
292  }
293 
304  final public function push( $jobs, $flags = 0 ) {
305  $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
306  $this->batchPush( $jobs, $flags );
307  }
308 
319  final public function batchPush( array $jobs, $flags = 0 ) {
320  $this->assertNotReadOnly();
321 
322  if ( !count( $jobs ) ) {
323  return; // nothing to do
324  }
325 
326  foreach ( $jobs as $job ) {
327  if ( $job->getType() !== $this->type ) {
328  throw new MWException(
329  "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
330  } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
331  throw new MWException(
332  "Got delayed '{$job->getType()}' job; delays are not supported." );
333  }
334  }
335 
336  $this->doBatchPush( $jobs, $flags );
337  $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
338 
339  foreach ( $jobs as $job ) {
340  if ( $job->isRootJob() ) {
341  $this->deduplicateRootJob( $job );
342  }
343  }
344  }
345 
351  abstract protected function doBatchPush( array $jobs, $flags );
352 
361  final public function pop() {
363 
364  $this->assertNotReadOnly();
365  if ( $this->wiki !== wfWikiID() ) {
366  throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
367  } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
368  // Do not pop jobs if there is no class for the queue type
369  throw new MWException( "Unrecognized job type '{$this->type}'." );
370  }
371 
372  $job = $this->doPop();
373 
374  if ( !$job ) {
375  $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
376  }
377 
378  // Flag this job as an old duplicate based on its "root" job...
379  try {
380  if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
381  JobQueue::incrStats( 'dupe_pops', $this->type );
382  $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
383  }
384  } catch ( Exception $e ) {
385  // don't lose jobs over this
386  }
387 
388  return $job;
389  }
390 
395  abstract protected function doPop();
396 
407  final public function ack( Job $job ) {
408  $this->assertNotReadOnly();
409  if ( $job->getType() !== $this->type ) {
410  throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
411  }
412 
413  $this->doAck( $job );
414  }
415 
420  abstract protected function doAck( Job $job );
421 
453  final public function deduplicateRootJob( IJobSpecification $job ) {
454  $this->assertNotReadOnly();
455  if ( $job->getType() !== $this->type ) {
456  throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
457  }
458 
459  return $this->doDeduplicateRootJob( $job );
460  }
461 
469  if ( !$job->hasRootJobParams() ) {
470  throw new MWException( "Cannot register root job; missing parameters." );
471  }
472  $params = $job->getRootJobParams();
473 
474  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
475  // Callers should call batchInsert() and then this function so that if the insert
476  // fails, the de-duplication registration will be aborted. Since the insert is
477  // deferred till "transaction idle", do the same here, so that the ordering is
478  // maintained. Having only the de-duplication registration succeed would cause
479  // jobs to become no-ops without any actual jobs that made them redundant.
480  $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
481  if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
482  return true; // a newer version of this root job was enqueued
483  }
484 
485  // Update the timestamp of the last root job started at the location...
486  return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
487  }
488 
496  final protected function isRootJobOldDuplicate( Job $job ) {
497  if ( $job->getType() !== $this->type ) {
498  throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
499  }
500  $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
501 
502  return $isDuplicate;
503  }
504 
510  protected function doIsRootJobOldDuplicate( Job $job ) {
511  if ( !$job->hasRootJobParams() ) {
512  return false; // job has no de-deplication info
513  }
514  $params = $job->getRootJobParams();
515 
516  $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
517  // Get the last time this root job was enqueued
518  $timestamp = $this->dupCache->get( $key );
519 
520  // Check if a new root job was started at the location after this one's...
521  return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
522  }
523 
528  protected function getRootJobCacheKey( $signature ) {
529  list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
530 
531  return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
532  }
533 
541  final public function delete() {
542  $this->assertNotReadOnly();
543 
544  $this->doDelete();
545  }
546 
551  protected function doDelete() {
552  throw new MWException( "This method is not implemented." );
553  }
554 
563  final public function waitForBackups() {
564  $this->doWaitForBackups();
565  }
566 
571  protected function doWaitForBackups() {
572  }
573 
579  final public function flushCaches() {
580  $this->doFlushCaches();
581  }
582 
587  protected function doFlushCaches() {
588  }
589 
598  abstract public function getAllQueuedJobs();
599 
608  public function getAllDelayedJobs() {
609  return new ArrayIterator( [] ); // not implemented
610  }
611 
622  public function getAllAcquiredJobs() {
623  return new ArrayIterator( [] ); // not implemented
624  }
625 
633  public function getAllAbandonedJobs() {
634  return new ArrayIterator( [] ); // not implemented
635  }
636 
643  public function getCoalesceLocationInternal() {
644  return null;
645  }
646 
656  final public function getSiblingQueuesWithJobs( array $types ) {
657  return $this->doGetSiblingQueuesWithJobs( $types );
658  }
659 
665  protected function doGetSiblingQueuesWithJobs( array $types ) {
666  return null; // not supported
667  }
668 
679  final public function getSiblingQueueSizes( array $types ) {
680  return $this->doGetSiblingQueueSizes( $types );
681  }
682 
688  protected function doGetSiblingQueueSizes( array $types ) {
689  return null; // not supported
690  }
691 
695  protected function assertNotReadOnly() {
696  if ( $this->readOnlyReason !== false ) {
697  throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
698  }
699  }
700 
709  public static function incrStats( $key, $type, $delta = 1 ) {
710  static $stats;
711  if ( !$stats ) {
712  $stats = RequestContext::getMain()->getStats();
713  }
714  $stats->updateCount( "jobqueue.{$key}.all", $delta );
715  $stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
716  }
717 }
718 
723 class JobQueueError extends MWException {
724 }
725 
727 }
728 
730 
731 }
getAllQueuedJobs()
Get an iterator to traverse over all available jobs in this queue.
string bool $readOnlyReason
Read only rationale (or false if r/w)
Definition: JobQueue.php:43
string $order
Job priority for pop()
Definition: JobQueue.php:37
static incrStats($key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:709
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition: design.txt:12
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
getType()
Definition: Job.php:121
the array() calling protocol came about after MediaWiki 1.4rc1.
const QOS_ATOMIC
Definition: JobQueue.php:50
hasRootJobParams()
Definition: Job.php:290
batchPush(array $jobs, $flags=0)
Push a batch of jobs into the queue.
Definition: JobQueue.php:319
wfForeignMemcKey($db, $prefix)
Make a cache key for a foreign DB.
doFlushCaches()
Definition: JobQueue.php:587
delayedJobsEnabled()
Definition: JobQueue.php:169
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException'returning false will NOT prevent logging $e
Definition: hooks.txt:1980
assertNotReadOnly()
Definition: JobQueue.php:695
supportsDelayedJobs()
Find out if delayed jobs are supported for configuration validation.
Definition: JobQueue.php:161
doDelete()
Definition: JobQueue.php:551
Class to both describe a background job and handle jobs.
Definition: Job.php:31
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2588
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueue.php:665
when a variable name is used in a it is silently declared as a new local masking the global
Definition: design.txt:93
getAbandonedCount()
Get the number of acquired jobs that can no longer be attempted.
Definition: JobQueue.php:280
getCoalesceLocationInternal()
Do not use this function outside of JobQueue/JobQueueGroup.
Definition: JobQueue.php:643
$wgJobClasses
Maps jobs to their handling classes; extensions can add to this to provide custom jobs...
getDelayedCount()
Get the number of delayed jobs (these are temporarily out of the queue).
Definition: JobQueue.php:257
BagOStuff $dupCache
Definition: JobQueue.php:46
isEmpty()
Quickly check if the queue has no available (unacquired, non-delayed) jobs.
Definition: JobQueue.php:193
__construct(array $params)
Definition: JobQueue.php:58
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
doGetAbandonedCount()
Definition: JobQueue.php:290
string $type
Job type.
Definition: JobQueue.php:35
getOrder()
Definition: JobQueue.php:138
static getMain()
Static methods.
wfGetCache($cacheType)
Get a specific cache object.
getRootJobCacheKey($signature)
Definition: JobQueue.php:528
if($limit) $timestamp
getAcquiredCount()
Get the number of acquired jobs (these are temporarily out of the queue).
Definition: JobQueue.php:235
deduplicateRootJob(IJobSpecification $job)
Register the "root job" of a given job into the queue for de-duplication.
Definition: JobQueue.php:453
doBatchPush(array $jobs, $flags)
ack(Job $job)
Acknowledge that a job was completed.
Definition: JobQueue.php:407
getRootJobParams()
Definition: Job.php:274
$res
Definition: database.txt:21
MediaWiki exception.
Definition: MWException.php:26
getReadOnlyReason()
Definition: JobQueue.php:177
$params
push($jobs, $flags=0)
Push one or more jobs into the queue.
Definition: JobQueue.php:304
isRootJobOldDuplicate(Job $job)
Check if the "root" job of a given job has been superseded by a newer one.
Definition: JobQueue.php:496
wfWikiID()
Get an ASCII string identifying this wiki This is used as a prefix in memcached keys.
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
getSiblingQueuesWithJobs(array $types)
Check whether each of the given queues are empty.
Definition: JobQueue.php:656
getAllAcquiredJobs()
Get an iterator to traverse over all claimed jobs in this queue.
Definition: JobQueue.php:622
JobQueueAggregator $aggr
Definition: JobQueue.php:48
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
doWaitForBackups()
Definition: JobQueue.php:571
static factory(array $params)
Get a job queue object of the specified type.
Definition: JobQueue.php:108
optimalOrder()
Get the default queue order to use if configuration does not specify one.
doGetSiblingQueueSizes(array $types)
Definition: JobQueue.php:688
doGetAcquiredCount()
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
if(count($args)< 1) $job
int $maxTries
Maximum number of times to try a job.
Definition: JobQueue.php:41
const CACHE_ANYTHING
Definition: Defines.php:101
static newFromJob(Job $job)
Get a duplicate no-op version of a job.
const ROOTJOB_TTL
Definition: JobQueue.php:52
getAllAbandonedJobs()
Get an iterator to traverse over all abandoned jobs in this queue.
Definition: JobQueue.php:633
Job queue task description interface.
wfSplitWikiID($wiki)
Split a wiki ID into DB name and table prefix.
int $claimTTL
Time to live in seconds.
Definition: JobQueue.php:39
getSiblingQueueSizes(array $types)
Check the size of each of the given queues.
Definition: JobQueue.php:679
getAllDelayedJobs()
Get an iterator to traverse over all delayed jobs in this queue.
Definition: JobQueue.php:608
doGetDelayedCount()
Definition: JobQueue.php:267
doAck(Job $job)
string $wiki
Wiki ID.
Definition: JobQueue.php:33
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueue.php:468
waitForBackups()
Wait for any slaves or backup servers to catch up.
Definition: JobQueue.php:563
doIsRootJobOldDuplicate(Job $job)
Definition: JobQueue.php:510
flushCaches()
Clear any process and persistent caches.
Definition: JobQueue.php:579
getSize()
Get the number of available (unacquired, non-delayed) jobs in the queue.
Definition: JobQueue.php:214
supportedOrders()
Get the allowed queue orders for configuration validation.
pop()
Pop a job off of the queue.
Definition: JobQueue.php:361