MediaWiki
REL1_23
|
00001 <?php 00059 class JobQueueRedis extends JobQueue { 00061 protected $redisPool; 00062 00064 protected $server; 00065 00067 protected $compression; 00068 00069 const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) 00070 00072 protected $key; 00073 00078 protected $maximumPeriodicTaskSeconds; 00079 00094 public function __construct( array $params ) { 00095 parent::__construct( $params ); 00096 $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua 00097 $this->server = $params['redisServer']; 00098 $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; 00099 $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); 00100 $this->maximumPeriodicTaskSeconds = isset( $params['maximumPeriodicTaskSeconds'] ) ? 00101 $params['maximumPeriodicTaskSeconds'] : null; 00102 } 00103 00104 protected function supportedOrders() { 00105 return array( 'timestamp', 'fifo' ); 00106 } 00107 00108 protected function optimalOrder() { 00109 return 'fifo'; 00110 } 00111 00112 protected function supportsDelayedJobs() { 00113 return true; 00114 } 00115 00121 protected function doIsEmpty() { 00122 return $this->doGetSize() == 0; 00123 } 00124 00130 protected function doGetSize() { 00131 $conn = $this->getConnection(); 00132 try { 00133 return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ); 00134 } catch ( RedisException $e ) { 00135 $this->throwRedisException( $conn, $e ); 00136 } 00137 } 00138 00144 protected function doGetAcquiredCount() { 00145 if ( $this->claimTTL <= 0 ) { 00146 return 0; // no acknowledgements 00147 } 00148 $conn = $this->getConnection(); 00149 try { 00150 $conn->multi( Redis::PIPELINE ); 00151 $conn->zSize( $this->getQueueKey( 'z-claimed' ) ); 00152 $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); 00153 00154 return array_sum( $conn->exec() ); 00155 } catch ( RedisException $e ) { 00156 $this->throwRedisException( $conn, $e ); 00157 } 00158 } 00159 00165 protected function doGetDelayedCount() { 00166 if ( !$this->checkDelay ) { 00167 return 0; // no delayed jobs 00168 } 00169 $conn = $this->getConnection(); 00170 try { 00171 return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); 00172 } catch ( RedisException $e ) { 00173 $this->throwRedisException( $conn, $e ); 00174 } 00175 } 00176 00182 protected function doGetAbandonedCount() { 00183 if ( $this->claimTTL <= 0 ) { 00184 return 0; // no acknowledgements 00185 } 00186 $conn = $this->getConnection(); 00187 try { 00188 return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); 00189 } catch ( RedisException $e ) { 00190 $this->throwRedisException( $conn, $e ); 00191 } 00192 } 00193 00201 protected function doBatchPush( array $jobs, $flags ) { 00202 // Convert the jobs into field maps (de-duplicated against each other) 00203 $items = array(); // (job ID => job fields map) 00204 foreach ( $jobs as $job ) { 00205 $item = $this->getNewJobFields( $job ); 00206 if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate 00207 $items[$item['sha1']] = $item; 00208 } else { 00209 $items[$item['uuid']] = $item; 00210 } 00211 } 00212 00213 if ( !count( $items ) ) { 00214 return true; // nothing to do 00215 } 00216 00217 $conn = $this->getConnection(); 00218 try { 00219 // Actually push the non-duplicate jobs into the queue... 00220 if ( $flags & self::QOS_ATOMIC ) { 00221 $batches = array( $items ); // all or nothing 00222 } else { 00223 $batches = array_chunk( $items, 500 ); // avoid tying up the server 00224 } 00225 $failed = 0; 00226 $pushed = 0; 00227 foreach ( $batches as $itemBatch ) { 00228 $added = $this->pushBlobs( $conn, $itemBatch ); 00229 if ( is_int( $added ) ) { 00230 $pushed += $added; 00231 } else { 00232 $failed += count( $itemBatch ); 00233 } 00234 } 00235 if ( $failed > 0 ) { 00236 wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." ); 00237 00238 return false; 00239 } 00240 JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki ); 00241 JobQueue::incrStats( 'job-insert-duplicate', $this->type, 00242 count( $items ) - $failed - $pushed, $this->wiki ); 00243 } catch ( RedisException $e ) { 00244 $this->throwRedisException( $conn, $e ); 00245 } 00246 00247 return true; 00248 } 00249 00256 protected function pushBlobs( RedisConnRef $conn, array $items ) { 00257 $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] ) 00258 foreach ( $items as $item ) { 00259 $args[] = (string)$item['uuid']; 00260 $args[] = (string)$item['sha1']; 00261 $args[] = (string)$item['rtimestamp']; 00262 $args[] = (string)$this->serialize( $item ); 00263 } 00264 static $script = 00265 <<<LUA 00266 local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS) 00267 if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end 00268 local pushed = 0 00269 for i = 1,#ARGV,4 do 00270 local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3] 00271 if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then 00272 if 1*rtimestamp > 0 then 00273 -- Insert into delayed queue (release time as score) 00274 redis.call('zAdd',kDelayed,rtimestamp,id) 00275 else 00276 -- Insert into unclaimed queue 00277 redis.call('lPush',kUnclaimed,id) 00278 end 00279 if sha1 ~= '' then 00280 redis.call('hSet',kSha1ById,id,sha1) 00281 redis.call('hSet',kIdBySha1,sha1,id) 00282 end 00283 redis.call('hSet',kData,id,blob) 00284 pushed = pushed + 1 00285 end 00286 end 00287 return pushed 00288 LUA; 00289 return $conn->luaEval( $script, 00290 array_merge( 00291 array( 00292 $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] 00293 $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] 00294 $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] 00295 $this->getQueueKey( 'z-delayed' ), # KEYS[4] 00296 $this->getQueueKey( 'h-data' ), # KEYS[5] 00297 ), 00298 $args 00299 ), 00300 5 # number of first argument(s) that are keys 00301 ); 00302 } 00303 00309 protected function doPop() { 00310 $job = false; 00311 00312 // Push ready delayed jobs into the queue every 10 jobs to spread the load. 00313 // This is also done as a periodic task, but we don't want too much done at once. 00314 if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) { 00315 $this->recyclePruneAndUndelayJobs(); 00316 } 00317 00318 $conn = $this->getConnection(); 00319 try { 00320 do { 00321 if ( $this->claimTTL > 0 ) { 00322 // Keep the claimed job list down for high-traffic queues 00323 if ( mt_rand( 0, 99 ) == 0 ) { 00324 $this->recyclePruneAndUndelayJobs(); 00325 } 00326 $blob = $this->popAndAcquireBlob( $conn ); 00327 } else { 00328 $blob = $this->popAndDeleteBlob( $conn ); 00329 } 00330 if ( $blob === false ) { 00331 break; // no jobs; nothing to do 00332 } 00333 00334 JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki ); 00335 $item = $this->unserialize( $blob ); 00336 if ( $item === false ) { 00337 wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." ); 00338 continue; 00339 } 00340 00341 // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed 00342 $job = $this->getJobFromFields( $item ); // may be false 00343 } while ( !$job ); // job may be false if invalid 00344 } catch ( RedisException $e ) { 00345 $this->throwRedisException( $conn, $e ); 00346 } 00347 00348 return $job; 00349 } 00350 00356 protected function popAndDeleteBlob( RedisConnRef $conn ) { 00357 static $script = 00358 <<<LUA 00359 local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS) 00360 -- Pop an item off the queue 00361 local id = redis.call('rpop',kUnclaimed) 00362 if not id then return false end 00363 -- Get the job data and remove it 00364 local item = redis.call('hGet',kData,id) 00365 redis.call('hDel',kData,id) 00366 -- Allow new duplicates of this job 00367 local sha1 = redis.call('hGet',kSha1ById,id) 00368 if sha1 then redis.call('hDel',kIdBySha1,sha1) end 00369 redis.call('hDel',kSha1ById,id) 00370 -- Return the job data 00371 return item 00372 LUA; 00373 return $conn->luaEval( $script, 00374 array( 00375 $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] 00376 $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] 00377 $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] 00378 $this->getQueueKey( 'h-data' ), # KEYS[4] 00379 ), 00380 4 # number of first argument(s) that are keys 00381 ); 00382 } 00383 00389 protected function popAndAcquireBlob( RedisConnRef $conn ) { 00390 static $script = 00391 <<<LUA 00392 local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS) 00393 -- Pop an item off the queue 00394 local id = redis.call('rPop',kUnclaimed) 00395 if not id then return false end 00396 -- Allow new duplicates of this job 00397 local sha1 = redis.call('hGet',kSha1ById,id) 00398 if sha1 then redis.call('hDel',kIdBySha1,sha1) end 00399 redis.call('hDel',kSha1ById,id) 00400 -- Mark the jobs as claimed and return it 00401 redis.call('zAdd',kClaimed,ARGV[1],id) 00402 redis.call('hIncrBy',kAttempts,id,1) 00403 return redis.call('hGet',kData,id) 00404 LUA; 00405 return $conn->luaEval( $script, 00406 array( 00407 $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] 00408 $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] 00409 $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] 00410 $this->getQueueKey( 'z-claimed' ), # KEYS[4] 00411 $this->getQueueKey( 'h-attempts' ), # KEYS[5] 00412 $this->getQueueKey( 'h-data' ), # KEYS[6] 00413 time(), # ARGV[1] (injected to be replication-safe) 00414 ), 00415 6 # number of first argument(s) that are keys 00416 ); 00417 } 00418 00425 protected function doAck( Job $job ) { 00426 if ( !isset( $job->metadata['uuid'] ) ) { 00427 throw new MWException( "Job of type '{$job->getType()}' has no UUID." ); 00428 } 00429 if ( $this->claimTTL > 0 ) { 00430 $conn = $this->getConnection(); 00431 try { 00432 static $script = 00433 <<<LUA 00434 local kClaimed, kAttempts, kData = unpack(KEYS) 00435 -- Unmark the job as claimed 00436 redis.call('zRem',kClaimed,ARGV[1]) 00437 redis.call('hDel',kAttempts,ARGV[1]) 00438 -- Delete the job data itself 00439 return redis.call('hDel',kData,ARGV[1]) 00440 LUA; 00441 $res = $conn->luaEval( $script, 00442 array( 00443 $this->getQueueKey( 'z-claimed' ), # KEYS[1] 00444 $this->getQueueKey( 'h-attempts' ), # KEYS[2] 00445 $this->getQueueKey( 'h-data' ), # KEYS[3] 00446 $job->metadata['uuid'] # ARGV[1] 00447 ), 00448 3 # number of first argument(s) that are keys 00449 ); 00450 00451 if ( !$res ) { 00452 wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); 00453 00454 return false; 00455 } 00456 } catch ( RedisException $e ) { 00457 $this->throwRedisException( $conn, $e ); 00458 } 00459 } 00460 00461 return true; 00462 } 00463 00470 protected function doDeduplicateRootJob( Job $job ) { 00471 if ( !$job->hasRootJobParams() ) { 00472 throw new MWException( "Cannot register root job; missing parameters." ); 00473 } 00474 $params = $job->getRootJobParams(); 00475 00476 $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); 00477 00478 $conn = $this->getConnection(); 00479 try { 00480 $timestamp = $conn->get( $key ); // current last timestamp of this job 00481 if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { 00482 return true; // a newer version of this root job was enqueued 00483 } 00484 00485 // Update the timestamp of the last root job started at the location... 00486 return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks 00487 } catch ( RedisException $e ) { 00488 $this->throwRedisException( $conn, $e ); 00489 } 00490 } 00491 00498 protected function doIsRootJobOldDuplicate( Job $job ) { 00499 if ( !$job->hasRootJobParams() ) { 00500 return false; // job has no de-deplication info 00501 } 00502 $params = $job->getRootJobParams(); 00503 00504 $conn = $this->getConnection(); 00505 try { 00506 // Get the last time this root job was enqueued 00507 $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); 00508 } catch ( RedisException $e ) { 00509 $this->throwRedisException( $conn, $e ); 00510 } 00511 00512 // Check if a new root job was started at the location after this one's... 00513 return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); 00514 } 00515 00521 protected function doDelete() { 00522 static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned', 00523 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ); 00524 00525 $conn = $this->getConnection(); 00526 try { 00527 $keys = array(); 00528 foreach ( $props as $prop ) { 00529 $keys[] = $this->getQueueKey( $prop ); 00530 } 00531 00532 return ( $conn->delete( $keys ) !== false ); 00533 } catch ( RedisException $e ) { 00534 $this->throwRedisException( $conn, $e ); 00535 } 00536 } 00537 00542 public function getAllQueuedJobs() { 00543 $conn = $this->getConnection(); 00544 try { 00545 $that = $this; 00546 00547 return new MappedIterator( 00548 $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ), 00549 function ( $uid ) use ( $that, $conn ) { 00550 return $that->getJobFromUidInternal( $uid, $conn ); 00551 }, 00552 array( 'accept' => function ( $job ) { 00553 return is_object( $job ); 00554 } ) 00555 ); 00556 } catch ( RedisException $e ) { 00557 $this->throwRedisException( $conn, $e ); 00558 } 00559 } 00560 00565 public function getAllDelayedJobs() { 00566 $conn = $this->getConnection(); 00567 try { 00568 $that = $this; 00569 00570 return new MappedIterator( // delayed jobs 00571 $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ), 00572 function ( $uid ) use ( $that, $conn ) { 00573 return $that->getJobFromUidInternal( $uid, $conn ); 00574 }, 00575 array( 'accept' => function ( $job ) { 00576 return is_object( $job ); 00577 } ) 00578 ); 00579 } catch ( RedisException $e ) { 00580 $this->throwRedisException( $conn, $e ); 00581 } 00582 } 00583 00584 public function getCoalesceLocationInternal() { 00585 return "RedisServer:" . $this->server; 00586 } 00587 00588 protected function doGetSiblingQueuesWithJobs( array $types ) { 00589 return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) ); 00590 } 00591 00592 protected function doGetSiblingQueueSizes( array $types ) { 00593 $sizes = array(); // (type => size) 00594 $types = array_values( $types ); // reindex 00595 $conn = $this->getConnection(); 00596 try { 00597 $conn->multi( Redis::PIPELINE ); 00598 foreach ( $types as $type ) { 00599 $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) ); 00600 } 00601 $res = $conn->exec(); 00602 if ( is_array( $res ) ) { 00603 foreach ( $res as $i => $size ) { 00604 $sizes[$types[$i]] = $size; 00605 } 00606 } 00607 } catch ( RedisException $e ) { 00608 $this->throwRedisException( $conn, $e ); 00609 } 00610 00611 return $sizes; 00612 } 00613 00622 public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { 00623 try { 00624 $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ); 00625 if ( $data === false ) { 00626 return false; // not found 00627 } 00628 $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) ); 00629 if ( !is_array( $item ) ) { // this shouldn't happen 00630 throw new MWException( "Could not find job with ID '$uid'." ); 00631 } 00632 $title = Title::makeTitle( $item['namespace'], $item['title'] ); 00633 $job = Job::factory( $item['type'], $title, $item['params'] ); 00634 $job->metadata['uuid'] = $item['uuid']; 00635 00636 return $job; 00637 } catch ( RedisException $e ) { 00638 $this->throwRedisException( $conn, $e ); 00639 } 00640 } 00641 00649 public function recyclePruneAndUndelayJobs() { 00650 $count = 0; 00651 // For each job item that can be retried, we need to add it back to the 00652 // main queue and remove it from the list of currenty claimed job items. 00653 // For those that cannot, they are marked as dead and kept around for 00654 // investigation and manual job restoration but are eventually deleted. 00655 $conn = $this->getConnection(); 00656 try { 00657 $now = time(); 00658 static $script = 00659 <<<LUA 00660 local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS) 00661 local released,abandoned,pruned,undelayed = 0,0,0,0 00662 -- Get all non-dead jobs that have an expired claim on them. 00663 -- The score for each item is the last claim timestamp (UNIX). 00664 local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1]) 00665 for k,id in ipairs(staleClaims) do 00666 local timestamp = redis.call('zScore',kClaimed,id) 00667 local attempts = redis.call('hGet',kAttempts,id) 00668 if attempts < ARGV[3] then 00669 -- Claim expired and retries left: re-enqueue the job 00670 redis.call('lPush',kUnclaimed,id) 00671 redis.call('hIncrBy',kAttempts,id,1) 00672 released = released + 1 00673 else 00674 -- Claim expired and no retries left: mark the job as dead 00675 redis.call('zAdd',kAbandoned,timestamp,id) 00676 abandoned = abandoned + 1 00677 end 00678 redis.call('zRem',kClaimed,id) 00679 end 00680 -- Get all of the dead jobs that have been marked as dead for too long. 00681 -- The score for each item is the last claim timestamp (UNIX). 00682 local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2]) 00683 for k,id in ipairs(deadClaims) do 00684 -- Stale and out of retries: remove any traces of the job 00685 redis.call('zRem',kAbandoned,id) 00686 redis.call('hDel',kAttempts,id) 00687 redis.call('hDel',kData,id) 00688 pruned = pruned + 1 00689 end 00690 -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp) 00691 local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4]) 00692 -- Migrate the jobs from the "delayed" set to the "unclaimed" list 00693 for k,id in ipairs(ids) do 00694 redis.call('lPush',kUnclaimed,id) 00695 redis.call('zRem',kDelayed,id) 00696 end 00697 undelayed = #ids 00698 return {released,abandoned,pruned,undelayed} 00699 LUA; 00700 $res = $conn->luaEval( $script, 00701 array( 00702 $this->getQueueKey( 'z-claimed' ), # KEYS[1] 00703 $this->getQueueKey( 'h-attempts' ), # KEYS[2] 00704 $this->getQueueKey( 'l-unclaimed' ), # KEYS[3] 00705 $this->getQueueKey( 'h-data' ), # KEYS[4] 00706 $this->getQueueKey( 'z-abandoned' ), # KEYS[5] 00707 $this->getQueueKey( 'z-delayed' ), # KEYS[6] 00708 $now - $this->claimTTL, # ARGV[1] 00709 $now - self::MAX_AGE_PRUNE, # ARGV[2] 00710 $this->maxTries, # ARGV[3] 00711 $now # ARGV[4] 00712 ), 00713 6 # number of first argument(s) that are keys 00714 ); 00715 if ( $res ) { 00716 list( $released, $abandoned, $pruned, $undelayed ) = $res; 00717 $count += $released + $pruned + $undelayed; 00718 JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki ); 00719 JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki ); 00720 } 00721 } catch ( RedisException $e ) { 00722 $this->throwRedisException( $conn, $e ); 00723 } 00724 00725 return $count; 00726 } 00727 00731 protected function doGetPeriodicTasks() { 00732 $periods = array( 3600 ); // standard cleanup (useful on config change) 00733 if ( $this->claimTTL > 0 ) { 00734 $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing 00735 } 00736 if ( $this->checkDelay ) { 00737 $periods[] = 300; // 5 minutes 00738 } 00739 $period = min( $periods ); 00740 $period = max( $period, 30 ); // sanity 00741 // Support override for faster testing 00742 if ( $this->maximumPeriodicTaskSeconds !== null ) { 00743 $period = min( $period, $this->maximumPeriodicTaskSeconds ); 00744 } 00745 return array( 00746 'recyclePruneAndUndelayJobs' => array( 00747 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ), 00748 'period' => $period, 00749 ) 00750 ); 00751 } 00752 00757 protected function getNewJobFields( IJobSpecification $job ) { 00758 return array( 00759 // Fields that describe the nature of the job 00760 'type' => $job->getType(), 00761 'namespace' => $job->getTitle()->getNamespace(), 00762 'title' => $job->getTitle()->getDBkey(), 00763 'params' => $job->getParams(), 00764 // Some jobs cannot run until a "release timestamp" 00765 'rtimestamp' => $job->getReleaseTimestamp() ?: 0, 00766 // Additional job metadata 00767 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ), 00768 'sha1' => $job->ignoreDuplicates() 00769 ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) 00770 : '', 00771 'timestamp' => time() // UNIX timestamp 00772 ); 00773 } 00774 00779 protected function getJobFromFields( array $fields ) { 00780 $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] ); 00781 if ( $title ) { 00782 $job = Job::factory( $fields['type'], $title, $fields['params'] ); 00783 $job->metadata['uuid'] = $fields['uuid']; 00784 00785 return $job; 00786 } 00787 00788 return false; 00789 } 00790 00795 protected function serialize( array $fields ) { 00796 $blob = serialize( $fields ); 00797 if ( $this->compression === 'gzip' 00798 && strlen( $blob ) >= 1024 00799 && function_exists( 'gzdeflate' ) 00800 ) { 00801 $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ); 00802 $blobz = serialize( $object ); 00803 00804 return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob; 00805 } else { 00806 return $blob; 00807 } 00808 } 00809 00814 protected function unserialize( $blob ) { 00815 $fields = unserialize( $blob ); 00816 if ( is_object( $fields ) ) { 00817 if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) { 00818 $fields = unserialize( gzinflate( $fields->blob ) ); 00819 } else { 00820 $fields = false; 00821 } 00822 } 00823 00824 return is_array( $fields ) ? $fields : false; 00825 } 00826 00833 protected function getConnection() { 00834 $conn = $this->redisPool->getConnection( $this->server ); 00835 if ( !$conn ) { 00836 throw new JobQueueConnectionError( "Unable to connect to redis server." ); 00837 } 00838 00839 return $conn; 00840 } 00841 00847 protected function throwRedisException( RedisConnRef $conn, $e ) { 00848 $this->redisPool->handleError( $conn, $e ); 00849 throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); 00850 } 00851 00857 private function getQueueKey( $prop, $type = null ) { 00858 $type = is_string( $type ) ? $type : $this->type; 00859 list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); 00860 if ( strlen( $this->key ) ) { // namespaced queue (for testing) 00861 return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop ); 00862 } else { 00863 return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop ); 00864 } 00865 } 00866 00871 public function setTestingPrefix( $key ) { 00872 $this->key = $key; 00873 } 00874 }