1 <?php
26 use Psr\Log\LoggerAwareInterface;
27 use Psr\Log\LoggerInterface;
42 class RedisConnectionPool implements LoggerAwareInterface {
50  protected $connectTimeout;
52  protected $readTimeout;
54  protected $password;
56  protected $persistent;
58  protected $serializer;
62  protected $idlePoolSize = 0;
65  protected $connections = [];
67  protected $downServers = [];
70  protected static $instances = [];
73  const SERVER_DOWN_TTL = 30;
78  protected $logger;
84  protected function __construct( array $options ) {
85  if ( !class_exists( 'Redis' ) ) {
86  throw new Exception( __CLASS__ . ' requires a Redis client library. ' .
87  'See' );
88  }
89  if ( isset( $options['logger'] ) ) {
90  $this->setLogger( $options['logger'] );
91  } else {
92  $this->setLogger( LoggerFactory::getInstance( 'redis' ) );
93  }
94  $this->connectTimeout = $options['connectTimeout'];
95  $this->readTimeout = $options['readTimeout'];
96  $this->persistent = $options['persistent'];
97  $this->password = $options['password'];
98  if ( !isset( $options['serializer'] ) || $options['serializer'] === 'php' ) {
99  $this->serializer = Redis::SERIALIZER_PHP;
100  } elseif ( $options['serializer'] === 'igbinary' ) {
101  $this->serializer = Redis::SERIALIZER_IGBINARY;
102  } elseif ( $options['serializer'] === 'none' ) {
103  $this->serializer = Redis::SERIALIZER_NONE;
104  } else {
105  throw new InvalidArgumentException( "Invalid serializer specified." );
106  }
107  }
113  public function setLogger( LoggerInterface $logger ) {
114  $this->logger = $logger;
115  }
121  protected static function applyDefaultConfig( array $options ) {
122  if ( !isset( $options['connectTimeout'] ) ) {
123  $options['connectTimeout'] = 1;
124  }
125  if ( !isset( $options['readTimeout'] ) ) {
126  $options['readTimeout'] = 1;
127  }
128  if ( !isset( $options['persistent'] ) ) {
129  $options['persistent'] = false;
130  }
131  if ( !isset( $options['password'] ) ) {
132  $options['password'] = null;
133  }
135  return $options;
136  }
153  public static function singleton( array $options ) {
154  $options = self::applyDefaultConfig( $options );
155  // Map the options to a unique hash...
156  ksort( $options ); // normalize to avoid pool fragmentation
157  $id = sha1( serialize( $options ) );
158  // Initialize the object at the hash as needed...
159  if ( !isset( self::$instances[$id] ) ) {
160  self::$instances[$id] = new self( $options );
161  LoggerFactory::getInstance( 'redis' )->debug(
162  "Creating a new " . __CLASS__ . " instance with id $id."
163  );
164  }
166  return self::$instances[$id];
167  }
173  public static function destroySingletons() {
174  self::$instances = [];
175  }
185  public function getConnection( $server ) {
186  // Check the listing "dead" servers which have had a connection errors.
187  // Servers are marked dead for a limited period of time, to
188  // avoid excessive overhead from repeated connection timeouts.
189  if ( isset( $this->downServers[$server] ) ) {
190  $now = time();
191  if ( $now > $this->downServers[$server] ) {
192  // Dead time expired
193  unset( $this->downServers[$server] );
194  } else {
195  // Server is dead
196  $this->logger->debug(
197  'Server "{redis_server}" is marked down for another ' .
198  ( $this->downServers[$server] - $now ) . 'seconds',
199  [ 'redis_server' => $server ]
200  );
202  return false;
203  }
204  }
206  // Check if a connection is already free for use
207  if ( isset( $this->connections[$server] ) ) {
208  foreach ( $this->connections[$server] as &$connection ) {
209  if ( $connection['free'] ) {
210  $connection['free'] = false;
213  return new RedisConnRef(
214  $this, $server, $connection['conn'], $this->logger
215  );
216  }
217  }
218  }
220  if ( substr( $server, 0, 1 ) === '/' ) {
221  // UNIX domain socket
222  // These are required by the redis extension to start with a slash, but
223  // we still need to set the port to a special value to make it work.
224  $host = $server;
225  $port = 0;
226  } else {
227  // TCP connection
228  $hostPort = IP::splitHostAndPort( $server );
229  if ( !$server || !$hostPort ) {
230  throw new InvalidArgumentException(
231  __CLASS__ . ": invalid configured server \"$server\""
232  );
233  }
234  list( $host, $port ) = $hostPort;
235  if ( $port === false ) {
236  $port = 6379;
237  }
238  }
240  $conn = new Redis();
241  try {
242  if ( $this->persistent ) {
243  $result = $conn->pconnect( $host, $port, $this->connectTimeout );
244  } else {
245  $result = $conn->connect( $host, $port, $this->connectTimeout );
246  }
247  if ( !$result ) {
248  $this->logger->error(
249  'Could not connect to server "{redis_server}"',
250  [ 'redis_server' => $server ]
251  );
252  // Mark server down for some time to avoid further timeouts
253  $this->downServers[$server] = time() + self::SERVER_DOWN_TTL;
255  return false;
256  }
257  if ( $this->password !== null ) {
258  if ( !$conn->auth( $this->password ) ) {
259  $this->logger->error(
260  'Authentication error connecting to "{redis_server}"',
261  [ 'redis_server' => $server ]
262  );
263  }
264  }
265  } catch ( RedisException $e ) {
266  $this->downServers[$server] = time() + self::SERVER_DOWN_TTL;
267  $this->logger->error(
268  'Redis exception connecting to "{redis_server}"',
269  [
270  'redis_server' => $server,
271  'exception' => $e,
272  ]
273  );
275  return false;
276  }
278  if ( $conn ) {
279  $conn->setOption( Redis::OPT_READ_TIMEOUT, $this->readTimeout );
280  $conn->setOption( Redis::OPT_SERIALIZER, $this->serializer );
281  $this->connections[$server][] = [ 'conn' => $conn, 'free' => false ];
283  return new RedisConnRef( $this, $server, $conn, $this->logger );
284  } else {
285  return false;
286  }
287  }
296  public function freeConnection( $server, Redis $conn ) {
297  $found = false;
299  foreach ( $this->connections[$server] as &$connection ) {
300  if ( $connection['conn'] === $conn && !$connection['free'] ) {
301  $connection['free'] = true;
303  break;
304  }
305  }
307  $this->closeExcessIdleConections();
309  return $found;
310  }
315  protected function closeExcessIdleConections() {
316  if ( $this->idlePoolSize <= count( $this->connections ) ) {
317  return; // nothing to do (no more connections than servers)
318  }
320  foreach ( $this->connections as &$serverConnections ) {
321  foreach ( $serverConnections as $key => &$connection ) {
322  if ( $connection['free'] ) {
323  unset( $serverConnections[$key] );
324  if ( --$this->idlePoolSize <= count( $this->connections ) ) {
325  return; // done (no more connections than servers)
326  }
327  }
328  }
329  }
330  }
343  public function handleException( $server, RedisConnRef $cref, RedisException $e ) {
344  $this->handleError( $cref, $e );
345  }
356  public function handleError( RedisConnRef $cref, RedisException $e ) {
357  $server = $cref->getServer();
358  $this->logger->error(
359  'Redis exception on server "{redis_server}"',
360  [
361  'redis_server' => $server,
362  'exception' => $e,
363  ]
364  );
365  foreach ( $this->connections[$server] as $key => $connection ) {
366  if ( $cref->isConnIdentical( $connection['conn'] ) ) {
367  $this->idlePoolSize -= $connection['free'] ? 1 : 0;
368  unset( $this->connections[$server][$key] );
369  break;
370  }
371  }
372  }
390  public function reauthenticateConnection( $server, Redis $conn ) {
391  if ( $this->password !== null ) {
392  if ( !$conn->auth( $this->password ) ) {
393  $this->logger->error(
394  'Authentication error connecting to "{redis_server}"',
395  [ 'redis_server' => $server ]
396  );
398  return false;
399  }
400  }
402  return true;
403  }
411  public function resetTimeout( Redis $conn, $timeout = null ) {
412  $conn->setOption( Redis::OPT_READ_TIMEOUT, $timeout ?: $this->readTimeout );
413  }
418  function __destruct() {
419  foreach ( $this->connections as $server => &$serverConnections ) {
420  foreach ( $serverConnections as $key => &$connection ) {
421  $connection['conn']->close();
422  }
423  }
424  }
425 }
437  protected $pool;
439  protected $conn;
441  protected $server; // string
442  protected $lastError; // string
447  protected $logger;
455  public function __construct(
456  RedisConnectionPool $pool, $server, Redis $conn, LoggerInterface $logger
457  ) {
458  $this->pool = $pool;
459  $this->server = $server;
460  $this->conn = $conn;
461  $this->logger = $logger;
462  }
468  public function getServer() {
469  return $this->server;
470  }
472  public function getLastError() {
473  return $this->lastError;
474  }
476  public function clearLastError() {
477  $this->lastError = null;
478  }
480  public function __call( $name, $arguments ) {
481  $conn = $this->conn; // convenience
483  // Work around
484  $lname = strtolower( $name );
485  if ( ( $lname === 'blpop' || $lname == 'brpop' )
486  && is_array( $arguments[0] ) && isset( $arguments[1] )
487  ) {
488  $this->pool->resetTimeout( $conn, $arguments[1] + 1 );
489  } elseif ( $lname === 'brpoplpush' && isset( $arguments[2] ) ) {
490  $this->pool->resetTimeout( $conn, $arguments[2] + 1 );
491  }
493  $conn->clearLastError();
494  try {
495  $res = call_user_func_array( [ $conn, $name ], $arguments );
496  if ( preg_match( '/^ERR operation not permitted\b/', $conn->getLastError() ) ) {
497  $this->pool->reauthenticateConnection( $this->server, $conn );
498  $conn->clearLastError();
499  $res = call_user_func_array( [ $conn, $name ], $arguments );
500  $this->logger->info(
501  "Used automatic re-authentication for method '$name'.",
502  [ 'redis_server' => $this->server ]
503  );
504  }
505  } catch ( RedisException $e ) {
506  $this->pool->resetTimeout( $conn ); // restore
507  throw $e;
508  }
510  $this->lastError = $conn->getLastError() ?: $this->lastError;
512  $this->pool->resetTimeout( $conn ); // restore
514  return $res;
515  }
524  public function luaEval( $script, array $params, $numKeys ) {
525  $sha1 = sha1( $script ); // 40 char hex
526  $conn = $this->conn; // convenience
527  $server = $this->server; // convenience
529  // Try to run the server-side cached copy of the script
530  $conn->clearLastError();
531  $res = $conn->evalSha( $sha1, $params, $numKeys );
532  // If we got a permission error reply that means that (a) we are not in
533  // multi()/pipeline() and (b) some connection problem likely occurred. If
534  // the password the client gave was just wrong, an exception should have
535  // been thrown back in getConnection() previously.
536  if ( preg_match( '/^ERR operation not permitted\b/', $conn->getLastError() ) ) {
537  $this->pool->reauthenticateConnection( $server, $conn );
538  $conn->clearLastError();
539  $res = $conn->eval( $script, $params, $numKeys );
540  $this->logger->info(
541  "Used automatic re-authentication for Lua script '$sha1'.",
542  [ 'redis_server' => $server ]
543  );
544  }
545  // If the script is not in cache, use eval() to retry and cache it
546  if ( preg_match( '/^NOSCRIPT/', $conn->getLastError() ) ) {
547  $conn->clearLastError();
548  $res = $conn->eval( $script, $params, $numKeys );
549  $this->logger->info(
550  "Used eval() for Lua script '$sha1'.",
551  [ 'redis_server' => $server ]
552  );
553  }
555  if ( $conn->getLastError() ) { // script bug?
556  $this->logger->error(
557  'Lua script error on server "{redis_server}": {lua_error}',
558  [
559  'redis_server' => $server,
560  'lua_error' => $conn->getLastError()
561  ]
562  );
563  }
565  $this->lastError = $conn->getLastError() ?: $this->lastError;
567  return $res;
568  }
574  public function isConnIdentical( Redis $conn ) {
575  return $this->conn === $conn;
576  }
578  function __destruct() {
579  $this->pool->freeConnection( $this->server, $this->conn );
580  }
581 }
