[ Index ] |
PHP Cross Reference of MediaWiki-1.24.0 |
[Summary view] [Print] [Text view]
<?php
/**
 * Job queue base code.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @author Aaron Schulz
 */

/**
 * Class to handle enqueueing of background jobs
 *
 * One instance exists per wiki ID (see singleton()). Each instance routes
 * jobs to the JobQueue configured for their type in $wgJobTypeConf and keeps
 * a small in-process cache of which queues are known to be non-empty.
 *
 * @ingroup JobQueue
 * @since 1.21
 */
class JobQueueGroup {
	/** @var array Map of (wiki ID => JobQueueGroup) */
	protected static $instances = array();

	/** @var ProcessCacheLRU */
	protected $cache;

	/** @var string Wiki ID */
	protected $wiki;

	/** @var array Map of (bucket => (queue => JobQueue, types => list of types)) */
	protected $coalescedQueues;

	const TYPE_DEFAULT = 1; // integer; jobs popped by default
	const TYPE_ANY = 2; // integer; any job

	const USE_CACHE = 1; // integer; use process or persistent cache

	const PROC_CACHE_TTL = 15; // integer; seconds

	const CACHE_VERSION = 1; // integer; cache version

	/**
	 * @param string $wiki Wiki ID
	 */
	protected function __construct( $wiki ) {
		$this->wiki = $wiki;
		$this->cache = new ProcessCacheLRU( 10 );
	}

	/**
	 * Get the (lazily created) instance for a wiki
	 *
	 * @param bool|string $wiki Wiki ID; false means the current wiki (wfWikiID())
	 * @return JobQueueGroup
	 */
	public static function singleton( $wiki = false ) {
		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
		if ( !isset( self::$instances[$wiki] ) ) {
			self::$instances[$wiki] = new self( $wiki );
		}

		return self::$instances[$wiki];
	}

	/**
	 * Destroy the singleton instances
	 *
	 * @return void
	 */
	public static function destroySingletons() {
		self::$instances = array();
	}

	/**
	 * Get the job queue object for a given queue type
	 *
	 * The configuration is taken from $wgJobTypeConf[$type] when present and
	 * from $wgJobTypeConf['default'] otherwise; 'wiki' and 'type' are always
	 * set by this method and cannot be overridden by the configuration.
	 *
	 * @param string $type
	 * @return JobQueue
	 */
	public function get( $type ) {
		global $wgJobTypeConf;

		$conf = array( 'wiki' => $this->wiki, 'type' => $type );
		if ( isset( $wgJobTypeConf[$type] ) ) {
			$conf = $conf + $wgJobTypeConf[$type];
		} else {
			$conf = $conf + $wgJobTypeConf['default'];
		}

		return JobQueue::factory( $conf );
	}

	/**
	 * Insert jobs into the respective queues to which they belong.
	 *
	 * This inserts the jobs into the queue specified by $wgJobTypeConf
	 * and updates the aggregate job queue information cache as needed.
	 *
	 * @param Job|array $jobs A single Job or a list of Jobs
	 * @throws MWException If any element is not an IJobSpecification
	 * @return void
	 */
	public function push( $jobs ) {
		$jobs = is_array( $jobs ) ? $jobs : array( $jobs );
		if ( !count( $jobs ) ) {
			return;
		}

		$jobsByType = array(); // (job type => list of jobs)
		foreach ( $jobs as $job ) {
			if ( $job instanceof IJobSpecification ) {
				$jobsByType[$job->getType()][] = $job;
			} else {
				throw new MWException( "Attempted to push a non-Job object into a queue." );
			}
		}

		// Use a dedicated loop variable so the $jobs parameter is not clobbered
		foreach ( $jobsByType as $type => $typeJobs ) {
			$this->get( $type )->push( $typeJobs );
			JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
		}

		// Drop the process-cached "ready" list if it lacks any type just pushed to
		if ( $this->cache->has( 'queues-ready', 'list' ) ) {
			$list = $this->cache->get( 'queues-ready', 'list' );
			if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
				$this->cache->clear( 'queues-ready' );
			}
		}
	}

	/**
	 * Pop a job off one of the job queues
	 *
	 * This pops a job off a queue as specified by $wgJobTypeConf and
	 * updates the aggregate job queue information cache as needed.
	 *
	 * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string
	 * @param int $flags Bitfield of JobQueueGroup::USE_* constants
	 * @param array $blacklist List of job types to ignore
	 * @return Job|bool Returns false on failure
	 */
	public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = array() ) {
		$job = false;

		if ( is_string( $qtype ) ) { // specific job type
			if ( !in_array( $qtype, $blacklist ) ) {
				$job = $this->get( $qtype )->pop();
				if ( !$job ) {
					JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
				}
			}
		} else { // any job in the "default" jobs types
			if ( $flags & self::USE_CACHE ) {
				if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
					$this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
				}
				$types = $this->cache->get( 'queues-ready', 'list' );
			} else {
				$types = $this->getQueuesWithJobs();
			}

			if ( $qtype == self::TYPE_DEFAULT ) {
				$types = array_intersect( $types, $this->getDefaultQueueTypes() );
			}

			$types = array_diff( $types, $blacklist ); // avoid selected types
			shuffle( $types ); // avoid starvation

			foreach ( $types as $type ) { // for each queue...
				$job = $this->get( $type )->pop();
				if ( $job ) { // found
					break;
				} else { // not found
					JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
					$this->cache->clear( 'queues-ready' );
				}
			}
		}

		return $job;
	}

	/**
	 * Acknowledge that a job was completed
	 *
	 * @param Job $job
	 * @return bool
	 */
	public function ack( Job $job ) {
		return $this->get( $job->getType() )->ack( $job );
	}

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 * This should only be called right *after* all the new jobs have been inserted.
	 *
	 * @param Job $job
	 * @return bool
	 */
	public function deduplicateRootJob( Job $job ) {
		return $this->get( $job->getType() )->deduplicateRootJob( $job );
	}

	/**
	 * Wait for any slaves or backup queue servers to catch up.
	 *
	 * This does nothing for certain queue classes.
	 *
	 * @return void
	 * @throws MWException
	 */
	public function waitForBackups() {
		global $wgJobTypeConf;

		wfProfileIn( __METHOD__ );
		// One call per $wgJobTypeConf entry (including 'default') rather than per
		// job type, to limit repeated waits on the same queue storage medium
		foreach ( array_keys( $wgJobTypeConf ) as $type ) {
			$this->get( $type )->waitForBackups();
		}
		wfProfileOut( __METHOD__ );
	}

	/**
	 * Get the list of queue types
	 *
	 * @return array List of strings (the keys of $wgJobClasses for this wiki)
	 */
	public function getQueueTypes() {
		return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
	}

	/**
	 * Get the list of default queue types
	 *
	 * @return array List of strings; all queue types except those listed in
	 *   $wgJobTypesExcludedFromDefaultQueue
	 */
	public function getDefaultQueueTypes() {
		global $wgJobTypesExcludedFromDefaultQueue;

		return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
	}

	/**
	 * Check if there are any queues with jobs (this is cached)
	 *
	 * The answer is stored in $wgMemc under a key specific to this group's
	 * wiki and the requested type, so groups for different wikis never read
	 * each other's cached result.
	 *
	 * @param int $type JobQueueGroup::TYPE_* constant
	 * @return bool
	 * @since 1.23
	 */
	public function queuesHaveJobs( $type = self::TYPE_ANY ) {
		global $wgMemc;

		// Key on $this->wiki (not the current wiki), like the other shared-cache
		// keys built by this class
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
		$key = wfForeignMemcKey( $db, $prefix, 'jobqueue', 'queueshavejobs', $type );

		$value = $wgMemc->get( $key );
		if ( $value === false ) {
			$queues = $this->getQueuesWithJobs();
			if ( $type == self::TYPE_DEFAULT ) {
				$queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
			}
			// Stored as strings so that a cached negative result is
			// distinguishable from the boolean false of a cache miss
			$value = count( $queues ) ? 'true' : 'false';
			$wgMemc->add( $key, $value, 15 );
		}

		return ( $value === 'true' );
	}

	/**
	 * Get the list of job types that have non-empty queues
	 *
	 * @return array List of job types that have non-empty queues
	 */
	public function getQueuesWithJobs() {
		$types = array();
		foreach ( $this->getCoalescedQueues() as $info ) {
			$nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
			if ( is_array( $nonEmpty ) ) { // batching features supported
				$types = array_merge( $types, $nonEmpty );
			} else { // we have to go through the queues in the bucket one-by-one
				foreach ( $info['types'] as $type ) {
					if ( !$this->get( $type )->isEmpty() ) {
						$types[] = $type;
					}
				}
			}
		}

		return $types;
	}

	/**
	 * Get the size of the queues for a list of job types
	 *
	 * @return array Map of (job type => size)
	 */
	public function getQueueSizes() {
		$sizeMap = array();
		foreach ( $this->getCoalescedQueues() as $info ) {
			$sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
			if ( is_array( $sizes ) ) { // batching features supported
				$sizeMap = $sizeMap + $sizes;
			} else { // we have to go through the queues in the bucket one-by-one
				foreach ( $info['types'] as $type ) {
					$sizeMap[$type] = $this->get( $type )->getSize();
				}
			}
		}

		return $sizeMap;
	}

	/**
	 * Group the configured job types by the location their queue reports
	 * via getCoalesceLocationInternal(). The result is built once per
	 * instance and then reused.
	 *
	 * @return array Map of (location => (queue => JobQueue, types => list of types))
	 */
	protected function getCoalescedQueues() {
		global $wgJobTypeConf;

		if ( $this->coalescedQueues === null ) {
			$this->coalescedQueues = array();
			foreach ( $wgJobTypeConf as $type => $conf ) {
				$queue = JobQueue::factory(
					array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
				$loc = $queue->getCoalesceLocationInternal();
				if ( !isset( $this->coalescedQueues[$loc] ) ) {
					$this->coalescedQueues[$loc]['queue'] = $queue;
					$this->coalescedQueues[$loc]['types'] = array();
				}
				if ( $type === 'default' ) {
					// The 'default' entry covers every type lacking its own entry
					$this->coalescedQueues[$loc]['types'] = array_merge(
						$this->coalescedQueues[$loc]['types'],
						array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
					);
				} else {
					$this->coalescedQueues[$loc]['types'][] = $type;
				}
			}
		}

		return $this->coalescedQueues;
	}

	/**
	 * Execute any due periodic queue maintenance tasks for all queues.
	 *
	 * A task is "due" if the time elapsed since the last run is greater than
	 * the defined run period. Concurrent calls to this function will cause tasks
	 * to be attempted twice, so they may need their own methods of mutual exclusion.
	 *
	 * @return int Number of tasks run
	 */
	public function executeReadyPeriodicTasks() {
		global $wgMemc;

		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
		$key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' );
		$lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp)

		$count = 0;
		$tasksRun = array(); // (queue => task => UNIX timestamp)
		foreach ( $this->getQueueTypes() as $type ) {
			$queue = $this->get( $type );
			foreach ( $queue->getPeriodicTasks() as $task => $definition ) {
				if ( $definition['period'] <= 0 ) {
					continue; // disabled
				} elseif ( !isset( $lastRuns[$type][$task] )
					|| $lastRuns[$type][$task] < ( time() - $definition['period'] )
				) {
					try {
						// Only a non-null callback result counts as a run
						if ( call_user_func( $definition['callback'] ) !== null ) {
							$tasksRun[$type][$task] = time();
							++$count;
						}
					} catch ( JobQueueError $e ) {
						MWExceptionHandler::logException( $e );
					}
				}
			}
			// The tasks may have recycled jobs or release delayed jobs into the queue
			if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
				JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
			}
		}

		if ( $count === 0 ) {
			return $count; // nothing to update
		}

		// Fold this call's run times into the stored map, keeping the newest
		// timestamp for each (queue, task) pair
		$wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {
			if ( is_array( $lastRuns ) ) {
				foreach ( $tasksRun as $type => $tasks ) {
					foreach ( $tasks as $task => $timestamp ) {
						if ( !isset( $lastRuns[$type][$task] )
							|| $timestamp > $lastRuns[$type][$task]
						) {
							$lastRuns[$type][$task] = $timestamp;
						}
					}
				}
			} else {
				$lastRuns = $tasksRun;
			}

			return $lastRuns;
		} );

		return $count;
	}

	/**
	 * Get a configuration variable's value for this group's wiki
	 *
	 * For the current wiki the value is read straight from $GLOBALS. For any
	 * other wiki it is looked up via $wgConf and cached in $wgMemc.
	 *
	 * @param string $name Global variable name without the leading "$"
	 * @return mixed
	 */
	private function getCachedConfigVar( $name ) {
		global $wgConf, $wgMemc;

		if ( $this->wiki === wfWikiID() ) {
			return $GLOBALS[$name]; // common case
		} else {
			list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
			$key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name );
			$value = $wgMemc->get( $key ); // ('v' => ...) or false
			if ( is_array( $value ) ) {
				return $value['v'];
			} else {
				$value = $wgConf->getConfig( $this->wiki, $name );
				// Wrapped in an array so a legitimately false value is cacheable;
				// the expiry argument is randomized between 86400 and 172800
				$wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) );

				return $value;
			}
		}
	}
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Fri Nov 28 14:03:12 2014 | Cross-referenced by PHPXref 0.7.1 |