MediaWiki  REL1_21
JobQueueGroup.php
Go to the documentation of this file.
00001 <?php
00030 class JobQueueGroup {
00032         protected static $instances = array();
00033 
00035         protected $cache;
00036 
00037         protected $wiki; // string; wiki ID
00038 
00039         const TYPE_DEFAULT = 1; // integer; jobs popped by default
00040         const TYPE_ANY = 2; // integer; any job
00041 
00042         const USE_CACHE = 1; // integer; use process or persistent cache
00043 
00044         const PROC_CACHE_TTL = 15; // integer; seconds
00045 
00046         const CACHE_VERSION = 1; // integer; cache version
00047 
00051         protected function __construct( $wiki ) {
00052                 $this->wiki = $wiki;
00053                 $this->cache = new ProcessCacheLRU( 10 );
00054         }
00055 
00060         public static function singleton( $wiki = false ) {
00061                 $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
00062                 if ( !isset( self::$instances[$wiki] ) ) {
00063                         self::$instances[$wiki] = new self( $wiki );
00064                 }
00065                 return self::$instances[$wiki];
00066         }
00067 
00073         public static function destroySingletons() {
00074                 self::$instances = array();
00075         }
00076 
00083         public function get( $type ) {
00084                 global $wgJobTypeConf;
00085 
00086                 $conf = array( 'wiki' => $this->wiki, 'type' => $type );
00087                 if ( isset( $wgJobTypeConf[$type] ) ) {
00088                         $conf = $conf + $wgJobTypeConf[$type];
00089                 } else {
00090                         $conf = $conf + $wgJobTypeConf['default'];
00091                 }
00092 
00093                 return JobQueue::factory( $conf );
00094         }
00095 
00106         public function push( $jobs ) {
00107                 $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
00108 
00109                 $jobsByType = array(); // (job type => list of jobs)
00110                 foreach ( $jobs as $job ) {
00111                         if ( $job instanceof Job ) {
00112                                 $jobsByType[$job->getType()][] = $job;
00113                         } else {
00114                                 throw new MWException( "Attempted to push a non-Job object into a queue." );
00115                         }
00116                 }
00117 
00118                 $ok = true;
00119                 foreach ( $jobsByType as $type => $jobs ) {
00120                         if ( $this->get( $type )->push( $jobs ) ) {
00121                                 JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
00122                         } else {
00123                                 $ok = false;
00124                         }
00125                 }
00126 
00127                 if ( $this->cache->has( 'queues-ready', 'list' ) ) {
00128                         $list = $this->cache->get( 'queues-ready', 'list' );
00129                         if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
00130                                 $this->cache->clear( 'queues-ready' );
00131                         }
00132                 }
00133 
00134                 return $ok;
00135         }
00136 
00147         public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
00148                 if ( is_string( $qtype ) ) { // specific job type
00149                         $job = $this->get( $qtype )->pop();
00150                         if ( !$job ) {
00151                                 JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
00152                         }
00153                         return $job;
00154                 } else { // any job in the "default" jobs types
00155                         if ( $flags & self::USE_CACHE ) {
00156                                 if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
00157                                         $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
00158                                 }
00159                                 $types = $this->cache->get( 'queues-ready', 'list' );
00160                         } else {
00161                                 $types = $this->getQueuesWithJobs();
00162                         }
00163 
00164                         if ( $qtype == self::TYPE_DEFAULT ) {
00165                                 $types = array_intersect( $types, $this->getDefaultQueueTypes() );
00166                         }
00167                         shuffle( $types ); // avoid starvation
00168 
00169                         foreach ( $types as $type ) { // for each queue...
00170                                 $job = $this->get( $type )->pop();
00171                                 if ( $job ) { // found
00172                                         return $job;
00173                                 } else { // not found
00174                                         JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
00175                                         $this->cache->clear( 'queues-ready' );
00176                                 }
00177                         }
00178 
00179                         return false; // no jobs found
00180                 }
00181         }
00182 
00189         public function ack( Job $job ) {
00190                 return $this->get( $job->getType() )->ack( $job );
00191         }
00192 
00200         public function deduplicateRootJob( Job $job ) {
00201                 return $this->get( $job->getType() )->deduplicateRootJob( $job );
00202         }
00203 
00212         public function waitForBackups() {
00213                 global $wgJobTypeConf;
00214 
00215                 wfProfileIn( __METHOD__ );
00216                 // Try to avoid doing this more than once per queue storage medium
00217                 foreach ( $wgJobTypeConf as $type => $conf ) {
00218                         $this->get( $type )->waitForBackups();
00219                 }
00220                 wfProfileOut( __METHOD__ );
00221         }
00222 
00228         public function getQueueTypes() {
00229                 return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
00230         }
00231 
00237         public function getDefaultQueueTypes() {
00238                 global $wgJobTypesExcludedFromDefaultQueue;
00239 
00240                 return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
00241         }
00242 
00248         public function getQueuesWithJobs() {
00249                 $types = array();
00250                 foreach ( $this->getQueueTypes() as $type ) {
00251                         if ( !$this->get( $type )->isEmpty() ) {
00252                                 $types[] = $type;
00253                         }
00254                 }
00255                 return $types;
00256         }
00257 
00266         public function isQueueDeprioritized( $type ) {
00267                 if ( $type === 'refreshLinks2' ) {
00268                         // Don't keep converting refreshLinks2 => refreshLinks jobs if the
00269                         // later jobs have not been done yet. This helps throttle queue spam.
00270                         return !$this->get( 'refreshLinks' )->isEmpty();
00271                 }
00272                 return false;
00273         }
00274 
00284         public function executeReadyPeriodicTasks() {
00285                 global $wgMemc;
00286 
00287                 list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
00288                 $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
00289                 $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
00290 
00291                 $count = 0;
00292                 $tasksRun = array(); // (queue => task => UNIX timestamp)
00293                 foreach ( $this->getQueueTypes() as $type ) {
00294                         $queue = $this->get( $type );
00295                         foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
00296                                 if ( $definition['period'] <= 0 ) {
00297                                         continue; // disabled
00298                                 } elseif ( !isset( $lastRuns[$type][$task] )
00299                                         || $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
00300                                 {
00301                                         if ( call_user_func( $definition['callback'] ) !== null ) {
00302                                                 $tasksRun[$type][$task] = time();
00303                                                 ++$count;
00304                                         }
00305                                 }
00306                         }
00307                 }
00308 
00309                 $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) {
00310                         if ( is_array( $lastRuns ) ) {
00311                                 foreach ( $tasksRun as $type => $tasks ) {
00312                                         foreach ( $tasks as $task => $timestamp ) {
00313                                                 if ( !isset( $lastRuns[$type][$task] )
00314                                                         || $timestamp > $lastRuns[$type][$task] )
00315                                                 {
00316                                                         $lastRuns[$type][$task] = $timestamp;
00317                                                 }
00318                                         }
00319                                 }
00320                         } else {
00321                                 $lastRuns = $tasksRun;
00322                         }
00323                         return $lastRuns;
00324                 } );
00325 
00326                 return $count;
00327         }
00328 
00333         private function getCachedConfigVar( $name ) {
00334                 global $wgConf, $wgMemc;
00335 
00336                 if ( $this->wiki === wfWikiID() ) {
00337                         return $GLOBALS[$name]; // common case
00338                 } else {
00339                         list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
00340                         $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name );
00341                         $value = $wgMemc->get( $key ); // ('v' => ...) or false
00342                         if ( is_array( $value ) ) {
00343                                 return $value['v'];
00344                         } else {
00345                                 $value = $wgConf->getConfig( $this->wiki, $name );
00346                                 $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
00347                                 return $value;
00348                         }
00349                 }
00350         }
00351 }