[ Index ]

PHP Cross Reference of MediaWiki-1.24.0

title

Body

[close]

/includes/jobqueue/ -> JobQueueRedis.php (source)

   1  <?php
   2  /**
   3   * Redis-backed job queue code.
   4   *
   5   * This program is free software; you can redistribute it and/or modify
   6   * it under the terms of the GNU General Public License as published by
   7   * the Free Software Foundation; either version 2 of the License, or
   8   * (at your option) any later version.
   9   *
  10   * This program is distributed in the hope that it will be useful,
  11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13   * GNU General Public License for more details.
  14   *
  15   * You should have received a copy of the GNU General Public License along
  16   * with this program; if not, write to the Free Software Foundation, Inc.,
  17   * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  18   * http://www.gnu.org/copyleft/gpl.html
  19   *
  20   * @file
  21   * @author Aaron Schulz
  22   */
  23  
  24  /**
  25   * Class to handle job queues stored in Redis
  26   *
  27   * This is a faster, less resource-intensive queue than JobQueueDB.
  28   * All data for a queue using this class is placed into one redis server.
  29   *
  30   * There are eight main redis keys used to track jobs:
  31   *   - l-unclaimed  : A list of job IDs used for ready unclaimed jobs
  32   *   - z-claimed    : A sorted set of (job ID, UNIX timestamp as score) used for job retries
  33   *   - z-abandoned  : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
  34   *   - z-delayed    : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs
  35   *   - h-idBySha1   : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
  36   *   - h-sha1ById   : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
  37   *   - h-attempts   : A hash of (job ID => attempt count) used for job claiming/retries
  38   *   - h-data       : A hash of (job ID => serialized blobs) for job storage
  39   * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned.
  40   * If an ID appears in any of those lists, it should have a h-data entry for its ID.
  41   * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then
  42   * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById
  43   * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
  44   * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
  45   *
  46   * Additionally, "rootjob:*" keys track "root jobs" used for additional de-duplication.
  47   * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
  48   * All the keys are prefixed with the relevant wiki ID information.
  49   *
  50   * This class requires Redis 2.6 as it makes use of Lua scripts for fast atomic operations.
  51   * Additionally, it should be noted that redis has different persistence modes, such
  52   * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be
  53   * made on the servers based on what queues are using it and what tolerance they have.
  54   *
  55   * @ingroup JobQueue
  56   * @ingroup Redis
  57   * @since 1.22
  58   */
  59  class JobQueueRedis extends JobQueue {
  60      /** @var RedisConnectionPool */
  61      protected $redisPool;
  62  
  63      /** @var string Server address */
  64      protected $server;
  65      /** @var string Compression method to use */
  66      protected $compression;
  67      /** @var bool */
  68      protected $daemonized;
  69  
  70      const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
  71  
  72      /** @var string Key to prefix the queue keys with (used for testing) */
  73      protected $key;
  74  
  75      /**
  76       * @param array $params Possible keys:
  77       *   - redisConfig : An array of parameters to RedisConnectionPool::__construct().
  78       *                   Note that the serializer option is ignored as "none" is always used.
  79       *   - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
  80       *                   If a hostname is specified but no port, the standard port number
  81       *                   6379 will be used. Required.
  82       *   - compression : The type of compression to use; one of (none,gzip).
  83       *   - daemonized  : Set to true if the redisJobRunnerService runs in the background.
  84       *                   This will disable job recycling/undelaying from the MediaWiki side
  85       *                   to avoid redundance and out-of-sync configuration.
  86       */
  87  	public function __construct( array $params ) {
  88          parent::__construct( $params );
  89          $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
  90          $this->server = $params['redisServer'];
  91          $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
  92          $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
  93          $this->daemonized = !empty( $params['daemonized'] );
  94      }
  95  
  96  	protected function supportedOrders() {
  97          return array( 'timestamp', 'fifo' );
  98      }
  99  
 100  	protected function optimalOrder() {
 101          return 'fifo';
 102      }
 103  
 104  	protected function supportsDelayedJobs() {
 105          return true;
 106      }
 107  
 108      /**
 109       * @see JobQueue::doIsEmpty()
 110       * @return bool
 111       * @throws MWException
 112       */
 113  	protected function doIsEmpty() {
 114          return $this->doGetSize() == 0;
 115      }
 116  
 117      /**
 118       * @see JobQueue::doGetSize()
 119       * @return int
 120       * @throws MWException
 121       */
 122  	protected function doGetSize() {
 123          $conn = $this->getConnection();
 124          try {
 125              return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
 126          } catch ( RedisException $e ) {
 127              $this->throwRedisException( $conn, $e );
 128          }
 129      }
 130  
 131      /**
 132       * @see JobQueue::doGetAcquiredCount()
 133       * @return int
 134       * @throws JobQueueError
 135       */
 136  	protected function doGetAcquiredCount() {
 137          if ( $this->claimTTL <= 0 ) {
 138              return 0; // no acknowledgements
 139          }
 140          $conn = $this->getConnection();
 141          try {
 142              $conn->multi( Redis::PIPELINE );
 143              $conn->zSize( $this->getQueueKey( 'z-claimed' ) );
 144              $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
 145  
 146              return array_sum( $conn->exec() );
 147          } catch ( RedisException $e ) {
 148              $this->throwRedisException( $conn, $e );
 149          }
 150      }
 151  
 152      /**
 153       * @see JobQueue::doGetDelayedCount()
 154       * @return int
 155       * @throws JobQueueError
 156       */
 157  	protected function doGetDelayedCount() {
 158          if ( !$this->checkDelay ) {
 159              return 0; // no delayed jobs
 160          }
 161          $conn = $this->getConnection();
 162          try {
 163              return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
 164          } catch ( RedisException $e ) {
 165              $this->throwRedisException( $conn, $e );
 166          }
 167      }
 168  
 169      /**
 170       * @see JobQueue::doGetAbandonedCount()
 171       * @return int
 172       * @throws JobQueueError
 173       */
 174  	protected function doGetAbandonedCount() {
 175          if ( $this->claimTTL <= 0 ) {
 176              return 0; // no acknowledgements
 177          }
 178          $conn = $this->getConnection();
 179          try {
 180              return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
 181          } catch ( RedisException $e ) {
 182              $this->throwRedisException( $conn, $e );
 183          }
 184      }
 185  
 186      /**
 187       * @see JobQueue::doBatchPush()
 188       * @param array $jobs
 189       * @param int $flags
 190       * @return void
 191       * @throws JobQueueError
 192       */
 193  	protected function doBatchPush( array $jobs, $flags ) {
 194          // Convert the jobs into field maps (de-duplicated against each other)
 195          $items = array(); // (job ID => job fields map)
 196          foreach ( $jobs as $job ) {
 197              $item = $this->getNewJobFields( $job );
 198              if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
 199                  $items[$item['sha1']] = $item;
 200              } else {
 201                  $items[$item['uuid']] = $item;
 202              }
 203          }
 204  
 205          if ( !count( $items ) ) {
 206              return; // nothing to do
 207          }
 208  
 209          $conn = $this->getConnection();
 210          try {
 211              // Actually push the non-duplicate jobs into the queue...
 212              if ( $flags & self::QOS_ATOMIC ) {
 213                  $batches = array( $items ); // all or nothing
 214              } else {
 215                  $batches = array_chunk( $items, 500 ); // avoid tying up the server
 216              }
 217              $failed = 0;
 218              $pushed = 0;
 219              foreach ( $batches as $itemBatch ) {
 220                  $added = $this->pushBlobs( $conn, $itemBatch );
 221                  if ( is_int( $added ) ) {
 222                      $pushed += $added;
 223                  } else {
 224                      $failed += count( $itemBatch );
 225                  }
 226              }
 227              if ( $failed > 0 ) {
 228                  wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
 229  
 230                  throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." );
 231              }
 232              JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki );
 233              JobQueue::incrStats( 'job-insert-duplicate', $this->type,
 234                  count( $items ) - $failed - $pushed, $this->wiki );
 235          } catch ( RedisException $e ) {
 236              $this->throwRedisException( $conn, $e );
 237          }
 238      }
 239  
 240      /**
 241       * @param RedisConnRef $conn
 242       * @param array $items List of results from JobQueueRedis::getNewJobFields()
 243       * @return int Number of jobs inserted (duplicates are ignored)
 244       * @throws RedisException
 245       */
 246  	protected function pushBlobs( RedisConnRef $conn, array $items ) {
 247          $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
 248          foreach ( $items as $item ) {
 249              $args[] = (string)$item['uuid'];
 250              $args[] = (string)$item['sha1'];
 251              $args[] = (string)$item['rtimestamp'];
 252              $args[] = (string)$this->serialize( $item );
 253          }
 254          static $script =
 255  <<<LUA
 256          local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
 257          if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
 258          local pushed = 0
 259          for i = 1,#ARGV,4 do
 260              local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
 261              if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
 262                  if 1*rtimestamp > 0 then
 263                      -- Insert into delayed queue (release time as score)
 264                      redis.call('zAdd',kDelayed,rtimestamp,id)
 265                  else
 266                      -- Insert into unclaimed queue
 267                      redis.call('lPush',kUnclaimed,id)
 268                  end
 269                  if sha1 ~= '' then
 270                      redis.call('hSet',kSha1ById,id,sha1)
 271                      redis.call('hSet',kIdBySha1,sha1,id)
 272                  end
 273                  redis.call('hSet',kData,id,blob)
 274                  pushed = pushed + 1
 275              end
 276          end
 277          return pushed
 278  LUA;
 279          return $conn->luaEval( $script,
 280              array_merge(
 281                  array(
 282                      $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
 283                      $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
 284                      $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
 285                      $this->getQueueKey( 'z-delayed' ), # KEYS[4]
 286                      $this->getQueueKey( 'h-data' ), # KEYS[5]
 287                  ),
 288                  $args
 289              ),
 290              5 # number of first argument(s) that are keys
 291          );
 292      }
 293  
 294      /**
 295       * @see JobQueue::doPop()
 296       * @return Job|bool
 297       * @throws JobQueueError
 298       */
 299  	protected function doPop() {
 300          $job = false;
 301  
 302          // Push ready delayed jobs into the queue every 10 jobs to spread the load.
 303          // This is also done as a periodic task, but we don't want too much done at once.
 304          if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
 305              $this->recyclePruneAndUndelayJobs();
 306          }
 307  
 308          $conn = $this->getConnection();
 309          try {
 310              do {
 311                  if ( $this->claimTTL > 0 ) {
 312                      // Keep the claimed job list down for high-traffic queues
 313                      if ( mt_rand( 0, 99 ) == 0 ) {
 314                          $this->recyclePruneAndUndelayJobs();
 315                      }
 316                      $blob = $this->popAndAcquireBlob( $conn );
 317                  } else {
 318                      $blob = $this->popAndDeleteBlob( $conn );
 319                  }
 320                  if ( !is_string( $blob ) ) {
 321                      break; // no jobs; nothing to do
 322                  }
 323  
 324                  JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
 325                  $item = $this->unserialize( $blob );
 326                  if ( $item === false ) {
 327                      wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
 328                      continue;
 329                  }
 330  
 331                  // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed
 332                  $job = $this->getJobFromFields( $item ); // may be false
 333              } while ( !$job ); // job may be false if invalid
 334          } catch ( RedisException $e ) {
 335              $this->throwRedisException( $conn, $e );
 336          }
 337  
 338          return $job;
 339      }
 340  
 341      /**
 342       * @param RedisConnRef $conn
 343       * @return array Serialized string or false
 344       * @throws RedisException
 345       */
 346  	protected function popAndDeleteBlob( RedisConnRef $conn ) {
 347          static $script =
 348  <<<LUA
 349          local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
 350          -- Pop an item off the queue
 351          local id = redis.call('rpop',kUnclaimed)
 352          if not id then return false end
 353          -- Get the job data and remove it
 354          local item = redis.call('hGet',kData,id)
 355          redis.call('hDel',kData,id)
 356          -- Allow new duplicates of this job
 357          local sha1 = redis.call('hGet',kSha1ById,id)
 358          if sha1 then redis.call('hDel',kIdBySha1,sha1) end
 359          redis.call('hDel',kSha1ById,id)
 360          -- Return the job data
 361          return item
 362  LUA;
 363          return $conn->luaEval( $script,
 364              array(
 365                  $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
 366                  $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
 367                  $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
 368                  $this->getQueueKey( 'h-data' ), # KEYS[4]
 369              ),
 370              4 # number of first argument(s) that are keys
 371          );
 372      }
 373  
 374      /**
 375       * @param RedisConnRef $conn
 376       * @return array Serialized string or false
 377       * @throws RedisException
 378       */
 379  	protected function popAndAcquireBlob( RedisConnRef $conn ) {
 380          static $script =
 381  <<<LUA
 382          local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
 383          -- Pop an item off the queue
 384          local id = redis.call('rPop',kUnclaimed)
 385          if not id then return false end
 386          -- Allow new duplicates of this job
 387          local sha1 = redis.call('hGet',kSha1ById,id)
 388          if sha1 then redis.call('hDel',kIdBySha1,sha1) end
 389          redis.call('hDel',kSha1ById,id)
 390          -- Mark the jobs as claimed and return it
 391          redis.call('zAdd',kClaimed,ARGV[1],id)
 392          redis.call('hIncrBy',kAttempts,id,1)
 393          return redis.call('hGet',kData,id)
 394  LUA;
 395          return $conn->luaEval( $script,
 396              array(
 397                  $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
 398                  $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
 399                  $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
 400                  $this->getQueueKey( 'z-claimed' ), # KEYS[4]
 401                  $this->getQueueKey( 'h-attempts' ), # KEYS[5]
 402                  $this->getQueueKey( 'h-data' ), # KEYS[6]
 403                  time(), # ARGV[1] (injected to be replication-safe)
 404              ),
 405              6 # number of first argument(s) that are keys
 406          );
 407      }
 408  
 409      /**
 410       * @see JobQueue::doAck()
 411       * @param Job $job
 412       * @return Job|bool
 413       * @throws MWException|JobQueueError
 414       */
 415  	protected function doAck( Job $job ) {
 416          if ( !isset( $job->metadata['uuid'] ) ) {
 417              throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
 418          }
 419          if ( $this->claimTTL > 0 ) {
 420              $conn = $this->getConnection();
 421              try {
 422                  static $script =
 423  <<<LUA
 424                  local kClaimed, kAttempts, kData = unpack(KEYS)
 425                  -- Unmark the job as claimed
 426                  redis.call('zRem',kClaimed,ARGV[1])
 427                  redis.call('hDel',kAttempts,ARGV[1])
 428                  -- Delete the job data itself
 429                  return redis.call('hDel',kData,ARGV[1])
 430  LUA;
 431                  $res = $conn->luaEval( $script,
 432                      array(
 433                          $this->getQueueKey( 'z-claimed' ), # KEYS[1]
 434                          $this->getQueueKey( 'h-attempts' ), # KEYS[2]
 435                          $this->getQueueKey( 'h-data' ), # KEYS[3]
 436                          $job->metadata['uuid'] # ARGV[1]
 437                      ),
 438                      3 # number of first argument(s) that are keys
 439                  );
 440  
 441                  if ( !$res ) {
 442                      wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
 443  
 444                      return false;
 445                  }
 446              } catch ( RedisException $e ) {
 447                  $this->throwRedisException( $conn, $e );
 448              }
 449          }
 450  
 451          return true;
 452      }
 453  
 454      /**
 455       * @see JobQueue::doDeduplicateRootJob()
 456       * @param Job $job
 457       * @return bool
 458       * @throws MWException|JobQueueError
 459       */
 460  	protected function doDeduplicateRootJob( Job $job ) {
 461          if ( !$job->hasRootJobParams() ) {
 462              throw new MWException( "Cannot register root job; missing parameters." );
 463          }
 464          $params = $job->getRootJobParams();
 465  
 466          $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
 467  
 468          $conn = $this->getConnection();
 469          try {
 470              $timestamp = $conn->get( $key ); // current last timestamp of this job
 471              if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
 472                  return true; // a newer version of this root job was enqueued
 473              }
 474  
 475              // Update the timestamp of the last root job started at the location...
 476              return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
 477          } catch ( RedisException $e ) {
 478              $this->throwRedisException( $conn, $e );
 479          }
 480      }
 481  
 482      /**
 483       * @see JobQueue::doIsRootJobOldDuplicate()
 484       * @param Job $job
 485       * @return bool
 486       * @throws JobQueueError
 487       */
 488  	protected function doIsRootJobOldDuplicate( Job $job ) {
 489          if ( !$job->hasRootJobParams() ) {
 490              return false; // job has no de-deplication info
 491          }
 492          $params = $job->getRootJobParams();
 493  
 494          $conn = $this->getConnection();
 495          try {
 496              // Get the last time this root job was enqueued
 497              $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
 498          } catch ( RedisException $e ) {
 499              $this->throwRedisException( $conn, $e );
 500          }
 501  
 502          // Check if a new root job was started at the location after this one's...
 503          return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
 504      }
 505  
 506      /**
 507       * @see JobQueue::doDelete()
 508       * @return bool
 509       * @throws JobQueueError
 510       */
 511  	protected function doDelete() {
 512          static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
 513              'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' );
 514  
 515          $conn = $this->getConnection();
 516          try {
 517              $keys = array();
 518              foreach ( $props as $prop ) {
 519                  $keys[] = $this->getQueueKey( $prop );
 520              }
 521  
 522              return ( $conn->delete( $keys ) !== false );
 523          } catch ( RedisException $e ) {
 524              $this->throwRedisException( $conn, $e );
 525          }
 526      }
 527  
 528      /**
 529       * @see JobQueue::getAllQueuedJobs()
 530       * @return Iterator
 531       */
 532  	public function getAllQueuedJobs() {
 533          $conn = $this->getConnection();
 534          try {
 535              $that = $this;
 536  
 537              return new MappedIterator(
 538                  $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
 539                  function ( $uid ) use ( $that, $conn ) {
 540                      return $that->getJobFromUidInternal( $uid, $conn );
 541                  },
 542                  array( 'accept' => function ( $job ) {
 543                      return is_object( $job );
 544                  } )
 545              );
 546          } catch ( RedisException $e ) {
 547              $this->throwRedisException( $conn, $e );
 548          }
 549      }
 550  
 551      /**
 552       * @see JobQueue::getAllQueuedJobs()
 553       * @return Iterator
 554       */
 555  	public function getAllDelayedJobs() {
 556          $conn = $this->getConnection();
 557          try {
 558              $that = $this;
 559  
 560              return new MappedIterator( // delayed jobs
 561                  $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
 562                  function ( $uid ) use ( $that, $conn ) {
 563                      return $that->getJobFromUidInternal( $uid, $conn );
 564                  },
 565                  array( 'accept' => function ( $job ) {
 566                      return is_object( $job );
 567                  } )
 568              );
 569          } catch ( RedisException $e ) {
 570              $this->throwRedisException( $conn, $e );
 571          }
 572      }
 573  
 574  	public function getCoalesceLocationInternal() {
 575          return "RedisServer:" . $this->server;
 576      }
 577  
 578  	protected function doGetSiblingQueuesWithJobs( array $types ) {
 579          return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
 580      }
 581  
 582  	protected function doGetSiblingQueueSizes( array $types ) {
 583          $sizes = array(); // (type => size)
 584          $types = array_values( $types ); // reindex
 585          $conn = $this->getConnection();
 586          try {
 587              $conn->multi( Redis::PIPELINE );
 588              foreach ( $types as $type ) {
 589                  $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
 590              }
 591              $res = $conn->exec();
 592              if ( is_array( $res ) ) {
 593                  foreach ( $res as $i => $size ) {
 594                      $sizes[$types[$i]] = $size;
 595                  }
 596              }
 597          } catch ( RedisException $e ) {
 598              $this->throwRedisException( $conn, $e );
 599          }
 600  
 601          return $sizes;
 602      }
 603  
 604      /**
 605       * This function should not be called outside JobQueueRedis
 606       *
 607       * @param string $uid
 608       * @param RedisConnRef $conn
 609       * @return Job|bool Returns false if the job does not exist
 610       * @throws MWException|JobQueueError
 611       */
 612  	public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
 613          try {
 614              $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
 615              if ( $data === false ) {
 616                  return false; // not found
 617              }
 618              $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
 619              if ( !is_array( $item ) ) { // this shouldn't happen
 620                  throw new MWException( "Could not find job with ID '$uid'." );
 621              }
 622              $title = Title::makeTitle( $item['namespace'], $item['title'] );
 623              $job = Job::factory( $item['type'], $title, $item['params'] );
 624              $job->metadata['uuid'] = $item['uuid'];
 625  
 626              return $job;
 627          } catch ( RedisException $e ) {
 628              $this->throwRedisException( $conn, $e );
 629          }
 630      }
 631  
 632      /**
 633       * Recycle or destroy any jobs that have been claimed for too long
 634       * and release any ready delayed jobs into the queue
 635       *
 636       * @return int Number of jobs recycled/deleted/undelayed
 637       * @throws MWException|JobQueueError
 638       */
 639  	public function recyclePruneAndUndelayJobs() {
 640          $count = 0;
 641          // For each job item that can be retried, we need to add it back to the
 642          // main queue and remove it from the list of currenty claimed job items.
 643          // For those that cannot, they are marked as dead and kept around for
 644          // investigation and manual job restoration but are eventually deleted.
 645          $conn = $this->getConnection();
 646          try {
 647              $now = time();
 648              static $script =
 649  <<<LUA
 650              local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS)
 651              local released,abandoned,pruned,undelayed = 0,0,0,0
 652              -- Get all non-dead jobs that have an expired claim on them.
 653              -- The score for each item is the last claim timestamp (UNIX).
 654              local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
 655              for k,id in ipairs(staleClaims) do
 656                  local timestamp = redis.call('zScore',kClaimed,id)
 657                  local attempts = redis.call('hGet',kAttempts,id)
 658                  if attempts < ARGV[3] then
 659                      -- Claim expired and retries left: re-enqueue the job
 660                      redis.call('lPush',kUnclaimed,id)
 661                      redis.call('hIncrBy',kAttempts,id,1)
 662                      released = released + 1
 663                  else
 664                      -- Claim expired and no retries left: mark the job as dead
 665                      redis.call('zAdd',kAbandoned,timestamp,id)
 666                      abandoned = abandoned + 1
 667                  end
 668                  redis.call('zRem',kClaimed,id)
 669              end
 670              -- Get all of the dead jobs that have been marked as dead for too long.
 671              -- The score for each item is the last claim timestamp (UNIX).
 672              local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2])
 673              for k,id in ipairs(deadClaims) do
 674                  -- Stale and out of retries: remove any traces of the job
 675                  redis.call('zRem',kAbandoned,id)
 676                  redis.call('hDel',kAttempts,id)
 677                  redis.call('hDel',kData,id)
 678                  pruned = pruned + 1
 679              end
 680              -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp)
 681              local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4])
 682              -- Migrate the jobs from the "delayed" set to the "unclaimed" list
 683              for k,id in ipairs(ids) do
 684                  redis.call('lPush',kUnclaimed,id)
 685                  redis.call('zRem',kDelayed,id)
 686              end
 687              undelayed = #ids
 688              return {released,abandoned,pruned,undelayed}
 689  LUA;
 690              $res = $conn->luaEval( $script,
 691                  array(
 692                      $this->getQueueKey( 'z-claimed' ), # KEYS[1]
 693                      $this->getQueueKey( 'h-attempts' ), # KEYS[2]
 694                      $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
 695                      $this->getQueueKey( 'h-data' ), # KEYS[4]
 696                      $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
 697                      $this->getQueueKey( 'z-delayed' ), # KEYS[6]
 698                      $now - $this->claimTTL, # ARGV[1]
 699                      $now - self::MAX_AGE_PRUNE, # ARGV[2]
 700                      $this->maxTries, # ARGV[3]
 701                      $now # ARGV[4]
 702                  ),
 703                  6 # number of first argument(s) that are keys
 704              );
 705              if ( $res ) {
 706                  list( $released, $abandoned, $pruned, $undelayed ) = $res;
 707                  $count += $released + $pruned + $undelayed;
 708                  JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki );
 709                  JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki );
 710                  JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki );
 711              }
 712          } catch ( RedisException $e ) {
 713              $this->throwRedisException( $conn, $e );
 714          }
 715  
 716          return $count;
 717      }
 718  
 719      /**
 720       * @return array
 721       */
 722  	protected function doGetPeriodicTasks() {
 723          if ( $this->daemonized ) {
 724              return array(); // managed in the runner loop
 725          }
 726          $periods = array( 3600 ); // standard cleanup (useful on config change)
 727          if ( $this->claimTTL > 0 ) {
 728              $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
 729          }
 730          if ( $this->checkDelay ) {
 731              $periods[] = 300; // 5 minutes
 732          }
 733          $period = min( $periods );
 734          $period = max( $period, 30 ); // sanity
 735  
 736          return array(
 737              'recyclePruneAndUndelayJobs' => array(
 738                  'callback' => array( $this, 'recyclePruneAndUndelayJobs' ),
 739                  'period'   => $period,
 740              )
 741          );
 742      }
 743  
 744      /**
 745       * @param IJobSpecification $job
 746       * @return array
 747       */
 748  	protected function getNewJobFields( IJobSpecification $job ) {
 749          return array(
 750              // Fields that describe the nature of the job
 751              'type' => $job->getType(),
 752              'namespace' => $job->getTitle()->getNamespace(),
 753              'title' => $job->getTitle()->getDBkey(),
 754              'params' => $job->getParams(),
 755              // Some jobs cannot run until a "release timestamp"
 756              'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
 757              // Additional job metadata
 758              'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
 759              'sha1' => $job->ignoreDuplicates()
 760                  ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
 761                  : '',
 762              'timestamp' => time() // UNIX timestamp
 763          );
 764      }
 765  
 766      /**
 767       * @param array $fields
 768       * @return Job|bool
 769       */
 770  	protected function getJobFromFields( array $fields ) {
 771          $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
 772          if ( $title ) {
 773              $job = Job::factory( $fields['type'], $title, $fields['params'] );
 774              $job->metadata['uuid'] = $fields['uuid'];
 775  
 776              return $job;
 777          }
 778  
 779          return false;
 780      }
 781  
 782      /**
 783       * @param array $fields
 784       * @return string Serialized and possibly compressed version of $fields
 785       */
 786  	protected function serialize( array $fields ) {
 787          $blob = serialize( $fields );
 788          if ( $this->compression === 'gzip'
 789              && strlen( $blob ) >= 1024
 790              && function_exists( 'gzdeflate' )
 791          ) {
 792              $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' );
 793              $blobz = serialize( $object );
 794  
 795              return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
 796          } else {
 797              return $blob;
 798          }
 799      }
 800  
 801      /**
 802       * @param string $blob
 803       * @return array|bool Unserialized version of $blob or false
 804       */
 805  	protected function unserialize( $blob ) {
 806          $fields = unserialize( $blob );
 807          if ( is_object( $fields ) ) {
 808              if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) {
 809                  $fields = unserialize( gzinflate( $fields->blob ) );
 810              } else {
 811                  $fields = false;
 812              }
 813          }
 814  
 815          return is_array( $fields ) ? $fields : false;
 816      }
 817  
 818      /**
 819       * Get a connection to the server that handles all sub-queues for this queue
 820       *
 821       * @return RedisConnRef
 822       * @throws JobQueueConnectionError
 823       */
 824  	protected function getConnection() {
 825          $conn = $this->redisPool->getConnection( $this->server );
 826          if ( !$conn ) {
 827              throw new JobQueueConnectionError( "Unable to connect to redis server." );
 828          }
 829  
 830          return $conn;
 831      }
 832  
 833      /**
 834       * @param RedisConnRef $conn
 835       * @param RedisException $e
 836       * @throws JobQueueError
 837       */
 838  	protected function throwRedisException( RedisConnRef $conn, $e ) {
 839          $this->redisPool->handleError( $conn, $e );
 840          throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
 841      }
 842  
 843      /**
 844       * @param string $prop
 845       * @param string|null $type
 846       * @return string
 847       */
 848  	private function getQueueKey( $prop, $type = null ) {
 849          $type = is_string( $type ) ? $type : $this->type;
 850          list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
 851          if ( strlen( $this->key ) ) { // namespaced queue (for testing)
 852              return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
 853          } else {
 854              return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
 855          }
 856      }
 857  
 858      /**
 859       * @param string $key
 860       * @return void
 861       */
 862  	public function setTestingPrefix( $key ) {
 863          $this->key = $key;
 864      }
 865  }


Generated: Fri Nov 28 14:03:12 2014 Cross-referenced by PHPXref 0.7.1