[ Index ] |
PHP Cross Reference of MediaWiki-1.24.0 |
[Summary view] [Print] [Text view]
<?php
/**
 * Job queue base code.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @defgroup JobQueue JobQueue
 * @author Aaron Schulz
 */

/**
 * Class to handle enqueueing and running of background jobs
 *
 * This base class validates arguments and wraps each operation in a profiling
 * section; the actual storage work is delegated to the protected do*() methods
 * that subclasses implement or override.
 *
 * @ingroup JobQueue
 * @since 1.21
 */
abstract class JobQueue {
	/** @var string Wiki ID */
	protected $wiki;

	/** @var string Job type */
	protected $type;

	/** @var string Job priority for pop() */
	protected $order;

	/** @var int Time to live in seconds */
	protected $claimTTL;

	/** @var int Maximum number of times to try a job */
	protected $maxTries;

	/** @var bool Allow delayed jobs */
	protected $checkDelay;

	/** @var BagOStuff */
	protected $dupCache;

	const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions

	const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)

	/**
	 * @param array $params Queue options; see JobQueue::factory() for the keys.
	 *   'wiki' and 'type' are read unconditionally; 'claimTTL' defaults to 0,
	 *   'maxTries' to 3 and 'order' to the result of optimalOrder().
	 * @throws MWException If the order or delayed-job option is not supported
	 */
	protected function __construct( array $params ) {
		$this->wiki = $params['wiki'];
		$this->type = $params['type'];
		$this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
		$this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
		if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
			$this->order = $params['order'];
		} else {
			$this->order = $this->optimalOrder();
		}
		// Strict comparison: with loose comparison a non-string value such as 0
		// would match any order name under PHP 5 comparison rules.
		if ( !in_array( $this->order, $this->supportedOrders(), true ) ) {
			// Report the concrete queue class; __CLASS__ would always say "JobQueue"
			throw new MWException( get_class( $this ) .
				" does not support '{$this->order}' order." );
		}
		$this->checkDelay = !empty( $params['checkDelay'] );
		if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
			throw new MWException( get_class( $this ) . " does not support delayed jobs." );
		}
		$this->dupCache = wfGetCache( CACHE_ANYTHING );
	}

	/**
	 * Get a job queue object of the specified type.
	 * $params includes:
	 *   - class      : What job class to use (determines job type)
	 *   - wiki       : wiki ID of the wiki the jobs are for (required by the constructor)
	 *   - type       : The name of the job types this queue handles
	 *   - order      : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
	 *                  If "fifo" is used, the queue will effectively be FIFO. Note that job
	 *                  completion will not appear to be exactly FIFO if there are multiple
	 *                  job runners since jobs can take different times to finish once popped.
	 *                  If "timestamp" is used, the queue will at least be loosely ordered
	 *                  by timestamp, allowing for some jobs to be popped off out of order.
	 *                  If "random" is used, pop() will pick jobs in random order.
	 *                  Note that it may only be weakly random (e.g. a lottery of the oldest X).
	 *                  If "any" is chosen, the queue will use whatever order is the fastest.
	 *                  This might be useful for improving concurrency for job acquisition.
	 *   - claimTTL   : If supported, the queue will recycle jobs that have been popped
	 *                  but not acknowledged as completed after this many seconds. Recycling
	 *                  of jobs simply means re-inserting them into the queue.
	 *   - maxTries   : Maximum number of times to try a job (defaults to 3).
	 *   - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions.
	 *                  This lets delayed jobs wait in a staging area until a given timestamp is
	 *                  reached, at which point they will enter the queue. If this is not enabled
	 *                  or not supported, an exception will be thrown on delayed job insertion.
	 *
	 * Queue classes should throw an exception if they do not support the options given.
	 *
	 * @param array $params
	 * @return JobQueue
	 * @throws MWException
	 */
	final public static function factory( array $params ) {
		$class = $params['class'];
		if ( !class_exists( $class ) ) {
			throw new MWException( "Invalid job queue class '$class'." );
		}
		// Verify the class hierarchy *before* instantiating, so that an unrelated
		// class never has its constructor run with the queue parameters.
		if ( !is_subclass_of( $class, __CLASS__ ) ) {
			throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
		}

		return new $class( $params );
	}

	/**
	 * @return string Wiki ID
	 */
	final public function getWiki() {
		return $this->wiki;
	}

	/**
	 * @return string Job type that this queue handles
	 */
	final public function getType() {
		return $this->type;
	}

	/**
	 * @return string One of (random, timestamp, fifo, undefined)
	 */
	final public function getOrder() {
		return $this->order;
	}

	/**
	 * @return bool Whether delayed jobs are enabled
	 * @since 1.22
	 */
	final public function delayedJobsEnabled() {
		return $this->checkDelay;
	}

	/**
	 * Get the allowed queue orders for configuration validation
	 *
	 * @return array Subset of (random, timestamp, fifo, undefined)
	 */
	abstract protected function supportedOrders();

	/**
	 * Get the default queue order to use if configuration does not specify one
	 *
	 * @return string One of (random, timestamp, fifo, undefined)
	 */
	abstract protected function optimalOrder();

	/**
	 * Find out if delayed jobs are supported for configuration validation
	 *
	 * @return bool Whether delayed jobs are supported
	 */
	protected function supportsDelayedJobs() {
		return false; // not implemented
	}

	/**
	 * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
	 * Queue classes should use caching if they are any slower without memcached.
	 *
	 * If caching is used, this might return false when there are actually no jobs.
	 * If pop() is called and returns false then it should correct the cache. Also,
	 * calling flushCaches() first prevents this. However, this effect is typically
	 * not distinguishable from the race condition between isEmpty() and pop().
	 *
	 * @return bool
	 * @throws JobQueueError
	 */
	final public function isEmpty() {
		// Scope-based profiling: closed on return *and* when doIsEmpty() throws
		$section = new ProfileSection( __METHOD__ );

		return $this->doIsEmpty();
	}

	/**
	 * @see JobQueue::isEmpty()
	 * @return bool
	 */
	abstract protected function doIsEmpty();

	/**
	 * Get the number of available (unacquired, non-delayed) jobs in the queue.
	 * Queue classes should use caching if they are any slower without memcached.
	 *
	 * If caching is used, this number might be out of date for a minute.
	 *
	 * @return int
	 * @throws JobQueueError
	 */
	final public function getSize() {
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetSize();
	}

	/**
	 * @see JobQueue::getSize()
	 * @return int
	 */
	abstract protected function doGetSize();

	/**
	 * Get the number of acquired jobs (these are temporarily out of the queue).
	 * Queue classes should use caching if they are any slower without memcached.
	 *
	 * If caching is used, this number might be out of date for a minute.
	 *
	 * @return int
	 * @throws JobQueueError
	 */
	final public function getAcquiredCount() {
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetAcquiredCount();
	}

	/**
	 * @see JobQueue::getAcquiredCount()
	 * @return int
	 */
	abstract protected function doGetAcquiredCount();

	/**
	 * Get the number of delayed jobs (these are temporarily out of the queue).
	 * Queue classes should use caching if they are any slower without memcached.
	 *
	 * If caching is used, this number might be out of date for a minute.
	 *
	 * @return int
	 * @throws JobQueueError
	 * @since 1.22
	 */
	final public function getDelayedCount() {
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetDelayedCount();
	}

	/**
	 * @see JobQueue::getDelayedCount()
	 * @return int
	 */
	protected function doGetDelayedCount() {
		return 0; // not implemented
	}

	/**
	 * Get the number of acquired jobs that can no longer be attempted.
	 * Queue classes should use caching if they are any slower without memcached.
	 *
	 * If caching is used, this number might be out of date for a minute.
	 *
	 * @return int
	 * @throws JobQueueError
	 */
	final public function getAbandonedCount() {
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetAbandonedCount();
	}

	/**
	 * @see JobQueue::getAbandonedCount()
	 * @return int
	 */
	protected function doGetAbandonedCount() {
		return 0; // not implemented
	}

	/**
	 * Push one or more jobs into the queue.
	 * This does not require $wgJobClasses to be set for the given job type.
	 * Outside callers should use JobQueueGroup::push() instead of this function.
	 *
	 * @param Job|array $jobs A single job or an array of Jobs
	 * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
	 * @return void
	 * @throws JobQueueError
	 */
	final public function push( $jobs, $flags = 0 ) {
		$this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
	}

	/**
	 * Push a batch of jobs into the queue.
	 * This does not require $wgJobClasses to be set for the given job type.
	 * Outside callers should use JobQueueGroup::push() instead of this function.
	 *
	 * Every job is validated before anything is handed to doBatchPush(), so a
	 * single job of the wrong type (or an unsupported delayed job) rejects the
	 * whole batch.
	 *
	 * @param array $jobs List of Jobs
	 * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
	 * @return void
	 * @throws MWException
	 */
	final public function batchPush( array $jobs, $flags = 0 ) {
		if ( !count( $jobs ) ) {
			return; // nothing to do
		}

		foreach ( $jobs as $job ) {
			if ( $job->getType() !== $this->type ) {
				throw new MWException(
					"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
			} elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
				throw new MWException(
					"Got delayed '{$job->getType()}' job; delays are not supported." );
			}
		}

		$section = new ProfileSection( __METHOD__ );
		$this->doBatchPush( $jobs, $flags );
	}

	/**
	 * @see JobQueue::batchPush()
	 * @param array $jobs
	 * @param int $flags
	 */
	abstract protected function doBatchPush( array $jobs, $flags );

	/**
	 * Pop a job off of the queue.
	 * This requires $wgJobClasses to be set for the given job type.
	 * Outside callers should use JobQueueGroup::pop() instead of this function.
	 *
	 * If the popped job belongs to a root job that has since been superseded,
	 * it is replaced by a no-op DuplicateJob.
	 *
	 * @throws MWException
	 * @return Job|bool Returns false if there are no jobs
	 */
	final public function pop() {
		global $wgJobClasses;

		if ( $this->wiki !== wfWikiID() ) {
			throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
		} elseif ( !isset( $wgJobClasses[$this->type] ) ) {
			// Do not pop jobs if there is no class for the queue type
			throw new MWException( "Unrecognized job type '{$this->type}'." );
		}

		$section = new ProfileSection( __METHOD__ );
		$job = $this->doPop();

		// Flag this job as an old duplicate based on its "root" job...
		try {
			if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
				self::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
				$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
			}
		} catch ( MWException $e ) {
			// Best effort only: don't lose jobs over this, but leave a trace
			wfDebugLog( 'JobQueue', get_class( $e ) . ': ' . $e->getMessage() );
		}

		return $job;
	}

	/**
	 * @see JobQueue::pop()
	 * @return Job|bool
	 */
	abstract protected function doPop();

	/**
	 * Acknowledge that a job was completed.
	 *
	 * This does nothing for certain queue classes or if "claimTTL" is not set.
	 * Outside callers should use JobQueueGroup::ack() instead of this function.
	 *
	 * @param Job $job
	 * @return void
	 * @throws MWException
	 */
	final public function ack( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		$section = new ProfileSection( __METHOD__ );
		$this->doAck( $job );
	}

	/**
	 * @see JobQueue::ack()
	 * @param Job $job
	 */
	abstract protected function doAck( Job $job );

	/**
	 * Register the "root job" of a given job into the queue for de-duplication.
	 * This should only be called right *after* all the new jobs have been inserted.
	 * This is used to turn older, duplicate, job entries into no-ops. The root job
	 * information will remain in the registry until it simply falls out of cache.
	 *
	 * This requires that $job has two special fields in the "params" array:
	 *   - rootJobSignature : hash (e.g. SHA1) that identifies the task
	 *   - rootJobTimestamp : TS_MW timestamp of this instance of the task
	 *
	 * A "root job" is a conceptual job that consists of potentially many smaller jobs
	 * that are actually inserted into the queue. For example, "refreshLinks" jobs are
	 * spawned when a template is edited. One can think of the task as "update links
	 * of pages that use template X" and an instance of that task as a "root job".
	 * However, what actually goes into the queue are range and leaf job subtypes.
	 * Since these jobs include things like page ID ranges and DB master positions,
	 * and can morph into smaller jobs recursively, simple duplicate detection
	 * for individual jobs being identical (like that of job_sha1) is not useful.
	 *
	 * In the case of "refreshLinks", if these jobs are still in the queue when the template
	 * is edited again, we want all of these old refreshLinks jobs for that template to become
	 * no-ops. This can greatly reduce server load, since refreshLinks jobs involve parsing.
	 * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a
	 * previous "root job" for the same task of "update links of pages that use template X".
	 *
	 * This does nothing for certain queue classes.
	 *
	 * @param Job $job
	 * @throws MWException
	 * @return bool
	 */
	final public function deduplicateRootJob( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		$section = new ProfileSection( __METHOD__ );

		return $this->doDeduplicateRootJob( $job );
	}

	/**
	 * @see JobQueue::deduplicateRootJob()
	 * @param Job $job
	 * @throws MWException
	 * @return bool
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new MWException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Callers should call batchPush() and then this function so that if the insert
		// fails, the de-duplication registration will be aborted. Having only the
		// de-duplication registration succeed would cause jobs to become no-ops
		// without any actual jobs that made them redundant.
		$timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
		if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
			return true; // a newer version of this root job was enqueued
		}

		// Update the timestamp of the last root job started at the location...
		// (self:: rather than a subclass name keeps the base class self-contained)
		return $this->dupCache->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
	}

	/**
	 * Check if the "root" job of a given job has been superseded by a newer one
	 *
	 * @param Job $job
	 * @throws MWException
	 * @return bool
	 */
	final protected function isRootJobOldDuplicate( Job $job ) {
		if ( $job->getType() !== $this->type ) {
			throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
		}
		$section = new ProfileSection( __METHOD__ );

		return $this->doIsRootJobOldDuplicate( $job );
	}

	/**
	 * @see JobQueue::isRootJobOldDuplicate()
	 * @param Job $job
	 * @return bool
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
		// Get the last time this root job was enqueued
		$timestamp = $this->dupCache->get( $key );

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * @param string $signature Hash identifier of the root job
	 * @return string
	 */
	protected function getRootJobCacheKey( $signature ) {
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );

		return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
	}

	/**
	 * Delete all unclaimed and delayed jobs from the queue
	 *
	 * @throws JobQueueError
	 * @since 1.22
	 * @return void
	 */
	final public function delete() {
		$section = new ProfileSection( __METHOD__ );
		$this->doDelete();
	}

	/**
	 * @see JobQueue::delete()
	 * @throws MWException
	 */
	protected function doDelete() {
		throw new MWException( "This method is not implemented." );
	}

	/**
	 * Wait for any slaves or backup servers to catch up.
	 *
	 * This does nothing for certain queue classes.
	 *
	 * @return void
	 * @throws JobQueueError
	 */
	final public function waitForBackups() {
		$section = new ProfileSection( __METHOD__ );
		$this->doWaitForBackups();
	}

	/**
	 * @see JobQueue::waitForBackups()
	 * @return void
	 */
	protected function doWaitForBackups() {
	}

	/**
	 * Return a map of task names to task definition maps.
	 * A "task" is a fast periodic queue maintenance action.
	 * Mutually exclusive tasks must implement their own locking in the callback.
	 *
	 * Each task value is an associative array with:
	 *   - name     : the name of the task
	 *   - callback : a PHP callable that performs the task
	 *   - period   : the period in seconds corresponding to the task frequency
	 *
	 * @return array
	 */
	final public function getPeriodicTasks() {
		$tasks = $this->doGetPeriodicTasks();
		foreach ( $tasks as $name => &$def ) {
			$def['name'] = $name;
		}
		// Break the reference so the last element of the returned array is
		// not left bound to $def.
		unset( $def );

		return $tasks;
	}

	/**
	 * @see JobQueue::getPeriodicTasks()
	 * @return array
	 */
	protected function doGetPeriodicTasks() {
		return array();
	}

	/**
	 * Clear any process and persistent caches
	 *
	 * @return void
	 */
	final public function flushCaches() {
		$section = new ProfileSection( __METHOD__ );
		$this->doFlushCaches();
	}

	/**
	 * @see JobQueue::flushCaches()
	 * @return void
	 */
	protected function doFlushCaches() {
	}

	/**
	 * Get an iterator to traverse over all available jobs in this queue.
	 * This does not include jobs that are currently acquired or delayed.
	 * Note: results may be stale if the queue is concurrently modified.
	 *
	 * @return Iterator
	 * @throws JobQueueError
	 */
	abstract public function getAllQueuedJobs();

	/**
	 * Get an iterator to traverse over all delayed jobs in this queue.
	 * Note: results may be stale if the queue is concurrently modified.
	 *
	 * @return Iterator
	 * @throws JobQueueError
	 * @since 1.22
	 */
	public function getAllDelayedJobs() {
		return new ArrayIterator( array() ); // not implemented
	}

	/**
	 * Do not use this function outside of JobQueue/JobQueueGroup
	 *
	 * @return string|null Null in this base implementation
	 * @since 1.22
	 */
	public function getCoalesceLocationInternal() {
		return null;
	}

	/**
	 * Check whether each of the given queues are empty.
	 * This is used for batching checks for queues stored at the same place.
	 *
	 * @param array $types List of queues types
	 * @return array|null (list of non-empty queue types) or null if unsupported
	 * @throws MWException
	 * @since 1.22
	 */
	final public function getSiblingQueuesWithJobs( array $types ) {
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetSiblingQueuesWithJobs( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueuesWithJobs()
	 * @param array $types List of queues types
	 * @return array|null (list of queue types) or null if unsupported
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return null; // not supported
	}

	/**
	 * Check the size of each of the given queues.
	 * For queues not served by the same store as this one, 0 is returned.
	 * This is used for batching checks for queues stored at the same place.
	 *
	 * @param array $types List of queues types
	 * @return array|null (job type => queue size) or null if unsupported
	 * @throws MWException
	 * @since 1.22
	 */
	final public function getSiblingQueueSizes( array $types ) {
		$section = new ProfileSection( __METHOD__ );

		return $this->doGetSiblingQueueSizes( $types );
	}

	/**
	 * @see JobQueue::getSiblingQueueSizes()
	 * @param array $types List of queues types
	 * @return array|null (job type => queue size) or null if unsupported
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		return null; // not supported
	}

	/**
	 * Call wfIncrStats() for the queue overall and for the queue type
	 *
	 * @param string $key Event type
	 * @param string $type Job type
	 * @param int $delta
	 * @param string $wiki Wiki ID (added in 1.23)
	 * @since 1.22
	 */
	public static function incrStats( $key, $type, $delta = 1, $wiki = null ) {
		wfIncrStats( $key, $delta );
		wfIncrStats( "{$key}-{$type}", $delta );
		if ( $wiki !== null ) {
			wfIncrStats( "{$key}-{$type}-{$wiki}", $delta );
		}
	}

	/**
	 * Namespace the queue with a key to isolate it for testing
	 *
	 * @param string $key
	 * @return void
	 * @throws MWException
	 */
	public function setTestingPrefix( $key ) {
		throw new MWException( "Queue namespacing not supported for this queue type." );
	}
}

/**
 * @ingroup JobQueue
 * @since 1.22
 */
class JobQueueError extends MWException {
}

/**
 * @ingroup JobQueue
 */
class JobQueueConnectionError extends JobQueueError {
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Fri Nov 28 14:03:12 2014 | Cross-referenced by PHPXref 0.7.1 |