MediaWiki
REL1_22
|
<?php
/**
 * Job queue aggregator that tracks which queues have jobs ready in a single
 * Redis hash (see getReadyQueueKey()).
 *
 * Each hash field is a "<type>/<wiki>" name built by encQueueName(); each
 * value is the time() at which the queue was recorded as ready.
 */
class JobQueueAggregatorRedis extends JobQueueAggregator {
	/** @var RedisConnectionPool */
	protected $redisPool;

	/**
	 * Value of $params['redisServer'], handed to the connection pool.
	 * Declared explicitly because the constructor assigns it.
	 */
	protected $server;

	/**
	 * @param array $params Must include:
	 *   - redisConfig : passed to RedisConnectionPool::singleton()
	 *   - redisServer : passed to RedisConnectionPool::getConnection()
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$this->server = $params['redisServer'];
		$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
	}

	/**
	 * Remove a queue from the ready-queue hash.
	 *
	 * @param string $wiki
	 * @param string $type
	 * @return bool True on success; false if there is no connection or Redis failed
	 */
	protected function doNotifyQueueEmpty( $wiki, $type ) {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			$conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );
			return true;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			return false;
		}
	}

	/**
	 * Record a queue in the ready-queue hash, stamped with the current time.
	 *
	 * @param string $wiki
	 * @param string $type
	 * @return bool True on success; false if there is no connection or Redis failed
	 */
	protected function doNotifyQueueNonEmpty( $wiki, $type ) {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			$conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
			return true;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			return false;
		}
	}

	/**
	 * Get the ready queues, grouped by job type.
	 *
	 * If the hash exists it is used directly. Otherwise one caller takes a
	 * lock, rebuilds the list via findPendingWikiQueues() and stores it;
	 * callers that fail to get the lock receive an empty array.
	 *
	 * @return array (type => list of wikis); empty if there is no connection,
	 *   the rebuild is already in progress elsewhere, or Redis failed
	 */
	protected function doGetAllReadyWikiQueues() {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return array();
		}
		$readyKey = $this->getReadyQueueKey();
		$lockKey = $readyKey . ":lock";
		try {
			$conn->multi( Redis::PIPELINE );
			$conn->exists( $readyKey );
			$conn->hGetAll( $readyKey );
			list( $exists, $map ) = $conn->exec();

			if ( $exists ) { // cache hit
				$pendingDBs = array(); // (type => list of wikis)
				foreach ( $map as $key => $time ) {
					list( $type, $wiki ) = $this->dencQueueName( $key );
					$pendingDBs[$type][] = $wiki;
				}
				return $pendingDBs;
			}

			// Cache miss: avoid duplicated effort by taking a lock first.
			// Note that expire() also runs when setnx() fails, which refreshes
			// the TTL of a lock held by someone else.
			$conn->multi( Redis::MULTI );
			$conn->setnx( $lockKey, 1 );
			$conn->expire( $lockKey, 3600 );
			if ( $conn->exec() !== array( true, true ) ) { // lock
				return array(); // already in progress
			}

			try {
				$pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
			} catch ( Exception $e ) {
				// Do not leave the lock in place until its TTL runs out
				// just because the scan failed.
				try {
					$conn->delete( $lockKey );
				} catch ( RedisException $unlockError ) {
					// Best effort only; the lock still expires on its own
					$this->handleException( $conn, $unlockError );
				}
				throw $e;
			}

			$now = time();
			$map = array();
			foreach ( $pendingDBs as $type => $wikis ) {
				foreach ( $wikis as $wiki ) {
					$map[$this->encQueueName( $type, $wiki )] = $now;
				}
			}
			if ( $map ) {
				// Nothing to store when no queue is pending
				$conn->hMSet( $readyKey, $map );
			}

			// Unlock only after the hash is written, so that no other caller
			// can see "cache miss, lock free" and repeat the scan.
			$conn->delete( $lockKey ); // unlock

			return $pendingDBs;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			return array();
		}
	}

	/**
	 * Delete the ready-queue hash.
	 *
	 * @return bool True on success; false if there is no connection or Redis failed
	 */
	protected function doPurge() {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			$conn->delete( $this->getReadyQueueKey() );
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			return false;
		}
		return true;
	}

	/**
	 * Get a connection to the configured server from the pool.
	 *
	 * @return RedisConnRef|bool Callers treat a falsy result as "no connection"
	 */
	protected function getConnection() {
		return $this->redisPool->getConnection( $this->server );
	}

	/**
	 * Forward a Redis failure to the connection pool.
	 *
	 * @param RedisConnRef $conn
	 * @param RedisException $e
	 */
	protected function handleException( RedisConnRef $conn, $e ) {
		$this->redisPool->handleException( $this->server, $conn, $e );
	}

	/**
	 * @return string Name of the hash that holds the ready queues
	 */
	private function getReadyQueueKey() {
		return "jobqueue:aggregator:h-ready-queues:v1"; // global
	}

	/**
	 * Build the hash field name for a queue.
	 *
	 * @param string $type
	 * @param string $wiki
	 * @return string "<type>/<wiki>" with both parts URL-encoded
	 */
	private function encQueueName( $type, $wiki ) {
		return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
	}

	/**
	 * Split a hash field name back into its parts.
	 *
	 * @param string $name Value produced by encQueueName()
	 * @return array (type, wiki); wiki is '' if $name has no '/' separator
	 */
	private function dencQueueName( $name ) {
		// array_pad() keeps list() from reading an undefined offset when the
		// field name is malformed
		list( $type, $wiki ) = array_pad( explode( '/', $name, 2 ), 2, '' );
		return array( rawurldecode( $type ), rawurldecode( $wiki ) );
	}
}