MediaWiki
REL1_24
|
<?php
/**
 * Job queue aggregator that records which (type, wiki) queues are non-empty
 * in a Redis hash.
 *
 * Two global hashes are used:
 *   - the "ready queues" hash, whose fields are encoded "type/wiki" names
 *     mapped to a UNIX timestamp, plus a dummy '_epoch' field that marks the
 *     hash as fully built;
 *   - the "queue types" hash, whose fields are job types mapped to 'enabled'.
 */
class JobQueueAggregatorRedis extends JobQueueAggregator {
	/** @var RedisConnectionPool Pool obtained from RedisConnectionPool::singleton() */
	protected $redisPool;

	/** @var array Server identifiers handed to the pool, tried in order */
	protected $servers;

	/**
	 * @param array $params Possible keys:
	 *   - redisConfig  : array passed to RedisConnectionPool::singleton(); its
	 *                    'serializer' entry is always overridden with 'none'.
	 *   - redisServers : array of servers to try, in order.
	 *   - redisServer  : a single server, only used when 'redisServers' is
	 *                    not set (backwards compatibility).
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$this->servers = isset( $params['redisServers'] )
			? $params['redisServers']
			: array( $params['redisServer'] ); // b/c
		$params['redisConfig']['serializer'] = 'none';
		$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
	}

	/**
	 * Remove a queue from the "ready queues" hash.
	 *
	 * @param string $wiki
	 * @param string $type
	 * @return bool True on success; false if no connection could be obtained
	 *   or a RedisException occurred
	 */
	protected function doNotifyQueueEmpty( $wiki, $type ) {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			$conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );

			return true;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );

			return false;
		}
	}

	/**
	 * Record a queue in the "ready queues" hash (value: current time) and make
	 * sure its type has an entry in the "queue types" hash.
	 *
	 * @param string $wiki
	 * @param string $type
	 * @return bool True on success; false if no connection could be obtained
	 *   or a RedisException occurred
	 */
	protected function doNotifyQueueNonEmpty( $wiki, $type ) {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			// Send both writes in one pipelined round trip
			$conn->multi( Redis::PIPELINE );
			$conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
			$conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
			$conn->exec();

			return true;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );

			return false;
		}
	}

	/**
	 * Get the ready queues, rebuilding the Redis hash when it is missing or
	 * lacks the '_epoch' marker.
	 *
	 * @return array Map of (type => list of wikis). An empty array is returned
	 *   when no connection could be obtained, when another rebuild holds the
	 *   lock, or when a RedisException occurred.
	 */
	protected function doGetAllReadyWikiQueues() {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return array();
		}
		try {
			$map = $conn->hGetAll( $this->getReadyQueueKey() );

			if ( is_array( $map ) && isset( $map['_epoch'] ) ) {
				unset( $map['_epoch'] ); // ignore
				$pendingDBs = array(); // (type => list of wikis)
				foreach ( array_keys( $map ) as $key ) {
					list( $type, $wiki ) = $this->dencQueueName( $key );
					$pendingDBs[$type][] = $wiki;
				}
			} else {
				$lockKey = $this->getReadyQueueKey() . ":lock";

				// Avoid duplicated effort: create a uniquely named key with a
				// one hour TTL and try to rename it onto the shared lock key;
				// the rename only succeeds if the lock key does not exist.
				$rand = wfRandomString( 32 );
				$conn->multi( Redis::MULTI );
				$conn->setex( "{$rand}:lock", 3600, 1 );
				$conn->renamenx( "{$rand}:lock", $lockKey );
				if ( $conn->exec() !== array( true, true ) ) { // lock
					$conn->delete( "{$rand}:lock" );
					return array(); // already in progress
				}

				try {
					// findPendingWikiQueues() is defined outside this class
					$pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
				} catch ( Exception $e ) {
					// Do not leave the lock behind; otherwise every caller
					// would get an empty result until the TTL expires.
					try {
						$conn->delete( $lockKey ); // unlock
					} catch ( RedisException $redisError ) {
						$this->handleException( $conn, $redisError );
					}
					throw $e;
				}

				$conn->multi( Redis::PIPELINE );
				$now = time();
				$map = array( '_epoch' => $now ); // dummy key for empty Redis collections
				foreach ( $pendingDBs as $type => $wikis ) {
					$conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
					foreach ( $wikis as $wiki ) {
						$map[$this->encQueueName( $type, $wiki )] = $now;
					}
				}
				$conn->hMSet( $this->getReadyQueueKey(), $map );
				$conn->exec();

				$conn->delete( $lockKey ); // unlock
			}

			return $pendingDBs;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );

			return array();
		}
	}

	/**
	 * Delete the "ready queues" hash. The "queue types" hash is left alone.
	 *
	 * @return bool True on success; false if no connection could be obtained
	 *   or a RedisException occurred
	 */
	protected function doPurge() {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			$conn->delete( $this->getReadyQueueKey() );
			// leave key at getQueueTypesKey() alone
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );

			return false;
		}

		return true;
	}

	/**
	 * Get a connection to the first configured server that the pool can reach.
	 *
	 * @return RedisConnRef|bool The pool's connection value for the first
	 *   server that yields a truthy one; otherwise the last falsy value
	 *   (false when no servers are configured)
	 */
	protected function getConnection() {
		$conn = false;
		foreach ( $this->servers as $server ) {
			$conn = $this->redisPool->getConnection( $server );
			if ( $conn ) {
				break;
			}
		}

		return $conn;
	}

	/**
	 * Pass a Redis failure on to the connection pool's error handler.
	 *
	 * @param RedisConnRef $conn
	 * @param RedisException $e
	 */
	protected function handleException( RedisConnRef $conn, $e ) {
		$this->redisPool->handleError( $conn, $e );
	}

	/**
	 * @return string Redis key of the "ready queues" hash
	 */
	private function getReadyQueueKey() {
		return "jobqueue:aggregator:h-ready-queues:v2"; // global
	}

	/**
	 * @return string Redis key of the "queue types" hash
	 */
	private function getQueueTypesKey() {
		return "jobqueue:aggregator:h-queue-types:v2"; // global
	}

	/**
	 * Build the hash field name for a queue; both parts are URL-encoded so
	 * that the '/' separator is unambiguous.
	 *
	 * @param string $type
	 * @param string $wiki
	 * @return string
	 */
	private function encQueueName( $type, $wiki ) {
		return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
	}

	/**
	 * Inverse of encQueueName().
	 *
	 * @param string $name Hash field name
	 * @return array (type, wiki); the wiki is '' if $name has no '/' separator
	 */
	private function dencQueueName( $name ) {
		// Pad so that a field without a separator does not raise a notice
		list( $type, $wiki ) = array_pad( explode( '/', $name, 2 ), 2, '' );

		return array( rawurldecode( $type ), rawurldecode( $wiki ) );
	}
}