MediaWiki
REL1_21
|
00001 <?php 00030 class JobQueueGroup { 00032 protected static $instances = array(); 00033 00035 protected $cache; 00036 00037 protected $wiki; // string; wiki ID 00038 00039 const TYPE_DEFAULT = 1; // integer; jobs popped by default 00040 const TYPE_ANY = 2; // integer; any job 00041 00042 const USE_CACHE = 1; // integer; use process or persistent cache 00043 00044 const PROC_CACHE_TTL = 15; // integer; seconds 00045 00046 const CACHE_VERSION = 1; // integer; cache version 00047 00051 protected function __construct( $wiki ) { 00052 $this->wiki = $wiki; 00053 $this->cache = new ProcessCacheLRU( 10 ); 00054 } 00055 00060 public static function singleton( $wiki = false ) { 00061 $wiki = ( $wiki === false ) ? wfWikiID() : $wiki; 00062 if ( !isset( self::$instances[$wiki] ) ) { 00063 self::$instances[$wiki] = new self( $wiki ); 00064 } 00065 return self::$instances[$wiki]; 00066 } 00067 00073 public static function destroySingletons() { 00074 self::$instances = array(); 00075 } 00076 00083 public function get( $type ) { 00084 global $wgJobTypeConf; 00085 00086 $conf = array( 'wiki' => $this->wiki, 'type' => $type ); 00087 if ( isset( $wgJobTypeConf[$type] ) ) { 00088 $conf = $conf + $wgJobTypeConf[$type]; 00089 } else { 00090 $conf = $conf + $wgJobTypeConf['default']; 00091 } 00092 00093 return JobQueue::factory( $conf ); 00094 } 00095 00106 public function push( $jobs ) { 00107 $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); 00108 00109 $jobsByType = array(); // (job type => list of jobs) 00110 foreach ( $jobs as $job ) { 00111 if ( $job instanceof Job ) { 00112 $jobsByType[$job->getType()][] = $job; 00113 } else { 00114 throw new MWException( "Attempted to push a non-Job object into a queue." ); 00115 } 00116 } 00117 00118 $ok = true; 00119 foreach ( $jobsByType as $type => $jobs ) { 00120 if ( $this->get( $type )->push( $jobs ) ) { 00121 JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type ); 00122 } else { 00123 $ok = false; 00124 } 00125 } 00126 00127 if ( $this->cache->has( 'queues-ready', 'list' ) ) { 00128 $list = $this->cache->get( 'queues-ready', 'list' ); 00129 if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) { 00130 $this->cache->clear( 'queues-ready' ); 00131 } 00132 } 00133 00134 return $ok; 00135 } 00136 00147 public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) { 00148 if ( is_string( $qtype ) ) { // specific job type 00149 $job = $this->get( $qtype )->pop(); 00150 if ( !$job ) { 00151 JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); 00152 } 00153 return $job; 00154 } else { // any job in the "default" jobs types 00155 if ( $flags & self::USE_CACHE ) { 00156 if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) { 00157 $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() ); 00158 } 00159 $types = $this->cache->get( 'queues-ready', 'list' ); 00160 } else { 00161 $types = $this->getQueuesWithJobs(); 00162 } 00163 00164 if ( $qtype == self::TYPE_DEFAULT ) { 00165 $types = array_intersect( $types, $this->getDefaultQueueTypes() ); 00166 } 00167 shuffle( $types ); // avoid starvation 00168 00169 foreach ( $types as $type ) { // for each queue... 00170 $job = $this->get( $type )->pop(); 00171 if ( $job ) { // found 00172 return $job; 00173 } else { // not found 00174 JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type ); 00175 $this->cache->clear( 'queues-ready' ); 00176 } 00177 } 00178 00179 return false; // no jobs found 00180 } 00181 } 00182 00189 public function ack( Job $job ) { 00190 return $this->get( $job->getType() )->ack( $job ); 00191 } 00192 00200 public function deduplicateRootJob( Job $job ) { 00201 return $this->get( $job->getType() )->deduplicateRootJob( $job ); 00202 } 00203 00212 public function waitForBackups() { 00213 global $wgJobTypeConf; 00214 00215 wfProfileIn( __METHOD__ ); 00216 // Try to avoid doing this more than once per queue storage medium 00217 foreach ( $wgJobTypeConf as $type => $conf ) { 00218 $this->get( $type )->waitForBackups(); 00219 } 00220 wfProfileOut( __METHOD__ ); 00221 } 00222 00228 public function getQueueTypes() { 00229 return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) ); 00230 } 00231 00237 public function getDefaultQueueTypes() { 00238 global $wgJobTypesExcludedFromDefaultQueue; 00239 00240 return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue ); 00241 } 00242 00248 public function getQueuesWithJobs() { 00249 $types = array(); 00250 foreach ( $this->getQueueTypes() as $type ) { 00251 if ( !$this->get( $type )->isEmpty() ) { 00252 $types[] = $type; 00253 } 00254 } 00255 return $types; 00256 } 00257 00266 public function isQueueDeprioritized( $type ) { 00267 if ( $type === 'refreshLinks2' ) { 00268 // Don't keep converting refreshLinks2 => refreshLinks jobs if the 00269 // later jobs have not been done yet. This helps throttle queue spam. 00270 return !$this->get( 'refreshLinks' )->isEmpty(); 00271 } 00272 return false; 00273 } 00274 00284 public function executeReadyPeriodicTasks() { 00285 global $wgMemc; 00286 00287 list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); 00288 $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' ); 00289 $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp) 00290 00291 $count = 0; 00292 $tasksRun = array(); // (queue => task => UNIX timestamp) 00293 foreach ( $this->getQueueTypes() as $type ) { 00294 $queue = $this->get( $type ); 00295 foreach ( $queue->getPeriodicTasks() as $task => $definition ) { 00296 if ( $definition['period'] <= 0 ) { 00297 continue; // disabled 00298 } elseif ( !isset( $lastRuns[$type][$task] ) 00299 || $lastRuns[$type][$task] < ( time() - $definition['period'] ) ) 00300 { 00301 if ( call_user_func( $definition['callback'] ) !== null ) { 00302 $tasksRun[$type][$task] = time(); 00303 ++$count; 00304 } 00305 } 00306 } 00307 } 00308 00309 $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) { 00310 if ( is_array( $lastRuns ) ) { 00311 foreach ( $tasksRun as $type => $tasks ) { 00312 foreach ( $tasks as $task => $timestamp ) { 00313 if ( !isset( $lastRuns[$type][$task] ) 00314 || $timestamp > $lastRuns[$type][$task] ) 00315 { 00316 $lastRuns[$type][$task] = $timestamp; 00317 } 00318 } 00319 } 00320 } else { 00321 $lastRuns = $tasksRun; 00322 } 00323 return $lastRuns; 00324 } ); 00325 00326 return $count; 00327 } 00328 00333 private function getCachedConfigVar( $name ) { 00334 global $wgConf, $wgMemc; 00335 00336 if ( $this->wiki === wfWikiID() ) { 00337 return $GLOBALS[$name]; // common case 00338 } else { 00339 list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); 00340 $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name ); 00341 $value = $wgMemc->get( $key ); // ('v' => ...) or false 00342 if ( is_array( $value ) ) { 00343 return $value['v']; 00344 } else { 00345 $value = $wgConf->getConfig( $this->wiki, $name ); 00346 $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) ); 00347 return $value; 00348 } 00349 } 00350 } 00351 }