MediaWiki  REL1_21
JobQueueAggregatorRedis.php
Go to the documentation of this file.
00001 <?php
00030 class JobQueueAggregatorRedis extends JobQueueAggregator {
00032         protected $redisPool;
00033 
00042         protected function __construct( array $params ) {
00043                 parent::__construct( $params );
00044                 $this->server = $params['redisServer'];
00045                 $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
00046         }
00047 
00051         protected function doNotifyQueueEmpty( $wiki, $type ) {
00052                 $conn = $this->getConnection();
00053                 if ( !$conn ) {
00054                         return false;
00055                 }
00056                 try {
00057                         $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );
00058                         return true;
00059                 } catch ( RedisException $e ) {
00060                         $this->handleException( $conn, $e );
00061                         return false;
00062                 }
00063         }
00064 
00068         protected function doNotifyQueueNonEmpty( $wiki, $type ) {
00069                 $conn = $this->getConnection();
00070                 if ( !$conn ) {
00071                         return false;
00072                 }
00073                 try {
00074                         $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
00075                         return true;
00076                 } catch ( RedisException $e ) {
00077                         $this->handleException( $conn, $e );
00078                         return false;
00079                 }
00080         }
00081 
00085         protected function doGetAllReadyWikiQueues() {
00086                 $conn = $this->getConnection();
00087                 if ( !$conn ) {
00088                         return array();
00089                 }
00090                 try {
00091                         $conn->multi( Redis::PIPELINE );
00092                         $conn->exists( $this->getReadyQueueKey() );
00093                         $conn->hGetAll( $this->getReadyQueueKey() );
00094                         list( $exists, $map ) = $conn->exec();
00095 
00096                         if ( $exists ) { // cache hit
00097                                 $pendingDBs = array(); // (type => list of wikis)
00098                                 foreach ( $map as $key => $time ) {
00099                                         list( $type, $wiki ) = $this->dencQueueName( $key );
00100                                         $pendingDBs[$type][] = $wiki;
00101                                 }
00102                         } else { // cache miss
00103                                 $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
00104 
00105                                 $now = time();
00106                                 $map = array();
00107                                 foreach ( $pendingDBs as $type => $wikis ) {
00108                                         foreach ( $wikis as $wiki ) {
00109                                                 $map[$this->encQueueName( $type, $wiki )] = $now;
00110                                         }
00111                                 }
00112                                 $conn->hMSet( $this->getReadyQueueKey(), $map );
00113                         }
00114 
00115                         return $pendingDBs;
00116                 } catch ( RedisException $e ) {
00117                         $this->handleException( $conn, $e );
00118                         return array();
00119                 }
00120         }
00121 
00128         protected function getConnection() {
00129                 return $this->redisPool->getConnection( $this->server );
00130         }
00131 
00137         protected function handleException( RedisConnRef $conn, $e ) {
00138                 $this->redisPool->handleException( $this->server, $conn, $e );
00139         }
00140 
00144         private function getReadyQueueKey() {
00145                 return "jobqueue:aggregator:h-ready-queues:v1"; // global
00146         }
00147 
00153         private function encQueueName( $type, $wiki ) {
00154                 return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
00155         }
00156 
00161         private function dencQueueName( $name ) {
00162                 list( $type, $wiki ) = explode( '/', $name, 2 );
00163                 return array( rawurldecode( $type ), rawurldecode( $wiki ) );
00164         }
00165 }