MediaWiki
REL1_21
|
00001 <?php 00030 class JobQueueAggregatorRedis extends JobQueueAggregator { 00032 protected $redisPool; 00033 00042 protected function __construct( array $params ) { 00043 parent::__construct( $params ); 00044 $this->server = $params['redisServer']; 00045 $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); 00046 } 00047 00051 protected function doNotifyQueueEmpty( $wiki, $type ) { 00052 $conn = $this->getConnection(); 00053 if ( !$conn ) { 00054 return false; 00055 } 00056 try { 00057 $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) ); 00058 return true; 00059 } catch ( RedisException $e ) { 00060 $this->handleException( $conn, $e ); 00061 return false; 00062 } 00063 } 00064 00068 protected function doNotifyQueueNonEmpty( $wiki, $type ) { 00069 $conn = $this->getConnection(); 00070 if ( !$conn ) { 00071 return false; 00072 } 00073 try { 00074 $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() ); 00075 return true; 00076 } catch ( RedisException $e ) { 00077 $this->handleException( $conn, $e ); 00078 return false; 00079 } 00080 } 00081 00085 protected function doGetAllReadyWikiQueues() { 00086 $conn = $this->getConnection(); 00087 if ( !$conn ) { 00088 return array(); 00089 } 00090 try { 00091 $conn->multi( Redis::PIPELINE ); 00092 $conn->exists( $this->getReadyQueueKey() ); 00093 $conn->hGetAll( $this->getReadyQueueKey() ); 00094 list( $exists, $map ) = $conn->exec(); 00095 00096 if ( $exists ) { // cache hit 00097 $pendingDBs = array(); // (type => list of wikis) 00098 foreach ( $map as $key => $time ) { 00099 list( $type, $wiki ) = $this->dencQueueName( $key ); 00100 $pendingDBs[$type][] = $wiki; 00101 } 00102 } else { // cache miss 00103 $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis) 00104 00105 $now = time(); 00106 $map = array(); 00107 foreach ( $pendingDBs as $type => $wikis ) { 00108 foreach ( $wikis as $wiki ) { 00109 $map[$this->encQueueName( $type, $wiki )] = $now; 00110 } 00111 } 00112 $conn->hMSet( $this->getReadyQueueKey(), $map ); 00113 } 00114 00115 return $pendingDBs; 00116 } catch ( RedisException $e ) { 00117 $this->handleException( $conn, $e ); 00118 return array(); 00119 } 00120 } 00121 00128 protected function getConnection() { 00129 return $this->redisPool->getConnection( $this->server ); 00130 } 00131 00137 protected function handleException( RedisConnRef $conn, $e ) { 00138 $this->redisPool->handleException( $this->server, $conn, $e ); 00139 } 00140 00144 private function getReadyQueueKey() { 00145 return "jobqueue:aggregator:h-ready-queues:v1"; // global 00146 } 00147 00153 private function encQueueName( $type, $wiki ) { 00154 return rawurlencode( $type ) . '/' . rawurlencode( $wiki ); 00155 } 00156 00161 private function dencQueueName( $name ) { 00162 list( $type, $wiki ) = explode( '/', $name, 2 ); 00163 return array( rawurldecode( $type ), rawurldecode( $wiki ) ); 00164 } 00165 }