[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { CID } from "multiformats/cid";
2import { AtpAgent, ComAtprotoSyncGetLatestCommit } from "@atproto/api";
3import { DAY, HOUR } from "@atp/common";
4import { getPds, IdResolver } from "@atp/identity";
5import { RepoRecord, ValidationError } from "@atp/lexicon";
6import {
7 getAndParseRecord,
8 readCarWithRoot,
9 VerifiedRepo,
10 verifyRepo,
11 WriteOpAction,
12} from "@atp/repo";
13import { AtUri } from "@atp/syntax";
14import { retryXrpc } from "../../utils/retry.ts";
15import { BackgroundQueue } from "../background.ts";
16import { Database } from "../db/index.ts";
17import { ActorDocument } from "../db/models.ts";
18import * as Block from "./plugins/block.ts";
19import * as Generator from "./plugins/generator.ts";
20import * as Follow from "./plugins/follow.ts";
21import * as Like from "./plugins/like.ts";
22import * as Post from "./plugins/post.ts";
23import * as Reply from "./plugins/reply.ts";
24import * as Profile from "./plugins/profile.ts";
25import * as Repost from "./plugins/repost.ts";
26import * as Story from "./plugins/story.ts";
27import * as Audio from "./plugins/audio.ts";
28import { RecordProcessor } from "./processor.ts";
29import { getLogger, Logger } from "@logtape/logtape";
30import { ServerConfig } from "../../config.ts";
31
/**
 * Indexes atproto repo records, handles, and account status into the AppView
 * database.
 *
 * Each supported record type is handled by a plugin in `records`.
 * `findIndexerForCollection` selects the plugin whose `collection` matches a
 * record URI's collection. Records in collections without a plugin are ignored.
 */
export class IndexingService {
  /** Record plugins, one per supported record type. */
  records: {
    post: Post.PluginType;
    reply: Reply.PluginType;
    like: Like.PluginType;
    repost: Repost.PluginType;
    follow: Follow.PluginType;
    profile: Profile.PluginType;
    block: Block.PluginType;
    generator: Generator.PluginType;
    story: Story.PluginType;
    audio: Audio.PluginType;
  };
  logger: Logger;

  constructor(
    public db: Database,
    public cfg: ServerConfig,
    public idResolver: IdResolver,
    public background: BackgroundQueue,
  ) {
    this.logger = getLogger(["appview", "indexer"]);
    this.records = {
      post: Post.makePlugin(this.db, this.background),
      reply: Reply.makePlugin(this.db, this.background),
      like: Like.makePlugin(this.db, this.background),
      repost: Repost.makePlugin(this.db, this.background),
      follow: Follow.makePlugin(this.db, this.background),
      profile: Profile.makePlugin(this.db, this.background),
      block: Block.makePlugin(this.db, this.background),
      generator: Generator.makePlugin(this.db, this.background),
      story: Story.makePlugin(this.db, this.background),
      audio: Audio.makePlugin(this.db, this.background),
    };
  }

  /**
   * Returns a new service whose plugins and queries all use `txn`.
   * This instance's config, identity resolver and background queue are shared.
   */
  transact(txn: Database): IndexingService {
    return new IndexingService(
      txn,
      this.cfg,
      this.idResolver,
      this.background,
    );
  }

  /**
   * Indexes a created or updated record via the plugin for its collection.
   * Records in collections without a plugin are ignored.
   *
   * @param opts Only forwarded to the plugin's `insertRecord` (creates).
   */
  async indexRecord(
    uri: AtUri,
    cid: CID,
    obj: RepoRecord,
    action: WriteOpAction.Create | WriteOpAction.Update,
    timestamp: string,
    opts?: { disableNotifs?: boolean; disableLabels?: boolean },
  ): Promise<void> {
    const indexer = this.findIndexerForCollection(uri.collection);
    if (!indexer) return;
    if (action === WriteOpAction.Create) {
      await indexer.insertRecord(uri, cid, obj, timestamp, opts);
    } else {
      await indexer.updateRecord(uri, cid, obj, timestamp);
    }
  }

  /**
   * Removes an indexed record via the plugin for its collection.
   * This is a no-op for collections without a plugin. `cascading` is passed
   * through to the plugin unchanged.
   */
  async deleteRecord(uri: AtUri, cascading = false): Promise<void> {
    const indexer = this.findIndexerForCollection(uri.collection);
    if (!indexer) return;
    await indexer.deleteRecord(uri, cascading);
  }

  /**
   * Re-resolves the handle for `did` and upserts it on the actor document.
   *
   * The handle claimed in the DID document is only accepted if resolving that
   * handle yields the same DID. Otherwise the actor is stored with a `null`
   * handle. If another actor currently holds the accepted handle, that actor's
   * handle is cleared first.
   *
   * Skipped when `needsHandleReindex` says the actor is fresh, unless `force`
   * is set. Resolution and database errors are logged rather than thrown. On
   * failure the actor is upserted with a `null` handle (best effort).
   */
  async indexHandle(
    did: string,
    timestamp: string,
    force = false,
  ): Promise<void> {
    const actor = await this.db.models.Actor.findOne({ did });
    if (!force && !needsHandleReindex(actor, timestamp)) {
      return;
    }

    try {
      const atpData = await this.idResolver.did.resolveAtprotoData(did, true);

      // A DID document can claim any handle. Only trust the claim if the handle
      // itself resolves back to this DID. Without this check, a forged claim
      // would also strip the handle from its real owner in the contention step
      // below.
      const handleToDid = await this.idResolver.handle.resolve(atpData.handle);
      const handle: string | null = handleToDid === did
        ? atpData.handle.toLowerCase()
        : null;

      const actorWithHandle = handle !== null
        ? await this.db.models.Actor.findOne({ handle }).lean()
        : null;

      // handle contention: the verified handle moves to this DID
      if (handle && actorWithHandle && did !== actorWithHandle.did) {
        await this.db.models.Actor.updateOne(
          { did: actorWithHandle.did },
          { handle: null },
        );
      }

      const uri = `at://${did}/so.sprk.actor.profile`;
      const actorInfo = { uri, handle, indexedAt: timestamp };
      await this.db.models.Actor.findOneAndUpdate(
        { did },
        { did, ...actorInfo },
        { upsert: true, new: true },
      );
    } catch (err) {
      // Log the error but don't throw - this prevents the firehose from crashing
      this.logger.warn(
        "Failed to index handle, skipping",
        { err, did, timestamp },
      );

      // Still update the actor record with null handle to prevent repeated attempts
      const uri = `at://${did}/so.sprk.actor.profile`;
      const actorInfo = { uri, handle: null, indexedAt: timestamp };
      try {
        await this.db.models.Actor.findOneAndUpdate(
          { did },
          { did, ...actorInfo },
          { upsert: true, new: true },
        );
      } catch (dbErr) {
        this.logger.error(
          "Failed to update actor record after handle resolution failure",
          { err: dbErr, did },
        );
      }
    }
  }

  /**
   * Fetches the full repo for `did` from its PDS and verifies it against the
   * DID's signing key. It then reconciles the indexed records with the repo:
   * - records new to the repo are created;
   * - records whose CID changed are updated;
   * - indexed records no longer in the repo are deleted.
   * Finally the commit CID and revision are stored via `setCommitLastSeen`.
   *
   * Per-record failures are logged and skipped. Failures resolving the DID,
   * fetching the repo, or verifying it propagate to the caller.
   *
   * @param commit Caller-supplied hint, used only as log context.
   */
  async indexRepo(did: string, commit?: string): Promise<void> {
    const now = new Date().toISOString();

    const actorExists = await this.db.models.Actor.findOne({ did }).lean();
    if (!actorExists) {
      this.logger.info(
        `indexRepo: No actor record found for ${did}, indexing handle first`,
      );
      await this.indexHandle(did, now);
    }

    const { pds, signingKey } = await this.idResolver.did.resolveAtprotoData(
      did,
      true,
    );
    const agent = new AtpAgent({ service: pds });

    const { data: car } = await retryXrpc(() =>
      agent.com.atproto.sync.getRepo({ did })
    );
    const { root, blocks } = await readCarWithRoot(car);
    const verifiedRepo = await verifyRepo(blocks, root, did, signingKey);

    const currRecords = await this.getCurrentRecords(did);
    const repoRecords = formatCheckout(did, verifiedRepo);
    const diff = findDiffFromCheckout(currRecords, repoRecords);

    this.logger.info(`Indexing ${diff.length} records for ${did}:`);

    await Promise.all(
      diff.map(async (op) => {
        const { uri, cid } = op;
        try {
          if (op.op === "delete") {
            await this.deleteRecord(uri);
          } else {
            const parsed = getAndParseRecord(blocks, cid);
            await this.indexRecord(
              uri,
              cid,
              parsed.record,
              op.op === "create" ? WriteOpAction.Create : WriteOpAction.Update,
              now,
            );
          }
        } catch (err) {
          if (err instanceof ValidationError) {
            this.logger.warn(
              "skipping indexing of invalid record",
              { did, commit, uri: uri.toString(), cid: cid.toString() },
            );
          } else {
            this.logger.error(
              "skipping indexing due to error processing record",
              { err, did, commit, uri: uri.toString(), cid: cid.toString() },
            );
          }
        }
      }),
    );

    // `root` is the commit that was just verified. Take its revision from the
    // verified commit too, rather than from the optional `commit` argument,
    // which is only a log hint and is often absent.
    await this.setCommitLastSeen(did, root, verifiedRepo.commit.rev);
  }

  /** Loads the (uri, cid) pairs currently indexed for `did`, keyed by at-uri string. */
  async getCurrentRecords(
    did: string,
  ): Promise<Record<string, { uri: AtUri; cid: CID }>> {
    const res = await this.db.models.Record.find({ did })
      .select(["uri", "cid"])
      .lean();
    return res.reduce(
      (acc, cur) => {
        acc[cur.uri] = {
          uri: new AtUri(cur.uri),
          cid: CID.parse(cur.cid),
        };
        return acc;
      },
      {} as Record<string, { uri: AtUri; cid: CID }>,
    );
  }

  /**
   * Upserts the sync state for `did`: the CID of the last processed commit and
   * its repo revision (`rev`).
   */
  async setCommitLastSeen(did: string, commit: CID, rev: string): Promise<void> {
    await this.db.models.ActorSync.findOneAndUpdate(
      { did },
      {
        did,
        commitCid: commit.toString(),
        repoRev: rev ?? null,
      },
      { upsert: true, new: true },
    );
  }

  /** Returns the first record plugin whose `collection` equals `collection`, if any. */
  findIndexerForCollection(
    collection: string,
  ): RecordProcessor<unknown, unknown> | undefined {
    const indexers = Object.values(
      this.records as Record<string, RecordProcessor<unknown, unknown>>,
    );
    return indexers.find((indexer) => indexer.collection === collection);
  }

  /**
   * Stores the upstream account status for `did`.
   *
   * Active accounts get `null`. Inactive accounts must be "deactivated",
   * "suspended" or "takendown". Any other inactive status throws before
   * anything is written.
   */
  async updateActorStatus(
    did: string,
    active: boolean,
    status: string = "",
  ): Promise<void> {
    let upstreamStatus: string | null;
    if (active) {
      upstreamStatus = null;
    } else if (["deactivated", "suspended", "takendown"].includes(status)) {
      upstreamStatus = status;
    } else {
      throw new Error(`Unrecognized account status: ${status}`);
    }
    await this.db.models.Actor.updateOne(
      { did },
      { upstreamStatus },
    );
  }

  /**
   * Deletes the actor document and all of their indexed content.
   *
   * This only happens when the account is known not to be hosted, i.e.
   * `getActorIsHosted` returns `false`. When hosting status is unknown (`null`)
   * or the account is still hosted, nothing is deleted.
   */
  async deleteActor(did: string): Promise<void> {
    const actorIsHosted = await this.getActorIsHosted(did);
    if (actorIsHosted === false) {
      await this.db.models.Actor.deleteOne({ did });
      await this.unindexActor(did);
      // Note: Notification model not present in current schemas
    }
  }

  /**
   * Checks whether `did` still has a repo on its PDS.
   *
   * @returns `true` if `getLatestCommit` succeeds. `false` if the DID document
   *   is missing or has no PDS, or the PDS reports `RepoNotFound`. `null` if
   *   the status cannot be determined: any other PDS error, or a failure while
   *   resolving the DID. Transient failures must never be reported as "not
   *   hosted", because the caller deletes data on `false`.
   */
  private async getActorIsHosted(did: string): Promise<boolean | null> {
    try {
      const doc = await this.idResolver.did.resolve(did, true);
      const pds = doc && getPds(doc);
      if (!pds) return false;
      const agent = new AtpAgent({ service: pds });
      try {
        await retryXrpc(() => agent.com.atproto.sync.getLatestCommit({ did }));
        return true;
      } catch (err) {
        if (err instanceof ComAtprotoSyncGetLatestCommit.RepoNotFoundError) {
          return false;
        }
        return null;
      }
    } catch (err) {
      this.logger.warn(
        "Failed to check if actor is hosted, hosting status unknown",
        { err, did },
      );
      return null;
    }
  }

  /**
   * Deletes every indexed document authored by `did`. This includes the
   * generic `Record` entries that `getCurrentRecords` diffs against. Without
   * them gone, a later `indexRepo` for the same DID would treat the stale
   * entries as already indexed and skip the content.
   *
   * The deletes run one after another rather than via `Promise.all`.
   * NOTE(review): `this.db` may be a transaction-bound handle from `transact`;
   * keeping them sequential avoids concurrent operations on it. Confirm before
   * parallelizing.
   */
  async unindexActor(did: string): Promise<void> {
    await this.db.models.Profile.deleteMany({ authorDid: did });
    await this.db.models.Follow.deleteMany({ authorDid: did });
    await this.db.models.Repost.deleteMany({ authorDid: did });
    await this.db.models.Like.deleteMany({ authorDid: did });
    await this.db.models.Generator.deleteMany({ authorDid: did });
    await this.db.models.Story.deleteMany({ authorDid: did });
    await this.db.models.Audio.deleteMany({ authorDid: did });
    await this.db.models.Block.deleteMany({ authorDid: did });
    await this.db.models.Post.deleteMany({ authorDid: did });
    await this.db.models.Reply.deleteMany({ authorDid: did });
    // generic record index used by getCurrentRecords / indexRepo diffs
    await this.db.models.Record.deleteMany({ did });
  }
}
315
/** An at-uri together with the CID of the record it points to. */
interface UriAndCid {
  uri: AtUri;
  cid: CID;
}
320
/**
 * One reconciliation step produced by `findDiffFromCheckout`: index a new or
 * changed record, or drop one that is no longer in the repo.
 */
type IndexOp = UriAndCid & ({ op: "create" | "update" } | { op: "delete" });
326
/**
 * Computes the ops that bring the indexed records (`curr`) in line with a repo
 * checkout. Both maps are keyed by at-uri string.
 *
 * Output order:
 * 1. creates/updates, in `checkout` key order;
 * 2. deletes, in `curr` key order.
 * Entries whose CID is unchanged produce no op.
 */
const findDiffFromCheckout = (
  curr: Record<string, UriAndCid>,
  checkout: Record<string, UriAndCid>,
): IndexOp[] => {
  const result: IndexOp[] = [];

  for (const [key, incoming] of Object.entries(checkout)) {
    const existing = curr[key];
    const unchanged = existing && existing.cid.equals(incoming.cid);
    if (unchanged) continue;
    result.push({ op: existing ? "update" : "create", ...incoming });
  }

  for (const [key, stale] of Object.entries(curr)) {
    if (!checkout[key]) {
      result.push({ op: "delete", ...stale });
    }
  }

  return result;
};
352
/**
 * Keys every record in a verified repo checkout by its at-uri string.
 * Each value holds the parsed at-uri and the record's CID.
 */
const formatCheckout = (
  did: string,
  verifiedRepo: VerifiedRepo,
): Record<string, UriAndCid> =>
  Object.fromEntries(
    verifiedRepo.creates.map((entry): [string, UriAndCid] => {
      const recordUri = AtUri.make(did, entry.collection, entry.rkey);
      return [recordUri.toString(), { uri: recordUri, cid: entry.cid }];
    }),
  );
367
/**
 * Decides whether an actor's handle should be re-resolved at `timestamp`.
 *
 * Returns true when:
 * - the actor has never been indexed;
 * - it was indexed more than a day ago;
 * - its handle is missing and it was indexed more than an hour ago;
 * - either timestamp is unparseable.
 */
const needsHandleReindex = (
  actor: ActorDocument | null,
  timestamp: string,
): boolean => {
  if (!actor) return true;
  const timeDiff = new Date(timestamp).getTime() -
    new Date(actor.indexedAt).getTime();
  // An invalid/missing date yields NaN, which compares false against every
  // threshold below and would otherwise suppress reindexing forever.
  if (Number.isNaN(timeDiff)) return true;
  // revalidate daily
  if (timeDiff > DAY) return true;
  // revalidate more aggressively for invalidated (null) or never-set handles
  if (actor.handle == null && timeDiff > HOUR) return true;
  return false;
};