MediaWiki
REL1_22
|
00001 <?php 00030 class JobQueueGroup { 00032 protected static $instances = array(); 00033 00035 protected $cache; 00036 00037 protected $wiki; // string; wiki ID 00038 00040 protected $coalescedQueues; 00041 00042 const TYPE_DEFAULT = 1; // integer; jobs popped by default 00043 const TYPE_ANY = 2; // integer; any job 00044 00045 const USE_CACHE = 1; // integer; use process or persistent cache 00046 const USE_PRIORITY = 2; // integer; respect deprioritization 00047 00048 const PROC_CACHE_TTL = 15; // integer; seconds 00049 00050 const CACHE_VERSION = 1; // integer; cache version 00051 00055 protected function __construct( $wiki ) { 00056 $this->wiki = $wiki; 00057 $this->cache = new ProcessCacheLRU( 10 ); 00058 } 00059 00064 public static function singleton( $wiki = false ) { 00065 $wiki = ( $wiki === false ) ? wfWikiID() : $wiki; 00066 if ( !isset( self::$instances[$wiki] ) ) { 00067 self::$instances[$wiki] = new self( $wiki ); 00068 } 00069 return self::$instances[$wiki]; 00070 } 00071 00077 public static function destroySingletons() { 00078 self::$instances = array(); 00079 } 00080 00087 public function get( $type ) { 00088 global $wgJobTypeConf; 00089 00090 $conf = array( 'wiki' => $this->wiki, 'type' => $type ); 00091 if ( isset( $wgJobTypeConf[$type] ) ) { 00092 $conf = $conf + $wgJobTypeConf[$type]; 00093 } else { 00094 $conf = $conf + $wgJobTypeConf['default']; 00095 } 00096 00097 return JobQueue::factory( $conf ); 00098 } 00099 00110 public function push( $jobs ) { 00111 $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); 00112 00113 $jobsByType = array(); // (job type => list of jobs) 00114 foreach ( $jobs as $job ) { 00115 if ( $job instanceof Job ) { 00116 $jobsByType[$job->getType()][] = $job; 00117 } else { 00118 throw new MWException( "Attempted to push a non-Job object into a queue." ); 00119 } 00120 } 00121 00122 $ok = true; 00123 foreach ( $jobsByType as $type => $jobs ) { 00124 if ( $this->get( $type )->push( $jobs ) ) { 00125 JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type ); 00126 } else { 00127 $ok = false; 00128 } 00129 } 00130 00131 if ( $this->cache->has( 'queues-ready', 'list' ) ) { 00132 $list = $this->cache->get( 'queues-ready', 'list' ); 00133 if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) { 00134 $this->cache->clear( 'queues-ready' ); 00135 } 00136 } 00137 00138 return $ok; 00139 } 00140 00151 public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) { 00152 if ( is_string( $qtype ) ) { // specific job type 00153 if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $qtype ) ) { 00154 return false; // back off 00155 } 00156 $job = $this->get( $qtype )->pop(); 00157 if ( !$job ) { 00158 JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); 00159 } 00160 return $job; 00161 } else { // any job in the "default" jobs types 00162 if ( $flags & self::USE_CACHE ) { 00163 if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) { 00164 $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() ); 00165 } 00166 $types = $this->cache->get( 'queues-ready', 'list' ); 00167 } else { 00168 $types = $this->getQueuesWithJobs(); 00169 } 00170 00171 if ( $qtype == self::TYPE_DEFAULT ) { 00172 $types = array_intersect( $types, $this->getDefaultQueueTypes() ); 00173 } 00174 shuffle( $types ); // avoid starvation 00175 00176 foreach ( $types as $type ) { // for each queue... 00177 if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $type ) ) { 00178 continue; // back off 00179 } 00180 $job = $this->get( $type )->pop(); 00181 if ( $job ) { // found 00182 return $job; 00183 } else { // not found 00184 JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type ); 00185 $this->cache->clear( 'queues-ready' ); 00186 } 00187 } 00188 00189 return false; // no jobs found 00190 } 00191 } 00192 00199 public function ack( Job $job ) { 00200 return $this->get( $job->getType() )->ack( $job ); 00201 } 00202 00210 public function deduplicateRootJob( Job $job ) { 00211 return $this->get( $job->getType() )->deduplicateRootJob( $job ); 00212 } 00213 00222 public function waitForBackups() { 00223 global $wgJobTypeConf; 00224 00225 wfProfileIn( __METHOD__ ); 00226 // Try to avoid doing this more than once per queue storage medium 00227 foreach ( $wgJobTypeConf as $type => $conf ) { 00228 $this->get( $type )->waitForBackups(); 00229 } 00230 wfProfileOut( __METHOD__ ); 00231 } 00232 00238 public function getQueueTypes() { 00239 return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) ); 00240 } 00241 00247 public function getDefaultQueueTypes() { 00248 global $wgJobTypesExcludedFromDefaultQueue; 00249 00250 return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue ); 00251 } 00252 00258 public function getQueuesWithJobs() { 00259 $types = array(); 00260 foreach ( $this->getCoalescedQueues() as $info ) { 00261 $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() ); 00262 if ( is_array( $nonEmpty ) ) { // batching features supported 00263 $types = array_merge( $types, $nonEmpty ); 00264 } else { // we have to go through the queues in the bucket one-by-one 00265 foreach ( $info['types'] as $type ) { 00266 if ( !$this->get( $type )->isEmpty() ) { 00267 $types[] = $type; 00268 } 00269 } 00270 } 00271 } 00272 return $types; 00273 } 00274 00280 public function getQueueSizes() { 00281 $sizeMap = array(); 00282 foreach ( $this->getCoalescedQueues() as $info ) { 00283 $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() ); 00284 if ( is_array( $sizes ) ) { // batching features supported 00285 $sizeMap = $sizeMap + $sizes; 00286 } else { // we have to go through the queues in the bucket one-by-one 00287 foreach ( $info['types'] as $type ) { 00288 $sizeMap[$type] = $this->get( $type )->getSize(); 00289 } 00290 } 00291 } 00292 return $sizeMap; 00293 } 00294 00298 protected function getCoalescedQueues() { 00299 global $wgJobTypeConf; 00300 00301 if ( $this->coalescedQueues === null ) { 00302 $this->coalescedQueues = array(); 00303 foreach ( $wgJobTypeConf as $type => $conf ) { 00304 $queue = JobQueue::factory( 00305 array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf ); 00306 $loc = $queue->getCoalesceLocationInternal(); 00307 if ( !isset( $this->coalescedQueues[$loc] ) ) { 00308 $this->coalescedQueues[$loc]['queue'] = $queue; 00309 $this->coalescedQueues[$loc]['types'] = array(); 00310 } 00311 if ( $type === 'default' ) { 00312 $this->coalescedQueues[$loc]['types'] = array_merge( 00313 $this->coalescedQueues[$loc]['types'], 00314 array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) ) 00315 ); 00316 } else { 00317 $this->coalescedQueues[$loc]['types'][] = $type; 00318 } 00319 } 00320 } 00321 00322 return $this->coalescedQueues; 00323 } 00324 00333 public function isQueueDeprioritized( $type ) { 00334 if ( $this->cache->has( 'isDeprioritized', $type, 5 ) ) { 00335 return $this->cache->get( 'isDeprioritized', $type ); 00336 } 00337 if ( $type === 'refreshLinks2' ) { 00338 // Don't keep converting refreshLinks2 => refreshLinks jobs if the 00339 // later jobs have not been done yet. This helps throttle queue spam. 00340 $deprioritized = !$this->get( 'refreshLinks' )->isEmpty(); 00341 $this->cache->set( 'isDeprioritized', $type, $deprioritized ); 00342 return $deprioritized; 00343 } 00344 return false; 00345 } 00346 00356 public function executeReadyPeriodicTasks() { 00357 global $wgMemc; 00358 00359 list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); 00360 $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' ); 00361 $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp) 00362 00363 $count = 0; 00364 $tasksRun = array(); // (queue => task => UNIX timestamp) 00365 foreach ( $this->getQueueTypes() as $type ) { 00366 $queue = $this->get( $type ); 00367 foreach ( $queue->getPeriodicTasks() as $task => $definition ) { 00368 if ( $definition['period'] <= 0 ) { 00369 continue; // disabled 00370 } elseif ( !isset( $lastRuns[$type][$task] ) 00371 || $lastRuns[$type][$task] < ( time() - $definition['period'] ) ) 00372 { 00373 try { 00374 if ( call_user_func( $definition['callback'] ) !== null ) { 00375 $tasksRun[$type][$task] = time(); 00376 ++$count; 00377 } 00378 } catch ( JobQueueError $e ) { 00379 MWExceptionHandler::logException( $e ); 00380 } 00381 } 00382 } 00383 } 00384 00385 $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) { 00386 if ( is_array( $lastRuns ) ) { 00387 foreach ( $tasksRun as $type => $tasks ) { 00388 foreach ( $tasks as $task => $timestamp ) { 00389 if ( !isset( $lastRuns[$type][$task] ) 00390 || $timestamp > $lastRuns[$type][$task] ) 00391 { 00392 $lastRuns[$type][$task] = $timestamp; 00393 } 00394 } 00395 } 00396 } else { 00397 $lastRuns = $tasksRun; 00398 } 00399 return $lastRuns; 00400 } ); 00401 00402 return $count; 00403 } 00404 00409 private function getCachedConfigVar( $name ) { 00410 global $wgConf, $wgMemc; 00411 00412 if ( $this->wiki === wfWikiID() ) { 00413 return $GLOBALS[$name]; // common case 00414 } else { 00415 list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); 00416 $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name ); 00417 $value = $wgMemc->get( $key ); // ('v' => ...) or false 00418 if ( is_array( $value ) ) { 00419 return $value['v']; 00420 } else { 00421 $value = $wgConf->getConfig( $this->wiki, $name ); 00422 $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) ); 00423 return $value; 00424 } 00425 } 00426 } 00427 }