Suite of AT Protocol TypeScript libraries built on web standards
21
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 154 lines 4.9 kB view raw
1import PQueue from "p-queue"; 2import { ConsecutiveList } from "./consecutive-list.ts"; 3import type { EventRunner } from "./types.ts"; 4import { SyncTelemetry, type SyncTelemetryOptions } from "../telemetry.ts"; 5 6/** 7 * Options for {@link MemoryRunner} 8 * @param setCursor Method to save the current cursor 9 * @param concurrency Maximum amount of concurrent events being processed 10 * @param startCursor Starting Cursor for filling in downtime 11 * @param setCursorInterval Interval on which to run setCursor 12 */ 13export type MemoryRunnerOptions = { 14 setCursor?: (cursor: number) => Promise<void>; 15 concurrency?: number; 16 startCursor?: number; 17 setCursorInterval?: number; // milliseconds between persisted cursor saves (throttling) 18 telemetry?: SyncTelemetryOptions; 19}; 20 21/** A queue with arbitrarily many partitions, each processing work sequentially. 22 * Partitions are created lazily and taken out of memory when they go idle. 23 */ 24export class MemoryRunner implements EventRunner { 25 consecutive: ConsecutiveList<number> = new ConsecutiveList<number>(); 26 mainQueue: PQueue; 27 partitions: Map<string, PQueue> = new Map<string, PQueue>(); 28 cursor: number | undefined; 29 private lastCursorSave = 0; 30 private savingCursor = false; 31 private telemetry: SyncTelemetry; 32 33 constructor(public opts: MemoryRunnerOptions = {}) { 34 this.mainQueue = new PQueue({ concurrency: opts.concurrency ?? Infinity }); 35 this.cursor = opts.startCursor; 36 this.telemetry = new SyncTelemetry(opts.telemetry); 37 } 38 39 getTelemetry(): SyncTelemetry { 40 return this.telemetry; 41 } 42 43 setTelemetry(telemetry: SyncTelemetry): void { 44 this.telemetry = telemetry; 45 } 46 47 getCursor(): number | undefined { 48 return this.cursor; 49 } 50 51 async addTask( 52 partitionId: string, 53 task: () => Promise<void>, 54 ): Promise<void> { 55 if (this.mainQueue.isPaused) return; 56 return await this.mainQueue.add(() => { 57 return this.getPartition(partitionId).add(task); 58 }); 59 } 60 61 private getPartition(partitionId: string) { 62 let partition = this.partitions.get(partitionId); 63 if (!partition) { 64 partition = new PQueue({ concurrency: 1 }); 65 partition.once("idle", () => this.partitions.delete(partitionId)); 66 this.partitions.set(partitionId, partition); 67 } 68 return partition; 69 } 70 71 async trackEvent(did: string, seq: number, handler: () => Promise<void>) { 72 if (this.mainQueue.isPaused) return; 73 const item = this.consecutive.push(seq); 74 await this.addTask(did, async () => { 75 const taskStart = performance.now(); 76 try { 77 await handler(); 78 this.telemetry.recordRunnerTaskDuration( 79 performance.now() - taskStart, 80 "ok", 81 ); 82 } catch (err) { 83 this.telemetry.recordRunnerTaskDuration( 84 performance.now() - taskStart, 85 "error", 86 ); 87 this.telemetry.recordError("runner"); 88 throw err; 89 } 90 const latest = item.complete().at(-1); 91 if (latest !== undefined) { 92 this.cursor = latest; 93 const { setCursor, setCursorInterval } = this.opts; 94 if (setCursor) { 95 if (!setCursorInterval) { 96 const saveStart = performance.now(); 97 try { 98 await setCursor(this.cursor!); 99 this.telemetry.recordCursorSaveDuration( 100 performance.now() - saveStart, 101 "ok", 102 ); 103 } catch (err) { 104 this.telemetry.recordCursorSaveDuration( 105 performance.now() - saveStart, 106 "error", 107 ); 108 this.telemetry.recordError("runner"); 109 throw err; 110 } 111 this.lastCursorSave = Date.now(); 112 } else { 113 const now = Date.now(); 114 if ( 115 now - this.lastCursorSave >= setCursorInterval && 116 !this.savingCursor 117 ) { 118 this.lastCursorSave = now; 119 this.savingCursor = true; 120 const saveStart = performance.now(); 121 try { 122 await setCursor(this.cursor!); 123 this.telemetry.recordCursorSaveDuration( 124 performance.now() - saveStart, 125 "ok", 126 ); 127 } catch (err) { 128 this.telemetry.recordCursorSaveDuration( 129 performance.now() - saveStart, 130 "error", 131 ); 132 this.telemetry.recordError("runner"); 133 throw err; 134 } finally { 135 this.savingCursor = false; 136 } 137 } 138 } 139 } 140 } 141 }); 142 } 143 144 async processAll() { 145 await this.mainQueue.onIdle(); 146 } 147 148 async destroy() { 149 this.mainQueue.pause(); 150 this.mainQueue.clear(); 151 this.partitions.forEach((p) => p.clear()); 152 await this.mainQueue.onIdle(); 153 } 154}