@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at recaptime-dev/main 92 lines 2.8 kB view raw
<?php

/**
 * Walks the rows of a Lisk table in a stable order, oldest update first.
 *
 * This is intended for "normal" Lisk objects: ones with an auto-increment ID
 * and a dateModified column. If the object has no dateModified property, rows
 * are walked in ascending ID order instead.
 *
 * The cursor is the string "<dateModified>:<id>" when paging by update time,
 * and the bare ID otherwise.
 */
final class PhabricatorFactUpdateIterator extends PhutilBufferedIterator {

  private $cursor;
  private $object;
  private $position;

  // Rows modified within this many seconds of the current time are not
  // returned when paging by update time.
  private $ignoreUpdatesDuration = 15;

  /**
   * @param LiskDAO $object Object used both to issue the page queries and to
   *   decide (via its dateModified property) how rows are ordered.
   */
  public function __construct(LiskDAO $object) {
    $this->object = $object;
  }

  /**
   * Set the cursor value that iteration starts after on the next rewind.
   *
   * @param mixed $position Cursor value, in the format produced by key().
   * @return $this
   */
  public function setPosition($position) {
    $this->position = $position;
    return $this;
  }

  /**
   * Reset the working cursor to the configured starting position.
   */
  protected function didRewind() {
    $this->cursor = $this->position;
  }

  /**
   * Build the cursor value which identifies a row's place in the iteration.
   *
   * @param object $object Row to build a cursor for.
   * @return mixed "<dateModified>:<id>" if the row has a dateModified
   *   property, otherwise just the row ID.
   */
  protected function getCursorFromObject($object) {
    if (!$object->hasProperty('dateModified')) {
      return $object->getID();
    }

    return $object->getDateModified().':'.$object->getID();
  }

  /**
   * Return the cursor value of the current row.
   */
  #[\ReturnTypeWillChange]
  public function key() {
    $current = $this->current();
    return $this->getCursorFromObject($current);
  }

  /**
   * Load the next page of rows after the cursor, then move the cursor to the
   * last row loaded.
   *
   * @return mixed Whatever loadAllWhere() returned for this page.
   */
  protected function loadPage() {
    if ($this->object->hasProperty('dateModified')) {
      $page = $this->loadPageAfterUpdateCursor();
    } else {
      $page = $this->loadPageAfterIDCursor();
    }

    // An empty page leaves the cursor where it was.
    if ($page) {
      $this->cursor = $this->getCursorFromObject(end($page));
    }

    return $page;
  }

  /**
   * Load one page ordered by (dateModified, id), starting after the cursor.
   */
  private function loadPageAfterUpdateCursor() {
    $after_epoch = 0;
    $after_id = 0;
    if ($this->cursor) {
      list($after_epoch, $after_id) = explode(':', $this->cursor);
    }

    // NOTE: Recently updated rows are deliberately left out. Once a row has
    // been processed, nothing behind it is ever read again, so we may only
    // read rows which we know no other row can later be placed behind. A row
    // updated during the current second does not qualify: a later update in
    // the same second could touch an object with a lower ID, and that update
    // would be skipped. Ignoring everything updated in the last few seconds
    // avoids this. As a side effect, an object which is updated repeatedly is
    // only examined in its final state rather than in every intermediate
    // state, and we get reasonable protection against clock skew between the
    // host running the daemon and the hosts performing writes.

    return $this->object->loadAllWhere(
      '((dateModified > %d) OR (dateModified = %d AND id > %d))
         AND (dateModified < %d - %d)
       ORDER BY dateModified ASC, id ASC LIMIT %d',
      $after_epoch,
      $after_epoch,
      $after_id,
      time(),
      $this->ignoreUpdatesDuration,
      $this->getPageSize());
  }

  /**
   * Load one page ordered by id, starting after the cursor.
   */
  private function loadPageAfterIDCursor() {
    $after_id = 0;
    if ($this->cursor) {
      $after_id = $this->cursor;
    }

    return $this->object->loadAllWhere(
      'id > %d ORDER BY id ASC LIMIT %d',
      $after_id,
      $this->getPageSize());
  }

}