[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { DAY, HOUR } from "@atp/common";
2import { getPds, IdResolver } from "@atp/identity";
3import { Cid, type DidString, l } from "@atp/lex";
4import { parseCid } from "@atp/lex/data";
5import { Client, XRPCError } from "@atp/xrpc";
6
7import {
8 getAndParseRecord,
9 readCarWithRoot,
10 type RepoRecord,
11 VerifiedRepo,
12 verifyRepo,
13 WriteOpAction,
14} from "@atp/repo";
15import { AtUri } from "@atp/syntax";
16import { retryXrpc } from "../../utils/retry.ts";
17import * as com from "../../lex/com.ts";
18import { BackgroundQueue } from "../background.ts";
19import { Database } from "../db/index.ts";
20import { ActorDocument } from "../db/models.ts";
21import * as Block from "./plugins/block.ts";
22import * as Generator from "./plugins/generator.ts";
23import * as Follow from "./plugins/follow.ts";
24import * as Like from "./plugins/like.ts";
25import * as Post from "./plugins/post.ts";
26import * as Reply from "./plugins/reply.ts";
27import * as Profile from "./plugins/profile.ts";
28import * as Repost from "./plugins/repost.ts";
29import * as Story from "./plugins/story.ts";
30import * as Audio from "./plugins/audio.ts";
31import * as Labeler from "./plugins/labeler.ts";
32import { RecordProcessor } from "./processor.ts";
33import { ServerConfig } from "../../config.ts";
34import { PushService } from "../../utils/push.ts";
35
/**
 * Indexes atproto repository data (records, handles and account status) into
 * the AppView database.
 *
 * Every supported record collection has a dedicated processor plugin stored in
 * `records`. Record-level operations look up the processor whose `collection`
 * matches the record URI and delegate to it; collections without a processor
 * are ignored.
 */
export class IndexingService {
  /** Record processors, one per supported collection. */
  records: {
    post: Post.PluginType;
    reply: Reply.PluginType;
    like: Like.PluginType;
    repost: Repost.PluginType;
    follow: Follow.PluginType;
    profile: Profile.PluginType;
    block: Block.PluginType;
    generator: Generator.PluginType;
    story: Story.PluginType;
    audio: Audio.PluginType;
    labeler: Labeler.PluginType;
  };
  /** Optional push service, kept so `transact` can hand it to the clone. */
  private pushService?: PushService;

  /**
   * @param db Database handle used by the service and by every processor.
   * @param cfg Server configuration.
   * @param idResolver Resolver used for DID documents and atproto identity data.
   * @param background Background queue handed to every processor.
   * @param pushService When provided, it is set on every processor.
   */
  constructor(
    public db: Database,
    public cfg: ServerConfig,
    public idResolver: IdResolver,
    public background: BackgroundQueue,
    pushService?: PushService,
  ) {
    this.pushService = pushService;
    this.records = {
      post: Post.makePlugin(this.db, this.background),
      reply: Reply.makePlugin(this.db, this.background),
      like: Like.makePlugin(this.db, this.background),
      repost: Repost.makePlugin(this.db, this.background),
      follow: Follow.makePlugin(this.db, this.background),
      profile: Profile.makePlugin(this.db, this.background),
      block: Block.makePlugin(this.db, this.background),
      generator: Generator.makePlugin(this.db, this.background),
      story: Story.makePlugin(this.db, this.background),
      audio: Audio.makePlugin(this.db, this.background),
      labeler: Labeler.makePlugin(this.db, this.background),
    };

    // Set push service on all processors
    if (pushService) {
      Object.values(this.records).forEach((processor) => {
        processor.setPushService(pushService);
      });
    }
  }

  /**
   * Creates a sibling service bound to `txn` instead of `this.db`, sharing the
   * same config, resolver, background queue and push service.
   */
  transact(txn: Database) {
    return new IndexingService(
      txn,
      this.cfg,
      this.idResolver,
      this.background,
      this.pushService,
    );
  }

  /**
   * Indexes a created or updated record through the processor registered for
   * its collection. Does nothing when no processor handles the collection.
   *
   * @param uri AT-URI of the record.
   * @param cid CID of the record.
   * @param obj Parsed record value.
   * @param action Whether the record is being created or updated.
   * @param timestamp Timestamp forwarded to the processor.
   * @param opts Forwarded to the processor on creates only; updates do not
   *   receive it.
   */
  async indexRecord(
    uri: AtUri,
    cid: Cid,
    obj: RepoRecord,
    action: WriteOpAction.Create | WriteOpAction.Update,
    timestamp: string,
    opts?: { disableNotifs?: boolean; disableLabels?: boolean },
  ) {
    const indexer = this.findIndexerForCollection(uri.collection);
    if (!indexer) return;
    if (action === WriteOpAction.Create) {
      await indexer.insertRecord(uri, cid, obj, timestamp, opts);
    } else {
      await indexer.updateRecord(uri, cid, obj, timestamp);
    }
  }

  /**
   * Removes a record through the processor registered for its collection.
   * Does nothing when no processor handles the collection.
   *
   * @param uri AT-URI of the record to remove.
   * @param cascading Forwarded unchanged to the processor's `deleteRecord`.
   */
  async deleteRecord(uri: AtUri, cascading = false) {
    const indexer = this.findIndexerForCollection(uri.collection);
    if (!indexer) return;
    await indexer.deleteRecord(uri, cascading);
  }

  /**
   * Resolves the handle for `did` and stores it on the actor document.
   *
   * Unless `force` is set, the work is skipped when `needsHandleReindex`
   * reports that the stored actor is still fresh relative to `timestamp`.
   * Failures are logged rather than thrown; on failure the actor is upserted
   * with a `null` handle so the attempt is not repeated immediately.
   *
   * @param did DID of the actor.
   * @param timestamp Value written to the actor's `indexedAt`.
   * @param force Skip the freshness check and always re-resolve.
   */
  async indexHandle(did: string, timestamp: string, force = false) {
    const actor = await this.db.models.Actor.findOne({ did });
    if (!force && !needsHandleReindex(actor, timestamp)) {
      return;
    }

    try {
      const atpData = await this.idResolver.did.resolveAtprotoData(did, true);

      // `toLowerCase()` always yields a string, so no null check is needed
      // before looking up the current holder of the handle.
      const handle = atpData.handle.toLowerCase();
      const actorWithHandle = await this.db.models.Actor.findOne({ handle })
        .lean();

      // handle contention: another actor currently holds this handle, so
      // release it there before assigning it to `did`.
      if (handle && actorWithHandle && did !== actorWithHandle.did) {
        await this.db.models.Actor.updateOne(
          { did: actorWithHandle.did },
          { handle: null },
        );
      }

      await this.upsertActorHandle(did, handle, timestamp);
    } catch (err) {
      // Log the error but don't throw - this prevents the firehose from crashing
      console.warn(
        "Failed to index handle, skipping",
        { err, did, timestamp },
      );

      // Still update the actor record with null handle to prevent repeated attempts
      try {
        await this.upsertActorHandle(did, null, timestamp);
      } catch (dbErr) {
        console.error(
          "Failed to update actor record after handle resolution failure",
          { err: dbErr, did },
        );
      }
    }
  }

  /** Upserts the actor document for `did` with the given handle and `indexedAt`. */
  private async upsertActorHandle(
    did: string,
    handle: string | null,
    indexedAt: string,
  ) {
    await this.db.models.Actor.findOneAndUpdate(
      { did },
      { did, handle, indexedAt },
      { upsert: true, returnDocument: "after" },
    );
  }

  /**
   * Fetches the full repository for `did` from its PDS, verifies it against
   * the actor's signing key, and reconciles the index with it.
   *
   * The checkout is diffed against the `Record` rows already stored for the
   * DID, and the resulting create/update/delete operations are applied
   * concurrently. A failure on an individual record is logged and skipped so
   * it does not abort the rest of the repo. The repo root is then stored via
   * `setCommitLastSeen`.
   *
   * @param did DID whose repository should be indexed.
   * @param commit Optional identifier included in log output and stored as
   *   the repo revision (empty string when omitted).
   */
  async indexRepo(did: string, commit?: string) {
    const now = new Date().toISOString();

    const actorExists = await this.db.models.Actor.findOne({ did }).lean();
    if (!actorExists) {
      console.info(
        `indexRepo: No actor record found for ${did}, indexing handle first`,
      );
      await this.indexHandle(did, now);
    }

    const { pds, signingKey } = await this.idResolver.did.resolveAtprotoData(
      did,
      true,
    );
    const client = new Client(pds);

    const { data: car } = await retryXrpc(() =>
      client.call(com.atproto.sync.getRepo, {
        params: { did: did as DidString },
      })
    );
    const { root, blocks } = await readCarWithRoot(car);
    const verifiedRepo = await verifyRepo(blocks, root, did, signingKey);

    const currRecords = await this.getCurrentRecords(did);
    const repoRecords = formatCheckout(did, verifiedRepo);
    const diff = findDiffFromCheckout(currRecords, repoRecords);

    console.info(`Indexing ${diff.length} records for ${did}:`);

    await Promise.all(
      diff.map(async (op) => {
        const { uri, cid } = op;
        try {
          if (op.op === "delete") {
            await this.deleteRecord(uri);
          } else {
            const parsed = getAndParseRecord(blocks, cid);
            await this.indexRecord(
              uri,
              cid,
              parsed.record,
              op.op === "create" ? WriteOpAction.Create : WriteOpAction.Update,
              now,
            );
          }
        } catch (err) {
          if (err instanceof l.ValidationError) {
            console.warn(
              "skipping indexing of invalid record",
              { did, commit, uri: uri.toString(), cid: cid.toString() },
            );
          } else {
            console.error(
              "skipping indexing due to error processing record",
              { err, did, commit, uri: uri.toString(), cid: cid.toString() },
            );
          }
        }
      }),
    );

    // Update the last seen commit for this actor
    await this.setCommitLastSeen(did, root, commit || "");
  }

  /**
   * Loads the `uri`/`cid` of every `Record` row stored for `did`.
   *
   * @returns A map keyed by the URI string, holding the parsed URI and CID.
   */
  async getCurrentRecords(did: string) {
    const res = await this.db.models.Record.find({ did }).select(["uri", "cid"])
      .lean();
    return res.reduce(
      (acc, cur) => {
        acc[cur.uri] = {
          uri: new AtUri(cur.uri),
          cid: parseCid(cur.cid),
        };
        return acc;
      },
      {} as Record<string, { uri: AtUri; cid: Cid }>,
    );
  }

  /**
   * Upserts the `ActorSync` document for `did` with the given commit CID and
   * repo revision.
   */
  async setCommitLastSeen(did: string, commit: Cid, rev: string) {
    await this.db.models.ActorSync.findOneAndUpdate(
      { did },
      {
        did,
        commitCid: commit.toString(),
        repoRev: rev ?? null,
      },
      { upsert: true, returnDocument: "after" },
    );
  }

  /**
   * Returns the processor whose `collection` equals `collection`, or
   * `undefined` when none is registered.
   */
  findIndexerForCollection(collection: string) {
    const indexers = Object.values(
      this.records as Record<string, RecordProcessor<l.RecordSchema, unknown>>,
    );
    return indexers.find((indexer) => indexer.collection === collection);
  }

  /**
   * Stores the upstream account status on the actor document.
   *
   * An active account clears the status (`null`). An inactive account must
   * report one of `deactivated`, `suspended` or `takendown`.
   *
   * @throws Error when the account is inactive and `status` is not recognized.
   */
  async updateActorStatus(did: string, active: boolean, status: string = "") {
    let upstreamStatus: string | null;
    if (active) {
      upstreamStatus = null;
    } else if (["deactivated", "suspended", "takendown"].includes(status)) {
      upstreamStatus = status;
    } else {
      throw new Error(`Unrecognized account status: ${status}`);
    }
    await this.db.models.Actor.updateOne(
      { did },
      { upstreamStatus },
    );
  }

  /**
   * Deletes the actor and all of its indexed data, but only when
   * `getActorIsHosted` positively reports `false`. A `true` or `null`
   * (indeterminate) result leaves everything in place.
   */
  async deleteActor(did: string) {
    const actorIsHosted = await this.getActorIsHosted(did);
    if (actorIsHosted === false) {
      await this.db.models.Actor.deleteOne({ did });
      await this.unindexActor(did);
      // Note: Notification model not present in current schemas
    }
  }

  /**
   * Checks whether the actor's PDS still serves the repo.
   *
   * @returns `true` when `getLatestCommit` succeeds; `false` when the DID
   *   document has no PDS, when the PDS answers `RepoNotFound`, or when the
   *   check itself throws; `null` when the PDS call fails for any other reason.
   */
  private async getActorIsHosted(did: string) {
    try {
      const doc = await this.idResolver.did.resolve(did, true);
      const pds = doc && getPds(doc);
      if (!pds) return false;
      const client = new Client(pds);
      try {
        await retryXrpc(() =>
          client.call(com.atproto.sync.getLatestCommit, {
            params: { did: did as DidString },
          })
        );
        return true;
      } catch (err) {
        if (err instanceof XRPCError && err.error === "RepoNotFound") {
          return false;
        }
        return null;
      }
    } catch (err) {
      console.warn(
        "Failed to check if actor is hosted, assuming not hosted",
        { err, did },
      );
      return false;
    }
  }

  /**
   * Removes every indexed document authored by `did`, then the generic
   * `Record` rows for that DID.
   */
  async unindexActor(did: string) {
    // Per-collection indexes.
    await this.db.models.Profile.deleteMany({ authorDid: did });
    await this.db.models.Follow.deleteMany({ authorDid: did });
    await this.db.models.Repost.deleteMany({ authorDid: did });
    await this.db.models.Like.deleteMany({ authorDid: did });
    await this.db.models.Generator.deleteMany({ authorDid: did });
    await this.db.models.Story.deleteMany({ authorDid: did });
    await this.db.models.Audio.deleteMany({ authorDid: did });
    await this.db.models.Block.deleteMany({ authorDid: did });
    await this.db.models.Post.deleteMany({ authorDid: did });
    await this.db.models.Reply.deleteMany({ authorDid: did });
    await this.db.models.Labeler.deleteMany({ authorDid: did });
    await this.db.models.CrosspostReply.deleteMany({ authorDid: did });
    // Generic record index. getCurrentRecords() reads these rows to decide
    // what is already indexed, so leaving them behind would make a later
    // indexRepo() for this DID treat the removed records as still present and
    // skip re-indexing them.
    await this.db.models.Record.deleteMany({ did });
  }
}
336
/** A record reference: its AT-URI together with the CID of its content. */
type UriAndCid = { uri: AtUri; cid: Cid };
341
/**
 * One reconciliation step produced by diffing a repo checkout against the
 * index: the record reference plus the operation to apply to it.
 */
type IndexOp =
  & UriAndCid
  & (
    | { op: "create" | "update" }
    | { op: "delete" }
  );
347
/**
 * Computes the operations needed to turn the indexed state `curr` into the
 * repo state `checkout`. Both maps are keyed by record URI string.
 *
 * Creates and updates come first, in `checkout` key order, followed by
 * deletes in `curr` key order. Records whose CID is unchanged yield no
 * operation.
 */
const findDiffFromCheckout = (
  curr: Record<string, UriAndCid>,
  checkout: Record<string, UriAndCid>,
): IndexOp[] => {
  const upserts: IndexOp[] = [];
  for (const [key, incoming] of Object.entries(checkout)) {
    const existing = curr[key];
    if (!existing) {
      upserts.push({ op: "create", ...incoming });
      continue;
    }
    // Same CID means the indexed copy is already current.
    if (existing.cid.equals(incoming.cid)) continue;
    upserts.push({ op: "update", ...incoming });
  }

  // Anything indexed that the checkout no longer contains must be removed.
  const removals = Object.entries(curr)
    .filter(([key]) => !checkout[key])
    .map(([, stale]): IndexOp => ({ op: "delete", ...stale }));

  return [...upserts, ...removals];
};
373
/**
 * Turns the `creates` of a verified repo into a map keyed by record URI
 * string, where each value holds the URI built from `did`, the collection and
 * the rkey, plus the record's CID.
 */
const formatCheckout = (
  did: string,
  verifiedRepo: VerifiedRepo,
): Record<string, UriAndCid> => {
  const entries: [string, UriAndCid][] = [];
  for (const { collection, rkey, cid } of verifiedRepo.creates) {
    const uri = AtUri.make(did, collection, rkey);
    entries.push([uri.toString(), { uri, cid }]);
  }
  return Object.fromEntries(entries);
};
388
/**
 * Decides whether an actor's handle should be resolved again.
 *
 * Returns `true` when there is no actor yet, when more than `DAY` has elapsed
 * between the actor's `indexedAt` and `timestamp`, or when the stored handle
 * is `null` and more than `HOUR` has elapsed.
 */
const needsHandleReindex = (
  actor: ActorDocument | null,
  timestamp: string,
) => {
  if (!actor) return true;

  const elapsed = new Date(timestamp).getTime() -
    new Date(actor.indexedAt).getTime();

  // Revalidate daily, and more aggressively for invalidated handles.
  const dailyDue = elapsed > DAY;
  const invalidatedDue = actor.handle === null && elapsed > HOUR;
  return dailyDue || invalidatedDue;
};