MediaWiki
REL1_24
|
<?php
/**
 * Job queue backed by a Redis server.
 *
 * The queue state is spread over several Redis keys (see getQueueKey()):
 *   - l-unclaimed : list of job IDs that are ready to be popped
 *   - z-claimed   : sorted set of claimed job IDs, scored by claim UNIX timestamp
 *   - z-abandoned : sorted set of job IDs that ran out of retries, scored by
 *                   their last claim UNIX timestamp
 *   - z-delayed   : sorted set of delayed job IDs, scored by release UNIX timestamp
 *   - h-idBySha1  : hash of (de-duplication SHA1 => job ID)
 *   - h-sha1ById  : hash of (job ID => de-duplication SHA1)
 *   - h-attempts  : hash of (job ID => attempt counter)
 *   - h-data      : hash of (job ID => serialized job fields blob)
 *
 * Multi-key updates are done with server-side Lua scripts so that each
 * operation is applied as a single unit on the server.
 */
class JobQueueRedis extends JobQueue {
	/** @var RedisConnectionPool Pool obtained from RedisConnectionPool::singleton() */
	protected $redisPool;

	/** @var string Server identifier passed to the pool's getConnection() */
	protected $server;

	/** @var string Compression method for job blobs ('gzip' or 'none') */
	protected $compression;

	/** @var bool If true, this queue exposes no periodic tasks (see doGetPeriodicTasks()) */
	protected $daemonized;

	const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)

	/** @var string Optional extra key component; set via setTestingPrefix() */
	protected $key;

	/**
	 * @param array $params Queue parameters. Besides whatever the parent
	 *   constructor consumes, the following keys are read here:
	 *   - redisConfig : array passed to RedisConnectionPool::singleton();
	 *                   its 'serializer' entry is always forced to 'none'
	 *   - redisServer : server identifier used when fetching connections
	 *   - compression : blob compression method; defaults to 'none'
	 *   - daemonized  : if non-empty, periodic tasks are not offered by this
	 *                   class (they are expected to be managed elsewhere)
	 */
	public function __construct( array $params ) {
		parent::__construct( $params );
		$params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
		$this->server = $params['redisServer'];
		$this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
		$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
		$this->daemonized = !empty( $params['daemonized'] );
	}

	/**
	 * @return array List of supported ordering names
	 */
	protected function supportedOrders() {
		return array( 'timestamp', 'fifo' );
	}

	/**
	 * @return string Preferred ordering name
	 */
	protected function optimalOrder() {
		return 'fifo';
	}

	/**
	 * @return bool Always true; delayed jobs are kept in the "z-delayed" set
	 */
	protected function supportsDelayedJobs() {
		return true;
	}

	/**
	 * Check whether the unclaimed list is empty.
	 *
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doIsEmpty() {
		return $this->doGetSize() == 0;
	}

	/**
	 * Get the number of job IDs in the unclaimed list.
	 *
	 * @return int
	 * @throws JobQueueError
	 */
	protected function doGetSize() {
		$conn = $this->getConnection();
		try {
			return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * Get the combined size of the claimed and abandoned sets.
	 *
	 * @return int Always 0 when claimTTL is not positive
	 * @throws JobQueueError
	 */
	protected function doGetAcquiredCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}
		$conn = $this->getConnection();
		try {
			// Fetch both sizes in one pipelined round trip
			$conn->multi( Redis::PIPELINE );
			$conn->zSize( $this->getQueueKey( 'z-claimed' ) );
			$conn->zSize( $this->getQueueKey( 'z-abandoned' ) );

			return array_sum( $conn->exec() );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * Get the number of job IDs in the delayed set.
	 *
	 * @return int Always 0 when checkDelay is off
	 * @throws JobQueueError
	 */
	protected function doGetDelayedCount() {
		if ( !$this->checkDelay ) {
			return 0; // no delayed jobs
		}
		$conn = $this->getConnection();
		try {
			return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * Get the number of job IDs in the abandoned set.
	 *
	 * @return int Always 0 when claimTTL is not positive
	 * @throws JobQueueError
	 */
	protected function doGetAbandonedCount() {
		if ( $this->claimTTL <= 0 ) {
			return 0; // no acknowledgements
		}
		$conn = $this->getConnection();
		try {
			return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * Insert a batch of jobs into the queue.
	 *
	 * Jobs carrying a de-duplication hash are first de-duplicated against each
	 * other, then pushed via pushBlobs(), which skips any job whose hash is
	 * already registered on the server.
	 *
	 * @param array $jobs List of job specifications accepted by getNewJobFields()
	 * @param int $flags Bitfield; with self::QOS_ATOMIC all jobs go in one script
	 *   call, otherwise they are pushed in chunks of 500
	 * @return void
	 * @throws JobQueueError
	 */
	protected function doBatchPush( array $jobs, $flags ) {
		// Convert the jobs into field maps (de-duplicated against each other)
		$items = array(); // (job ID => job fields map)
		foreach ( $jobs as $job ) {
			$item = $this->getNewJobFields( $job );
			if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
				$items[$item['sha1']] = $item;
			} else {
				$items[$item['uuid']] = $item;
			}
		}

		if ( !count( $items ) ) {
			return; // nothing to do
		}

		$conn = $this->getConnection();
		try {
			// Actually push the non-duplicate jobs into the queue...
			if ( $flags & self::QOS_ATOMIC ) {
				$batches = array( $items ); // all or nothing
			} else {
				$batches = array_chunk( $items, 500 ); // avoid tying up the server
			}
			$failed = 0;
			$pushed = 0;
			foreach ( $batches as $itemBatch ) {
				$added = $this->pushBlobs( $conn, $itemBatch );
				if ( is_int( $added ) ) {
					$pushed += $added;
				} else {
					// A non-integer result means the whole batch was rejected
					$failed += count( $itemBatch );
				}
			}
			if ( $failed > 0 ) {
				wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );

				// Caught just below and converted into a JobQueueError
				throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." );
			}
			JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki );
			JobQueue::incrStats( 'job-insert-duplicate', $this->type,
				count( $items ) - $failed - $pushed, $this->wiki );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * Push a list of job field maps to the server with a single Lua script call.
	 *
	 * For each job the script skips it if its SHA1 is already in "h-idBySha1";
	 * otherwise it adds the ID to "z-delayed" (positive release timestamp) or
	 * "l-unclaimed", records the SHA1 mappings (if any), and stores the blob.
	 *
	 * @param RedisConnRef $conn
	 * @param array $items List of job field maps from getNewJobFields()
	 * @return mixed Number of jobs pushed as returned by the script; callers
	 *   treat any non-integer value as a failure
	 * @throws RedisException
	 */
	protected function pushBlobs( RedisConnRef $conn, array $items ) {
		$args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
		foreach ( $items as $item ) {
			$args[] = (string)$item['uuid'];
			$args[] = (string)$item['sha1'];
			$args[] = (string)$item['rtimestamp'];
			$args[] = (string)$this->serialize( $item );
		}
		static $script =
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
		if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
		local pushed = 0
		for i = 1,#ARGV,4 do
			local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
			if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
				if 1*rtimestamp > 0 then
					-- Insert into delayed queue (release time as score)
					redis.call('zAdd',kDelayed,rtimestamp,id)
				else
					-- Insert into unclaimed queue
					redis.call('lPush',kUnclaimed,id)
				end
				if sha1 ~= '' then
					redis.call('hSet',kSha1ById,id,sha1)
					redis.call('hSet',kIdBySha1,sha1,id)
				end
				redis.call('hSet',kData,id,blob)
				pushed = pushed + 1
			end
		end
		return pushed
LUA;
		return $conn->luaEval( $script,
			array_merge(
				array(
					$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
					$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
					$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
					$this->getQueueKey( 'z-delayed' ), # KEYS[4]
					$this->getQueueKey( 'h-data' ), # KEYS[5]
				),
				$args
			),
			5 # number of first argument(s) that are keys
		);
	}

	/**
	 * Pop the next usable job off the queue.
	 *
	 * Blobs that cannot be unserialized or that do not yield a valid job are
	 * skipped and the next one is tried.
	 *
	 * @return Job|bool The job, or false if no job could be popped
	 * @throws JobQueueError
	 */
	protected function doPop() {
		$job = false;

		// Push ready delayed jobs into the queue every 10 jobs to spread the load.
		// This is also done as a periodic task, but we don't want too much done at once.
		if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
			$this->recyclePruneAndUndelayJobs();
		}

		$conn = $this->getConnection();
		try {
			do {
				if ( $this->claimTTL > 0 ) {
					// Keep the claimed job list down for high-traffic queues
					if ( mt_rand( 0, 99 ) == 0 ) {
						$this->recyclePruneAndUndelayJobs();
					}
					$blob = $this->popAndAcquireBlob( $conn );
				} else {
					$blob = $this->popAndDeleteBlob( $conn );
				}
				if ( !is_string( $blob ) ) {
					break; // no jobs; nothing to do
				}

				JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
				$item = $this->unserialize( $blob );
				if ( $item === false ) {
					wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
					continue; // $job is still false, so the loop tries the next blob
				}

				// If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed
				$job = $this->getJobFromFields( $item ); // may be false
			} while ( !$job ); // job may be false if invalid
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $job;
	}

	/**
	 * Pop a job ID off the unclaimed list and delete all of its data,
	 * returning the job blob. Used when claims are not tracked.
	 *
	 * @param RedisConnRef $conn
	 * @return mixed The job blob as stored in "h-data"; doPop() treats any
	 *   non-string result as "no job"
	 * @throws RedisException
	 */
	protected function popAndDeleteBlob( RedisConnRef $conn ) {
		static $script =
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
		-- Pop an item off the queue
		local id = redis.call('rpop',kUnclaimed)
		if not id then return false end
		-- Get the job data and remove it
		local item = redis.call('hGet',kData,id)
		redis.call('hDel',kData,id)
		-- Allow new duplicates of this job
		local sha1 = redis.call('hGet',kSha1ById,id)
		if sha1 then redis.call('hDel',kIdBySha1,sha1) end
		redis.call('hDel',kSha1ById,id)
		-- Return the job data
		return item
LUA;
		return $conn->luaEval( $script,
			array(
				$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
				$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
				$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
				$this->getQueueKey( 'h-data' ), # KEYS[4]
			),
			4 # number of first argument(s) that are keys
		);
	}

	/**
	 * Pop a job ID off the unclaimed list, mark it as claimed (scored by the
	 * current time), bump its attempt counter, and return the job blob.
	 * The blob itself stays in "h-data" until the job is acknowledged or pruned.
	 *
	 * @param RedisConnRef $conn
	 * @return mixed The job blob as stored in "h-data"; doPop() treats any
	 *   non-string result as "no job"
	 * @throws RedisException
	 */
	protected function popAndAcquireBlob( RedisConnRef $conn ) {
		static $script =
<<<LUA
		local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
		-- Pop an item off the queue
		local id = redis.call('rPop',kUnclaimed)
		if not id then return false end
		-- Allow new duplicates of this job
		local sha1 = redis.call('hGet',kSha1ById,id)
		if sha1 then redis.call('hDel',kIdBySha1,sha1) end
		redis.call('hDel',kSha1ById,id)
		-- Mark the jobs as claimed and return it
		redis.call('zAdd',kClaimed,ARGV[1],id)
		redis.call('hIncrBy',kAttempts,id,1)
		return redis.call('hGet',kData,id)
LUA;
		return $conn->luaEval( $script,
			array(
				$this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
				$this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
				$this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
				$this->getQueueKey( 'z-claimed' ), # KEYS[4]
				$this->getQueueKey( 'h-attempts' ), # KEYS[5]
				$this->getQueueKey( 'h-data' ), # KEYS[6]
				time(), # ARGV[1] (injected to be replication-safe)
			),
			6 # number of first argument(s) that are keys
		);
	}

	/**
	 * Acknowledge a job: drop its claim, attempt counter and stored blob.
	 * Does nothing (and reports success) when claimTTL is not positive.
	 *
	 * @param Job $job Job carrying its queue ID in $job->metadata['uuid']
	 * @return bool False if the script reported that no job data was deleted
	 * @throws MWException If the job has no UUID
	 * @throws JobQueueError
	 */
	protected function doAck( Job $job ) {
		if ( !isset( $job->metadata['uuid'] ) ) {
			throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
		}
		if ( $this->claimTTL > 0 ) {
			$conn = $this->getConnection();
			try {
				static $script =
<<<LUA
				local kClaimed, kAttempts, kData = unpack(KEYS)
				-- Unmark the job as claimed
				redis.call('zRem',kClaimed,ARGV[1])
				redis.call('hDel',kAttempts,ARGV[1])
				-- Delete the job data itself
				return redis.call('hDel',kData,ARGV[1])
LUA;
				$res = $conn->luaEval( $script,
					array(
						$this->getQueueKey( 'z-claimed' ), # KEYS[1]
						$this->getQueueKey( 'h-attempts' ), # KEYS[2]
						$this->getQueueKey( 'h-data' ), # KEYS[3]
						$job->metadata['uuid'] # ARGV[1]
					),
					3 # number of first argument(s) that are keys
				);

				if ( !$res ) {
					wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );

					return false;
				}
			} catch ( RedisException $e ) {
				$this->throwRedisException( $conn, $e );
			}
		}

		return true;
	}

	/**
	 * Register the root job timestamp of a job, unless an equal or newer
	 * timestamp is already registered for the same root job signature.
	 *
	 * @param Job $job
	 * @return bool True if an equal/newer timestamp was already stored;
	 *   otherwise the result of the Redis SET call
	 * @throws MWException If the job has no root job parameters
	 * @throws JobQueueError
	 */
	protected function doDeduplicateRootJob( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			throw new MWException( "Cannot register root job; missing parameters." );
		}
		$params = $job->getRootJobParams();

		$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );

		$conn = $this->getConnection();
		try {
			$timestamp = $conn->get( $key ); // current last timestamp of this job
			if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
				return true; // a newer version of this root job was enqueued
			}

			// Update the timestamp of the last root job started at the location...
			return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * Check whether a newer root job with the same signature was registered
	 * after the one this job belongs to.
	 *
	 * @param Job $job
	 * @return bool
	 * @throws JobQueueError
	 */
	protected function doIsRootJobOldDuplicate( Job $job ) {
		if ( !$job->hasRootJobParams() ) {
			return false; // job has no de-duplication info
		}
		$params = $job->getRootJobParams();

		$conn = $this->getConnection();
		try {
			// Get the last time this root job was enqueued
			$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		// Check if a new root job was started at the location after this one's...
		return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
	}

	/**
	 * Delete every Redis key that belongs to this queue.
	 *
	 * @return bool False only if the delete call itself returned false
	 * @throws JobQueueError
	 */
	protected function doDelete() {
		static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
			'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' );

		$conn = $this->getConnection();
		try {
			$keys = array();
			foreach ( $props as $prop ) {
				$keys[] = $this->getQueueKey( $prop );
			}

			return ( $conn->delete( $keys ) !== false );
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * Get an iterator over the jobs currently in the unclaimed list.
	 * IDs that no longer resolve to a job object are filtered out.
	 *
	 * @return MappedIterator
	 * @throws JobQueueError
	 */
	public function getAllQueuedJobs() {
		$conn = $this->getConnection();
		try {
			$that = $this; // closures cannot use $this directly on older PHP

			return new MappedIterator(
				$conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
				function ( $uid ) use ( $that, $conn ) {
					return $that->getJobFromUidInternal( $uid, $conn );
				},
				array( 'accept' => function ( $job ) {
					return is_object( $job );
				} )
			);
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * Get an iterator over the jobs currently in the delayed set.
	 * IDs that no longer resolve to a job object are filtered out.
	 *
	 * @return MappedIterator
	 * @throws JobQueueError
	 */
	public function getAllDelayedJobs() {
		$conn = $this->getConnection();
		try {
			$that = $this; // closures cannot use $this directly on older PHP

			return new MappedIterator( // delayed jobs
				$conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
				function ( $uid ) use ( $that, $conn ) {
					return $that->getJobFromUidInternal( $uid, $conn );
				},
				array( 'accept' => function ( $job ) {
					return is_object( $job );
				} )
			);
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * @return string Location identifier derived from the configured server
	 */
	public function getCoalesceLocationInternal() {
		return "RedisServer:" . $this->server;
	}

	/**
	 * @param array $types List of queue types
	 * @return array Those of the given types whose unclaimed list is non-empty
	 * @throws JobQueueError
	 */
	protected function doGetSiblingQueuesWithJobs( array $types ) {
		return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
	}

	/**
	 * Get the unclaimed list size for each of the given queue types,
	 * using a single pipelined round trip.
	 *
	 * @param array $types List of queue types
	 * @return array Map of (type => size); empty if the pipeline gave no array
	 * @throws JobQueueError
	 */
	protected function doGetSiblingQueueSizes( array $types ) {
		$sizes = array(); // (type => size)
		$types = array_values( $types ); // reindex
		$conn = $this->getConnection();
		try {
			$conn->multi( Redis::PIPELINE );
			foreach ( $types as $type ) {
				$conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
			}
			$res = $conn->exec();
			if ( is_array( $res ) ) {
				// Results come back in the same order the commands were queued
				foreach ( $res as $i => $size ) {
					$sizes[$types[$i]] = $size;
				}
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $sizes;
	}

	/**
	 * Load a job from its queue ID. This function should not be called
	 * outside of JobQueueRedis; it is public only so closures can reach it.
	 *
	 * @param string $uid Job ID (field name in "h-data")
	 * @param RedisConnRef $conn
	 * @return Job|bool The job, or false if no data is stored under this ID
	 * @throws MWException If the stored blob cannot be unserialized
	 * @throws JobQueueError
	 */
	public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
		try {
			$data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
			if ( $data === false ) {
				return false; // not found
			}
			// Reuse the blob fetched above; fetching it a second time costs a
			// round trip and can race with a concurrent pop/ack deleting it.
			$item = $this->unserialize( $data );
			if ( !is_array( $item ) ) { // this shouldn't happen
				throw new MWException( "Could not find job with ID '$uid'." );
			}
			$title = Title::makeTitle( $item['namespace'], $item['title'] );
			$job = Job::factory( $item['type'], $title, $item['params'] );
			$job->metadata['uuid'] = $item['uuid'];

			return $job;
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}
	}

	/**
	 * Recycle or abandon jobs with expired claims, prune long-abandoned jobs,
	 * and move ready delayed jobs into the unclaimed list.
	 *
	 * @return int Number of jobs released, pruned and undelayed (abandoned
	 *   jobs are reported to stats but not included in this count)
	 * @throws JobQueueError
	 */
	public function recyclePruneAndUndelayJobs() {
		$count = 0;
		// For each job item that can be retried, we need to add it back to the
		// main queue and remove it from the list of currently claimed job items.
		// For those that cannot, they are marked as dead and kept around for
		// investigation and manual job restoration but are eventually deleted.
		$conn = $this->getConnection();
		try {
			$now = time();
			static $script =
<<<LUA
			local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS)
			local released,abandoned,pruned,undelayed = 0,0,0,0
			-- ARGV values arrive as strings; convert once for numeric comparison
			local maxTries = tonumber(ARGV[3])
			-- Get all non-dead jobs that have an expired claim on them.
			-- The score for each item is the last claim timestamp (UNIX).
			local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
			for k,id in ipairs(staleClaims) do
				local timestamp = redis.call('zScore',kClaimed,id)
				-- hGet yields a string (or false if unset); compare as numbers,
				-- since string comparison would be lexicographic ("10" < "3")
				local attempts = tonumber(redis.call('hGet',kAttempts,id)) or 0
				if attempts < maxTries then
					-- Claim expired and retries left: re-enqueue the job
					redis.call('lPush',kUnclaimed,id)
					redis.call('hIncrBy',kAttempts,id,1)
					released = released + 1
				else
					-- Claim expired and no retries left: mark the job as dead
					redis.call('zAdd',kAbandoned,timestamp,id)
					abandoned = abandoned + 1
				end
				redis.call('zRem',kClaimed,id)
			end
			-- Get all of the dead jobs that have been marked as dead for too long.
			-- The score for each item is the last claim timestamp (UNIX).
			local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2])
			for k,id in ipairs(deadClaims) do
				-- Stale and out of retries: remove any traces of the job
				redis.call('zRem',kAbandoned,id)
				redis.call('hDel',kAttempts,id)
				redis.call('hDel',kData,id)
				pruned = pruned + 1
			end
			-- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp)
			local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4])
			-- Migrate the jobs from the "delayed" set to the "unclaimed" list
			for k,id in ipairs(ids) do
				redis.call('lPush',kUnclaimed,id)
				redis.call('zRem',kDelayed,id)
			end
			undelayed = #ids
			return {released,abandoned,pruned,undelayed}
LUA;
			$res = $conn->luaEval( $script,
				array(
					$this->getQueueKey( 'z-claimed' ), # KEYS[1]
					$this->getQueueKey( 'h-attempts' ), # KEYS[2]
					$this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
					$this->getQueueKey( 'h-data' ), # KEYS[4]
					$this->getQueueKey( 'z-abandoned' ), # KEYS[5]
					$this->getQueueKey( 'z-delayed' ), # KEYS[6]
					$now - $this->claimTTL, # ARGV[1]
					$now - self::MAX_AGE_PRUNE, # ARGV[2]
					$this->maxTries, # ARGV[3]
					$now # ARGV[4]
				),
				6 # number of first argument(s) that are keys
			);
			if ( $res ) {
				list( $released, $abandoned, $pruned, $undelayed ) = $res;
				$count += $released + $pruned + $undelayed;
				JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki );
				JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki );
				JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki );
			}
		} catch ( RedisException $e ) {
			$this->throwRedisException( $conn, $e );
		}

		return $count;
	}

	/**
	 * Describe the periodic maintenance task for this queue.
	 *
	 * The period is the smallest of: one hour, half the claim TTL (if claims
	 * are tracked) and five minutes (if delays are checked), but never less
	 * than 30 seconds.
	 *
	 * @return array Map of (task name => array with 'callback' and 'period');
	 *   empty when the queue is daemonized
	 */
	protected function doGetPeriodicTasks() {
		if ( $this->daemonized ) {
			return array(); // managed in the runner loop
		}
		$periods = array( 3600 ); // standard cleanup (useful on config change)
		if ( $this->claimTTL > 0 ) {
			$periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
		}
		if ( $this->checkDelay ) {
			$periods[] = 300; // 5 minutes
		}
		$period = min( $periods );
		$period = max( $period, 30 ); // sanity

		return array(
			'recyclePruneAndUndelayJobs' => array(
				'callback' => array( $this, 'recyclePruneAndUndelayJobs' ),
				'period' => $period,
			)
		);
	}

	/**
	 * Build the field map that is stored for a new job.
	 *
	 * @param IJobSpecification $job
	 * @return array Map with keys: type, namespace, title, params, rtimestamp,
	 *   uuid, sha1 (empty string unless the job ignores duplicates), timestamp
	 */
	protected function getNewJobFields( IJobSpecification $job ) {
		return array(
			// Fields that describe the nature of the job
			'type' => $job->getType(),
			'namespace' => $job->getTitle()->getNamespace(),
			'title' => $job->getTitle()->getDBkey(),
			'params' => $job->getParams(),
			// Some jobs cannot run until a "release timestamp"
			'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
			// Additional job metadata
			'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
			'sha1' => $job->ignoreDuplicates()
				? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
				: '',
			'timestamp' => time() // UNIX timestamp
		);
	}

	/**
	 * Build a Job object from a stored field map.
	 *
	 * @param array $fields Field map as produced by getNewJobFields()
	 * @return Job|bool The job, or false if no valid title could be made
	 */
	protected function getJobFromFields( array $fields ) {
		$title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
		if ( $title ) {
			$job = Job::factory( $fields['type'], $title, $fields['params'] );
			$job->metadata['uuid'] = $fields['uuid'];

			return $job;
		}

		return false;
	}

	/**
	 * Serialize a job field map into a blob for storage.
	 *
	 * With 'gzip' compression, blobs of at least 1024 bytes are wrapped in a
	 * serialized object holding the deflated data, but only when that wrapper
	 * is actually smaller than the plain blob.
	 *
	 * @param array $fields
	 * @return string Serialized and possibly compressed version of $fields
	 */
	protected function serialize( array $fields ) {
		$blob = serialize( $fields );
		if ( $this->compression === 'gzip'
			&& strlen( $blob ) >= 1024
			&& function_exists( 'gzdeflate' )
		) {
			$object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' );
			$blobz = serialize( $object );

			return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
		} else {
			return $blob;
		}
	}

	/**
	 * Turn a stored blob back into a job field map.
	 *
	 * @param string $blob Blob as produced by serialize()
	 * @return array|bool The field map, or false if the blob is not usable
	 */
	protected function unserialize( $blob ) {
		$fields = unserialize( $blob );
		if ( is_object( $fields ) ) {
			// An object is the compression wrapper written by serialize();
			// check its properties exist before trusting them.
			if ( isset( $fields->enc, $fields->blob )
				&& $fields->enc === 'gzip'
				&& function_exists( 'gzinflate' )
			) {
				$fields = unserialize( gzinflate( $fields->blob ) );
			} else {
				$fields = false;
			}
		}

		return is_array( $fields ) ? $fields : false;
	}

	/**
	 * Get a connection to the configured server from the pool.
	 *
	 * @return RedisConnRef
	 * @throws JobQueueConnectionError If the pool returns no connection
	 */
	protected function getConnection() {
		$conn = $this->redisPool->getConnection( $this->server );
		if ( !$conn ) {
			throw new JobQueueConnectionError( "Unable to connect to redis server." );
		}

		return $conn;
	}

	/**
	 * Report a Redis error to the connection pool and rethrow it as a
	 * JobQueueError. This method never returns normally.
	 *
	 * @param RedisConnRef $conn
	 * @param RedisException $e
	 * @throws JobQueueError
	 */
	protected function throwRedisException( RedisConnRef $conn, $e ) {
		$this->redisPool->handleError( $conn, $e );
		throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
	}

	/**
	 * Build the Redis key for one property of a queue on this wiki.
	 *
	 * @param string $prop Property name (e.g. 'l-unclaimed', 'h-data')
	 * @param string|null $type Queue type; defaults to this queue's type
	 * @return string
	 */
	private function getQueueKey( $prop, $type = null ) {
		$type = is_string( $type ) ? $type : $this->type;
		list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
		if ( strlen( $this->key ) ) { // namespaced queue (for testing)
			return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
		} else {
			return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
		}
	}

	/**
	 * Set an extra key component so the queue uses separate Redis keys.
	 *
	 * @param string $key
	 * @return void
	 */
	public function setTestingPrefix( $key ) {
		$this->key = $key;
	}
}