[ Index ]

PHP Cross Reference of MediaWiki-1.24.0

title

Body

[close]

/includes/jobqueue/ -> JobQueueGroup.php (source)

   1  <?php
   2  /**
   3   * Job queue base code.
   4   *
   5   * This program is free software; you can redistribute it and/or modify
   6   * it under the terms of the GNU General Public License as published by
   7   * the Free Software Foundation; either version 2 of the License, or
   8   * (at your option) any later version.
   9   *
  10   * This program is distributed in the hope that it will be useful,
  11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13   * GNU General Public License for more details.
  14   *
  15   * You should have received a copy of the GNU General Public License along
  16   * with this program; if not, write to the Free Software Foundation, Inc.,
  17   * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18   * http://www.gnu.org/copyleft/gpl.html
  19   *
  20   * @file
  21   * @author Aaron Schulz
  22   */
  23  
  24  /**
  25   * Class to handle enqueueing of background jobs
  26   *
  27   * @ingroup JobQueue
  28   * @since 1.21
  29   */
  30  class JobQueueGroup {
    /** @var array Map of (wiki ID => JobQueueGroup); filled on demand by singleton() */
    protected static $instances = array();

    /** @var ProcessCacheLRU Process cache; holds the 'queues-ready' list used by push()/pop() */
    protected $cache;

    /** @var string Wiki ID */
    protected $wiki;

    /** @var array Map of (bucket => (queue => JobQueue, types => list of types)) */
    protected $coalescedQueues;

    const TYPE_DEFAULT = 1; // integer; jobs popped by default
    const TYPE_ANY = 2; // integer; any job

    const USE_CACHE = 1; // integer; use process or persistent cache

    const PROC_CACHE_TTL = 15; // integer; seconds

    const CACHE_VERSION = 1; // integer; cache version
  51  
  52      /**
  53       * @param string $wiki Wiki ID
  54       */
  55  	protected function __construct( $wiki ) {
  56          $this->wiki = $wiki;
  57          $this->cache = new ProcessCacheLRU( 10 );
  58      }
  59  
  60      /**
  61       * @param bool|string $wiki Wiki ID
  62       * @return JobQueueGroup
  63       */
  64  	public static function singleton( $wiki = false ) {
  65          $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
  66          if ( !isset( self::$instances[$wiki] ) ) {
  67              self::$instances[$wiki] = new self( $wiki );
  68          }
  69  
  70          return self::$instances[$wiki];
  71      }
  72  
  73      /**
  74       * Destroy the singleton instances
  75       *
  76       * @return void
  77       */
  78  	public static function destroySingletons() {
  79          self::$instances = array();
  80      }
  81  
  82      /**
  83       * Get the job queue object for a given queue type
  84       *
  85       * @param string $type
  86       * @return JobQueue
  87       */
  88  	public function get( $type ) {
  89          global $wgJobTypeConf;
  90  
  91          $conf = array( 'wiki' => $this->wiki, 'type' => $type );
  92          if ( isset( $wgJobTypeConf[$type] ) ) {
  93              $conf = $conf + $wgJobTypeConf[$type];
  94          } else {
  95              $conf = $conf + $wgJobTypeConf['default'];
  96          }
  97  
  98          return JobQueue::factory( $conf );
  99      }
 100  
 101      /**
 102       * Insert jobs into the respective queues of with the belong.
 103       *
 104       * This inserts the jobs into the queue specified by $wgJobTypeConf
 105       * and updates the aggregate job queue information cache as needed.
 106       *
 107       * @param Job|array $jobs A single Job or a list of Jobs
 108       * @throws MWException
 109       * @return void
 110       */
 111  	public function push( $jobs ) {
 112          $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
 113          if ( !count( $jobs ) ) {
 114              return;
 115          }
 116  
 117          $jobsByType = array(); // (job type => list of jobs)
 118          foreach ( $jobs as $job ) {
 119              if ( $job instanceof IJobSpecification ) {
 120                  $jobsByType[$job->getType()][] = $job;
 121              } else {
 122                  throw new MWException( "Attempted to push a non-Job object into a queue." );
 123              }
 124          }
 125  
 126          foreach ( $jobsByType as $type => $jobs ) {
 127              $this->get( $type )->push( $jobs );
 128              JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
 129          }
 130  
 131          if ( $this->cache->has( 'queues-ready', 'list' ) ) {
 132              $list = $this->cache->get( 'queues-ready', 'list' );
 133              if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
 134                  $this->cache->clear( 'queues-ready' );
 135              }
 136          }
 137      }
 138  
 139      /**
 140       * Pop a job off one of the job queues
 141       *
 142       * This pops a job off a queue as specified by $wgJobTypeConf and
 143       * updates the aggregate job queue information cache as needed.
 144       *
 145       * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string
 146       * @param int $flags Bitfield of JobQueueGroup::USE_* constants
 147       * @param array $blacklist List of job types to ignore
 148       * @return Job|bool Returns false on failure
 149       */
 150  	public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = array() ) {
 151          $job = false;
 152  
 153          if ( is_string( $qtype ) ) { // specific job type
 154              if ( !in_array( $qtype, $blacklist ) ) {
 155                  $job = $this->get( $qtype )->pop();
 156                  if ( !$job ) {
 157                      JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
 158                  }
 159              }
 160          } else { // any job in the "default" jobs types
 161              if ( $flags & self::USE_CACHE ) {
 162                  if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
 163                      $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
 164                  }
 165                  $types = $this->cache->get( 'queues-ready', 'list' );
 166              } else {
 167                  $types = $this->getQueuesWithJobs();
 168              }
 169  
 170              if ( $qtype == self::TYPE_DEFAULT ) {
 171                  $types = array_intersect( $types, $this->getDefaultQueueTypes() );
 172              }
 173  
 174              $types = array_diff( $types, $blacklist ); // avoid selected types
 175              shuffle( $types ); // avoid starvation
 176  
 177              foreach ( $types as $type ) { // for each queue...
 178                  $job = $this->get( $type )->pop();
 179                  if ( $job ) { // found
 180                      break;
 181                  } else { // not found
 182                      JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
 183                      $this->cache->clear( 'queues-ready' );
 184                  }
 185              }
 186          }
 187  
 188          return $job;
 189      }
 190  
 191      /**
 192       * Acknowledge that a job was completed
 193       *
 194       * @param Job $job
 195       * @return bool
 196       */
 197  	public function ack( Job $job ) {
 198          return $this->get( $job->getType() )->ack( $job );
 199      }
 200  
 201      /**
 202       * Register the "root job" of a given job into the queue for de-duplication.
 203       * This should only be called right *after* all the new jobs have been inserted.
 204       *
 205       * @param Job $job
 206       * @return bool
 207       */
 208  	public function deduplicateRootJob( Job $job ) {
 209          return $this->get( $job->getType() )->deduplicateRootJob( $job );
 210      }
 211  
 212      /**
 213       * Wait for any slaves or backup queue servers to catch up.
 214       *
 215       * This does nothing for certain queue classes.
 216       *
 217       * @return void
 218       * @throws MWException
 219       */
 220  	public function waitForBackups() {
 221          global $wgJobTypeConf;
 222  
 223          wfProfileIn( __METHOD__ );
 224          // Try to avoid doing this more than once per queue storage medium
 225          foreach ( $wgJobTypeConf as $type => $conf ) {
 226              $this->get( $type )->waitForBackups();
 227          }
 228          wfProfileOut( __METHOD__ );
 229      }
 230  
 231      /**
 232       * Get the list of queue types
 233       *
 234       * @return array List of strings
 235       */
 236  	public function getQueueTypes() {
 237          return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
 238      }
 239  
 240      /**
 241       * Get the list of default queue types
 242       *
 243       * @return array List of strings
 244       */
 245  	public function getDefaultQueueTypes() {
 246          global $wgJobTypesExcludedFromDefaultQueue;
 247  
 248          return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
 249      }
 250  
 251      /**
 252       * Check if there are any queues with jobs (this is cached)
 253       *
 254       * @param int $type JobQueueGroup::TYPE_* constant
 255       * @return bool
 256       * @since 1.23
 257       */
 258  	public function queuesHaveJobs( $type = self::TYPE_ANY ) {
 259          global $wgMemc;
 260  
 261          $key = wfMemcKey( 'jobqueue', 'queueshavejobs', $type );
 262  
 263          $value = $wgMemc->get( $key );
 264          if ( $value === false ) {
 265              $queues = $this->getQueuesWithJobs();
 266              if ( $type == self::TYPE_DEFAULT ) {
 267                  $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
 268              }
 269              $value = count( $queues ) ? 'true' : 'false';
 270              $wgMemc->add( $key, $value, 15 );
 271          }
 272  
 273          return ( $value === 'true' );
 274      }
 275  
 276      /**
 277       * Get the list of job types that have non-empty queues
 278       *
 279       * @return array List of job types that have non-empty queues
 280       */
 281  	public function getQueuesWithJobs() {
 282          $types = array();
 283          foreach ( $this->getCoalescedQueues() as $info ) {
 284              $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
 285              if ( is_array( $nonEmpty ) ) { // batching features supported
 286                  $types = array_merge( $types, $nonEmpty );
 287              } else { // we have to go through the queues in the bucket one-by-one
 288                  foreach ( $info['types'] as $type ) {
 289                      if ( !$this->get( $type )->isEmpty() ) {
 290                          $types[] = $type;
 291                      }
 292                  }
 293              }
 294          }
 295  
 296          return $types;
 297      }
 298  
 299      /**
 300       * Get the size of the queus for a list of job types
 301       *
 302       * @return array Map of (job type => size)
 303       */
 304  	public function getQueueSizes() {
 305          $sizeMap = array();
 306          foreach ( $this->getCoalescedQueues() as $info ) {
 307              $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
 308              if ( is_array( $sizes ) ) { // batching features supported
 309                  $sizeMap = $sizeMap + $sizes;
 310              } else { // we have to go through the queues in the bucket one-by-one
 311                  foreach ( $info['types'] as $type ) {
 312                      $sizeMap[$type] = $this->get( $type )->getSize();
 313                  }
 314              }
 315          }
 316  
 317          return $sizeMap;
 318      }
 319  
 320      /**
 321       * @return array
 322       */
 323  	protected function getCoalescedQueues() {
 324          global $wgJobTypeConf;
 325  
 326          if ( $this->coalescedQueues === null ) {
 327              $this->coalescedQueues = array();
 328              foreach ( $wgJobTypeConf as $type => $conf ) {
 329                  $queue = JobQueue::factory(
 330                      array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
 331                  $loc = $queue->getCoalesceLocationInternal();
 332                  if ( !isset( $this->coalescedQueues[$loc] ) ) {
 333                      $this->coalescedQueues[$loc]['queue'] = $queue;
 334                      $this->coalescedQueues[$loc]['types'] = array();
 335                  }
 336                  if ( $type === 'default' ) {
 337                      $this->coalescedQueues[$loc]['types'] = array_merge(
 338                          $this->coalescedQueues[$loc]['types'],
 339                          array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
 340                      );
 341                  } else {
 342                      $this->coalescedQueues[$loc]['types'][] = $type;
 343                  }
 344              }
 345          }
 346  
 347          return $this->coalescedQueues;
 348      }
 349  
 350      /**
 351       * Execute any due periodic queue maintenance tasks for all queues.
 352       *
 353       * A task is "due" if the time ellapsed since the last run is greater than
 354       * the defined run period. Concurrent calls to this function will cause tasks
 355       * to be attempted twice, so they may need their own methods of mutual exclusion.
 356       *
 357       * @return int Number of tasks run
 358       */
 359  	public function executeReadyPeriodicTasks() {
 360          global $wgMemc;
 361  
 362          list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
 363          $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
 364          $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
 365  
 366          $count = 0;
 367          $tasksRun = array(); // (queue => task => UNIX timestamp)
 368          foreach ( $this->getQueueTypes() as $type ) {
 369              $queue = $this->get( $type );
 370              foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
 371                  if ( $definition['period'] <= 0 ) {
 372                      continue; // disabled
 373                  } elseif ( !isset( $lastRuns[$type][$task] )
 374                      || $lastRuns[$type][$task] < ( time() - $definition['period'] )
 375                  ) {
 376                      try {
 377                          if ( call_user_func( $definition['callback'] ) !== null ) {
 378                              $tasksRun[$type][$task] = time();
 379                              ++$count;
 380                          }
 381                      } catch ( JobQueueError $e ) {
 382                          MWExceptionHandler::logException( $e );
 383                      }
 384                  }
 385              }
 386              // The tasks may have recycled jobs or release delayed jobs into the queue
 387              if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
 388                  JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
 389              }
 390          }
 391  
 392          if ( $count === 0 ) {
 393              return $count; // nothing to update
 394          }
 395  
 396          $wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
 397              if ( is_array( $lastRuns ) ) {
 398                  foreach ( $tasksRun as $type => $tasks ) {
 399                      foreach ( $tasks as $task => $timestamp ) {
 400                          if ( !isset( $lastRuns[$type][$task] )
 401                              || $timestamp > $lastRuns[$type][$task]
 402                          ) {
 403                              $lastRuns[$type][$task] = $timestamp;
 404                          }
 405                      }
 406                  }
 407              } else {
 408                  $lastRuns = $tasksRun;
 409              }
 410  
 411              return $lastRuns;
 412          } );
 413  
 414          return $count;
 415      }
 416  
 417      /**
 418       * @param string $name
 419       * @return mixed
 420       */
 421  	private function getCachedConfigVar( $name ) {
 422          global $wgConf, $wgMemc;
 423  
 424          if ( $this->wiki === wfWikiID() ) {
 425              return $GLOBALS[$name]; // common case
 426          } else {
 427              list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
 428              $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name );
 429              $value = $wgMemc->get( $key ); // ('v' => ...) or false
 430              if ( is_array( $value ) ) {
 431                  return $value['v'];
 432              } else {
 433                  $value = $wgConf->getConfig( $this->wiki, $name );
 434                  $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
 435  
 436                  return $value;
 437              }
 438          }
 439      }
 440  }


Generated: Fri Nov 28 14:03:12 2014 Cross-referenced by PHPXref 0.7.1