[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { IdResolver } from "@atp/identity";
2import { WriteOpAction } from "@atp/repo";
3import { Event as FirehoseEvent, Firehose, MemoryRunner } from "@atp/sync";
4import { BackgroundQueue } from "../background.ts";
5import { Database } from "../db/index.ts";
6import { IndexingService } from "../indexing/index.ts";
7import { ServerConfig } from "../../config.ts";
8import { PushService } from "../../utils/push.ts";
9import { PushTokens } from "../routes/push-tokens.ts";
10
11const CURSOR_STATE_IDENTIFIER = "crosspost_comments_cursor";
12
13export class CrosspostRepoSubscription {
14 firehose: Firehose;
15 runner: MemoryRunner;
16 background: BackgroundQueue;
17 indexingSvc: IndexingService;
18 pushService: PushService;
19 private firehoseRunning = false;
20
21 constructor(
22 public opts: {
23 db: Database;
24 idResolver: IdResolver;
25 startCursor?: number;
26 cfg: ServerConfig;
27 },
28 ) {
29 const { db, idResolver, startCursor, cfg } = opts;
30 this.background = new BackgroundQueue(db);
31
32 const pushTokens = new PushTokens(db);
33 this.pushService = new PushService(pushTokens, db, {
34 enabled: cfg.pushEnabled,
35 fcmServiceAccount: cfg.fcmServiceAccount,
36 });
37
38 this.indexingSvc = new IndexingService(
39 db,
40 cfg,
41 idResolver,
42 this.background,
43 this.pushService,
44 );
45
46 const { runner, firehose } = createCrosspostFirehose({
47 idResolver,
48 service: cfg.relayUrl,
49 indexingSvc: this.indexingSvc,
50 db,
51 startCursor,
52 });
53 this.runner = runner;
54 this.firehose = firehose;
55 }
56
57 start() {
58 console.info("Starting crosspost firehose subscription");
59 this.firehoseRunning = true;
60 this.firehose.start();
61 }
62
63 async restart() {
64 await this.destroy();
65
66 const savedCursor = await this.opts.db.getCursorState(
67 CURSOR_STATE_IDENTIFIER,
68 );
69 const startCursor = savedCursor !== null ? savedCursor : undefined;
70
71 const { runner, firehose } = createCrosspostFirehose({
72 idResolver: this.opts.idResolver,
73 service: this.opts.cfg.relayUrl,
74 indexingSvc: this.indexingSvc,
75 db: this.opts.db,
76 startCursor,
77 });
78 this.runner = runner;
79 this.firehose = firehose;
80 this.start();
81 }
82
83 async processAll() {
84 await this.runner.processAll();
85 await this.background.processAll();
86 }
87
88 async destroy() {
89 try {
90 if (this.firehoseRunning) {
91 await this.firehose.destroy();
92 this.firehoseRunning = false;
93 }
94 console.info("Processing remaining runner tasks...");
95 if (this.opts.cfg.debugMode) {
96 const timeoutMs = 10000;
97 let destroyTimeoutId: number | undefined;
98 try {
99 const timeoutPromise = new Promise<never>((_, reject) => {
100 destroyTimeoutId = setTimeout(
101 () => reject(new Error(`Timeout after ${timeoutMs}ms`)),
102 timeoutMs,
103 );
104 });
105 await Promise.race([this.runner.destroy(), timeoutPromise]);
106 } catch (e) {
107 console.warn("Runner destroy timed out; continuing shutdown", {
108 e,
109 });
110 } finally {
111 if (destroyTimeoutId !== undefined) {
112 clearTimeout(destroyTimeoutId);
113 destroyTimeoutId = undefined;
114 }
115 }
116
117 let bgTimeoutId: number | undefined;
118 try {
119 const timeoutPromise = new Promise<never>((_, reject) => {
120 bgTimeoutId = setTimeout(
121 () => reject(new Error(`Timeout after ${timeoutMs}ms`)),
122 timeoutMs,
123 );
124 });
125 await Promise.race([this.background.processAll(), timeoutPromise]);
126 } catch (e) {
127 console.warn("Runner destroy timed out; continuing shutdown", {
128 e,
129 });
130 } finally {
131 if (bgTimeoutId !== undefined) {
132 clearTimeout(bgTimeoutId);
133 bgTimeoutId = undefined;
134 }
135 }
136 } else {
137 await this.runner.processAll();
138 await this.background.processAll();
139 }
140 } catch (error) {
141 console.error("Error during subscription destroy", { error });
142 throw error;
143 }
144 }
145}
146
147function createCrosspostFirehose(opts: {
148 idResolver: IdResolver;
149 service?: string;
150 indexingSvc: IndexingService;
151 db: Database;
152 startCursor?: number;
153}): { firehose: Firehose; runner: MemoryRunner } {
154 const { idResolver, service, indexingSvc, db, startCursor } = opts;
155
156 const runner = new MemoryRunner({
157 startCursor,
158 setCursorInterval: 30000,
159 setCursor: async (cursor: number) => {
160 await db.saveCursorState(cursor, CURSOR_STATE_IDENTIFIER);
161 console.info("Crosspost cursor saved to database", { cursor });
162 },
163 });
164
165 const firehose = new Firehose({
166 idResolver,
167 runner,
168 service,
169 onError: (err: Error) =>
170 console.error("error in crosspost subscription", { err }),
171 excludeAccount: true,
172 excludeIdentity: true,
173 excludeSync: true,
174 handleEvent: async (evt: FirehoseEvent) => {
175 if (evt.event === "create" || evt.event === "update") {
176 await indexingSvc.indexRecord(
177 evt.uri,
178 evt.cid,
179 evt.record,
180 evt.event === "create" ? WriteOpAction.Create : WriteOpAction.Update,
181 evt.time,
182 );
183 } else if (evt.event === "delete") {
184 await indexingSvc.deleteRecord(evt.uri);
185 }
186 },
187 filterCollections: ["app.bsky.feed.post"],
188 });
189
190 return { firehose, runner };
191}