[ Index ]

PHP Cross Reference of Phabricator

title

Body

[close]

/src/applications/fact/extract/ -> PhabricatorFactUpdateIterator.php (source)

   1  <?php
   2  
   3  /**
   4   * Iterate over objects by update time in a stable way. This iterator only works
   5   * for "normal" Lisk objects: objects with an autoincrement ID and a
   6   * dateModified column.
   7   */
   8  final class PhabricatorFactUpdateIterator extends PhutilBufferedIterator {
   9  
  10    private $cursor;
  11    private $object;
  12    private $position;
  13    private $ignoreUpdatesDuration = 15;
  14  
  15    private $set;
  16  
  17    public function __construct(LiskDAO $object) {
  18      $this->set = new LiskDAOSet();
  19      $this->object = $object->putInSet($this->set);
  20    }
  21  
  22    public function setPosition($position) {
  23      $this->position = $position;
  24      return $this;
  25    }
  26  
  27    protected function didRewind() {
  28      $this->cursor = $this->position;
  29    }
  30  
  31    protected function getCursorFromObject($object) {
  32      if ($object->hasProperty('dateModified')) {
  33        return $object->getDateModified().':'.$object->getID();
  34      } else {
  35        return $object->getID();
  36      }
  37    }
  38  
  39    public function key() {
  40      return $this->getCursorFromObject($this->current());
  41    }
  42  
  43    protected function loadPage() {
  44      $this->set->clearSet();
  45  
  46      if ($this->object->hasProperty('dateModified')) {
  47        if ($this->cursor) {
  48          list($after_epoch, $after_id) = explode(':', $this->cursor);
  49        } else {
  50          $after_epoch = 0;
  51          $after_id = 0;
  52        }
  53  
  54        // NOTE: We ignore recent updates because once we process an update we'll
  55        // never process rows behind it again. We need to read only rows which
  56        // we're sure no new rows will be inserted behind. If we read a row that
  57        // was updated on the current second, another update later on in this
  58        // second could affect an object with a lower ID, and we'd skip that
  59        // update. To avoid this, just ignore any rows which have been updated in
  60        // the last few seconds. This also reduces the amount of work we need to
  61        // do if an object is repeatedly updated; we will just look at the end
  62        // state without processing the intermediate states. Finally, this gives
  63        // us reasonable protections against clock skew between the machine the
  64        // daemon is running on and any machines performing writes.
  65  
  66        $page = $this->object->loadAllWhere(
  67          '((dateModified > %d) OR (dateModified = %d AND id > %d))
  68            AND (dateModified < %d - %d)
  69            ORDER BY dateModified ASC, id ASC LIMIT %d',
  70          $after_epoch,
  71          $after_epoch,
  72          $after_id,
  73          time(),
  74          $this->ignoreUpdatesDuration,
  75          $this->getPageSize());
  76      } else {
  77        if ($this->cursor) {
  78          $after_id = $this->cursor;
  79        } else {
  80          $after_id = 0;
  81        }
  82  
  83        $page = $this->object->loadAllWhere(
  84          'id > %d ORDER BY id ASC LIMIT %d',
  85          $after_id,
  86          $this->getPageSize());
  87      }
  88  
  89      if ($page) {
  90        $this->cursor = $this->getCursorFromObject(end($page));
  91      }
  92  
  93      return $page;
  94    }
  95  
  96  }


Generated: Sun Nov 30 09:20:46 2014 Cross-referenced by PHPXref 0.7.1