[ Index ]

PHP Cross Reference of Phabricator

title

Body

[close]

/src/infrastructure/daemon/workers/ -> PhabricatorWorker.php (source)

   1  <?php
   2  
   3  /**
   4   * @task config   Configuring Retries and Failures
   5   */
   6  abstract class PhabricatorWorker {
   7  
   8    private $data;
   9    private static $runAllTasksInProcess = false;
  10    private $queuedTasks = array();
  11  
  12    // NOTE: Lower priority numbers execute first. The priority numbers have to
  13    // have the same ordering that IDs do (lowest first) so MySQL can use a
  14    // multipart key across both of them efficiently.
  15  
  16    const PRIORITY_ALERTS  = 1000;
  17    const PRIORITY_DEFAULT = 2000;
  18    const PRIORITY_BULK    = 3000;
  19    const PRIORITY_IMPORT  = 4000;
  20  
  21  
  22  /* -(  Configuring Retries and Failures  )----------------------------------- */
  23  
  24  
  25    /**
  26     * Return the number of seconds this worker needs hold a lease on the task for
  27     * while it performs work. For most tasks you can leave this at `null`, which
  28     * will give you a default lease (currently 2 hours).
  29     *
  30     * For tasks which may take a very long time to complete, you should return
  31     * an upper bound on the amount of time the task may require.
  32     *
  33     * @return int|null  Number of seconds this task needs to remain leased for,
  34     *                   or null for a default lease.
  35     *
  36     * @task config
  37     */
  38    public function getRequiredLeaseTime() {
  39      return null;
  40    }
  41  
  42  
  43    /**
  44     * Return the maximum number of times this task may be retried before it is
  45     * considered permanently failed. By default, tasks retry indefinitely. You
  46     * can throw a @{class:PhabricatorWorkerPermanentFailureException} to cause an
  47     * immediate permanent failure.
  48     *
  49     * @return int|null  Number of times the task will retry before permanent
  50     *                   failure. Return `null` to retry indefinitely.
  51     *
  52     * @task config
  53     */
  54    public function getMaximumRetryCount() {
  55      return null;
  56    }
  57  
  58  
  59    /**
  60     * Return the number of seconds a task should wait after a failure before
  61     * retrying. For most tasks you can leave this at `null`, which will give you
  62     * a short default retry period (currently 60 seconds).
  63     *
  64     * @param  PhabricatorWorkerTask  The task itself. This object is probably
  65     *                                useful mostly to examine the failure count
  66     *                                if you want to implement staggered retries,
  67     *                                or to examine the execution exception if
  68     *                                you want to react to different failures in
  69     *                                different ways.
  70     * @return int|null               Number of seconds to wait between retries,
  71     *                                or null for a default retry period
  72     *                                (currently 60 seconds).
  73     *
  74     * @task config
  75     */
  76    public function getWaitBeforeRetry(PhabricatorWorkerTask $task) {
  77      return null;
  78    }
  79  
  80    abstract protected function doWork();
  81  
  82    final public function __construct($data) {
  83      $this->data = $data;
  84    }
  85  
  86    final protected function getTaskData() {
  87      return $this->data;
  88    }
  89  
  90    final public function executeTask() {
  91      $this->doWork();
  92    }
  93  
  94    final public static function scheduleTask(
  95      $task_class,
  96      $data,
  97      $priority = null) {
  98  
  99      if ($priority === null) {
 100        $priority = self::PRIORITY_DEFAULT;
 101      }
 102  
 103      $task = id(new PhabricatorWorkerActiveTask())
 104        ->setTaskClass($task_class)
 105        ->setData($data)
 106        ->setPriority($priority);
 107  
 108      if (self::$runAllTasksInProcess) {
 109        // Do the work in-process.
 110        $worker = newv($task_class, array($data));
 111  
 112        while (true) {
 113          try {
 114            $worker->doWork();
 115            foreach ($worker->getQueuedTasks() as $queued_task) {
 116              list($queued_class, $queued_data, $queued_priority) = $queued_task;
 117              self::scheduleTask($queued_class, $queued_data, $queued_priority);
 118            }
 119            break;
 120          } catch (PhabricatorWorkerYieldException $ex) {
 121            phlog(
 122              pht(
 123                'In-process task "%s" yielded for %s seconds, sleeping...',
 124                $task_class,
 125                $ex->getDuration()));
 126            sleep($ex->getDuration());
 127          }
 128        }
 129  
 130        // Now, save a task row and immediately archive it so we can return an
 131        // object with a valid ID.
 132        $task->openTransaction();
 133          $task->save();
 134          $archived = $task->archiveTask(
 135            PhabricatorWorkerArchiveTask::RESULT_SUCCESS,
 136            0);
 137        $task->saveTransaction();
 138  
 139        return $archived;
 140      } else {
 141        $task->save();
 142        return $task;
 143      }
 144    }
 145  
 146  
 147    /**
 148     * Wait for tasks to complete. If tasks are not leased by other workers, they
 149     * will be executed in this process while waiting.
 150     *
 151     * @param list<int>   List of queued task IDs to wait for.
 152     * @return void
 153     */
 154    final public static function waitForTasks(array $task_ids) {
 155      if (!$task_ids) {
 156        return;
 157      }
 158  
 159      $task_table = new PhabricatorWorkerActiveTask();
 160  
 161      $waiting = array_fuse($task_ids);
 162      while ($waiting) {
 163        $conn_w = $task_table->establishConnection('w');
 164  
 165        // Check if any of the tasks we're waiting on are still queued. If they
 166        // are not, we're done waiting.
 167        $row = queryfx_one(
 168          $conn_w,
 169          'SELECT COUNT(*) N FROM %T WHERE id IN (%Ld)',
 170          $task_table->getTableName(),
 171          $waiting);
 172        if (!$row['N']) {
 173          // Nothing is queued anymore. Stop waiting.
 174          break;
 175        }
 176  
 177        $tasks = id(new PhabricatorWorkerLeaseQuery())
 178          ->withIDs($waiting)
 179          ->setLimit(1)
 180          ->execute();
 181  
 182        if (!$tasks) {
 183          // We were not successful in leasing anything. Sleep for a bit and
 184          // see if we have better luck later.
 185          sleep(1);
 186          continue;
 187        }
 188  
 189        $task = head($tasks)->executeTask();
 190  
 191        $ex = $task->getExecutionException();
 192        if ($ex) {
 193          throw $ex;
 194        }
 195      }
 196  
 197      $tasks = id(new PhabricatorWorkerArchiveTask())->loadAllWhere(
 198        'id IN (%Ld)',
 199        $task_ids);
 200  
 201      foreach ($tasks as $task) {
 202        if ($task->getResult() != PhabricatorWorkerArchiveTask::RESULT_SUCCESS) {
 203          throw new Exception(pht('Task %d failed!', $task->getID()));
 204        }
 205      }
 206    }
 207  
 208    public function renderForDisplay(PhabricatorUser $viewer) {
 209      $data = PhutilReadableSerializer::printableValue($this->data);
 210      return phutil_tag('pre', array(), $data);
 211    }
 212  
 213    /**
 214     * Set this flag to execute scheduled tasks synchronously, in the same
 215     * process. This is useful for debugging, and otherwise dramatically worse
 216     * in every way imaginable.
 217     */
 218    public static function setRunAllTasksInProcess($all) {
 219      self::$runAllTasksInProcess = $all;
 220    }
 221  
 222    final protected function log($pattern /* , ... */) {
 223      $console = PhutilConsole::getConsole();
 224      $argv = func_get_args();
 225      call_user_func_array(array($console, 'writeLog'), $argv);
 226      return $this;
 227    }
 228  
 229  
 230    /**
 231     * Queue a task to be executed after this one succeeds.
 232     *
 233     * The followup task will be queued only if this task completes cleanly.
 234     *
 235     * @param string    Task class to queue.
 236     * @param array     Data for the followup task.
 237     * @param int|null  Priority for the followup task.
 238     * @return this
 239     */
 240    final protected function queueTask($class, array $data, $priority = null) {
 241      $this->queuedTasks[] = array($class, $data, $priority);
 242      return $this;
 243    }
 244  
 245  
 246    /**
 247     * Get tasks queued as followups by @{method:queueTask}.
 248     *
 249     * @return list<tuple<string, wild, int|null>> Queued task specifications.
 250     */
 251    final public function getQueuedTasks() {
 252      return $this->queuedTasks;
 253    }
 254  
 255  }


Generated: Sun Nov 30 09:20:46 2014 Cross-referenced by PHPXref 0.7.1