MediaWiki
REL1_22
|
<?php
/**
 * Job queue implementation that stores jobs on a single Redis server.
 *
 * Each queue (one per wiki and job type) is spread over several Redis keys,
 * all built by getQueueKey():
 *   - l-unclaimed : list of job IDs that are ready to be popped
 *   - z-claimed   : sorted set of claimed job IDs, scored by claim UNIX timestamp
 *   - z-abandoned : sorted set of job IDs that ran out of retries, scored by
 *                   their last claim UNIX timestamp
 *   - z-delayed   : sorted set of delayed job IDs, scored by release UNIX timestamp
 *   - h-idBySha1  : hash of (de-duplication SHA1 => job ID)
 *   - h-sha1ById  : hash of (job ID => de-duplication SHA1)
 *   - h-attempts  : hash of (job ID => number of claim attempts)
 *   - h-data      : hash of (job ID => serialized job fields blob)
 *
 * Multi-key updates are done with server-side Lua scripts so that each
 * update is applied as a single unit.
 */
class JobQueueRedis extends JobQueue {
	/** @var RedisConnectionPool */
	protected $redisPool;

	protected $server; // string; server address
	protected $compression; // string; compression method to use

	const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)

	protected $key; // string; key to prefix the queue keys with (used for testing)

	/**
	 * @param array $params Queue parameters; besides those handled by the parent
	 *   constructor, the following are read here:
	 *   - redisConfig : configuration passed to RedisConnectionPool::singleton();
	 *                   its 'serializer' entry is always forced to 'none'
	 *   - redisServer : address of the Redis server to connect to
	 *   - compression : optional compression method; defaults to 'none'.
	 *                   Only 'gzip' is acted upon by serialize().
	 */
	public function __construct( array $params ) {
		parent::__construct( $params );
		$params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
		$this->server = $params['redisServer'];
		$this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
		$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
	}

	/**
	 * @return array List of supported queue order names
	 */
	protected function supportedOrders() {
		return array( 'timestamp', 'fifo' );
	}

	/**
	 * @return string The preferred queue order name
	 */
	protected function optimalOrder() {
		return 'fifo';
	}

	/**
	 * @return bool Always true; delayed jobs are kept in the "z-delayed" set
	 */
	protected function supportsDelayedJobs() {
		return true;
	}

	/**
	 * Check whether the unclaimed list is empty.
	 *
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doIsEmpty() {
		return $this->doGetSize() == 0;
	}

	/**
	 * Get the number of job IDs in the unclaimed list.
	 *
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetSize() {
		$conn = $this->getConnection();
		try {
			return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}
	}

	/**
	 * Get the number of acquired jobs: the combined size of the claimed and
	 * abandoned sets. Always 0 when acknowledgements are disabled.
	 *
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}
		$conn = $this->getConnection();
		try {
			// Pipeline both size queries into one round trip
			$conn->multi( Redis::PIPELINE );
			$conn->zSize( $this->getQueueKey( 'z-claimed' ) );
			$conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
			return array_sum( $conn->exec() );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}
	}

	/**
	 * Get the number of jobs in the delayed set.
	 * Always 0 when delay checking is disabled.
	 *
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetDelayedCount() {
		if ( !$this->checkDelay ) {
			return 0; // no delayed jobs
		}
		$conn = $this->getConnection();
		try {
			return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}
	}

	/**
	 * Get the number of jobs in the abandoned set.
	 * Always 0 when acknowledgements are disabled.
	 *
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}
		$conn = $this->getConnection();
		try {
			return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}
	}

	/**
	 * Push a batch of jobs into the queue.
	 *
	 * Jobs with a de-duplication hash are first de-duplicated against each other,
	 * then against the jobs already in the queue (inside pushBlobs()).
	 *
	 * @param array $jobs List of Job objects
	 * @param int $flags Bitfield; with self::QOS_ATOMIC all jobs are pushed by a
	 *   single script call, otherwise they are pushed in chunks of 500
	 * @return bool False if any chunk could not be inserted
	 * @throws JobQueueError
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// Convert the jobs into field maps (de-duplicated against each other)
		$items = array(); // (job ID => job fields map)
		foreach ( $jobs as $job ) {
			$item = $this->getNewJobFields( $job );
			if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
				$items[$item['sha1']] = $item;
			} else {
				$items[$item['uuid']] = $item;
			}
		}

		if ( !count( $items ) ) {
			return true; // nothing to do
		}

		$conn = $this->getConnection();
		try {
			// Actually push the non-duplicate jobs into the queue...
			if ( $flags & self::QOS_ATOMIC ) {
				$batches = array( $items ); // all or nothing
			} else {
				$batches = array_chunk( $items, 500 ); // avoid tying up the server
			}
			$failed = 0;
			$pushed = 0;
			foreach ( $batches as $itemBatch ) {
				$added = $this->pushBlobs( $conn, $itemBatch );
				if ( is_int( $added ) ) {
					$pushed += $added;
				} else {
					// A non-integer result means the script call for this chunk failed
					$failed += count( $itemBatch );
				}
			}
			if ( $failed > 0 ) {
				wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
				return false;
			}
			JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
			// Items that were neither pushed nor failed were skipped as duplicates
			JobQueue::incrStats( 'job-insert-duplicate', $this->type,
				count( $items ) - $failed - $pushed );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}

		return true;
	}

	/**
	 * Insert job field maps into the queue with a single Lua script call.
	 *
	 * An item is skipped when its SHA1 is non-empty and already registered in
	 * the "h-idBySha1" hash. Items with a positive release timestamp go to the
	 * delayed set; all others go to the unclaimed list.
	 *
	 * @param RedisConnRef $conn
	 * @param array $items List of job field maps as built by getNewJobFields()
	 * @return int|mixed Number of jobs inserted; doBatchPush() treats any
	 *   non-integer result as a failure
	 * @throws RedisException
	 */
	protected function pushBlobs( RedisConnRef $conn, array $items ) {
		$args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
		foreach ( $items as $item ) {
			$args[] = (string)$item['uuid'];
			$args[] = (string)$item['sha1'];
			$args[] = (string)$item['rtimestamp'];
			$args[] = (string)$this->serialize( $item );
		}
		static $script =
<<<LUA
		if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
		local pushed = 0
		for i = 1,#ARGV,4 do
			local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
			if sha1 == '' or redis.call('hExists',KEYS[3],sha1) == 0 then
				if 1*rtimestamp > 0 then
					-- Insert into delayed queue (release time as score)
					redis.call('zAdd',KEYS[4],rtimestamp,id)
				else
					-- Insert into unclaimed queue
					redis.call('lPush',KEYS[1],id)
				end
				if sha1 ~= '' then
					redis.call('hSet',KEYS[2],id,sha1)
					redis.call('hSet',KEYS[3],sha1,id)
				end
				redis.call('hSet',KEYS[5],id,blob)
				pushed = pushed + 1
			end
		end
		return pushed
LUA;
		return $conn->luaEval( $script,
			array_merge(
				array(
					$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
					$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
					$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
					$this->getQueueKey( 'z-delayed' ), # KEYS[4]
					$this->getQueueKey( 'h-data' ), # KEYS[5]
				),
				$args
			),
			5 # number of first argument(s) that are keys
		);
	}

	/**
	 * Pop the next valid job off the queue.
	 *
	 * Blobs that cannot be unserialized, and items that do not yield a valid
	 * job, are skipped and the next one is tried.
	 *
	 * @return Job|bool The job, or false if the queue has no more jobs
	 * @throws JobQueueError
	 */
	protected function doPop() {
		$job = false;

		// Push ready delayed jobs into the queue every 10 jobs to spread the load.
		// This is also done as a periodic task, but we don't want too much done at once.
		if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
			$this->releaseReadyDelayedJobs();
		}

		$conn = $this->getConnection();
		try {
			do {
				if ( $this->claimTTL > 0 ) {
					// Keep the claimed job list down for high-traffic queues
					if ( mt_rand( 0, 99 ) == 0 ) {
						$this->recycleAndDeleteStaleJobs();
					}
					$blob = $this->popAndAcquireBlob( $conn );
				} else {
					$blob = $this->popAndDeleteBlob( $conn );
				}
				if ( $blob === false ) {
					break; // no jobs; nothing to do
				}

				JobQueue::incrStats( 'job-pop', $this->type );
				$item = $this->unserialize( $blob );
				if ( $item === false ) {
					wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
					continue; // $job is still false here, so the loop tries the next blob
				}

				// If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed
				$job = $this->getJobFromFields( $item ); // may be false
			} while ( !$job ); // job may be false if invalid
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}

		return $job;
	}

	/**
	 * Pop a job ID off the unclaimed list, delete all of its stored data and
	 * return its blob. Used when acknowledgements are disabled.
	 *
	 * @param RedisConnRef $conn
	 * @return string|bool Serialized job blob; doPop() treats false as "no jobs"
	 * @throws RedisException
	 */
	protected function popAndDeleteBlob( RedisConnRef $conn ) {
		static $script =
<<<LUA
		-- Pop an item off the queue
		local id = redis.call('rpop',KEYS[1])
		if not id then return false end
		-- Get the job data and remove it
		local item = redis.call('hGet',KEYS[4],id)
		redis.call('hDel',KEYS[4],id)
		-- Allow new duplicates of this job
		local sha1 = redis.call('hGet',KEYS[2],id)
		if sha1 then redis.call('hDel',KEYS[3],sha1) end
		redis.call('hDel',KEYS[2],id)
		-- Return the job data
		return item
LUA;
		return $conn->luaEval( $script,
			array(
				$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
				$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
				$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
				$this->getQueueKey( 'h-data' ), # KEYS[4]
			),
			4 # number of first argument(s) that are keys
		);
	}

	/**
	 * Pop a job ID off the unclaimed list, mark it as claimed (scored by the
	 * current time), bump its attempt count and return its blob. The job data
	 * stays in "h-data" until the job is acknowledged or pruned.
	 *
	 * @param RedisConnRef $conn
	 * @return string|bool Serialized job blob; doPop() treats false as "no jobs"
	 * @throws RedisException
	 */
	protected function popAndAcquireBlob( RedisConnRef $conn ) {
		static $script =
<<<LUA
		-- Pop an item off the queue
		local id = redis.call('rPop',KEYS[1])
		if not id then return false end
		-- Allow new duplicates of this job
		local sha1 = redis.call('hGet',KEYS[2],id)
		if sha1 then redis.call('hDel',KEYS[3],sha1) end
		redis.call('hDel',KEYS[2],id)
		-- Mark the jobs as claimed and return it
		redis.call('zAdd',KEYS[4],ARGV[1],id)
		redis.call('hIncrBy',KEYS[5],id,1)
		return redis.call('hGet',KEYS[6],id)
LUA;
		return $conn->luaEval( $script,
			array(
				$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
				$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
				$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
				$this->getQueueKey( 'z-claimed' ), # KEYS[4]
				$this->getQueueKey( 'h-attempts' ), # KEYS[5]
				$this->getQueueKey( 'h-data' ), # KEYS[6]
				time(), # ARGV[1] (injected to be replication-safe)
			),
			6 # number of first argument(s) that are keys
		);
	}

	/**
	 * Acknowledge a job: remove its claim, its attempt count and its data.
	 * Does nothing (and returns true) when acknowledgements are disabled.
	 *
	 * @param Job $job Job carrying a 'uuid' metadata entry
	 * @return bool False if no job data was deleted for that UUID
	 * @throws MWException If the job has no UUID
	 * @throws JobQueueError
	 */
	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['uuid'] ) ) {
			throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
		}
		if ( $this->claimTTL > 0 ) {
			$conn = $this->getConnection();
			try {
				static $script =
<<<LUA
				-- Unmark the job as claimed
				redis.call('zRem',KEYS[1],ARGV[1])
				redis.call('hDel',KEYS[2],ARGV[1])
				-- Delete the job data itself
				return redis.call('hDel',KEYS[3],ARGV[1])
LUA;
				$res = $conn->luaEval( $script,
					array(
						$this->getQueueKey( 'z-claimed' ), # KEYS[1]
						$this->getQueueKey( 'h-attempts' ), # KEYS[2]
						$this->getQueueKey( 'h-data' ), # KEYS[3]
						$job->metadata['uuid'] # ARGV[1]
					),
					3 # number of first argument(s) that are keys
				);

				if ( !$res ) {
					wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
					return false;
				}
			} catch ( RedisException $e ) {
				$this->throwRedisException( $this->server, $conn, $e );
			}
		}
		return true;
	}

	/**
	 * Record the timestamp of a root job, unless an equal or newer timestamp
	 * is already recorded for the same root job signature.
	 *
	 * @param Job $job Job with root job parameters
	 * @return bool
	 * @throws MWException If the job has no root job parameters
	 * @throws JobQueueError
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new MWException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );

		$conn = $this->getConnection();
		try {
			$timestamp = $conn->get( $key ); // current last timestamp of this job
			if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
				return true; // a newer version of this root job was enqueued
			}
			// Update the timestamp of the last root job started at the location...
			return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}
	}

	/**
	 * Check whether a newer root job with the same signature was registered
	 * after this job's root job.
	 *
	 * @param Job $job
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$conn = $this->getConnection();
		try {
			// Get the last time this root job was enqueued
			$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Delete every Redis key that belongs to this queue.
	 *
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doDelete() {
		static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
			'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' );

		$conn = $this->getConnection();
		try {
			$keys = array();
			foreach ( $props as $prop ) {
				$keys[] = $this->getQueueKey( $prop );
			}
			return ( $conn->delete( $keys ) !== false );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}
	}

	/**
	 * Get an iterator over the jobs in the unclaimed list. IDs that no longer
	 * resolve to a job object are filtered out.
	 *
	 * @return MappedIterator
	 * @throws JobQueueError
	 */
	public function getAllQueuedJobs() {
		$conn = $this->getConnection();
		try {
			$that = $this;
			return new MappedIterator(
				$conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
				function( $uid ) use ( $that, $conn ) {
					return $that->getJobFromUidInternal( $uid, $conn );
				},
				array( 'accept' => function ( $job ) { return is_object( $job ); } )
			);
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}
	}

	/**
	 * Get an iterator over the jobs in the delayed set. IDs that no longer
	 * resolve to a job object are filtered out.
	 *
	 * @return MappedIterator
	 * @throws JobQueueError
	 */
	public function getAllDelayedJobs() {
		$conn = $this->getConnection();
		try {
			$that = $this;
			return new MappedIterator( // delayed jobs
				$conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
				function( $uid ) use ( $that, $conn ) {
					return $that->getJobFromUidInternal( $uid, $conn );
				},
				array( 'accept' => function ( $job ) { return is_object( $job ); } )
			);
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}
	}

	/**
	 * @return string Identifier built from the Redis server address
	 */
	public function getCoalesceLocationInternal() {
		return "RedisServer:" . $this->server;
	}

	/**
	 * Get the job types, among those given, whose unclaimed list is non-empty.
	 *
	 * @param array $types List of job type names
	 * @return array List of job type names
	 * @throws JobQueueError
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
	}

	/**
	 * Get the unclaimed list size for each of the given job types, using one
	 * pipelined round trip.
	 *
	 * @param array $types List of job type names
	 * @return array Map of (job type => size)
	 * @throws JobQueueError
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$sizes = array(); // (type => size)
		$types = array_values( $types ); // reindex
		// Acquire the connection before the try block so that the catch block
		// below never references an undefined $conn
		$conn = $this->getConnection();
		try {
			$conn->multi( Redis::PIPELINE );
			foreach ( $types as $type ) {
				$conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
			}
			$res = $conn->exec();
			if ( is_array( $res ) ) {
				// Replies come back in the order the commands were queued
				foreach ( $res as $i => $size ) {
					$sizes[$types[$i]] = $size;
				}
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}
		return $sizes;
	}

	/**
	 * Build a Job object from the data stored for a job ID.
	 * This function should not be called outside JobQueueRedis.
	 *
	 * @param string $uid Job ID
	 * @param RedisConnRef $conn
	 * @return Job|bool The job, or false if no data is stored for that ID
	 * @throws MWException If the stored data cannot be unserialized
	 * @throws JobQueueError
	 */
	public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
		try {
			$data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
			if ( $data === false ) {
				return false; // not found
			}
			// Reuse the blob fetched above; fetching it a second time would cost
			// another round trip and could race with the job being deleted
			$item = $this->unserialize( $data );
			if ( !is_array( $item ) ) { // this shouldn't happen
				throw new MWException( "Could not find job with ID '$uid'." );
			}
			$title = Title::makeTitle( $item['namespace'], $item['title'] );
			$job = Job::factory( $item['type'], $title, $item['params'] );
			$job->metadata['uuid'] = $item['uuid'];
			return $job;
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}
	}

	/**
	 * Move every delayed job whose release timestamp has passed from the
	 * delayed set to the unclaimed list.
	 *
	 * @return int Number of jobs released
	 * @throws JobQueueError
	 */
	public function releaseReadyDelayedJobs() {
		$count = 0;

		$conn = $this->getConnection();
		try {
			static $script =
<<<LUA
			-- Get the list of ready delayed jobs, sorted by readiness
			local ids = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
			-- Migrate the jobs from the "delayed" set to the "unclaimed" list
			for k,id in ipairs(ids) do
				redis.call('lPush',KEYS[2],id)
				redis.call('zRem',KEYS[1],id)
			end
			return #ids
LUA;
			$count += (int)$conn->luaEval( $script,
				array(
					$this->getQueueKey( 'z-delayed' ), // KEYS[1]
					$this->getQueueKey( 'l-unclaimed' ), // KEYS[2]
					time() // ARGV[1]; max "delay until" UNIX timestamp
				),
				2 # first two arguments are keys
			);
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}

		return $count;
	}

	/**
	 * Recycle or abandon jobs whose claim has expired, and prune jobs that
	 * have been abandoned for longer than MAX_AGE_PRUNE seconds.
	 *
	 * NOTE(review): a recycled job has its attempt count incremented here and
	 * again by popAndAcquireBlob() when it is next claimed, so each retry
	 * counts twice towards maxTries. Confirm whether that is intended.
	 *
	 * @return int Number of jobs recycled plus number of jobs pruned
	 * @throws MWException If acknowledgements are disabled
	 * @throws JobQueueError
	 */
	public function recycleAndDeleteStaleJobs() {
		if ( $this->claimTTL <= 0 ) { // sanity
			throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." );
		}
		$count = 0;
		// For each job item that can be retried, we need to add it back to the
		// main queue and remove it from the list of currently claimed job items.
		// For those that cannot, they are marked as dead and kept around for
		// investigation and manual job restoration but are eventually deleted.
		$conn = $this->getConnection();
		try {
			$now = time();
			static $script =
<<<LUA
			local released,abandoned,pruned = 0,0,0
			-- Hash values and ARGV entries are strings in Lua; convert them to
			-- numbers so that they are not compared lexicographically ("10" < "3")
			local maxTries = tonumber(ARGV[3])
			-- Get all non-dead jobs that have an expired claim on them.
			-- The score for each item is the last claim timestamp (UNIX).
			local staleClaims = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
			for k,id in ipairs(staleClaims) do
				local timestamp = redis.call('zScore',KEYS[1],id)
				local attempts = tonumber(redis.call('hGet',KEYS[2],id)) or 0
				if attempts < maxTries then
					-- Claim expired and retries left: re-enqueue the job
					redis.call('lPush',KEYS[3],id)
					redis.call('hIncrBy',KEYS[2],id,1)
					released = released + 1
				else
					-- Claim expired and no retries left: mark the job as dead
					redis.call('zAdd',KEYS[5],timestamp,id)
					abandoned = abandoned + 1
				end
				redis.call('zRem',KEYS[1],id)
			end
			-- Get all of the dead jobs that have been marked as dead for too long.
			-- The score for each item is the last claim timestamp (UNIX).
			local deadClaims = redis.call('zRangeByScore',KEYS[5],0,ARGV[2])
			for k,id in ipairs(deadClaims) do
				-- Stale and out of retries: remove any traces of the job
				redis.call('zRem',KEYS[5],id)
				redis.call('hDel',KEYS[2],id)
				redis.call('hDel',KEYS[4],id)
				pruned = pruned + 1
			end
			return {released,abandoned,pruned}
LUA;
			$res = $conn->luaEval( $script,
				array(
					$this->getQueueKey( 'z-claimed' ), # KEYS[1]
					$this->getQueueKey( 'h-attempts' ), # KEYS[2]
					$this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
					$this->getQueueKey( 'h-data' ), # KEYS[4]
					$this->getQueueKey( 'z-abandoned' ), # KEYS[5]
					$now - $this->claimTTL, # ARGV[1]
					$now - self::MAX_AGE_PRUNE, # ARGV[2]
					$this->maxTries # ARGV[3]
				),
				5 # number of first argument(s) that are keys
			);
			if ( $res ) {
				list( $released, $abandoned, $pruned ) = $res;
				$count += $released + $pruned;
				JobQueue::incrStats( 'job-recycle', $this->type, $released );
				JobQueue::incrStats( 'job-abandon', $this->type, $abandoned );
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $this->server, $conn, $e );
		}

		return $count;
	}

	/**
	 * Get the maintenance tasks of this queue together with their period.
	 *
	 * @return array Map of (task name => array with 'callback' and 'period' in seconds)
	 */
	protected function doGetPeriodicTasks() {
		$tasks = array();
		if ( $this->claimTTL > 0 ) {
			$tasks['recycleAndDeleteStaleJobs'] = array(
				'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
				'period' => ceil( $this->claimTTL / 2 )
			);
		}
		if ( $this->checkDelay ) {
			$tasks['releaseReadyDelayedJobs'] = array(
				'callback' => array( $this, 'releaseReadyDelayedJobs' ),
				'period' => 300 // 5 minutes
			);
		}
		return $tasks;
	}

	/**
	 * Build the field map that is stored in Redis for a new job.
	 *
	 * @param Job $job
	 * @return array Map with the keys type, namespace, title, params,
	 *   rtimestamp, uuid, sha1 and timestamp
	 */
	protected function getNewJobFields( Job $job ) {
		return array(
			// Fields that describe the nature of the job
			'type' => $job->getType(),
			'namespace' => $job->getTitle()->getNamespace(),
			'title' => $job->getTitle()->getDBkey(),
			'params' => $job->getParams(),
			// Some jobs cannot run until a "release timestamp"
			'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
			// Additional job metadata
			'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
			// Empty string means "do not de-duplicate this job"
			'sha1' => $job->ignoreDuplicates()
				? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
				: '',
			'timestamp' => time() // UNIX timestamp
		);
	}

	/**
	 * Build a Job object from a field map as built by getNewJobFields().
	 *
	 * @param array $fields
	 * @return Job|bool The job, or false if the title fields do not form a valid title
	 */
	protected function getJobFromFields( array $fields ) {
		$title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
		if ( $title ) {
			$job = Job::factory( $fields['type'], $title, $fields['params'] );
			$job->metadata['uuid'] = $fields['uuid'];
			return $job;
		}
		return false;
	}

	/**
	 * Serialize a job field map into a blob.
	 *
	 * With 'gzip' compression, blobs of at least 1024 bytes are deflated and
	 * wrapped in a serialized object with 'blob' and 'enc' properties, but only
	 * when that wrapped form is actually smaller than the plain blob.
	 *
	 * @param array $fields
	 * @return string Serialized and possibly compressed version of $fields
	 */
	protected function serialize( array $fields ) {
		$blob = serialize( $fields );
		if ( $this->compression === 'gzip'
			&& strlen( $blob ) >= 1024 && function_exists( 'gzdeflate' ) )
		{
			$object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' );
			$blobz = serialize( $object );
			return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
		} else {
			return $blob;
		}
	}

	/**
	 * Unserialize a blob produced by serialize(), inflating it if needed.
	 *
	 * @param string $blob
	 * @return array|bool The job field map, or false if the blob is malformed
	 *   or uses an encoding that cannot be decoded
	 */
	protected function unserialize( $blob ) {
		$fields = unserialize( $blob );
		if ( is_object( $fields ) ) {
			// An object is the wrapper written by serialize() for compressed blobs.
			// Check that both properties exist before reading them.
			if ( isset( $fields->enc, $fields->blob )
				&& $fields->enc === 'gzip' && function_exists( 'gzinflate' )
			) {
				$inflated = gzinflate( $fields->blob );
				// gzinflate() returns false on corrupt data
				$fields = ( $inflated === false ) ? false : unserialize( $inflated );
			} else {
				$fields = false;
			}
		}
		return is_array( $fields ) ? $fields : false;
	}

	/**
	 * Get a connection to the Redis server of this queue.
	 *
	 * @return RedisConnRef
	 * @throws JobQueueConnectionError If no connection could be obtained
	 */
	protected function getConnection() {
		$conn = $this->redisPool->getConnection( $this->server );
		if ( !$conn ) {
			throw new JobQueueConnectionError( "Unable to connect to redis server." );
		}
		return $conn;
	}

	/**
	 * Hand a Redis exception to the connection pool, then rethrow it as a
	 * JobQueueError. This method never returns normally.
	 *
	 * @param string $server
	 * @param RedisConnRef $conn
	 * @param RedisException $e
	 * @throws JobQueueError
	 */
	protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
		$this->redisPool->handleException( $server, $conn, $e );
		throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
	}

	/**
	 * Build the Redis key for one property of a queue on this wiki.
	 *
	 * @param string $prop Property name, such as 'l-unclaimed' or 'h-data'
	 * @param string|null $type Job type; defaults to the type of this queue
	 * @return string
	 */
	private function getQueueKey( $prop, $type = null ) {
		$type = is_string( $type ) ? $type : $this->type;
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
		if ( strlen( $this->key ) ) { // namespaced queue (for testing)
			return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
		} else {
			return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
		}
	}

	/**
	 * Set an extra component that is inserted into every queue key, so that
	 * tests do not touch the real queue keys.
	 *
	 * @param string $key
	 * @return void
	 */
	public function setTestingPrefix( $key ) {
		$this->key = $key;
	}
}