Suite of AT Protocol TypeScript libraries built on web standards
1import PQueue from "p-queue";
2import { ConsecutiveList } from "./consecutive-list.ts";
3import type { EventRunner } from "./types.ts";
4import { SyncTelemetry, type SyncTelemetryOptions } from "../telemetry.ts";
5
6/**
7 * Options for {@link MemoryRunner}
8 * @param setCursor Method to save the current cursor
9 * @param concurrency Maximum amount of concurrent events being processed
10 * @param startCursor Starting Cursor for filling in downtime
11 * @param setCursorInterval Interval on which to run setCursor
12 */
13export type MemoryRunnerOptions = {
14 setCursor?: (cursor: number) => Promise<void>;
15 concurrency?: number;
16 startCursor?: number;
17 setCursorInterval?: number; // milliseconds between persisted cursor saves (throttling)
18 telemetry?: SyncTelemetryOptions;
19};
20
21/** A queue with arbitrarily many partitions, each processing work sequentially.
22 * Partitions are created lazily and taken out of memory when they go idle.
23 */
24export class MemoryRunner implements EventRunner {
25 consecutive: ConsecutiveList<number> = new ConsecutiveList<number>();
26 mainQueue: PQueue;
27 partitions: Map<string, PQueue> = new Map<string, PQueue>();
28 cursor: number | undefined;
29 private lastCursorSave = 0;
30 private savingCursor = false;
31 private telemetry: SyncTelemetry;
32
33 constructor(public opts: MemoryRunnerOptions = {}) {
34 this.mainQueue = new PQueue({ concurrency: opts.concurrency ?? Infinity });
35 this.cursor = opts.startCursor;
36 this.telemetry = new SyncTelemetry(opts.telemetry);
37 }
38
39 getTelemetry(): SyncTelemetry {
40 return this.telemetry;
41 }
42
43 setTelemetry(telemetry: SyncTelemetry): void {
44 this.telemetry = telemetry;
45 }
46
47 getCursor(): number | undefined {
48 return this.cursor;
49 }
50
51 async addTask(
52 partitionId: string,
53 task: () => Promise<void>,
54 ): Promise<void> {
55 if (this.mainQueue.isPaused) return;
56 return await this.mainQueue.add(() => {
57 return this.getPartition(partitionId).add(task);
58 });
59 }
60
61 private getPartition(partitionId: string) {
62 let partition = this.partitions.get(partitionId);
63 if (!partition) {
64 partition = new PQueue({ concurrency: 1 });
65 partition.once("idle", () => this.partitions.delete(partitionId));
66 this.partitions.set(partitionId, partition);
67 }
68 return partition;
69 }
70
71 async trackEvent(did: string, seq: number, handler: () => Promise<void>) {
72 if (this.mainQueue.isPaused) return;
73 const item = this.consecutive.push(seq);
74 await this.addTask(did, async () => {
75 const taskStart = performance.now();
76 try {
77 await handler();
78 this.telemetry.recordRunnerTaskDuration(
79 performance.now() - taskStart,
80 "ok",
81 );
82 } catch (err) {
83 this.telemetry.recordRunnerTaskDuration(
84 performance.now() - taskStart,
85 "error",
86 );
87 this.telemetry.recordError("runner");
88 throw err;
89 }
90 const latest = item.complete().at(-1);
91 if (latest !== undefined) {
92 this.cursor = latest;
93 const { setCursor, setCursorInterval } = this.opts;
94 if (setCursor) {
95 if (!setCursorInterval) {
96 const saveStart = performance.now();
97 try {
98 await setCursor(this.cursor!);
99 this.telemetry.recordCursorSaveDuration(
100 performance.now() - saveStart,
101 "ok",
102 );
103 } catch (err) {
104 this.telemetry.recordCursorSaveDuration(
105 performance.now() - saveStart,
106 "error",
107 );
108 this.telemetry.recordError("runner");
109 throw err;
110 }
111 this.lastCursorSave = Date.now();
112 } else {
113 const now = Date.now();
114 if (
115 now - this.lastCursorSave >= setCursorInterval &&
116 !this.savingCursor
117 ) {
118 this.lastCursorSave = now;
119 this.savingCursor = true;
120 const saveStart = performance.now();
121 try {
122 await setCursor(this.cursor!);
123 this.telemetry.recordCursorSaveDuration(
124 performance.now() - saveStart,
125 "ok",
126 );
127 } catch (err) {
128 this.telemetry.recordCursorSaveDuration(
129 performance.now() - saveStart,
130 "error",
131 );
132 this.telemetry.recordError("runner");
133 throw err;
134 } finally {
135 this.savingCursor = false;
136 }
137 }
138 }
139 }
140 }
141 });
142 }
143
144 async processAll() {
145 await this.mainQueue.onIdle();
146 }
147
148 async destroy() {
149 this.mainQueue.pause();
150 this.mainQueue.clear();
151 this.partitions.forEach((p) => p.clear());
152 await this.mainQueue.onIdle();
153 }
154}