[ Index ]

PHP Cross Reference of Phabricator

title

Body

[close]

/src/applications/repository/daemon/ -> PhabricatorRepositoryPullLocalDaemon.php (source)

   1  <?php
   2  
   3  /**
   4   * Run pull commands on local working copies to keep them up to date. This
   5   * daemon handles all repository types.
   6   *
   7   * By default, the daemon pulls **every** repository. If you want it to be
   8   * responsible for only some repositories, you can launch it with a list of
   9   * callsigns:
  10   *
  11   *   ./phd launch repositorypulllocal -- X Q Z
  12   *
  13   * You can also launch a daemon which is responsible for all //but// one or
  14   * more repositories:
  15   *
  16   *   ./phd launch repositorypulllocal -- --not A --not B
  17   *
  18   * If you have a very large number of repositories and some aren't being pulled
  19   * as frequently as you'd like, you can either change the pull frequency of
  20   * the less-important repositories to a larger number (so the daemon will skip
  21   * them more often) or launch one daemon for all the less-important repositories
  22   * and one for the more important repositories (or one for each more important
  23   * repository).
  24   *
  25   * @task pull   Pulling Repositories
  26   */
  27  final class PhabricatorRepositoryPullLocalDaemon
  28    extends PhabricatorDaemon {
  29  
  30  
  31  /* -(  Pulling Repositories  )----------------------------------------------- */
  32  
  33  
  /**
   * Run the daemon's main loop until shouldExit() reports true.
   *
   * Each pass through the loop:
   *
   *   - loads the set of pullable repositories;
   *   - marks any pullable repository with a pending NEEDS_UPDATE message as
   *     due immediately;
   *   - drops retry timers for repositories which are no longer pullable;
   *   - queues every repository whose retry time has passed and which is not
   *     already queued or updating;
   *   - starts queued updates, earliest due first, until `$max_futures`
   *     updates are running;
   *   - if any updates are running, waits on them and records the next retry
   *     time of whichever one finishes, then starts the next pass;
   *   - otherwise, sleeps via waitForUpdates().
   *
   * @return void
   * @task pull
   */
  public function run() {
    $argv = $this->getArgv();
    array_unshift($argv, __CLASS__);
    $args = new PhutilArgumentParser($argv);
    $args->parse(
      array(
        array(
          'name'      => 'no-discovery',
          'help'      => 'Pull only, without discovering commits.',
        ),
        array(
          'name'      => 'not',
          'param'     => 'repository',
          'repeat'    => true,
          'help'      => 'Do not pull __repository__.',
        ),
        array(
          'name'      => 'repositories',
          'wildcard'  => true,
          'help'      => 'Pull specific __repositories__ instead of all.',
        ),
      ));

    $no_discovery = $args->getArg('no-discovery');
    $include = $args->getArg('repositories');
    $exclude = $args->getArg('not');

    // Each repository has an individual pull frequency; after we pull it,
    // wait that long to pull it again. When we start up, try to pull everything
    // serially.
    //
    // Maps repository ID => epoch timestamp before which we should not pull
    // the repository again.
    $retry_after = array();

    $min_sleep = 15;
    $max_futures = 4;

    // Maps repository ID => running update future.
    $futures = array();

    // Maps repository ID => the retry time the repository had when it was
    // queued (0 for a repository which has never been pulled).
    $queue = array();

    while (!$this->shouldExit()) {
      $pullable = $this->loadPullableRepositories($include, $exclude);

      // If any repositories have the NEEDS_UPDATE flag set, pull them
      // as soon as possible.
      $need_update_messages = $this->loadRepositoryUpdateMessages();
      foreach ($need_update_messages as $message) {
        $repo = idx($pullable, $message->getRepositoryID());
        if (!$repo) {
          continue;
        }

        $this->log(
          pht(
            'Got an update message for repository "%s"!',
            $repo->getMonogram()));

        $retry_after[$message->getRepositoryID()] = time();
      }

      // If any repositories were deleted, remove them from the retry timer map
      // so we don't end up with a retry timer that never gets updated and
      // causes us to sleep for the minimum amount of time.
      $retry_after = array_select_keys(
        $retry_after,
        array_keys($pullable));


      // Figure out which repositories we need to queue for an update.
      foreach ($pullable as $id => $repository) {
        $monogram = $repository->getMonogram();

        if (isset($futures[$id])) {
          $this->log(pht('Repository "%s" is currently updating.', $monogram));
          continue;
        }

        if (isset($queue[$id])) {
          $this->log(pht('Repository "%s" is already queued.', $monogram));
          continue;
        }

        $after = idx($retry_after, $id, 0);
        if ($after > time()) {
          $this->log(
            pht(
              'Repository "%s" is not due for an update for %s second(s).',
              $monogram,
              new PhutilNumber($after - time())));
          continue;
        }

        if (!$after) {
          $this->log(
            pht(
              'Scheduling repository "%s" for an initial update.',
              $monogram));
        } else {
          $this->log(
            pht(
              'Scheduling repository "%s" for an update (%s seconds overdue).',
              $monogram,
              new PhutilNumber(time() - $after)));
        }

        $queue[$id] = $after;
      }

      // Process repositories in the order they became candidates for updates.
      asort($queue);

      // Dequeue repositories until we hit maximum parallelism.
      while ($queue && (count($futures) < $max_futures)) {
        foreach ($queue as $id => $time) {
          // Remove the entry from the queue before deciding what to do with
          // it. If a repository which is no longer pullable were left at the
          // head of the queue, the enclosing loop would pick it again on
          // every pass and never terminate.
          unset($queue[$id]);

          $repository = idx($pullable, $id);
          if (!$repository) {
            $this->log(
              pht('Repository %s is no longer pullable; skipping.', $id));
            break;
          }

          $monogram = $repository->getMonogram();
          $this->log(pht('Starting update for repository "%s".', $monogram));

          $futures[$id] = $this->buildUpdateFuture(
            $repository,
            $no_discovery);

          // Only take one entry per pass so the parallelism limit is
          // rechecked by the enclosing loop.
          break;
        }
      }

      if ($queue) {
        $this->log(
          pht(
            'Not enough process slots to schedule the other %s '.
            'repository(s) for updates yet.',
            new PhutilNumber(count($queue))));
      }

      if ($futures) {
        $iterator = id(new FutureIterator($futures))
          ->setUpdateInterval($min_sleep);

        foreach ($iterator as $id => $future) {
          $this->stillWorking();

          // A null future means nothing has resolved yet (presumably the
          // iterator yields this each time the update interval elapses).
          if ($future === null) {
            $this->log(pht('Waiting for updates to complete...'));
            $this->stillWorking();

            if ($this->loadRepositoryUpdateMessages()) {
              $this->log(pht('Interrupted by pending updates!'));
              break;
            }

            continue;
          }

          unset($futures[$id]);

          // The update may have been started on an earlier pass. If the
          // repository has stopped being pullable since then, there is no
          // repository object to resolve against and no retry timer to keep.
          $repository = idx($pullable, $id);
          if (!$repository) {
            $this->log(
              pht(
                'Repository %s is no longer pullable; discarding the result '.
                'of its update.',
                $id));
            break;
          }

          $retry_after[$id] = $this->resolveUpdateFuture(
            $repository,
            $future,
            $min_sleep);

          // We have a free slot now, so go try to fill it.
          break;
        }

        // Jump back into prioritization if we had any futures to deal with.
        continue;
      }

      $this->waitForUpdates($min_sleep, $retry_after);
    }

  }
 212  
 213  
 214    /**
 215     * @task pull
 216     */
 217    private function buildUpdateFuture(
 218      PhabricatorRepository $repository,
 219      $no_discovery) {
 220  
 221      $bin = dirname(phutil_get_library_root('phabricator')).'/bin/repository';
 222  
 223      $flags = array();
 224      if ($no_discovery) {
 225        $flags[] = '--no-discovery';
 226      }
 227  
 228      $callsign = $repository->getCallsign();
 229  
 230      $future = new ExecFuture('%s update %Ls -- %s', $bin, $flags, $callsign);
 231  
 232      // Sometimes, the underlying VCS commands will hang indefinitely. We've
 233      // observed this occasionally with GitHub, and other users have observed
 234      // it with other VCS servers.
 235  
 236      // To limit the damage this can cause, kill the update out after a
 237      // reasonable amount of time, under the assumption that it has hung.
 238  
 239      // Since it's hard to know what a "reasonable" amount of time is given that
 240      // users may be downloading a repository full of pirated movies over a
 241      // potato, these limits are fairly generous. Repositories exceeding these
 242      // limits can be manually pulled with `bin/repository update X`, which can
 243      // just run for as long as it wants.
 244  
 245      if ($repository->isImporting()) {
 246        $timeout = phutil_units('4 hours in seconds');
 247      } else {
 248        $timeout = phutil_units('15 minutes in seconds');
 249      }
 250  
 251      $future->setTimeout($timeout);
 252  
 253      return $future;
 254    }
 255  
 256  
 257    /**
 258     * @task pull
 259     */
 260    private function loadRepositoryUpdateMessages() {
 261      $type_need_update = PhabricatorRepositoryStatusMessage::TYPE_NEEDS_UPDATE;
 262      return id(new PhabricatorRepositoryStatusMessage())
 263        ->loadAllWhere('statusType = %s', $type_need_update);
 264    }
 265  
 266  
 267    /**
 268     * @task pull
 269     */
 270    private function loadPullableRepositories(array $include, array $exclude) {
 271      $query = id(new PhabricatorRepositoryQuery())
 272        ->setViewer($this->getViewer());
 273  
 274      if ($include) {
 275        $query->withCallsigns($include);
 276      }
 277  
 278      $repositories = $query->execute();
 279  
 280      if ($include) {
 281        $by_callsign = mpull($repositories, null, 'getCallsign');
 282        foreach ($include as $name) {
 283          if (empty($by_callsign[$name])) {
 284            throw new Exception(
 285              "No repository exists with callsign '{$name}'!");
 286          }
 287        }
 288      }
 289  
 290      if ($exclude) {
 291        $exclude = array_fuse($exclude);
 292        foreach ($repositories as $key => $repository) {
 293          if (isset($exclude[$repository->getCallsign()])) {
 294            unset($repositories[$key]);
 295          }
 296        }
 297      }
 298  
 299      foreach ($repositories as $key => $repository) {
 300        if (!$repository->isTracked()) {
 301          unset($repositories[$key]);
 302        }
 303      }
 304  
 305      // Shuffle the repositories, then re-key the array since shuffle()
 306      // discards keys. This is mostly for startup, we'll use soft priorities
 307      // later.
 308      shuffle($repositories);
 309      $repositories = mpull($repositories, null, 'getID');
 310  
 311      return $repositories;
 312    }
 313  
 314  
 315    /**
 316     * @task pull
 317     */
 318    private function resolveUpdateFuture(
 319      PhabricatorRepository $repository,
 320      ExecFuture $future,
 321      $min_sleep) {
 322  
 323      $monogram = $repository->getMonogram();
 324  
 325      $this->log(pht('Resolving update for "%s".', $monogram));
 326  
 327      try {
 328        list($stdout, $stderr) = $future->resolvex();
 329      } catch (Exception $ex) {
 330        $proxy = new PhutilProxyException(
 331          pht(
 332            'Error while updating the "%s" repository.',
 333            $repository->getMonogram()),
 334          $ex);
 335        phlog($proxy);
 336  
 337        return time() + $min_sleep;
 338      }
 339  
 340      if (strlen($stderr)) {
 341        $stderr_msg = pht(
 342          'Unexpected output while updating repository "%s": %s',
 343          $monogram,
 344          $stderr);
 345        phlog($stderr_msg);
 346      }
 347  
 348      // For now, continue respecting this deprecated setting for raising the
 349      // minimum pull frequency.
 350      // TODO: Remove this some day once this code has been completely stable
 351      // for a while.
 352      $sleep_for = (int)$repository->getDetail('pull-frequency');
 353      $min_sleep = max($sleep_for, $min_sleep);
 354  
 355      $smart_wait = $repository->loadUpdateInterval($min_sleep);
 356  
 357      $this->log(
 358        pht(
 359          'Based on activity in repository "%s", considering a wait of %s '.
 360          'seconds before update.',
 361          $repository->getMonogram(),
 362          new PhutilNumber($smart_wait)));
 363  
 364      return time() + $smart_wait;
 365    }
 366  
 367  
 368  
 369    /**
 370     * Sleep for a short period of time, waiting for update messages from the
 371     *
 372     *
 373     * @task pull
 374     */
 375    private function waitForUpdates($min_sleep, array $retry_after) {
 376      $this->log(
 377        pht('No repositories need updates right now, sleeping...'));
 378  
 379      $sleep_until = time() + $min_sleep;
 380      if ($retry_after) {
 381        $sleep_until = min($sleep_until, min($retry_after));
 382      }
 383  
 384      while (($sleep_until - time()) > 0) {
 385        $sleep_duration = ($sleep_until - time());
 386  
 387        $this->log(
 388          pht(
 389            'Sleeping for %s more second(s)...',
 390            new PhutilNumber($sleep_duration)));
 391  
 392        $this->sleep(1);
 393  
 394        if ($this->shouldExit()) {
 395          $this->log(pht('Awakened from sleep by graceful shutdown!'));
 396          return;
 397        }
 398  
 399        if ($this->loadRepositoryUpdateMessages()) {
 400          $this->log(pht('Awakened from sleep by pending updates!'));
 401          break;
 402        }
 403      }
 404    }
 405  
 406  }


Generated: Sun Nov 30 09:20:46 2014 Cross-referenced by PHPXref 0.7.1