[ Index ]

PHP Cross Reference of MediaWiki-1.24.0

title

Body

[close]

/includes/jobqueue/ -> JobQueueFederated.php (source)

   1  <?php
   2  /**
   3   * Job queue code for federated queues.
   4   *
   5   * This program is free software; you can redistribute it and/or modify
   6   * it under the terms of the GNU General Public License as published by
   7   * the Free Software Foundation; either version 2 of the License, or
   8   * (at your option) any later version.
   9   *
  10   * This program is distributed in the hope that it will be useful,
  11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13   * GNU General Public License for more details.
  14   *
  15   * You should have received a copy of the GNU General Public License along
  16   * with this program; if not, write to the Free Software Foundation, Inc.,
  17   * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18   * http://www.gnu.org/copyleft/gpl.html
  19   *
  20   * @file
  21   * @author Aaron Schulz
  22   */
  23  
  24  /**
  25   * Class to handle enqueueing and running of background jobs for federated queues
  26   *
  27   * This class allows for queues to be partitioned into smaller queues.
  28   * A partition is defined by the configuration for a JobQueue instance.
  29   * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a
  30   * JobQueueFederated instance, which itself would consist of three JobQueueRedis
  31   * instances, each using their own redis server. This would allow for the jobs
  32   * to be split (evenly or based on weights) across multiple servers if a single
  33   * server becomes impractical or expensive. Different JobQueue classes can be mixed.
  34   *
  35   * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue
  36   * is inherited by the partition queues. Additional configuration defines what
  37   * section each wiki is in, what partition queues each section uses (and their weight),
  38   * and the JobQueue configuration for each partition. Some sections might only need a
  39   * single queue partition, like the sections for groups of small wikis.
  40   *
  41   * If used for performance, then $wgMainCacheType should be set to memcached/redis.
  42   * Note that "fifo" cannot be used for the ordering, since the data is distributed.
  43   * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also,
  44   * queue classes used by this should ignore down servers (with TTL) to avoid slowness.
  45   *
  46   * @ingroup JobQueue
  47   * @since 1.22
  48   */
  49  class JobQueueFederated extends JobQueue {
  50      /** @var HashRing */
  51      protected $partitionRing;
  52      /** @var HashRing */
  53      protected $partitionPushRing;
  54      /** @var array (partition name => JobQueue) reverse sorted by weight */
  55      protected $partitionQueues = array();
  56  
  57      /** @var BagOStuff */
  58      protected $cache;
  59  
  60      /** @var int Maximum number of partitions to try */
  61      protected $maxPartitionsTry;
  62  
  63      const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
  64      const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
  65  
  66      /**
  67       * @param array $params Possible keys:
  68       *  - sectionsByWiki      : A map of wiki IDs to section names.
  69       *                          Wikis will default to using the section "default".
  70       *  - partitionsBySection : Map of section names to maps of (partition name => weight).
  71       *                          A section called 'default' must be defined if not all wikis
  72       *                          have explicitly defined sections.
  73       *  - configByPartition   : Map of queue partition names to configuration arrays.
  74       *                          These configuration arrays are passed to JobQueue::factory().
  75       *                          The options set here are overriden by those passed to this
  76       *                          the federated queue itself (e.g. 'order' and 'claimTTL').
  77       *  - partitionsNoPush    : List of partition names that can handle pop() but not push().
  78       *                          This can be used to migrate away from a certain partition.
  79       *  - maxPartitionsTry    : Maximum number of times to attempt job insertion using
  80       *                          different partition queues. This improves availability
  81       *                          during failure, at the cost of added latency and somewhat
  82       *                          less reliable job de-duplication mechanisms.
  83       * @throws MWException
  84       */
  85  	protected function __construct( array $params ) {
  86          parent::__construct( $params );
  87          $section = isset( $params['sectionsByWiki'][$this->wiki] )
  88              ? $params['sectionsByWiki'][$this->wiki]
  89              : 'default';
  90          if ( !isset( $params['partitionsBySection'][$section] ) ) {
  91              throw new MWException( "No configuration for section '$section'." );
  92          }
  93          $this->maxPartitionsTry = isset( $params['maxPartitionsTry'] )
  94              ? $params['maxPartitionsTry']
  95              : 2;
  96          // Get the full partition map
  97          $partitionMap = $params['partitionsBySection'][$section];
  98          arsort( $partitionMap, SORT_NUMERIC );
  99          // Get the partitions jobs can actually be pushed to
 100          $partitionPushMap = $partitionMap;
 101          if ( isset( $params['partitionsNoPush'] ) ) {
 102              foreach ( $params['partitionsNoPush'] as $partition ) {
 103                  unset( $partitionPushMap[$partition] );
 104              }
 105          }
 106          // Get the config to pass to merge into each partition queue config
 107          $baseConfig = $params;
 108          foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
 109              'partitionsBySection', 'configByPartition', 'partitionsNoPush' ) as $o
 110          ) {
 111              unset( $baseConfig[$o] ); // partition queue doesn't care about this
 112          }
 113          // Get the partition queue objects
 114          foreach ( $partitionMap as $partition => $w ) {
 115              if ( !isset( $params['configByPartition'][$partition] ) ) {
 116                  throw new MWException( "No configuration for partition '$partition'." );
 117              }
 118              $this->partitionQueues[$partition] = JobQueue::factory(
 119                  $baseConfig + $params['configByPartition'][$partition] );
 120          }
 121          // Ring of all partitions
 122          $this->partitionRing = new HashRing( $partitionMap );
 123          // Get the ring of partitions to push jobs into
 124          if ( count( $partitionPushMap ) === count( $partitionMap ) ) {
 125              $this->partitionPushRing = clone $this->partitionRing; // faster
 126          } else {
 127              $this->partitionPushRing = new HashRing( $partitionPushMap );
 128          }
 129          // Aggregate cache some per-queue values if there are multiple partition queues
 130          $this->cache = count( $partitionMap ) > 1 ? wfGetMainCache() : new EmptyBagOStuff();
 131      }
 132  
 133  	protected function supportedOrders() {
 134          // No FIFO due to partitioning, though "rough timestamp order" is supported
 135          return array( 'undefined', 'random', 'timestamp' );
 136      }
 137  
 138  	protected function optimalOrder() {
 139          return 'undefined'; // defer to the partitions
 140      }
 141  
 142  	protected function supportsDelayedJobs() {
 143          return true; // defer checks to the partitions
 144      }
 145  
 146  	protected function doIsEmpty() {
 147          $key = $this->getCacheKey( 'empty' );
 148  
 149          $isEmpty = $this->cache->get( $key );
 150          if ( $isEmpty === 'true' ) {
 151              return true;
 152          } elseif ( $isEmpty === 'false' ) {
 153              return false;
 154          }
 155  
 156          $empty = true;
 157          $failed = 0;
 158          foreach ( $this->partitionQueues as $queue ) {
 159              try {
 160                  $empty = $empty && $queue->doIsEmpty();
 161              } catch ( JobQueueError $e ) {
 162                  ++$failed;
 163                  MWExceptionHandler::logException( $e );
 164              }
 165          }
 166          $this->throwErrorIfAllPartitionsDown( $failed );
 167  
 168          $this->cache->add( $key, $empty ? 'true' : 'false', self::CACHE_TTL_LONG );
 169          return $empty;
 170      }
 171  
 172  	protected function doGetSize() {
 173          return $this->getCrossPartitionSum( 'size', 'doGetSize' );
 174      }
 175  
 176  	protected function doGetAcquiredCount() {
 177          return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
 178      }
 179  
 180  	protected function doGetDelayedCount() {
 181          return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
 182      }
 183  
 184  	protected function doGetAbandonedCount() {
 185          return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
 186      }
 187  
 188      /**
 189       * @param string $type
 190       * @param string $method
 191       * @return int
 192       */
 193  	protected function getCrossPartitionSum( $type, $method ) {
 194          $key = $this->getCacheKey( $type );
 195  
 196          $count = $this->cache->get( $key );
 197          if ( $count !== false ) {
 198              return $count;
 199          }
 200  
 201          $failed = 0;
 202          foreach ( $this->partitionQueues as $queue ) {
 203              try {
 204                  $count += $queue->$method();
 205              } catch ( JobQueueError $e ) {
 206                  ++$failed;
 207                  MWExceptionHandler::logException( $e );
 208              }
 209          }
 210          $this->throwErrorIfAllPartitionsDown( $failed );
 211  
 212          $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
 213  
 214          return $count;
 215      }
 216  
 217  	protected function doBatchPush( array $jobs, $flags ) {
 218          // Local ring variable that may be changed to point to a new ring on failure
 219          $partitionRing = $this->partitionPushRing;
 220          // Try to insert the jobs and update $partitionsTry on any failures.
 221          // Retry to insert any remaning jobs again, ignoring the bad partitions.
 222          $jobsLeft = $jobs;
 223          // @codingStandardsIgnoreStart Generic.CodeAnalysis.ForLoopWithTestFunctionCall.NotAllowed
 224          for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
 225              // @codingStandardsIgnoreEnd
 226              try {
 227                  $partitionRing->getLiveRing();
 228              } catch ( UnexpectedValueException $e ) {
 229                  break; // all servers down; nothing to insert to
 230              }
 231              $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
 232          }
 233          if ( count( $jobsLeft ) ) {
 234              throw new JobQueueError(
 235                  "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
 236          }
 237      }
 238  
 239      /**
 240       * @param array $jobs
 241       * @param HashRing $partitionRing
 242       * @param int $flags
 243       * @throws JobQueueError
 244       * @return array List of Job object that could not be inserted
 245       */
 246  	protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
 247          $jobsLeft = array();
 248  
 249          // Because jobs are spread across partitions, per-job de-duplication needs
 250          // to use a consistent hash to avoid allowing duplicate jobs per partition.
 251          // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
 252          $uJobsByPartition = array(); // (partition name => job list)
 253          /** @var Job $job */
 254          foreach ( $jobs as $key => $job ) {
 255              if ( $job->ignoreDuplicates() ) {
 256                  $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
 257                  $uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
 258                  unset( $jobs[$key] );
 259              }
 260          }
 261          // Get the batches of jobs that are not de-duplicated
 262          if ( $flags & self::QOS_ATOMIC ) {
 263              $nuJobBatches = array( $jobs ); // all or nothing
 264          } else {
 265              // Split the jobs into batches and spread them out over servers if there
 266              // are many jobs. This helps keep the partitions even. Otherwise, send all
 267              // the jobs to a single partition queue to avoids the extra connections.
 268              $nuJobBatches = array_chunk( $jobs, 300 );
 269          }
 270  
 271          // Insert the de-duplicated jobs into the queues...
 272          foreach ( $uJobsByPartition as $partition => $jobBatch ) {
 273              /** @var JobQueue $queue */
 274              $queue = $this->partitionQueues[$partition];
 275              try {
 276                  $ok = true;
 277                  $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
 278              } catch ( JobQueueError $e ) {
 279                  $ok = false;
 280                  MWExceptionHandler::logException( $e );
 281              }
 282              if ( $ok ) {
 283                  $key = $this->getCacheKey( 'empty' );
 284                  $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
 285              } else {
 286                  if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
 287                      throw new JobQueueError( "Could not insert job(s), no partitions available." );
 288                  }
 289                  $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
 290              }
 291          }
 292  
 293          // Insert the jobs that are not de-duplicated into the queues...
 294          foreach ( $nuJobBatches as $jobBatch ) {
 295              $partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
 296              $queue = $this->partitionQueues[$partition];
 297              try {
 298                  $ok = true;
 299                  $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
 300              } catch ( JobQueueError $e ) {
 301                  $ok = false;
 302                  MWExceptionHandler::logException( $e );
 303              }
 304              if ( $ok ) {
 305                  $key = $this->getCacheKey( 'empty' );
 306                  $this->cache->set( $key, 'false', self::CACHE_TTL_LONG );
 307              } else {
 308                  if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
 309                      throw new JobQueueError( "Could not insert job(s), no partitions available." );
 310                  }
 311                  $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
 312              }
 313          }
 314  
 315          return $jobsLeft;
 316      }
 317  
 318  	protected function doPop() {
 319          $partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)
 320  
 321          $failed = 0;
 322          while ( count( $partitionsTry ) ) {
 323              $partition = ArrayUtils::pickRandom( $partitionsTry );
 324              if ( $partition === false ) {
 325                  break; // all partitions at 0 weight
 326              }
 327  
 328              /** @var JobQueue $queue */
 329              $queue = $this->partitionQueues[$partition];
 330              try {
 331                  $job = $queue->pop();
 332              } catch ( JobQueueError $e ) {
 333                  ++$failed;
 334                  MWExceptionHandler::logException( $e );
 335                  $job = false;
 336              }
 337              if ( $job ) {
 338                  $job->metadata['QueuePartition'] = $partition;
 339  
 340                  return $job;
 341              } else {
 342                  unset( $partitionsTry[$partition] ); // blacklist partition
 343              }
 344          }
 345          $this->throwErrorIfAllPartitionsDown( $failed );
 346  
 347          $key = $this->getCacheKey( 'empty' );
 348          $this->cache->set( $key, 'true', self::CACHE_TTL_LONG );
 349  
 350          return false;
 351      }
 352  
 353  	protected function doAck( Job $job ) {
 354          if ( !isset( $job->metadata['QueuePartition'] ) ) {
 355              throw new MWException( "The given job has no defined partition name." );
 356          }
 357  
 358          return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
 359      }
 360  
 361  	protected function doIsRootJobOldDuplicate( Job $job ) {
 362          $params = $job->getRootJobParams();
 363          $sigature = $params['rootJobSignature'];
 364          $partition = $this->partitionPushRing->getLiveLocation( $sigature );
 365          try {
 366              return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
 367          } catch ( JobQueueError $e ) {
 368              if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
 369                  $partition = $this->partitionPushRing->getLiveLocation( $sigature );
 370                  return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
 371              }
 372          }
 373  
 374          return false;
 375      }
 376  
 377  	protected function doDeduplicateRootJob( Job $job ) {
 378          $params = $job->getRootJobParams();
 379          $sigature = $params['rootJobSignature'];
 380          $partition = $this->partitionPushRing->getLiveLocation( $sigature );
 381          try {
 382              return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
 383          } catch ( JobQueueError $e ) {
 384              if ( $this->partitionPushRing->ejectFromLiveRing( $partition, 5 ) ) {
 385                  $partition = $this->partitionPushRing->getLiveLocation( $sigature );
 386                  return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
 387              }
 388          }
 389  
 390          return false;
 391      }
 392  
 393  	protected function doDelete() {
 394          $failed = 0;
 395          /** @var JobQueue $queue */
 396          foreach ( $this->partitionQueues as $queue ) {
 397              try {
 398                  $queue->doDelete();
 399              } catch ( JobQueueError $e ) {
 400                  ++$failed;
 401                  MWExceptionHandler::logException( $e );
 402              }
 403          }
 404          $this->throwErrorIfAllPartitionsDown( $failed );
 405          return true;
 406      }
 407  
 408  	protected function doWaitForBackups() {
 409          $failed = 0;
 410          /** @var JobQueue $queue */
 411          foreach ( $this->partitionQueues as $queue ) {
 412              try {
 413                  $queue->waitForBackups();
 414              } catch ( JobQueueError $e ) {
 415                  ++$failed;
 416                  MWExceptionHandler::logException( $e );
 417              }
 418          }
 419          $this->throwErrorIfAllPartitionsDown( $failed );
 420      }
 421  
 422  	protected function doGetPeriodicTasks() {
 423          $tasks = array();
 424          /** @var JobQueue $queue */
 425          foreach ( $this->partitionQueues as $partition => $queue ) {
 426              foreach ( $queue->getPeriodicTasks() as $task => $def ) {
 427                  $tasks["{$partition}:{$task}"] = $def;
 428              }
 429          }
 430  
 431          return $tasks;
 432      }
 433  
 434  	protected function doFlushCaches() {
 435          static $types = array(
 436              'empty',
 437              'size',
 438              'acquiredcount',
 439              'delayedcount',
 440              'abandonedcount'
 441          );
 442  
 443          foreach ( $types as $type ) {
 444              $this->cache->delete( $this->getCacheKey( $type ) );
 445          }
 446  
 447          /** @var JobQueue $queue */
 448          foreach ( $this->partitionQueues as $queue ) {
 449              $queue->doFlushCaches();
 450          }
 451      }
 452  
 453  	public function getAllQueuedJobs() {
 454          $iterator = new AppendIterator();
 455  
 456          /** @var JobQueue $queue */
 457          foreach ( $this->partitionQueues as $queue ) {
 458              $iterator->append( $queue->getAllQueuedJobs() );
 459          }
 460  
 461          return $iterator;
 462      }
 463  
 464  	public function getAllDelayedJobs() {
 465          $iterator = new AppendIterator();
 466  
 467          /** @var JobQueue $queue */
 468          foreach ( $this->partitionQueues as $queue ) {
 469              $iterator->append( $queue->getAllDelayedJobs() );
 470          }
 471  
 472          return $iterator;
 473      }
 474  
 475  	public function getCoalesceLocationInternal() {
 476          return "JobQueueFederated:wiki:{$this->wiki}" .
 477              sha1( serialize( array_keys( $this->partitionQueues ) ) );
 478      }
 479  
 480  	protected function doGetSiblingQueuesWithJobs( array $types ) {
 481          $result = array();
 482  
 483          $failed = 0;
 484          /** @var JobQueue $queue */
 485          foreach ( $this->partitionQueues as $queue ) {
 486              try {
 487                  $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
 488                  if ( is_array( $nonEmpty ) ) {
 489                      $result = array_unique( array_merge( $result, $nonEmpty ) );
 490                  } else {
 491                      return null; // not supported on all partitions; bail
 492                  }
 493                  if ( count( $result ) == count( $types ) ) {
 494                      break; // short-circuit
 495                  }
 496              } catch ( JobQueueError $e ) {
 497                  ++$failed;
 498                  MWExceptionHandler::logException( $e );
 499              }
 500          }
 501          $this->throwErrorIfAllPartitionsDown( $failed );
 502  
 503          return array_values( $result );
 504      }
 505  
 506  	protected function doGetSiblingQueueSizes( array $types ) {
 507          $result = array();
 508          $failed = 0;
 509          /** @var JobQueue $queue */
 510          foreach ( $this->partitionQueues as $queue ) {
 511              try {
 512                  $sizes = $queue->doGetSiblingQueueSizes( $types );
 513                  if ( is_array( $sizes ) ) {
 514                      foreach ( $sizes as $type => $size ) {
 515                          $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
 516                      }
 517                  } else {
 518                      return null; // not supported on all partitions; bail
 519                  }
 520              } catch ( JobQueueError $e ) {
 521                  ++$failed;
 522                  MWExceptionHandler::logException( $e );
 523              }
 524          }
 525          $this->throwErrorIfAllPartitionsDown( $failed );
 526  
 527          return $result;
 528      }
 529  
 530      /**
 531       * Throw an error if no partitions available
 532       *
 533       * @param int $down The number of up partitions down
 534       * @return void
 535       * @throws JobQueueError
 536       */
 537  	protected function throwErrorIfAllPartitionsDown( $down ) {
 538          if ( $down >= count( $this->partitionQueues ) ) {
 539              throw new JobQueueError( 'No queue partitions available.' );
 540          }
 541      }
 542  
 543  	public function setTestingPrefix( $key ) {
 544          /** @var JobQueue $queue */
 545          foreach ( $this->partitionQueues as $queue ) {
 546              $queue->setTestingPrefix( $key );
 547          }
 548      }
 549  
 550      /**
 551       * @param string $property
 552       * @return string
 553       */
 554  	private function getCacheKey( $property ) {
 555          list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
 556  
 557          return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
 558      }
 559  }


Generated: Fri Nov 28 14:03:12 2014 Cross-referenced by PHPXref 0.7.1