MediaWiki  REL1_24
RedisConnectionPool.php
Go to the documentation of this file.
00001 <?php
/**
 * Pool of Redis connections, grouped by server.
 *
 * Callers obtain a pool through RedisConnectionPool::singleton() (the
 * constructor is protected) and then ask it for connections with
 * getConnection(). Connections are handed out wrapped in RedisConnRef
 * objects; when such a handle is destroyed it calls freeConnection(), which
 * flags the underlying connection as reusable.
 */
class RedisConnectionPool {
    /** @var mixed Timeout handed to Redis::connect()/pconnect(); defaults to 1 */
    protected $connectTimeout;
    /** @var mixed Default value for Redis::OPT_READ_TIMEOUT; defaults to 1 */
    protected $readTimeout;
    /** @var string|null Password passed to Redis::auth(); null disables authentication */
    protected $password;
    /** @var bool Whether to use Redis::pconnect() rather than Redis::connect() */
    protected $persistent;
    /** @var mixed One of Redis::SERIALIZER_PHP, SERIALIZER_IGBINARY or SERIALIZER_NONE */
    protected $serializer;
    /** @var int Number of pooled connections currently flagged as free */
    protected $idlePoolSize = 0;

    /** @var array Map of (server => list of array( 'conn' => Redis, 'free' => bool )) */
    protected $connections = array();
    /** @var array Map of (server => UNIX timestamp until which it is treated as down) */
    protected $downServers = array();

    /** @var array Map of (options hash => RedisConnectionPool instance) */
    protected static $instances = array();

    /** Seconds a server stays marked as down after a failed connection attempt */
    const SERVER_DOWN_TTL = 30;

    /**
     * @param array $options Must contain 'connectTimeout', 'readTimeout',
     *   'persistent' and 'password' (singleton() fills these in through
     *   applyDefaultConfig()); may contain 'serializer', one of 'php'
     *   (the default), 'igbinary' or 'none'.
     * @throws MWException If the Redis class is missing or the serializer
     *   name is not recognised
     */
    protected function __construct( array $options ) {
        if ( !class_exists( 'Redis' ) ) {
            throw new MWException( __CLASS__ . ' requires a Redis client library. ' .
                'See https://www.mediawiki.org/wiki/Redis#Setup' );
        }
        $this->connectTimeout = $options['connectTimeout'];
        $this->readTimeout = $options['readTimeout'];
        $this->persistent = $options['persistent'];
        $this->password = $options['password'];
        if ( !isset( $options['serializer'] ) || $options['serializer'] === 'php' ) {
            $this->serializer = Redis::SERIALIZER_PHP;
        } elseif ( $options['serializer'] === 'igbinary' ) {
            $this->serializer = Redis::SERIALIZER_IGBINARY;
        } elseif ( $options['serializer'] === 'none' ) {
            $this->serializer = Redis::SERIALIZER_NONE;
        } else {
            throw new MWException( "Invalid serializer specified." );
        }
    }

    /**
     * Fill in the default value of every option the caller left unset.
     *
     * @param array $options
     * @return array The options with 'connectTimeout', 'readTimeout',
     *   'persistent' and 'password' guaranteed to be present
     */
    protected static function applyDefaultConfig( array $options ) {
        if ( !isset( $options['connectTimeout'] ) ) {
            $options['connectTimeout'] = 1;
        }
        if ( !isset( $options['readTimeout'] ) ) {
            $options['readTimeout'] = 1;
        }
        if ( !isset( $options['persistent'] ) ) {
            $options['persistent'] = false;
        }
        if ( !isset( $options['password'] ) ) {
            $options['password'] = null;
        }

        return $options;
    }

    /**
     * Get the pool instance for the given options, creating it on first use.
     *
     * Two calls whose options are equal after defaults are applied share
     * one pool, regardless of the order of the array keys.
     *
     * @param array $options See __construct()
     * @return RedisConnectionPool
     */
    public static function singleton( array $options ) {
        $options = self::applyDefaultConfig( $options );
        // Map the options to a unique hash...
        ksort( $options ); // normalize to avoid pool fragmentation
        $id = sha1( serialize( $options ) );
        // Initialize the object at the hash as needed...
        if ( !isset( self::$instances[$id] ) ) {
            self::$instances[$id] = new self( $options );
            wfDebug( "Creating a new " . __CLASS__ . " instance with id $id.\n" );
        }

        return self::$instances[$id];
    }

    /**
     * Get a connection to a Redis server, reusing a free pooled one if possible.
     *
     * @param string $server Either "host" / "host:port" or, when it starts
     *   with a slash, the path of a UNIX domain socket
     * @return RedisConnRef|bool Connection handle, or false if the server is
     *   currently marked as down or the connection attempt failed
     * @throws MWException If $server cannot be parsed as host and port
     */
    public function getConnection( $server ) {
        // Check the listing of "dead" servers which have had connection errors.
        // Servers are marked dead for a limited period of time, to
        // avoid excessive overhead from repeated connection timeouts.
        if ( isset( $this->downServers[$server] ) ) {
            $now = time();
            if ( $now > $this->downServers[$server] ) {
                // Dead time expired
                unset( $this->downServers[$server] );
            } else {
                // Server is dead
                wfDebug( "server $server is marked down for another " .
                    ( $this->downServers[$server] - $now ) . " seconds, can't get connection\n" );

                return false;
            }
        }

        // Check if a connection is already free for use. The flag is written
        // through the key rather than a foreach reference so that no dangling
        // reference into the pool survives the loop.
        if ( isset( $this->connections[$server] ) ) {
            foreach ( $this->connections[$server] as $key => $connection ) {
                if ( $connection['free'] ) {
                    $this->connections[$server][$key]['free'] = false;
                    --$this->idlePoolSize;

                    return new RedisConnRef( $this, $server, $connection['conn'] );
                }
            }
        }

        if ( substr( $server, 0, 1 ) === '/' ) {
            // UNIX domain socket
            // These are required by the redis extension to start with a slash, but
            // we still need to set the port to a special value to make it work.
            $host = $server;
            $port = 0;
        } else {
            // TCP connection
            $hostPort = IP::splitHostAndPort( $server );
            if ( !$hostPort ) {
                throw new MWException( __CLASS__ . ": invalid configured server \"$server\"" );
            }
            list( $host, $port ) = $hostPort;
            if ( $port === false ) {
                $port = 6379;
            }
        }

        $conn = new Redis();
        try {
            if ( $this->persistent ) {
                $result = $conn->pconnect( $host, $port, $this->connectTimeout );
            } else {
                $result = $conn->connect( $host, $port, $this->connectTimeout );
            }
            if ( !$result ) {
                wfDebugLog( 'redis', "Could not connect to server $server" );
                // Mark server down for some time to avoid further timeouts
                $this->downServers[$server] = time() + self::SERVER_DOWN_TTL;

                return false;
            }
            // NOTE(review): a failed auth() is only logged; the connection is
            // still pooled and returned to the caller.
            if ( $this->password !== null && !$conn->auth( $this->password ) ) {
                wfDebugLog( 'redis', "Authentication error connecting to $server" );
            }
        } catch ( RedisException $e ) {
            $this->downServers[$server] = time() + self::SERVER_DOWN_TTL;
            wfDebugLog( 'redis', "Redis exception connecting to $server: " . $e->getMessage() );

            return false;
        }

        $conn->setOption( Redis::OPT_READ_TIMEOUT, $this->readTimeout );
        $conn->setOption( Redis::OPT_SERIALIZER, $this->serializer );
        $this->connections[$server][] = array( 'conn' => $conn, 'free' => false );

        return new RedisConnRef( $this, $server, $conn );
    }

    /**
     * Mark a connection to a server as free to return to the pool.
     *
     * @param string $server
     * @param Redis $conn
     * @return bool Whether $conn was found in use in the pool and released
     */
    public function freeConnection( $server, Redis $conn ) {
        $found = false;

        if ( isset( $this->connections[$server] ) ) {
            foreach ( $this->connections[$server] as $key => $connection ) {
                if ( $connection['conn'] === $conn && !$connection['free'] ) {
                    $this->connections[$server][$key]['free'] = true;
                    ++$this->idlePoolSize;
                    $found = true;
                    break;
                }
            }
        }

        $this->closeExcessIdleConections();

        return $found;
    }

    /**
     * Drop free connections from the pool until the number of idle
     * connections no longer exceeds the number of servers in the pool.
     */
    protected function closeExcessIdleConections() {
        if ( $this->idlePoolSize <= count( $this->connections ) ) {
            return; // nothing to do (no more connections than servers)
        }

        foreach ( $this->connections as $server => $serverConnections ) {
            foreach ( $serverConnections as $key => $connection ) {
                if ( $connection['free'] ) {
                    unset( $this->connections[$server][$key] );
                    if ( --$this->idlePoolSize <= count( $this->connections ) ) {
                        return; // done (no more connections than servers)
                    }
                }
            }
        }
    }

    /**
     * Wrapper around handleError() that takes the server as an extra argument.
     *
     * The $server argument is ignored; handleError() reads the server from $cref.
     *
     * @param string $server Unused
     * @param RedisConnRef $cref
     * @param RedisException $e
     * @return void
     */
    public function handleException( $server, RedisConnRef $cref, RedisException $e ) {
        return $this->handleError( $cref, $e );
    }

    /**
     * Log a Redis exception and remove the connection that raised it from
     * the pool, so that it is not handed out again.
     *
     * @param RedisConnRef $cref Handle of the connection that failed
     * @param RedisException $e
     */
    public function handleError( RedisConnRef $cref, RedisException $e ) {
        $server = $cref->getServer();
        wfDebugLog( 'redis', "Redis exception on server $server: " . $e->getMessage() . "\n" );
        if ( !isset( $this->connections[$server] ) ) {
            return; // nothing pooled for this server
        }
        foreach ( $this->connections[$server] as $key => $connection ) {
            if ( $cref->isConnIdentical( $connection['conn'] ) ) {
                // Keep the idle counter in sync if the connection was free
                $this->idlePoolSize -= $connection['free'] ? 1 : 0;
                unset( $this->connections[$server][$key] );
                break;
            }
        }
    }

    /**
     * Re-send the configured password on an existing connection.
     *
     * @param string $server Only used in the log message
     * @param Redis $conn
     * @return bool False if a password is configured and auth() failed,
     *   true otherwise (including when no password is configured)
     */
    public function reauthenticateConnection( $server, Redis $conn ) {
        if ( $this->password !== null ) {
            if ( !$conn->auth( $this->password ) ) {
                wfDebugLog( 'redis', "Authentication error connecting to $server" );

                return false;
            }
        }

        return true;
    }

    /**
     * Set the read timeout of a connection.
     *
     * @param Redis $conn
     * @param mixed $timeout New timeout; any falsy value (null, 0) selects
     *   the pool's default read timeout instead
     */
    public function resetTimeout( Redis $conn, $timeout = null ) {
        $conn->setOption( Redis::OPT_READ_TIMEOUT, $timeout ?: $this->readTimeout );
    }

    /**
     * Close every connection still held by the pool.
     */
    public function __destruct() {
        foreach ( $this->connections as $serverConnections ) {
            foreach ( $serverConnections as $connection ) {
                try {
                    $connection['conn']->close();
                } catch ( RedisException $e ) {
                    // An exception escaping a destructor is fatal during
                    // script shutdown; the connection is going away anyway.
                }
            }
        }
    }
}
00366 
/**
 * Handle for a single Redis connection borrowed from a RedisConnectionPool.
 *
 * Method calls that the class does not define are forwarded to the wrapped
 * Redis object through __call(). When the handle is destroyed, the
 * connection is given back to the pool via freeConnection().
 */
class RedisConnRef {
    /** @var RedisConnectionPool Pool that owns the connection */
    protected $pool;
    /** @var Redis Wrapped connection */
    protected $conn;

    /** @var string Server name the connection was requested for */
    protected $server;
    /** @var string|null Last error reported by the connection, if any */
    protected $lastError;

    /**
     * @param RedisConnectionPool $pool
     * @param string $server
     * @param Redis $conn
     */
    public function __construct( RedisConnectionPool $pool, $server, Redis $conn ) {
        $this->pool = $pool;
        $this->server = $server;
        $this->conn = $conn;
    }

    /**
     * @return string The server name given to the constructor
     */
    public function getServer() {
        return $this->server;
    }

    /**
     * @return string|null The last error recorded by __call() or luaEval()
     */
    public function getLastError() {
        return $this->lastError;
    }

    /**
     * Forget the last recorded error.
     */
    public function clearLastError() {
        $this->lastError = null;
    }

    /**
     * Forward a method call to the wrapped Redis connection.
     *
     * For blocking pops the read timeout is raised to the command's own
     * timeout plus one for the duration of the call, and restored to the
     * pool default afterwards. If the reply is a "operation not permitted"
     * error, the connection is re-authenticated and the call is retried once.
     *
     * @param string $name Redis method name
     * @param array $arguments
     * @return mixed Whatever the Redis method returned
     * @throws RedisException Re-thrown after the read timeout was restored
     */
    public function __call( $name, $arguments ) {
        $conn = $this->conn; // convenience

        // Work around https://github.com/nicolasff/phpredis/issues/70
        $lname = strtolower( $name );
        if ( ( $lname === 'blpop' || $lname === 'brpop' )
            && isset( $arguments[0], $arguments[1] ) && is_array( $arguments[0] )
        ) {
            $this->pool->resetTimeout( $conn, $arguments[1] + 1 );
        } elseif ( $lname === 'brpoplpush' && isset( $arguments[2] ) ) {
            $this->pool->resetTimeout( $conn, $arguments[2] + 1 );
        }

        $conn->clearLastError();
        try {
            $res = call_user_func_array( array( $conn, $name ), $arguments );
            if ( preg_match( '/^ERR operation not permitted\b/', $conn->getLastError() ) ) {
                $this->pool->reauthenticateConnection( $this->server, $conn );
                $conn->clearLastError();
                $res = call_user_func_array( array( $conn, $name ), $arguments );
                wfDebugLog( 'redis', "Used automatic re-authentication for method '$name'." );
            }
        } catch ( RedisException $e ) {
            $this->pool->resetTimeout( $conn ); // restore
            throw $e;
        }

        // Keep the previously recorded error if this call produced none
        $this->lastError = $conn->getLastError() ?: $this->lastError;

        $this->pool->resetTimeout( $conn ); // restore

        return $res;
    }

    /**
     * Run a Lua script, preferring the copy cached on the server.
     *
     * The script is first invoked by its SHA-1 through evalSha(). It is sent
     * in full with eval() if the server answers NOSCRIPT, or after
     * re-authenticating if the server answers "operation not permitted".
     *
     * @param string $script Lua source
     * @param array $params Passed through to evalSha()/eval()
     * @param int $numKeys Passed through to evalSha()/eval()
     * @return mixed Result of the last evalSha()/eval() call
     */
    public function luaEval( $script, array $params, $numKeys ) {
        $sha1 = sha1( $script ); // 40 char hex
        $conn = $this->conn; // convenience
        $server = $this->server; // convenience

        // Try to run the server-side cached copy of the script
        $conn->clearLastError();
        $res = $conn->evalSha( $sha1, $params, $numKeys );
        // If we got a permission error reply that means that (a) we are not in
        // multi()/pipeline() and (b) some connection problem likely occurred. If
        // the password the client gave was just wrong, an exception should have
        // been thrown back in getConnection() previously.
        if ( preg_match( '/^ERR operation not permitted\b/', $conn->getLastError() ) ) {
            $this->pool->reauthenticateConnection( $server, $conn );
            $conn->clearLastError();
            $res = $conn->eval( $script, $params, $numKeys );
            wfDebugLog( 'redis', "Used automatic re-authentication for Lua script $sha1." );
        }
        // If the script is not in cache, use eval() to retry and cache it
        if ( preg_match( '/^NOSCRIPT/', $conn->getLastError() ) ) {
            $conn->clearLastError();
            $res = $conn->eval( $script, $params, $numKeys );
            wfDebugLog( 'redis', "Used eval() for Lua script $sha1." );
        }

        if ( $conn->getLastError() ) { // script bug?
            wfDebugLog( 'redis', "Lua script error on server $server: " . $conn->getLastError() );
        }

        // Keep the previously recorded error if this call produced none
        $this->lastError = $conn->getLastError() ?: $this->lastError;

        return $res;
    }

    /**
     * @param Redis $conn
     * @return bool Whether $conn is the very object wrapped by this handle
     */
    public function isConnIdentical( Redis $conn ) {
        return $this->conn === $conn;
    }

    /**
     * Hand the connection back to the pool when the handle goes away.
     */
    public function __destruct() {
        $this->pool->freeConnection( $this->server, $this->conn );
    }
}