[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
3
fork

Configure Feed

Select the types of activity you want to include in your feed.

firehose saving cursor (#38)

* firehose saving cursor

* Update utils/memory-runner.ts

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

authored by

Roscoe Rubin-Rottenberg
Copilot
and committed by
GitHub
f8a44e85 7327799f

+195 -14
+30 -6
data-plane/server/subscription.ts
··· 17 17 private firehoseRunning = false; 18 18 19 19 constructor( 20 - public opts: { service: string; db: Database; idResolver: IdResolver }, 20 + public opts: { 21 + service: string; 22 + db: Database; 23 + idResolver: IdResolver; 24 + startCursor?: number; 25 + }, 21 26 ) { 22 - const { service, db, idResolver } = opts; 27 + const { service, db, idResolver, startCursor } = opts; 23 28 this.logger = getLogger(["appview", "subscription"]); 24 29 this.background = new BackgroundQueue(db, this.logger); 25 30 this.indexingSvc = new IndexingService( ··· 34 39 service, 35 40 indexingSvc: this.indexingSvc, 36 41 logger: this.logger, 42 + db, 43 + startCursor, 37 44 }); 38 45 this.runner = runner; 39 46 this.firehose = firehose; ··· 43 50 const connected = await this.indexingSvc.db.waitForConnection(30000); 44 51 if (!connected) { 45 52 throw new Error( 46 - "Failed to connect to database during subscription restart", 53 + "Failed to connect to database during subscription start", 47 54 ); 48 55 } 49 56 this.logger.info("Starting firehose subscription"); ··· 59 66 "Failed to connect to database during subscription restart", 60 67 ); 61 68 } 69 + 70 + // Read fresh cursor from database 71 + const savedCursor = await this.opts.db.getCursorState(); 72 + const startCursor = savedCursor !== null 73 + ? savedCursor 74 + : (env.NODE_ENV === "production" ? 0 : undefined); 75 + 62 76 const { runner, firehose } = createFirehose({ 63 77 idResolver: this.opts.idResolver, 64 78 service: this.opts.service, 65 79 indexingSvc: this.indexingSvc, 66 80 logger: this.logger, 81 + db: this.opts.db, 82 + startCursor, 67 83 }); 68 84 this.runner = runner; 69 85 this.firehose = firehose; ··· 140 156 service: string; 141 157 indexingSvc: IndexingService; 142 158 logger: Logger; 159 + db: Database; 160 + startCursor?: number; 143 161 }): { firehose: Firehose; runner: MemoryRunner } { 144 - const { idResolver, service, indexingSvc, logger } = opts; 145 - const startCursor = env.NODE_ENV === "production" ? 0 : undefined; 146 - logger.info("Creating firehose subscription", { service }); 162 + const { idResolver, service, indexingSvc, logger, db, startCursor } = opts; 163 + 164 + logger.info("Creating firehose subscription", { service, startCursor }); 165 + 147 166 const runner = new MemoryRunner({ 148 167 startCursor, 149 168 concurrency: env.RUNNER_CONCURRENCY, 169 + cursorSaveIntervalMs: 30000, // Save cursor every 30 seconds 170 + setCursor: async (cursor: number) => { 171 + await db.saveCursorState(cursor); 172 + logger.debug("Cursor saved to database", { cursor }); 173 + }, 150 174 }); 151 175 const firehose = new Firehose({ 152 176 idResolver,
+21 -6
main.ts
··· 134 134 } 135 135 136 136 // Setup function to create context and app 137 - export function setupApp(): { app: Hono<AppEnv>; ctx: AppContext } { 137 + export async function setupApp(): Promise< 138 + { app: Hono<AppEnv>; ctx: AppContext } 139 + > { 138 140 // Setup logger and database 139 141 const appLogger = getLogger(["appview"]); 140 142 const db = new Database(); 141 143 db.connect(); 142 144 145 + // Wait for database connection 146 + const connected = await db.waitForConnection(30000); 147 + if (!connected) { 148 + throw new Error("Failed to connect to database during startup"); 149 + } 150 + 151 + // Read cursor from database 152 + const savedCursor = await db.getCursorState(); 153 + const startCursor = savedCursor !== null ? savedCursor : undefined; 154 + 155 + appLogger.info("Database cursor loaded", { cursor: startCursor }); 156 + 143 157 // DID and resolver setup 144 158 const baseIdResolver = createIdResolver(); 145 159 const resolver = createBidirectionalResolver(baseIdResolver); ··· 150 164 service: env.RELAY_URL, 151 165 db, 152 166 idResolver: baseIdResolver, 167 + startCursor, 153 168 }); 154 169 const takedownService = new TakedownService(db); 155 170 const authVerifier = createAuthVerifier(db, { ··· 175 190 } 176 191 177 192 // Start server function 178 - export function startServer() { 179 - const { app, ctx } = setupApp(); 193 + export async function startServer() { 194 + const { app, ctx } = await setupApp(); 180 195 181 196 // Start HTTP server immediately 182 197 const { HOST, PORT } = env; ··· 257 272 // Default export for backward compatibility (creates app without starting services) 258 273 let defaultApp: Hono<AppEnv> | null = null; 259 274 260 - export default function getApp(): Hono<AppEnv> { 275 + export default async function getApp(): Promise<Hono<AppEnv>> { 261 276 if (!defaultApp) { 262 - const result = setupApp(); 277 + const result = await setupApp(); 263 278 defaultApp = result.app; 264 279 } 265 280 return defaultApp; ··· 267 282 268 283 // Start the server if this file is run directly 269 284 if (import.meta.main) { 270 - startServer(); 285 + await startServer(); 271 286 }
+95
main_test.ts
··· 9 9 import { TakedownService } from "./services/takedown.ts"; 10 10 import { createAuthVerifier } from "./services/auth-verifier.ts"; 11 11 import { RepoSubscription } from "./data-plane/server/subscription.ts"; 12 + import { MemoryRunner } from "./utils/memory-runner.ts"; 12 13 import { getLogger } from "@logtape/logtape"; 13 14 14 15 Deno.env.set("SERVICE_DID", "did:web:test"); ··· 31 32 connect: () => Promise.resolve(), 32 33 disconnect: () => Promise.resolve(), 33 34 models: {}, 35 + getCursorState: () => Promise.resolve(null), 36 + saveCursorState: () => Promise.resolve(), 34 37 } as unknown as Database; 35 38 36 39 const takedownService = new TakedownService(mockDb); ··· 38 41 service: "wss://relay1.us-west.bsky.network", 39 42 db: mockDb, 40 43 idResolver: baseIdResolver, 44 + startCursor: undefined, 41 45 }); 42 46 const authVerifier = createAuthVerifier(mockDb, { 43 47 ownDid: serviceDid, ··· 112 116 ); 113 117 console.log("Well-known endpoint test passed"); 114 118 }); 119 + 120 + Deno.test("Cursor Persistence Test", async () => { 121 + console.log("Testing cursor persistence..."); 122 + 123 + // Mock database with cursor state 124 + let storedCursor: number | null = 42; 125 + const mockDb = { 126 + connect: () => Promise.resolve(), 127 + disconnect: () => Promise.resolve(), 128 + waitForConnection: () => Promise.resolve(true), 129 + models: {}, 130 + getCursorState: () => Promise.resolve(storedCursor), 131 + saveCursorState: (cursor: number) => { 132 + storedCursor = cursor; 133 + return Promise.resolve(); 134 + }, 135 + } as unknown as Database; 136 + 137 + const baseIdResolver = createIdResolver(); 138 + 139 + // Test 1: Cursor is loaded from database 140 + const sub1 = new RepoSubscription({ 141 + service: "wss://relay1.us-west.bsky.network", 142 + db: mockDb, 143 + idResolver: baseIdResolver, 144 + startCursor: 42, // This should be what was read from DB 145 + }); 146 + 147 + console.log("Initial cursor:", sub1.runner.getCursor()); 148 + 149 + // Test 2: Simulate cursor update 150 + if (sub1.runner.opts.setCursor) { 151 + await sub1.runner.opts.setCursor(100); 152 + } 153 + 154 + console.log("Stored cursor after update:", storedCursor); 155 + 156 + // Test 3: Create new subscription, should load updated cursor 157 + const sub2 = new RepoSubscription({ 158 + service: "wss://relay1.us-west.bsky.network", 159 + db: mockDb, 160 + idResolver: baseIdResolver, 161 + startCursor: 100, // This should be the updated cursor from DB 162 + }); 163 + 164 + console.log("New subscription cursor:", sub2.runner.getCursor()); 165 + 166 + console.log("Cursor persistence test passed"); 167 + }); 168 + 169 + Deno.test("Cursor Save Throttling Test", async () => { 170 + console.log("Testing cursor save throttling..."); 171 + 172 + let saveCount = 0; 173 + let lastSavedCursor: number | undefined; 174 + 175 + // Create a direct MemoryRunner to test throttling 176 + const runner = new MemoryRunner({ 177 + startCursor: 0, 178 + cursorSaveIntervalMs: 100, // Use 100ms for faster testing 179 + setCursor: (cursor: number) => { 180 + saveCount++; 181 + lastSavedCursor = cursor; 182 + console.log(`Save #${saveCount}: cursor ${cursor}`); 183 + }, 184 + }); 185 + 186 + // Simulate rapid cursor updates through trackEvent (the proper way) 187 + await runner.trackEvent("did1", 10, async () => {/* mock work */}); 188 + await runner.trackEvent("did2", 20, async () => {/* mock work */}); 189 + await runner.trackEvent("did3", 30, async () => {/* mock work */}); 190 + await runner.trackEvent("did4", 40, async () => {/* mock work */}); 191 + await runner.trackEvent("did5", 50, async () => {/* mock work */}); 192 + 193 + console.log(`Immediate saves: ${saveCount}`); 194 + console.log(`Last saved cursor: ${lastSavedCursor}`); 195 + 196 + // Wait for throttling to potentially trigger more saves 197 + await new Promise((resolve) => setTimeout(resolve, 200)); 198 + 199 + console.log(`After delay - Save count: ${saveCount}`); 200 + console.log(`Last saved cursor: ${lastSavedCursor}`); 201 + 202 + // Force save on destroy 203 + await runner.destroy(); 204 + 205 + console.log(`After destroy - Save count: ${saveCount}`); 206 + console.log(`Final saved cursor: ${lastSavedCursor}`); 207 + 208 + console.log("Cursor save throttling test completed"); 209 + });
+49 -2
utils/memory-runner.ts
··· 2 2 import { ConsecutiveList, EventRunner } from "@atproto/sync"; 3 3 4 4 export type MemoryRunnerOptions = { 5 - setCursor?: (cursor: number) => Promise<void>; 5 + setCursor?: (cursor: number) => Promise<void> | void; 6 6 concurrency?: number; 7 7 startCursor?: number; 8 + cursorSaveIntervalMs?: number; 8 9 }; 9 10 10 11 // A queue with arbitrarily many partitions, each processing work sequentially. ··· 14 15 mainQueue: PQueue; 15 16 partitions = new Map<string, PQueue>(); 16 17 cursor: number | undefined; 18 + private lastSaveTime = 0; 19 + private lastCursor: number | undefined; 20 + private saveTimeout: number | undefined; 17 21 18 22 constructor(public opts: MemoryRunnerOptions = {}) { 19 23 this.mainQueue = new PQueue({ concurrency: opts.concurrency ?? Infinity }); ··· 50 54 if (latest !== undefined) { 51 55 this.cursor = latest; 52 56 if (this.opts.setCursor) { 53 - await this.opts.setCursor(this.cursor); 57 + await this.throttledSaveCursor(this.cursor); 54 58 } 55 59 } 56 60 }); ··· 65 69 await this.mainQueue.onIdle(); 66 70 this.mainQueue.clear(); 67 71 this.partitions.forEach((p) => p.clear()); 72 + 73 + // Force save the latest cursor before shutdown 74 + await this.forceSaveCursor(); 75 + } 76 + 77 + private async throttledSaveCursor(cursor: number): Promise<void> { 78 + if (!this.opts.setCursor) return; 79 + 80 + this.lastCursor = cursor; 81 + const now = Date.now(); 82 + const saveInterval = this.opts.cursorSaveIntervalMs ?? 30000; 83 + 84 + // If we haven't saved recently, save immediately 85 + if (now - this.lastSaveTime >= saveInterval) { 86 + this.lastSaveTime = now; 87 + await this.opts.setCursor(cursor); 88 + } else { 89 + // Schedule a save for later if not already scheduled 90 + if (this.saveTimeout === undefined) { 91 + const timeUntilNextSave = saveInterval - (now - this.lastSaveTime); 92 + this.saveTimeout = setTimeout(async () => { 93 + try { 94 + if (this.lastCursor !== undefined && this.opts.setCursor) { 95 + this.lastSaveTime = Date.now(); 96 + await this.opts.setCursor(this.lastCursor); 97 + } 98 + } catch (err) { 99 + console.error("Error saving cursor in setTimeout:", err); 100 + } 101 + this.saveTimeout = undefined; 102 + }, timeUntilNextSave); 103 + } 104 + } 105 + } 106 + 107 + async forceSaveCursor(): Promise<void> { 108 + if (this.saveTimeout !== undefined) { 109 + clearTimeout(this.saveTimeout); 110 + this.saveTimeout = undefined; 111 + } 112 + if (this.lastCursor !== undefined && this.opts.setCursor) { 113 + await this.opts.setCursor(this.lastCursor); 114 + } 68 115 } 69 116 }