Suite of AT Protocol TypeScript libraries built on web standards
21
fork

Configure Feed

Select the types of activity you want to include in your feed.

setcursorinterval and support for wildcard collections

+96 -16
+34 -14
sync/firehose/index.ts
··· 68 68 private sub: Subscription<RepoEvent>; 69 69 private abortController: AbortController; 70 70 private destoryDefer: Deferrable; 71 + private matchCollection: ((col: string) => boolean) | null = null; 71 72 72 73 constructor(public opts: FirehoseOptions) { 73 74 this.destoryDefer = createDeferrable(); ··· 75 76 if (this.opts.getCursor && this.opts.runner) { 76 77 throw new Error("Must set only `getCursor` or `runner`"); 77 78 } 79 + if (opts.filterCollections) { 80 + const exact = new Set<string>(); 81 + const prefixes: string[] = []; 82 + 83 + for (const pattern of opts.filterCollections) { 84 + if (pattern.endsWith(".*")) { 85 + prefixes.push(pattern.slice(0, -2)); 86 + } else { 87 + exact.add(pattern); 88 + } 89 + } 90 + this.matchCollection = (col: string): boolean => { 91 + if (exact.has(col)) return true; 92 + for (const prefix of prefixes) { 93 + if (col.startsWith(prefix)) return true; 94 + } 95 + return false; 96 + }; 97 + } 78 98 this.sub = new Subscription({ 79 99 ...opts, 80 100 service: opts.service ?? "wss://bsky.network", 81 101 method: "com.atproto.sync.subscribeRepos", 82 102 signal: this.abortController.signal, 83 - getParams: async () => { 103 + getParams: () => { 84 104 const getCursorFn = () => 85 105 this.opts.runner?.getCursor() ?? this.opts.getCursor; 86 106 if (!getCursorFn) { 87 107 return undefined; 88 108 } 89 - const cursor = await getCursorFn(); 109 + const cursor = getCursorFn(); 90 110 return { cursor }; 91 111 }, 92 112 validate: (value: unknown) => { ··· 111 131 const parsed = await this.parseEvt(evt); 112 132 for (const write of parsed) { 113 133 try { 114 - await this.opts.handleEvent(write); 134 + this.opts.handleEvent(write); 115 135 } catch (err) { 116 136 this.opts.onError(new FirehoseHandlerError(err, write)); 117 137 } ··· 139 159 try { 140 160 if (isCommit(evt) && !this.opts.excludeCommit) { 141 161 return this.opts.unauthenticatedCommits 142 - ? await parseCommitUnauthenticated(evt, this.opts.filterCollections) 162 + ? await parseCommitUnauthenticated(evt, this.matchCollection) 143 163 : await parseCommitAuthenticated( 144 164 this.opts.idResolver, 145 165 evt, 146 - this.opts.filterCollections, 166 + this.matchCollection, 147 167 ); 148 168 } else if (isAccount(evt) && !this.opts.excludeAccount) { 149 169 const parsed = parseAccount(evt); ··· 171 191 const parsed = await this.parseEvt(evt); 172 192 for (const write of parsed) { 173 193 try { 174 - await this.opts.handleEvent(write); 194 + this.opts.handleEvent(write); 175 195 } catch (err) { 176 196 this.opts.onError(new FirehoseHandlerError(err, write)); 177 197 } ··· 187 207 export const parseCommitAuthenticated = async ( 188 208 idResolver: IdResolver, 189 209 evt: Commit, 190 - filterCollections?: string[], 210 + matchCollection?: ((col: string) => boolean) | null, 191 211 forceKeyRefresh = false, 192 212 ): Promise<CommitEvt[]> => { 193 213 const did = evt.repo; 194 - const ops = maybeFilterOps(evt.ops, filterCollections); 214 + const ops = maybeFilterOps(evt.ops, matchCollection); 195 215 if (ops.length === 0) { 196 216 return []; 197 217 } ··· 213 233 }); 214 234 } catch (err) { 215 235 if (err instanceof RepoVerificationError && !forceKeyRefresh) { 216 - return parseCommitAuthenticated(idResolver, evt, filterCollections, true); 236 + return parseCommitAuthenticated(idResolver, evt, matchCollection, true); 217 237 } 218 238 throw err; 219 239 } ··· 231 251 232 252 export const parseCommitUnauthenticated = ( 233 253 evt: Commit, 234 - filterCollections?: string[], 254 + matchCollection?: ((col: string) => boolean) | null, 235 255 ): Promise<CommitEvt[]> => { 236 - const ops = maybeFilterOps(evt.ops, filterCollections); 256 + const ops = maybeFilterOps(evt.ops, matchCollection); 237 257 return formatCommitOps(evt, ops); 238 258 }; 239 259 240 260 const maybeFilterOps = ( 241 261 ops: RepoOp[], 242 - filterCollections?: string[], 262 + matchCollection?: ((col: string) => boolean) | null, 243 263 ): RepoOp[] => { 244 - if (!filterCollections) return ops; 264 + if (!matchCollection) return ops; 245 265 return ops.filter((op) => { 246 266 const { collection } = parseDataKey(op.path); 247 - return filterCollections.includes(collection); 267 + return matchCollection(collection); 248 268 }); 249 269 }; 250 270
+62 -2
sync/runner/memory-runner.ts
··· 6 6 setCursor?: (cursor: number) => Promise<void>; 7 7 concurrency?: number; 8 8 startCursor?: number; 9 + setCursorInterval?: number; // milliseconds between cursor saves, 0 for immediate saves 9 10 }; 10 11 11 12 // A queue with arbitrarily many partitions, each processing work sequentially. ··· 15 16 mainQueue: PQueue; 16 17 partitions: Map<string, PQueue> = new Map<string, PQueue>(); 17 18 cursor: number | undefined; 19 + private lastSavedCursor: number | undefined; 20 + private saveCursorTimer: number | undefined; 21 + private readonly useInterval: boolean; 22 + private readonly intervalMs: number; 23 + private readonly setCursor: ((cursor: number) => Promise<void>) | undefined; 24 + private pendingSaveCursor: number | undefined; 18 25 19 26 constructor(public opts: MemoryRunnerOptions = {}) { 20 27 this.mainQueue = new PQueue({ concurrency: opts.concurrency ?? Infinity }); 21 28 this.cursor = opts.startCursor; 29 + this.setCursor = opts.setCursor; 30 + this.intervalMs = opts.setCursorInterval ?? 0; 31 + this.useInterval = this.intervalMs > 0; 22 32 } 23 33 24 34 getCursor(): number | undefined { ··· 50 60 const latest = item.complete().at(-1); 51 61 if (latest !== undefined) { 52 62 this.cursor = latest; 53 - if (this.opts.setCursor) { 54 - await this.opts.setCursor(this.cursor); 63 + if (this.setCursor) { 64 + if (this.useInterval) { 65 + this.scheduleIntervalSave(); 66 + } else { 67 + this.setCursor(this.cursor).catch(console.error); 68 + this.lastSavedCursor = this.cursor; 69 + } 55 70 } 56 71 } 57 72 }); 58 73 } 59 74 75 + private scheduleIntervalSave(): void { 76 + // Fast path: if cursor hasn't changed or timer already scheduled for this cursor 77 + if ( 78 + this.cursor === this.lastSavedCursor || 79 + this.cursor === this.pendingSaveCursor 80 + ) return; 81 + 82 + this.pendingSaveCursor = this.cursor; 83 + 84 + if (this.saveCursorTimer) { 85 + clearTimeout(this.saveCursorTimer); 86 + } 87 + 88 + this.saveCursorTimer = setTimeout(() => { 89 + const cursorToSave = this.pendingSaveCursor!; 90 + this.setCursor!(cursorToSave) 91 + .then(() => { 92 + this.lastSavedCursor = cursorToSave; 93 + }) 94 + .catch(console.error); 95 + this.saveCursorTimer = undefined; 96 + this.pendingSaveCursor = undefined; 97 + }, this.intervalMs); 98 + } 99 + 60 100 async processAll() { 61 101 await this.mainQueue.onIdle(); 62 102 } ··· 65 105 this.mainQueue.pause(); 66 106 this.mainQueue.clear(); 67 107 this.partitions.forEach((p) => p.clear()); 108 + 109 + // Clear any pending cursor save timer and perform final save 110 + if (this.saveCursorTimer) { 111 + clearTimeout(this.saveCursorTimer); 112 + this.saveCursorTimer = undefined; 113 + } 114 + 115 + // Perform final cursor save if needed 116 + if ( 117 + this.setCursor && this.cursor !== undefined && 118 + this.cursor !== this.lastSavedCursor 119 + ) { 120 + try { 121 + await this.setCursor(this.cursor); 122 + this.lastSavedCursor = this.cursor; 123 + } catch (error) { 124 + console.error("Failed to save cursor during destroy:", error); 125 + } 126 + } 127 + 68 128 await this.mainQueue.onIdle(); 69 129 } 70 130 }