[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 213 lines 6.4 kB view raw
import { IdResolver } from "@atp/identity";
import { type RepoRecord, WriteOpAction } from "@atp/repo";
import { Event as FirehoseEvent, Firehose, MemoryRunner } from "@atp/sync";
import { BackgroundQueue } from "./background.ts";
import { Database } from "./db/index.ts";
import { IndexingService } from "./indexing/index.ts";
import { ServerConfig } from "./../config.ts";
import { PushService } from "./../utils/push.ts";
import { PushTokens } from "./routes/push-tokens.ts";

/** Maximum time each debug-mode shutdown step may take before it is abandoned. */
const SHUTDOWN_STEP_TIMEOUT_MS = 10000;

/**
 * Owns the firehose subscription together with the runner, background queue,
 * indexing service and push service that consume its events.
 */
export class RepoSubscription {
  firehose: Firehose;
  runner: MemoryRunner;
  background: BackgroundQueue;
  indexingSvc: IndexingService;
  pushService: PushService;
  /** True between `start()` and a successful `firehose.destroy()`. */
  private firehoseRunning = false;

  /**
   * Builds the background queue, push service, indexing service and the
   * initial firehose/runner pair. Nothing is started until `start()` is called.
   *
   * @param opts Database handle, identity resolver, server configuration and
   *   an optional cursor to start the runner from.
   */
  constructor(
    public opts: {
      db: Database;
      idResolver: IdResolver;
      startCursor?: number;
      cfg: ServerConfig;
    },
  ) {
    const { db, idResolver, startCursor, cfg } = opts;
    this.background = new BackgroundQueue(db);

    // Create push service (FCM handles both iOS and Android)
    const pushTokens = new PushTokens(db);
    this.pushService = new PushService(pushTokens, db, {
      enabled: cfg.pushEnabled,
      fcmServiceAccount: cfg.fcmServiceAccount,
    });

    this.indexingSvc = new IndexingService(
      db,
      cfg,
      idResolver,
      this.background,
      this.pushService,
    );

    const { runner, firehose } = createFirehose({
      idResolver,
      service: cfg.relayUrl,
      indexingSvc: this.indexingSvc,
      db,
      startCursor,
    });
    this.runner = runner;
    this.firehose = firehose;
  }

  /** Marks the subscription as running and starts the firehose. */
  start(): void {
    console.info("Starting firehose subscription");
    this.firehoseRunning = true;
    this.firehose.start();
  }

  /**
   * Tears down the current firehose/runner, rebuilds them from the cursor
   * stored in the database (if any), and starts again.
   *
   * Rejects if `destroy()` rejects; in that case nothing is rebuilt.
   */
  async restart(): Promise<void> {
    await this.destroy();

    // Read fresh cursor from database; a null result means "no saved cursor".
    const savedCursor = await this.opts.db.getCursorState();
    const startCursor = savedCursor ?? undefined;

    const { runner, firehose } = createFirehose({
      idResolver: this.opts.idResolver,
      service: this.opts.cfg.relayUrl,
      indexingSvc: this.indexingSvc,
      db: this.opts.db,
      startCursor,
    });
    this.runner = runner;
    this.firehose = firehose;
    this.start();
  }

  /** Drains the runner first, then the background queue. */
  async processAll(): Promise<void> {
    await this.runner.processAll();
    await this.background.processAll();
  }

  /**
   * Stops the firehose (if it was started) and drains outstanding work.
   *
   * With `cfg.debugMode` set, the runner is destroyed and the background
   * queue drained with a per-step timeout; a step that times out or fails is
   * logged as a warning and shutdown continues. Otherwise both are drained
   * without a timeout and any error propagates.
   *
   * @throws Re-throws any error that escapes the steps above, after logging it.
   */
  async destroy(): Promise<void> {
    try {
      if (this.firehoseRunning) {
        await this.firehose.destroy();
        // Only cleared after a successful destroy so a failed attempt can be retried.
        this.firehoseRunning = false;
      }
      console.info("Processing remaining runner tasks...");
      if (this.opts.cfg.debugMode) {
        await settleWithin(
          () => this.runner.destroy(),
          SHUTDOWN_STEP_TIMEOUT_MS,
          "Runner destroy",
        );
        await settleWithin(
          () => this.background.processAll(),
          SHUTDOWN_STEP_TIMEOUT_MS,
          "Background drain",
        );
      } else {
        // NOTE(review): this path drains the runner with processAll() whereas
        // debug mode calls runner.destroy() — confirm the asymmetry is intended.
        await this.runner.processAll();
        await this.background.processAll();
      }
    } catch (error) {
      console.error("Error during subscription destroy", { error });
      throw error;
    }
  }
}

/**
 * Runs one best-effort shutdown step with an upper time bound.
 *
 * Never rejects: a timeout or a failure of the step itself is logged with
 * `console.warn` (distinguishing the two) so the caller can carry on. The
 * timer is always cleared, whichever side of the race settles first.
 *
 * @param task Thunk producing the work to wait for; invoked inside the guard
 *   so a synchronous throw is handled like a rejection.
 * @param timeoutMs Milliseconds to wait before giving up on the step.
 * @param label Human-readable step name used as the log message prefix.
 */
async function settleWithin(
  task: () => unknown,
  timeoutMs: number,
  label: string,
): Promise<void> {
  let timerId: ReturnType<typeof setTimeout> | undefined;
  let timedOut = false;
  try {
    const timeout = new Promise<never>((_, reject) => {
      timerId = setTimeout(() => {
        timedOut = true;
        reject(new Error(`Timeout after ${timeoutMs}ms`));
      }, timeoutMs);
    });
    await Promise.race([task(), timeout]);
  } catch (e) {
    const outcome = timedOut ? "timed out" : "failed";
    console.warn(`${label} ${outcome}; continuing shutdown`, { e });
  } finally {
    if (timerId !== undefined) {
      clearTimeout(timerId);
    }
  }
}

/**
 * Builds a `MemoryRunner` that persists its cursor to the database and a
 * `Firehose` that routes each event to the indexing service.
 *
 * @param opts Identity resolver, optional relay service URL, indexing
 *   service, database handle and optional starting cursor.
 * @returns The paired firehose and runner; neither has been started.
 */
function createFirehose(opts: {
  idResolver: IdResolver;
  service?: string;
  indexingSvc: IndexingService;
  db: Database;
  startCursor?: number;
}): { firehose: Firehose; runner: MemoryRunner } {
  const { idResolver, service, indexingSvc, db, startCursor } = opts;

  const runner = new MemoryRunner({
    startCursor,
    setCursorInterval: 30000, // Save cursor every 30 seconds
    setCursor: async (cursor: number) => {
      const didSave = await db.saveCursorState(cursor);
      if (didSave) {
        console.info("Cursor saved to database", { cursor });
      }
    },
  });

  const firehose = new Firehose({
    idResolver,
    runner,
    service,
    onError: (err: Error) => console.error("error in subscription", { err }),
    handleEvent: async (evt: FirehoseEvent) => {
      if (evt.event === "identity") {
        await indexingSvc.indexHandle(evt.did, evt.time, true);
      } else if (evt.event === "account") {
        if (evt.active === false && evt.status === "deleted") {
          await indexingSvc.deleteActor(evt.did);
        } else {
          await indexingSvc.updateActorStatus(
            evt.did,
            evt.active,
            evt.status,
          );
        }
      } else if (evt.event === "sync") {
        await Promise.all([
          indexingSvc.setCommitLastSeen(evt.did, evt.cid, evt.rev),
          indexingSvc.indexHandle(evt.did, evt.time),
        ]);
      } else {
        // Remaining events are record writes: delete removes the record,
        // create/update (re)index it with the matching write action.
        const recordWrite = evt.event === "delete"
          ? indexingSvc.deleteRecord(evt.uri)
          : indexingSvc.indexRecord(
            evt.uri,
            evt.cid,
            evt.record as unknown as RepoRecord,
            evt.event === "create"
              ? WriteOpAction.Create
              : WriteOpAction.Update,
            evt.time,
          );

        // The record write runs concurrently with the commit and handle bookkeeping.
        await Promise.all([
          recordWrite,
          indexingSvc.setCommitLastSeen(evt.did, evt.commit, evt.rev),
          indexingSvc.indexHandle(evt.did, evt.time),
        ]);
      }
    },
    filterCollections: [
      "so.sprk.*",
    ],
  });

  return { firehose, runner };
}