MediaWiki  REL1_22
JobQueueAggregatorRedis.php
Go to the documentation of this file.
00001 <?php
/**
 * Job queue aggregator that keeps the set of "ready" (non-empty) queues
 * in a single Redis hash.
 *
 * Each hash field is an encoded "type/wiki" pair (see encQueueName()) and
 * each value is the UNIX timestamp at which the entry was written. The
 * existence of the hash key itself is treated as "the cache is populated".
 */
class JobQueueAggregatorRedis extends JobQueueAggregator {
    /** @var RedisConnectionPool Pool obtained via RedisConnectionPool::singleton() */
    protected $redisPool;

    /**
     * @param array $params Must include:
     *   - redisServer : value handed to the connection pool when requesting
     *                   a connection (stored in $this->server).
     *   - redisConfig : configuration passed to RedisConnectionPool::singleton().
     */
    protected function __construct( array $params ) {
        parent::__construct( $params );
        // NOTE(review): $server is not declared in the visible part of this class;
        // unless the parent declares it, it is created here as a dynamic property.
        $this->server = $params['redisServer'];
        $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
    }

    /**
     * Remove the entry for the given queue from the ready-queue hash.
     *
     * @param string $wiki
     * @param string $type
     * @return bool True if the command was issued; false if no connection
     *   was available or a RedisException occurred
     */
    protected function doNotifyQueueEmpty( $wiki, $type ) {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return false;
        }
        try {
            $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );
            return true;
        } catch ( RedisException $e ) {
            $this->handleException( $conn, $e );
            return false;
        }
    }

    /**
     * Add (or refresh) the entry for the given queue in the ready-queue hash,
     * storing the current time as its value.
     *
     * @param string $wiki
     * @param string $type
     * @return bool True if the command was issued; false if no connection
     *   was available or a RedisException occurred
     */
    protected function doNotifyQueueNonEmpty( $wiki, $type ) {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return false;
        }
        try {
            $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
            return true;
        } catch ( RedisException $e ) {
            $this->handleException( $conn, $e );
            return false;
        }
    }

    /**
     * Get the ready queues, grouped by job type.
     *
     * If the ready-queue hash exists it is decoded and returned directly.
     * Otherwise the caller that wins the ":lock" key recomputes the list with
     * findPendingWikiQueues(), stores it in the hash, and releases the lock;
     * callers that do not win the lock get an empty list.
     *
     * @return array Map of (job type => list of wikis); empty if there is no
     *   connection, a RedisException occurred, or a rebuild is already in progress
     */
    protected function doGetAllReadyWikiQueues() {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return array();
        }

        $readyKey = $this->getReadyQueueKey();
        $lockKey = $readyKey . ":lock";

        try {
            // Ask for key existence and contents in one pipelined round trip
            $conn->multi( Redis::PIPELINE );
            $conn->exists( $readyKey );
            $conn->hGetAll( $readyKey );
            list( $exists, $map ) = $conn->exec();

            if ( $exists ) { // cache hit
                $pendingDBs = array(); // (type => list of wikis)
                foreach ( $map as $key => $time ) {
                    list( $type, $wiki ) = $this->dencQueueName( $key );
                    $pendingDBs[$type][] = $wiki;
                }
                return $pendingDBs;
            }

            // Cache miss: avoid duplicated effort by letting only one caller rebuild.
            // NOTE(review): EXPIRE runs even when SETNX fails, so a losing caller
            // still resets the TTL of a lock held by someone else.
            $conn->multi( Redis::MULTI );
            $conn->setnx( $lockKey, 1 );
            $conn->expire( $lockKey, 3600 );
            if ( $conn->exec() !== array( true, true ) ) { // lock
                return array(); // already in progress
            }

            try {
                $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
            } catch ( Exception $e ) {
                // Do not leave the lock behind for an hour when the scan fails
                $conn->delete( $lockKey );
                throw $e;
            }

            $now = time();
            $map = array();
            foreach ( $pendingDBs as $type => $wikis ) {
                foreach ( $wikis as $wiki ) {
                    $map[$this->encQueueName( $type, $wiki )] = $now;
                }
            }
            if ( $map ) {
                // HMSET needs at least one field; nothing to store otherwise
                $conn->hMSet( $readyKey, $map );
            }

            // Unlock only once the hash is written, so that no other caller can
            // see "cache miss + free lock" and repeat the scan in between.
            $conn->delete( $lockKey );

            return $pendingDBs;
        } catch ( RedisException $e ) {
            $this->handleException( $conn, $e );
            return array();
        }
    }

    /**
     * Delete the ready-queue hash.
     *
     * @return bool True if the command was issued; false if no connection
     *   was available or a RedisException occurred
     */
    protected function doPurge() {
        $conn = $this->getConnection();
        if ( !$conn ) {
            return false;
        }
        try {
            $conn->delete( $this->getReadyQueueKey() );
        } catch ( RedisException $e ) {
            $this->handleException( $conn, $e );
            return false;
        }
        return true;
    }

    /**
     * Request a connection to the configured server from the pool.
     *
     * @return RedisConnRef|bool Connection handle; callers in this class treat
     *   a falsy return value as "no connection available"
     */
    protected function getConnection() {
        return $this->redisPool->getConnection( $this->server );
    }

    /**
     * Forward a Redis error to the connection pool's exception handler.
     *
     * @param RedisConnRef $conn Connection on which the error occurred
     * @param RedisException $e
     */
    protected function handleException( RedisConnRef $conn, $e ) {
        $this->redisPool->handleException( $this->server, $conn, $e );
    }

    /**
     * @return string Name of the Redis hash holding the ready queues
     */
    private function getReadyQueueKey() {
        return "jobqueue:aggregator:h-ready-queues:v1"; // global
    }

    /**
     * Build the hash field name for a queue. Both parts are raw-URL-encoded,
     * so the '/' separator cannot appear inside either of them.
     *
     * @param string $type
     * @param string $wiki
     * @return string
     */
    private function encQueueName( $type, $wiki ) {
        return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
    }

    /**
     * Inverse of encQueueName().
     *
     * @param string $name Hash field name
     * @return array (type, wiki); wiki is '' if $name contains no '/'
     */
    private function dencQueueName( $name ) {
        // Pad so that a field without a separator does not raise a notice
        list( $type, $wiki ) = array_pad( explode( '/', $name, 2 ), 2, '' );
        return array( rawurldecode( $type ), rawurldecode( $wiki ) );
    }
}