[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { IdResolver } from "@atp/identity";
2import { WriteOpAction } from "@atp/repo";
3import { Event as FirehoseEvent, Firehose, MemoryRunner } from "@atp/sync";
4import { BackgroundQueue } from "./background.ts";
5import { Database } from "./db/index.ts";
6import { IndexingService } from "./indexing/index.ts";
7import { getLogger, Logger } from "@logtape/logtape";
8import { ServerConfig } from "../config.ts";
9
10export class RepoSubscription {
11 firehose: Firehose;
12 runner: MemoryRunner;
13 background: BackgroundQueue;
14 indexingSvc: IndexingService;
15 logger: Logger;
16 private firehoseRunning = false;
17
18 constructor(
19 public opts: {
20 db: Database;
21 idResolver: IdResolver;
22 startCursor?: number;
23 cfg: ServerConfig;
24 },
25 ) {
26 const { db, idResolver, startCursor, cfg } = opts;
27 this.logger = getLogger(["appview", "subscription"]);
28 this.background = new BackgroundQueue(db, this.logger);
29 this.indexingSvc = new IndexingService(
30 db,
31 cfg,
32 idResolver,
33 this.background,
34 );
35
36 const { runner, firehose } = createFirehose({
37 idResolver,
38 service: cfg.relayUrl,
39 indexingSvc: this.indexingSvc,
40 logger: this.logger,
41 db,
42 startCursor,
43 });
44 this.runner = runner;
45 this.firehose = firehose;
46 }
47
48 start() {
49 this.logger.info("Starting firehose subscription");
50 this.firehoseRunning = true;
51 this.firehose.start();
52 }
53
54 async restart() {
55 await this.destroy();
56
57 // Read fresh cursor from database
58 const savedCursor = await this.opts.db.getCursorState();
59 const startCursor = savedCursor !== null ? savedCursor : undefined;
60
61 const { runner, firehose } = createFirehose({
62 idResolver: this.opts.idResolver,
63 service: this.opts.cfg.relayUrl,
64 indexingSvc: this.indexingSvc,
65 logger: this.logger,
66 db: this.opts.db,
67 startCursor,
68 });
69 this.runner = runner;
70 this.firehose = firehose;
71 this.start();
72 }
73
74 async processAll() {
75 await this.runner.processAll();
76 await this.background.processAll();
77 }
78
79 async destroy() {
80 try {
81 if (this.firehoseRunning) {
82 await this.firehose.destroy();
83 this.firehoseRunning = false;
84 }
85 this.logger.info("Processing remaining runner tasks...");
86 if (this.opts.cfg.debugMode) {
87 const timeoutMs = 10000;
88 // Runner destroy with timeout and proper timer cleanup
89 let destroyTimeoutId: number | undefined;
90 try {
91 const timeoutPromise = new Promise<never>((_, reject) => {
92 destroyTimeoutId = setTimeout(
93 () => reject(new Error(`Timeout after ${timeoutMs}ms`)),
94 timeoutMs,
95 );
96 });
97 await Promise.race([this.runner.destroy(), timeoutPromise]);
98 } catch (e) {
99 this.logger.warn("Runner destroy timed out; continuing shutdown", {
100 e,
101 });
102 } finally {
103 if (destroyTimeoutId !== undefined) {
104 clearTimeout(destroyTimeoutId);
105 destroyTimeoutId = undefined;
106 }
107 }
108 // Background drain with timeout and proper timer cleanup
109 let bgTimeoutId: number | undefined;
110 try {
111 const timeoutPromise = new Promise<never>((_, reject) => {
112 bgTimeoutId = setTimeout(
113 () => reject(new Error(`Timeout after ${timeoutMs}ms`)),
114 timeoutMs,
115 );
116 });
117 await Promise.race([this.background.processAll(), timeoutPromise]);
118 } catch (e) {
119 this.logger.warn("Runner destroy timed out; continuing shutdown", {
120 e,
121 });
122 } finally {
123 if (bgTimeoutId !== undefined) {
124 clearTimeout(bgTimeoutId);
125 bgTimeoutId = undefined;
126 }
127 }
128 } else {
129 await this.runner.processAll();
130 await this.background.processAll();
131 }
132 } catch (error) {
133 this.logger.error("Error during subscription destroy", { error });
134 throw error;
135 }
136 }
137}
138
139function createFirehose(opts: {
140 idResolver: IdResolver;
141 service?: string;
142 indexingSvc: IndexingService;
143 logger: Logger;
144 db: Database;
145 startCursor?: number;
146}): { firehose: Firehose; runner: MemoryRunner } {
147 const { idResolver, service, indexingSvc, logger, db, startCursor } = opts;
148
149 const runner = new MemoryRunner({
150 startCursor,
151 setCursorInterval: 30000, // Save cursor every 30 seconds
152 setCursor: async (cursor: number) => {
153 await db.saveCursorState(cursor);
154 logger.info("Cursor saved to database", { cursor });
155 },
156 });
157 const firehose = new Firehose({
158 idResolver,
159 runner,
160 service,
161 onError: (err: Error) => logger.error("error in subscription", { err }),
162 handleEvent: async (evt: FirehoseEvent) => {
163 if (evt.event === "identity") {
164 await indexingSvc.indexHandle(evt.did, evt.time, true);
165 } else if (evt.event === "account") {
166 if (evt.active === false && evt.status === "deleted") {
167 await indexingSvc.deleteActor(evt.did);
168 } else {
169 await indexingSvc.updateActorStatus(
170 evt.did,
171 evt.active,
172 evt.status,
173 );
174 }
175 } else if (evt.event === "sync") {
176 await Promise.all([
177 indexingSvc.setCommitLastSeen(evt.did, evt.cid, evt.rev),
178 indexingSvc.indexHandle(evt.did, evt.time),
179 ]);
180 } else {
181 const indexFn = evt.event === "delete"
182 ? indexingSvc.deleteRecord(evt.uri)
183 : indexingSvc.indexRecord(
184 evt.uri,
185 evt.cid,
186 evt.record,
187 evt.event === "create"
188 ? WriteOpAction.Create
189 : WriteOpAction.Update,
190 evt.time,
191 );
192
193 await Promise.all([
194 indexFn,
195 indexingSvc.setCommitLastSeen(evt.did, evt.commit, evt.rev),
196 indexingSvc.indexHandle(evt.did, evt.time),
197 ]);
198 }
199 },
200 filterCollections: [
201 "so.sprk.*",
202 ],
203 });
204 return { firehose, runner };
205}