[ Index ]

PHP Cross Reference of Phabricator

title

Body

[close]

/src/infrastructure/daemon/workers/storage/ -> PhabricatorWorkerActiveTask.php (source)

   1  <?php
   2  
   3  final class PhabricatorWorkerActiveTask extends PhabricatorWorkerTask {
   4  
   5    protected $failureTime;
   6  
   7    private $serverTime;
   8    private $localTime;
   9  
  10    public function getConfiguration() {
  11      $parent = parent::getConfiguration();
  12  
  13      $config = array(
  14        self::CONFIG_IDS => self::IDS_COUNTER,
  15        self::CONFIG_TIMESTAMPS => false,
  16        self::CONFIG_KEY_SCHEMA => array(
  17          'dataID' => array(
  18            'columns' => array('dataID'),
  19            'unique' => true,
  20          ),
  21          'taskClass' => array(
  22            'columns' => array('taskClass'),
  23          ),
  24          'leaseExpires' => array(
  25            'columns' => array('leaseExpires'),
  26          ),
  27          'leaseOwner' => array(
  28            'columns' => array('leaseOwner(16)'),
  29          ),
  30          'key_failuretime' => array(
  31            'columns' => array('failureTime'),
  32          ),
  33          'leaseOwner_2' => array(
  34            'columns' => array('leaseOwner', 'priority', 'id'),
  35          ),
  36        ),
  37      );
  38  
  39      $config[self::CONFIG_COLUMN_SCHEMA] = array(
  40        // T6203/NULLABILITY
  41        // This isn't nullable in the archive table, so at a minimum these
  42        // should be the same.
  43        'dataID' => 'uint32?',
  44      ) + $parent[self::CONFIG_COLUMN_SCHEMA];
  45  
  46      return $config + $parent;
  47    }
  48  
  49    public function setServerTime($server_time) {
  50      $this->serverTime = $server_time;
  51      $this->localTime = time();
  52      return $this;
  53    }
  54  
  55    public function setLeaseDuration($lease_duration) {
  56      $this->checkLease();
  57      $server_lease_expires = $this->serverTime + $lease_duration;
  58      $this->setLeaseExpires($server_lease_expires);
  59  
  60      // NOTE: This is primarily to allow unit tests to set negative lease
  61      // durations so they don't have to wait around for leases to expire. We
  62      // check that the lease is valid above.
  63      return $this->forceSaveWithoutLease();
  64    }
  65  
  66    public function save() {
  67      $this->checkLease();
  68      return $this->forceSaveWithoutLease();
  69    }
  70  
  71    public function forceSaveWithoutLease() {
  72      $is_new = !$this->getID();
  73      if ($is_new) {
  74        $this->failureCount = 0;
  75      }
  76  
  77      if ($is_new && ($this->getData() !== null)) {
  78        $data = new PhabricatorWorkerTaskData();
  79        $data->setData($this->getData());
  80        $data->save();
  81  
  82        $this->setDataID($data->getID());
  83      }
  84  
  85      return parent::save();
  86    }
  87  
  88    protected function checkLease() {
  89      if ($this->leaseOwner) {
  90        $current_server_time = $this->serverTime + (time() - $this->localTime);
  91        if ($current_server_time >= $this->leaseExpires) {
  92          $id = $this->getID();
  93          $class = $this->getTaskClass();
  94          throw new Exception(
  95            "Trying to update Task {$id} ({$class}) after lease expiration!");
  96        }
  97      }
  98    }
  99  
 100    public function delete() {
 101      throw new Exception(
 102        'Active tasks can not be deleted directly. '.
 103        'Use archiveTask() to move tasks to the archive.');
 104    }
 105  
 106    public function archiveTask($result, $duration) {
 107      if ($this->getID() === null) {
 108        throw new Exception(
 109          "Attempting to archive a task which hasn't been save()d!");
 110      }
 111  
 112      $this->checkLease();
 113  
 114      $archive = id(new PhabricatorWorkerArchiveTask())
 115        ->setID($this->getID())
 116        ->setTaskClass($this->getTaskClass())
 117        ->setLeaseOwner($this->getLeaseOwner())
 118        ->setLeaseExpires($this->getLeaseExpires())
 119        ->setFailureCount($this->getFailureCount())
 120        ->setDataID($this->getDataID())
 121        ->setPriority($this->getPriority())
 122        ->setResult($result)
 123        ->setDuration($duration);
 124  
 125      // NOTE: This deletes the active task (this object)!
 126      $archive->save();
 127  
 128      return $archive;
 129    }
 130  
 131    public function executeTask() {
 132      // We do this outside of the try .. catch because we don't have permission
 133      // to release the lease otherwise.
 134      $this->checkLease();
 135  
 136      $did_succeed = false;
 137      try {
 138        $worker = $this->getWorkerInstance();
 139  
 140        $maximum_failures = $worker->getMaximumRetryCount();
 141        if ($maximum_failures !== null) {
 142          if ($this->getFailureCount() > $maximum_failures) {
 143            $id = $this->getID();
 144            throw new PhabricatorWorkerPermanentFailureException(
 145              "Task {$id} has exceeded the maximum number of failures ".
 146              "({$maximum_failures}).");
 147          }
 148        }
 149  
 150        $lease = $worker->getRequiredLeaseTime();
 151        if ($lease !== null) {
 152          $this->setLeaseDuration($lease);
 153        }
 154  
 155        $t_start = microtime(true);
 156          $worker->executeTask();
 157        $t_end = microtime(true);
 158        $duration = (int)(1000000 * ($t_end - $t_start));
 159  
 160        $result = $this->archiveTask(
 161          PhabricatorWorkerArchiveTask::RESULT_SUCCESS,
 162          $duration);
 163        $did_succeed = true;
 164      } catch (PhabricatorWorkerPermanentFailureException $ex) {
 165        $result = $this->archiveTask(
 166          PhabricatorWorkerArchiveTask::RESULT_FAILURE,
 167          0);
 168        $result->setExecutionException($ex);
 169      } catch (PhabricatorWorkerYieldException $ex) {
 170        $this->setExecutionException($ex);
 171  
 172        $retry = $ex->getDuration();
 173        $retry = max($retry, 5);
 174  
 175        // NOTE: As a side effect, this saves the object.
 176        $this->setLeaseDuration($retry);
 177  
 178        $result = $this;
 179      } catch (Exception $ex) {
 180        $this->setExecutionException($ex);
 181        $this->setFailureCount($this->getFailureCount() + 1);
 182        $this->setFailureTime(time());
 183  
 184        $retry = $worker->getWaitBeforeRetry($this);
 185        $retry = coalesce(
 186          $retry,
 187          PhabricatorWorkerLeaseQuery::getDefaultWaitBeforeRetry());
 188  
 189        // NOTE: As a side effect, this saves the object.
 190        $this->setLeaseDuration($retry);
 191  
 192        $result = $this;
 193      }
 194  
 195      // NOTE: If this throws, we don't want it to cause the task to fail again,
 196      // so execute it out here and just let the exception escape.
 197      if ($did_succeed) {
 198        foreach ($worker->getQueuedTasks() as $task) {
 199          list($class, $data) = $task;
 200          PhabricatorWorker::scheduleTask($class, $data, $this->getPriority());
 201        }
 202      }
 203  
 204      return $result;
 205    }
 206  
 207  }


Generated: Sun Nov 30 09:20:46 2014 Cross-referenced by PHPXref 0.7.1