[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { CID } from "multiformats/cid";
2import { jsonStringToLex, stringifyLex } from "@atp/lexicon";
3import { AtUri } from "@atp/syntax";
4import { lexicons } from "../../lex/lexicons.ts";
5import { BackgroundQueue } from "../background.ts";
6import { Database } from "../db/index.ts";
7import { chunkArray } from "@atp/common";
8import { PushService } from "../../utils/push.ts";
9
10// @NOTE re: insertions and deletions. Due to how record updates are handled,
11// (insertFn) should have the same effect as (insertFn -> deleteFn -> insertFn).
/**
 * Collection-specific hooks that a RecordProcessor is configured with.
 *
 * `T` is the validated record type for the collection and `S` is the value the
 * indexing hooks return for an indexed record (it is handed back to the
 * notification and aggregate hooks).
 */
interface RecordProcessorParams<T, S> {
  /** Lexicon id (collection NSID) that records are validated against. */
  lexId: string;

  /**
   * Indexes a record. Resolves to the indexed value, or null when nothing
   * was indexed.
   */
  insertFn: (
    db: Database,
    uri: AtUri,
    cid: CID,
    obj: T,
    timestamp: string,
  ) => Promise<S | null>;

  /**
   * Looks up an already-indexed record that `obj` duplicates. May answer
   * synchronously or asynchronously; null means no duplicate was found.
   */
  findDuplicate: (
    db: Database,
    uri: AtUri,
    obj: T,
  ) => Promise<AtUri | null> | AtUri | null;

  /**
   * Removes the index entry for `uri`. Resolves to the removed value, or null
   * when the uri was not indexed.
   */
  deleteFn: (db: Database, uri: AtUri) => Promise<S | null>;

  /** Notifications to create for a newly indexed value. */
  notifsForInsert: (obj: S) => Notif[];

  /**
   * Notification changes for a removed value: `notifs` are created and
   * `toDelete` lists record uris whose notifications are removed.
   * `replacedBy` is the value indexed in its place, if any.
   */
  notifsForDelete: (
    prev: S,
    replacedBy: S | null,
  ) => { notifs: Notif[]; toDelete: string[] };

  /** Optional hook for refreshing aggregates after a value is indexed or removed. */
  updateAggregates?: (db: Database, obj: S) => Promise<void>;
}
34
/** A notification row to be written for a recipient. */
interface Notif {
  /** DID of the account that receives the notification. */
  did: string;
  /** Short reason code for the notification. */
  reason: string;
  /** DID of the account whose record produced the notification. */
  author: string;
  /** URI of the record that produced the notification. */
  recordUri: string;
  /** CID of the record that produced the notification. */
  recordCid: string;
  /** Timestamp string stored as the notification's `sortAt` value. */
  sortAt: string;
  /** Optional subject of the reason; stored as null when omitted. */
  reasonSubject?: string;
}
44
/**
 * Indexes the records of a single AT Protocol collection.
 *
 * Every write keeps three things in step: the generic `Record` row, the
 * `DuplicateRecord` bookkeeping, and the collection-specific index maintained
 * through the hooks in `params`. Notifications and aggregates are derived from
 * the values those hooks return.
 */
export class RecordProcessor<T, S> {
  collection: string;
  db: Database;
  private pushService: PushService | null = null;

  /**
   * Creates a processor bound to exactly one lexId.
   *
   * Records passed to `insertRecord` / `updateRecord` are validated against
   * the lexicon for `params.lexId`. Note that the write methods do not check
   * `uri.collection` themselves; use `matchesCollection` to route uris to the
   * right processor.
   *
   * Example usage:
   * ```typescript
   * const processor = new RecordProcessor(db, background, {
   *   lexId: "app.bsky.graph.follow",
   *   // ... other params
   * });
   *
   * if (processor.matchesCollection(uri)) {
   *   await processor.insertRecord(uri, cid, obj, timestamp);
   * }
   * ```
   *
   * @param appDb Database used for all reads and writes.
   * @param background Queue used for aggregate updates and push delivery.
   * @param params Collection-specific hooks.
   */
  constructor(
    private appDb: Database,
    private background: BackgroundQueue,
    private params: RecordProcessorParams<T, S>,
  ) {
    this.db = appDb;
    this.collection = this.params.lexId;
  }

  /** Attaches the push service used to deliver push notifications. */
  setPushService(pushService: PushService) {
    this.pushService = pushService;
  }

  /** True when `uri` belongs to the collection this processor handles. */
  matchesCollection(uri: AtUri): boolean {
    return uri.collection === this.collection;
  }

  /** Non-throwing schema check: true when `obj` validates against the lexicon. */
  matchesSchema(obj: unknown): obj is T {
    try {
      lexicons.assertValidRecord(this.collection, obj);
      return true;
    } catch {
      return false;
    }
  }

  /** Throwing schema check; lets the lexicon validation error propagate. */
  assertValidRecord(obj: unknown): asserts obj is T {
    lexicons.assertValidRecord(this.collection, obj);
  }

  /** Returns the lexId (collection NSID) this processor handles. */
  getLexId(): string {
    return this.collection;
  }

  /**
   * Validates and indexes a new record.
   *
   * If `findDuplicate` reports a different uri, the record is only stored in
   * the duplicates table: no index entry, aggregates or notifications.
   * Otherwise the `Record` row is upserted and the record is indexed.
   *
   * @param timestamp Stored as `indexedAt`; also used as `createdAt` when the
   *   record has no string `createdAt` field.
   * @param opts.disableNotifs Skip notification handling for this write.
   * @throws When `obj` fails lexicon validation, or when a database or
   *   notification write fails.
   */
  async insertRecord(
    uri: AtUri,
    cid: CID,
    obj: unknown,
    timestamp: string,
    opts?: { disableNotifs?: boolean },
  ): Promise<void> {
    this.assertValidRecord(obj);

    // Prefer the record's own createdAt when it carries one.
    const recordObj = obj as Record<string, unknown>;
    const createdAt = typeof recordObj.createdAt === "string"
      ? recordObj.createdAt
      : timestamp;

    // Check for duplicates before attempting the insert.
    const found = await this.params.findDuplicate(this.db, uri, obj);
    if (found && found.toString() !== uri.toString()) {
      // Duplicate exists under a different uri: record it and emit no events.
      await this.db.models.DuplicateRecord.findOneAndUpdate(
        { uri: uri.toString() },
        {
          uri: uri.toString(),
          cid: cid.toString(),
          duplicateOf: found.toString(),
          indexedAt: timestamp,
        },
        { upsert: true, returnDocument: "after" },
      );
      return;
    }

    // Insert or update the generic record row.
    await this.db.models.Record.findOneAndUpdate(
      { uri: uri.toString() },
      {
        uri: uri.toString(),
        cid: cid.toString(),
        did: uri.host,
        collectionName: uri.collection,
        rkey: uri.rkey,
        json: stringifyLex(obj),
        createdAt,
        indexedAt: timestamp,
      },
      { upsert: true, returnDocument: "after" },
    );

    const inserted = await this.params.insertFn(
      this.db,
      uri,
      cid,
      obj,
      timestamp,
    );
    if (!inserted) return;

    this.aggregateOnCommit(inserted);
    if (!opts?.disableNotifs) {
      // Awaited so failures reach the caller instead of becoming an
      // unhandled rejection.
      await this.handleNotifs({ inserted });
    }
  }

  /**
   * Validates and re-indexes an updated record.
   *
   * Uses a very simple strategy for updates: purge the existing index entry
   * for the uri, then replace it. This lets each collection's indexer avoid
   * bespoke in-place update logic while keeping control over notifications.
   * If the uri was not indexed yet, the update is treated as a plain insert.
   *
   * @param opts.disableNotifs Skip notification handling for this write.
   * @throws When `obj` fails lexicon validation, when the old index entry was
   *   removed but the replacement could not be indexed, or when a database or
   *   notification write fails.
   */
  async updateRecord(
    uri: AtUri,
    cid: CID,
    obj: unknown,
    timestamp: string,
    opts?: { disableNotifs?: boolean },
  ): Promise<void> {
    this.assertValidRecord(obj);

    // Only overwrite createdAt when the record actually carries one.
    const recordObj = obj as Record<string, unknown>;
    const createdAt = typeof recordObj.createdAt === "string"
      ? recordObj.createdAt
      : undefined;

    const updateData: {
      cid: string;
      json: string;
      indexedAt: string;
      createdAt?: string;
    } = {
      cid: cid.toString(),
      json: stringifyLex(obj),
      indexedAt: timestamp,
    };
    if (createdAt) {
      updateData.createdAt = createdAt;
    }

    await this.db.models.Record.findOneAndUpdate(
      { uri: uri.toString() },
      updateData,
      { returnDocument: "after" },
    );

    // Keep the duplicate bookkeeping in step with the new content. As in
    // insertRecord, findDuplicate may report the record's own uri; that is not
    // a duplicate and must not be stored as one.
    const dupe = await this.params.findDuplicate(this.db, uri, obj);
    if (dupe && dupe.toString() !== uri.toString()) {
      await this.db.models.DuplicateRecord.findOneAndUpdate(
        { uri: uri.toString() },
        {
          cid: cid.toString(),
          duplicateOf: dupe.toString(),
          indexedAt: timestamp,
        },
        { upsert: true, returnDocument: "after" },
      );
    } else {
      await this.db.models.DuplicateRecord.deleteOne({ uri: uri.toString() });
    }

    const deleted = await this.params.deleteFn(this.db, uri);
    if (!deleted) {
      // Updated before it was ever indexed: treat it like a plain insert,
      // forwarding opts so disableNotifs is still honoured.
      return this.insertRecord(uri, cid, obj, timestamp, opts);
    }
    this.aggregateOnCommit(deleted);

    const inserted = await this.params.insertFn(
      this.db,
      uri,
      cid,
      obj,
      timestamp,
    );
    if (!inserted) {
      throw new Error(
        "Record update failed: removed from index but could not be replaced",
      );
    }
    this.aggregateOnCommit(inserted);

    if (!opts?.disableNotifs) {
      await this.handleNotifs({ inserted, deleted });
    }
  }

  /**
   * Removes a record from the record table, the duplicates table and the index.
   *
   * @param cascading When true, every duplicate that pointed at `uri` is
   *   dropped as well. When false, the earliest-indexed duplicate (if any) is
   *   indexed in place of the deleted record.
   */
  async deleteRecord(uri: AtUri, cascading = false): Promise<void> {
    const uriStr = uri.toString();

    await this.db.models.Record.deleteOne({ uri: uriStr });
    await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });

    const deleted = await this.params.deleteFn(this.db, uri);
    if (!deleted) return;

    this.aggregateOnCommit(deleted);

    if (cascading) {
      await this.db.models.DuplicateRecord.deleteMany({ duplicateOf: uriStr });
      await this.handleNotifs({ deleted });
      return;
    }

    const inserted = await this.promoteOldestDuplicate(uriStr);
    if (inserted) {
      this.aggregateOnCommit(inserted);
    }
    await this.handleNotifs({ deleted, inserted: inserted ?? undefined });
  }

  /**
   * Indexes the earliest-indexed duplicate of `uriStr` in place of a deleted
   * record. Resolves to the newly indexed value, or null when there is no
   * usable duplicate (none recorded, its stored record is missing, it is from
   * another collection, or it no longer passes schema validation).
   */
  private async promoteOldestDuplicate(uriStr: string): Promise<S | null> {
    const found = await this.db.models.DuplicateRecord.findOne({
      duplicateOf: uriStr,
    })
      .sort({ indexedAt: 1 })
      .lean();
    if (!found) return null;

    // The duplicates table only holds a pointer; the content is in Record.
    const recordDoc = await this.db.models.Record.findOne({ uri: found.uri })
      .lean();
    if (!recordDoc || !recordDoc.json) return null;

    const foundUri = new AtUri(found.uri);
    if (!this.matchesCollection(foundUri)) return null;

    const record = jsonStringToLex(recordDoc.json);
    if (!this.matchesSchema(record)) return null;

    return await this.params.insertFn(
      this.db,
      foundUri,
      CID.parse(found.cid),
      record,
      found.indexedAt,
    );
  }

  /**
   * Applies the notification changes for an insert, delete or replace.
   *
   * When `deleted` is present, `notifsForDelete` decides both the
   * notifications to remove and the ones to create (it receives `inserted` as
   * the replacement). Otherwise `notifsForInsert` is used. Database writes run
   * sequentially, deletions first, and push notifications are queued on the
   * background queue afterwards.
   */
  async handleNotifs(op: { deleted?: S; inserted?: S }): Promise<void> {
    let notifs: Notif[] = [];
    const runOnCommit: ((db: Database) => Promise<void>)[] = [];

    if (op.deleted) {
      const forDelete = this.params.notifsForDelete(
        op.deleted,
        op.inserted ?? null,
      );
      if (forDelete.toDelete.length > 0) {
        runOnCommit.push(async (db) => {
          await db.models.Notification.deleteMany({
            recordUri: { $in: forDelete.toDelete },
          });
        });
      }
      notifs = forDelete.notifs;
    } else if (op.inserted) {
      notifs = this.params.notifsForInsert(op.inserted);
    }

    // Insert in chunks of 500 to keep each write bounded.
    for (const chunk of chunkArray(notifs, 500)) {
      runOnCommit.push(async (db) => {
        const filtered = await this.filterNotifsForThreadMutes(chunk);
        if (filtered.length > 0) {
          await db.models.Notification.insertMany(
            filtered.map((n) => ({
              did: n.did,
              recordUri: n.recordUri,
              recordCid: n.recordCid,
              author: n.author,
              reason: n.reason,
              reasonSubject: n.reasonSubject ?? null,
              sortAt: n.sortAt,
            })),
          );
        }
      });
    }

    // Notif deletion must always happen before creation, otherwise the delete
    // may clobber freshly created rows in a race.
    for (const fn of runOnCommit) {
      await fn(this.appDb); // these could be backgrounded
    }

    // Queue push notifications in the background.
    if (this.pushService?.enabled && notifs.length > 0) {
      for (const notif of notifs) {
        this.background.add(async () => {
          await this.pushService?.sendPush(notif.did, {
            recipientDid: notif.did,
            reason: notif.reason,
            author: notif.author,
            recordUri: notif.recordUri,
            reasonSubject: notif.reasonSubject,
          });
        });
      }
    }
  }

  /**
   * Filters notifications for thread mutes. Placeholder: currently resolves
   * to the input unchanged.
   */
  filterNotifsForThreadMutes(notifs: Notif[]): Promise<Notif[]> {
    // TODO: Implement thread mute filtering
    return Promise.resolve(notifs);
  }

  /**
   * Schedules the optional `updateAggregates` hook for `indexed` on the
   * background queue. Does nothing when the hook is not configured.
   */
  aggregateOnCommit(indexed: S) {
    const { updateAggregates } = this.params;
    if (!updateAggregates) return;
    // No surrounding transaction is used here, so the aggregate update is
    // simply handed to the background queue.
    this.background.add((db) => updateAggregates(db, indexed));
  }
}

export default RecordProcessor;