[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { IdResolver } from "@atp/identity";
2import { type RepoRecord, WriteOpAction } from "@atp/repo";
3import { Event as FirehoseEvent, Firehose, MemoryRunner } from "@atp/sync";
4import { BackgroundQueue } from "./background.ts";
5import { Database } from "./db/index.ts";
6import { IndexingService } from "./indexing/index.ts";
7import { ServerConfig } from "./../config.ts";
8import { PushService } from "./../utils/push.ts";
9import { PushTokens } from "./routes/push-tokens.ts";
10
/**
 * Owns the relay firehose subscription and the services that consume it.
 *
 * The constructor wires together a background queue, the push service, the
 * indexing service and a firehose/runner pair. The instance can then be
 * started, restarted from the cursor persisted in the database, drained, or
 * shut down.
 */
export class RepoSubscription {
  firehose: Firehose;
  runner: MemoryRunner;
  background: BackgroundQueue;
  indexingSvc: IndexingService;
  pushService: PushService;
  /** True between `start()` and a successful `firehose.destroy()`. */
  private firehoseRunning = false;

  /**
   * @param opts.db Database handle shared by every service created here.
   * @param opts.idResolver Identity resolver passed to indexing and firehose.
   * @param opts.startCursor Optional cursor handed to the initial runner.
   * @param opts.cfg Server configuration (relay URL, push settings, debug mode).
   */
  constructor(
    public opts: {
      db: Database;
      idResolver: IdResolver;
      startCursor?: number;
      cfg: ServerConfig;
    },
  ) {
    const { db, idResolver, startCursor, cfg } = opts;
    this.background = new BackgroundQueue(db);

    // Create push service (FCM handles both iOS and Android)
    const pushTokens = new PushTokens(db);
    this.pushService = new PushService(pushTokens, db, {
      enabled: cfg.pushEnabled,
      fcmServiceAccount: cfg.fcmServiceAccount,
    });

    this.indexingSvc = new IndexingService(
      db,
      cfg,
      idResolver,
      this.background,
      this.pushService,
    );

    const { runner, firehose } = createFirehose({
      idResolver,
      service: cfg.relayUrl,
      indexingSvc: this.indexingSvc,
      db,
      startCursor,
    });
    this.runner = runner;
    this.firehose = firehose;
  }

  /** Marks the subscription as running and starts the firehose. */
  start() {
    console.info("Starting firehose subscription");
    this.firehoseRunning = true;
    this.firehose.start();
  }

  /**
   * Shuts the current subscription down, then builds and starts a new
   * firehose/runner pair from the cursor stored in the database.
   *
   * The background queue, indexing service and push service are reused.
   * Rejects if `destroy()` rejects.
   */
  async restart() {
    await this.destroy();

    // Read fresh cursor from database
    const savedCursor = await this.opts.db.getCursorState();
    const startCursor = savedCursor !== null ? savedCursor : undefined;

    const { runner, firehose } = createFirehose({
      idResolver: this.opts.idResolver,
      service: this.opts.cfg.relayUrl,
      indexingSvc: this.indexingSvc,
      db: this.opts.db,
      startCursor,
    });
    this.runner = runner;
    this.firehose = firehose;
    this.start();
  }

  /** Drains the runner first, then the background queue. */
  async processAll() {
    await this.runner.processAll();
    await this.background.processAll();
  }

  /**
   * Stops the firehose (if it was started) and flushes outstanding work.
   *
   * In debug mode the runner teardown and the background drain are each
   * limited to ten seconds; a stage that times out or fails is logged and
   * shutdown continues. Outside debug mode both queues are drained without a
   * time limit.
   *
   * @throws Re-throws any error that escapes the steps above after logging it.
   */
  async destroy() {
    try {
      if (this.firehoseRunning) {
        await this.firehose.destroy();
        this.firehoseRunning = false;
      }
      console.info("Processing remaining runner tasks...");
      if (this.opts.cfg.debugMode) {
        const timeoutMs = 10000;
        await this.settleWithin(
          "Runner destroy",
          () => this.runner.destroy(),
          timeoutMs,
        );
        await this.settleWithin(
          "Background queue drain",
          () => this.background.processAll(),
          timeoutMs,
        );
      } else {
        await this.runner.processAll();
        await this.background.processAll();
      }
    } catch (error) {
      console.error("Error during subscription destroy", { error });
      throw error;
    }
  }

  /**
   * Runs one shutdown stage with an upper time bound and never rejects.
   *
   * The warning names the stage and says whether it timed out or failed, so
   * the log points at the step that actually stalled.
   *
   * @param label Human-readable stage name used in the warning.
   * @param task Starts the stage; invoked once, after the timer is armed.
   * @param timeoutMs Maximum time to wait for the stage to settle.
   */
  private async settleWithin(
    label: string,
    task: () => unknown,
    timeoutMs: number,
  ): Promise<void> {
    // Sentinel used to tell "our timer fired" apart from "the task rejected".
    const timeoutError = new Error(`Timeout after ${timeoutMs}ms`);
    let timeoutId: ReturnType<typeof setTimeout> | undefined;
    try {
      const timeoutPromise = new Promise<never>((_, reject) => {
        timeoutId = setTimeout(() => reject(timeoutError), timeoutMs);
      });
      await Promise.race([task(), timeoutPromise]);
    } catch (e) {
      const outcome = e === timeoutError ? "timed out" : "failed";
      console.warn(`${label} ${outcome}; continuing shutdown`, { e });
    } finally {
      // Always clear the timer so a fast stage does not leave it pending.
      if (timeoutId !== undefined) {
        clearTimeout(timeoutId);
      }
    }
  }
}
145
/**
 * Builds a firehose subscription and the in-memory runner that drives it.
 *
 * The runner stores its cursor through `db.saveCursorState` on a fixed
 * interval. The firehose only subscribes to `so.sprk.*` collections and
 * routes every event to the indexing service.
 *
 * @param opts.idResolver Identity resolver handed to the firehose.
 * @param opts.service Optional service URL handed to the firehose.
 * @param opts.indexingSvc Indexing service that receives every event.
 * @param opts.db Database used to persist the cursor.
 * @param opts.startCursor Optional cursor handed to the runner.
 * @returns The newly created firehose and its runner.
 */
function createFirehose(opts: {
  idResolver: IdResolver;
  service?: string;
  indexingSvc: IndexingService;
  db: Database;
  startCursor?: number;
}): { firehose: Firehose; runner: MemoryRunner } {
  const { idResolver, service, indexingSvc, db, startCursor } = opts;

  // Save cursor every 30 seconds
  const cursorSaveIntervalMs = 30000;

  /** Stores the cursor and logs only when the database reports a save. */
  const persistCursor = async (cursor: number) => {
    const saved = await db.saveCursorState(cursor);
    if (saved) {
      console.info("Cursor saved to database", { cursor });
    }
  };

  /** Logs subscription-level errors reported by the firehose. */
  const reportError = (err: Error) =>
    console.error("error in subscription", { err });

  /** Routes one firehose event to the matching indexing operation(s). */
  const dispatchEvent = async (evt: FirehoseEvent) => {
    switch (evt.event) {
      case "identity":
        await indexingSvc.indexHandle(evt.did, evt.time, true);
        return;

      case "account":
        // Deleted accounts are removed; every other change updates status.
        if (evt.active === false && evt.status === "deleted") {
          await indexingSvc.deleteActor(evt.did);
        } else {
          await indexingSvc.updateActorStatus(
            evt.did,
            evt.active,
            evt.status,
          );
        }
        return;

      case "sync":
        await Promise.all([
          indexingSvc.setCommitLastSeen(evt.did, evt.cid, evt.rev),
          indexingSvc.indexHandle(evt.did, evt.time),
        ]);
        return;

      default: {
        // Remaining events are record writes: create, update or delete.
        const recordWrite = evt.event === "delete"
          ? indexingSvc.deleteRecord(evt.uri)
          : indexingSvc.indexRecord(
            evt.uri,
            evt.cid,
            evt.record as unknown as RepoRecord,
            evt.event === "create"
              ? WriteOpAction.Create
              : WriteOpAction.Update,
            evt.time,
          );

        await Promise.all([
          recordWrite,
          indexingSvc.setCommitLastSeen(evt.did, evt.commit, evt.rev),
          indexingSvc.indexHandle(evt.did, evt.time),
        ]);
        return;
      }
    }
  };

  const runner = new MemoryRunner({
    startCursor,
    setCursorInterval: cursorSaveIntervalMs,
    setCursor: persistCursor,
  });

  const firehose = new Firehose({
    idResolver,
    runner,
    service,
    onError: reportError,
    handleEvent: dispatchEvent,
    filterCollections: [
      "so.sprk.*",
    ],
  });

  return { firehose, runner };
}