MediaWiki  REL1_24
JobQueueAggregatorRedis.php
Go to the documentation of this file.
00001 <?php
/**
 * Job queue aggregator that keeps its "which queues have jobs" map in Redis.
 *
 * Two global Redis hashes are used (see getReadyQueueKey() and getQueueTypesKey()):
 *  - the ready-queue hash maps an encoded "type/wiki" field to a UNIX timestamp
 *    and carries a dummy "_epoch" field that marks the hash as populated;
 *  - the queue-types hash maps each job type seen so far to the string 'enabled'.
 */
class JobQueueAggregatorRedis extends JobQueueAggregator {
    /** @var RedisConnectionPool Pool obtained from RedisConnectionPool::singleton() */
    protected $redisPool;

    /** @var array Servers handed to the pool's getConnection(), tried in order */
    protected $servers;

    /**
     * @param array $params Recognised keys:
     *   - redisServers : list of servers to try, in order
     *   - redisServer  : a single server; only read when 'redisServers' is absent (b/c)
     *   - redisConfig  : settings passed to RedisConnectionPool::singleton();
     *                    its 'serializer' entry is always forced to 'none'
     */
    protected function __construct( array $params ) {
        parent::__construct( $params );
        $this->servers = isset( $params['redisServers'] )
            ? $params['redisServers']
            : array( $params['redisServer'] ); // b/c
        $params['redisConfig']['serializer'] = 'none';
        $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
    }

    /**
     * Remove the (type, wiki) field from the ready-queue hash.
     *
     * @param string $wiki
     * @param string $type
     * @return bool True on success; false if no connection could be obtained
     *   or Redis raised an error
     */
    protected function doNotifyQueueEmpty( $wiki, $type ) {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return false;
        }
        try {
            $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );

            return true;
        } catch ( RedisException $e ) {
            $this->handleException( $conn, $e );

            return false;
        }
    }

    /**
     * Register the job type (if not yet known) and stamp the (type, wiki) field
     * of the ready-queue hash with the current time, in a single pipeline.
     *
     * @param string $wiki
     * @param string $type
     * @return bool True on success; false if no connection could be obtained
     *   or Redis raised an error
     */
    protected function doNotifyQueueNonEmpty( $wiki, $type ) {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return false;
        }
        try {
            $conn->multi( Redis::PIPELINE );
            $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
            $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
            $conn->exec();

            return true;
        } catch ( RedisException $e ) {
            $this->handleException( $conn, $e );

            return false;
        }
    }

    /**
     * Get the map of ready queues, rebuilding the Redis hash when it is missing.
     *
     * When the ready-queue hash has the "_epoch" marker it is decoded and
     * returned directly. Otherwise a lock key (with a one hour TTL) is taken so
     * that only one caller runs findPendingWikiQueues() and rewrites the hash;
     * callers that fail to take the lock get an empty array.
     *
     * @return array Map of (job type => list of wikis); an empty array if no
     *   connection could be obtained, a rebuild is already in progress
     *   elsewhere, or Redis raised an error
     */
    protected function doGetAllReadyWikiQueues() {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return array();
        }
        try {
            $readyKey = $this->getReadyQueueKey();
            $map = $conn->hGetAll( $readyKey );

            if ( is_array( $map ) && isset( $map['_epoch'] ) ) {
                unset( $map['_epoch'] ); // ignore
                $pendingDBs = array(); // (type => list of wikis)
                foreach ( $map as $key => $time ) {
                    list( $type, $wiki ) = $this->dencQueueName( $key );
                    $pendingDBs[$type][] = $wiki;
                }

                return $pendingDBs;
            }

            // Avoid duplicated effort: create a uniquely named key and try to
            // rename it onto the shared lock key; RENAMENX fails if that exists.
            $lockKey = $readyKey . ":lock";
            $rand = wfRandomString( 32 );
            $conn->multi( Redis::MULTI );
            $conn->setex( "{$rand}:lock", 3600, 1 );
            $conn->renamenx( "{$rand}:lock", $lockKey );
            if ( $conn->exec() !== array( true, true ) ) { // lock
                $conn->delete( "{$rand}:lock" );

                return array(); // already in progress
            }

            try {
                $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
            } catch ( Exception $e ) {
                // The scan failed, so nothing is "in progress" any more. Drop the
                // lock now rather than making every caller return an empty
                // result until the one hour TTL expires.
                try {
                    $conn->delete( $lockKey );
                } catch ( RedisException $unlockError ) {
                    // Best effort only; the TTL will eventually clear the lock
                    $this->handleException( $conn, $unlockError );
                }
                throw $e;
            }

            $conn->multi( Redis::PIPELINE );
            $now = time();
            // Use one timestamp for the marker and for every queue entry
            $map = array( '_epoch' => $now ); // dummy key for empty Redis collections
            foreach ( $pendingDBs as $type => $wikis ) {
                $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
                foreach ( $wikis as $wiki ) {
                    $map[$this->encQueueName( $type, $wiki )] = $now;
                }
            }
            $conn->hMSet( $readyKey, $map );
            $conn->exec();

            $conn->delete( $lockKey ); // unlock

            return $pendingDBs;
        } catch ( RedisException $e ) {
            // If this happens while the lock is held, the lock is left to expire
            // through its TTL since the connection can no longer be trusted.
            $this->handleException( $conn, $e );

            return array();
        }
    }

    /**
     * Delete the ready-queue hash; the queue-types hash is left untouched.
     *
     * @return bool True on success; false if no connection could be obtained
     *   or Redis raised an error
     */
    protected function doPurge() {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return false;
        }
        try {
            $conn->delete( $this->getReadyQueueKey() );
            // leave key at getQueueTypesKey() alone
        } catch ( RedisException $e ) {
            $this->handleException( $conn, $e );

            return false;
        }

        return true;
    }

    /**
     * Get a connection from the first configured server the pool can reach.
     *
     * @return RedisConnRef|bool The connection, or a false value if the pool
     *   returned none for every server
     */
    protected function getConnection() {
        $conn = false;
        foreach ( $this->servers as $server ) {
            $conn = $this->redisPool->getConnection( $server );
            if ( $conn ) {
                break;
            }
        }

        return $conn;
    }

    /**
     * Hand a Redis failure over to the connection pool's error handler.
     *
     * @param RedisConnRef $conn Connection the error occurred on
     * @param RedisException $e
     */
    protected function handleException( RedisConnRef $conn, $e ) {
        $this->redisPool->handleError( $conn, $e );
    }

    /**
     * @return string Key of the hash of ready queues (not wiki-specific)
     */
    private function getReadyQueueKey() {
        return "jobqueue:aggregator:h-ready-queues:v2"; // global
    }

    /**
     * @return string Key of the hash of known job types (not wiki-specific)
     */
    private function getQueueTypesKey() {
        return "jobqueue:aggregator:h-queue-types:v2"; // global
    }

    /**
     * Build the hash field name for a (type, wiki) pair.
     *
     * Both parts are URL-encoded so the '/' separator stays unambiguous.
     *
     * @param string $type
     * @param string $wiki
     * @return string
     */
    private function encQueueName( $type, $wiki ) {
        return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
    }

    /**
     * Split a hash field name produced by encQueueName() back into its parts.
     *
     * @param string $name
     * @return array (type, wiki); a part missing from a malformed name is ''
     */
    private function dencQueueName( $name ) {
        // Pad so that a field without '/' does not trigger an undefined offset notice
        list( $type, $wiki ) = array_pad( explode( '/', $name, 2 ), 2, '' );

        return array( rawurldecode( $type ), rawurldecode( $wiki ) );
    }
}