@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.)
hq.recaptime.dev/wiki/Phorge
phorge
phabricator
1<?php
2
/**
 * Walk the rows behind a Lisk object in a stable, update-ordered way.
 *
 * When the object has a `dateModified` property, rows are visited in
 * (dateModified, id) order and the cursor is the string
 * "<dateModified>:<id>". Otherwise rows are visited in plain ID order and the
 * cursor is simply the ID. The iterator is intended for "normal" Lisk objects:
 * ones with an auto-increment ID and, ideally, a dateModified column.
 */
final class PhabricatorFactUpdateIterator extends PhutilBufferedIterator {

  /** Prototype object used to detect properties and to issue queries. */
  private $object;

  /** Caller-supplied starting cursor; copied into $cursor on rewind. */
  private $position;

  /** Cursor of the last row loaded so far, or the starting position. */
  private $cursor;

  /** Rows modified within this many seconds of "now" are not returned. */
  private $ignoreUpdatesDuration = 15;

  /**
   * @param LiskDAO $object Object whose rows should be iterated over.
   */
  public function __construct(LiskDAO $object) {
    $this->object = $object;
  }

  /**
   * Choose the cursor to resume from. It takes effect on the next rewind.
   *
   * @param mixed $position Cursor in the format produced by @{method:key}.
   * @return $this
   */
  public function setPosition($position) {
    $this->position = $position;
    return $this;
  }

  /**
   * Reset the working cursor to the configured starting position.
   */
  protected function didRewind() {
    $this->cursor = $this->position;
  }

  /**
   * Build the cursor which identifies a row.
   *
   * @param object $object Row to describe.
   * @return mixed "<dateModified>:<id>" if the row has a `dateModified`
   *   property, otherwise just its ID.
   */
  protected function getCursorFromObject($object) {
    if (!$object->hasProperty('dateModified')) {
      return $object->getID();
    }

    return $object->getDateModified().':'.$object->getID();
  }

  /**
   * Use the cursor of the current row as the iteration key.
   */
  #[\ReturnTypeWillChange]
  public function key() {
    return $this->getCursorFromObject($this->current());
  }

  /**
   * Load the next page of rows after the cursor and, if any rows came back,
   * advance the cursor to the last of them.
   *
   * @return mixed Whatever `loadAllWhere()` returned for this page.
   */
  protected function loadPage() {
    if ($this->object->hasProperty('dateModified')) {
      $rows = $this->loadPageByUpdateTime();
    } else {
      $rows = $this->loadPageByID();
    }

    if (!$rows) {
      return $rows;
    }

    $this->cursor = $this->getCursorFromObject(end($rows));

    return $rows;
  }

  /**
   * Load a page of rows ordered by (dateModified, id), skipping rows which
   * were modified too recently.
   */
  private function loadPageByUpdateTime() {
    $after_epoch = 0;
    $after_id = 0;
    if ($this->cursor) {
      list($after_epoch, $after_id) = explode(':', $this->cursor);
    }

    // NOTE: Recently updated rows are deliberately excluded. Once a row has
    // been processed, nothing behind it is ever read again, so we may only
    // read rows which no later write can land behind. A row updated during
    // the current second is unsafe: another update in that same second could
    // touch an object with a lower ID, and that update would be skipped.
    // Ignoring the last few seconds of updates avoids this. It also means an
    // object which is updated repeatedly is examined once, in its final
    // state, and it provides reasonable protection against clock skew between
    // the daemon host and the hosts performing writes.

    return $this->object->loadAllWhere(
      '((dateModified > %d) OR (dateModified = %d AND id > %d))
          AND (dateModified < %d - %d)
          ORDER BY dateModified ASC, id ASC LIMIT %d',
      $after_epoch,
      $after_epoch,
      $after_id,
      time(),
      $this->ignoreUpdatesDuration,
      $this->getPageSize());
  }

  /**
   * Load a page of rows ordered by ID, for objects with no `dateModified`
   * property.
   */
  private function loadPageByID() {
    $after_id = $this->cursor ? $this->cursor : 0;

    return $this->object->loadAllWhere(
      'id > %d ORDER BY id ASC LIMIT %d',
      $after_id,
      $this->getPageSize());
  }

}