[ Index ]

PHP Cross Reference of MediaWiki-1.24.0

title

Body

[close]

/includes/jobqueue/aggregator/ -> JobQueueAggregatorRedis.php (source)

   1  <?php
   2  /**
   3   * Job queue aggregator code that uses PhpRedis.
   4   *
   5   * This program is free software; you can redistribute it and/or modify
   6   * it under the terms of the GNU General Public License as published by
   7   * the Free Software Foundation; either version 2 of the License, or
   8   * (at your option) any later version.
   9   *
  10   * This program is distributed in the hope that it will be useful,
  11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13   * GNU General Public License for more details.
  14   *
  15   * You should have received a copy of the GNU General Public License along
  16   * with this program; if not, write to the Free Software Foundation, Inc.,
  17   * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18   * http://www.gnu.org/copyleft/gpl.html
  19   *
  20   * @file
  21   * @author Aaron Schulz
  22   */
  23  
  24  /**
  25   * Class to handle tracking information about all queues using PhpRedis
  26   *
  27   * @ingroup JobQueue
  28   * @ingroup Redis
  29   * @since 1.21
  30   */
  31  class JobQueueAggregatorRedis extends JobQueueAggregator {
  32      /** @var RedisConnectionPool */
  33      protected $redisPool;
  34  
  35      /** @var array List of Redis server addresses */
  36      protected $servers;
  37  
  38      /**
  39       * @param array $params Possible keys:
  40       *   - redisConfig  : An array of parameters to RedisConnectionPool::__construct().
  41       *   - redisServers : Array of server entries, the first being the primary and the
  42       *                    others being fallback servers. Each entry is either a hostname/port
  43       *                    combination or the absolute path of a UNIX socket.
  44       *                    If a hostname is specified but no port, the standard port number
  45       *                    6379 will be used. Required.
  46       */
  47  	protected function __construct( array $params ) {
  48          parent::__construct( $params );
  49          $this->servers = isset( $params['redisServers'] )
  50              ? $params['redisServers']
  51              : array( $params['redisServer'] ); // b/c
  52          $params['redisConfig']['serializer'] = 'none';
  53          $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
  54      }
  55  
  56  	protected function doNotifyQueueEmpty( $wiki, $type ) {
  57          $conn = $this->getConnection();
  58          if ( !$conn ) {
  59              return false;
  60          }
  61          try {
  62              $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );
  63  
  64              return true;
  65          } catch ( RedisException $e ) {
  66              $this->handleException( $conn, $e );
  67  
  68              return false;
  69          }
  70      }
  71  
  72  	protected function doNotifyQueueNonEmpty( $wiki, $type ) {
  73          $conn = $this->getConnection();
  74          if ( !$conn ) {
  75              return false;
  76          }
  77          try {
  78              $conn->multi( Redis::PIPELINE );
  79              $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
  80              $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
  81              $conn->exec();
  82  
  83              return true;
  84          } catch ( RedisException $e ) {
  85              $this->handleException( $conn, $e );
  86  
  87              return false;
  88          }
  89      }
  90  
  91  	protected function doGetAllReadyWikiQueues() {
  92          $conn = $this->getConnection();
  93          if ( !$conn ) {
  94              return array();
  95          }
  96          try {
  97              $map = $conn->hGetAll( $this->getReadyQueueKey() );
  98  
  99              if ( is_array( $map ) && isset( $map['_epoch'] ) ) {
 100                  unset( $map['_epoch'] ); // ignore
 101                  $pendingDBs = array(); // (type => list of wikis)
 102                  foreach ( $map as $key => $time ) {
 103                      list( $type, $wiki ) = $this->dencQueueName( $key );
 104                      $pendingDBs[$type][] = $wiki;
 105                  }
 106              } else {
 107                  // Avoid duplicated effort
 108                  $rand = wfRandomString( 32 );
 109                  $conn->multi( Redis::MULTI );
 110                  $conn->setex( "{$rand}:lock", 3600, 1 );
 111                  $conn->renamenx( "{$rand}:lock", $this->getReadyQueueKey() . ":lock" );
 112                  if ( $conn->exec() !== array( true, true ) ) { // lock
 113                      $conn->delete( "{$rand}:lock" );
 114                      return array(); // already in progress
 115                  }
 116  
 117                  $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
 118  
 119                  $conn->multi( Redis::PIPELINE );
 120                  $now = time();
 121                  $map = array( '_epoch' => time() ); // dummy key for empty Redis collections
 122                  foreach ( $pendingDBs as $type => $wikis ) {
 123                      $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
 124                      foreach ( $wikis as $wiki ) {
 125                          $map[$this->encQueueName( $type, $wiki )] = $now;
 126                      }
 127                  }
 128                  $conn->hMSet( $this->getReadyQueueKey(), $map );
 129                  $conn->exec();
 130  
 131                  $conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock
 132              }
 133  
 134              return $pendingDBs;
 135          } catch ( RedisException $e ) {
 136              $this->handleException( $conn, $e );
 137  
 138              return array();
 139          }
 140      }
 141  
 142  	protected function doPurge() {
 143          $conn = $this->getConnection();
 144          if ( !$conn ) {
 145              return false;
 146          }
 147          try {
 148              $conn->delete( $this->getReadyQueueKey() );
 149              // leave key at getQueueTypesKey() alone
 150          } catch ( RedisException $e ) {
 151              $this->handleException( $conn, $e );
 152  
 153              return false;
 154          }
 155  
 156          return true;
 157      }
 158  
 159      /**
 160       * Get a connection to the server that handles all sub-queues for this queue
 161       *
 162       * @return RedisConnRef|bool Returns false on failure
 163       * @throws MWException
 164       */
 165  	protected function getConnection() {
 166          $conn = false;
 167          foreach ( $this->servers as $server ) {
 168              $conn = $this->redisPool->getConnection( $server );
 169              if ( $conn ) {
 170                  break;
 171              }
 172          }
 173  
 174          return $conn;
 175      }
 176  
 177      /**
 178       * @param RedisConnRef $conn
 179       * @param RedisException $e
 180       * @return void
 181       */
 182  	protected function handleException( RedisConnRef $conn, $e ) {
 183          $this->redisPool->handleError( $conn, $e );
 184      }
 185  
 186      /**
 187       * @return string
 188       */
 189  	private function getReadyQueueKey() {
 190          return "jobqueue:aggregator:h-ready-queues:v2"; // global
 191      }
 192  
 193      /**
 194       * @return string
 195       */
 196  	private function getQueueTypesKey() {
 197          return "jobqueue:aggregator:h-queue-types:v2"; // global
 198      }
 199  
 200      /**
 201       * @param string $type
 202       * @param string $wiki
 203       * @return string
 204       */
 205  	private function encQueueName( $type, $wiki ) {
 206          return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
 207      }
 208  
 209      /**
 210       * @param string $name
 211       * @return string
 212       */
 213  	private function dencQueueName( $name ) {
 214          list( $type, $wiki ) = explode( '/', $name, 2 );
 215  
 216          return array( rawurldecode( $type ), rawurldecode( $wiki ) );
 217      }
 218  }


Generated: Fri Nov 28 14:03:12 2014 Cross-referenced by PHPXref 0.7.1