@recaptime-dev's working patches + fork for Phorge, a community fork of Phabricator. (Upstream dev and stable branches are at upstream/main and upstream/stable respectively.) hq.recaptime.dev/wiki/Phorge
phorge phabricator
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement clock/trigger infrastructure for scheduling actions

Summary:
Ref T6881. Hopefully, this is the hard part.

This adds a new daemon (the "trigger" daemon) which processes triggers, schedules them, and then executes them at the scheduled time. The design is a little complicated, but has these goals:

- High resistance to race conditions: only the application writes to the trigger table; only the daemon writes to the event table. We won't lose events if someone saves a meeting at the same time as we're sending a reminder out for it.
- Execution guarantees: scheduled events are guaranteed to execute exactly once.
- Support for arbitrarily large queues: the daemon will make progress even if there are millions of triggers in queue. The cost to update the queue is proportional to the number of changes in it; the cost to process the queue is proportional to the number of events to execute.
- Relatively good observability: you can monitor the state of the trigger queue reasonably well from the web UI.
- Modular Infrastructure: this is a very low-level construct that Calendar, Phortune, etc., should be able to build on top of.

It doesn't have this stuff yet:

- Not very robust to bad actions: a misbehaving trigger can stop the queue fairly easily. This is OK for now since we aren't planning to make it part of any other applications for a while. We do still get execute-exaclty-once, but it might not happen for a long time (until someone goes and fixes the queue), when we could theoretically continue executing other events.
- Doesn't start automatically: normal users don't need to run this thing yet so I'm not starting it by default.
- Not super well tested: I've vetted the basics but haven't run real workloads through this yet.
- No sophisticated tooling: I added some basic stuff but it's missing some pieces we'll have to build sooner or later, e.g. `bin/trigger cancel` or whatever.
- Intentionally not realtime: This design puts execution guarantees far above realtime concerns, and will not give you precise event execution at 1-second resolution. I think this is the correct goal to pursue architecturally, and certainly correct for subscriptions and meeting reminders. Events which execute after they have become irrelevant can simply decline to do anything (like a meeting reminder which executes after the meeting is over).

In general, the expectation for applications is:

- When creating an object (like a calendar event) that needs to trigger a scheduled action, write a trigger (and save the PHID if you plan to update it later).
- The daemon will process the event and schedule the action efficiently, in a race-free way.
- If you want to move the action, update the trigger and the daemon will take care of it.
- Your action will eventually dump a task into the task queue, and the task daemons will actually perform it.

Test Plan:
Using a test script like this:

```
<?php

require_once 'scripts/__init_script__.php';

$trigger = id(new PhabricatorWorkerTrigger())
->setAction(
new PhabricatorLogTriggerAction(
array(
'message' => 'test',
)))
->setClock(
new PhabricatorMetronomicTriggerClock(
array(
'period' => 33,
)))
->save();

var_dump($trigger);
```

...I queued triggers and ran the daemon:

- Verified triggers fire;
- verified triggers reschedule;
- verified trigger events show up in the web UI;
- tried different periods;
- added some triggers while the daemon was running;
- examined `phd debug` output for anything suspicious.

It seems to work in trivial use case, at least.

Reviewers: btrahan

Reviewed By: btrahan

Subscribers: epriestley

Maniphest Tasks: T6881

Differential Revision: https://secure.phabricator.com/D11419

+926
+11
resources/sql/autopatches/20150115.trigger.1.sql
··· 1 + CREATE TABLE {$NAMESPACE}_worker.worker_trigger ( 2 + id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, 3 + phid VARBINARY(64) NOT NULL, 4 + triggerVersion INT UNSIGNED NOT NULL, 5 + clockClass VARCHAR(64) NOT NULL COLLATE {$COLLATE_TEXT}, 6 + clockProperties LONGTEXT NOT NULL COLLATE {$COLLATE_TEXT}, 7 + actionClass VARCHAR(64) NOT NULL COLLATE {$COLLATE_TEXT}, 8 + actionProperties LONGTEXT NOT NULL COLLATE {$COLLATE_TEXT}, 9 + UNIQUE KEY `key_phid` (phid), 10 + UNIQUE KEY `key_trigger` (triggerVersion) 11 + ) ENGINE=InnoDB, COLLATE {$COLLATE_TEXT};
+8
resources/sql/autopatches/20150115.trigger.2.sql
··· 1 + CREATE TABLE {$NAMESPACE}_worker.worker_triggerevent ( 2 + id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, 3 + triggerID INT UNSIGNED NOT NULL, 4 + lastEventEpoch INT UNSIGNED, 5 + nextEventEpoch INT UNSIGNED, 6 + UNIQUE KEY `key_trigger` (triggerID), 7 + KEY `key_next` (nextEventEpoch) 8 + ) ENGINE=InnoDB, COLLATE {$COLLATE_TEXT};
+18
src/__phutil_library_map__.php
··· 1873 1873 'PhabricatorListFilterUIExample' => 'applications/uiexample/examples/PhabricatorListFilterUIExample.php', 1874 1874 'PhabricatorLocalDiskFileStorageEngine' => 'applications/files/engine/PhabricatorLocalDiskFileStorageEngine.php', 1875 1875 'PhabricatorLocalTimeTestCase' => 'view/__tests__/PhabricatorLocalTimeTestCase.php', 1876 + 'PhabricatorLogTriggerAction' => 'infrastructure/daemon/workers/action/PhabricatorLogTriggerAction.php', 1876 1877 'PhabricatorLogoutController' => 'applications/auth/controller/PhabricatorLogoutController.php', 1877 1878 'PhabricatorLunarPhasePolicyRule' => 'applications/policy/rule/PhabricatorLunarPhasePolicyRule.php', 1878 1879 'PhabricatorMacroApplication' => 'applications/macro/application/PhabricatorMacroApplication.php', ··· 1968 1969 'PhabricatorMetaMTASchemaSpec' => 'applications/metamta/storage/PhabricatorMetaMTASchemaSpec.php', 1969 1970 'PhabricatorMetaMTASendGridReceiveController' => 'applications/metamta/controller/PhabricatorMetaMTASendGridReceiveController.php', 1970 1971 'PhabricatorMetaMTAWorker' => 'applications/metamta/PhabricatorMetaMTAWorker.php', 1972 + 'PhabricatorMetronomicTriggerClock' => 'infrastructure/daemon/workers/clock/PhabricatorMetronomicTriggerClock.php', 1971 1973 'PhabricatorMultiColumnUIExample' => 'applications/uiexample/examples/PhabricatorMultiColumnUIExample.php', 1972 1974 'PhabricatorMultiFactorSettingsPanel' => 'applications/settings/panel/PhabricatorMultiFactorSettingsPanel.php', 1973 1975 'PhabricatorMustVerifyEmailController' => 'applications/auth/controller/PhabricatorMustVerifyEmailController.php', ··· 1977 1979 'PhabricatorNamedQuery' => 'applications/search/storage/PhabricatorNamedQuery.php', 1978 1980 'PhabricatorNamedQueryQuery' => 'applications/search/query/PhabricatorNamedQueryQuery.php', 1979 1981 'PhabricatorNavigationRemarkupRule' => 'infrastructure/markup/rule/PhabricatorNavigationRemarkupRule.php', 1982 + 'PhabricatorNeverTriggerClock' => 'infrastructure/daemon/workers/clock/PhabricatorNeverTriggerClock.php', 1980 1983 'PhabricatorNotificationAdHocFeedStory' => 'applications/notification/feed/PhabricatorNotificationAdHocFeedStory.php', 1981 1984 'PhabricatorNotificationBuilder' => 'applications/notification/builder/PhabricatorNotificationBuilder.php', 1982 1985 'PhabricatorNotificationClearController' => 'applications/notification/controller/PhabricatorNotificationClearController.php', ··· 2522 2525 'PhabricatorTransformedFile' => 'applications/files/storage/PhabricatorTransformedFile.php', 2523 2526 'PhabricatorTranslation' => 'infrastructure/internationalization/translation/PhabricatorTranslation.php', 2524 2527 'PhabricatorTranslationsConfigOptions' => 'applications/config/option/PhabricatorTranslationsConfigOptions.php', 2528 + 'PhabricatorTriggerAction' => 'infrastructure/daemon/workers/action/PhabricatorTriggerAction.php', 2525 2529 'PhabricatorTriggerClock' => 'infrastructure/daemon/workers/clock/PhabricatorTriggerClock.php', 2526 2530 'PhabricatorTriggerClockTestCase' => 'infrastructure/daemon/workers/clock/__tests__/PhabricatorTriggerClockTestCase.php', 2531 + 'PhabricatorTriggerDaemon' => 'infrastructure/daemon/workers/PhabricatorTriggerDaemon.php', 2527 2532 'PhabricatorTrivialTestCase' => 'infrastructure/testing/__tests__/PhabricatorTrivialTestCase.php', 2528 2533 'PhabricatorTwitchAuthProvider' => 'applications/auth/provider/PhabricatorTwitchAuthProvider.php', 2529 2534 'PhabricatorTwitterAuthProvider' => 'applications/auth/provider/PhabricatorTwitterAuthProvider.php', ··· 2591 2596 'PhabricatorWorkerTaskData' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTaskData.php', 2592 2597 'PhabricatorWorkerTaskDetailController' => 'applications/daemon/controller/PhabricatorWorkerTaskDetailController.php', 2593 2598 'PhabricatorWorkerTestCase' => 'infrastructure/daemon/workers/__tests__/PhabricatorWorkerTestCase.php', 2599 + 'PhabricatorWorkerTrigger' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTrigger.php', 2600 + 'PhabricatorWorkerTriggerEvent' => 'infrastructure/daemon/workers/storage/PhabricatorWorkerTriggerEvent.php', 2601 + 'PhabricatorWorkerTriggerPHIDType' => 'infrastructure/daemon/workers/phid/PhabricatorWorkerTriggerPHIDType.php', 2602 + 'PhabricatorWorkerTriggerQuery' => 'infrastructure/daemon/workers/query/PhabricatorWorkerTriggerQuery.php', 2594 2603 'PhabricatorWorkerYieldException' => 'infrastructure/daemon/workers/exception/PhabricatorWorkerYieldException.php', 2595 2604 'PhabricatorWorkingCopyDiscoveryTestCase' => 'applications/repository/engine/__tests__/PhabricatorWorkingCopyDiscoveryTestCase.php', 2596 2605 'PhabricatorWorkingCopyPullTestCase' => 'applications/repository/engine/__tests__/PhabricatorWorkingCopyPullTestCase.php', ··· 5083 5092 'PhabricatorListFilterUIExample' => 'PhabricatorUIExample', 5084 5093 'PhabricatorLocalDiskFileStorageEngine' => 'PhabricatorFileStorageEngine', 5085 5094 'PhabricatorLocalTimeTestCase' => 'PhabricatorTestCase', 5095 + 'PhabricatorLogTriggerAction' => 'PhabricatorTriggerAction', 5086 5096 'PhabricatorLogoutController' => 'PhabricatorAuthController', 5087 5097 'PhabricatorLunarPhasePolicyRule' => 'PhabricatorPolicyRule', 5088 5098 'PhabricatorMacroApplication' => 'PhabricatorApplication', ··· 5169 5179 'PhabricatorMetaMTASchemaSpec' => 'PhabricatorConfigSchemaSpec', 5170 5180 'PhabricatorMetaMTASendGridReceiveController' => 'PhabricatorMetaMTAController', 5171 5181 'PhabricatorMetaMTAWorker' => 'PhabricatorWorker', 5182 + 'PhabricatorMetronomicTriggerClock' => 'PhabricatorTriggerClock', 5172 5183 'PhabricatorMultiColumnUIExample' => 'PhabricatorUIExample', 5173 5184 'PhabricatorMultiFactorSettingsPanel' => 'PhabricatorSettingsPanel', 5174 5185 'PhabricatorMustVerifyEmailController' => 'PhabricatorAuthController', ··· 5181 5192 ), 5182 5193 'PhabricatorNamedQueryQuery' => 'PhabricatorCursorPagedPolicyAwareQuery', 5183 5194 'PhabricatorNavigationRemarkupRule' => 'PhutilRemarkupRule', 5195 + 'PhabricatorNeverTriggerClock' => 'PhabricatorTriggerClock', 5184 5196 'PhabricatorNotificationAdHocFeedStory' => 'PhabricatorFeedStory', 5185 5197 'PhabricatorNotificationClearController' => 'PhabricatorNotificationController', 5186 5198 'PhabricatorNotificationConfigOptions' => 'PhabricatorApplicationConfigOptions', ··· 5778 5790 'PhabricatorTransactionsApplication' => 'PhabricatorApplication', 5779 5791 'PhabricatorTransformedFile' => 'PhabricatorFileDAO', 5780 5792 'PhabricatorTranslationsConfigOptions' => 'PhabricatorApplicationConfigOptions', 5793 + 'PhabricatorTriggerAction' => 'Phobject', 5781 5794 'PhabricatorTriggerClock' => 'Phobject', 5782 5795 'PhabricatorTriggerClockTestCase' => 'PhabricatorTestCase', 5796 + 'PhabricatorTriggerDaemon' => 'PhabricatorDaemon', 5783 5797 'PhabricatorTrivialTestCase' => 'PhabricatorTestCase', 5784 5798 'PhabricatorTwitchAuthProvider' => 'PhabricatorOAuth2AuthProvider', 5785 5799 'PhabricatorTwitterAuthProvider' => 'PhabricatorOAuth1AuthProvider', ··· 5857 5871 'PhabricatorWorkerTaskData' => 'PhabricatorWorkerDAO', 5858 5872 'PhabricatorWorkerTaskDetailController' => 'PhabricatorDaemonController', 5859 5873 'PhabricatorWorkerTestCase' => 'PhabricatorTestCase', 5874 + 'PhabricatorWorkerTrigger' => 'PhabricatorWorkerDAO', 5875 + 'PhabricatorWorkerTriggerEvent' => 'PhabricatorWorkerDAO', 5876 + 'PhabricatorWorkerTriggerPHIDType' => 'PhabricatorPHIDType', 5877 + 'PhabricatorWorkerTriggerQuery' => 'PhabricatorOffsetPagedQuery', 5860 5878 'PhabricatorWorkerYieldException' => 'Exception', 5861 5879 'PhabricatorWorkingCopyDiscoveryTestCase' => 'PhabricatorWorkingCopyTestCase', 5862 5880 'PhabricatorWorkingCopyPullTestCase' => 'PhabricatorWorkingCopyTestCase',
+56
src/applications/daemon/controller/PhabricatorDaemonConsoleController.php
··· 189 189 ->setTasks($upcoming) 190 190 ->setNoDataString(pht('Task queue is empty.'))); 191 191 192 + $triggers = id(new PhabricatorWorkerTriggerQuery()) 193 + ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION) 194 + ->needEvents(true) 195 + ->setLimit(10) 196 + ->execute(); 197 + 198 + $triggers_table = $this->buildTriggersTable($triggers); 199 + 200 + $triggers_panel = id(new PHUIObjectBoxView()) 201 + ->setHeaderText(pht('Upcoming Triggers')) 202 + ->appendChild($triggers_table); 203 + 192 204 $crumbs = $this->buildApplicationCrumbs(); 193 205 $crumbs->addTextCrumb(pht('Console')); 194 206 ··· 202 214 $queued_panel, 203 215 $leased_panel, 204 216 $upcoming_panel, 217 + $triggers_panel, 205 218 )); 206 219 207 220 return $this->buildApplicationPage( ··· 210 223 'title' => pht('Console'), 211 224 'device' => false, 212 225 )); 226 + } 227 + 228 + private function buildTriggersTable(array $triggers) { 229 + $viewer = $this->getViewer(); 230 + 231 + $rows = array(); 232 + foreach ($triggers as $trigger) { 233 + $event = $trigger->getEvent(); 234 + if ($event) { 235 + $last_epoch = $event->getLastEventEpoch(); 236 + $next_epoch = $event->getNextEventEpoch(); 237 + } else { 238 + $last_epoch = null; 239 + $next_epoch = null; 240 + } 241 + 242 + $rows[] = array( 243 + $trigger->getID(), 244 + $trigger->getClockClass(), 245 + $trigger->getActionClass(), 246 + $last_epoch ? phabricator_datetime($last_epoch, $viewer) : null, 247 + $next_epoch ? phabricator_datetime($next_epoch, $viewer) : null, 248 + ); 249 + } 250 + 251 + return id(new AphrontTableView($rows)) 252 + ->setNoDataString(pht('There are no upcoming event triggers.')) 253 + ->setHeaders( 254 + array( 255 + 'ID', 256 + 'Clock', 257 + 'Action', 258 + 'Last', 259 + 'Next', 260 + )) 261 + ->setColumnClasses( 262 + array( 263 + '', 264 + '', 265 + 'wide', 266 + 'date', 267 + 'date', 268 + )); 213 269 } 214 270 215 271 }
+290
src/infrastructure/daemon/workers/PhabricatorTriggerDaemon.php
··· 1 + <?php 2 + 3 + /** 4 + * Schedule and execute event triggers, which run code at specific times. 5 + */ 6 + final class PhabricatorTriggerDaemon 7 + extends PhabricatorDaemon { 8 + 9 + const COUNTER_VERSION = 'trigger.version'; 10 + const COUNTER_CURSOR = 'trigger.cursor'; 11 + 12 + protected function run() { 13 + 14 + // The trigger daemon is a low-level infrastructure daemon which schedules 15 + // and executes chronological events. Examples include a subscription which 16 + // generates a bill on the 12th of every month, or a reminder email 15 17 + // minutes before a meeting. 18 + 19 + // Only one trigger daemon can run at a time, and very little work should 20 + // happen in the daemon process. In general, triggered events should 21 + // just schedule a task into the normal daemon worker queue and then 22 + // return. This allows the real work to take longer to execute without 23 + // disrupting other triggers. 24 + 25 + // The trigger mechanism guarantees that events will execute exactly once, 26 + // but does not guarantee that they will execute at precisely the specified 27 + // time. Under normal circumstances, they should execute within a minute or 28 + // so of the desired time, so this mechanism can be used for things like 29 + // meeting reminders. 30 + 31 + // If the trigger queue backs up (for example, because it is overwhelmed by 32 + // trigger updates, doesn't run for a while, or a trigger action is written 33 + // inefficiently) or the daemon queue backs up (usually for similar 34 + // reasons), events may execute an arbitrarily long time after they were 35 + // scheduled to execute. In some cases (like billing a subscription) this 36 + // may be desirable; in other cases (like sending a meeting reminder) the 37 + // action may want to check the current time and see if the event is still 38 + // relevant. 39 + 40 + // The trigger daemon works in two phases: 41 + // 42 + // 1. A scheduling phase processes recently updated triggers and 43 + // schedules them for future execution. For example, this phase would 44 + // see that a meeting trigger had been changed recently, determine 45 + // when the reminder for it should execute, and then schedule the 46 + // action to execute at that future date. 47 + // 2. An execution phase runs the actions for any scheduled events which 48 + // are due to execute. 49 + // 50 + // The major goal of this design is to deliver on the guarantee that events 51 + // will execute exactly once. It prevents race conditions in scheduling 52 + // and execution by ensuring there is only one writer for either of these 53 + // phases. Without this separation of responsibilities, web processes 54 + // trying to reschedule events after an update could race with other web 55 + // processes or the daemon. 56 + 57 + do { 58 + $lock = PhabricatorGlobalLock::newLock('trigger'); 59 + 60 + try { 61 + $lock->lock(5); 62 + } catch (PhutilLockException $ex) { 63 + throw new PhutilProxyException( 64 + pht( 65 + 'Another process is holding the trigger lock. Usually, this '. 66 + 'means another copy of the trigger daemon is running elsewhere. '. 67 + 'Multiple processes are not permitted to update triggers '. 68 + 'simultaneously.'), 69 + $ex); 70 + } 71 + 72 + // Run the scheduling phase. This finds updated triggers which we have 73 + // not scheduled yet and schedules them. 74 + $last_version = $this->loadCurrentCursor(); 75 + $head_version = $this->loadCurrentVersion(); 76 + 77 + // The cursor points at the next record to process, so we can only skip 78 + // this step if we're ahead of the version number. 79 + if ($last_version <= $head_version) { 80 + $this->scheduleTriggers($last_version); 81 + } 82 + 83 + // Run the execution phase. This finds events which are due to execute 84 + // and runs them. 85 + $this->executeTriggers(); 86 + 87 + $lock->unlock(); 88 + 89 + $this->sleep($this->getSleepDuration()); 90 + } while (!$this->shouldExit()); 91 + } 92 + 93 + 94 + /** 95 + * Process all of the triggers which have been updated since the last time 96 + * the daemon ran, scheduling them into the event table. 97 + * 98 + * @param int Cursor for the next version update to process. 99 + * @return void 100 + */ 101 + private function scheduleTriggers($cursor) { 102 + $limit = 100; 103 + 104 + $query = id(new PhabricatorWorkerTriggerQuery()) 105 + ->withVersionBetween($cursor, null) 106 + ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_VERSION) 107 + ->needEvents(true) 108 + ->setLimit($limit); 109 + while (true) { 110 + $triggers = $query->execute(); 111 + 112 + foreach ($triggers as $trigger) { 113 + $event = $trigger->getEvent(); 114 + if ($event) { 115 + $last_epoch = $event->getLastEventEpoch(); 116 + } else { 117 + $last_epoch = null; 118 + } 119 + 120 + $next_epoch = $trigger->getNextEventEpoch( 121 + $last_epoch, 122 + $is_reschedule = false); 123 + 124 + $new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger) 125 + ->setLastEventEpoch($last_epoch) 126 + ->setNextEventEpoch($next_epoch); 127 + 128 + $new_event->openTransaction(); 129 + if ($event) { 130 + $event->delete(); 131 + } 132 + 133 + // Always save the new event. Note that we save it even if the next 134 + // epoch is `null`, indicating that it will never fire, because we 135 + // would lose the last epoch information if we delete it. 136 + // 137 + // In particular, some events may want to execute exactly once. 138 + // Retaining the last epoch allows them to do this, even if the 139 + // trigger is updated. 140 + $new_event->save(); 141 + 142 + // Move the cursor forward to make sure we don't reprocess this 143 + // trigger until it is updated again. 144 + $this->updateCursor($trigger->getTriggerVersion() + 1); 145 + $new_event->saveTransaction(); 146 + } 147 + 148 + // If we saw fewer than a full page of updated triggers, we're caught 149 + // up, so we can move on to the execution phase. 150 + if (count($triggers) < $limit) { 151 + break; 152 + } 153 + 154 + // Otherwise, skip past the stuff we just processed and grab another 155 + // page of updated triggers. 156 + $min = last($triggers)->getTriggerVersion() + 1; 157 + $query->withVersionBetween($min, null); 158 + 159 + $this->stillWorking(); 160 + } 161 + } 162 + 163 + 164 + /** 165 + * Run scheduled event triggers which are due for execution. 166 + * 167 + * @return void 168 + */ 169 + private function executeTriggers() { 170 + 171 + // We run only a limited number of triggers before ending the execution 172 + // phase. If we ran until exhaustion, we could end up executing very 173 + // out-of-date triggers if there was a long backlog: trigger changes 174 + // during this phase are not reflected in the event table until we run 175 + // another scheduling phase. 176 + 177 + // If we exit this phase with triggers still ready to execute we'll 178 + // jump back into the scheduling phase immediately, so this just makes 179 + // sure we don't spend an unreasonably long amount of time without 180 + // processing trigger updates and doing rescheduling. 181 + 182 + $limit = 100; 183 + $now = PhabricatorTime::getNow(); 184 + 185 + $triggers = id(new PhabricatorWorkerTriggerQuery()) 186 + ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION) 187 + ->withNextEventBetween(null, $now) 188 + ->needEvents(true) 189 + ->setLimit($limit) 190 + ->execute(); 191 + foreach ($triggers as $trigger) { 192 + $event = $trigger->getEvent(); 193 + 194 + // Execute the trigger action. 195 + $trigger->executeTrigger( 196 + $event->getLastEventEpoch(), 197 + $event->getNextEventEpoch()); 198 + 199 + // Now that we've executed the trigger, the current trigger epoch is 200 + // going to become the last epoch. 201 + $last_epoch = $event->getNextEventEpoch(); 202 + 203 + // If this is a recurring trigger, give it an opportunity to reschedule. 204 + $reschedule_epoch = $trigger->getNextEventEpoch( 205 + $last_epoch, 206 + $is_reschedule = true); 207 + 208 + // Don't reschedule events unless the next occurrence is in the future. 209 + if (($reschedule_epoch !== null) && 210 + ($last_epoch !== null) && 211 + ($reschedule_epoch <= $last_epoch)) { 212 + throw new Exception( 213 + pht( 214 + 'Trigger is attempting to perform a routine reschedule where '. 215 + 'the next event (at %s) does not occur after the previous event '. 216 + '(at %s). Routine reschedules must strictly move event triggers '. 217 + 'forward through time to avoid executing a trigger an infinite '. 218 + 'number of times instantaneously.', 219 + $reschedule_epoch, 220 + $last_epoch)); 221 + } 222 + 223 + $new_event = PhabricatorWorkerTriggerEvent::initializeNewEvent($trigger) 224 + ->setLastEventEpoch($last_epoch) 225 + ->setNextEventEpoch($reschedule_epoch); 226 + 227 + $event->openTransaction(); 228 + // Remove the event we just processed. 229 + $event->delete(); 230 + 231 + // See note in the scheduling phase about this; we save the new event 232 + // even if the next epoch is `null`. 233 + $new_event->save(); 234 + $event->saveTransaction(); 235 + } 236 + } 237 + 238 + 239 + /** 240 + * Get the number of seconds to sleep for before starting the next scheduling 241 + * phase. 242 + * 243 + * If no events are scheduled soon, we'll sleep for 60 seconds. Otherwise, 244 + * we'll sleep until the next scheduled event. 245 + * 246 + * @return int Number of seconds to sleep for. 247 + */ 248 + private function getSleepDuration() { 249 + $sleep = 60; 250 + 251 + $next_triggers = id(new PhabricatorWorkerTriggerQuery()) 252 + ->setOrder(PhabricatorWorkerTriggerQuery::ORDER_EXECUTION) 253 + ->setLimit(1) 254 + ->needEvents(true) 255 + ->execute(); 256 + if ($next_triggers) { 257 + $next_trigger = head($next_triggers); 258 + $next_epoch = $next_trigger->getEvent()->getNextEventEpoch(); 259 + $until = max(0, $next_epoch - PhabricatorTime::getNow()); 260 + $sleep = min($sleep, $until); 261 + } 262 + 263 + return $sleep; 264 + } 265 + 266 + 267 + /* -( Counters )----------------------------------------------------------- */ 268 + 269 + 270 + private function loadCurrentCursor() { 271 + return $this->loadCurrentCounter(self::COUNTER_CURSOR); 272 + } 273 + 274 + private function loadCurrentVersion() { 275 + return $this->loadCurrentCounter(self::COUNTER_VERSION); 276 + } 277 + 278 + private function updateCursor($value) { 279 + LiskDAO::overwriteCounterValue( 280 + id(new PhabricatorWorkerTrigger())->establishConnection('w'), 281 + self::COUNTER_CURSOR, 282 + $value); 283 + } 284 + 285 + private function loadCurrentCounter($counter_name) { 286 + return (int)LiskDAO::loadCurrentCounterValue( 287 + id(new PhabricatorWorkerTrigger())->establishConnection('w'), 288 + $counter_name); 289 + } 290 + }
+30
src/infrastructure/daemon/workers/action/PhabricatorLogTriggerAction.php
··· 1 + <?php 2 + 3 + /** 4 + * Trivial action which logs a message. 5 + * 6 + * This action is primarily useful for testing triggers. 7 + */ 8 + final class PhabricatorLogTriggerAction 9 + extends PhabricatorTriggerAction { 10 + 11 + public function validateProperties(array $properties) { 12 + PhutilTypeSpec::checkMap( 13 + $properties, 14 + array( 15 + 'message' => 'string', 16 + )); 17 + } 18 + 19 + public function execute($last_epoch, $this_epoch) { 20 + $message = pht( 21 + '(%s -> %s @ %s) %s', 22 + $last_epoch ? date('Y-m-d g:i:s A', $last_epoch) : 'null', 23 + date('Y-m-d g:i:s A', $this_epoch), 24 + date('Y-m-d g:i:s A', PhabricatorTime::getNow()), 25 + $this->getProperty('message')); 26 + 27 + phlog($message); 28 + } 29 + 30 + }
+66
src/infrastructure/daemon/workers/action/PhabricatorTriggerAction.php
··· 1 + <?php 2 + 3 + /** 4 + * A trigger action reacts to a scheduled event. 5 + */ 6 + abstract class PhabricatorTriggerAction extends Phobject { 7 + 8 + private $properties; 9 + 10 + public function __construct(array $properties) { 11 + $this->validateProperties($properties); 12 + $this->properties = $properties; 13 + } 14 + 15 + public function getProperties() { 16 + return $this->properties; 17 + } 18 + 19 + public function getProperty($key, $default = null) { 20 + return idx($this->properties, $key, $default); 21 + } 22 + 23 + 24 + /** 25 + * Validate action configuration. 26 + * 27 + * @param map<string, wild> Map of action properties. 28 + * @return void 29 + */ 30 + abstract public function validateProperties(array $properties); 31 + 32 + 33 + /** 34 + * Execute this action. 35 + * 36 + * IMPORTANT: Trigger actions must execute quickly! 37 + * 38 + * In most cases, trigger actions should queue a worker task and then exit. 39 + * The actual trigger execution occurs in a locked section in the trigger 40 + * daemon and blocks all other triggers. By queueing a task instead of 41 + * performing processing directly, triggers can execute more involved actions 42 + * without blocking other triggers. 43 + * 44 + * An action may trigger a long time after it is scheduled. For example, 45 + * a meeting reminder may be scheduled at 9:45 AM, but the action may not 46 + * execute until later (for example, because the server was down for 47 + * maintenance). You can detect cases like this by comparing `$this_epoch` 48 + * (which holds the time the event was scheduled to execute at) to 49 + * `PhabricatorTime::getNow()` (which returns the current time). In the 50 + * case of a meeting reminder, you may want to ignore the action if it 51 + * executes too late to be useful (for example, after a meeting is over). 52 + * 53 + * Because actions should normally queue a task and there may be a second, 54 + * arbitrarily long delay between trigger execution and task execution, it 55 + * may be simplest to pass the trigger time to the task and then make the 56 + * decision to discard the action there. 57 + * 58 + * @param int|null Last time the event occurred, or null if it has never 59 + * triggered before. 60 + * @param int The scheduled time for the current action. This may be 61 + * significantly different from the current time. 62 + * @return void 63 + */ 64 + abstract public function execute($last_epoch, $this_epoch); 65 + 66 + }
+33
src/infrastructure/daemon/workers/clock/PhabricatorMetronomicTriggerClock.php
··· 1 + <?php 2 + 3 + /** 4 + * Triggers an event repeatedly, delaying a fixed number of seconds between 5 + * triggers. 6 + * 7 + * For example, this clock can trigger an event every 30 seconds. 8 + */ 9 + final class PhabricatorMetronomicTriggerClock 10 + extends PhabricatorTriggerClock { 11 + 12 + public function validateProperties(array $properties) { 13 + PhutilTypeSpec::checkMap( 14 + $properties, 15 + array( 16 + 'period' => 'int', 17 + )); 18 + } 19 + 20 + public function getNextEventEpoch($last_epoch, $is_reschedule) { 21 + $period = $this->getProperty('period'); 22 + 23 + if ($last_epoch) { 24 + $next = $last_epoch + $period; 25 + $next = max($next, $last_epoch + 1); 26 + } else { 27 + $next = PhabricatorTime::getNow() + $period; 28 + } 29 + 30 + return $next; 31 + } 32 + 33 + }
+21
src/infrastructure/daemon/workers/clock/PhabricatorNeverTriggerClock.php
··· 1 + <?php 2 + 3 + /** 4 + * Never triggers an event. 5 + * 6 + * This clock can be used for testing, or to cancel events. 7 + */ 8 + final class PhabricatorNeverTriggerClock 9 + extends PhabricatorTriggerClock { 10 + 11 + public function validateProperties(array $properties) { 12 + PhutilTypeSpec::checkMap( 13 + $properties, 14 + array()); 15 + } 16 + 17 + public function getNextEventEpoch($last_epoch, $is_reschedule) { 18 + return null; 19 + } 20 + 21 + }
+9
src/infrastructure/daemon/workers/clock/__tests__/PhabricatorTriggerClockTestCase.php
··· 21 21 pht('Should trigger only once.')); 22 22 } 23 23 24 + public function testNeverTriggerClock() { 25 + $clock = new PhabricatorNeverTriggerClock(array()); 26 + 27 + $this->assertEqual( 28 + null, 29 + $clock->getNextEventEpoch(null, false), 30 + pht('Should never trigger.')); 31 + } 32 + 24 33 public function testSubscriptionTriggerClock() { 25 34 $start = strtotime('2014-01-31 2:34:56 UTC'); 26 35
+41
src/infrastructure/daemon/workers/phid/PhabricatorWorkerTriggerPHIDType.php
··· 1 + <?php 2 + 3 + final class PhabricatorWorkerTriggerPHIDType extends PhabricatorPHIDType { 4 + 5 + const TYPECONST = 'TRIG'; 6 + 7 + public function getTypeName() { 8 + return pht('Trigger'); 9 + } 10 + 11 + public function newObject() { 12 + return new PhabricatorWorkerTriggerPHIDType(); 13 + } 14 + 15 + protected function buildQueryForObjects( 16 + PhabricatorObjectQuery $query, 17 + array $phids) { 18 + 19 + // TODO: Maybe straighten this out eventually, but these aren't policy 20 + // objects and don't have an applicable query which we can return here. 21 + // Since we should never call this normally, just leave it stubbed for 22 + // now. 23 + 24 + throw new PhutilMethodNotImplementedException(); 25 + } 26 + 27 + public function loadHandles( 28 + PhabricatorHandleQuery $query, 29 + array $handles, 30 + array $objects) { 31 + 32 + foreach ($handles as $phid => $handle) { 33 + $trigger = $objects[$phid]; 34 + 35 + $id = $trigger->getID(); 36 + 37 + $handle->setName(pht('Trigger %d', $id)); 38 + } 39 + } 40 + 41 + }
+178
src/infrastructure/daemon/workers/query/PhabricatorWorkerTriggerQuery.php
··· 1 + <?php 2 + 3 + final class PhabricatorWorkerTriggerQuery 4 + extends PhabricatorOffsetPagedQuery { 5 + 6 + const ORDER_EXECUTION = 'execution'; 7 + const ORDER_VERSION = 'version'; 8 + 9 + private $versionMin; 10 + private $versionMax; 11 + private $nextEpochMin; 12 + private $nextEpochMax; 13 + 14 + private $needEvents; 15 + private $order = self::ORDER_EXECUTION; 16 + 17 + public function withVersionBetween($min, $max) { 18 + $this->versionMin = $min; 19 + $this->versionMax = $max; 20 + return $this; 21 + } 22 + 23 + public function withNextEventBetween($min, $max) { 24 + $this->nextEpochMin = $min; 25 + $this->nextEpochMax = $max; 26 + return $this; 27 + } 28 + 29 + public function needEvents($need_events) { 30 + $this->needEvents = $need_events; 31 + return $this; 32 + } 33 + 34 + public function setOrder($order) { 35 + $this->order = $order; 36 + return $this; 37 + } 38 + 39 + public function execute() { 40 + $task_table = new PhabricatorWorkerTrigger(); 41 + 42 + $conn_r = $task_table->establishConnection('r'); 43 + 44 + $rows = queryfx_all( 45 + $conn_r, 46 + 'SELECT t.* FROM %T t %Q %Q %Q %Q', 47 + $task_table->getTableName(), 48 + $this->buildJoinClause($conn_r), 49 + $this->buildWhereClause($conn_r), 50 + $this->buildOrderClause($conn_r), 51 + $this->buildLimitClause($conn_r)); 52 + 53 + $triggers = $task_table->loadAllFromArray($rows); 54 + 55 + if ($triggers) { 56 + if ($this->needEvents) { 57 + $ids = mpull($triggers, 'getID'); 58 + 59 + $events = id(new PhabricatorWorkerTriggerEvent())->loadAllWhere( 60 + 'triggerID IN (%Ld)', 61 + $ids); 62 + $events = mpull($events, null, 'getTriggerID'); 63 + 64 + foreach ($triggers as $key => $trigger) { 65 + $event = idx($events, $trigger->getID()); 66 + $trigger->attachEvent($event); 67 + } 68 + } 69 + 70 + foreach ($triggers as $key => $trigger) { 71 + $clock_class = $trigger->getClockClass(); 72 + if (!is_subclass_of($clock_class, 'PhabricatorTriggerClock')) { 73 + unset($triggers[$key]); 74 + continue; 75 + } 76 + 77 + try { 78 + $argv = array($trigger->getClockProperties()); 79 + $clock = newv($clock_class, $argv); 80 + } catch (Exception $ex) { 81 + unset($triggers[$key]); 82 + continue; 83 + } 84 + 85 + $trigger->attachClock($clock); 86 + } 87 + 88 + 89 + foreach ($triggers as $key => $trigger) { 90 + $action_class = $trigger->getActionClass(); 91 + if (!is_subclass_of($action_class, 'PhabricatorTriggerAction')) { 92 + unset($triggers[$key]); 93 + continue; 94 + } 95 + 96 + try { 97 + $argv = array($trigger->getActionProperties()); 98 + $action = newv($action_class, $argv); 99 + } catch (Exception $ex) { 100 + unset($triggers[$key]); 101 + continue; 102 + } 103 + 104 + $trigger->attachAction($action); 105 + } 106 + } 107 + 108 + return $triggers; 109 + } 110 + 111 + private function buildJoinClause(AphrontDatabaseConnection $conn_r) { 112 + $joins = array(); 113 + 114 + if (($this->nextEpochMin !== null) || 115 + ($this->nextEpochMax !== null) || 116 + ($this->order == PhabricatorWorkerTriggerQuery::ORDER_EXECUTION)) { 117 + $joins[] = qsprintf( 118 + $conn_r, 119 + 'JOIN %T e ON e.triggerID = t.id', 120 + id(new PhabricatorWorkerTriggerEvent())->getTableName()); 121 + } 122 + 123 + return implode(' ', $joins); 124 + } 125 + 126 + private function buildWhereClause(AphrontDatabaseConnection $conn_r) { 127 + $where = array(); 128 + 129 + if ($this->versionMin !== null) { 130 + $where[] = qsprintf( 131 + $conn_r, 132 + 't.triggerVersion >= %d', 133 + $this->versionMin); 134 + } 135 + 136 + if ($this->versionMax !== null) { 137 + $where[] = qsprintf( 138 + $conn_r, 139 + 't.triggerVersion <= %d', 140 + $this->versionMax); 141 + } 142 + 143 + if ($this->nextEpochMin !== null) { 144 + $where[] = qsprintf( 145 + $conn_r, 146 + 'e.nextEventEpoch >= %d', 147 + $this->nextEpochMin); 148 + } 149 + 150 + if ($this->nextEpochMax !== null) { 151 + $where[] = qsprintf( 152 + $conn_r, 153 + 'e.nextEventEpoch <= %d', 154 + $this->nextEpochMax); 155 + } 156 + 157 + return $this->formatWhereClause($where); 158 + } 159 + 160 + private function buildOrderClause(AphrontDatabaseConnection $conn_r) { 161 + switch ($this->order) { 162 + case self::ORDER_EXECUTION: 163 + return qsprintf( 164 + $conn_r, 165 + 'ORDER BY e.nextEventEpoch ASC, e.id ASC'); 166 + case self::ORDER_VERSION: 167 + return qsprintf( 168 + $conn_r, 169 + 'ORDER BY t.triggerVersion ASC'); 170 + default: 171 + throw new Exception( 172 + pht( 173 + 'Unsupported order "%s".', 174 + $this->order)); 175 + } 176 + } 177 + 178 + }
+130
src/infrastructure/daemon/workers/storage/PhabricatorWorkerTrigger.php
··· 1 + <?php 2 + 3 + final class PhabricatorWorkerTrigger 4 + extends PhabricatorWorkerDAO { 5 + 6 + protected $triggerVersion; 7 + protected $clockClass; 8 + protected $clockProperties; 9 + protected $actionClass; 10 + protected $actionProperties; 11 + 12 + private $action = self::ATTACHABLE; 13 + private $clock = self::ATTACHABLE; 14 + private $event = self::ATTACHABLE; 15 + 16 + protected function getConfiguration() { 17 + return array( 18 + self::CONFIG_TIMESTAMPS => false, 19 + self::CONFIG_AUX_PHID => true, 20 + self::CONFIG_SERIALIZATION => array( 21 + 'clockProperties' => self::SERIALIZATION_JSON, 22 + 'actionProperties' => self::SERIALIZATION_JSON, 23 + ), 24 + self::CONFIG_COLUMN_SCHEMA => array( 25 + 'triggerVersion' => 'uint32', 26 + 'clockClass' => 'text64', 27 + 'actionClass' => 'text64', 28 + ), 29 + self::CONFIG_KEY_SCHEMA => array( 30 + 'key_trigger' => array( 31 + 'columns' => array('triggerVersion'), 32 + 'unique' => true, 33 + ), 34 + ), 35 + ) + parent::getConfiguration(); 36 + } 37 + 38 + public function save() { 39 + $conn_w = $this->establishConnection('w'); 40 + 41 + $this->openTransaction(); 42 + $next_version = LiskDAO::loadNextCounterValue( 43 + $conn_w, 44 + PhabricatorTriggerDaemon::COUNTER_VERSION); 45 + $this->setTriggerVersion($next_version); 46 + 47 + $result = parent::save(); 48 + $this->saveTransaction(); 49 + 50 + return $this; 51 + } 52 + 53 + public function generatePHID() { 54 + return PhabricatorPHID::generateNewPHID( 55 + PhabricatorWorkerTriggerPHIDType::TYPECONST); 56 + } 57 + 58 + /** 59 + * Return the next time this trigger should execute. 60 + * 61 + * This method can be called either after the daemon executed the trigger 62 + * successfully (giving the trigger an opportunity to reschedule itself 63 + * into the future, if it is a recurring event) or after the trigger itself 64 + * is changed (usually because of an application edit). The `$is_reschedule` 65 + * parameter distinguishes between these cases. 66 + * 67 + * @param int|null Epoch of the most recent successful event execution. 68 + * @param bool `true` if we're trying to reschedule the event after 69 + * execution; `false` if this is in response to a trigger update. 70 + * @return int|null Return an epoch to schedule the next event execution, 71 + * or `null` to stop the event from executing again. 72 + */ 73 + public function getNextEventEpoch($last_epoch, $is_reschedule) { 74 + return $this->getClock()->getNextEventEpoch($last_epoch, $is_reschedule); 75 + } 76 + 77 + 78 + /** 79 + * Execute the event. 80 + * 81 + * @param int|null Epoch of previous execution, or null if this is the first 82 + * execution. 83 + * @param int Scheduled epoch of this execution. This may not be the same 84 + * as the current time. 85 + * @return void 86 + */ 87 + public function executeTrigger($last_event, $this_event) { 88 + return $this->getAction()->execute($last_event, $this_event); 89 + } 90 + 91 + public function getEvent() { 92 + return $this->assertAttached($this->event); 93 + } 94 + 95 + public function attachEvent(PhabricatorWorkerTriggerEvent $event = null) { 96 + $this->event = $event; 97 + return $this; 98 + } 99 + 100 + public function setAction(PhabricatorTriggerAction $action) { 101 + $this->actionClass = get_class($action); 102 + $this->actionProperties = $action->getProperties(); 103 + return $this->attachAction($action); 104 + } 105 + 106 + public function getAction() { 107 + return $this->assertAttached($this->action); 108 + } 109 + 110 + public function attachAction(PhabricatorTriggerAction $action) { 111 + $this->action = $action; 112 + return $this; 113 + } 114 + 115 + public function setClock(PhabricatorTriggerClock $clock) { 116 + $this->clockClass = get_class($clock); 117 + $this->clockProperties = $clock->getProperties(); 118 + return $this->attachClock($clock); 119 + } 120 + 121 + public function getClock() { 122 + return $this->assertAttached($this->clock); 123 + } 124 + 125 + public function attachClock(PhabricatorTriggerClock $clock) { 126 + $this->clock = $clock; 127 + return $this; 128 + } 129 + 130 + }
+35
src/infrastructure/daemon/workers/storage/PhabricatorWorkerTriggerEvent.php
··· 1 + <?php 2 + 3 + final class PhabricatorWorkerTriggerEvent 4 + extends PhabricatorWorkerDAO { 5 + 6 + protected $triggerID; 7 + protected $lastEventEpoch; 8 + protected $nextEventEpoch; 9 + 10 + protected function getConfiguration() { 11 + return array( 12 + self::CONFIG_TIMESTAMPS => false, 13 + self::CONFIG_COLUMN_SCHEMA => array( 14 + 'lastEventEpoch' => 'epoch?', 15 + 'nextEventEpoch' => 'epoch?', 16 + ), 17 + self::CONFIG_KEY_SCHEMA => array( 18 + 'key_trigger' => array( 19 + 'columns' => array('triggerID'), 20 + 'unique' => true, 21 + ), 22 + 'key_next' => array( 23 + 'columns' => array('nextEventEpoch'), 24 + ), 25 + ), 26 + ) + parent::getConfiguration(); 27 + } 28 + 29 + public static function initializeNewEvent(PhabricatorWorkerTrigger $trigger) { 30 + $event = new PhabricatorWorkerTriggerEvent(); 31 + $event->setTriggerID($trigger->getID()); 32 + return $event; 33 + } 34 + 35 + }