MediaWiki  REL1_23
JobQueueAggregatorRedis.php
Go to the documentation of this file.
00001 <?php
/**
 * Job queue aggregator that keeps the set of non-empty queues in a Redis hash.
 *
 * The hash lives under a single key (see getReadyQueueKey()); each field is an
 * encoded "type/wiki" pair (see encQueueName()) and each value is the UNIX
 * timestamp at which the field was written.
 */
class JobQueueAggregatorRedis extends JobQueueAggregator {
    /** @var RedisConnectionPool Pool obtained from $params['redisConfig'] */
    protected $redisPool;

    /** @var array Servers handed to the pool, tried in order by getConnection() */
    protected $servers;

    /**
     * @param array $params Reads the following keys:
     *   - redisServers : list of servers to try, in order
     *   - redisServer  : a single server; only used when 'redisServers' is
     *                    not set (backwards compatibility)
     *   - redisConfig  : value passed to RedisConnectionPool::singleton()
     */
    protected function __construct( array $params ) {
        parent::__construct( $params );
        $this->servers = isset( $params['redisServers'] )
            ? $params['redisServers']
            : array( $params['redisServer'] ); // b/c
        $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
    }

    /**
     * Remove the field for the given queue from the ready-queue hash.
     *
     * @param string $wiki
     * @param string $type
     * @return bool True on success; false if no connection could be obtained
     *   or a RedisException was raised
     */
    protected function doNotifyQueueEmpty( $wiki, $type ) {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return false;
        }
        try {
            $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );

            return true;
        } catch ( RedisException $e ) {
            $this->handleException( $conn, $e );

            return false;
        }
    }

    /**
     * Set the field for the given queue in the ready-queue hash to the
     * current time.
     *
     * @param string $wiki
     * @param string $type
     * @return bool True on success; false if no connection could be obtained
     *   or a RedisException was raised
     */
    protected function doNotifyQueueNonEmpty( $wiki, $type ) {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return false;
        }
        try {
            $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );

            return true;
        } catch ( RedisException $e ) {
            $this->handleException( $conn, $e );

            return false;
        }
    }

    /**
     * Get the queues recorded as ready, grouped by job type.
     *
     * If the ready-queue hash exists it is decoded and returned. Otherwise
     * this process tries to take a lock, rebuilds the list through
     * findPendingWikiQueues(), stores it in the hash and releases the lock.
     *
     * @return array Map of (type => list of wikis). An empty array is also
     *   returned when no connection could be obtained, when another process
     *   already holds the rebuild lock, or when a RedisException was raised.
     */
    protected function doGetAllReadyWikiQueues() {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return array();
        }
        try {
            $readyKey = $this->getReadyQueueKey();
            $lockKey = $readyKey . ":lock";

            // Fetch the existence flag and the hash contents in one round trip
            $conn->multi( Redis::PIPELINE );
            $conn->exists( $readyKey );
            $conn->hGetAll( $readyKey );
            list( $exists, $map ) = $conn->exec();

            if ( $exists ) { // cache hit
                $pendingDBs = array(); // (type => list of wikis)
                foreach ( $map as $key => $time ) {
                    list( $type, $wiki ) = $this->dencQueueName( $key );
                    $pendingDBs[$type][] = $wiki;
                }
            } else { // cache miss
                // Avoid duplicated effort: create a uniquely named key with a
                // one hour TTL and try to rename it to the shared lock key.
                // RENAMENX only succeeds if the lock key does not exist yet.
                $rand = wfRandomString( 32 );
                $conn->multi( Redis::MULTI );
                $conn->setex( "{$rand}:lock", 3600, 1 );
                $conn->renamenx( "{$rand}:lock", $lockKey );
                if ( $conn->exec() !== array( true, true ) ) { // lock
                    // Rename did not happen; clean up the temporary key
                    $conn->delete( "{$rand}:lock" );

                    return array(); // already in progress
                }

                // NOTE: if this call throws, the lock is not released here and
                // only goes away when its 3600 second TTL expires.
                $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)

                $now = time();
                $map = array();
                foreach ( $pendingDBs as $type => $wikis ) {
                    foreach ( $wikis as $wiki ) {
                        $map[$this->encQueueName( $type, $wiki )] = $now;
                    }
                }
                $conn->hMSet( $readyKey, $map );

                // Release the lock only after the hash has been written.
                // Unlocking first leaves a window in which another process sees
                // neither the hash nor the lock and repeats the whole scan.
                $conn->delete( $lockKey ); // unlock
            }

            return $pendingDBs;
        } catch ( RedisException $e ) {
            $this->handleException( $conn, $e );

            return array();
        }
    }

    /**
     * Delete the ready-queue hash.
     *
     * @return bool True on success; false if no connection could be obtained
     *   or a RedisException was raised
     */
    protected function doPurge() {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return false;
        }
        try {
            $conn->delete( $this->getReadyQueueKey() );
        } catch ( RedisException $e ) {
            $this->handleException( $conn, $e );

            return false;
        }

        return true;
    }

    /**
     * Get a connection to the first configured server that the pool can
     * provide one for.
     *
     * @return RedisConnRef|bool The first truthy value returned by the pool;
     *   otherwise the last falsy value (false when no servers are configured)
     */
    protected function getConnection() {
        $conn = false;
        foreach ( $this->servers as $server ) {
            $conn = $this->redisPool->getConnection( $server );
            if ( $conn ) {
                break;
            }
        }

        return $conn;
    }

    /**
     * Forward a Redis error on the given connection to the connection pool.
     *
     * @param RedisConnRef $conn
     * @param RedisException $e
     */
    protected function handleException( RedisConnRef $conn, $e ) {
        $this->redisPool->handleError( $conn, $e );
    }

    /**
     * @return string Redis key of the ready-queue hash
     */
    private function getReadyQueueKey() {
        return "jobqueue:aggregator:h-ready-queues:v1"; // global
    }

    /**
     * Build the hash field name for a queue.
     *
     * Both parts are passed through rawurlencode(), so the only literal '/'
     * in the result is the separator.
     *
     * @param string $type
     * @param string $wiki
     * @return string "<encoded type>/<encoded wiki>"
     */
    private function encQueueName( $type, $wiki ) {
        return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
    }

    /**
     * Split a hash field name produced by encQueueName() back into its parts.
     *
     * @param string $name
     * @return array (type, wiki), both passed through rawurldecode()
     */
    private function dencQueueName( $name ) {
        list( $type, $wiki ) = explode( '/', $name, 2 );

        return array( rawurldecode( $type ), rawurldecode( $wiki ) );
    }
}