MediaWiki  REL1_22
JobQueueGroup.php
Go to the documentation of this file.
00001 <?php
00030 class JobQueueGroup {
00032     protected static $instances = array();
00033 
00035     protected $cache;
00036 
00037     protected $wiki; // string; wiki ID
00038 
00040     protected $coalescedQueues;
00041 
00042     const TYPE_DEFAULT = 1; // integer; jobs popped by default
00043     const TYPE_ANY = 2; // integer; any job
00044 
00045     const USE_CACHE = 1; // integer; use process or persistent cache
00046     const USE_PRIORITY = 2; // integer; respect deprioritization
00047 
00048     const PROC_CACHE_TTL = 15; // integer; seconds
00049 
00050     const CACHE_VERSION = 1; // integer; cache version
00051 
00055     protected function __construct( $wiki ) {
00056         $this->wiki = $wiki;
00057         $this->cache = new ProcessCacheLRU( 10 );
00058     }
00059 
00064     public static function singleton( $wiki = false ) {
00065         $wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
00066         if ( !isset( self::$instances[$wiki] ) ) {
00067             self::$instances[$wiki] = new self( $wiki );
00068         }
00069         return self::$instances[$wiki];
00070     }
00071 
00077     public static function destroySingletons() {
00078         self::$instances = array();
00079     }
00080 
00087     public function get( $type ) {
00088         global $wgJobTypeConf;
00089 
00090         $conf = array( 'wiki' => $this->wiki, 'type' => $type );
00091         if ( isset( $wgJobTypeConf[$type] ) ) {
00092             $conf = $conf + $wgJobTypeConf[$type];
00093         } else {
00094             $conf = $conf + $wgJobTypeConf['default'];
00095         }
00096 
00097         return JobQueue::factory( $conf );
00098     }
00099 
00110     public function push( $jobs ) {
00111         $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
00112 
00113         $jobsByType = array(); // (job type => list of jobs)
00114         foreach ( $jobs as $job ) {
00115             if ( $job instanceof Job ) {
00116                 $jobsByType[$job->getType()][] = $job;
00117             } else {
00118                 throw new MWException( "Attempted to push a non-Job object into a queue." );
00119             }
00120         }
00121 
00122         $ok = true;
00123         foreach ( $jobsByType as $type => $jobs ) {
00124             if ( $this->get( $type )->push( $jobs ) ) {
00125                 JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
00126             } else {
00127                 $ok = false;
00128             }
00129         }
00130 
00131         if ( $this->cache->has( 'queues-ready', 'list' ) ) {
00132             $list = $this->cache->get( 'queues-ready', 'list' );
00133             if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
00134                 $this->cache->clear( 'queues-ready' );
00135             }
00136         }
00137 
00138         return $ok;
00139     }
00140 
00151     public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
00152         if ( is_string( $qtype ) ) { // specific job type
00153             if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $qtype ) ) {
00154                 return false; // back off
00155             }
00156             $job = $this->get( $qtype )->pop();
00157             if ( !$job ) {
00158                 JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
00159             }
00160             return $job;
00161         } else { // any job in the "default" jobs types
00162             if ( $flags & self::USE_CACHE ) {
00163                 if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
00164                     $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
00165                 }
00166                 $types = $this->cache->get( 'queues-ready', 'list' );
00167             } else {
00168                 $types = $this->getQueuesWithJobs();
00169             }
00170 
00171             if ( $qtype == self::TYPE_DEFAULT ) {
00172                 $types = array_intersect( $types, $this->getDefaultQueueTypes() );
00173             }
00174             shuffle( $types ); // avoid starvation
00175 
00176             foreach ( $types as $type ) { // for each queue...
00177                 if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $type ) ) {
00178                     continue; // back off
00179                 }
00180                 $job = $this->get( $type )->pop();
00181                 if ( $job ) { // found
00182                     return $job;
00183                 } else { // not found
00184                     JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
00185                     $this->cache->clear( 'queues-ready' );
00186                 }
00187             }
00188 
00189             return false; // no jobs found
00190         }
00191     }
00192 
00199     public function ack( Job $job ) {
00200         return $this->get( $job->getType() )->ack( $job );
00201     }
00202 
00210     public function deduplicateRootJob( Job $job ) {
00211         return $this->get( $job->getType() )->deduplicateRootJob( $job );
00212     }
00213 
00222     public function waitForBackups() {
00223         global $wgJobTypeConf;
00224 
00225         wfProfileIn( __METHOD__ );
00226         // Try to avoid doing this more than once per queue storage medium
00227         foreach ( $wgJobTypeConf as $type => $conf ) {
00228             $this->get( $type )->waitForBackups();
00229         }
00230         wfProfileOut( __METHOD__ );
00231     }
00232 
00238     public function getQueueTypes() {
00239         return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
00240     }
00241 
00247     public function getDefaultQueueTypes() {
00248         global $wgJobTypesExcludedFromDefaultQueue;
00249 
00250         return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
00251     }
00252 
00258     public function getQueuesWithJobs() {
00259         $types = array();
00260         foreach ( $this->getCoalescedQueues() as $info ) {
00261             $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
00262             if ( is_array( $nonEmpty ) ) { // batching features supported
00263                 $types = array_merge( $types, $nonEmpty );
00264             } else { // we have to go through the queues in the bucket one-by-one
00265                 foreach ( $info['types'] as $type ) {
00266                     if ( !$this->get( $type )->isEmpty() ) {
00267                         $types[] = $type;
00268                     }
00269                 }
00270             }
00271         }
00272         return $types;
00273     }
00274 
00280     public function getQueueSizes() {
00281         $sizeMap = array();
00282         foreach ( $this->getCoalescedQueues() as $info ) {
00283             $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
00284             if ( is_array( $sizes ) ) { // batching features supported
00285                 $sizeMap = $sizeMap + $sizes;
00286             } else { // we have to go through the queues in the bucket one-by-one
00287                 foreach ( $info['types'] as $type ) {
00288                     $sizeMap[$type] = $this->get( $type )->getSize();
00289                 }
00290             }
00291         }
00292         return $sizeMap;
00293     }
00294 
00298     protected function getCoalescedQueues() {
00299         global $wgJobTypeConf;
00300 
00301         if ( $this->coalescedQueues === null ) {
00302             $this->coalescedQueues = array();
00303             foreach ( $wgJobTypeConf as $type => $conf ) {
00304                 $queue = JobQueue::factory(
00305                     array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
00306                 $loc = $queue->getCoalesceLocationInternal();
00307                 if ( !isset( $this->coalescedQueues[$loc] ) ) {
00308                     $this->coalescedQueues[$loc]['queue'] = $queue;
00309                     $this->coalescedQueues[$loc]['types'] = array();
00310                 }
00311                 if ( $type === 'default' ) {
00312                     $this->coalescedQueues[$loc]['types'] = array_merge(
00313                         $this->coalescedQueues[$loc]['types'],
00314                         array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
00315                     );
00316                 } else {
00317                     $this->coalescedQueues[$loc]['types'][] = $type;
00318                 }
00319             }
00320         }
00321 
00322         return $this->coalescedQueues;
00323     }
00324 
00333     public function isQueueDeprioritized( $type ) {
00334         if ( $this->cache->has( 'isDeprioritized', $type, 5 ) ) {
00335             return $this->cache->get( 'isDeprioritized', $type );
00336         }
00337         if ( $type === 'refreshLinks2' ) {
00338             // Don't keep converting refreshLinks2 => refreshLinks jobs if the
00339             // later jobs have not been done yet. This helps throttle queue spam.
00340             $deprioritized = !$this->get( 'refreshLinks' )->isEmpty();
00341             $this->cache->set( 'isDeprioritized', $type, $deprioritized );
00342             return $deprioritized;
00343         }
00344         return false;
00345     }
00346 
00356     public function executeReadyPeriodicTasks() {
00357         global $wgMemc;
00358 
00359         list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
00360         $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
00361         $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)
00362 
00363         $count = 0;
00364         $tasksRun = array(); // (queue => task => UNIX timestamp)
00365         foreach ( $this->getQueueTypes() as $type ) {
00366             $queue = $this->get( $type );
00367             foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
00368                 if ( $definition['period'] <= 0 ) {
00369                     continue; // disabled
00370                 } elseif ( !isset( $lastRuns[$type][$task] )
00371                     || $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
00372                 {
00373                     try {
00374                         if ( call_user_func( $definition['callback'] ) !== null ) {
00375                             $tasksRun[$type][$task] = time();
00376                             ++$count;
00377                         }
00378                     } catch ( JobQueueError $e ) {
00379                         MWExceptionHandler::logException( $e );
00380                     }
00381                 }
00382             }
00383         }
00384 
00385         $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) {
00386             if ( is_array( $lastRuns ) ) {
00387                 foreach ( $tasksRun as $type => $tasks ) {
00388                     foreach ( $tasks as $task => $timestamp ) {
00389                         if ( !isset( $lastRuns[$type][$task] )
00390                             || $timestamp > $lastRuns[$type][$task] )
00391                         {
00392                             $lastRuns[$type][$task] = $timestamp;
00393                         }
00394                     }
00395                 }
00396             } else {
00397                 $lastRuns = $tasksRun;
00398             }
00399             return $lastRuns;
00400         } );
00401 
00402         return $count;
00403     }
00404 
00409     private function getCachedConfigVar( $name ) {
00410         global $wgConf, $wgMemc;
00411 
00412         if ( $this->wiki === wfWikiID() ) {
00413             return $GLOBALS[$name]; // common case
00414         } else {
00415             list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
00416             $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name );
00417             $value = $wgMemc->get( $key ); // ('v' => ...) or false
00418             if ( is_array( $value ) ) {
00419                 return $value['v'];
00420             } else {
00421                 $value = $wgConf->getConfig( $this->wiki, $name );
00422                 $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );
00423                 return $value;
00424             }
00425         }
00426     }
00427 }