[ Index ]

PHP Cross Reference of MediaWiki-1.24.0

title

Body

[close]

/includes/jobqueue/ -> JobQueueDB.php (source)

   1  <?php
   2  /**
   3   * Database-backed job queue code.
   4   *
   5   * This program is free software; you can redistribute it and/or modify
   6   * it under the terms of the GNU General Public License as published by
   7   * the Free Software Foundation; either version 2 of the License, or
   8   * (at your option) any later version.
   9   *
  10   * This program is distributed in the hope that it will be useful,
  11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13   * GNU General Public License for more details.
  14   *
  15   * You should have received a copy of the GNU General Public License along
  16   * with this program; if not, write to the Free Software Foundation, Inc.,
  17   * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18   * http://www.gnu.org/copyleft/gpl.html
  19   *
  20   * @file
  21   * @author Aaron Schulz
  22   */
  23  
  24  /**
  25   * Class to handle job queues stored in the DB
  26   *
  27   * @ingroup JobQueue
  28   * @since 1.21
  29   */
  30  class JobQueueDB extends JobQueue {
  31      const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
  32      const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
  33      const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
  34      const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
  35      const MAX_OFFSET = 255; // integer; maximum number of rows to skip
  36  
  37      /** @var BagOStuff */
  38      protected $cache;
  39  
  40      /** @var bool|string Name of an external DB cluster. False if not set */
  41      protected $cluster = false;
  42  
  43      /**
  44       * Additional parameters include:
  45       *   - cluster : The name of an external cluster registered via LBFactory.
  46       *               If not specified, the primary DB cluster for the wiki will be used.
  47       *               This can be overridden with a custom cluster so that DB handles will
  48       *               be retrieved via LBFactory::getExternalLB() and getConnection().
  49       * @param array $params
  50       */
  51  	protected function __construct( array $params ) {
  52          global $wgMemc;
  53  
  54          parent::__construct( $params );
  55  
  56          $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
  57          // Make sure that we don't use the SQL cache, which would be harmful
  58          $this->cache = ( $wgMemc instanceof SqlBagOStuff ) ? new EmptyBagOStuff() : $wgMemc;
  59      }
  60  
  61  	protected function supportedOrders() {
  62          return array( 'random', 'timestamp', 'fifo' );
  63      }
  64  
  65  	protected function optimalOrder() {
  66          return 'random';
  67      }
  68  
  69      /**
  70       * @see JobQueue::doIsEmpty()
  71       * @return bool
  72       */
  73  	protected function doIsEmpty() {
  74          $key = $this->getCacheKey( 'empty' );
  75  
  76          $isEmpty = $this->cache->get( $key );
  77          if ( $isEmpty === 'true' ) {
  78              return true;
  79          } elseif ( $isEmpty === 'false' ) {
  80              return false;
  81          }
  82  
  83          $dbr = $this->getSlaveDB();
  84          try {
  85              $found = $dbr->selectField( // unclaimed job
  86                  'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
  87              );
  88          } catch ( DBError $e ) {
  89              $this->throwDBException( $e );
  90          }
  91          $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
  92  
  93          return !$found;
  94      }
  95  
  96      /**
  97       * @see JobQueue::doGetSize()
  98       * @return int
  99       */
 100  	protected function doGetSize() {
 101          $key = $this->getCacheKey( 'size' );
 102  
 103          $size = $this->cache->get( $key );
 104          if ( is_int( $size ) ) {
 105              return $size;
 106          }
 107  
 108          try {
 109              $dbr = $this->getSlaveDB();
 110              $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
 111                  array( 'job_cmd' => $this->type, 'job_token' => '' ),
 112                  __METHOD__
 113              );
 114          } catch ( DBError $e ) {
 115              $this->throwDBException( $e );
 116          }
 117          $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
 118  
 119          return $size;
 120      }
 121  
 122      /**
 123       * @see JobQueue::doGetAcquiredCount()
 124       * @return int
 125       */
 126  	protected function doGetAcquiredCount() {
 127          if ( $this->claimTTL <= 0 ) {
 128              return 0; // no acknowledgements
 129          }
 130  
 131          $key = $this->getCacheKey( 'acquiredcount' );
 132  
 133          $count = $this->cache->get( $key );
 134          if ( is_int( $count ) ) {
 135              return $count;
 136          }
 137  
 138          $dbr = $this->getSlaveDB();
 139          try {
 140              $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
 141                  array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
 142                  __METHOD__
 143              );
 144          } catch ( DBError $e ) {
 145              $this->throwDBException( $e );
 146          }
 147          $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
 148  
 149          return $count;
 150      }
 151  
 152      /**
 153       * @see JobQueue::doGetAbandonedCount()
 154       * @return int
 155       * @throws MWException
 156       */
 157  	protected function doGetAbandonedCount() {
 158          global $wgMemc;
 159  
 160          if ( $this->claimTTL <= 0 ) {
 161              return 0; // no acknowledgements
 162          }
 163  
 164          $key = $this->getCacheKey( 'abandonedcount' );
 165  
 166          $count = $wgMemc->get( $key );
 167          if ( is_int( $count ) ) {
 168              return $count;
 169          }
 170  
 171          $dbr = $this->getSlaveDB();
 172          try {
 173              $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
 174                  array(
 175                      'job_cmd' => $this->type,
 176                      "job_token != {$dbr->addQuotes( '' )}",
 177                      "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
 178                  ),
 179                  __METHOD__
 180              );
 181          } catch ( DBError $e ) {
 182              $this->throwDBException( $e );
 183          }
 184          $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
 185  
 186          return $count;
 187      }
 188  
 189      /**
 190       * @see JobQueue::doBatchPush()
 191       * @param array $jobs
 192       * @param int $flags
 193       * @throws DBError|Exception
 194       * @return void
 195       */
 196  	protected function doBatchPush( array $jobs, $flags ) {
 197          $dbw = $this->getMasterDB();
 198  
 199          $that = $this;
 200          $method = __METHOD__;
 201          $dbw->onTransactionIdle(
 202              function () use ( $dbw, $that, $jobs, $flags, $method ) {
 203                  $that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
 204              }
 205          );
 206      }
 207  
 208      /**
 209       * This function should *not* be called outside of JobQueueDB
 210       *
 211       * @param IDatabase $dbw
 212       * @param array $jobs
 213       * @param int $flags
 214       * @param string $method
 215       * @throws DBError
 216       * @return void
 217       */
 218  	public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
 219          if ( !count( $jobs ) ) {
 220              return;
 221          }
 222  
 223          $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
 224          $rowList = array(); // list of jobs for jobs that are are not de-duplicated
 225          foreach ( $jobs as $job ) {
 226              $row = $this->insertFields( $job );
 227              if ( $job->ignoreDuplicates() ) {
 228                  $rowSet[$row['job_sha1']] = $row;
 229              } else {
 230                  $rowList[] = $row;
 231              }
 232          }
 233  
 234          if ( $flags & self::QOS_ATOMIC ) {
 235              $dbw->begin( $method ); // wrap all the job additions in one transaction
 236          }
 237          try {
 238              // Strip out any duplicate jobs that are already in the queue...
 239              if ( count( $rowSet ) ) {
 240                  $res = $dbw->select( 'job', 'job_sha1',
 241                      array(
 242                          // No job_type condition since it's part of the job_sha1 hash
 243                          'job_sha1' => array_keys( $rowSet ),
 244                          'job_token' => '' // unclaimed
 245                      ),
 246                      $method
 247                  );
 248                  foreach ( $res as $row ) {
 249                      wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
 250                      unset( $rowSet[$row->job_sha1] ); // already enqueued
 251                  }
 252              }
 253              // Build the full list of job rows to insert
 254              $rows = array_merge( $rowList, array_values( $rowSet ) );
 255              // Insert the job rows in chunks to avoid slave lag...
 256              foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
 257                  $dbw->insert( 'job', $rowBatch, $method );
 258              }
 259              JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki );
 260              JobQueue::incrStats(
 261                  'job-insert-duplicate',
 262                  $this->type,
 263                  count( $rowSet ) + count( $rowList ) - count( $rows ),
 264                  $this->wiki
 265              );
 266          } catch ( DBError $e ) {
 267              if ( $flags & self::QOS_ATOMIC ) {
 268                  $dbw->rollback( $method );
 269              }
 270              throw $e;
 271          }
 272          if ( $flags & self::QOS_ATOMIC ) {
 273              $dbw->commit( $method );
 274          }
 275  
 276          $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );
 277  
 278          return;
 279      }
 280  
 281      /**
 282       * @see JobQueue::doPop()
 283       * @return Job|bool
 284       */
 285  	protected function doPop() {
 286          if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
 287              return false; // queue is empty
 288          }
 289  
 290          $dbw = $this->getMasterDB();
 291          try {
 292              $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
 293              $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
 294              $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
 295              $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
 296                  $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
 297              } );
 298  
 299              $uuid = wfRandomString( 32 ); // pop attempt
 300              $job = false; // job popped off
 301              do { // retry when our row is invalid or deleted as a duplicate
 302                  // Try to reserve a row in the DB...
 303                  if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
 304                      $row = $this->claimOldest( $uuid );
 305                  } else { // random first
 306                      $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
 307                      $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
 308                      $row = $this->claimRandom( $uuid, $rand, $gte );
 309                  }
 310                  // Check if we found a row to reserve...
 311                  if ( !$row ) {
 312                      $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
 313                      break; // nothing to do
 314                  }
 315                  JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
 316                  // Get the job object from the row...
 317                  $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
 318                  if ( !$title ) {
 319                      $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
 320                      wfDebug( "Row has invalid title '{$row->job_title}'.\n" );
 321                      continue; // try again
 322                  }
 323                  $job = Job::factory( $row->job_cmd, $title,
 324                      self::extractBlob( $row->job_params ), $row->job_id );
 325                  $job->metadata['id'] = $row->job_id;
 326                  break; // done
 327              } while ( true );
 328          } catch ( DBError $e ) {
 329              $this->throwDBException( $e );
 330          }
 331  
 332          return $job;
 333      }
 334  
 335      /**
 336       * Reserve a row with a single UPDATE without holding row locks over RTTs...
 337       *
 338       * @param string $uuid 32 char hex string
 339       * @param int $rand Random unsigned integer (31 bits)
 340       * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
 341       * @return stdClass|bool Row|false
 342       */
 343  	protected function claimRandom( $uuid, $rand, $gte ) {
 344          $dbw = $this->getMasterDB();
 345          // Check cache to see if the queue has <= OFFSET items
 346          $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
 347  
 348          $row = false; // the row acquired
 349          $invertedDirection = false; // whether one job_random direction was already scanned
 350          // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
 351          // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
 352          // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
 353          // be used here with MySQL.
 354          do {
 355              if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
 356                  // For small queues, using OFFSET will overshoot and return no rows more often.
 357                  // Instead, this uses job_random to pick a row (possibly checking both directions).
 358                  $ineq = $gte ? '>=' : '<=';
 359                  $dir = $gte ? 'ASC' : 'DESC';
 360                  $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
 361                      array(
 362                          'job_cmd' => $this->type,
 363                          'job_token' => '', // unclaimed
 364                          "job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
 365                      __METHOD__,
 366                      array( 'ORDER BY' => "job_random {$dir}" )
 367                  );
 368                  if ( !$row && !$invertedDirection ) {
 369                      $gte = !$gte;
 370                      $invertedDirection = true;
 371                      continue; // try the other direction
 372                  }
 373              } else { // table *may* have >= MAX_OFFSET rows
 374                  // Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU
 375                  // in MySQL if there are many rows for some reason. This uses a small OFFSET
 376                  // instead of job_random for reducing excess claim retries.
 377                  $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
 378                      array(
 379                          'job_cmd' => $this->type,
 380                          'job_token' => '', // unclaimed
 381                      ),
 382                      __METHOD__,
 383                      array( 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) )
 384                  );
 385                  if ( !$row ) {
 386                      $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
 387                      $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
 388                      continue; // use job_random
 389                  }
 390              }
 391  
 392              if ( $row ) { // claim the job
 393                  $dbw->update( 'job', // update by PK
 394                      array(
 395                          'job_token' => $uuid,
 396                          'job_token_timestamp' => $dbw->timestamp(),
 397                          'job_attempts = job_attempts+1' ),
 398                      array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
 399                      __METHOD__
 400                  );
 401                  // This might get raced out by another runner when claiming the previously
 402                  // selected row. The use of job_random should minimize this problem, however.
 403                  if ( !$dbw->affectedRows() ) {
 404                      $row = false; // raced out
 405                  }
 406              } else {
 407                  break; // nothing to do
 408              }
 409          } while ( !$row );
 410  
 411          return $row;
 412      }
 413  
 414      /**
 415       * Reserve a row with a single UPDATE without holding row locks over RTTs...
 416       *
 417       * @param string $uuid 32 char hex string
 418       * @return stdClass|bool Row|false
 419       */
 420  	protected function claimOldest( $uuid ) {
 421          $dbw = $this->getMasterDB();
 422  
 423          $row = false; // the row acquired
 424          do {
 425              if ( $dbw->getType() === 'mysql' ) {
 426                  // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
 427                  // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
 428                  // Oracle and Postgre have no such limitation. However, MySQL offers an
 429                  // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
 430                  $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
 431                      "SET " .
 432                          "job_token = {$dbw->addQuotes( $uuid ) }, " .
 433                          "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
 434                          "job_attempts = job_attempts+1 " .
 435                      "WHERE ( " .
 436                          "job_cmd = {$dbw->addQuotes( $this->type )} " .
 437                          "AND job_token = {$dbw->addQuotes( '' )} " .
 438                      ") ORDER BY job_id ASC LIMIT 1",
 439                      __METHOD__
 440                  );
 441              } else {
 442                  // Use a subquery to find the job, within an UPDATE to claim it.
 443                  // This uses as much of the DB wrapper functions as possible.
 444                  $dbw->update( 'job',
 445                      array(
 446                          'job_token' => $uuid,
 447                          'job_token_timestamp' => $dbw->timestamp(),
 448                          'job_attempts = job_attempts+1' ),
 449                      array( 'job_id = (' .
 450                          $dbw->selectSQLText( 'job', 'job_id',
 451                              array( 'job_cmd' => $this->type, 'job_token' => '' ),
 452                              __METHOD__,
 453                              array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) .
 454                          ')'
 455                      ),
 456                      __METHOD__
 457                  );
 458              }
 459              // Fetch any row that we just reserved...
 460              if ( $dbw->affectedRows() ) {
 461                  $row = $dbw->selectRow( 'job', self::selectFields(),
 462                      array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__
 463                  );
 464                  if ( !$row ) { // raced out by duplicate job removal
 465                      wfDebug( "Row deleted as duplicate by another process.\n" );
 466                  }
 467              } else {
 468                  break; // nothing to do
 469              }
 470          } while ( !$row );
 471  
 472          return $row;
 473      }
 474  
 475      /**
 476       * @see JobQueue::doAck()
 477       * @param Job $job
 478       * @throws MWException
 479       * @return Job|bool
 480       */
 481  	protected function doAck( Job $job ) {
 482          if ( !isset( $job->metadata['id'] ) ) {
 483              throw new MWException( "Job of type '{$job->getType()}' has no ID." );
 484          }
 485  
 486          $dbw = $this->getMasterDB();
 487          try {
 488              $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
 489              $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
 490              $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
 491              $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
 492                  $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
 493              } );
 494  
 495              // Delete a row with a single DELETE without holding row locks over RTTs...
 496              $dbw->delete( 'job',
 497                  array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
 498          } catch ( DBError $e ) {
 499              $this->throwDBException( $e );
 500          }
 501  
 502          return true;
 503      }
 504  
 505      /**
 506       * @see JobQueue::doDeduplicateRootJob()
 507       * @param Job $job
 508       * @throws MWException
 509       * @return bool
 510       */
 511  	protected function doDeduplicateRootJob( Job $job ) {
 512          $params = $job->getParams();
 513          if ( !isset( $params['rootJobSignature'] ) ) {
 514              throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
 515          } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
 516              throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
 517          }
 518          $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
 519          // Callers should call batchInsert() and then this function so that if the insert
 520          // fails, the de-duplication registration will be aborted. Since the insert is
 521          // deferred till "transaction idle", do the same here, so that the ordering is
 522          // maintained. Having only the de-duplication registration succeed would cause
 523          // jobs to become no-ops without any actual jobs that made them redundant.
 524          $dbw = $this->getMasterDB();
 525          $cache = $this->dupCache;
 526          $dbw->onTransactionIdle( function () use ( $cache, $params, $key, $dbw ) {
 527              $timestamp = $cache->get( $key ); // current last timestamp of this job
 528              if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
 529                  return true; // a newer version of this root job was enqueued
 530              }
 531  
 532              // Update the timestamp of the last root job started at the location...
 533              return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
 534          } );
 535  
 536          return true;
 537      }
 538  
 539      /**
 540       * @see JobQueue::doDelete()
 541       * @return bool
 542       */
 543  	protected function doDelete() {
 544          $dbw = $this->getMasterDB();
 545          try {
 546              $dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
 547          } catch ( DBError $e ) {
 548              $this->throwDBException( $e );
 549          }
 550  
 551          return true;
 552      }
 553  
 554      /**
 555       * @see JobQueue::doWaitForBackups()
 556       * @return void
 557       */
 558  	protected function doWaitForBackups() {
 559          wfWaitForSlaves();
 560      }
 561  
 562      /**
 563       * @return array
 564       */
 565  	protected function doGetPeriodicTasks() {
 566          return array(
 567              'recycleAndDeleteStaleJobs' => array(
 568                  'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
 569                  'period' => ceil( $this->claimTTL / 2 )
 570              )
 571          );
 572      }
 573  
 574      /**
 575       * @return void
 576       */
 577  	protected function doFlushCaches() {
 578          foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
 579              $this->cache->delete( $this->getCacheKey( $type ) );
 580          }
 581      }
 582  
 583      /**
 584       * @see JobQueue::getAllQueuedJobs()
 585       * @return Iterator
 586       */
 587  	public function getAllQueuedJobs() {
 588          $dbr = $this->getSlaveDB();
 589          try {
 590              return new MappedIterator(
 591                  $dbr->select( 'job', self::selectFields(),
 592                      array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
 593                  function ( $row ) use ( $dbr ) {
 594                      $job = Job::factory(
 595                          $row->job_cmd,
 596                          Title::makeTitle( $row->job_namespace, $row->job_title ),
 597                          strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
 598                      );
 599                      $job->metadata['id'] = $row->job_id;
 600                      return $job;
 601                  }
 602              );
 603          } catch ( DBError $e ) {
 604              $this->throwDBException( $e );
 605          }
 606      }
 607  
 608  	public function getCoalesceLocationInternal() {
 609          return $this->cluster
 610              ? "DBCluster:{$this->cluster}:{$this->wiki}"
 611              : "LBFactory:{$this->wiki}";
 612      }
 613  
 614  	protected function doGetSiblingQueuesWithJobs( array $types ) {
 615          $dbr = $this->getSlaveDB();
 616          $res = $dbr->select( 'job', 'DISTINCT job_cmd',
 617              array( 'job_cmd' => $types ), __METHOD__ );
 618  
 619          $types = array();
 620          foreach ( $res as $row ) {
 621              $types[] = $row->job_cmd;
 622          }
 623  
 624          return $types;
 625      }
 626  
 627  	protected function doGetSiblingQueueSizes( array $types ) {
 628          $dbr = $this->getSlaveDB();
 629          $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ),
 630              array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) );
 631  
 632          $sizes = array();
 633          foreach ( $res as $row ) {
 634              $sizes[$row->job_cmd] = (int)$row->count;
 635          }
 636  
 637          return $sizes;
 638      }
 639  
 640      /**
 641       * Recycle or destroy any jobs that have been claimed for too long
 642       *
 643       * @return int Number of jobs recycled/deleted
 644       */
 645  	public function recycleAndDeleteStaleJobs() {
 646          $now = time();
 647          $count = 0; // affected rows
 648          $dbw = $this->getMasterDB();
 649  
 650          try {
 651              if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
 652                  return $count; // already in progress
 653              }
 654  
 655              // Remove claims on jobs acquired for too long if enabled...
 656              if ( $this->claimTTL > 0 ) {
 657                  $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
 658                  // Get the IDs of jobs that have be claimed but not finished after too long.
 659                  // These jobs can be recycled into the queue by expiring the claim. Selecting
 660                  // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
 661                  $res = $dbw->select( 'job', 'job_id',
 662                      array(
 663                          'job_cmd' => $this->type,
 664                          "job_token != {$dbw->addQuotes( '' )}", // was acquired
 665                          "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
 666                          "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
 667                      __METHOD__
 668                  );
 669                  $ids = array_map(
 670                      function ( $o ) {
 671                          return $o->job_id;
 672                      }, iterator_to_array( $res )
 673                  );
 674                  if ( count( $ids ) ) {
 675                      // Reset job_token for these jobs so that other runners will pick them up.
 676                      // Set the timestamp to the current time, as it is useful to now that the job
 677                      // was already tried before (the timestamp becomes the "released" time).
 678                      $dbw->update( 'job',
 679                          array(
 680                              'job_token' => '',
 681                              'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
 682                          array(
 683                              'job_id' => $ids ),
 684                          __METHOD__
 685                      );
 686                      $affected = $dbw->affectedRows();
 687                      $count += $affected;
 688                      JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
 689                      $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
 690                  }
 691              }
 692  
 693              // Just destroy any stale jobs...
 694              $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
 695              $conds = array(
 696                  'job_cmd' => $this->type,
 697                  "job_token != {$dbw->addQuotes( '' )}", // was acquired
 698                  "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
 699              );
 700              if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
 701                  $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
 702              }
 703              // Get the IDs of jobs that are considered stale and should be removed. Selecting
 704              // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
 705              $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
 706              $ids = array_map(
 707                  function ( $o ) {
 708                      return $o->job_id;
 709                  }, iterator_to_array( $res )
 710              );
 711              if ( count( $ids ) ) {
 712                  $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
 713                  $affected = $dbw->affectedRows();
 714                  $count += $affected;
 715                  JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki );
 716              }
 717  
 718              $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
 719          } catch ( DBError $e ) {
 720              $this->throwDBException( $e );
 721          }
 722  
 723          return $count;
 724      }
 725  
 726      /**
 727       * @param IJobSpecification $job
 728       * @return array
 729       */
 730  	protected function insertFields( IJobSpecification $job ) {
 731          $dbw = $this->getMasterDB();
 732  
 733          return array(
 734              // Fields that describe the nature of the job
 735              'job_cmd' => $job->getType(),
 736              'job_namespace' => $job->getTitle()->getNamespace(),
 737              'job_title' => $job->getTitle()->getDBkey(),
 738              'job_params' => self::makeBlob( $job->getParams() ),
 739              // Additional job metadata
 740              'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
 741              'job_timestamp' => $dbw->timestamp(),
 742              'job_sha1' => wfBaseConvert(
 743                  sha1( serialize( $job->getDeduplicationInfo() ) ),
 744                  16, 36, 31
 745              ),
 746              'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
 747          );
 748      }
 749  
 750      /**
 751       * @throws JobQueueConnectionError
 752       * @return DBConnRef
 753       */
 754  	protected function getSlaveDB() {
 755          try {
 756              return $this->getDB( DB_SLAVE );
 757          } catch ( DBConnectionError $e ) {
 758              throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
 759          }
 760      }
 761  
 762      /**
 763       * @throws JobQueueConnectionError
 764       * @return DBConnRef
 765       */
 766  	protected function getMasterDB() {
 767          try {
 768              return $this->getDB( DB_MASTER );
 769          } catch ( DBConnectionError $e ) {
 770              throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
 771          }
 772      }
 773  
 774      /**
 775       * @param int $index (DB_SLAVE/DB_MASTER)
 776       * @return DBConnRef
 777       */
 778  	protected function getDB( $index ) {
 779          $lb = ( $this->cluster !== false )
 780              ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki )
 781              : wfGetLB( $this->wiki );
 782  
 783          return $lb->getConnectionRef( $index, array(), $this->wiki );
 784      }
 785  
 786      /**
 787       * @param string $property
 788       * @return string
 789       */
 790  	private function getCacheKey( $property ) {
 791          list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
 792          $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
 793  
 794          return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
 795      }
 796  
 797      /**
 798       * @param array|bool $params
 799       * @return string
 800       */
 801  	protected static function makeBlob( $params ) {
 802          if ( $params !== false ) {
 803              return serialize( $params );
 804          } else {
 805              return '';
 806          }
 807      }
 808  
 809      /**
 810       * @param string $blob
 811       * @return bool|mixed
 812       */
 813  	protected static function extractBlob( $blob ) {
 814          if ( (string)$blob !== '' ) {
 815              return unserialize( $blob );
 816          } else {
 817              return false;
 818          }
 819      }
 820  
 821      /**
 822       * @param DBError $e
 823       * @throws JobQueueError
 824       */
 825  	protected function throwDBException( DBError $e ) {
 826          throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
 827      }
 828  
 829      /**
 830       * Return the list of job fields that should be selected.
 831       * @since 1.23
 832       * @return array
 833       */
 834  	public static function selectFields() {
 835          return array(
 836              'job_id',
 837              'job_cmd',
 838              'job_namespace',
 839              'job_title',
 840              'job_timestamp',
 841              'job_params',
 842              'job_random',
 843              'job_attempts',
 844              'job_token',
 845              'job_token_timestamp',
 846              'job_sha1',
 847          );
 848      }
 849  }


Generated: Fri Nov 28 14:03:12 2014 Cross-referenced by PHPXref 0.7.1