MediaWiki  REL1_23
JobQueueRedis.php
Go to the documentation of this file.
00001 <?php
00059 class JobQueueRedis extends JobQueue {
00061     protected $redisPool;
00062 
00064     protected $server;
00065 
00067     protected $compression;
00068 
00069     const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
00070 
00072     protected $key;
00073 
00078     protected $maximumPeriodicTaskSeconds;
00079 
00094     public function __construct( array $params ) {
00095         parent::__construct( $params );
00096         $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
00097         $this->server = $params['redisServer'];
00098         $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
00099         $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
00100         $this->maximumPeriodicTaskSeconds = isset( $params['maximumPeriodicTaskSeconds'] ) ?
00101             $params['maximumPeriodicTaskSeconds'] : null;
00102     }
00103 
00104     protected function supportedOrders() {
00105         return array( 'timestamp', 'fifo' );
00106     }
00107 
00108     protected function optimalOrder() {
00109         return 'fifo';
00110     }
00111 
00112     protected function supportsDelayedJobs() {
00113         return true;
00114     }
00115 
00121     protected function doIsEmpty() {
00122         return $this->doGetSize() == 0;
00123     }
00124 
00130     protected function doGetSize() {
00131         $conn = $this->getConnection();
00132         try {
00133             return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
00134         } catch ( RedisException $e ) {
00135             $this->throwRedisException( $conn, $e );
00136         }
00137     }
00138 
00144     protected function doGetAcquiredCount() {
00145         if ( $this->claimTTL <= 0 ) {
00146             return 0; // no acknowledgements
00147         }
00148         $conn = $this->getConnection();
00149         try {
00150             $conn->multi( Redis::PIPELINE );
00151             $conn->zSize( $this->getQueueKey( 'z-claimed' ) );
00152             $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
00153 
00154             return array_sum( $conn->exec() );
00155         } catch ( RedisException $e ) {
00156             $this->throwRedisException( $conn, $e );
00157         }
00158     }
00159 
00165     protected function doGetDelayedCount() {
00166         if ( !$this->checkDelay ) {
00167             return 0; // no delayed jobs
00168         }
00169         $conn = $this->getConnection();
00170         try {
00171             return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
00172         } catch ( RedisException $e ) {
00173             $this->throwRedisException( $conn, $e );
00174         }
00175     }
00176 
00182     protected function doGetAbandonedCount() {
00183         if ( $this->claimTTL <= 0 ) {
00184             return 0; // no acknowledgements
00185         }
00186         $conn = $this->getConnection();
00187         try {
00188             return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
00189         } catch ( RedisException $e ) {
00190             $this->throwRedisException( $conn, $e );
00191         }
00192     }
00193 
00201     protected function doBatchPush( array $jobs, $flags ) {
00202         // Convert the jobs into field maps (de-duplicated against each other)
00203         $items = array(); // (job ID => job fields map)
00204         foreach ( $jobs as $job ) {
00205             $item = $this->getNewJobFields( $job );
00206             if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
00207                 $items[$item['sha1']] = $item;
00208             } else {
00209                 $items[$item['uuid']] = $item;
00210             }
00211         }
00212 
00213         if ( !count( $items ) ) {
00214             return true; // nothing to do
00215         }
00216 
00217         $conn = $this->getConnection();
00218         try {
00219             // Actually push the non-duplicate jobs into the queue...
00220             if ( $flags & self::QOS_ATOMIC ) {
00221                 $batches = array( $items ); // all or nothing
00222             } else {
00223                 $batches = array_chunk( $items, 500 ); // avoid tying up the server
00224             }
00225             $failed = 0;
00226             $pushed = 0;
00227             foreach ( $batches as $itemBatch ) {
00228                 $added = $this->pushBlobs( $conn, $itemBatch );
00229                 if ( is_int( $added ) ) {
00230                     $pushed += $added;
00231                 } else {
00232                     $failed += count( $itemBatch );
00233                 }
00234             }
00235             if ( $failed > 0 ) {
00236                 wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
00237 
00238                 return false;
00239             }
00240             JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki );
00241             JobQueue::incrStats( 'job-insert-duplicate', $this->type,
00242                 count( $items ) - $failed - $pushed, $this->wiki );
00243         } catch ( RedisException $e ) {
00244             $this->throwRedisException( $conn, $e );
00245         }
00246 
00247         return true;
00248     }
00249 
00256     protected function pushBlobs( RedisConnRef $conn, array $items ) {
00257         $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
00258         foreach ( $items as $item ) {
00259             $args[] = (string)$item['uuid'];
00260             $args[] = (string)$item['sha1'];
00261             $args[] = (string)$item['rtimestamp'];
00262             $args[] = (string)$this->serialize( $item );
00263         }
00264         static $script =
00265 <<<LUA
00266         local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
00267         if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
00268         local pushed = 0
00269         for i = 1,#ARGV,4 do
00270             local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
00271             if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
00272                 if 1*rtimestamp > 0 then
00273                     -- Insert into delayed queue (release time as score)
00274                     redis.call('zAdd',kDelayed,rtimestamp,id)
00275                 else
00276                     -- Insert into unclaimed queue
00277                     redis.call('lPush',kUnclaimed,id)
00278                 end
00279                 if sha1 ~= '' then
00280                     redis.call('hSet',kSha1ById,id,sha1)
00281                     redis.call('hSet',kIdBySha1,sha1,id)
00282                 end
00283                 redis.call('hSet',kData,id,blob)
00284                 pushed = pushed + 1
00285             end
00286         end
00287         return pushed
00288 LUA;
00289         return $conn->luaEval( $script,
00290             array_merge(
00291                 array(
00292                     $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
00293                     $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
00294                     $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
00295                     $this->getQueueKey( 'z-delayed' ), # KEYS[4]
00296                     $this->getQueueKey( 'h-data' ), # KEYS[5]
00297                 ),
00298                 $args
00299             ),
00300             5 # number of first argument(s) that are keys
00301         );
00302     }
00303 
00309     protected function doPop() {
00310         $job = false;
00311 
00312         // Push ready delayed jobs into the queue every 10 jobs to spread the load.
00313         // This is also done as a periodic task, but we don't want too much done at once.
00314         if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
00315             $this->recyclePruneAndUndelayJobs();
00316         }
00317 
00318         $conn = $this->getConnection();
00319         try {
00320             do {
00321                 if ( $this->claimTTL > 0 ) {
00322                     // Keep the claimed job list down for high-traffic queues
00323                     if ( mt_rand( 0, 99 ) == 0 ) {
00324                         $this->recyclePruneAndUndelayJobs();
00325                     }
00326                     $blob = $this->popAndAcquireBlob( $conn );
00327                 } else {
00328                     $blob = $this->popAndDeleteBlob( $conn );
00329                 }
00330                 if ( $blob === false ) {
00331                     break; // no jobs; nothing to do
00332                 }
00333 
00334                 JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
00335                 $item = $this->unserialize( $blob );
00336                 if ( $item === false ) {
00337                     wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
00338                     continue;
00339                 }
00340 
00341                 // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed
00342                 $job = $this->getJobFromFields( $item ); // may be false
00343             } while ( !$job ); // job may be false if invalid
00344         } catch ( RedisException $e ) {
00345             $this->throwRedisException( $conn, $e );
00346         }
00347 
00348         return $job;
00349     }
00350 
00356     protected function popAndDeleteBlob( RedisConnRef $conn ) {
00357         static $script =
00358 <<<LUA
00359         local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
00360         -- Pop an item off the queue
00361         local id = redis.call('rpop',kUnclaimed)
00362         if not id then return false end
00363         -- Get the job data and remove it
00364         local item = redis.call('hGet',kData,id)
00365         redis.call('hDel',kData,id)
00366         -- Allow new duplicates of this job
00367         local sha1 = redis.call('hGet',kSha1ById,id)
00368         if sha1 then redis.call('hDel',kIdBySha1,sha1) end
00369         redis.call('hDel',kSha1ById,id)
00370         -- Return the job data
00371         return item
00372 LUA;
00373         return $conn->luaEval( $script,
00374             array(
00375                 $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
00376                 $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
00377                 $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
00378                 $this->getQueueKey( 'h-data' ), # KEYS[4]
00379             ),
00380             4 # number of first argument(s) that are keys
00381         );
00382     }
00383 
00389     protected function popAndAcquireBlob( RedisConnRef $conn ) {
00390         static $script =
00391 <<<LUA
00392         local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
00393         -- Pop an item off the queue
00394         local id = redis.call('rPop',kUnclaimed)
00395         if not id then return false end
00396         -- Allow new duplicates of this job
00397         local sha1 = redis.call('hGet',kSha1ById,id)
00398         if sha1 then redis.call('hDel',kIdBySha1,sha1) end
00399         redis.call('hDel',kSha1ById,id)
00400         -- Mark the jobs as claimed and return it
00401         redis.call('zAdd',kClaimed,ARGV[1],id)
00402         redis.call('hIncrBy',kAttempts,id,1)
00403         return redis.call('hGet',kData,id)
00404 LUA;
00405         return $conn->luaEval( $script,
00406             array(
00407                 $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
00408                 $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
00409                 $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
00410                 $this->getQueueKey( 'z-claimed' ), # KEYS[4]
00411                 $this->getQueueKey( 'h-attempts' ), # KEYS[5]
00412                 $this->getQueueKey( 'h-data' ), # KEYS[6]
00413                 time(), # ARGV[1] (injected to be replication-safe)
00414             ),
00415             6 # number of first argument(s) that are keys
00416         );
00417     }
00418 
00425     protected function doAck( Job $job ) {
00426         if ( !isset( $job->metadata['uuid'] ) ) {
00427             throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
00428         }
00429         if ( $this->claimTTL > 0 ) {
00430             $conn = $this->getConnection();
00431             try {
00432                 static $script =
00433 <<<LUA
00434                 local kClaimed, kAttempts, kData = unpack(KEYS)
00435                 -- Unmark the job as claimed
00436                 redis.call('zRem',kClaimed,ARGV[1])
00437                 redis.call('hDel',kAttempts,ARGV[1])
00438                 -- Delete the job data itself
00439                 return redis.call('hDel',kData,ARGV[1])
00440 LUA;
00441                 $res = $conn->luaEval( $script,
00442                     array(
00443                         $this->getQueueKey( 'z-claimed' ), # KEYS[1]
00444                         $this->getQueueKey( 'h-attempts' ), # KEYS[2]
00445                         $this->getQueueKey( 'h-data' ), # KEYS[3]
00446                         $job->metadata['uuid'] # ARGV[1]
00447                     ),
00448                     3 # number of first argument(s) that are keys
00449                 );
00450 
00451                 if ( !$res ) {
00452                     wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
00453 
00454                     return false;
00455                 }
00456             } catch ( RedisException $e ) {
00457                 $this->throwRedisException( $conn, $e );
00458             }
00459         }
00460 
00461         return true;
00462     }
00463 
00470     protected function doDeduplicateRootJob( Job $job ) {
00471         if ( !$job->hasRootJobParams() ) {
00472             throw new MWException( "Cannot register root job; missing parameters." );
00473         }
00474         $params = $job->getRootJobParams();
00475 
00476         $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
00477 
00478         $conn = $this->getConnection();
00479         try {
00480             $timestamp = $conn->get( $key ); // current last timestamp of this job
00481             if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
00482                 return true; // a newer version of this root job was enqueued
00483             }
00484 
00485             // Update the timestamp of the last root job started at the location...
00486             return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
00487         } catch ( RedisException $e ) {
00488             $this->throwRedisException( $conn, $e );
00489         }
00490     }
00491 
00498     protected function doIsRootJobOldDuplicate( Job $job ) {
00499         if ( !$job->hasRootJobParams() ) {
00500             return false; // job has no de-deplication info
00501         }
00502         $params = $job->getRootJobParams();
00503 
00504         $conn = $this->getConnection();
00505         try {
00506             // Get the last time this root job was enqueued
00507             $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
00508         } catch ( RedisException $e ) {
00509             $this->throwRedisException( $conn, $e );
00510         }
00511 
00512         // Check if a new root job was started at the location after this one's...
00513         return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
00514     }
00515 
00521     protected function doDelete() {
00522         static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
00523             'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' );
00524 
00525         $conn = $this->getConnection();
00526         try {
00527             $keys = array();
00528             foreach ( $props as $prop ) {
00529                 $keys[] = $this->getQueueKey( $prop );
00530             }
00531 
00532             return ( $conn->delete( $keys ) !== false );
00533         } catch ( RedisException $e ) {
00534             $this->throwRedisException( $conn, $e );
00535         }
00536     }
00537 
00542     public function getAllQueuedJobs() {
00543         $conn = $this->getConnection();
00544         try {
00545             $that = $this;
00546 
00547             return new MappedIterator(
00548                 $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
00549                 function ( $uid ) use ( $that, $conn ) {
00550                     return $that->getJobFromUidInternal( $uid, $conn );
00551                 },
00552                 array( 'accept' => function ( $job ) {
00553                     return is_object( $job );
00554                 } )
00555             );
00556         } catch ( RedisException $e ) {
00557             $this->throwRedisException( $conn, $e );
00558         }
00559     }
00560 
00565     public function getAllDelayedJobs() {
00566         $conn = $this->getConnection();
00567         try {
00568             $that = $this;
00569 
00570             return new MappedIterator( // delayed jobs
00571                 $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
00572                 function ( $uid ) use ( $that, $conn ) {
00573                     return $that->getJobFromUidInternal( $uid, $conn );
00574                 },
00575                 array( 'accept' => function ( $job ) {
00576                     return is_object( $job );
00577                 } )
00578             );
00579         } catch ( RedisException $e ) {
00580             $this->throwRedisException( $conn, $e );
00581         }
00582     }
00583 
00584     public function getCoalesceLocationInternal() {
00585         return "RedisServer:" . $this->server;
00586     }
00587 
00588     protected function doGetSiblingQueuesWithJobs( array $types ) {
00589         return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
00590     }
00591 
00592     protected function doGetSiblingQueueSizes( array $types ) {
00593         $sizes = array(); // (type => size)
00594         $types = array_values( $types ); // reindex
00595         $conn = $this->getConnection();
00596         try {
00597             $conn->multi( Redis::PIPELINE );
00598             foreach ( $types as $type ) {
00599                 $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
00600             }
00601             $res = $conn->exec();
00602             if ( is_array( $res ) ) {
00603                 foreach ( $res as $i => $size ) {
00604                     $sizes[$types[$i]] = $size;
00605                 }
00606             }
00607         } catch ( RedisException $e ) {
00608             $this->throwRedisException( $conn, $e );
00609         }
00610 
00611         return $sizes;
00612     }
00613 
00622     public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
00623         try {
00624             $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
00625             if ( $data === false ) {
00626                 return false; // not found
00627             }
00628             $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
00629             if ( !is_array( $item ) ) { // this shouldn't happen
00630                 throw new MWException( "Could not find job with ID '$uid'." );
00631             }
00632             $title = Title::makeTitle( $item['namespace'], $item['title'] );
00633             $job = Job::factory( $item['type'], $title, $item['params'] );
00634             $job->metadata['uuid'] = $item['uuid'];
00635 
00636             return $job;
00637         } catch ( RedisException $e ) {
00638             $this->throwRedisException( $conn, $e );
00639         }
00640     }
00641 
00649     public function recyclePruneAndUndelayJobs() {
00650         $count = 0;
00651         // For each job item that can be retried, we need to add it back to the
00652         // main queue and remove it from the list of currenty claimed job items.
00653         // For those that cannot, they are marked as dead and kept around for
00654         // investigation and manual job restoration but are eventually deleted.
00655         $conn = $this->getConnection();
00656         try {
00657             $now = time();
00658             static $script =
00659 <<<LUA
00660             local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS)
00661             local released,abandoned,pruned,undelayed = 0,0,0,0
00662             -- Get all non-dead jobs that have an expired claim on them.
00663             -- The score for each item is the last claim timestamp (UNIX).
00664             local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
00665             for k,id in ipairs(staleClaims) do
00666                 local timestamp = redis.call('zScore',kClaimed,id)
00667                 local attempts = redis.call('hGet',kAttempts,id)
00668                 if attempts < ARGV[3] then
00669                     -- Claim expired and retries left: re-enqueue the job
00670                     redis.call('lPush',kUnclaimed,id)
00671                     redis.call('hIncrBy',kAttempts,id,1)
00672                     released = released + 1
00673                 else
00674                     -- Claim expired and no retries left: mark the job as dead
00675                     redis.call('zAdd',kAbandoned,timestamp,id)
00676                     abandoned = abandoned + 1
00677                 end
00678                 redis.call('zRem',kClaimed,id)
00679             end
00680             -- Get all of the dead jobs that have been marked as dead for too long.
00681             -- The score for each item is the last claim timestamp (UNIX).
00682             local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2])
00683             for k,id in ipairs(deadClaims) do
00684                 -- Stale and out of retries: remove any traces of the job
00685                 redis.call('zRem',kAbandoned,id)
00686                 redis.call('hDel',kAttempts,id)
00687                 redis.call('hDel',kData,id)
00688                 pruned = pruned + 1
00689             end
00690             -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp)
00691             local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4])
00692             -- Migrate the jobs from the "delayed" set to the "unclaimed" list
00693             for k,id in ipairs(ids) do
00694                 redis.call('lPush',kUnclaimed,id)
00695                 redis.call('zRem',kDelayed,id)
00696             end
00697             undelayed = #ids
00698             return {released,abandoned,pruned,undelayed}
00699 LUA;
00700             $res = $conn->luaEval( $script,
00701                 array(
00702                     $this->getQueueKey( 'z-claimed' ), # KEYS[1]
00703                     $this->getQueueKey( 'h-attempts' ), # KEYS[2]
00704                     $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
00705                     $this->getQueueKey( 'h-data' ), # KEYS[4]
00706                     $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
00707                     $this->getQueueKey( 'z-delayed' ), # KEYS[6]
00708                     $now - $this->claimTTL, # ARGV[1]
00709                     $now - self::MAX_AGE_PRUNE, # ARGV[2]
00710                     $this->maxTries, # ARGV[3]
00711                     $now # ARGV[4]
00712                 ),
00713                 6 # number of first argument(s) that are keys
00714             );
00715             if ( $res ) {
00716                 list( $released, $abandoned, $pruned, $undelayed ) = $res;
00717                 $count += $released + $pruned + $undelayed;
00718                 JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki );
00719                 JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki );
00720             }
00721         } catch ( RedisException $e ) {
00722             $this->throwRedisException( $conn, $e );
00723         }
00724 
00725         return $count;
00726     }
00727 
00731     protected function doGetPeriodicTasks() {
00732         $periods = array( 3600 ); // standard cleanup (useful on config change)
00733         if ( $this->claimTTL > 0 ) {
00734             $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
00735         }
00736         if ( $this->checkDelay ) {
00737             $periods[] = 300; // 5 minutes
00738         }
00739         $period = min( $periods );
00740         $period = max( $period, 30 ); // sanity
00741         // Support override for faster testing
00742         if ( $this->maximumPeriodicTaskSeconds !== null ) {
00743             $period = min( $period, $this->maximumPeriodicTaskSeconds );
00744         }
00745         return array(
00746             'recyclePruneAndUndelayJobs' => array(
00747                 'callback' => array( $this, 'recyclePruneAndUndelayJobs' ),
00748                 'period'   => $period,
00749             )
00750         );
00751     }
00752 
00757     protected function getNewJobFields( IJobSpecification $job ) {
00758         return array(
00759             // Fields that describe the nature of the job
00760             'type' => $job->getType(),
00761             'namespace' => $job->getTitle()->getNamespace(),
00762             'title' => $job->getTitle()->getDBkey(),
00763             'params' => $job->getParams(),
00764             // Some jobs cannot run until a "release timestamp"
00765             'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
00766             // Additional job metadata
00767             'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
00768             'sha1' => $job->ignoreDuplicates()
00769                 ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
00770                 : '',
00771             'timestamp' => time() // UNIX timestamp
00772         );
00773     }
00774 
00779     protected function getJobFromFields( array $fields ) {
00780         $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
00781         if ( $title ) {
00782             $job = Job::factory( $fields['type'], $title, $fields['params'] );
00783             $job->metadata['uuid'] = $fields['uuid'];
00784 
00785             return $job;
00786         }
00787 
00788         return false;
00789     }
00790 
00795     protected function serialize( array $fields ) {
00796         $blob = serialize( $fields );
00797         if ( $this->compression === 'gzip'
00798             && strlen( $blob ) >= 1024
00799             && function_exists( 'gzdeflate' )
00800         ) {
00801             $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' );
00802             $blobz = serialize( $object );
00803 
00804             return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
00805         } else {
00806             return $blob;
00807         }
00808     }
00809 
00814     protected function unserialize( $blob ) {
00815         $fields = unserialize( $blob );
00816         if ( is_object( $fields ) ) {
00817             if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) {
00818                 $fields = unserialize( gzinflate( $fields->blob ) );
00819             } else {
00820                 $fields = false;
00821             }
00822         }
00823 
00824         return is_array( $fields ) ? $fields : false;
00825     }
00826 
00833     protected function getConnection() {
00834         $conn = $this->redisPool->getConnection( $this->server );
00835         if ( !$conn ) {
00836             throw new JobQueueConnectionError( "Unable to connect to redis server." );
00837         }
00838 
00839         return $conn;
00840     }
00841 
00847     protected function throwRedisException( RedisConnRef $conn, $e ) {
00848         $this->redisPool->handleError( $conn, $e );
00849         throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
00850     }
00851 
00857     private function getQueueKey( $prop, $type = null ) {
00858         $type = is_string( $type ) ? $type : $this->type;
00859         list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
00860         if ( strlen( $this->key ) ) { // namespaced queue (for testing)
00861             return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
00862         } else {
00863             return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
00864         }
00865     }
00866 
00871     public function setTestingPrefix( $key ) {
00872         $this->key = $key;
00873     }
00874 }