MediaWiki
REL1_24
|
00001 <?php 00038 class RedisConnectionPool { 00046 protected $connectTimeout; 00048 protected $readTimeout; 00050 protected $password; 00052 protected $persistent; 00054 protected $serializer; 00058 protected $idlePoolSize = 0; 00059 00061 protected $connections = array(); 00063 protected $downServers = array(); 00064 00066 protected static $instances = array(); 00067 00069 const SERVER_DOWN_TTL = 30; 00070 00075 protected function __construct( array $options ) { 00076 if ( !class_exists( 'Redis' ) ) { 00077 throw new MWException( __CLASS__ . ' requires a Redis client library. ' . 00078 'See https://www.mediawiki.org/wiki/Redis#Setup' ); 00079 } 00080 $this->connectTimeout = $options['connectTimeout']; 00081 $this->readTimeout = $options['readTimeout']; 00082 $this->persistent = $options['persistent']; 00083 $this->password = $options['password']; 00084 if ( !isset( $options['serializer'] ) || $options['serializer'] === 'php' ) { 00085 $this->serializer = Redis::SERIALIZER_PHP; 00086 } elseif ( $options['serializer'] === 'igbinary' ) { 00087 $this->serializer = Redis::SERIALIZER_IGBINARY; 00088 } elseif ( $options['serializer'] === 'none' ) { 00089 $this->serializer = Redis::SERIALIZER_NONE; 00090 } else { 00091 throw new MWException( "Invalid serializer specified." ); 00092 } 00093 } 00094 00099 protected static function applyDefaultConfig( array $options ) { 00100 if ( !isset( $options['connectTimeout'] ) ) { 00101 $options['connectTimeout'] = 1; 00102 } 00103 if ( !isset( $options['readTimeout'] ) ) { 00104 $options['readTimeout'] = 1; 00105 } 00106 if ( !isset( $options['persistent'] ) ) { 00107 $options['persistent'] = false; 00108 } 00109 if ( !isset( $options['password'] ) ) { 00110 $options['password'] = null; 00111 } 00112 00113 return $options; 00114 } 00115 00131 public static function singleton( array $options ) { 00132 $options = self::applyDefaultConfig( $options ); 00133 // Map the options to a unique hash... 00134 ksort( $options ); // normalize to avoid pool fragmentation 00135 $id = sha1( serialize( $options ) ); 00136 // Initialize the object at the hash as needed... 00137 if ( !isset( self::$instances[$id] ) ) { 00138 self::$instances[$id] = new self( $options ); 00139 wfDebug( "Creating a new " . __CLASS__ . " instance with id $id.\n" ); 00140 } 00141 00142 return self::$instances[$id]; 00143 } 00144 00153 public function getConnection( $server ) { 00154 // Check the listing "dead" servers which have had a connection errors. 00155 // Servers are marked dead for a limited period of time, to 00156 // avoid excessive overhead from repeated connection timeouts. 00157 if ( isset( $this->downServers[$server] ) ) { 00158 $now = time(); 00159 if ( $now > $this->downServers[$server] ) { 00160 // Dead time expired 00161 unset( $this->downServers[$server] ); 00162 } else { 00163 // Server is dead 00164 wfDebug( "server $server is marked down for another " . 00165 ( $this->downServers[$server] - $now ) . " seconds, can't get connection\n" ); 00166 00167 return false; 00168 } 00169 } 00170 00171 // Check if a connection is already free for use 00172 if ( isset( $this->connections[$server] ) ) { 00173 foreach ( $this->connections[$server] as &$connection ) { 00174 if ( $connection['free'] ) { 00175 $connection['free'] = false; 00176 --$this->idlePoolSize; 00177 00178 return new RedisConnRef( $this, $server, $connection['conn'] ); 00179 } 00180 } 00181 } 00182 00183 if ( substr( $server, 0, 1 ) === '/' ) { 00184 // UNIX domain socket 00185 // These are required by the redis extension to start with a slash, but 00186 // we still need to set the port to a special value to make it work. 00187 $host = $server; 00188 $port = 0; 00189 } else { 00190 // TCP connection 00191 $hostPort = IP::splitHostAndPort( $server ); 00192 if ( !$hostPort ) { 00193 throw new MWException( __CLASS__ . ": invalid configured server \"$server\"" ); 00194 } 00195 list( $host, $port ) = $hostPort; 00196 if ( $port === false ) { 00197 $port = 6379; 00198 } 00199 } 00200 00201 $conn = new Redis(); 00202 try { 00203 if ( $this->persistent ) { 00204 $result = $conn->pconnect( $host, $port, $this->connectTimeout ); 00205 } else { 00206 $result = $conn->connect( $host, $port, $this->connectTimeout ); 00207 } 00208 if ( !$result ) { 00209 wfDebugLog( 'redis', "Could not connect to server $server" ); 00210 // Mark server down for some time to avoid further timeouts 00211 $this->downServers[$server] = time() + self::SERVER_DOWN_TTL; 00212 00213 return false; 00214 } 00215 if ( $this->password !== null ) { 00216 if ( !$conn->auth( $this->password ) ) { 00217 wfDebugLog( 'redis', "Authentication error connecting to $server" ); 00218 } 00219 } 00220 } catch ( RedisException $e ) { 00221 $this->downServers[$server] = time() + self::SERVER_DOWN_TTL; 00222 wfDebugLog( 'redis', "Redis exception connecting to $server: " . $e->getMessage() ); 00223 00224 return false; 00225 } 00226 00227 if ( $conn ) { 00228 $conn->setOption( Redis::OPT_READ_TIMEOUT, $this->readTimeout ); 00229 $conn->setOption( Redis::OPT_SERIALIZER, $this->serializer ); 00230 $this->connections[$server][] = array( 'conn' => $conn, 'free' => false ); 00231 00232 return new RedisConnRef( $this, $server, $conn ); 00233 } else { 00234 return false; 00235 } 00236 } 00237 00245 public function freeConnection( $server, Redis $conn ) { 00246 $found = false; 00247 00248 foreach ( $this->connections[$server] as &$connection ) { 00249 if ( $connection['conn'] === $conn && !$connection['free'] ) { 00250 $connection['free'] = true; 00251 ++$this->idlePoolSize; 00252 break; 00253 } 00254 } 00255 00256 $this->closeExcessIdleConections(); 00257 00258 return $found; 00259 } 00260 00264 protected function closeExcessIdleConections() { 00265 if ( $this->idlePoolSize <= count( $this->connections ) ) { 00266 return; // nothing to do (no more connections than servers) 00267 } 00268 00269 foreach ( $this->connections as &$serverConnections ) { 00270 foreach ( $serverConnections as $key => &$connection ) { 00271 if ( $connection['free'] ) { 00272 unset( $serverConnections[$key] ); 00273 if ( --$this->idlePoolSize <= count( $this->connections ) ) { 00274 return; // done (no more connections than servers) 00275 } 00276 } 00277 } 00278 } 00279 } 00280 00292 public function handleException( $server, RedisConnRef $cref, RedisException $e ) { 00293 return $this->handleError( $cref, $e ); 00294 } 00295 00305 public function handleError( RedisConnRef $cref, RedisException $e ) { 00306 $server = $cref->getServer(); 00307 wfDebugLog( 'redis', "Redis exception on server $server: " . $e->getMessage() . "\n" ); 00308 foreach ( $this->connections[$server] as $key => $connection ) { 00309 if ( $cref->isConnIdentical( $connection['conn'] ) ) { 00310 $this->idlePoolSize -= $connection['free'] ? 1 : 0; 00311 unset( $this->connections[$server][$key] ); 00312 break; 00313 } 00314 } 00315 } 00316 00333 public function reauthenticateConnection( $server, Redis $conn ) { 00334 if ( $this->password !== null ) { 00335 if ( !$conn->auth( $this->password ) ) { 00336 wfDebugLog( 'redis', "Authentication error connecting to $server" ); 00337 00338 return false; 00339 } 00340 } 00341 00342 return true; 00343 } 00344 00351 public function resetTimeout( Redis $conn, $timeout = null ) { 00352 $conn->setOption( Redis::OPT_READ_TIMEOUT, $timeout ?: $this->readTimeout ); 00353 } 00354 00358 function __destruct() { 00359 foreach ( $this->connections as $server => &$serverConnections ) { 00360 foreach ( $serverConnections as $key => &$connection ) { 00361 $connection['conn']->close(); 00362 } 00363 } 00364 } 00365 } 00366 00375 class RedisConnRef { 00377 protected $pool; 00379 protected $conn; 00380 00381 protected $server; // string 00382 protected $lastError; // string 00383 00389 public function __construct( RedisConnectionPool $pool, $server, Redis $conn ) { 00390 $this->pool = $pool; 00391 $this->server = $server; 00392 $this->conn = $conn; 00393 } 00394 00399 public function getServer() { 00400 return $this->server; 00401 } 00402 00403 public function getLastError() { 00404 return $this->lastError; 00405 } 00406 00407 public function clearLastError() { 00408 $this->lastError = null; 00409 } 00410 00411 public function __call( $name, $arguments ) { 00412 $conn = $this->conn; // convenience 00413 00414 // Work around https://github.com/nicolasff/phpredis/issues/70 00415 $lname = strtolower( $name ); 00416 if ( ( $lname === 'blpop' || $lname == 'brpop' ) 00417 && is_array( $arguments[0] ) && isset( $arguments[1] ) 00418 ) { 00419 $this->pool->resetTimeout( $conn, $arguments[1] + 1 ); 00420 } elseif ( $lname === 'brpoplpush' && isset( $arguments[2] ) ) { 00421 $this->pool->resetTimeout( $conn, $arguments[2] + 1 ); 00422 } 00423 00424 $conn->clearLastError(); 00425 try { 00426 $res = call_user_func_array( array( $conn, $name ), $arguments ); 00427 if ( preg_match( '/^ERR operation not permitted\b/', $conn->getLastError() ) ) { 00428 $this->pool->reauthenticateConnection( $this->server, $conn ); 00429 $conn->clearLastError(); 00430 $res = call_user_func_array( array( $conn, $name ), $arguments ); 00431 wfDebugLog( 'redis', "Used automatic re-authentication for method '$name'." ); 00432 } 00433 } catch ( RedisException $e ) { 00434 $this->pool->resetTimeout( $conn ); // restore 00435 throw $e; 00436 } 00437 00438 $this->lastError = $conn->getLastError() ?: $this->lastError; 00439 00440 $this->pool->resetTimeout( $conn ); // restore 00441 00442 return $res; 00443 } 00444 00452 public function luaEval( $script, array $params, $numKeys ) { 00453 $sha1 = sha1( $script ); // 40 char hex 00454 $conn = $this->conn; // convenience 00455 $server = $this->server; // convenience 00456 00457 // Try to run the server-side cached copy of the script 00458 $conn->clearLastError(); 00459 $res = $conn->evalSha( $sha1, $params, $numKeys ); 00460 // If we got a permission error reply that means that (a) we are not in 00461 // multi()/pipeline() and (b) some connection problem likely occurred. If 00462 // the password the client gave was just wrong, an exception should have 00463 // been thrown back in getConnection() previously. 00464 if ( preg_match( '/^ERR operation not permitted\b/', $conn->getLastError() ) ) { 00465 $this->pool->reauthenticateConnection( $server, $conn ); 00466 $conn->clearLastError(); 00467 $res = $conn->eval( $script, $params, $numKeys ); 00468 wfDebugLog( 'redis', "Used automatic re-authentication for Lua script $sha1." ); 00469 } 00470 // If the script is not in cache, use eval() to retry and cache it 00471 if ( preg_match( '/^NOSCRIPT/', $conn->getLastError() ) ) { 00472 $conn->clearLastError(); 00473 $res = $conn->eval( $script, $params, $numKeys ); 00474 wfDebugLog( 'redis', "Used eval() for Lua script $sha1." ); 00475 } 00476 00477 if ( $conn->getLastError() ) { // script bug? 00478 wfDebugLog( 'redis', "Lua script error on server $server: " . $conn->getLastError() ); 00479 } 00480 00481 $this->lastError = $conn->getLastError() ?: $this->lastError; 00482 00483 return $res; 00484 } 00485 00490 public function isConnIdentical( Redis $conn ) { 00491 return $this->conn === $conn; 00492 } 00493 00494 function __destruct() { 00495 $this->pool->freeConnection( $this->server, $this->conn ); 00496 } 00497 }