[ Index ] |
PHP Cross Reference of MediaWiki-1.24.0 |
[Summary view] [Print] [Text view]
<?php
/**
 * Redis-backed job queue code.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @author Aaron Schulz
 */

/**
 * Class to handle job queues stored in Redis
 *
 * This is a faster, less resource intensive, queue than JobQueueDB.
 * All data for a queue using this class is placed into one redis server.
 *
 * There are eight main redis keys used to track jobs:
 *   - l-unclaimed  : A list of job IDs used for ready unclaimed jobs
 *   - z-claimed    : A sorted set of (job ID, UNIX timestamp as score) used for job retries
 *   - z-abandoned  : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
 *   - z-delayed    : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs
 *   - h-idBySha1   : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
 *   - h-sha1ById   : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
 *   - h-attempts   : A hash of (job ID => attempt count) used for job claiming/retries
 *   - h-data       : A hash of (job ID => serialized blobs) for job storage
 * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned.
 * If an ID appears in any of those lists, it should have a h-data entry for its ID.
 * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then
 * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById
 * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
 * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
 *
 * Additionally, "rootjob:*" keys track "root jobs" used for additional de-duplication.
 * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
 * All the keys are prefixed with the relevant wiki ID information.
 *
 * This class requires Redis 2.6 as it makes use of Lua scripts for fast atomic operations.
 * Additionally, it should be noted that redis has different persistence modes, such
 * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be
 * made on the servers based on what queues are using it and what tolerance they have.
 *
 * @ingroup JobQueue
 * @ingroup Redis
 * @since 1.22
 */
class JobQueueRedis extends JobQueue {
	/** @var RedisConnectionPool */
	protected $redisPool;

	/** @var string Server address */
	protected $server;
	/** @var string Compression method to use */
	protected $compression;
	/** @var bool */
	protected $daemonized;

	const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)

	/** @var string Key to prefix the queue keys with (used for testing) */
	protected $key;

	/**
	 * @param array $params Possible keys:
	 *   - redisConfig : An array of parameters to RedisConnectionPool::__construct().
	 *                   Note that the serializer option is ignored as "none" is always used.
	 *   - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
	 *                   If a hostname is specified but no port, the standard port number
	 *                   6379 will be used. Required.
	 *   - compression : The type of compression to use; one of (none,gzip).
	 *   - daemonized  : Set to true if the redisJobRunnerService runs in the background.
	 *                   This will disable job recycling/undelaying from the MediaWiki side
	 *                   to avoid redundancy and out-of-sync configuration.
	 */
	public function __construct( array $params ) {
		parent::__construct( $params );
		$params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
		$this->server = $params['redisServer'];
		$this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
		$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
		$this->daemonized = !empty( $params['daemonized'] );
	}

	/**
	 * @return array List of queue order names this backend reports as supported
	 */
	protected function supportedOrders() {
		return array( 'timestamp', 'fifo' );
	}

	/**
	 * @return string The queue order name this backend reports as optimal
	 */
	protected function optimalOrder() {
		return 'fifo';
	}

	/**
	 * @return bool Always true; delayed jobs are kept in the z-delayed sorted set
	 */
	protected function supportsDelayedJobs() {
		return true;
	}

	/**
	 * @see JobQueue::doIsEmpty()
	 * @return bool
	 * @throws MWException
	 */
	protected function doIsEmpty() {
		return $this->doGetSize() == 0;
	}

	/**
	 * @see JobQueue::doGetSize()
	 * @return int
	 * @throws MWException
	 */
	protected function doGetSize() {
		$conn = $this->getConnection();
		try {
			return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetAcquiredCount()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}
		$conn = $this->getConnection();
		try {
			// Fetch both set sizes in one pipelined round trip
			$conn->multi( Redis::PIPELINE );
			$conn->zSize( $this->getQueueKey( 'z-claimed' ) );
			$conn->zSize( $this->getQueueKey( 'z-abandoned' ) );

			return array_sum( $conn->exec() );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetDelayedCount()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetDelayedCount() {
		if ( !$this->checkDelay ) {
			return 0; // no delayed jobs
		}
		$conn = $this->getConnection();
		try {
			return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doGetAbandonedCount()
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}
		$conn = $this->getConnection();
		try {
			return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doBatchPush()
	 * @param array $jobs
	 * @param int $flags
	 * @return void
	 * @throws JobQueueError
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// Convert the jobs into field maps (de-duplicated against each other)
		$items = array(); // (job ID => job fields map)
		foreach ( $jobs as $job ) {
			$item = $this->getNewJobFields( $job );
			if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
				$items[$item['sha1']] = $item;
			} else {
				$items[$item['uuid']] = $item;
			}
		}

		if ( !count( $items ) ) {
			return; // nothing to do
		}

		$conn = $this->getConnection();
		try {
			// Actually push the non-duplicate jobs into the queue...
			if ( $flags & self::QOS_ATOMIC ) {
				$batches = array( $items ); // all or nothing
			} else {
				$batches = array_chunk( $items, 500 ); // avoid tying up the server
			}
			$failed = 0;
			$pushed = 0;
			foreach ( $batches as $itemBatch ) {
				$added = $this->pushBlobs( $conn, $itemBatch );
				if ( is_int( $added ) ) {
					$pushed += $added;
				} else {
					$failed += count( $itemBatch );
				}
			}
			if ( $failed > 0 ) {
				wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );

				// Caught just below and converted into a JobQueueError
				throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." );
			}
			JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki );
			JobQueue::incrStats( 'job-insert-duplicate', $this->type,
				count( $items ) - $failed - $pushed, $this->wiki );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @param RedisConnRef $conn
	 * @param array $items List of results from JobQueueRedis::getNewJobFields()
	 * @return int Number of jobs inserted (duplicates are ignored)
	 * @throws RedisException
	 */
	protected function pushBlobs( RedisConnRef $conn, array $items ) {
		$args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
		foreach ( $items as $item ) {
			$args[] = (string)$item['uuid'];
			$args[] = (string)$item['sha1'];
			$args[] = (string)$item['rtimestamp'];
			$args[] = (string)$this->serialize( $item );
		}
		static $script =
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
		if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
		local pushed = 0
		for i = 1,#ARGV,4 do
			local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
			if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
				if 1*rtimestamp > 0 then
					-- Insert into delayed queue (release time as score)
					redis.call('zAdd',kDelayed,rtimestamp,id)
				else
					-- Insert into unclaimed queue
					redis.call('lPush',kUnclaimed,id)
				end
				if sha1 ~= '' then
					redis.call('hSet',kSha1ById,id,sha1)
					redis.call('hSet',kIdBySha1,sha1,id)
				end
				redis.call('hSet',kData,id,blob)
				pushed = pushed + 1
			end
		end
		return pushed
LUA;
		return $conn->luaEval( $script,
			array_merge(
				array(
					$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
					$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
					$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
					$this->getQueueKey( 'z-delayed' ), # KEYS[4]
					$this->getQueueKey( 'h-data' ), # KEYS[5]
				),
				$args
			),
			5 # number of first argument(s) that are keys
		);
	}

	/**
	 * @see JobQueue::doPop()
	 * @return Job|bool
	 * @throws JobQueueError
	 */
	protected function doPop() {
		$job = false;

		// Push ready delayed jobs into the queue every 10 jobs to spread the load.
		// This is also done as a periodic task, but we don't want too much done at once.
		// NOTE(review): the 'daemonized' option is documented as disabling recycling and
		// undelaying from the MediaWiki side, yet it is not checked here - confirm intent.
		if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
			$this->recyclePruneAndUndelayJobs();
		}

		$conn = $this->getConnection();
		try {
			do {
				if ( $this->claimTTL > 0 ) {
					// Keep the claimed job list down for high-traffic queues
					if ( mt_rand( 0, 99 ) == 0 ) {
						$this->recyclePruneAndUndelayJobs();
					}
					$blob = $this->popAndAcquireBlob( $conn );
				} else {
					$blob = $this->popAndDeleteBlob( $conn );
				}
				if ( !is_string( $blob ) ) {
					break; // no jobs; nothing to do
				}

				JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
				$item = $this->unserialize( $blob );
				if ( $item === false ) {
					wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
					continue;
				}

				// If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed
				$job = $this->getJobFromFields( $item ); // may be false
			} while ( !$job ); // job may be false if invalid
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $job;
	}

	/**
	 * Pop a job ID off the unclaimed list and delete all of its stored data
	 *
	 * @param RedisConnRef $conn
	 * @return string|bool Serialized job blob, or false if the queue is empty
	 * @throws RedisException
	 */
	protected function popAndDeleteBlob( RedisConnRef $conn ) {
		static $script =
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
		-- Pop an item off the queue
		local id = redis.call('rpop',kUnclaimed)
		if not id then return false end
		-- Get the job data and remove it
		local item = redis.call('hGet',kData,id)
		redis.call('hDel',kData,id)
		-- Allow new duplicates of this job
		local sha1 = redis.call('hGet',kSha1ById,id)
		if sha1 then redis.call('hDel',kIdBySha1,sha1) end
		redis.call('hDel',kSha1ById,id)
		-- Return the job data
		return item
LUA;
		return $conn->luaEval( $script,
			array(
				$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
				$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
				$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
				$this->getQueueKey( 'h-data' ), # KEYS[4]
			),
			4 # number of first argument(s) that are keys
		);
	}

	/**
	 * Pop a job ID off the unclaimed list and mark it as claimed, keeping its data
	 *
	 * @param RedisConnRef $conn
	 * @return string|bool Serialized job blob, or false if the queue is empty
	 * @throws RedisException
	 */
	protected function popAndAcquireBlob( RedisConnRef $conn ) {
		static $script =
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
		-- Pop an item off the queue
		local id = redis.call('rPop',kUnclaimed)
		if not id then return false end
		-- Allow new duplicates of this job
		local sha1 = redis.call('hGet',kSha1ById,id)
		if sha1 then redis.call('hDel',kIdBySha1,sha1) end
		redis.call('hDel',kSha1ById,id)
		-- Mark the jobs as claimed and return it
		redis.call('zAdd',kClaimed,ARGV[1],id)
		redis.call('hIncrBy',kAttempts,id,1)
		return redis.call('hGet',kData,id)
LUA;
		return $conn->luaEval( $script,
			array(
				$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
				$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
				$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
				$this->getQueueKey( 'z-claimed' ), # KEYS[4]
				$this->getQueueKey( 'h-attempts' ), # KEYS[5]
				$this->getQueueKey( 'h-data' ), # KEYS[6]
				time(), # ARGV[1] (injected to be replication-safe)
			),
			6 # number of first argument(s) that are keys
		);
	}

	/**
	 * @see JobQueue::doAck()
	 * @param Job $job
	 * @return bool False if the job data could not be deleted, true otherwise
	 * @throws MWException|JobQueueError
	 */
	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['uuid'] ) ) {
			throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
		}
		if ( $this->claimTTL > 0 ) {
			$conn = $this->getConnection();
			try {
				static $script =
<<<LUA
				local kClaimed, kAttempts, kData = unpack(KEYS)
				-- Unmark the job as claimed
				redis.call('zRem',kClaimed,ARGV[1])
				redis.call('hDel',kAttempts,ARGV[1])
				-- Delete the job data itself
				return redis.call('hDel',kData,ARGV[1])
LUA;
				$res = $conn->luaEval( $script,
					array(
						$this->getQueueKey( 'z-claimed' ), # KEYS[1]
						$this->getQueueKey( 'h-attempts' ), # KEYS[2]
						$this->getQueueKey( 'h-data' ), # KEYS[3]
						$job->metadata['uuid'] # ARGV[1]
					),
					3 # number of first argument(s) that are keys
				);

				if ( !$res ) {
					wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );

					return false;
				}
			} catch ( RedisException $e ) {
				$this->throwRedisException( $conn, $e );
			}
		}

		return true;
	}

	/**
	 * @see JobQueue::doDeduplicateRootJob()
	 * @param Job $job
	 * @return bool
	 * @throws MWException|JobQueueError
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new MWException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );

		$conn = $this->getConnection();
		try {
			$timestamp = $conn->get( $key ); // current last timestamp of this job
			if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::doIsRootJobOldDuplicate()
	 * @param Job $job
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$conn = $this->getConnection();
		try {
			// Get the last time this root job was enqueued
			$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * @see JobQueue::doDelete()
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doDelete() {
		static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
			'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' );

		$conn = $this->getConnection();
		try {
			$keys = array();
			foreach ( $props as $prop ) {
				$keys[] = $this->getQueueKey( $prop );
			}

			return ( $conn->delete( $keys ) !== false );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::getAllQueuedJobs()
	 * @return Iterator
	 */
	public function getAllQueuedJobs() {
		$conn = $this->getConnection();
		try {
			$that = $this;

			return new MappedIterator(
				$conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
				function ( $uid ) use ( $that, $conn ) {
					return $that->getJobFromUidInternal( $uid, $conn );
				},
				array( 'accept' => function ( $job ) {
					return is_object( $job );
				} )
			);
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @see JobQueue::getAllDelayedJobs()
	 * @return Iterator
	 */
	public function getAllDelayedJobs() {
		$conn = $this->getConnection();
		try {
			$that = $this;

			return new MappedIterator( // delayed jobs
				$conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
				function ( $uid ) use ( $that, $conn ) {
					return $that->getJobFromUidInternal( $uid, $conn );
				},
				array( 'accept' => function ( $job ) {
					return is_object( $job );
				} )
			);
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @return string Identifier built from the configured redis server address
	 */
	public function getCoalesceLocationInternal() {
		return "RedisServer:" . $this->server;
	}

	/**
	 * @param array $types List of queue types
	 * @return array List of the given types whose unclaimed list is non-empty
	 * @throws JobQueueError
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
	}

	/**
	 * @param array $types List of queue types
	 * @return array Map of (type => size of the unclaimed list)
	 * @throws JobQueueError
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$sizes = array(); // (type => size)
		$types = array_values( $types ); // reindex
		$conn = $this->getConnection();
		try {
			// Query every list length in one pipelined round trip
			$conn->multi( Redis::PIPELINE );
			foreach ( $types as $type ) {
				$conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
			}
			$res = $conn->exec();
			if ( is_array( $res ) ) {
				foreach ( $res as $i => $size ) {
					$sizes[$types[$i]] = $size;
				}
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $sizes;
	}

	/**
	 * This function should not be called outside JobQueueRedis
	 *
	 * @param string $uid
	 * @param RedisConnRef $conn
	 * @return Job|bool Returns false if the job does not exist
	 * @throws MWException|JobQueueError
	 */
	public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
		try {
			$data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
			if ( $data === false ) {
				return false; // not found
			}
			// Decode the blob that was just fetched rather than reading it again;
			// a second read costs a round trip and can miss if the job was removed.
			$item = $this->unserialize( $data );
			if ( !is_array( $item ) ) { // this shouldn't happen
				throw new MWException( "Could not find job with ID '$uid'." );
			}
			$title = Title::makeTitle( $item['namespace'], $item['title'] );
			$job = Job::factory( $item['type'], $title, $item['params'] );
			$job->metadata['uuid'] = $item['uuid'];

			return $job;
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * Recycle or destroy any jobs that have been claimed for too long
	 * and release any ready delayed jobs into the queue
	 *
	 * @return int Number of jobs recycled/deleted/undelayed
	 * @throws MWException|JobQueueError
	 */
	public function recyclePruneAndUndelayJobs() {
		$count = 0;
		// For each job item that can be retried, we need to add it back to the
		// main queue and remove it from the list of currently claimed job items.
		// For those that cannot, they are marked as dead and kept around for
		// investigation and manual job restoration but are eventually deleted.
		$conn = $this->getConnection();
		try {
			$now = time();
			static $script =
<<<LUA
			local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS)
			local released,abandoned,pruned,undelayed = 0,0,0,0
			-- ARGV entries and hGet results are strings; convert before comparing
			-- so that the retry limit is checked numerically, not lexicographically.
			local maxTries = tonumber(ARGV[3])
			-- Get all non-dead jobs that have an expired claim on them.
			-- The score for each item is the last claim timestamp (UNIX).
			local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
			for k,id in ipairs(staleClaims) do
				local timestamp = redis.call('zScore',kClaimed,id)
				-- A missing counter is treated as zero attempts
				local attempts = tonumber(redis.call('hGet',kAttempts,id)) or 0
				if attempts < maxTries then
					-- Claim expired and retries left: re-enqueue the job
					redis.call('lPush',kUnclaimed,id)
					redis.call('hIncrBy',kAttempts,id,1)
					released = released + 1
				else
					-- Claim expired and no retries left: mark the job as dead
					redis.call('zAdd',kAbandoned,timestamp,id)
					abandoned = abandoned + 1
				end
				redis.call('zRem',kClaimed,id)
			end
			-- Get all of the dead jobs that have been marked as dead for too long.
			-- The score for each item is the last claim timestamp (UNIX).
			local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2])
			for k,id in ipairs(deadClaims) do
				-- Stale and out of retries: remove any traces of the job
				redis.call('zRem',kAbandoned,id)
				redis.call('hDel',kAttempts,id)
				redis.call('hDel',kData,id)
				pruned = pruned + 1
			end
			-- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp)
			local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4])
			-- Migrate the jobs from the "delayed" set to the "unclaimed" list
			for k,id in ipairs(ids) do
				redis.call('lPush',kUnclaimed,id)
				redis.call('zRem',kDelayed,id)
			end
			undelayed = #ids
			return {released,abandoned,pruned,undelayed}
LUA;
			$res = $conn->luaEval( $script,
				array(
					$this->getQueueKey( 'z-claimed' ), # KEYS[1]
					$this->getQueueKey( 'h-attempts' ), # KEYS[2]
					$this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
					$this->getQueueKey( 'h-data' ), # KEYS[4]
					$this->getQueueKey( 'z-abandoned' ), # KEYS[5]
					$this->getQueueKey( 'z-delayed' ), # KEYS[6]
					$now - $this->claimTTL, # ARGV[1]
					$now - self::MAX_AGE_PRUNE, # ARGV[2]
					$this->maxTries, # ARGV[3]
					$now # ARGV[4]
				),
				6 # number of first argument(s) that are keys
			);
			if ( $res ) {
				list( $released, $abandoned, $pruned, $undelayed ) = $res;
				$count += $released + $pruned + $undelayed;
				JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki );
				JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki );
				JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki );
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $count;
	}

	/**
	 * @return array
	 */
	protected function doGetPeriodicTasks() {
		if ( $this->daemonized ) {
			return array(); // managed in the runner loop
		}
		$periods = array( 3600 ); // standard cleanup (useful on config change)
		if ( $this->claimTTL > 0 ) {
			$periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
		}
		if ( $this->checkDelay ) {
			$periods[] = 300; // 5 minutes
		}
		$period = min( $periods );
		$period = max( $period, 30 ); // sanity

		return array(
			'recyclePruneAndUndelayJobs' => array(
				'callback' => array( $this, 'recyclePruneAndUndelayJobs' ),
				'period' => $period,
			)
		);
	}

	/**
	 * @param IJobSpecification $job
	 * @return array
	 */
	protected function getNewJobFields( IJobSpecification $job ) {
		return array(
			// Fields that describe the nature of the job
			'type' => $job->getType(),
			'namespace' => $job->getTitle()->getNamespace(),
			'title' => $job->getTitle()->getDBkey(),
			'params' => $job->getParams(),
			// Some jobs cannot run until a "release timestamp"
			'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
			// Additional job metadata
			'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
			'sha1' => $job->ignoreDuplicates()
				? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
				: '',
			'timestamp' => time() // UNIX timestamp
		);
	}

	/**
	 * @param array $fields
	 * @return Job|bool
	 */
	protected function getJobFromFields( array $fields ) {
		$title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
		if ( $title ) {
			$job = Job::factory( $fields['type'], $title, $fields['params'] );
			$job->metadata['uuid'] = $fields['uuid'];

			return $job;
		}

		return false;
	}

	/**
	 * @param array $fields
	 * @return string Serialized and possibly compressed version of $fields
	 */
	protected function serialize( array $fields ) {
		$blob = serialize( $fields );
		if ( $this->compression === 'gzip'
			&& strlen( $blob ) >= 1024
			&& function_exists( 'gzdeflate' )
		) {
			$object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' );
			$blobz = serialize( $object );

			// Only keep the compressed form if it is actually smaller
			return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
		} else {
			return $blob;
		}
	}

	/**
	 * @param string $blob
	 * @return array|bool Unserialized version of $blob or false
	 */
	protected function unserialize( $blob ) {
		$fields = unserialize( $blob );
		if ( is_object( $fields ) ) {
			// An object is the compressed wrapper written by serialize();
			// anything lacking the expected properties is treated as invalid.
			if ( isset( $fields->enc, $fields->blob )
				&& $fields->enc === 'gzip'
				&& function_exists( 'gzinflate' )
			) {
				$fields = unserialize( gzinflate( $fields->blob ) );
			} else {
				$fields = false;
			}
		}

		return is_array( $fields ) ? $fields : false;
	}

	/**
	 * Get a connection to the server that handles all sub-queues for this queue
	 *
	 * @return RedisConnRef
	 * @throws JobQueueConnectionError
	 */
	protected function getConnection() {
		$conn = $this->redisPool->getConnection( $this->server );
		if ( !$conn ) {
			throw new JobQueueConnectionError( "Unable to connect to redis server." );
		}

		return $conn;
	}

	/**
	 * Report the error to the connection pool and rethrow it as a JobQueueError
	 *
	 * @param RedisConnRef $conn
	 * @param RedisException $e
	 * @throws JobQueueError
	 */
	protected function throwRedisException( RedisConnRef $conn, $e ) {
		$this->redisPool->handleError( $conn, $e );
		throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
	}

	/**
	 * @param string $prop
	 * @param string|null $type
	 * @return string
	 */
	private function getQueueKey( $prop, $type = null ) {
		$type = is_string( $type ) ? $type : $this->type;
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
		if ( strlen( $this->key ) ) { // namespaced queue (for testing)
			return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
		} else {
			return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
		}
	}

	/**
	 * @param string $key
	 * @return void
	 */
	public function setTestingPrefix( $key ) {
		$this->key = $key;
	}
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Fri Nov 28 14:03:12 2014 | Cross-referenced by PHPXref 0.7.1 |