MediaWiki  REL1_22
JobQueueRedis.php
Go to the documentation of this file.
00001 <?php
00059 class JobQueueRedis extends JobQueue {
    // Connection pool; assigned from RedisConnectionPool::singleton() in the constructor
    protected $redisPool;

    protected $server; // string; server address
    // string; compression method to use ('gzip' is the only value serialize() acts on)
    protected $compression;

    // integer; seconds a job can live once claimed (7 days);
    // used as the prune cutoff for abandoned jobs in recycleAndDeleteStaleJobs()
    const MAX_AGE_PRUNE = 604800;

    protected $key; // string; key to prefix the queue keys with (used for testing)
00069 
00080     public function __construct( array $params ) {
00081         parent::__construct( $params );
00082         $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
00083         $this->server = $params['redisServer'];
00084         $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
00085         $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
00086     }
00087 
00088     protected function supportedOrders() {
00089         return array( 'timestamp', 'fifo' );
00090     }
00091 
00092     protected function optimalOrder() {
00093         return 'fifo';
00094     }
00095 
00096     protected function supportsDelayedJobs() {
00097         return true;
00098     }
00099 
00105     protected function doIsEmpty() {
00106         return $this->doGetSize() == 0;
00107     }
00108 
00114     protected function doGetSize() {
00115         $conn = $this->getConnection();
00116         try {
00117             return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
00118         } catch ( RedisException $e ) {
00119             $this->throwRedisException( $this->server, $conn, $e );
00120         }
00121     }
00122 
00128     protected function doGetAcquiredCount() {
00129         if ( $this->claimTTL <= 0 ) {
00130             return 0; // no acknowledgements
00131         }
00132         $conn = $this->getConnection();
00133         try {
00134             $conn->multi( Redis::PIPELINE );
00135             $conn->zSize( $this->getQueueKey( 'z-claimed' ) );
00136             $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
00137             return array_sum( $conn->exec() );
00138         } catch ( RedisException $e ) {
00139             $this->throwRedisException( $this->server, $conn, $e );
00140         }
00141     }
00142 
00148     protected function doGetDelayedCount() {
00149         if ( !$this->checkDelay ) {
00150             return 0; // no delayed jobs
00151         }
00152         $conn = $this->getConnection();
00153         try {
00154             return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
00155         } catch ( RedisException $e ) {
00156             $this->throwRedisException( $this->server, $conn, $e );
00157         }
00158     }
00159 
00165     protected function doGetAbandonedCount() {
00166         if ( $this->claimTTL <= 0 ) {
00167             return 0; // no acknowledgements
00168         }
00169         $conn = $this->getConnection();
00170         try {
00171             return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
00172         } catch ( RedisException $e ) {
00173             $this->throwRedisException( $this->server, $conn, $e );
00174         }
00175     }
00176 
00184     protected function doBatchPush( array $jobs, $flags ) {
00185         // Convert the jobs into field maps (de-duplicated against each other)
00186         $items = array(); // (job ID => job fields map)
00187         foreach ( $jobs as $job ) {
00188             $item = $this->getNewJobFields( $job );
00189             if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
00190                 $items[$item['sha1']] = $item;
00191             } else {
00192                 $items[$item['uuid']] = $item;
00193             }
00194         }
00195 
00196         if ( !count( $items ) ) {
00197             return true; // nothing to do
00198         }
00199 
00200         $conn = $this->getConnection();
00201         try {
00202             // Actually push the non-duplicate jobs into the queue...
00203             if ( $flags & self::QOS_ATOMIC ) {
00204                 $batches = array( $items ); // all or nothing
00205             } else {
00206                 $batches = array_chunk( $items, 500 ); // avoid tying up the server
00207             }
00208             $failed = 0;
00209             $pushed = 0;
00210             foreach ( $batches as $itemBatch ) {
00211                 $added = $this->pushBlobs( $conn, $itemBatch );
00212                 if ( is_int( $added ) ) {
00213                     $pushed += $added;
00214                 } else {
00215                     $failed += count( $itemBatch );
00216                 }
00217             }
00218             if ( $failed > 0 ) {
00219                 wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
00220                 return false;
00221             }
00222             JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
00223             JobQueue::incrStats( 'job-insert-duplicate', $this->type,
00224                 count( $items ) - $failed - $pushed );
00225         } catch ( RedisException $e ) {
00226             $this->throwRedisException( $this->server, $conn, $e );
00227         }
00228 
00229         return true;
00230     }
00231 
00238     protected function pushBlobs( RedisConnRef $conn, array $items ) {
00239         $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
00240         foreach ( $items as $item ) {
00241             $args[] = (string)$item['uuid'];
00242             $args[] = (string)$item['sha1'];
00243             $args[] = (string)$item['rtimestamp'];
00244             $args[] = (string)$this->serialize( $item );
00245         }
00246         static $script =
00247 <<<LUA
00248         if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
00249         local pushed = 0
00250         for i = 1,#ARGV,4 do
00251             local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
00252             if sha1 == '' or redis.call('hExists',KEYS[3],sha1) == 0 then
00253                 if 1*rtimestamp > 0 then
00254                     -- Insert into delayed queue (release time as score)
00255                     redis.call('zAdd',KEYS[4],rtimestamp,id)
00256                 else
00257                     -- Insert into unclaimed queue
00258                     redis.call('lPush',KEYS[1],id)
00259                 end
00260                 if sha1 ~= '' then
00261                     redis.call('hSet',KEYS[2],id,sha1)
00262                     redis.call('hSet',KEYS[3],sha1,id)
00263                 end
00264                 redis.call('hSet',KEYS[5],id,blob)
00265                 pushed = pushed + 1
00266             end
00267         end
00268         return pushed
00269 LUA;
00270         return $conn->luaEval( $script,
00271             array_merge(
00272                 array(
00273                     $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
00274                     $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
00275                     $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
00276                     $this->getQueueKey( 'z-delayed' ), # KEYS[4]
00277                     $this->getQueueKey( 'h-data' ), # KEYS[5]
00278                 ),
00279                 $args
00280             ),
00281             5 # number of first argument(s) that are keys
00282         );
00283     }
00284 
00290     protected function doPop() {
00291         $job = false;
00292 
00293         // Push ready delayed jobs into the queue every 10 jobs to spread the load.
00294         // This is also done as a periodic task, but we don't want too much done at once.
00295         if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
00296             $this->releaseReadyDelayedJobs();
00297         }
00298 
00299         $conn = $this->getConnection();
00300         try {
00301             do {
00302                 if ( $this->claimTTL > 0 ) {
00303                     // Keep the claimed job list down for high-traffic queues
00304                     if ( mt_rand( 0, 99 ) == 0 ) {
00305                         $this->recycleAndDeleteStaleJobs();
00306                     }
00307                     $blob = $this->popAndAcquireBlob( $conn );
00308                 } else {
00309                     $blob = $this->popAndDeleteBlob( $conn );
00310                 }
00311                 if ( $blob === false ) {
00312                     break; // no jobs; nothing to do
00313                 }
00314 
00315                 JobQueue::incrStats( 'job-pop', $this->type );
00316                 $item = $this->unserialize( $blob );
00317                 if ( $item === false ) {
00318                     wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
00319                     continue;
00320                 }
00321 
00322                 // If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed
00323                 $job = $this->getJobFromFields( $item ); // may be false
00324             } while ( !$job ); // job may be false if invalid
00325         } catch ( RedisException $e ) {
00326             $this->throwRedisException( $this->server, $conn, $e );
00327         }
00328 
00329         return $job;
00330     }
00331 
00337     protected function popAndDeleteBlob( RedisConnRef $conn ) {
00338         static $script =
00339 <<<LUA
00340         -- Pop an item off the queue
00341         local id = redis.call('rpop',KEYS[1])
00342         if not id then return false end
00343         -- Get the job data and remove it
00344         local item = redis.call('hGet',KEYS[4],id)
00345         redis.call('hDel',KEYS[4],id)
00346         -- Allow new duplicates of this job
00347         local sha1 = redis.call('hGet',KEYS[2],id)
00348         if sha1 then redis.call('hDel',KEYS[3],sha1) end
00349         redis.call('hDel',KEYS[2],id)
00350         -- Return the job data
00351         return item
00352 LUA;
00353         return $conn->luaEval( $script,
00354             array(
00355                 $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
00356                 $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
00357                 $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
00358                 $this->getQueueKey( 'h-data' ), # KEYS[4]
00359             ),
00360             4 # number of first argument(s) that are keys
00361         );
00362     }
00363 
00369     protected function popAndAcquireBlob( RedisConnRef $conn ) {
00370         static $script =
00371 <<<LUA
00372         -- Pop an item off the queue
00373         local id = redis.call('rPop',KEYS[1])
00374         if not id then return false end
00375         -- Allow new duplicates of this job
00376         local sha1 = redis.call('hGet',KEYS[2],id)
00377         if sha1 then redis.call('hDel',KEYS[3],sha1) end
00378         redis.call('hDel',KEYS[2],id)
00379         -- Mark the jobs as claimed and return it
00380         redis.call('zAdd',KEYS[4],ARGV[1],id)
00381         redis.call('hIncrBy',KEYS[5],id,1)
00382         return redis.call('hGet',KEYS[6],id)
00383 LUA;
00384         return $conn->luaEval( $script,
00385             array(
00386                 $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
00387                 $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
00388                 $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
00389                 $this->getQueueKey( 'z-claimed' ), # KEYS[4]
00390                 $this->getQueueKey( 'h-attempts' ), # KEYS[5]
00391                 $this->getQueueKey( 'h-data' ), # KEYS[6]
00392                 time(), # ARGV[1] (injected to be replication-safe)
00393             ),
00394             6 # number of first argument(s) that are keys
00395         );
00396     }
00397 
00404     protected function doAck( Job $job ) {
00405         if ( !isset( $job->metadata['uuid'] ) ) {
00406             throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
00407         }
00408         if ( $this->claimTTL > 0 ) {
00409             $conn = $this->getConnection();
00410             try {
00411                 static $script =
00412 <<<LUA
00413                 -- Unmark the job as claimed
00414                 redis.call('zRem',KEYS[1],ARGV[1])
00415                 redis.call('hDel',KEYS[2],ARGV[1])
00416                 -- Delete the job data itself
00417                 return redis.call('hDel',KEYS[3],ARGV[1])
00418 LUA;
00419                 $res = $conn->luaEval( $script,
00420                     array(
00421                         $this->getQueueKey( 'z-claimed' ), # KEYS[1]
00422                         $this->getQueueKey( 'h-attempts' ), # KEYS[2]
00423                         $this->getQueueKey( 'h-data' ), # KEYS[3]
00424                         $job->metadata['uuid'] # ARGV[1]
00425                     ),
00426                     3 # number of first argument(s) that are keys
00427                 );
00428 
00429                 if ( !$res ) {
00430                     wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
00431                     return false;
00432                 }
00433             } catch ( RedisException $e ) {
00434                 $this->throwRedisException( $this->server, $conn, $e );
00435             }
00436         }
00437         return true;
00438     }
00439 
00446     protected function doDeduplicateRootJob( Job $job ) {
00447         if ( !$job->hasRootJobParams() ) {
00448             throw new MWException( "Cannot register root job; missing parameters." );
00449         }
00450         $params = $job->getRootJobParams();
00451 
00452         $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
00453 
00454         $conn = $this->getConnection();
00455         try {
00456             $timestamp = $conn->get( $key ); // current last timestamp of this job
00457             if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
00458                 return true; // a newer version of this root job was enqueued
00459             }
00460             // Update the timestamp of the last root job started at the location...
00461             return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
00462         } catch ( RedisException $e ) {
00463             $this->throwRedisException( $this->server, $conn, $e );
00464         }
00465     }
00466 
00472     protected function doIsRootJobOldDuplicate( Job $job ) {
00473         if ( !$job->hasRootJobParams() ) {
00474             return false; // job has no de-deplication info
00475         }
00476         $params = $job->getRootJobParams();
00477 
00478         $conn = $this->getConnection();
00479         try {
00480             // Get the last time this root job was enqueued
00481             $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
00482         } catch ( RedisException $e ) {
00483             $this->throwRedisException( $this->server, $conn, $e );
00484         }
00485 
00486         // Check if a new root job was started at the location after this one's...
00487         return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
00488     }
00489 
00494     protected function doDelete() {
00495         static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
00496             'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' );
00497 
00498         $conn = $this->getConnection();
00499         try {
00500             $keys = array();
00501             foreach ( $props as $prop ) {
00502                 $keys[] = $this->getQueueKey( $prop );
00503             }
00504             return ( $conn->delete( $keys ) !== false );
00505         } catch ( RedisException $e ) {
00506             $this->throwRedisException( $this->server, $conn, $e );
00507         }
00508     }
00509 
00514     public function getAllQueuedJobs() {
00515         $conn = $this->getConnection();
00516         try {
00517             $that = $this;
00518             return new MappedIterator(
00519                 $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
00520                 function( $uid ) use ( $that, $conn ) {
00521                     return $that->getJobFromUidInternal( $uid, $conn );
00522                 },
00523                 array( 'accept' => function ( $job ) { return is_object( $job ); } )
00524             );
00525         } catch ( RedisException $e ) {
00526             $this->throwRedisException( $this->server, $conn, $e );
00527         }
00528     }
00529 
00534     public function getAllDelayedJobs() {
00535         $conn = $this->getConnection();
00536         try {
00537             $that = $this;
00538             return new MappedIterator( // delayed jobs
00539                 $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
00540                 function( $uid ) use ( $that, $conn ) {
00541                     return $that->getJobFromUidInternal( $uid, $conn );
00542                 },
00543                 array( 'accept' => function ( $job ) { return is_object( $job ); } )
00544             );
00545         } catch ( RedisException $e ) {
00546             $this->throwRedisException( $this->server, $conn, $e );
00547         }
00548     }
00549 
00550     public function getCoalesceLocationInternal() {
00551         return "RedisServer:" . $this->server;
00552     }
00553 
00554     protected function doGetSiblingQueuesWithJobs( array $types ) {
00555         return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
00556     }
00557 
00558     protected function doGetSiblingQueueSizes( array $types ) {
00559         $sizes = array(); // (type => size)
00560         $types = array_values( $types ); // reindex
00561         try {
00562             $conn = $this->getConnection();
00563             $conn->multi( Redis::PIPELINE );
00564             foreach ( $types as $type ) {
00565                 $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
00566             }
00567             $res = $conn->exec();
00568             if ( is_array( $res ) ) {
00569                 foreach ( $res as $i => $size ) {
00570                     $sizes[$types[$i]] = $size;
00571                 }
00572             }
00573         } catch ( RedisException $e ) {
00574             $this->throwRedisException( $this->server, $conn, $e );
00575         }
00576         return $sizes;
00577     }
00578 
00587     public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
00588         try {
00589             $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
00590             if ( $data === false ) {
00591                 return false; // not found
00592             }
00593             $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
00594             if ( !is_array( $item ) ) { // this shouldn't happen
00595                 throw new MWException( "Could not find job with ID '$uid'." );
00596             }
00597             $title = Title::makeTitle( $item['namespace'], $item['title'] );
00598             $job = Job::factory( $item['type'], $title, $item['params'] );
00599             $job->metadata['uuid'] = $item['uuid'];
00600             return $job;
00601         } catch ( RedisException $e ) {
00602             $this->throwRedisException( $this->server, $conn, $e );
00603         }
00604     }
00605 
00612     public function releaseReadyDelayedJobs() {
00613         $count = 0;
00614 
00615         $conn = $this->getConnection();
00616         try {
00617             static $script =
00618 <<<LUA
00619             -- Get the list of ready delayed jobs, sorted by readiness
00620             local ids = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
00621             -- Migrate the jobs from the "delayed" set to the "unclaimed" list
00622             for k,id in ipairs(ids) do
00623                 redis.call('lPush',KEYS[2],id)
00624                 redis.call('zRem',KEYS[1],id)
00625             end
00626             return #ids
00627 LUA;
00628             $count += (int)$conn->luaEval( $script,
00629                 array(
00630                     $this->getQueueKey( 'z-delayed' ), // KEYS[1]
00631                     $this->getQueueKey( 'l-unclaimed' ), // KEYS[2]
00632                     time() // ARGV[1]; max "delay until" UNIX timestamp
00633                 ),
00634                 2 # first two arguments are keys
00635             );
00636         } catch ( RedisException $e ) {
00637             $this->throwRedisException( $this->server, $conn, $e );
00638         }
00639 
00640         return $count;
00641     }
00642 
00649     public function recycleAndDeleteStaleJobs() {
00650         if ( $this->claimTTL <= 0 ) { // sanity
00651             throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." );
00652         }
00653         $count = 0;
00654         // For each job item that can be retried, we need to add it back to the
00655         // main queue and remove it from the list of currenty claimed job items.
00656         // For those that cannot, they are marked as dead and kept around for
00657         // investigation and manual job restoration but are eventually deleted.
00658         $conn = $this->getConnection();
00659         try {
00660             $now = time();
00661             static $script =
00662 <<<LUA
00663             local released,abandoned,pruned = 0,0,0
00664             -- Get all non-dead jobs that have an expired claim on them.
00665             -- The score for each item is the last claim timestamp (UNIX).
00666             local staleClaims = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
00667             for k,id in ipairs(staleClaims) do
00668                 local timestamp = redis.call('zScore',KEYS[1],id)
00669                 local attempts = redis.call('hGet',KEYS[2],id)
00670                 if attempts < ARGV[3] then
00671                     -- Claim expired and retries left: re-enqueue the job
00672                     redis.call('lPush',KEYS[3],id)
00673                     redis.call('hIncrBy',KEYS[2],id,1)
00674                     released = released + 1
00675                 else
00676                     -- Claim expired and no retries left: mark the job as dead
00677                     redis.call('zAdd',KEYS[5],timestamp,id)
00678                     abandoned = abandoned + 1
00679                 end
00680                 redis.call('zRem',KEYS[1],id)
00681             end
00682             -- Get all of the dead jobs that have been marked as dead for too long.
00683             -- The score for each item is the last claim timestamp (UNIX).
00684             local deadClaims = redis.call('zRangeByScore',KEYS[5],0,ARGV[2])
00685             for k,id in ipairs(deadClaims) do
00686                 -- Stale and out of retries: remove any traces of the job
00687                 redis.call('zRem',KEYS[5],id)
00688                 redis.call('hDel',KEYS[2],id)
00689                 redis.call('hDel',KEYS[4],id)
00690                 pruned = pruned + 1
00691             end
00692             return {released,abandoned,pruned}
00693 LUA;
00694             $res = $conn->luaEval( $script,
00695                 array(
00696                     $this->getQueueKey( 'z-claimed' ), # KEYS[1]
00697                     $this->getQueueKey( 'h-attempts' ), # KEYS[2]
00698                     $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
00699                     $this->getQueueKey( 'h-data' ), # KEYS[4]
00700                     $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
00701                     $now - $this->claimTTL, # ARGV[1]
00702                     $now - self::MAX_AGE_PRUNE, # ARGV[2]
00703                     $this->maxTries # ARGV[3]
00704                 ),
00705                 5 # number of first argument(s) that are keys
00706             );
00707             if ( $res ) {
00708                 list( $released, $abandoned, $pruned ) = $res;
00709                 $count += $released + $pruned;
00710                 JobQueue::incrStats( 'job-recycle', $this->type, $released );
00711                 JobQueue::incrStats( 'job-abandon', $this->type, $abandoned );
00712             }
00713         } catch ( RedisException $e ) {
00714             $this->throwRedisException( $this->server, $conn, $e );
00715         }
00716 
00717         return $count;
00718     }
00719 
00723     protected function doGetPeriodicTasks() {
00724         $tasks = array();
00725         if ( $this->claimTTL > 0 ) {
00726             $tasks['recycleAndDeleteStaleJobs'] = array(
00727                 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
00728                 'period' => ceil( $this->claimTTL / 2 )
00729             );
00730         }
00731         if ( $this->checkDelay ) {
00732             $tasks['releaseReadyDelayedJobs'] = array(
00733                 'callback' => array( $this, 'releaseReadyDelayedJobs' ),
00734                 'period' => 300 // 5 minutes
00735             );
00736         }
00737         return $tasks;
00738     }
00739 
00744     protected function getNewJobFields( Job $job ) {
00745         return array(
00746             // Fields that describe the nature of the job
00747             'type'       => $job->getType(),
00748             'namespace'  => $job->getTitle()->getNamespace(),
00749             'title'      => $job->getTitle()->getDBkey(),
00750             'params'     => $job->getParams(),
00751             // Some jobs cannot run until a "release timestamp"
00752             'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
00753             // Additional job metadata
00754             'uuid'       => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
00755             'sha1'       => $job->ignoreDuplicates()
00756                 ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
00757                 : '',
00758             'timestamp'  => time() // UNIX timestamp
00759         );
00760     }
00761 
00766     protected function getJobFromFields( array $fields ) {
00767         $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
00768         if ( $title ) {
00769             $job = Job::factory( $fields['type'], $title, $fields['params'] );
00770             $job->metadata['uuid'] = $fields['uuid'];
00771             return $job;
00772         }
00773         return false;
00774     }
00775 
00780     protected function serialize( array $fields ) {
00781         $blob = serialize( $fields );
00782         if ( $this->compression === 'gzip'
00783             && strlen( $blob ) >= 1024 && function_exists( 'gzdeflate' ) )
00784         {
00785             $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' );
00786             $blobz = serialize( $object );
00787             return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
00788         } else {
00789             return $blob;
00790         }
00791     }
00792 
00797     protected function unserialize( $blob ) {
00798         $fields = unserialize( $blob );
00799         if ( is_object( $fields ) ) {
00800             if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) {
00801                 $fields = unserialize( gzinflate( $fields->blob ) );
00802             } else {
00803                 $fields = false;
00804             }
00805         }
00806         return is_array( $fields ) ? $fields : false;
00807     }
00808 
00815     protected function getConnection() {
00816         $conn = $this->redisPool->getConnection( $this->server );
00817         if ( !$conn ) {
00818             throw new JobQueueConnectionError( "Unable to connect to redis server." );
00819         }
00820         return $conn;
00821     }
00822 
00829     protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
00830         $this->redisPool->handleException( $server, $conn, $e );
00831         throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
00832     }
00833 
00839     private function getQueueKey( $prop, $type = null ) {
00840         $type = is_string( $type ) ? $type : $this->type;
00841         list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
00842         if ( strlen( $this->key ) ) { // namespaced queue (for testing)
00843             return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
00844         } else {
00845             return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
00846         }
00847     }
00848 
00853     public function setTestingPrefix( $key ) {
00854         $this->key = $key;
00855     }
00856 }