MediaWiki
REL1_23
|
<?php
/**
 * Job queue aggregator that tracks which (job type, wiki) queues have work
 * in a single Redis hash shared by all wikis.
 *
 * Hash fields are the encoded queue name (see encQueueName()); hash values
 * are the UNIX timestamp at which the queue was recorded as non-empty.
 */
class JobQueueAggregatorRedis extends JobQueueAggregator {
	/** @var RedisConnectionPool Pool obtained from RedisConnectionPool::singleton() */
	protected $redisPool;

	/** @var array Servers handed, in order, to the pool's getConnection() */
	protected $servers;

	/**
	 * @param array $params Recognised keys:
	 *   - redisConfig  : passed as-is to RedisConnectionPool::singleton()
	 *   - redisServers : list of servers to try, in order
	 *   - redisServer  : a single server; only read when 'redisServers' is
	 *                    not set (kept for backwards compatibility)
	 */
	protected function __construct( array $params ) {
		parent::__construct( $params );
		$this->servers = isset( $params['redisServers'] )
			? $params['redisServers']
			: array( $params['redisServer'] ); // b/c
		$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
	}

	/**
	 * Remove the given queue from the ready-queue hash.
	 *
	 * @param string $wiki
	 * @param string $type
	 * @return bool False if no connection was available or Redis threw
	 */
	protected function doNotifyQueueEmpty( $wiki, $type ) {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			$conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );

			return true;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );

			return false;
		}
	}

	/**
	 * Record the given queue in the ready-queue hash with the current time.
	 *
	 * @param string $wiki
	 * @param string $type
	 * @return bool False if no connection was available or Redis threw
	 */
	protected function doNotifyQueueNonEmpty( $wiki, $type ) {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			$conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );

			return true;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );

			return false;
		}
	}

	/**
	 * Get the map of ready queues, rebuilding the Redis hash if it is missing.
	 *
	 * When the hash exists it is decoded and returned. Otherwise this caller
	 * tries to take a one-hour lock; if the lock is already held an empty
	 * array is returned, else the result of findPendingWikiQueues() is
	 * written to the hash and returned.
	 *
	 * @return array (job type => list of wikis); empty array if no connection
	 *   was available, Redis threw, or another caller holds the rebuild lock
	 * @throws Exception Anything non-Redis thrown by findPendingWikiQueues()
	 */
	protected function doGetAllReadyWikiQueues() {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return array();
		}
		try {
			$readyKey = $this->getReadyQueueKey();
			$lockKey = $readyKey . ":lock";

			// Fetch existence and contents in one round trip
			$conn->multi( Redis::PIPELINE );
			$conn->exists( $readyKey );
			$conn->hGetAll( $readyKey );
			list( $exists, $map ) = $conn->exec();

			if ( $exists ) { // cache hit
				$pendingDBs = array(); // (type => list of wikis)
				foreach ( $map as $key => $time ) {
					list( $type, $wiki ) = $this->dencQueueName( $key );
					$pendingDBs[$type][] = $wiki;
				}
			} else { // cache miss
				// Avoid duplicated effort: create a uniquely named key with a TTL and
				// rename it onto the lock key only if the lock key does not exist yet.
				$rand = wfRandomString( 32 );
				$conn->multi( Redis::MULTI );
				$conn->setex( "{$rand}:lock", 3600, 1 );
				$conn->renamenx( "{$rand}:lock", $lockKey );
				if ( $conn->exec() !== array( true, true ) ) { // lock
					$conn->delete( "{$rand}:lock" );

					return array(); // already in progress
				}

				try {
					$pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
				} catch ( Exception $e ) {
					// The scan failed: release the lock instead of leaving it in place
					// for its full TTL, during which every caller would get array().
					try {
						$conn->delete( $lockKey );
					} catch ( RedisException $unlockError ) {
						// Report, but do not let this mask the original failure
						$this->handleException( $conn, $unlockError );
					}
					// A RedisException is still handled by the outer catch below
					throw $e;
				}

				$conn->delete( $lockKey ); // unlock

				$now = time();
				$map = array();
				foreach ( $pendingDBs as $type => $wikis ) {
					foreach ( $wikis as $wiki ) {
						$map[$this->encQueueName( $type, $wiki )] = $now;
					}
				}
				// HMSET needs at least one field/value pair, so skip the write
				// when nothing is pending.
				// NOTE(review): in that case the hash is not created, so the next
				// call is a cache miss again and repeats the scan.
				if ( $map ) {
					$conn->hMSet( $readyKey, $map );
				}
			}

			return $pendingDBs;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );

			return array();
		}
	}

	/**
	 * Delete the whole ready-queue hash.
	 *
	 * @return bool False if no connection was available or Redis threw
	 */
	protected function doPurge() {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			$conn->delete( $this->getReadyQueueKey() );
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );

			return false;
		}

		return true;
	}

	/**
	 * Ask the pool for a connection to each configured server in turn and
	 * stop at the first one that yields a truthy result.
	 *
	 * @return RedisConnRef|bool The connection handle (presumably a RedisConnRef,
	 *   as handleException() requires one), or a falsy value if every server
	 *   failed; false when no servers are configured
	 */
	protected function getConnection() {
		$conn = false;
		foreach ( $this->servers as $server ) {
			$conn = $this->redisPool->getConnection( $server );
			if ( $conn ) {
				break;
			}
		}

		return $conn;
	}

	/**
	 * Forward a Redis failure to the connection pool's error handler.
	 *
	 * @param RedisConnRef $conn
	 * @param RedisException $e
	 */
	protected function handleException( RedisConnRef $conn, $e ) {
		$this->redisPool->handleError( $conn, $e );
	}

	/**
	 * @return string Name of the Redis hash holding the ready queues
	 */
	private function getReadyQueueKey() {
		return "jobqueue:aggregator:h-ready-queues:v1"; // global
	}

	/**
	 * Build the hash field name for a queue. Both parts are URL-encoded, so
	 * the '/' separator cannot occur inside either of them.
	 *
	 * @param string $type
	 * @param string $wiki
	 * @return string
	 */
	private function encQueueName( $type, $wiki ) {
		return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
	}

	/**
	 * Inverse of encQueueName().
	 *
	 * @param string $name Hash field name
	 * @return array (type, wiki); a part missing from $name comes back as ''
	 */
	private function dencQueueName( $name ) {
		// Pad so that a field without '/' does not raise an undefined-offset notice
		list( $type, $wiki ) = array_pad( explode( '/', $name, 2 ), 2, '' );

		return array( rawurldecode( $type ), rawurldecode( $wiki ) );
	}
}