[ Index ] |
PHP Cross Reference of Phabricator |
[Summary view] [Print] [Text view]
<?php

/**
 * Walk the rows of a Lisk table in a stable, resumable order.
 *
 * When the object has a `dateModified` property, rows are visited in
 * (dateModified, id) order and the cursor is the string
 * "<dateModified>:<id>". Otherwise rows are visited in id order and the
 * cursor is just the id.
 *
 * Rows modified within the last `$ignoreUpdatesDuration` seconds are not
 * returned by the `dateModified` path; see the note in
 * @{method:loadPageByModificationTime}.
 */
final class PhabricatorFactUpdateIterator extends PhutilBufferedIterator {

  private $cursor;
  private $object;
  private $position;
  private $ignoreUpdatesDuration = 15;

  private $set;

  /**
   * @param LiskDAO Prototype object used to issue the page queries. It is
   *                placed into a fresh LiskDAOSet owned by this iterator.
   */
  public function __construct(LiskDAO $object) {
    $this->set = new LiskDAOSet();
    $this->object = $object->putInSet($this->set);
  }

  /**
   * Set the cursor value iteration starts from after a rewind.
   *
   * @param string|int|null Cursor in the format produced by @{method:key},
   *                        or an empty value to start from the beginning.
   * @return this
   */
  public function setPosition($position) {
    $this->position = $position;
    return $this;
  }

  /**
   * Reset the working cursor to the configured start position.
   */
  protected function didRewind() {
    $this->cursor = $this->position;
  }

  /**
   * Build the cursor value which identifies a row's place in the iteration
   * order.
   *
   * @param object Row object to build a cursor for.
   * @return string|int "<dateModified>:<id>" when the object has a
   *                    `dateModified` property, otherwise the bare id.
   */
  protected function getCursorFromObject($object) {
    if (!$object->hasProperty('dateModified')) {
      return $object->getID();
    }

    return $object->getDateModified().':'.$object->getID();
  }

  /**
   * Iterator key: the cursor of the current row.
   */
  public function key() {
    return $this->getCursorFromObject($this->current());
  }

  /**
   * Load the next page of rows after the current cursor, then advance the
   * cursor to the last row loaded.
   *
   * @return list<object> Loaded rows; empty when there is nothing more to
   *                      read.
   */
  protected function loadPage() {
    $this->set->clearSet();

    if ($this->object->hasProperty('dateModified')) {
      $rows = $this->loadPageByModificationTime();
    } else {
      $rows = $this->loadPageByID();
    }

    if ($rows) {
      $this->cursor = $this->getCursorFromObject(end($rows));
    }

    return $rows;
  }

  /**
   * Load a page of rows ordered by (dateModified, id), starting strictly
   * after the current cursor.
   */
  private function loadPageByModificationTime() {
    // With no cursor, start from the very beginning.
    $after_epoch = 0;
    $after_id = 0;
    if ($this->cursor) {
      list($after_epoch, $after_id) = explode(':', $this->cursor);
    }

    // NOTE: Recent updates are deliberately excluded. Once an update has
    // been processed, rows behind it are never read again, so we may only
    // read rows which no new row can be inserted behind. A row updated
    // during the current second could be followed, in that same second, by
    // an update to an object with a lower ID, and that update would be
    // skipped. Ignoring anything modified in the last few seconds avoids
    // this. It also means a repeatedly-updated object is examined once, in
    // its end state, rather than once per intermediate state, and it gives
    // reasonable protection against clock skew between the daemon's machine
    // and machines performing writes.

    return $this->object->loadAllWhere(
      '((dateModified > %d) OR (dateModified = %d AND id > %d))
        AND (dateModified < %d - %d)
        ORDER BY dateModified ASC, id ASC LIMIT %d',
      $after_epoch,
      $after_epoch,
      $after_id,
      time(),
      $this->ignoreUpdatesDuration,
      $this->getPageSize());
  }

  /**
   * Load a page of rows ordered by id, starting strictly after the current
   * cursor.
   */
  private function loadPageByID() {
    $after_id = $this->cursor ? $this->cursor : 0;

    return $this->object->loadAllWhere(
      'id > %d ORDER BY id ASC LIMIT %d',
      $after_id,
      $this->getPageSize());
  }

}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body
Generated: Sun Nov 30 09:20:46 2014 | Cross-referenced by PHPXref 0.7.1 |