[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 3b73895e29748ca524bbe040b656ddb4e167104b 191 lines 5.5 kB view raw
1import { IdResolver } from "@atp/identity"; 2import { WriteOpAction } from "@atp/repo"; 3import { Event as FirehoseEvent, Firehose, MemoryRunner } from "@atp/sync"; 4import { BackgroundQueue } from "../background.ts"; 5import { Database } from "../db/index.ts"; 6import { IndexingService } from "../indexing/index.ts"; 7import { ServerConfig } from "../../config.ts"; 8import { PushService } from "../../utils/push.ts"; 9import { PushTokens } from "../routes/push-tokens.ts"; 10 11const CURSOR_STATE_IDENTIFIER = "crosspost_comments_cursor"; 12 13export class CrosspostRepoSubscription { 14 firehose: Firehose; 15 runner: MemoryRunner; 16 background: BackgroundQueue; 17 indexingSvc: IndexingService; 18 pushService: PushService; 19 private firehoseRunning = false; 20 21 constructor( 22 public opts: { 23 db: Database; 24 idResolver: IdResolver; 25 startCursor?: number; 26 cfg: ServerConfig; 27 }, 28 ) { 29 const { db, idResolver, startCursor, cfg } = opts; 30 this.background = new BackgroundQueue(db); 31 32 const pushTokens = new PushTokens(db); 33 this.pushService = new PushService(pushTokens, db, { 34 enabled: cfg.pushEnabled, 35 fcmServiceAccount: cfg.fcmServiceAccount, 36 }); 37 38 this.indexingSvc = new IndexingService( 39 db, 40 cfg, 41 idResolver, 42 this.background, 43 this.pushService, 44 ); 45 46 const { runner, firehose } = createCrosspostFirehose({ 47 idResolver, 48 service: cfg.relayUrl, 49 indexingSvc: this.indexingSvc, 50 db, 51 startCursor, 52 }); 53 this.runner = runner; 54 this.firehose = firehose; 55 } 56 57 start() { 58 console.info("Starting crosspost firehose subscription"); 59 this.firehoseRunning = true; 60 this.firehose.start(); 61 } 62 63 async restart() { 64 await this.destroy(); 65 66 const savedCursor = await this.opts.db.getCursorState( 67 CURSOR_STATE_IDENTIFIER, 68 ); 69 const startCursor = savedCursor !== null ? savedCursor : undefined; 70 71 const { runner, firehose } = createCrosspostFirehose({ 72 idResolver: this.opts.idResolver, 73 service: this.opts.cfg.relayUrl, 74 indexingSvc: this.indexingSvc, 75 db: this.opts.db, 76 startCursor, 77 }); 78 this.runner = runner; 79 this.firehose = firehose; 80 this.start(); 81 } 82 83 async processAll() { 84 await this.runner.processAll(); 85 await this.background.processAll(); 86 } 87 88 async destroy() { 89 try { 90 if (this.firehoseRunning) { 91 await this.firehose.destroy(); 92 this.firehoseRunning = false; 93 } 94 console.info("Processing remaining runner tasks..."); 95 if (this.opts.cfg.debugMode) { 96 const timeoutMs = 10000; 97 let destroyTimeoutId: number | undefined; 98 try { 99 const timeoutPromise = new Promise<never>((_, reject) => { 100 destroyTimeoutId = setTimeout( 101 () => reject(new Error(`Timeout after ${timeoutMs}ms`)), 102 timeoutMs, 103 ); 104 }); 105 await Promise.race([this.runner.destroy(), timeoutPromise]); 106 } catch (e) { 107 console.warn("Runner destroy timed out; continuing shutdown", { 108 e, 109 }); 110 } finally { 111 if (destroyTimeoutId !== undefined) { 112 clearTimeout(destroyTimeoutId); 113 destroyTimeoutId = undefined; 114 } 115 } 116 117 let bgTimeoutId: number | undefined; 118 try { 119 const timeoutPromise = new Promise<never>((_, reject) => { 120 bgTimeoutId = setTimeout( 121 () => reject(new Error(`Timeout after ${timeoutMs}ms`)), 122 timeoutMs, 123 ); 124 }); 125 await Promise.race([this.background.processAll(), timeoutPromise]); 126 } catch (e) { 127 console.warn("Runner destroy timed out; continuing shutdown", { 128 e, 129 }); 130 } finally { 131 if (bgTimeoutId !== undefined) { 132 clearTimeout(bgTimeoutId); 133 bgTimeoutId = undefined; 134 } 135 } 136 } else { 137 await this.runner.processAll(); 138 await this.background.processAll(); 139 } 140 } catch (error) { 141 console.error("Error during subscription destroy", { error }); 142 throw error; 143 } 144 } 145} 146 147function createCrosspostFirehose(opts: { 148 idResolver: IdResolver; 149 service?: string; 150 indexingSvc: IndexingService; 151 db: Database; 152 startCursor?: number; 153}): { firehose: Firehose; runner: MemoryRunner } { 154 const { idResolver, service, indexingSvc, db, startCursor } = opts; 155 156 const runner = new MemoryRunner({ 157 startCursor, 158 setCursorInterval: 30000, 159 setCursor: async (cursor: number) => { 160 await db.saveCursorState(cursor, CURSOR_STATE_IDENTIFIER); 161 console.info("Crosspost cursor saved to database", { cursor }); 162 }, 163 }); 164 165 const firehose = new Firehose({ 166 idResolver, 167 runner, 168 service, 169 onError: (err: Error) => 170 console.error("error in crosspost subscription", { err }), 171 excludeAccount: true, 172 excludeIdentity: true, 173 excludeSync: true, 174 handleEvent: async (evt: FirehoseEvent) => { 175 if (evt.event === "create" || evt.event === "update") { 176 await indexingSvc.indexRecord( 177 evt.uri, 178 evt.cid, 179 evt.record, 180 evt.event === "create" ? WriteOpAction.Create : WriteOpAction.Update, 181 evt.time, 182 ); 183 } else if (evt.event === "delete") { 184 await indexingSvc.deleteRecord(evt.uri); 185 } 186 }, 187 filterCollections: ["app.bsky.feed.post"], 188 }); 189 190 return { firehose, runner }; 191}