[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eb947da8a3a8a7d485ff132b3ff0db4a8baaac19 205 lines 6.1 kB view raw
import { IdResolver } from "@atp/identity";
import { WriteOpAction } from "@atp/repo";
import { Event as FirehoseEvent, Firehose, MemoryRunner } from "@atp/sync";
import { BackgroundQueue } from "./background.ts";
import { Database } from "./db/index.ts";
import { IndexingService } from "./indexing/index.ts";
import { getLogger, Logger } from "@logtape/logtape";
import { ServerConfig } from "../config.ts";

/** Per-step time limit (ms) applied to shutdown work when `cfg.debugMode` is set. */
const DEBUG_SHUTDOWN_TIMEOUT_MS = 10000;

/**
 * Awaits `task`, rejecting with `Error("Timeout after <n>ms")` if it has not
 * settled within `timeoutMs`. The timer is always cleared before returning so
 * it cannot keep the process alive after the race is decided.
 *
 * The task itself is not cancelled on timeout; it simply stops being awaited.
 *
 * @param task - The pending work to wait for.
 * @param timeoutMs - Maximum time to wait, in milliseconds.
 * @throws The task's own rejection, or the timeout error, whichever comes first.
 */
async function raceWithTimeout(
  task: PromiseLike<unknown>,
  timeoutMs: number,
): Promise<void> {
  let timerId: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timerId = setTimeout(
      () => reject(new Error(`Timeout after ${timeoutMs}ms`)),
      timeoutMs,
    );
  });
  try {
    await Promise.race([task, timeout]);
  } finally {
    if (timerId !== undefined) {
      clearTimeout(timerId);
    }
  }
}

/**
 * Owns the firehose subscription for the server: wires a `Firehose` and its
 * `MemoryRunner` to an `IndexingService`, and exposes start / restart /
 * drain / destroy lifecycle operations.
 */
export class RepoSubscription {
  firehose: Firehose;
  runner: MemoryRunner;
  background: BackgroundQueue;
  indexingSvc: IndexingService;
  logger: Logger;
  /** True between `start()` and a successful `firehose.destroy()`. */
  private firehoseRunning = false;

  /**
   * @param opts.db - Database handed to the background queue, the indexing
   *   service and the cursor persistence callbacks.
   * @param opts.idResolver - Identity resolver passed to the indexing service
   *   and the firehose.
   * @param opts.startCursor - Optional cursor given to the initial runner.
   * @param opts.cfg - Server configuration; `relayUrl` and `debugMode` are
   *   read here.
   */
  constructor(
    public opts: {
      db: Database;
      idResolver: IdResolver;
      startCursor?: number;
      cfg: ServerConfig;
    },
  ) {
    const { db, idResolver, startCursor, cfg } = opts;
    this.logger = getLogger(["appview", "subscription"]);
    this.background = new BackgroundQueue(db, this.logger);
    this.indexingSvc = new IndexingService(
      db,
      cfg,
      idResolver,
      this.background,
    );

    const { runner, firehose } = this.buildFirehose(startCursor);
    this.runner = runner;
    this.firehose = firehose;
  }

  /**
   * Builds a fresh firehose/runner pair from this subscription's options so
   * the constructor and `restart()` cannot drift apart.
   */
  private buildFirehose(
    startCursor?: number,
  ): { firehose: Firehose; runner: MemoryRunner } {
    return createFirehose({
      idResolver: this.opts.idResolver,
      service: this.opts.cfg.relayUrl,
      indexingSvc: this.indexingSvc,
      logger: this.logger,
      db: this.opts.db,
      startCursor,
    });
  }

  /** Marks the subscription as running and starts the firehose. */
  start() {
    this.logger.info("Starting firehose subscription");
    this.firehoseRunning = true;
    this.firehose.start();
  }

  /**
   * Tears the current subscription down, then rebuilds the firehose and
   * runner from the cursor stored in the database and starts again.
   *
   * @throws Whatever `destroy()` or `db.getCursorState()` rejects with.
   */
  async restart() {
    await this.destroy();

    // Read fresh cursor from database
    const savedCursor = await this.opts.db.getCursorState();
    const startCursor = savedCursor ?? undefined;

    const { runner, firehose } = this.buildFirehose(startCursor);
    this.runner = runner;
    this.firehose = firehose;
    this.start();
  }

  /** Waits for the runner, then the background queue, to finish their work. */
  async processAll() {
    await this.runner.processAll();
    await this.background.processAll();
  }

  /**
   * Stops the firehose (if it was started) and drains remaining work.
   *
   * With `cfg.debugMode` set, the runner is destroyed and the background
   * queue drained with a 10 s limit each; a failure or timeout in either step
   * is logged as a warning and shutdown continues. Otherwise both are drained
   * without a time limit.
   *
   * @throws Re-throws (after logging) any error not absorbed by the
   *   debug-mode best-effort steps.
   */
  async destroy() {
    try {
      if (this.firehoseRunning) {
        await this.firehose.destroy();
        this.firehoseRunning = false;
      }
      this.logger.info("Processing remaining runner tasks...");
      if (this.opts.cfg.debugMode) {
        // Best effort: never let a stuck runner block shutdown in debug mode.
        try {
          await raceWithTimeout(
            this.runner.destroy(),
            DEBUG_SHUTDOWN_TIMEOUT_MS,
          );
        } catch (e) {
          this.logger.warn("Runner destroy timed out; continuing shutdown", {
            e,
          });
        }
        // Same for the background queue, reported under its own message so
        // the log identifies which step actually stalled.
        try {
          await raceWithTimeout(
            this.background.processAll(),
            DEBUG_SHUTDOWN_TIMEOUT_MS,
          );
        } catch (e) {
          this.logger.warn(
            "Background queue drain timed out; continuing shutdown",
            { e },
          );
        }
      } else {
        await this.runner.processAll();
        await this.background.processAll();
      }
    } catch (error) {
      this.logger.error("Error during subscription destroy", { error });
      throw error;
    }
  }
}

/**
 * Creates a `MemoryRunner` that persists its cursor through `db`, and a
 * `Firehose` that dispatches each event to `indexingSvc`.
 *
 * Event routing:
 * - `identity`: `indexHandle(did, time, true)`.
 * - `account`: `deleteActor` when inactive with status `"deleted"`,
 *   otherwise `updateActorStatus`.
 * - `sync`: `setCommitLastSeen` and `indexHandle`, concurrently.
 * - anything else (treated as a record create / update / delete): the record
 *   write plus `setCommitLastSeen` and `indexHandle`, concurrently.
 *
 * @param opts.service - Relay URL passed to the firehose; may be undefined.
 * @param opts.startCursor - Cursor passed to the runner, if any.
 * @returns The firehose and the runner it was built with.
 */
function createFirehose(opts: {
  idResolver: IdResolver;
  service?: string;
  indexingSvc: IndexingService;
  logger: Logger;
  db: Database;
  startCursor?: number;
}): { firehose: Firehose; runner: MemoryRunner } {
  const { idResolver, service, indexingSvc, logger, db, startCursor } = opts;

  const runner = new MemoryRunner({
    startCursor,
    setCursorInterval: 30000, // Save cursor every 30 seconds
    setCursor: async (cursor: number) => {
      await db.saveCursorState(cursor);
      logger.info("Cursor saved to database", { cursor });
    },
  });

  const firehose = new Firehose({
    idResolver,
    runner,
    service,
    onError: (err: Error) => logger.error("error in subscription", { err }),
    handleEvent: async (evt: FirehoseEvent) => {
      if (evt.event === "identity") {
        await indexingSvc.indexHandle(evt.did, evt.time, true);
      } else if (evt.event === "account") {
        if (evt.active === false && evt.status === "deleted") {
          await indexingSvc.deleteActor(evt.did);
        } else {
          await indexingSvc.updateActorStatus(
            evt.did,
            evt.active,
            evt.status,
          );
        }
      } else if (evt.event === "sync") {
        await Promise.all([
          indexingSvc.setCommitLastSeen(evt.did, evt.cid, evt.rev),
          indexingSvc.indexHandle(evt.did, evt.time),
        ]);
      } else {
        // Remaining events are record writes; anything that is neither
        // "delete" nor "create" is indexed as an update.
        const recordWrite = evt.event === "delete"
          ? indexingSvc.deleteRecord(evt.uri)
          : indexingSvc.indexRecord(
            evt.uri,
            evt.cid,
            evt.record,
            evt.event === "create"
              ? WriteOpAction.Create
              : WriteOpAction.Update,
            evt.time,
          );

        await Promise.all([
          recordWrite,
          indexingSvc.setCommitLastSeen(evt.did, evt.commit, evt.rev),
          indexingSvc.indexHandle(evt.did, evt.time),
        ]);
      }
    },
    // NOTE(review): presumably limits the stream to Spark ("so.sprk")
    // collections — confirm against the Firehose option semantics.
    filterCollections: [
      "so.sprk.*",
    ],
  });

  return { firehose, runner };
}