MediaWiki  master
JobQueueDB.php
Go to the documentation of this file.
1 <?php
30 class JobQueueDB extends JobQueue {
31  const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
32  const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
33  const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
34  const MAX_OFFSET = 255; // integer; maximum number of rows to skip
35 
37  protected $cache;
38 
40  protected $cluster = false;
41 
/**
 * Set up the queue from its configuration parameters.
 *
 * Recognised entry in $params (besides whatever the parent constructor reads):
 *   - cluster : name of an external DB cluster; stored as false when absent.
 *
 * @param array $params
 */
protected function __construct( array $params ) {
	parent::__construct( $params );

	$this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
	// Every size/claim-count lookup in this class goes through $this->cache,
	// so it has to be initialised here before any of them can run.
	$this->cache = ObjectCache::getMainWANInstance();
}
56 
57  protected function supportedOrders() {
58  return [ 'random', 'timestamp', 'fifo' ];
59  }
60 
61  protected function optimalOrder() {
62  return 'random';
63  }
64 
/**
 * Tell whether the queue holds no unclaimed job of this type.
 *
 * @return bool True when no row with an empty job_token exists for this type
 * @throws JobQueueError When the underlying query raises a DBError
 */
protected function doIsEmpty() {
	$db = $this->getSlaveDB();
	$unclaimed = [ 'job_cmd' => $this->type, 'job_token' => '' ];
	try {
		// One matching row is enough to know the queue is not empty
		$hit = $db->selectField( 'job', '1', $unclaimed, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return !$hit;
}
81 
/**
 * Count the unclaimed jobs of this type.
 *
 * An integer found in the cache is returned as-is; otherwise the rows are
 * counted in the DB and the result is cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 * @throws JobQueueError When the underlying query raises a DBError
 */
protected function doGetSize() {
	$cacheKey = $this->getCacheKey( 'size' );

	$cached = $this->cache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	try {
		$db = $this->getSlaveDB();
		$unclaimed = [ 'job_cmd' => $this->type, 'job_token' => '' ];
		$total = (int)$db->selectField( 'job', 'COUNT(*)', $unclaimed, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$this->cache->set( $cacheKey, $total, self::CACHE_TTL_SHORT );

	return $total;
}
107 
/**
 * Count the jobs of this type that currently carry a claim token.
 *
 * Returns 0 immediately when claimTTL is not positive. Otherwise an integer
 * found in the cache is returned, and failing that the rows are counted in
 * the DB and the result is cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 * @throws JobQueueError When the underlying query raises a DBError
 */
protected function doGetAcquiredCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$cacheKey = $this->getCacheKey( 'acquiredcount' );
	$cached = $this->cache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$db = $this->getSlaveDB();
	try {
		$conds = [
			'job_cmd' => $this->type,
			'job_token != ' . $db->addQuotes( '' ),
		];
		$claimed = (int)$db->selectField( 'job', 'COUNT(*)', $conds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
	$this->cache->set( $cacheKey, $claimed, self::CACHE_TTL_SHORT );

	return $claimed;
}
137 
/**
 * Count the claimed jobs of this type whose attempt counter has reached maxTries.
 *
 * Returns 0 immediately when claimTTL is not positive. Otherwise an integer
 * found in the cache is returned, and failing that the rows are counted in
 * the DB and the result is cached for CACHE_TTL_SHORT seconds.
 *
 * @return int
 * @throws JobQueueError When the underlying query raises a DBError
 */
protected function doGetAbandonedCount() {
	if ( $this->claimTTL <= 0 ) {
		return 0; // no acknowledgements
	}

	$cacheKey = $this->getCacheKey( 'abandonedcount' );
	$cached = $this->cache->get( $cacheKey );
	if ( is_int( $cached ) ) {
		return $cached;
	}

	$db = $this->getSlaveDB();
	try {
		$conds = [
			'job_cmd' => $this->type,
			'job_token != ' . $db->addQuotes( '' ),
			"job_attempts >= {$db->addQuotes( $this->maxTries )}",
		];
		$abandoned = (int)$db->selectField( 'job', 'COUNT(*)', $conds, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	$this->cache->set( $cacheKey, $abandoned, self::CACHE_TTL_SHORT );

	return $abandoned;
}
173 
/**
 * Schedule the insertion of a batch of jobs.
 *
 * The actual work is handed to doBatchPushInternal() through the master
 * connection's onTransactionIdle() hook rather than being run inline.
 *
 * @param array $jobs Job specifications to enqueue
 * @param int $flags Bitfield passed through to doBatchPushInternal()
 */
protected function doBatchPush( array $jobs, $flags ) {
	// Captured here because __METHOD__ inside the closure would name the closure
	$fname = __METHOD__;
	$dbw = $this->getMasterDB();

	$pushLater = function () use ( $dbw, $jobs, $flags, $fname ) {
		$this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
	};
	$dbw->onTransactionIdle( $pushLater );
}
191 
/**
 * Insert a batch of jobs, skipping de-duplicable jobs that are already queued.
 *
 * This function should not be called outside of JobQueueDB.
 *
 * Jobs whose ignoreDuplicates() is true are keyed by job_sha1, so identical
 * jobs inside the batch collapse into one row, and any that match an existing
 * unclaimed row are dropped. All other jobs are always inserted.
 *
 * @param IDatabase $dbw Master connection to write through
 * @param array $jobs Job specifications to enqueue
 * @param int $flags Bitfield; with QOS_ATOMIC the inserts are wrapped in one atomic section
 * @param string $method Caller name used to attribute the queries
 * @throws DBError Re-thrown after rolling back when QOS_ATOMIC is set
 */
public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
	if ( !count( $jobs ) ) {
		return;
	}

	$rowSet = []; // (sha1 => row) map for jobs that are de-duplicated
	$rowList = []; // list of rows for jobs that are not de-duplicated
	foreach ( $jobs as $job ) {
		$row = $this->insertFields( $job );
		if ( $job->ignoreDuplicates() ) {
			$rowSet[$row['job_sha1']] = $row;
		} else {
			$rowList[] = $row;
		}
	}

	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->startAtomic( $method ); // wrap all the job additions in one transaction
	}
	try {
		// Strip out any duplicate jobs that are already in the queue...
		if ( count( $rowSet ) ) {
			$res = $dbw->select( 'job', 'job_sha1',
				[
					// No job_type condition since it's part of the job_sha1 hash
					'job_sha1' => array_keys( $rowSet ),
					'job_token' => '' // unclaimed
				],
				$method
			);
			foreach ( $res as $row ) {
				wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
				unset( $rowSet[$row->job_sha1] ); // already enqueued
			}
		}
		// Build the full list of job rows to insert
		$rows = array_merge( $rowList, array_values( $rowSet ) );
		// Insert the job rows in chunks to avoid slave lag...
		foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
			$dbw->insert( 'job', $rowBatch, $method );
		}
		JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
		// $rows is built from the already-pruned $rowSet, so comparing it with
		// $rowSet + $rowList would always give 0; compare with the jobs that
		// were actually submitted to count every skipped duplicate.
		JobQueue::incrStats( 'dupe_inserts', $this->type,
			count( $jobs ) - count( $rows )
		);
	} catch ( DBError $e ) {
		if ( $flags & self::QOS_ATOMIC ) {
			$dbw->rollback( $method );
		}
		throw $e;
	}
	if ( $flags & self::QOS_ATOMIC ) {
		$dbw->endAtomic( $method );
	}
}
259 
/**
 * Claim one job from the queue and build its Job object.
 *
 * The master connection is flushed and DBO_TRX is cleared for the duration
 * of the call; the previous flag value is put back by a scoped callback.
 * When nothing could be claimed, and on roughly one call in ten otherwise,
 * recycleAndDeleteStaleJobs() is run as well.
 *
 * @return Job|bool The claimed job, or false when no row could be reserved
 * @throws JobQueueError When a query raises a DBError
 */
protected function doPop() {
	$dbw = $this->getMasterDB();
	try {
		$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
		$hadAutoTrx = $dbw->getFlag( DBO_TRX ); // remember current setting
		$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
		// Held in a local only so that it runs when this scope is left
		$scopedReset = new ScopedCallback( function () use ( $dbw, $hadAutoTrx ) {
			$dbw->setFlag( $hadAutoTrx ? DBO_TRX : 0 ); // restore old setting
		} );

		$token = wfRandomString( 32 ); // identifies this pop attempt
		$job = false; // job popped off

		// Try to reserve a row in the DB...
		if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
			$claimed = $this->claimOldest( $token );
		} else { // random first
			$pivot = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
			$searchUp = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $pivot
			$claimed = $this->claimRandom( $token, $pivot, $searchUp );
		}

		if ( $claimed ) {
			JobQueue::incrStats( 'pops', $this->type );
			// Turn the reserved row into a job object...
			$title = Title::makeTitle( $claimed->job_namespace, $claimed->job_title );
			$job = Job::factory( $claimed->job_cmd, $title,
				self::extractBlob( $claimed->job_params ), $claimed->job_id );
			$job->metadata['id'] = $claimed->job_id;
			$job->metadata['timestamp'] = $claimed->job_timestamp;
		}

		if ( !$job || mt_rand( 0, 9 ) == 0 ) {
			// Handle jobs that need to be recycled/deleted;
			// any recycled jobs will be picked up next attempt
			$this->recycleAndDeleteStaleJobs();
		}
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return $job;
}
310 
/**
 * Reserve a pseudo-randomly chosen unclaimed row by stamping it with $uuid.
 *
 * A candidate row is first SELECTed and then claimed with an UPDATE by job_id
 * that still requires an empty job_token, so a concurrent runner that got
 * there first makes the UPDATE affect no rows and the search is repeated.
 *
 * @param string $uuid Token to write into job_token
 * @param int $rand Pivot value compared against job_random
 * @param bool $gte Search job_random >= $rand (true) or <= $rand (false) first
 * @return stdClass|bool The claimed row, or a false value when nothing is claimable
 */
protected function claimRandom( $uuid, $rand, $gte ) {
	$dbw = $this->getMasterDB();
	// Check cache to see if the queue has <= OFFSET items
	$queueIsSmall = $this->cache->get( $this->getCacheKey( 'small' ) );

	$flippedOnce = false; // whether one job_random direction was already scanned
	// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
	// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
	// not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
	// be used here with MySQL.
	while ( true ) {
		if ( $queueIsSmall ) { // queue has <= MAX_OFFSET rows
			// For small queues, using OFFSET will overshoot and return no rows more often.
			// Instead, this uses job_random to pick a row (possibly checking both directions).
			$operator = $gte ? '>=' : '<=';
			$sortDir = $gte ? 'ASC' : 'DESC';
			$candidate = $dbw->selectRow( 'job', self::selectFields(), // find a random job
				[
					'job_cmd' => $this->type,
					'job_token' => '', // unclaimed
					'job_random ' . $operator . ' ' . $dbw->addQuotes( $rand ),
				],
				__METHOD__,
				[ 'ORDER BY' => 'job_random ' . $sortDir ]
			);
			if ( !$candidate && !$flippedOnce ) {
				$gte = !$gte;
				$flippedOnce = true;
				continue; // try the other direction
			}
		} else { // table *may* have >= MAX_OFFSET rows
			// Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
			// in MySQL if there are many rows for some reason. This uses a small OFFSET
			// instead of job_random for reducing excess claim retries.
			$candidate = $dbw->selectRow( 'job', self::selectFields(), // find a random job
				[
					'job_cmd' => $this->type,
					'job_token' => '', // unclaimed
				],
				__METHOD__,
				[ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
			);
			if ( !$candidate ) {
				$queueIsSmall = true; // we know the queue must have <= MAX_OFFSET rows
				$this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
				continue; // use job_random
			}
		}

		if ( !$candidate ) {
			return $candidate; // nothing to do
		}

		// Claim the job; updating by PK while still requiring an empty token
		$dbw->update( 'job',
			[
				'job_token' => $uuid,
				'job_token_timestamp' => $dbw->timestamp(),
				'job_attempts = job_attempts+1',
			],
			[ 'job_cmd' => $this->type, 'job_id' => $candidate->job_id, 'job_token' => '' ],
			__METHOD__
		);
		// This might get raced out by another runner when claiming the previously
		// selected row. The use of job_random should minimize this problem, however.
		if ( $dbw->affectedRows() ) {
			return $candidate;
		}
		// Raced out: go round again and look for another row
	}
}
389 
/**
 * Reserve the unclaimed row with the lowest job_id by stamping it with $uuid.
 *
 * The claim is a single UPDATE; the claimed row is then read back by token.
 * If the UPDATE touched a row but the read-back finds none, the attempt is
 * repeated.
 *
 * @param string $uuid Token to write into job_token
 * @return stdClass|bool The claimed row, or a false value when nothing is claimable
 */
protected function claimOldest( $uuid ) {
	$dbw = $this->getMasterDB();

	$claimed = false; // the row acquired
	while ( !$claimed ) {
		if ( $dbw->getType() === 'mysql' ) {
			// Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
			// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
			// Oracle and Postgre have no such limitation. However, MySQL offers an
			// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
			$table = $dbw->tableName( 'job' );
			$qToken = $dbw->addQuotes( $uuid );
			$qNow = $dbw->addQuotes( $dbw->timestamp() );
			$qType = $dbw->addQuotes( $this->type );
			$qEmpty = $dbw->addQuotes( '' );
			$sql = "UPDATE {$table} SET " .
				"job_token = {$qToken}, " .
				"job_token_timestamp = {$qNow}, " .
				"job_attempts = job_attempts+1 " .
				"WHERE ( job_cmd = {$qType} AND job_token = {$qEmpty} ) " .
				"ORDER BY job_id ASC LIMIT 1";
			$dbw->query( $sql, __METHOD__ );
		} else {
			// Use a subquery to find the job, within an UPDATE to claim it.
			// This uses as much of the DB wrapper functions as possible.
			$oldestIdSql = $dbw->selectSQLText( 'job', 'job_id',
				[ 'job_cmd' => $this->type, 'job_token' => '' ],
				__METHOD__,
				[ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ]
			);
			$dbw->update( 'job',
				[
					'job_token' => $uuid,
					'job_token_timestamp' => $dbw->timestamp(),
					'job_attempts = job_attempts+1',
				],
				[ 'job_id = (' . $oldestIdSql . ')' ],
				__METHOD__
			);
		}

		if ( !$dbw->affectedRows() ) {
			break; // nothing to do
		}

		// Fetch the row that we just reserved...
		$claimed = $dbw->selectRow( 'job', self::selectFields(),
			[ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
		);
		if ( !$claimed ) { // raced out by duplicate job removal
			wfDebug( "Row deleted as duplicate by another process.\n" );
		}
	}

	return $claimed;
}
450 
/**
 * Acknowledge a finished job by deleting its row.
 *
 * The master connection is flushed and DBO_TRX is cleared for the duration
 * of the call; the previous flag value is put back by a scoped callback.
 *
 * @param Job $job Job whose metadata['id'] identifies the row
 * @throws MWException When the job carries no ID
 * @throws JobQueueError When a query raises a DBError
 */
protected function doAck( Job $job ) {
	if ( !isset( $job->metadata['id'] ) ) {
		throw new MWException( "Job of type '{$job->getType()}' has no ID." );
	}

	$dbw = $this->getMasterDB();
	try {
		$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
		$hadAutoTrx = $dbw->getFlag( DBO_TRX ); // remember current setting
		$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
		// Held in a local only so that it runs when this scope is left
		$scopedReset = new ScopedCallback( function () use ( $dbw, $hadAutoTrx ) {
			$dbw->setFlag( $hadAutoTrx ? DBO_TRX : 0 ); // restore old setting
		} );

		// Delete a row with a single DELETE without holding row locks over RTTs...
		$rowConds = [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ];
		$dbw->delete( 'job', $rowConds, __METHOD__ );

		JobQueue::incrStats( 'acks', $this->type );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
}
479 
/**
 * Register the root job timestamp of $job in the de-duplication cache.
 *
 * The cache write is deferred with onTransactionIdle() on the master
 * connection and is skipped when an equal or newer timestamp is already
 * stored under the same signature.
 *
 * @param IJobSpecification $job Must carry 'rootJobSignature' and 'rootJobTimestamp' params
 * @throws MWException When either root job parameter is missing
 * @return bool Always true; the outcome of the deferred write is not reported
 */
protected function doDeduplicateRootJob( IJobSpecification $job ) {
	$params = $job->getParams();
	if ( !isset( $params['rootJobSignature'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
	} elseif ( !isset( $params['rootJobTimestamp'] ) ) {
		throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
	}
	$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
	// Callers should call batchInsert() and then this function so that if the insert
	// fails, the de-duplication registration will be aborted. Since the insert is
	// deferred till "transaction idle", do the same here, so that the ordering is
	// maintained. Having only the de-duplication registration succeed would cause
	// jobs to become no-ops without any actual jobs that made them redundant.
	$dbw = $this->getMasterDB();
	// The closure cannot capture an undefined variable, so bind the cache first
	$cache = $this->dupCache;
	$dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) {
		$timestamp = $cache->get( $key ); // current last timestamp of this job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
	} );

	return true;
}
513 
/**
 * Delete every job row of this queue's type, claimed or not.
 *
 * @return bool Always true
 * @throws JobQueueError When the query raises a DBError
 */
protected function doDelete() {
	$dbw = $this->getMasterDB();
	try {
		// Pass the caller name like every other query in this class, so the
		// DELETE is attributed to this method rather than to the DB wrapper.
		$dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return true;
}
528 
/**
 * Wait for the slave DBs backing this queue to catch up.
 *
 * Delegates to wfWaitForSlaves() for this queue's wiki and cluster.
 */
protected function doWaitForBackups() {
	// Any falsy cluster value is passed on as a plain false
	$cluster = $this->cluster ?: false;
	wfWaitForSlaves( false, $this->wiki, $cluster );
}
536 
/**
 * Drop every value this class keeps in the cache for this queue.
 *
 * Covers all four properties written elsewhere in the class: the two
 * claim-independent counts, the abandoned count, and the "small queue" hint
 * used by claimRandom().
 */
protected function doFlushCaches() {
	foreach ( [ 'size', 'acquiredcount', 'abandonedcount', 'small' ] as $type ) {
		$this->cache->delete( $this->getCacheKey( $type ) );
	}
}
545 
/**
 * Iterate over the jobs of this type that have an empty claim token.
 *
 * @return Iterator
 */
public function getAllQueuedJobs() {
	$unclaimed = [ 'job_cmd' => $this->getType(), 'job_token' => '' ];

	return $this->getJobIterator( $unclaimed );
}
553 
/**
 * Iterate over the jobs of this type that carry a non-empty claim token.
 *
 * @return Iterator
 */
public function getAllAcquiredJobs() {
	$claimed = [ 'job_cmd' => $this->getType(), "job_token > ''" ];

	return $this->getJobIterator( $claimed );
}
561 
/**
 * Build an iterator that yields a Job object for each row matching $conds.
 *
 * Each job gets the row's job_id and job_timestamp stored in its metadata.
 * Rows with an empty job_params column produce a job with an empty
 * parameter array.
 *
 * @param array $conds Query conditions for the job table
 * @return Iterator
 * @throws JobQueueError When the query raises a DBError
 */
protected function getJobIterator( array $conds ) {
	$dbr = $this->getSlaveDB();
	try {
		return new MappedIterator(
			// Pass the caller name like the other queries in this class
			$dbr->select( 'job', self::selectFields(), $conds, __METHOD__ ),
			function ( $row ) {
				// Same emptiness test as extractBlob(); also safe for a null column
				$params = ( (string)$row->job_params !== '' )
					? unserialize( $row->job_params )
					: [];
				$job = Job::factory(
					$row->job_cmd,
					Title::makeTitle( $row->job_namespace, $row->job_title ),
					$params
				);
				$job->metadata['id'] = $row->job_id;
				$job->metadata['timestamp'] = $row->job_timestamp;

				return $job;
			}
		);
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}
}
587 
/**
 * Describe where this queue is stored as a string.
 *
 * @return string "DBCluster:<cluster>:<wiki>" when a cluster is set,
 *   "LBFactory:<wiki>" otherwise
 */
public function getCoalesceLocationInternal() {
	if ( $this->cluster ) {
		return "DBCluster:{$this->cluster}:{$this->wiki}";
	}

	return "LBFactory:{$this->wiki}";
}
593 
/**
 * Find which of the given job types have at least one row in the job table.
 *
 * @param array $types Job types to look for
 * @return array List of the types that were found
 */
protected function doGetSiblingQueuesWithJobs( array $types ) {
	$dbr = $this->getSlaveDB();
	// @note: this does not check whether the jobs are claimed or not.
	// This is useful so JobQueueGroup::pop() also sees queues that only
	// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
	// failed jobs so that they can be popped again for that edge case.
	$wanted = [ 'job_cmd' => $types ];
	$res = $dbr->select( 'job', 'DISTINCT job_cmd', $wanted, __METHOD__ );

	$present = [];
	foreach ( $res as $row ) {
		$present[] = $row->job_cmd;
	}

	return $present;
}
610 
/**
 * Count the rows in the job table for each of the given job types.
 *
 * Types without any row do not appear in the result.
 *
 * @param array $types Job types to count
 * @return array Map of (job type => integer row count)
 */
protected function doGetSiblingQueueSizes( array $types ) {
	$dbr = $this->getSlaveDB();
	$res = $dbr->select(
		'job',
		[ 'job_cmd', 'COUNT(*) AS count' ],
		[ 'job_cmd' => $types ],
		__METHOD__,
		[ 'GROUP BY' => 'job_cmd' ]
	);

	$sizeByType = [];
	foreach ( $res as $row ) {
		$sizeByType[$row->job_cmd] = (int)$row->count;
	}

	return $sizeByType;
}
623 
/**
 * Recycle or destroy any jobs that have been claimed for too long.
 *
 * Runs under a named DB lock per job type; when the lock cannot be obtained
 * the call returns without doing anything. With a positive claimTTL, claims
 * older than claimTTL on jobs with retries left are released. Claimed jobs
 * older than MAX_AGE_PRUNE are deleted (only those out of retries when
 * claimTTL is positive).
 *
 * @return int Number of rows released plus number of rows deleted
 * @throws JobQueueError When a query raises a DBError
 */
public function recycleAndDeleteStaleJobs() {
	$now = time();
	$count = 0; // affected rows
	$dbw = $this->getMasterDB();
	$lockName = "jobqueue-recycle-{$this->type}";

	try {
		if ( !$dbw->lock( $lockName, __METHOD__, 1 ) ) {
			return $count; // already in progress
		}

		// Remove claims on jobs acquired for too long if enabled...
		if ( $this->claimTTL > 0 ) {
			$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
			// Get the IDs of jobs that have been claimed but not finished after too long.
			// These jobs can be recycled into the queue by expiring the claim. Selecting
			// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
			$staleClaims = $dbw->select( 'job', 'job_id',
				[
					'job_cmd' => $this->type,
					'job_token != ' . $dbw->addQuotes( '' ), // was acquired
					'job_token_timestamp < ' . $dbw->addQuotes( $claimCutoff ), // stale
					'job_attempts < ' . $dbw->addQuotes( $this->maxTries ), // retries left
				],
				__METHOD__
			);
			$recycleIds = [];
			foreach ( $staleClaims as $claimRow ) {
				$recycleIds[] = $claimRow->job_id;
			}
			if ( count( $recycleIds ) ) {
				// Reset job_token for these jobs so that other runners will pick them up.
				// Set the timestamp to the current time, as it is useful to know that the job
				// was already tried before (the timestamp becomes the "released" time).
				$dbw->update( 'job',
					[
						'job_token' => '',
						'job_token_timestamp' => $dbw->timestamp( $now ), // time of release
					],
					[ 'job_id' => $recycleIds ],
					__METHOD__
				);
				$released = $dbw->affectedRows();
				$count += $released;
				JobQueue::incrStats( 'recycles', $this->type, $released );
				$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
			}
		}

		// Just destroy any stale jobs...
		$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
		$pruneConds = [
			'job_cmd' => $this->type,
			'job_token != ' . $dbw->addQuotes( '' ), // was acquired
			'job_token_timestamp < ' . $dbw->addQuotes( $pruneCutoff ), // stale
		];
		if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
			$pruneConds[] = 'job_attempts >= ' . $dbw->addQuotes( $this->maxTries );
		}
		// Get the IDs of jobs that are considered stale and should be removed. Selecting
		// the IDs first means that the DELETE can be done by primary key (less deadlocks).
		$staleRows = $dbw->select( 'job', 'job_id', $pruneConds, __METHOD__ );
		$pruneIds = [];
		foreach ( $staleRows as $staleRow ) {
			$pruneIds[] = $staleRow->job_id;
		}
		if ( count( $pruneIds ) ) {
			$dbw->delete( 'job', [ 'job_id' => $pruneIds ], __METHOD__ );
			$removed = $dbw->affectedRows();
			$count += $removed;
			JobQueue::incrStats( 'abandons', $this->type, $removed );
		}

		$dbw->unlock( $lockName, __METHOD__ );
	} catch ( DBError $e ) {
		$this->throwDBException( $e );
	}

	return $count;
}
709 
/**
 * Build the job table row for a job specification.
 *
 * @param IJobSpecification $job
 * @return array Map of (column name => value) ready for insertion
 */
protected function insertFields( IJobSpecification $job ) {
	$dbw = $this->getMasterDB();

	// Fields that describe the nature of the job
	$row = [
		'job_cmd' => $job->getType(),
		'job_namespace' => $job->getTitle()->getNamespace(),
		'job_title' => $job->getTitle()->getDBkey(),
		'job_params' => self::makeBlob( $job->getParams() ),
	];

	// Additional job metadata
	$row['job_id'] = $dbw->nextSequenceValue( 'job_job_id_seq' );
	$row['job_timestamp'] = $dbw->timestamp();
	// SHA-1 of the de-duplication info, re-encoded from base 16 to base 36
	$dedupHash = sha1( serialize( $job->getDeduplicationInfo() ) );
	$row['job_sha1'] = Wikimedia\base_convert( $dedupHash, 16, 36, 31 );
	$row['job_random'] = mt_rand( 0, self::MAX_JOB_RANDOM );

	return $row;
}
733 
/**
 * Get a DB_SLAVE connection for this queue.
 *
 * @return mixed Whatever getDB() returns for DB_SLAVE
 * @throws JobQueueConnectionError When the connection attempt raises DBConnectionError
 */
protected function getSlaveDB() {
	try {
		$conn = $this->getDB( DB_SLAVE );
	} catch ( DBConnectionError $e ) {
		throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
	}

	return $conn;
}
745 
/**
 * Get a DB_MASTER connection for this queue.
 *
 * @return mixed Whatever getDB() returns for DB_MASTER
 * @throws JobQueueConnectionError When the connection attempt raises DBConnectionError
 */
protected function getMasterDB() {
	try {
		$conn = $this->getDB( DB_MASTER );
	} catch ( DBConnectionError $e ) {
		throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
	}

	return $conn;
}
757 
/**
 * Get a connection reference from the load balancer that serves this queue.
 *
 * An external-cluster load balancer is used when a cluster is configured;
 * otherwise the wiki's own load balancer is used.
 *
 * @param int $index Connection index, e.g. DB_SLAVE or DB_MASTER
 * @return mixed The result of the load balancer's getConnectionRef()
 */
protected function getDB( $index ) {
	if ( $this->cluster !== false ) {
		$lb = wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki );
	} else {
		$lb = wfGetLB( $this->wiki );
	}

	return $lb->getConnectionRef( $index, [], $this->wiki );
}
769 
/**
 * Build the cache key under which a property of this queue is stored.
 *
 * The key is made from the wiki's DB name and prefix, the cluster name
 * ('main' when the cluster is not a string), the job type and $property.
 *
 * @param string $property
 * @return string
 */
private function getCacheKey( $property ) {
	$wikiParts = wfSplitWikiID( $this->wiki );
	$db = $wikiParts[0];
	$prefix = $wikiParts[1];

	$cluster = 'main';
	if ( is_string( $this->cluster ) ) {
		$cluster = $this->cluster;
	}

	return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
}
780 
/**
 * Serialize job parameters for storage in the job_params column.
 *
 * @param array|bool $params Parameters, or false for "no parameters"
 * @return string Serialized parameters, or '' when $params is exactly false
 */
protected static function makeBlob( $params ) {
	return ( $params !== false ) ? serialize( $params ) : '';
}
792 
/**
 * Unserialize a job_params value read from the DB.
 *
 * @param string $blob
 * @return mixed The unserialized value, or false when $blob casts to ''
 */
protected static function extractBlob( $blob ) {
	if ( (string)$blob === '' ) {
		return false;
	}

	return unserialize( $blob );
}
804 
/**
 * Re-raise a DB error as a JobQueueError.
 *
 * The new exception's message is the original class name and message.
 *
 * @param DBError $e
 * @throws JobQueueError Always
 */
protected function throwDBException( DBError $e ) {
	$message = get_class( $e ) . ": " . $e->getMessage();

	throw new JobQueueError( $message );
}
812 
/**
 * Return the list of job fields that should be selected.
 *
 * @return array Column names of the job table
 */
public static function selectFields() {
	$columns = [
		// What the job is
		'job_id', 'job_cmd', 'job_namespace', 'job_title',
		'job_timestamp', 'job_params',
		// Claim bookkeeping
		'job_random', 'job_attempts', 'job_token', 'job_token_timestamp',
		'job_sha1',
	];

	return $columns;
}
833 }
set($key, $value, $ttl=0, array $opts=[])
Set the value of a key in cache.
doBatchPushInternal(IDatabase $dbw, array $jobs, $flags, $method)
This function should not be called outside of JobQueueDB.
Definition: JobQueueDB.php:202
static extractBlob($blob)
Definition: JobQueueDB.php:797
static incrStats($key, $type, $delta=1)
Call wfIncrStats() for the queue overall and for the queue type.
Definition: JobQueue.php:709
design txt This is a brief overview of the new design More thorough and up to date information is available on the documentation wiki at etc Handles the details of getting and saving to the user table of the and dealing with sessions and cookies OutputPage Encapsulates the entire HTML page that will be sent in response to any server request It is used by calling its functions to add in any order
Definition: design.txt:12
static getMainWANInstance()
Get the main WAN cache object.
insert($table, $a, $fname=__METHOD__, $options=[])
INSERT wrapper, inserts an array into a table.
deferred txt A few of the database updates required by various functions here can be deferred until after the result page is displayed to the user For updating the view updating the linked to tables after a etc PHP does not yet have any way to tell the server to actually return and disconnect while still running these but it might have such a feature in the future We handle these by creating a deferred update object and putting those objects on a global list
Definition: deferred.txt:11
Database error base class.
the array() calling protocol came about after MediaWiki 1.4rc1.
wfWaitForSlaves($ifWritesSince=null, $wiki=false, $cluster=false, $timeout=null)
Waits for the slaves to catch up to the master position.
$property
if(count($args)==0) $dir
wfForeignMemcKey($db, $prefix)
Make a cache key for a foreign DB.
getCoalesceLocationInternal()
Definition: JobQueueDB.php:588
doAck(Job $job)
Definition: JobQueueDB.php:456
processing should stop and the error should be shown to the user * false
Definition: hooks.txt:189
__construct(array $params)
Additional parameters include:
Definition: JobQueueDB.php:50
Apache License January AND DISTRIBUTION Definitions License shall mean the terms and conditions for use
div flags Integer display flags(NO_ACTION_LINK, NO_EXTRA_USER_LINKS) 'LogException'returning false will NOT prevent logging $e
Definition: hooks.txt:1980
doGetSiblingQueuesWithJobs(array $types)
Definition: JobQueueDB.php:594
Class to both describe a background job and handle jobs.
Definition: Job.php:31
const DBO_TRX
Definition: Defines.php:33
it s the revision text itself In either if gzip is the revision text is gzipped $flags
Definition: hooks.txt:2588
get($key, &$curTTL=null, array $checkKeys=[], &$asOf=null)
Fetch the value of a key from cache.
const MAX_AGE_PRUNE
Definition: JobQueueDB.php:32
Class to handle job queues stored in the DB.
Definition: JobQueueDB.php:30
BagOStuff $dupCache
Definition: JobQueue.php:46
wfRandomString($length=32)
Get a random string containing a number of pseudo-random hex characters.
wfDebug($text, $dest= 'all', array $context=[])
Sends a line to the debug log if enabled or, optionally, to a comment in output.
Prior to maintenance scripts were a hodgepodge of code that had no cohesion or formal method of action Beginning maintenance scripts have been cleaned up to use a unified class Directory structure How to run a script How to write your own DIRECTORY STRUCTURE The maintenance directory of a MediaWiki installation contains several all of which have unique purposes HOW TO RUN A SCRIPT Ridiculously just call php someScript php that s in the top level maintenance directory if not default wiki
Definition: maintenance.txt:1
recycleAndDeleteStaleJobs()
Recycle or destroy any jobs that have been claimed for too long.
Definition: JobQueueDB.php:629
you have access to all of the normal MediaWiki so you can get a DB use the cache
Definition: maintenance.txt:52
static makeBlob($params)
Definition: JobQueueDB.php:785
throwDBException(DBError $e)
Definition: JobQueueDB.php:809
Class for asserting that a callback happens when an dummy object leaves scope.
string $type
Job type.
Definition: JobQueue.php:35
wfGetLB($wiki=false)
Get a load balancer object.
getAllAcquiredJobs()
Definition: JobQueueDB.php:558
unserialize($serialized)
Definition: ApiMessage.php:102
const MAX_OFFSET
Definition: JobQueueDB.php:34
getRootJobCacheKey($signature)
Definition: JobQueue.php:528
if($limit) $timestamp
$res
Definition: database.txt:21
MediaWiki exception.
Definition: MWException.php:26
getDeduplicationInfo()
Subclasses may need to override this to make duplication detection work.
getCacheKey($property)
Definition: JobQueueDB.php:774
$params
const CACHE_TTL_SHORT
Definition: JobQueueDB.php:31
doGetAcquiredCount()
Definition: JobQueueDB.php:112
const DB_SLAVE
Definition: Defines.php:46
Allows to change the fields on the form that will be generated are created Can be used to omit specific feeds from being outputted You must not use this hook to add use OutputPage::addFeedLink() instead.&$feedLinks conditions will AND in the final query as a Content object as a Content object $title
Definition: hooks.txt:312
endAtomic($fname=__METHOD__)
Ends an atomic section of SQL statements.
const MAX_JOB_RANDOM
Definition: JobQueueDB.php:33
getJobIterator(array $conds)
Definition: JobQueueDB.php:566
doBatchPush(array $jobs, $flags)
Definition: JobQueueDB.php:181
Convenience class for generating iterators from iterators.
supportedOrders()
Definition: JobQueueDB.php:57
This document is intended to provide useful advice for parties seeking to redistribute MediaWiki to end users It s targeted particularly at maintainers for Linux since it s been observed that distribution packages of MediaWiki often break We ve consistently had to recommend that users seeking support use official tarballs instead of their distribution s and this often solves whatever problem the user is having It would be nice if this could such as
Definition: distributors.txt:9
getDB($index)
Definition: JobQueueDB.php:762
claimRandom($uuid, $rand, $gte)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:319
startAtomic($fname=__METHOD__)
Begin an atomic section of statements.
doDeduplicateRootJob(IJobSpecification $job)
Definition: JobQueueDB.php:486
injection txt This is an overview of how MediaWiki makes use of dependency injection The design described here grew from the discussion of RFC T384 The term dependency this means that anything an object needs to operate should be injected from the the object itself should only know narrow no concrete implementation of the logic it relies on The requirement to inject everything typically results in an architecture that based on two main types of and essentially stateless service objects that use other service objects to operate on the value objects As of the beginning MediaWiki is only starting to use the DI approach Much of the code still relies on global state or direct resulting in a highly cyclical dependency which acts as the top level factory for services in MediaWiki which can be used to gain access to default instances of various services MediaWikiServices however also allows new services to be defined and default services to be redefined Services are defined or redefined by providing a callback the instantiator that will return a new instance of the service When it will create an instance of MediaWikiServices and populate it with the services defined in the files listed by thereby bootstrapping the DI framework Per $wgServiceWiringFiles lists includes ServiceWiring php
Definition: injection.txt:35
doGetAbandonedCount()
Definition: JobQueueDB.php:143
doWaitForBackups()
Definition: JobQueueDB.php:533
wfGetLBFactory()
Get the load balancer factory object.
This document describes the state of Postgres support in and is fairly well maintained The main code is very well while extensions are very hit and miss it is probably the most supported database after MySQL Much of the work in making MediaWiki database agnostic came about through the work of creating Postgres as and are nearing end of but without copying over all the usage comments General notes on the but these can almost always be programmed around *Although Postgres has a true BOOLEAN type
Definition: postgres.txt:22
static factory($command, Title $title, $params=[])
Create the appropriate object to handle a specific job.
Definition: Job.php:68
Class to handle enqueueing and running of background jobs.
Definition: JobQueue.php:31
if(count($args)< 1) $job
const ROOTJOB_TTL
Definition: JobQueue.php:52
doGetSiblingQueueSizes(array $types)
Definition: JobQueueDB.php:611
static selectFields()
Return the list of job fields that should be selected.
Definition: JobQueueDB.php:818
$count
Job queue task description interface.
claimOldest($uuid)
Reserve a row with a single UPDATE without holding row locks over RTTs...
Definition: JobQueueDB.php:396
const DB_MASTER
Definition: Defines.php:47
wfSplitWikiID($wiki)
Split a wiki ID into DB name and table prefix.
rollback($fname=__METHOD__, $flush= '')
Rollback a transaction previously started using begin().
serialize()
Definition: ApiMessage.php:94
select($table, $vars, $conds= '', $fname=__METHOD__, $options=[], $join_conds=[])
Execute a SELECT query constructed using the various parameters provided.
WANObjectCache $cache
Definition: JobQueueDB.php:37
getAllQueuedJobs()
Definition: JobQueueDB.php:550
static makeTitle($ns, $title, $fragment= '', $interwiki= '')
Create a new Title from a namespace index and a DB key.
Definition: Title.php:503
Basic database interface for live and lazy-loaded DB handles.
Definition: IDatabase.php:35
insertFields(IJobSpecification $job)
Definition: JobQueueDB.php:714
bool string $cluster
Name of an external DB cluster.
Definition: JobQueueDB.php:40