[ Index ] |
PHP Cross Reference of Phabricator |
[Summary view] [Print] [Text view]
<?php

/**
 * Select and lease tasks from the worker task queue.
 *
 * Tasks are claimed in two phases: tasks which have never been leased are
 * tried first, then tasks whose lease has expired. A limit must be set with
 * @{method:setLimit} before calling @{method:execute}.
 */
final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery {

  const PHASE_UNLEASED = 'unleased';
  const PHASE_EXPIRED = 'expired';

  private $ids;
  private $limit;
  private $skipLease;

  /**
   * Get the default delay before a task is retried.
   *
   * @return wild Result of `phutil_units('5 minutes in seconds')`.
   */
  public static function getDefaultWaitBeforeRetry() {
    return phutil_units('5 minutes in seconds');
  }

  /**
   * Get the default lease duration. This value is added to the database
   * server's `UNIX_TIMESTAMP()` when a task is leased by @{method:execute}.
   *
   * @return wild Result of `phutil_units('2 hours in seconds')`.
   */
  public static function getDefaultLeaseDuration() {
    return phutil_units('2 hours in seconds');
  }

  /**
   * Set this flag to select tasks from the top of the queue without leasing
   * them.
   *
   * This can be used to show which tasks are coming up next without altering
   * the queue's behavior.
   *
   * @param bool True to skip the lease acquisition step.
   * @return this
   */
  public function setSkipLease($skip) {
    $this->skipLease = $skip;
    return $this;
  }

  /**
   * Restrict the query to tasks with the given IDs.
   *
   * @param list<int> Task IDs to consider.
   * @return this
   */
  public function withIDs(array $ids) {
    $this->ids = $ids;
    return $this;
  }

  /**
   * Set the maximum number of tasks to select. This is required; calling
   * @{method:execute} without a limit throws.
   *
   * @param int Maximum number of tasks.
   * @return this
   */
  public function setLimit($limit) {
    $this->limit = $limit;
    return $this;
  }

  /**
   * Select up to the configured number of tasks, leasing them to this
   * process unless @{method:setSkipLease} was enabled.
   *
   * @return map<int, PhabricatorWorkerActiveTask> Selected tasks, keyed by
   *   task ID, with server time and decoded task data attached. Empty if no
   *   task could be selected.
   */
  public function execute() {
    if (!$this->limit) {
      throw new Exception('You must setLimit() when leasing tasks.');
    }

    $task_table = new PhabricatorWorkerActiveTask();
    $taskdata_table = new PhabricatorWorkerTaskData();
    $owner_name = $this->getLeaseOwnershipName();

    $conn_w = $task_table->establishConnection('w');

    $limit = $this->limit;

    // Fresh, never-leased tasks are preferred. Only if they do not fill the
    // request do we fall back to tasks whose lease has run out (that is,
    // tasks which failed on an earlier attempt).
    $phases = array(
      self::PHASE_UNLEASED,
      self::PHASE_EXPIRED,
    );

    $leased = 0;
    $task_ids = array();

    // The final load below is ordered according to the last phase we
    // entered, so keep track of it explicitly.
    $order_phase = null;

    foreach ($phases as $phase) {
      $order_phase = $phase;

      // NOTE: A single `UPDATE ... WHERE ... ORDER BY id ASC` is extremely
      // slow because of the `ORDER BY`, even though results look the same
      // without it. Dropping the `ORDER BY` makes binary read slaves complain
      // that the query is not repeatable. Issuing a SELECT followed by an
      // UPDATE sidesteps both problems.
      $rows = queryfx_all(
        $conn_w,
        'SELECT id, leaseOwner FROM %T %Q %Q %Q',
        $task_table->getTableName(),
        $this->buildWhereClause($conn_w, $phase),
        $this->buildOrderClause($conn_w, $phase),
        $this->buildLimitClause($conn_w, $limit - $leased));

      if (!$rows) {
        continue;
      }

      // NOTE: Another worker may win the race for a task between our SELECT
      // and our UPDATE. Over-selecting, shuffling and then locking only as
      // many as we need would make that rarer, but workers spend very little
      // of their runtime here, so this stays simple for now.
      if ($this->skipLease) {
        $leased += count($rows);
        $task_ids += array_fuse(ipull($rows, 'id'));
      } else {
        queryfx(
          $conn_w,
          'UPDATE %T task
            SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d
            %Q',
          $task_table->getTableName(),
          $owner_name,
          self::getDefaultLeaseDuration(),
          $this->buildUpdateWhereClause($conn_w, $phase, $rows));

        // Count only the rows we actually managed to claim.
        $leased += $conn_w->getAffectedRows();
      }

      if ($leased == $limit) {
        break;
      }
    }

    if (!$leased) {
      return array();
    }

    // Without leasing, reload exactly the rows we peeked at; otherwise,
    // reload whatever is currently held under our lease owner name.
    if ($this->skipLease) {
      $selection_condition = qsprintf(
        $conn_w,
        'task.id IN (%Ld)',
        $task_ids);
    } else {
      $selection_condition = qsprintf(
        $conn_w,
        'task.leaseOwner = %s AND leaseExpires > UNIX_TIMESTAMP()',
        $owner_name);
    }

    $data = queryfx_all(
      $conn_w,
      'SELECT task.*, taskdata.data _taskData, UNIX_TIMESTAMP() _serverTime
        FROM %T task LEFT JOIN %T taskdata
          ON taskdata.id = task.dataID
        WHERE %Q %Q %Q',
      $task_table->getTableName(),
      $taskdata_table->getTableName(),
      $selection_condition,
      $this->buildOrderClause($conn_w, $order_phase),
      $this->buildLimitClause($conn_w, $limit));

    $tasks = $task_table->loadAllFromArray($data);
    $tasks = mpull($tasks, null, 'getID');

    foreach ($data as $row) {
      $task = $tasks[$row['id']];
      $task->setServerTime($row['_serverTime']);

      // Rows without task data get `null` rather than a decoded value.
      $raw_data = $row['_taskData'];
      $task->setData($raw_data ? json_decode($raw_data, true) : null);
    }

    return $tasks;
  }

  /**
   * Build the WHERE clause used to find candidate tasks for a phase.
   *
   * @param AphrontDatabaseConnection Connection used for escaping.
   * @param const One of the `PHASE_*` constants.
   * @return string Formatted WHERE clause.
   */
  private function buildWhereClause(AphrontDatabaseConnection $conn_w, $phase) {
    switch ($phase) {
      case self::PHASE_UNLEASED:
        $phase_clause = 'leaseOwner IS NULL';
        break;
      case self::PHASE_EXPIRED:
        $phase_clause = 'leaseExpires < UNIX_TIMESTAMP()';
        break;
      default:
        throw new Exception("Unknown phase '{$phase}'!");
    }

    $where = array($phase_clause);

    if ($this->ids) {
      $where[] = qsprintf($conn_w, 'id IN (%Ld)', $this->ids);
    }

    return $this->formatWhereClause($where);
  }

  /**
   * Build the WHERE clause for the UPDATE which claims the selected rows.
   *
   * The clause re-checks the lease owner observed by the SELECT, so rows
   * whose owner changed in the meantime are not matched.
   *
   * @param AphrontDatabaseConnection Connection used for escaping.
   * @param const One of the `PHASE_*` constants.
   * @param list<dict> Rows with `id` and `leaseOwner` keys.
   * @return string Formatted WHERE clause.
   */
  private function buildUpdateWhereClause(
    AphrontDatabaseConnection $conn_w,
    $phase,
    array $rows) {

    // NOTE: The two phases are handled separately because, in MySQL,
    // `IN (NULL)` does not match NULL.

    switch ($phase) {
      case self::PHASE_UNLEASED:
        $where = array(
          qsprintf($conn_w, 'leaseOwner IS NULL'),
          qsprintf($conn_w, 'id IN (%Ld)', ipull($rows, 'id')),
        );
        break;
      case self::PHASE_EXPIRED:
        $pairs = array();
        foreach ($rows as $row) {
          $pairs[] = qsprintf(
            $conn_w,
            '(id = %d AND leaseOwner = %s)',
            $row['id'],
            $row['leaseOwner']);
        }
        $where = array(
          qsprintf($conn_w, '(%Q)', implode(' OR ', $pairs)),
        );
        break;
      default:
        throw new Exception("Unknown phase '{$phase}'!");
    }

    return $this->formatWhereClause($where);
  }

  /**
   * Build the ORDER BY clause for a phase.
   *
   * @param AphrontDatabaseConnection Connection used for escaping.
   * @param const One of the `PHASE_*` constants.
   * @return string ORDER BY clause.
   */
  private function buildOrderClause(AphrontDatabaseConnection $conn_w, $phase) {
    switch ($phase) {
      case self::PHASE_UNLEASED:
        // New tasks are consumed by ascending priority value, and in FIFO
        // order within a priority.
        return qsprintf($conn_w, 'ORDER BY priority ASC, id ASC');
      case self::PHASE_EXPIRED:
        // Failed tasks are consumed in roughly the order in which they
        // failed, which may differ from their original queue order.
        //
        // This matters for tasks which fail softly to signal that they are
        // waiting on other tasks: after failing they need to move to the
        // back of the queue (on average), or we could end up retrying the
        // same blocked task forever.
        return qsprintf($conn_w, 'ORDER BY leaseExpires ASC');
      default:
        throw new Exception(pht('Unknown phase "%s"!', $phase));
    }
  }

  /**
   * Build a LIMIT clause.
   *
   * @param AphrontDatabaseConnection Connection used for escaping.
   * @param int Maximum number of rows.
   * @return string LIMIT clause.
   */
  private function buildLimitClause(AphrontDatabaseConnection $conn_w, $limit) {
    return qsprintf($conn_w, 'LIMIT %d', $limit);
  }

  /**
   * Generate a lease owner name from the process ID, the current time, the
   * host name and a per-process counter which increases on every call.
   *
   * @return string Colon-separated lease owner name.
   */
  private function getLeaseOwnershipName() {
    static $sequence = 0;

    return implode(
      ':',
      array(
        getmypid(),
        time(),
        php_uname('n'),
        ++$sequence,
      ));
  }

}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Sun Nov 30 09:20:46 2014 | Cross-referenced by PHPXref 0.7.1 |