[ Index ] |
PHP Cross Reference of Phabricator |
[Summary view] [Print] [Text view]
<?php

/**
 * Run pull commands on local working copies to keep them up to date. This
 * daemon handles all repository types.
 *
 * By default, the daemon pulls **every** repository. If you want it to be
 * responsible for only some repositories, you can launch it with a list of
 * PHIDs or callsigns:
 *
 *   ./phd launch repositorypulllocal -- X Q Z
 *
 * You can also launch a daemon which is responsible for all //but// one or
 * more repositories:
 *
 *   ./phd launch repositorypulllocal -- --not A --not B
 *
 * If you have a very large number of repositories and some aren't being pulled
 * as frequently as you'd like, you can either change the pull frequency of
 * the less-important repositories to a larger number (so the daemon will skip
 * them more often) or launch one daemon for all the less-important repositories
 * and one for the more important repositories (or one for each more important
 * repository).
 *
 * @task pull   Pulling Repositories
 */
final class PhabricatorRepositoryPullLocalDaemon
  extends PhabricatorDaemon {


/* -(  Pulling Repositories  )----------------------------------------------- */


  /**
   * Main daemon loop: repeatedly work out which repositories are due for an
   * update, run up to `$max_futures` updates in parallel, and sleep when
   * nothing is due.
   *
   * Recognized arguments are `--no-discovery`, a repeatable
   * `--not <repository>`, and a wildcard list of repositories to pull.
   *
   * @return void
   * @task pull
   */
  public function run() {
    $argv = $this->getArgv();
    array_unshift($argv, __CLASS__);
    $args = new PhutilArgumentParser($argv);
    $args->parse(
      array(
        array(
          'name'      => 'no-discovery',
          'help'      => 'Pull only, without discovering commits.',
        ),
        array(
          'name'      => 'not',
          'param'     => 'repository',
          'repeat'    => true,
          'help'      => 'Do not pull __repository__.',
        ),
        array(
          'name'      => 'repositories',
          'wildcard'  => true,
          'help'      => 'Pull specific __repositories__ instead of all.',
        ),
      ));

    $no_discovery   = $args->getArg('no-discovery');
    $include = $args->getArg('repositories');
    $exclude = $args->getArg('not');

    // Each repository has an individual pull frequency; after we pull it,
    // wait that long to pull it again. When we start up, try to pull everything
    // serially.
    $retry_after = array();

    $min_sleep = 15;
    $max_futures = 4;
    $futures = array();
    $queue = array();

    while (!$this->shouldExit()) {
      $pullable = $this->loadPullableRepositories($include, $exclude);

      // If any repositories have the NEEDS_UPDATE flag set, pull them
      // as soon as possible.
      $need_update_messages = $this->loadRepositoryUpdateMessages();
      foreach ($need_update_messages as $message) {
        $repo = idx($pullable, $message->getRepositoryID());
        if (!$repo) {
          continue;
        }

        $this->log(
          pht(
            'Got an update message for repository "%s"!',
            $repo->getMonogram()));

        $retry_after[$message->getRepositoryID()] = time();
      }

      // If any repositories were deleted, remove them from the retry timer map
      // so we don't end up with a retry timer that never gets updated and
      // causes us to sleep for the minimum amount of time.
      $retry_after = array_select_keys(
        $retry_after,
        array_keys($pullable));


      // Figure out which repositories we need to queue for an update.
      foreach ($pullable as $id => $repository) {
        $monogram = $repository->getMonogram();

        if (isset($futures[$id])) {
          $this->log(pht('Repository "%s" is currently updating.', $monogram));
          continue;
        }

        if (isset($queue[$id])) {
          $this->log(pht('Repository "%s" is already queued.', $monogram));
          continue;
        }

        $after = idx($retry_after, $id, 0);
        if ($after > time()) {
          $this->log(
            pht(
              'Repository "%s" is not due for an update for %s second(s).',
              $monogram,
              new PhutilNumber($after - time())));
          continue;
        }

        if (!$after) {
          $this->log(
            pht(
              'Scheduling repository "%s" for an initial update.',
              $monogram));
        } else {
          $this->log(
            pht(
              'Scheduling repository "%s" for an update (%s seconds overdue).',
              $monogram,
              new PhutilNumber(time() - $after)));
        }

        $queue[$id] = $after;
      }

      // Process repositories in the order they became candidates for updates.
      asort($queue);

      // Dequeue repositories until we hit maximum parallelism.
      while ($queue && (count($futures) < $max_futures)) {
        foreach ($queue as $id => $time) {
          $repository = idx($pullable, $id);
          if (!$repository) {
            $this->log(
              pht('Repository %s is no longer pullable; skipping.', $id));

            // Drop the stale entry. If it stayed in the queue, the enclosing
            // while loop would never terminate, because neither $queue nor
            // $futures would change between passes.
            unset($queue[$id]);
            continue;
          }

          $monogram = $repository->getMonogram();
          $this->log(pht('Starting update for repository "%s".', $monogram));

          unset($queue[$id]);
          $futures[$id] = $this->buildUpdateFuture(
            $repository,
            $no_discovery);

          // Re-check the parallelism limit before starting another update.
          break;
        }
      }

      if ($queue) {
        $this->log(
          pht(
            'Not enough process slots to schedule the other %s '.
            'repository(s) for updates yet.',
            new PhutilNumber(count($queue))));
      }

      if ($futures) {
        $iterator = id(new FutureIterator($futures))
          ->setUpdateInterval($min_sleep);

        foreach ($iterator as $id => $future) {
          $this->stillWorking();

          if ($future === null) {
            $this->log(pht('Waiting for updates to complete...'));
            $this->stillWorking();

            if ($this->loadRepositoryUpdateMessages()) {
              $this->log(pht('Interrupted by pending updates!'));
              break;
            }

            continue;
          }

          unset($futures[$id]);

          // The pullable list is reloaded on every pass, so the repository
          // may have stopped being pullable while its update was running.
          $repository = idx($pullable, $id);
          if (!$repository) {
            $this->log(
              pht(
                'Repository %s is no longer pullable; discarding its update.',
                $id));
            break;
          }

          $retry_after[$id] = $this->resolveUpdateFuture(
            $repository,
            $future,
            $min_sleep);

          // We have a free slot now, so go try to fill it.
          break;
        }

        // Jump back into prioritization if we had any futures to deal with.
        continue;
      }

      $this->waitForUpdates($min_sleep, $retry_after);
    }

  }


  /**
   * Build (but do not start resolving) a future which runs
   * `bin/repository update` for one repository.
   *
   * The future is given a timeout of 4 hours while the repository is
   * importing and 15 minutes otherwise.
   *
   * @param PhabricatorRepository Repository to update.
   * @param bool True to pass `--no-discovery` to the update command.
   * @return ExecFuture Future for the update subprocess.
   * @task pull
   */
  private function buildUpdateFuture(
    PhabricatorRepository $repository,
    $no_discovery) {

    $bin = dirname(phutil_get_library_root('phabricator')).'/bin/repository';

    $flags = array();
    if ($no_discovery) {
      $flags[] = '--no-discovery';
    }

    $callsign = $repository->getCallsign();

    $future = new ExecFuture('%s update %Ls -- %s', $bin, $flags, $callsign);

    // Sometimes, the underlying VCS commands will hang indefinitely. We've
    // observed this occasionally with GitHub, and other users have observed
    // it with other VCS servers.

    // To limit the damage this can cause, kill the update after a
    // reasonable amount of time, under the assumption that it has hung.

    // Since it's hard to know what a "reasonable" amount of time is given that
    // users may be downloading a repository full of pirated movies over a
    // potato, these limits are fairly generous. Repositories exceeding these
    // limits can be manually pulled with `bin/repository update X`, which can
    // just run for as long as it wants.

    if ($repository->isImporting()) {
      $timeout = phutil_units('4 hours in seconds');
    } else {
      $timeout = phutil_units('15 minutes in seconds');
    }

    $future->setTimeout($timeout);

    return $future;
  }


  /**
   * Load all repository status messages of type `TYPE_NEEDS_UPDATE`.
   *
   * Callers use a non-empty result as the signal that some repository has
   * asked to be updated as soon as possible.
   *
   * @return list<PhabricatorRepositoryStatusMessage> Pending update messages.
   * @task pull
   */
  private function loadRepositoryUpdateMessages() {
    $type_need_update = PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE;
    return id(new PhabricatorRepositoryStatusMessage())
      ->loadAllWhere('statusType = %s', $type_need_update);
  }


  /**
   * Load the tracked repositories this daemon is responsible for.
   *
   * Names in both lists are matched against repository callsigns only.
   *
   * @param list<string> Callsigns to restrict the query to; empty means all
   *                     repositories.
   * @param list<string> Callsigns to drop from the result.
   * @return map<int, PhabricatorRepository> Repositories keyed by ID, in
   *                                         shuffled order.
   * @throws Exception If an included callsign matches no repository.
   * @task pull
   */
  private function loadPullableRepositories(array $include, array $exclude) {
    $query = id(new PhabricatorRepositoryQuery())
      ->setViewer($this->getViewer());

    if ($include) {
      $query->withCallsigns($include);
    }

    $repositories = $query->execute();

    if ($include) {
      $by_callsign = mpull($repositories, null, 'getCallsign');
      foreach ($include as $name) {
        if (empty($by_callsign[$name])) {
          throw new Exception(
            "No repository exists with callsign '{$name}'!");
        }
      }
    }

    if ($exclude) {
      $exclude = array_fuse($exclude);
      foreach ($repositories as $key => $repository) {
        if (isset($exclude[$repository->getCallsign()])) {
          unset($repositories[$key]);
        }
      }
    }

    foreach ($repositories as $key => $repository) {
      if (!$repository->isTracked()) {
        unset($repositories[$key]);
      }
    }

    // Shuffle the repositories, then re-key the array since shuffle()
    // discards keys. This is mostly for startup, we'll use soft priorities
    // later.
    shuffle($repositories);
    $repositories = mpull($repositories, null, 'getID');

    return $repositories;
  }


  /**
   * Resolve a finished update future and decide when the repository should
   * be updated next.
   *
   * If the update failed, the failure is logged and the repository is retried
   * after `$min_sleep` seconds. Otherwise the wait comes from
   * `loadUpdateInterval()`, called with the larger of `$min_sleep` and the
   * deprecated `pull-frequency` repository detail.
   *
   * @param PhabricatorRepository Repository the future was updating.
   * @param ExecFuture Future returned by @{method:buildUpdateFuture}.
   * @param int Minimum number of seconds to wait between updates.
   * @return int Epoch timestamp before which the repository should not be
   *             updated again.
   * @task pull
   */
  private function resolveUpdateFuture(
    PhabricatorRepository $repository,
    ExecFuture $future,
    $min_sleep) {

    $monogram = $repository->getMonogram();

    $this->log(pht('Resolving update for "%s".', $monogram));

    try {
      list($stdout, $stderr) = $future->resolvex();
    } catch (Exception $ex) {
      $proxy = new PhutilProxyException(
        pht(
          'Error while updating the "%s" repository.',
          $monogram),
        $ex);
      phlog($proxy);

      return time() + $min_sleep;
    }

    if (strlen($stderr)) {
      $stderr_msg = pht(
        'Unexpected output while updating repository "%s": %s',
        $monogram,
        $stderr);
      phlog($stderr_msg);
    }

    // For now, continue respecting this deprecated setting for raising the
    // minimum pull frequency.
    // TODO: Remove this some day once this code has been completely stable
    // for a while.
    $sleep_for = (int)$repository->getDetail('pull-frequency');
    $min_sleep = max($sleep_for, $min_sleep);

    $smart_wait = $repository->loadUpdateInterval($min_sleep);

    $this->log(
      pht(
        'Based on activity in repository "%s", considering a wait of %s '.
        'seconds before update.',
        $monogram,
        new PhutilNumber($smart_wait)));

    return time() + $smart_wait;
  }



  /**
   * Sleep for a short period of time, waking early if a repository update
   * message arrives or the daemon is asked to shut down.
   *
   * Sleeps in one-second steps until `$min_sleep` seconds have passed or the
   * earliest timestamp in `$retry_after` is reached, whichever comes first.
   *
   * @param int Maximum number of seconds to sleep.
   * @param map<int, int> Map of repository ID to the epoch timestamp at which
   *                      it next becomes due for an update.
   * @return void
   * @task pull
   */
  private function waitForUpdates($min_sleep, array $retry_after) {
    $this->log(
      pht('No repositories need updates right now, sleeping...'));

    $sleep_until = time() + $min_sleep;
    if ($retry_after) {
      $sleep_until = min($sleep_until, min($retry_after));
    }

    while (($sleep_until - time()) > 0) {
      $sleep_duration = ($sleep_until - time());

      $this->log(
        pht(
          'Sleeping for %s more second(s)...',
          new PhutilNumber($sleep_duration)));

      $this->sleep(1);

      if ($this->shouldExit()) {
        $this->log(pht('Awakened from sleep by graceful shutdown!'));
        return;
      }

      if ($this->loadRepositoryUpdateMessages()) {
        $this->log(pht('Awakened from sleep by pending updates!'));
        break;
      }
    }
  }

}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Sun Nov 30 09:20:46 2014 | Cross-referenced by PHPXref 0.7.1 |