[ Index ]

PHP Cross Reference of Phabricator

title

Body

[close]

/src/infrastructure/daemon/workers/query/ -> PhabricatorWorkerLeaseQuery.php (source)

   1  <?php
   2  
   3  /**
   4   * Select and lease tasks from the worker task queue.
   5   */
   6  final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {
   7  
   8    const PHASE_UNLEASED = 'unleased';
   9    const PHASE_EXPIRED  = 'expired';
  10  
  11    private $ids;
  12    private $limit;
  13    private $skipLease;
  14  
  15    public static function getDefaultWaitBeforeRetry() {
  16      return phutil_units('5 minutes in seconds');
  17    }
  18  
  19    public static function getDefaultLeaseDuration() {
  20      return phutil_units('2 hours in seconds');
  21    }
  22  
  23    /**
  24     * Set this flag to select tasks from the top of the queue without leasing
  25     * them.
  26     *
  27     * This can be used to show which tasks are coming up next without altering
  28     * the queue's behavior.
  29     *
  30     * @param bool True to skip the lease acquisition step.
  31     */
  32    public function setSkipLease($skip) {
  33      $this->skipLease = $skip;
  34      return $this;
  35    }
  36  
  37    public function withIDs(array $ids) {
  38      $this->ids = $ids;
  39      return $this;
  40    }
  41  
  42    public function setLimit($limit) {
  43      $this->limit = $limit;
  44      return $this;
  45    }
  46  
  47    public function execute() {
  48      if (!$this->limit) {
  49        throw new Exception('You must setLimit() when leasing tasks.');
  50      }
  51  
  52      $task_table = new PhabricatorWorkerActiveTask();
  53      $taskdata_table = new PhabricatorWorkerTaskData();
  54      $lease_ownership_name = $this->getLeaseOwnershipName();
  55  
  56      $conn_w = $task_table->establishConnection('w');
  57  
  58      // Try to satisfy the request from new, unleased tasks first. If we don't
  59      // find enough tasks, try tasks with expired leases (i.e., tasks which have
  60      // previously failed).
  61  
  62      $phases = array(
  63        self::PHASE_UNLEASED,
  64        self::PHASE_EXPIRED,
  65      );
  66      $limit = $this->limit;
  67  
  68      $leased = 0;
  69      $task_ids = array();
  70      foreach ($phases as $phase) {
  71        // NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query
  72        // goes very, very slowly. The `ORDER BY` triggers this, although we get
  73        // the same apparent results without it. Without the ORDER BY, binary
  74        // read slaves complain that the query isn't repeatable. To avoid both
  75        // problems, do a SELECT and then an UPDATE.
  76  
  77        $rows = queryfx_all(
  78          $conn_w,
  79          'SELECT id, leaseOwner FROM %T %Q %Q %Q',
  80          $task_table->getTableName(),
  81          $this->buildWhereClause($conn_w, $phase),
  82          $this->buildOrderClause($conn_w, $phase),
  83          $this->buildLimitClause($conn_w, $limit - $leased));
  84  
  85        // NOTE: Sometimes, we'll race with another worker and they'll grab
  86        // this task before we do. We could reduce how often this happens by
  87        // selecting more tasks than we need, then shuffling them and trying
  88        // to lock only the number we're actually after. However, the amount
  89        // of time workers spend here should be very small relative to their
  90        // total runtime, so keep it simple for the moment.
  91  
  92        if ($rows) {
  93          if ($this->skipLease) {
  94            $leased += count($rows);
  95            $task_ids += array_fuse(ipull($rows, 'id'));
  96          } else {
  97            queryfx(
  98              $conn_w,
  99              'UPDATE %T task
 100                SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d
 101                %Q',
 102              $task_table->getTableName(),
 103              $lease_ownership_name,
 104              self::getDefaultLeaseDuration(),
 105              $this->buildUpdateWhereClause($conn_w, $phase, $rows));
 106  
 107            $leased += $conn_w->getAffectedRows();
 108          }
 109  
 110          if ($leased == $limit) {
 111            break;
 112          }
 113        }
 114      }
 115  
 116      if (!$leased) {
 117        return array();
 118      }
 119  
 120      if ($this->skipLease) {
 121        $selection_condition = qsprintf(
 122          $conn_w,
 123          'task.id IN (%Ld)',
 124          $task_ids);
 125      } else {
 126        $selection_condition = qsprintf(
 127          $conn_w,
 128          'task.leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP()',
 129          $lease_ownership_name);
 130      }
 131  
 132      $data = queryfx_all(
 133        $conn_w,
 134        'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime
 135          FROM %T task LEFT JOIN %T taskdata
 136            ON taskdata.id = task.dataID
 137          WHERE %Q %Q %Q',
 138        $task_table->getTableName(),
 139        $taskdata_table->getTableName(),
 140        $selection_condition,
 141        $this->buildOrderClause($conn_w, $phase),
 142        $this->buildLimitClause($conn_w, $limit));
 143  
 144      $tasks = $task_table->loadAllFromArray($data);
 145      $tasks = mpull($tasks, null, 'getID');
 146  
 147      foreach ($data as $row) {
 148        $tasks[$row['id']]->setServerTime($row['_serverTime']);
 149        if ($row['_taskData']) {
 150          $task_data = json_decode($row['_taskData'], true);
 151        } else {
 152          $task_data = null;
 153        }
 154        $tasks[$row['id']]->setData($task_data);
 155      }
 156  
 157      return $tasks;
 158    }
 159  
 160    private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) {
 161      $where = array();
 162  
 163      switch ($phase) {
 164        case self::PHASE_UNLEASED:
 165          $where[] = 'leaseOwner IS NULL';
 166          break;
 167        case self::PHASE_EXPIRED:
 168          $where[] = 'leaseExpires < UNIX_TIMESTAMP()';
 169          break;
 170        default:
 171          throw new Exception("Unknown phase '{$phase}'!");
 172      }
 173  
 174      if ($this->ids) {
 175        $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids);
 176      }
 177  
 178      return $this->formatWhereClause($where);
 179    }
 180  
 181    private function buildUpdateWhereClause(
 182      AphrontDatabaseConnection $conn_w,
 183      $phase,
 184      array $rows) {
 185  
 186      $where = array();
 187  
 188      // NOTE: This is basically working around the MySQL behavior that
 189      // `IN (NULL)` doesn't match NULL.
 190  
 191      switch ($phase) {
 192        case self::PHASE_UNLEASED:
 193          $where[] = qsprintf($conn_w, 'leaseOwner IS NULL');
 194          $where[] = qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id'));
 195          break;
 196        case self::PHASE_EXPIRED:
 197          $in = array();
 198          foreach ($rows as $row) {
 199            $in[] = qsprintf(
 200              $conn_w,
 201              '(id = %d AND leaseOwner = %s)',
 202              $row['id'],
 203              $row['leaseOwner']);
 204          }
 205          $where[] = qsprintf($conn_w, '(%Q)', implode(' OR ', $in));
 206          break;
 207        default:
 208          throw new Exception("Unknown phase '{$phase}'!");
 209      }
 210  
 211      return $this->formatWhereClause($where);
 212    }
 213  
 214    private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) {
 215      switch ($phase) {
 216        case self::PHASE_UNLEASED:
 217          // When selecting new tasks, we want to consume them in order of
 218          // increasing priority (and then FIFO).
 219          return qsprintf($conn_w, 'ORDER BY priority ASC, id ASC');
 220        case self::PHASE_EXPIRED:
 221          // When selecting failed tasks, we want to consume them in roughly
 222          // FIFO order of their failures, which is not necessarily their original
 223          // queue order.
 224  
 225          // Particularly, this is important for tasks which use soft failures to
 226          // indicate that they are waiting on other tasks to complete: we need to
 227          // push them to the end of the queue after they fail, at least on
 228          // average, so we don't deadlock retrying the same blocked task over
 229          // and over again.
 230          return qsprintf($conn_w, 'ORDER BY leaseExpires ASC');
 231        default:
 232          throw new Exception(pht('Unknown phase "%s"!', $phase));
 233      }
 234    }
 235  
 236    private function buildLimitClause(AphrontDatabaseConnection $conn_w, $limit) {
 237      return qsprintf($conn_w, 'LIMIT %d', $limit);
 238    }
 239  
 240    private function getLeaseOwnershipName() {
 241      static $sequence = 0;
 242  
 243      $parts = array(
 244        getmypid(),
 245        time(),
 246        php_uname('n'),
 247        ++$sequence,
 248      );
 249  
 250      return implode(':', $parts);
 251    }
 252  
 253  }


Generated: Sun Nov 30 09:20:46 2014 Cross-referenced by PHPXref 0.7.1