// [READ ONLY MIRROR] Spark Social AppView Server
// github.com/sprksocial/server
// atproto
// deno
// hono
// lexicon
1import { CID } from "multiformats/cid";
2import { AtpAgent, ComAtprotoSyncGetLatestCommit } from "@atproto/api";
3import { DAY, HOUR } from "@atp/common";
4import { getPds, IdResolver } from "@atp/identity";
5import { RepoRecord, ValidationError } from "@atp/lexicon";
6import {
7 getAndParseRecord,
8 readCarWithRoot,
9 VerifiedRepo,
10 verifyRepo,
11 WriteOpAction,
12} from "@atp/repo";
13import { AtUri } from "@atp/syntax";
14import { retryXrpc } from "../../utils/retry.ts";
15import { BackgroundQueue } from "../background.ts";
16import { Database } from "../db/index.ts";
17import { ActorDocument } from "../db/models.ts";
18import * as Block from "./plugins/block.ts";
19import * as Generator from "./plugins/generator.ts";
20import * as Follow from "./plugins/follow.ts";
21import * as Like from "./plugins/like.ts";
22import * as Post from "./plugins/post.ts";
23import * as Reply from "./plugins/reply.ts";
24import * as Profile from "./plugins/profile.ts";
25import * as Repost from "./plugins/repost.ts";
26import * as Story from "./plugins/story.ts";
27import * as Audio from "./plugins/audio.ts";
28import * as Labeler from "./plugins/labeler.ts";
29import * as CrosspostReply from "./plugins/crosspost/reply.ts";
30import { RecordProcessor } from "./processor.ts";
31import { ServerConfig } from "../../config.ts";
32import { PushService } from "../../utils/push.ts";
33
/**
 * Indexes atproto repo records and actor metadata into the AppView database.
 *
 * Records are routed by the record URI's collection to one of the plugin
 * processors held in `records`; actor-level data (handle, upstream status,
 * last-seen commit) is written directly through the `Actor` / `ActorSync`
 * models.
 */
export class IndexingService {
  /** One record processor per supported collection. */
  records: {
    post: Post.PluginType;
    reply: Reply.PluginType;
    like: Like.PluginType;
    repost: Repost.PluginType;
    follow: Follow.PluginType;
    profile: Profile.PluginType;
    block: Block.PluginType;
    generator: Generator.PluginType;
    story: Story.PluginType;
    audio: Audio.PluginType;
    labeler: Labeler.PluginType;
    crosspostReply: CrosspostReply.PluginType;
  };
  private pushService?: PushService;

  constructor(
    public db: Database,
    public cfg: ServerConfig,
    public idResolver: IdResolver,
    public background: BackgroundQueue,
    pushService?: PushService,
  ) {
    this.pushService = pushService;
    this.records = {
      post: Post.makePlugin(this.db, this.background),
      reply: Reply.makePlugin(this.db, this.background),
      like: Like.makePlugin(this.db, this.background),
      repost: Repost.makePlugin(this.db, this.background),
      follow: Follow.makePlugin(this.db, this.background),
      profile: Profile.makePlugin(this.db, this.background),
      block: Block.makePlugin(this.db, this.background),
      generator: Generator.makePlugin(this.db, this.background),
      story: Story.makePlugin(this.db, this.background),
      audio: Audio.makePlugin(this.db, this.background),
      labeler: Labeler.makePlugin(this.db, this.background),
      crosspostReply: CrosspostReply.makePlugin(this.db, this.background),
    };

    // Set push service on all processors
    if (pushService) {
      Object.values(this.records).forEach((processor) => {
        processor.setPushService(pushService);
      });
    }
  }

  /**
   * Returns a new service bound to `txn`, carrying over the config, identity
   * resolver, background queue and push service of this instance.
   */
  transact(txn: Database) {
    return new IndexingService(
      txn,
      this.cfg,
      this.idResolver,
      this.background,
      this.pushService,
    );
  }

  /**
   * Creates or updates one record through the processor registered for its
   * collection. Records from collections with no processor are ignored.
   *
   * @param opts Only forwarded on create; updates ignore it.
   */
  async indexRecord(
    uri: AtUri,
    cid: CID,
    obj: RepoRecord,
    action: WriteOpAction.Create | WriteOpAction.Update,
    timestamp: string,
    opts?: { disableNotifs?: boolean; disableLabels?: boolean },
  ) {
    const indexer = this.findIndexerForCollection(uri.collection);
    if (!indexer) return;
    if (action === WriteOpAction.Create) {
      await indexer.insertRecord(uri, cid, obj, timestamp, opts);
    } else {
      await indexer.updateRecord(uri, cid, obj, timestamp);
    }
  }

  /**
   * Deletes one record through the processor registered for its collection;
   * a no-op for collections with no processor.
   */
  async deleteRecord(uri: AtUri, cascading = false) {
    const indexer = this.findIndexerForCollection(uri.collection);
    if (!indexer) return;
    await indexer.deleteRecord(uri, cascading);
  }

  /**
   * Resolves and stores the handle for `did`.
   *
   * The handle claimed by the DID document is only trusted when resolving
   * that handle yields the same DID; otherwise the actor is stored with a
   * `null` handle. When a verified handle was previously recorded for a
   * different actor, that actor's handle is cleared.
   *
   * Skipped (unless `force`) when `needsHandleReindex` says the stored value
   * is still fresh. Resolution/DB errors are logged and swallowed so the
   * caller (e.g. the firehose consumer) keeps running; in that case the
   * actor's handle is reset to `null` and `indexedAt` is bumped.
   */
  async indexHandle(did: string, timestamp: string, force = false) {
    const actor = await this.db.models.Actor.findOne({ did });
    if (!force && !needsHandleReindex(actor, timestamp)) {
      return;
    }

    try {
      const atpData = await this.idResolver.did.resolveAtprotoData(did, true);

      // A DID document can claim any handle. Without checking that the
      // handle resolves back to this DID, the contention branch below would
      // let any account strip the handle from its rightful owner.
      const handleToDid = await this.idResolver.handle.resolve(atpData.handle);
      const handle: string | null = handleToDid === did
        ? atpData.handle.toLowerCase()
        : null;

      const actorWithHandle = handle !== null
        ? await this.db.models.Actor.findOne({ handle }).lean()
        : null;

      // handle contention: a verified handle now belongs to `did`, so clear
      // it from whichever actor held it before.
      if (handle && actorWithHandle && did !== actorWithHandle.did) {
        await this.db.models.Actor.updateOne(
          { did: actorWithHandle.did },
          { handle: null },
        );
      }

      const actorInfo = { handle, indexedAt: timestamp };
      await this.db.models.Actor.findOneAndUpdate(
        { did },
        { did, ...actorInfo },
        { upsert: true, returnDocument: "after" },
      );
    } catch (err) {
      // Log the error but don't throw - this prevents the firehose from crashing
      console.warn(
        "Failed to index handle, skipping",
        { err, did, timestamp },
      );

      // Still update the actor record with null handle to prevent repeated attempts
      const actorInfo = { handle: null, indexedAt: timestamp };
      try {
        await this.db.models.Actor.findOneAndUpdate(
          { did },
          { did, ...actorInfo },
          { upsert: true, returnDocument: "after" },
        );
      } catch (dbErr) {
        console.error(
          "Failed to update actor record after handle resolution failure",
          { err: dbErr, did },
        );
      }
    }
  }

  /**
   * Fetches the full repo of `did` from its PDS, verifies it against the
   * signing key from its DID document, and reconciles the local index with
   * it: new records are inserted, changed ones updated, and records missing
   * from the repo deleted. Per-record failures are logged and skipped.
   * Finally records the verified commit CID and rev as the last-seen commit.
   *
   * @param commit Optional caller-supplied commit identifier; used only as
   *   log context.
   */
  async indexRepo(did: string, commit?: string) {
    const now = new Date().toISOString();

    const actorExists = await this.db.models.Actor.findOne({ did }).lean();
    if (!actorExists) {
      console.info(
        `indexRepo: No actor record found for ${did}, indexing handle first`,
      );
      await this.indexHandle(did, now);
    }

    const { pds, signingKey } = await this.idResolver.did.resolveAtprotoData(
      did,
      true,
    );
    const agent = new AtpAgent({ service: pds });

    const { data: car } = await retryXrpc(() =>
      agent.com.atproto.sync.getRepo({ did })
    );
    const { root, blocks } = await readCarWithRoot(car);
    const verifiedRepo = await verifyRepo(blocks, root, did, signingKey);

    const currRecords = await this.getCurrentRecords(did);
    const repoRecords = formatCheckout(did, verifiedRepo);
    const diff = findDiffFromCheckout(currRecords, repoRecords);

    console.info(`Indexing ${diff.length} records for ${did}:`);

    await Promise.all(
      diff.map(async (op) => {
        const { uri, cid } = op;
        try {
          if (op.op === "delete") {
            await this.deleteRecord(uri);
          } else {
            const parsed = getAndParseRecord(blocks, cid);
            await this.indexRecord(
              uri,
              cid,
              parsed.record,
              op.op === "create" ? WriteOpAction.Create : WriteOpAction.Update,
              now,
            );
          }
        } catch (err) {
          if (err instanceof ValidationError) {
            console.warn(
              "skipping indexing of invalid record",
              { did, commit, uri: uri.toString(), cid: cid.toString() },
            );
          } else {
            console.error(
              "skipping indexing due to error processing record",
              { err, did, commit, uri: uri.toString(), cid: cid.toString() },
            );
          }
        }
      }),
    );

    // Record the rev of the commit we actually verified and indexed. The
    // caller's `commit` argument is optional and is not a repo rev.
    await this.setCommitLastSeen(did, root, verifiedRepo.commit.rev);
  }

  /**
   * Loads every indexed record of `did` as a map from URI string to its
   * parsed URI and CID.
   */
  async getCurrentRecords(did: string) {
    const res = await this.db.models.Record.find({ did }).select(["uri", "cid"])
      .lean();
    return res.reduce(
      (acc, cur) => {
        acc[cur.uri] = {
          uri: new AtUri(cur.uri),
          cid: CID.parse(cur.cid),
        };
        return acc;
      },
      {} as Record<string, { uri: AtUri; cid: CID }>,
    );
  }

  /** Upserts the last-seen commit CID and repo rev for `did` in `ActorSync`. */
  async setCommitLastSeen(did: string, commit: CID, rev: string) {
    await this.db.models.ActorSync.findOneAndUpdate(
      { did },
      {
        did,
        commitCid: commit.toString(),
        repoRev: rev ?? null,
      },
      { upsert: true, returnDocument: "after" },
    );
  }

  /**
   * Returns the processor whose `collection` equals `collection`, or
   * `undefined` when none is registered.
   */
  findIndexerForCollection(collection: string) {
    const indexers = Object.values(
      this.records as Record<string, RecordProcessor<unknown, unknown>>,
    );
    return indexers.find((indexer) => indexer.collection === collection);
  }

  /**
   * Stores the actor's upstream account status: `null` when active,
   * otherwise one of `deactivated` / `suspended` / `takendown`.
   *
   * @throws Error for an inactive account with any other status string.
   */
  async updateActorStatus(did: string, active: boolean, status: string = "") {
    let upstreamStatus: string | null;
    if (active) {
      upstreamStatus = null;
    } else if (["deactivated", "suspended", "takendown"].includes(status)) {
      upstreamStatus = status;
    } else {
      throw new Error(`Unrecognized account status: ${status}`);
    }
    await this.db.models.Actor.updateOne(
      { did },
      { upstreamStatus },
    );
  }

  /**
   * Deletes the actor row and all records it authored, but only when
   * `getActorIsHosted` reports `false`; `true` or an unknown (`null`)
   * result leaves the data in place.
   */
  async deleteActor(did: string) {
    const actorIsHosted = await this.getActorIsHosted(did);
    if (actorIsHosted === false) {
      await this.db.models.Actor.deleteOne({ did });
      await this.unindexActor(did);
      // Note: Notification model not present in current schemas
    }
  }

  /**
   * Checks whether the PDS named in `did`'s DID document still serves its repo.
   *
   * @returns `true` when `getLatestCommit` succeeds; `false` when the DID has
   *   no PDS, the PDS reports `RepoNotFound`, or DID resolution throws;
   *   `null` when the PDS call fails for any other reason.
   */
  private async getActorIsHosted(did: string) {
    try {
      const doc = await this.idResolver.did.resolve(did, true);
      const pds = doc && getPds(doc);
      if (!pds) return false;
      const agent = new AtpAgent({ service: pds });
      try {
        await retryXrpc(() => agent.com.atproto.sync.getLatestCommit({ did }));
        return true;
      } catch (err) {
        if (err instanceof ComAtprotoSyncGetLatestCommit.RepoNotFoundError) {
          return false;
        }
        return null;
      }
    } catch (err) {
      // NOTE(review): any resolution failure (possibly a transient network
      // error) is mapped to `false`, which makes `deleteActor` remove the
      // actor's data. Confirm this is intended rather than returning `null`.
      console.warn(
        "Failed to check if actor is hosted, assuming not hosted",
        { err, did },
      );
      return false;
    }
  }

  /**
   * Removes every record authored by `did` from each record model.
   * The `Actor` row itself is not touched here.
   */
  async unindexActor(did: string) {
    await this.db.models.Profile.deleteMany({ authorDid: did });
    await this.db.models.Follow.deleteMany({ authorDid: did });
    await this.db.models.Repost.deleteMany({ authorDid: did });
    await this.db.models.Like.deleteMany({ authorDid: did });
    await this.db.models.Generator.deleteMany({ authorDid: did });
    await this.db.models.Story.deleteMany({ authorDid: did });
    await this.db.models.Audio.deleteMany({ authorDid: did });
    await this.db.models.Block.deleteMany({ authorDid: did });
    await this.db.models.Post.deleteMany({ authorDid: did });
    await this.db.models.Reply.deleteMany({ authorDid: did });
    await this.db.models.Labeler.deleteMany({ authorDid: did });
    await this.db.models.CrosspostReply.deleteMany({ authorDid: did });
  }
}
330
/** A record location paired with the CID of its content. */
interface UriAndCid {
  /** AT-URI of the record. */
  uri: AtUri;
  /** CID of the record block. */
  cid: CID;
}
335
/**
 * One reconciliation step produced by diffing the local index against a
 * repo snapshot; every variant carries the record's URI and CID.
 */
type IndexOp = UriAndCid & {
  op: "create" | "update" | "delete";
};
341
/**
 * Computes the ops needed to turn the indexed record set `curr` into the
 * repo snapshot `checkout` (both keyed by URI string).
 *
 * Creates and updates come first, in `checkout` key order; deletes follow,
 * in `curr` key order. Records whose CID is unchanged produce no op.
 */
const findDiffFromCheckout = (
  curr: Record<string, UriAndCid>,
  checkout: Record<string, UriAndCid>,
): IndexOp[] => {
  const upserts = Object.entries(checkout).flatMap(
    ([uri, record]): IndexOp[] => {
      const existing = curr[uri];
      if (!existing) return [{ op: "create", ...record }];
      return existing.cid.equals(record.cid)
        ? []
        : [{ op: "update", ...record }];
    },
  );
  // Anything indexed locally but absent from the snapshot was removed.
  const deletes = Object.entries(curr)
    .filter(([uri]) => !checkout[uri])
    .map(([, record]): IndexOp => ({ op: "delete", ...record }));
  return [...upserts, ...deletes];
};
367
/**
 * Maps the records of a verified repo snapshot to `{ uri, cid }` entries
 * keyed by the record's AT-URI string, so the snapshot can be diffed
 * against the locally indexed records.
 */
const formatCheckout = (
  did: string,
  verifiedRepo: VerifiedRepo,
): Record<string, UriAndCid> => {
  const entries = Array.from(
    verifiedRepo.creates,
    ({ collection, rkey, cid }): [string, UriAndCid] => {
      const uri = AtUri.make(did, collection, rkey);
      return [uri.toString(), { uri, cid }];
    },
  );
  return Object.fromEntries(entries);
};
382
/**
 * Decides whether an actor's handle should be re-resolved at `timestamp`.
 *
 * True when the actor is unknown, when it was last indexed more than a day
 * before `timestamp`, or, for actors without a current handle (null or
 * absent), more than an hour before. A missing or unparseable date also
 * triggers a reindex instead of suppressing revalidation forever.
 */
const needsHandleReindex = (
  actor: ActorDocument | null,
  timestamp: string,
) => {
  if (!actor) return true;
  const timeDiff = new Date(timestamp).getTime() -
    new Date(actor.indexedAt).getTime();
  // NaN would fail every comparison below and block revalidation.
  if (Number.isNaN(timeDiff)) return true;
  // revalidate daily
  if (timeDiff > DAY) return true;
  // revalidate more aggressively for invalidated handles; `== null` also
  // covers documents where the handle field is absent (undefined).
  if (actor.handle == null && timeDiff > HOUR) return true;
  return false;
};