Suite of AT Protocol TypeScript libraries built on web standards
21
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix memoryrunner cursor increments

+27 -65
+27 -65
sync/runner/memory-runner.ts
··· 6 6 setCursor?: (cursor: number) => Promise<void>; 7 7 concurrency?: number; 8 8 startCursor?: number; 9 - setCursorInterval?: number; // milliseconds between cursor saves, 0 for immediate saves 9 + setCursorInterval?: number; // milliseconds between persisted cursor saves (throttling) 10 10 }; 11 11 12 12 // A queue with arbitrarily many partitions, each processing work sequentially. 13 13 // Partitions are created lazily and taken out of memory when they go idle. 14 14 export class MemoryRunner implements EventRunner { 15 - consecutive: ConsecutiveList<number> = new ConsecutiveList<number>(); 15 + consecutive = new ConsecutiveList<number>(); 16 16 mainQueue: PQueue; 17 - partitions: Map<string, PQueue> = new Map<string, PQueue>(); 17 + partitions = new Map<string, PQueue>(); 18 18 cursor: number | undefined; 19 - private lastSavedCursor: number | undefined; 20 - private saveCursorTimer: number | undefined; 21 - private readonly useInterval: boolean; 22 - private readonly intervalMs: number; 23 - private readonly setCursor: ((cursor: number) => Promise<void>) | undefined; 24 - private pendingSaveCursor: number | undefined; 19 + private lastCursorSave = 0; 20 + private savingCursor = false; 25 21 26 22 constructor(public opts: MemoryRunnerOptions = {}) { 27 23 this.mainQueue = new PQueue({ concurrency: opts.concurrency ?? Infinity }); 28 24 this.cursor = opts.startCursor; 29 - this.setCursor = opts.setCursor; 30 - this.intervalMs = opts.setCursorInterval ?? 0; 31 - this.useInterval = this.intervalMs > 0; 32 25 } 33 26 34 - getCursor(): number | undefined { 27 + getCursor() { 35 28 return this.cursor; 36 29 } 37 30 38 - addTask(partitionId: string, task: () => Promise<void>): Promise<void> { 39 - if (this.mainQueue.isPaused) return Promise.resolve(); 31 + addTask(partitionId: string, task: () => Promise<void>) { 32 + if (this.mainQueue.isPaused) return; 40 33 return this.mainQueue.add(() => { 41 34 return this.getPartition(partitionId).add(task); 42 35 }); ··· 60 53 const latest = item.complete().at(-1); 61 54 if (latest !== undefined) { 62 55 this.cursor = latest; 63 - if (this.setCursor) { 64 - if (this.useInterval) { 65 - this.scheduleIntervalSave(); 56 + const { setCursor, setCursorInterval } = this.opts; 57 + if (setCursor) { 58 + if (!setCursorInterval) { 59 + await setCursor(this.cursor); 60 + this.lastCursorSave = Date.now(); 66 61 } else { 67 - this.setCursor(this.cursor).catch(console.error); 68 - this.lastSavedCursor = this.cursor; 62 + const now = Date.now(); 63 + if ( 64 + now - this.lastCursorSave >= setCursorInterval && 65 + !this.savingCursor 66 + ) { 67 + // Set timestamp & flag before awaiting to avoid multiple saves racing in same interval 68 + this.lastCursorSave = now; 69 + this.savingCursor = true; 70 + try { 71 + await setCursor(this.cursor); 72 + } finally { 73 + this.savingCursor = false; 74 + } 75 + } 69 76 } 70 77 } 71 78 } 72 79 }); 73 80 } 74 81 75 - private scheduleIntervalSave(): void { 76 - // Fast path: if cursor hasn't changed or timer already scheduled for this cursor 77 - if ( 78 - this.cursor === this.lastSavedCursor || 79 - this.cursor === this.pendingSaveCursor 80 - ) return; 81 - 82 - this.pendingSaveCursor = this.cursor; 83 - 84 - if (this.saveCursorTimer) { 85 - clearTimeout(this.saveCursorTimer); 86 - } 87 - 88 - this.saveCursorTimer = setTimeout(() => { 89 - const cursorToSave = this.pendingSaveCursor!; 90 - this.setCursor!(cursorToSave) 91 - .then(() => { 92 - this.lastSavedCursor = cursorToSave; 93 - }) 94 - .catch(console.error); 95 - this.saveCursorTimer = undefined; 96 - this.pendingSaveCursor = undefined; 97 - }, this.intervalMs); 98 - } 99 - 100 82 async processAll() { 101 83 await this.mainQueue.onIdle(); 102 84 } ··· 105 87 this.mainQueue.pause(); 106 88 this.mainQueue.clear(); 107 89 this.partitions.forEach((p) => p.clear()); 108 - 109 - // Clear any pending cursor save timer and perform final save 110 - if (this.saveCursorTimer) { 111 - clearTimeout(this.saveCursorTimer); 112 - this.saveCursorTimer = undefined; 113 - } 114 - 115 - // Perform final cursor save if needed 116 - if ( 117 - this.setCursor && this.cursor !== undefined && 118 - this.cursor !== this.lastSavedCursor 119 - ) { 120 - try { 121 - await this.setCursor(this.cursor); 122 - this.lastSavedCursor = this.cursor; 123 - } catch (error) { 124 - console.error("Failed to save cursor during destroy:", error); 125 - } 126 - } 127 - 128 90 await this.mainQueue.onIdle(); 129 91 } 130 92 }