MediaWiki
REL1_21
|
<?php
/**
 * Pool of phpredis connections, plus the reference wrapper handed to callers.
 *
 * A pool holds connections that all share the same settings (timeout,
 * persistence, password, serializer). Pools are obtained via
 * RedisConnectionPool::singleton(); connections are obtained via
 * getConnection() and returned to the pool automatically when the
 * RedisConnRef wrapping them is destroyed.
 */
class RedisConnectionPool {
	// Settings for all connections in this pool
	protected $connectTimeout; // string; connection timeout
	protected $persistent; // bool; whether connections persist
	protected $password; // string; plaintext auth password
	protected $serializer; // integer; the serializer to use (Redis::SERIALIZER_*)

	protected $idlePoolSize = 0; // integer; current idle pool size

	/** @var Array (server name => list of ('conn' => Redis, 'free' => bool) entries) */
	protected $connections = array();
	/** @var Array (server name => UNIX timestamp until which the server is treated as down) */
	protected $downServers = array();

	/** @var Array */
	protected static $instances = array(); // (pool ID => RedisConnectionPool)

	const SERVER_DOWN_TTL = 30; // integer; seconds to cache servers as "down"

	/**
	 * Build a pool from a fully-populated option array.
	 *
	 * The constructor is protected; use singleton() to obtain an instance.
	 * It reads these keys from $options:
	 *   - connectTimeout : passed as the timeout argument to connect()/pconnect().
	 *   - persistent     : if true, pconnect() is used instead of connect().
	 *   - password       : if not null, sent via auth() after connecting.
	 *   - serializer     : 'php' (also used when the key is unset), 'igbinary' or 'none'.
	 *
	 * @param array $options
	 * @throws MWException If the redis extension is not loaded or the
	 *   serializer name is not one of the accepted values.
	 */
	protected function __construct( array $options ) {
		if ( !extension_loaded( 'redis' ) ) {
			throw new MWException( __CLASS__. ' requires the phpredis extension: ' .
				'https://github.com/nicolasff/phpredis' );
		}
		$this->connectTimeout = $options['connectTimeout'];
		$this->persistent = $options['persistent'];
		$this->password = $options['password'];
		if ( !isset( $options['serializer'] ) || $options['serializer'] === 'php' ) {
			$this->serializer = Redis::SERIALIZER_PHP;
		} elseif ( $options['serializer'] === 'igbinary' ) {
			$this->serializer = Redis::SERIALIZER_IGBINARY;
		} elseif ( $options['serializer'] === 'none' ) {
			$this->serializer = Redis::SERIALIZER_NONE;
		} else {
			throw new MWException( "Invalid serializer specified." );
		}
	}

	/**
	 * Fill in defaults for any of 'connectTimeout' (1), 'persistent' (false)
	 * and 'password' (null) that the caller did not set.
	 *
	 * @param array $options
	 * @return array The options with defaults applied
	 */
	protected static function applyDefaultConfig( array $options ) {
		if ( !isset( $options['connectTimeout'] ) ) {
			$options['connectTimeout'] = 1;
		}
		if ( !isset( $options['persistent'] ) ) {
			$options['persistent'] = false;
		}
		if ( !isset( $options['password'] ) ) {
			$options['password'] = null;
		}
		return $options;
	}

	/**
	 * Get the pool for the given options, creating it on first use.
	 *
	 * Pools are keyed by a hash of the defaulted, key-sorted option array,
	 * so the same options always map to the same pool instance.
	 *
	 * @param array $options See __construct() for the recognised keys
	 * @return RedisConnectionPool
	 */
	public static function singleton( array $options ) {
		$options = self::applyDefaultConfig( $options );
		// Map the options to a unique hash...
		ksort( $options ); // normalize to avoid pool fragmentation
		$id = sha1( serialize( $options ) );
		// Initialize the object at the hash as needed...
		if ( !isset( self::$instances[$id] ) ) {
			self::$instances[$id] = new self( $options );
			wfDebug( "Creating a new " . __CLASS__ . " instance with id $id." );
		}
		return self::$instances[$id];
	}

	/**
	 * Get a connection to a redis server, reusing an idle pooled one if possible.
	 *
	 * @param string $server A path starting with "/" (treated as a UNIX domain
	 *   socket), or a host[:port] string (port defaults to 6379)
	 * @return RedisConnRef|bool A connection reference, or false if the server
	 *   is currently marked down or the connection attempt failed
	 * @throws MWException If IP::splitHostAndPort() rejects $server
	 */
	public function getConnection( $server ) {
		// Check the listing "dead" servers which have had a connection errors.
		// Servers are marked dead for a limited period of time, to
		// avoid excessive overhead from repeated connection timeouts.
		if ( isset( $this->downServers[$server] ) ) {
			$now = time();
			if ( $now > $this->downServers[$server] ) {
				// Dead time expired
				unset( $this->downServers[$server] );
			} else {
				// Server is dead
				wfDebug( "server $server is marked down for another " .
					( $this->downServers[$server] - $now ) .
					" seconds, can't get connection" );
				return false;
			}
		}

		// Check if a connection is already free for use.
		// Iterate by value and write back through the key so that no
		// by-reference loop variable is left dangling.
		if ( isset( $this->connections[$server] ) ) {
			foreach ( $this->connections[$server] as $key => $connection ) {
				if ( $connection['free'] ) {
					$this->connections[$server][$key]['free'] = false;
					--$this->idlePoolSize;
					return new RedisConnRef( $this, $server, $connection['conn'] );
				}
			}
		}

		if ( substr( $server, 0, 1 ) === '/' ) {
			// UNIX domain socket
			// These are required by the redis extension to start with a slash, but
			// we still need to set the port to a special value to make it work.
			$host = $server;
			$port = 0;
		} else {
			// TCP connection
			$hostPort = IP::splitHostAndPort( $server );
			if ( !$hostPort ) {
				throw new MWException( __CLASS__.": invalid configured server \"$server\"" );
			}
			list( $host, $port ) = $hostPort;
			if ( $port === false ) {
				$port = 6379;
			}
		}

		$conn = new Redis();
		try {
			if ( $this->persistent ) {
				$result = $conn->pconnect( $host, $port, $this->connectTimeout );
			} else {
				$result = $conn->connect( $host, $port, $this->connectTimeout );
			}
			if ( !$result ) {
				wfDebugLog( 'redis', "Could not connect to server $server" );
				// Mark server down for some time to avoid further timeouts
				$this->downServers[$server] = time() + self::SERVER_DOWN_TTL;
				return false;
			}
			if ( $this->password !== null ) {
				// An authentication failure is only logged; the connection is
				// still pooled and returned to the caller.
				if ( !$conn->auth( $this->password ) ) {
					wfDebugLog( 'redis', "Authentication error connecting to $server" );
				}
			}
		} catch ( RedisException $e ) {
			$this->downServers[$server] = time() + self::SERVER_DOWN_TTL;
			wfDebugLog( 'redis', "Redis exception: " . $e->getMessage() . "\n" );
			return false;
		}

		// $conn is always a Redis object at this point, so no further check is needed
		$conn->setOption( Redis::OPT_SERIALIZER, $this->serializer );
		$this->connections[$server][] = array( 'conn' => $conn, 'free' => false );
		return new RedisConnRef( $this, $server, $conn );
	}

	/**
	 * Mark a connection to a server as free to return to the pool.
	 *
	 * After releasing, idle connections in excess of the number of known
	 * servers are dropped from the pool.
	 *
	 * @param string $server
	 * @param Redis $conn
	 * @return bool True if $conn was found in use for $server and was released;
	 *   false if it was not in the pool or was already free
	 */
	public function freeConnection( $server, Redis $conn ) {
		$found = false;

		// Guard against servers this pool never connected to
		if ( isset( $this->connections[$server] ) ) {
			foreach ( $this->connections[$server] as $key => $connection ) {
				if ( $connection['conn'] === $conn && !$connection['free'] ) {
					$this->connections[$server][$key]['free'] = true;
					++$this->idlePoolSize;
					$found = true;
					break;
				}
			}
		}

		$this->closeExcessIdleConections();

		return $found;
	}

	/**
	 * Drop idle connections from the pool until the number of idle
	 * connections is no greater than the number of servers in the pool.
	 *
	 * @return void
	 */
	protected function closeExcessIdleConections() {
		$serverCount = count( $this->connections ); // server keys are never removed here
		if ( $this->idlePoolSize <= $serverCount ) {
			return; // nothing to do (no more connections than servers)
		}

		foreach ( $this->connections as $server => $serverConnections ) {
			foreach ( $serverConnections as $key => $connection ) {
				if ( $connection['free'] ) {
					unset( $this->connections[$server][$key] );
					if ( --$this->idlePoolSize <= $serverCount ) {
						return; // done (no more connections than servers)
					}
				}
			}
		}
	}

	/**
	 * Log a RedisException and remove the connection it occurred on from
	 * the pool, so that the connection is not handed out again.
	 *
	 * @param string $server
	 * @param RedisConnRef $cref
	 * @param RedisException $e
	 * @return void
	 */
	public function handleException( $server, RedisConnRef $cref, RedisException $e ) {
		wfDebugLog( 'redis', "Redis exception on server $server: " . $e->getMessage() . "\n" );
		if ( !isset( $this->connections[$server] ) ) {
			return; // nothing pooled for this server
		}
		foreach ( $this->connections[$server] as $key => $connection ) {
			if ( $cref->isConnIdentical( $connection['conn'] ) ) {
				$this->idlePoolSize -= $connection['free'] ? 1 : 0;
				unset( $this->connections[$server][$key] );
				break;
			}
		}
	}
}

/**
 * Helper class to handle automatically marking connections as reusable
 * (via the destructor) once the caller no longer holds the reference.
 *
 * Method calls not defined here are forwarded to the wrapped Redis object.
 */
class RedisConnRef {
	/** @var RedisConnectionPool */
	protected $pool;
	/** @var Redis */
	protected $conn;

	protected $server; // string

	/**
	 * @param RedisConnectionPool $pool Pool that owns the connection
	 * @param string $server Server name the connection was opened for
	 * @param Redis $conn The wrapped connection
	 */
	public function __construct( RedisConnectionPool $pool, $server, Redis $conn ) {
		$this->pool = $pool;
		$this->server = $server;
		$this->conn = $conn;
	}

	/**
	 * Forward any other method call to the wrapped Redis object.
	 *
	 * @param string $name
	 * @param array $arguments
	 * @return mixed Whatever the Redis method returns
	 */
	public function __call( $name, $arguments ) {
		return call_user_func_array( array( $this->conn, $name ), $arguments );
	}

	/**
	 * @param Redis $conn
	 * @return bool Whether $conn is the very object wrapped by this reference
	 */
	public function isConnIdentical( Redis $conn ) {
		return $this->conn === $conn;
	}

	/**
	 * Hand the connection back to the pool when the reference is destroyed.
	 */
	public function __destruct() {
		$this->pool->freeConnection( $this->server, $this->conn );
	}
}