[ Index ] |
PHP Cross Reference of Phabricator |
[Summary view] [Print] [Text view]
<?php

/**
 * Abstract base for task queue workers. Subclasses implement
 * @{method:doWork}; callers schedule work with @{method:scheduleTask}.
 *
 * @task config Configuring Retries and Failures
 */
abstract class PhabricatorWorker {

  private $data;
  private static $runAllTasksInProcess = false;
  private $queuedTasks = array();

  // NOTE: Tasks with lower priority numbers run first. Priorities must sort
  // in the same direction as IDs (lowest first) so that MySQL can make
  // efficient use of a multipart key spanning both columns.

  const PRIORITY_ALERTS   = 1000;
  const PRIORITY_DEFAULT  = 2000;
  const PRIORITY_BULK     = 3000;
  const PRIORITY_IMPORT   = 4000;


/* -(  Configuring Retries and Failures  )----------------------------------- */


  /**
   * Report how long, in seconds, this worker needs to hold a lease on its
   * task while working. Returning `null` (the default) selects the default
   * lease, which is currently 2 hours.
   *
   * Workers which may run for a very long time should return an upper bound
   * on how long the task could take.
   *
   * @return int|null Lease duration in seconds, or null to use the default
   *                  lease.
   *
   * @task config
   */
  public function getRequiredLeaseTime() {
    return null;
  }


  /**
   * Report how many times this task may be retried before it is treated as
   * permanently failed. The default is to retry forever. To fail permanently
   * right away, throw a @{class:PhabricatorWorkerPermanentFailureException}.
   *
   * @return int|null Retry limit before permanent failure, or `null` to
   *                  retry indefinitely.
   *
   * @task config
   */
  public function getMaximumRetryCount() {
    return null;
  }


  /**
   * Report how long, in seconds, to wait after a failure before the task is
   * retried. Returning `null` (the default) selects a short default retry
   * period, which is currently 60 seconds.
   *
   * @param   PhabricatorWorkerTask The failed task. Mostly useful for reading
   *                                the failure count (to stagger retries) or
   *                                the execution exception (to respond
   *                                differently to different failures).
   * @return  int|null              Seconds to wait before retrying, or null
   *                                for the default retry period (currently
   *                                60 seconds).
   *
   * @task config
   */
  public function getWaitBeforeRetry(PhabricatorWorkerTask $task) {
    return null;
  }

  /**
   * Perform the actual work for this task. Implemented by subclasses.
   */
  abstract protected function doWork();

  /**
   * @param wild Task data, later available via @{method:getTaskData}.
   */
  final public function __construct($data) {
    $this->data = $data;
  }

  /**
   * @return wild The data this worker was constructed with.
   */
  final protected function getTaskData() {
    return $this->data;
  }

  /**
   * Run this worker by invoking @{method:doWork}.
   *
   * @return void
   */
  final public function executeTask() {
    $this->doWork();
  }

  /**
   * Schedule a task for execution.
   *
   * Normally this saves a new active task row and returns it. When the
   * in-process flag has been set with @{method:setRunAllTasksInProcess}, the
   * worker is run immediately in this process instead (sleeping and retrying
   * whenever it yields), its followup tasks are scheduled, and a task row is
   * saved and archived as a success.
   *
   * @param   string    Worker class name.
   * @param   wild      Data to hand to the worker.
   * @param   int|null  Task priority, or null for `PRIORITY_DEFAULT`.
   * @return  wild      The saved active task, or the archived task when
   *                    running in-process.
   */
  final public static function scheduleTask(
    $task_class,
    $data,
    $priority = null) {

    $priority = ($priority === null)
      ? self::PRIORITY_DEFAULT
      : $priority;

    $task = id(new PhabricatorWorkerActiveTask())
      ->setTaskClass($task_class)
      ->setData($data)
      ->setPriority($priority);

    if (!self::$runAllTasksInProcess) {
      // Normal path: just put the task in the queue.
      $task->save();
      return $task;
    }

    // In-process path: do the work right here, right now.
    $worker = newv($task_class, array($data));

    $is_done = false;
    while (!$is_done) {
      try {
        $worker->doWork();

        // The work finished, so schedule whatever followups it queued.
        foreach ($worker->getQueuedTasks() as $followup) {
          self::scheduleTask($followup[0], $followup[1], $followup[2]);
        }

        $is_done = true;
      } catch (PhabricatorWorkerYieldException $ex) {
        phlog(
          pht(
            'In-process task "%s" yielded for %s seconds, sleeping...',
            $task_class,
            $ex->getDuration()));
        sleep($ex->getDuration());
      }
    }

    // Write a task row and archive it straight away, so the caller gets back
    // an object which has a valid ID.
    $task->openTransaction();
      $task->save();
      $archived = $task->archiveTask(
        PhabricatorWorkerArchiveTask::RESULT_SUCCESS,
        0);
    $task->saveTransaction();

    return $archived;
  }


  /**
   * Block until the given tasks are no longer queued. Tasks which this
   * process manages to lease are executed here while waiting.
   *
   * Throws the execution exception of any task run here which fails, and
   * throws if any of the tasks is found archived with a result other than
   * success.
   *
   * @param   list<int> IDs of the queued tasks to wait for.
   * @return  void
   */
  final public static function waitForTasks(array $task_ids) {
    if (!$task_ids) {
      return;
    }

    $active_table = new PhabricatorWorkerActiveTask();

    $waiting = array_fuse($task_ids);
    while ($waiting) {
      $conn_w = $active_table->establishConnection('w');

      // Count how many of the tasks are still in the active queue. Once the
      // count reaches zero there is nothing left to wait for.
      $count_row = queryfx_one(
        $conn_w,
        'SELECT COUNT(*) N FROM %T WHERE id IN (%Ld)',
        $active_table->getTableName(),
        $waiting);
      if (!$count_row['N']) {
        break;
      }

      $leased = id(new PhabricatorWorkerLeaseQuery())
        ->withIDs($waiting)
        ->setLimit(1)
        ->execute();

      if ($leased) {
        $executed = head($leased)->executeTask();

        $failure = $executed->getExecutionException();
        if ($failure) {
          throw $failure;
        }
      } else {
        // Nothing could be leased this time around. Pause briefly, then
        // try again.
        sleep(1);
      }
    }

    $archived_tasks = id(new PhabricatorWorkerArchiveTask())->loadAllWhere(
      'id IN (%Ld)',
      $task_ids);

    foreach ($archived_tasks as $archived_task) {
      $result = $archived_task->getResult();
      if ($result == PhabricatorWorkerArchiveTask::RESULT_SUCCESS) {
        continue;
      }
      throw new Exception(pht('Task %d failed!', $archived_task->getID()));
    }
  }

  /**
   * Render this worker's task data for display, as printable text wrapped
   * in a `pre` tag.
   *
   * @param   PhabricatorUser Viewing user.
   * @return  wild            Result of `phutil_tag()`.
   */
  public function renderForDisplay(PhabricatorUser $viewer) {
    return phutil_tag(
      'pre',
      array(),
      PhutilReadableSerializer::printableValue($this->data));
  }

  /**
   * Toggle synchronous execution: when set, scheduled tasks run immediately
   * in the scheduling process. Handy for debugging, and otherwise worse in
   * every way imaginable.
   *
   * @param bool True to run scheduled tasks in-process.
   */
  public static function setRunAllTasksInProcess($all) {
    self::$runAllTasksInProcess = $all;
  }

  /**
   * Write a message to the console log. All arguments are forwarded
   * unchanged to the console's `writeLog()` method.
   *
   * @param   string  Log pattern, followed by any pattern arguments.
   * @return  this
   */
  final protected function log($pattern /* , ... */) {
    $arguments = func_get_args();
    $callback = array(PhutilConsole::getConsole(), 'writeLog');
    call_user_func_array($callback, $arguments);
    return $this;
  }


  /**
   * Register a followup task to run once this task has succeeded.
   *
   * Followups are only queued if this task completes cleanly.
   *
   * @param   string    Worker class for the followup task.
   * @param   array     Data for the followup task.
   * @param   int|null  Priority for the followup task.
   * @return  this
   */
  final protected function queueTask($class, array $data, $priority = null) {
    $followup = array($class, $data, $priority);
    $this->queuedTasks[] = $followup;
    return $this;
  }


  /**
   * Retrieve the followup tasks registered with @{method:queueTask}.
   *
   * @return list<tuple<string, wild, int|null>> Followup task specifications.
   */
  final public function getQueuedTasks() {
    return $this->queuedTasks;
  }

}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Sun Nov 30 09:20:46 2014 | Cross-referenced by PHPXref 0.7.1 |