[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { Cid, l, lexParse, lexStringify } from "@atp/lex";
2import { parseCid } from "@atp/lex/data";
3import { AtUri } from "@atp/syntax";
4import { BackgroundQueue } from "../background.ts";
5import { Database } from "../db/index.ts";
6import { chunkArray } from "@atp/common";
7import { PushService } from "../../utils/push.ts";
8
/**
 * Collection-specific hooks that a {@link RecordProcessor} delegates to.
 *
 * @NOTE re: insertions and deletions. Because updates are handled as
 * delete-then-insert, calling `insertFn` once must have the same effect as
 * calling `insertFn`, then `deleteFn`, then `insertFn` again.
 */
interface RecordProcessorOptions<TSchema extends l.RecordSchema, TRow> {
  /** Generated record schema; used for validation and as the collection NSID (`$type`). */
  schema: TSchema;
  /**
   * Index a validated record into collection-specific storage.
   * Resolving to `null` means nothing was indexed.
   */
  insertFn: (
    db: Database,
    uri: AtUri,
    cid: Cid,
    obj: l.Infer<TSchema>,
    timestamp: string,
  ) => Promise<TRow | null>;
  /**
   * Return the URI of an already-indexed record equivalent to `obj`, or `null`.
   * A result equal to `uri` itself is not treated as a duplicate by the caller.
   */
  findDuplicate: (
    db: Database,
    uri: AtUri,
    obj: l.Infer<TSchema>,
  ) => Promise<AtUri | null> | AtUri | null;
  /** Remove the record from collection-specific storage; `null` if it was not indexed. */
  deleteFn: (db: Database, uri: AtUri) => Promise<TRow | null>;
  /** Notifications to create for a freshly indexed row. */
  notifsForInsert: (obj: TRow) => Notif[];
  /**
   * For a removed row (and the row that replaced it, if any), the notifications
   * to create and the record URIs whose existing notifications should be deleted.
   */
  notifsForDelete: (
    prev: TRow,
    replacedBy: TRow | null,
  ) => { notifs: Notif[]; toDelete: string[] };
  /** Optional aggregate maintenance, queued after each index/unindex. */
  updateAggregates?: (db: Database, obj: TRow) => Promise<void>;
  /** When `insertFn` yields `null`, also drop the raw `Record`/`DuplicateRecord` rows. */
  deleteRecordIfInsertReturnsNull?: boolean;
}
34
/** A notification to be written to the `Notification` collection (and optionally pushed). */
interface Notif {
  /** Recipient DID (also used as the push target). */
  did: string;
  reason: string;
  /** DID of the account that caused the notification. */
  author: string;
  recordUri: string;
  recordCid: string;
  sortAt: string;
  /** Optional subject; stored as `null` when absent. */
  reasonSubject?: string;
}
44
/**
 * Indexes records of a single AT Protocol collection.
 *
 * Collection-specific work is delegated to the callbacks in
 * `RecordProcessorOptions`. The processor itself maintains the generic `Record`
 * and `DuplicateRecord` collections and the `Notification` collection. It queues
 * aggregate updates on the background queue and optionally sends push
 * notifications.
 */
export class RecordProcessor<TSchema extends l.RecordSchema, TRow> {
  db: Database;
  private pushService: PushService | null = null;

  /**
   * RecordProcessor for handling a single AT Protocol collection.
   *
   * The processor is bound to exactly one generated record schema:
   * - `insertRecord` / `updateRecord` validate records via `schema.assert`
   *   and throw on mismatch.
   * - `matchesCollection` compares a URI's collection with the schema's `$type`.
   *   `insertRecord` / `updateRecord` do not check `uri.collection` themselves,
   *   so callers route records to the right processor.
   *
   * Example usage:
   * ```typescript
   * const processor = new RecordProcessor(db, background, {
   *   schema: so.sprk.graph.follow.main,
   *   // ... other params
   * });
   *
   * if (processor.matchesCollection(uri)) {
   *   await processor.insertRecord(uri, cid, obj, timestamp);
   * }
   * ```
   */
  constructor(
    private appDb: Database,
    private background: BackgroundQueue,
    private options: RecordProcessorOptions<TSchema, TRow>,
  ) {
    this.db = appDb;
  }

  /** Collection NSID handled by this processor (the schema's `$type`). */
  get collection(): TSchema["$type"] {
    return this.options.schema.$type;
  }

  setPushService(pushService: PushService) {
    this.pushService = pushService;
  }

  matchesCollection(uri: AtUri): boolean {
    return uri.collection === this.collection;
  }

  matchesSchema<I>(obj: I): obj is I & l.Infer<TSchema> {
    return this.options.schema.matches(obj);
  }

  assertValidRecord(obj: unknown): asserts obj is l.Infer<TSchema> {
    this.options.schema.assert(obj);
  }

  // Helper method to get the collection this processor handles
  getLexId(): TSchema["$type"] {
    return this.collection;
  }

  /**
   * Upsert the raw record into the generic `Record` collection.
   * Both canonical records and duplicates are stored here, so that a duplicate
   * can later be promoted by `deleteRecord`.
   */
  private async upsertRawRecord(
    uri: AtUri,
    cid: Cid,
    obj: l.Infer<TSchema>,
    createdAt: string,
    timestamp: string,
  ) {
    const uriStr = uri.toString();
    await this.db.models.Record.findOneAndUpdate(
      { uri: uriStr },
      {
        uri: uriStr,
        cid: cid.toString(),
        did: uri.host,
        collectionName: uri.collection,
        rkey: uri.rkey,
        json: lexStringify(obj),
        createdAt,
        indexedAt: timestamp,
      },
      { upsert: true, returnDocument: "after" },
    );
  }

  /**
   * Index a newly created record.
   *
   * If `findDuplicate` reports a different already-indexed record, only the raw
   * `Record` and a `DuplicateRecord` row are written; no index row, aggregates or
   * notifications are produced.
   *
   * @param uri - URI of the record.
   * @param cid - CID of the record.
   * @param obj - Record value; validated against the schema (throws if invalid).
   * @param timestamp - Index time; also used as `createdAt` when the record has none.
   * @param opts - `disableNotifs` skips notification creation.
   */
  async insertRecord(
    uri: AtUri,
    cid: Cid,
    obj: unknown,
    timestamp: string,
    opts?: { disableNotifs?: boolean },
  ) {
    this.assertValidRecord(obj);
    const uriStr = uri.toString();

    // Prefer the record's own createdAt; fall back to the index time.
    const recordObj = obj as Record<string, unknown>;
    const createdAt = typeof recordObj.createdAt === "string"
      ? recordObj.createdAt
      : timestamp;

    // Check for duplicates first, before attempting the insert.
    const found = await this.options.findDuplicate(this.db, uri, obj);
    if (found && found.toString() !== uriStr) {
      // Keep the raw record: deleteRecord reads it back from `Record` when it
      // promotes this duplicate after the canonical record is deleted.
      await this.upsertRawRecord(uri, cid, obj, createdAt, timestamp);
      await this.db.models.DuplicateRecord.findOneAndUpdate(
        { uri: uriStr },
        {
          uri: uriStr,
          cid: cid.toString(),
          duplicateOf: found.toString(),
          indexedAt: timestamp,
        },
        { upsert: true, returnDocument: "after" },
      );
      return;
    }

    await this.upsertRawRecord(uri, cid, obj, createdAt, timestamp);

    const inserted = await this.options.insertFn(
      this.db,
      uri,
      cid,
      obj,
      timestamp,
    );
    if (!inserted) {
      if (this.options.deleteRecordIfInsertReturnsNull) {
        await this.db.models.Record.deleteOne({ uri: uriStr });
        await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });
      }
      return;
    }

    this.aggregateOnCommit(inserted);
    if (!opts?.disableNotifs) {
      await this.handleNotifs({ inserted });
    }
  }

  /**
   * Re-index an updated record.
   *
   * Uses a simple strategy: purge the existing index for the uri, then replace
   * it. This lets each collection's indexer avoid bespoke in-place update logic
   * while keeping control over notifications. If the record was never indexed,
   * this falls back to `insertRecord` with the same `opts`.
   *
   * @throws Error when the old index row was removed but `insertFn` could not
   *   replace it and `deleteRecordIfInsertReturnsNull` is not set.
   */
  async updateRecord(
    uri: AtUri,
    cid: Cid,
    obj: unknown,
    timestamp: string,
    opts?: { disableNotifs?: boolean },
  ) {
    this.assertValidRecord(obj);
    const uriStr = uri.toString();

    // Only overwrite createdAt when the record carries one.
    const recordObj = obj as Record<string, unknown>;
    const createdAt = typeof recordObj.createdAt === "string"
      ? recordObj.createdAt
      : undefined;

    const updateData: {
      cid: string;
      json: string;
      indexedAt: string;
      createdAt?: string;
    } = {
      cid: cid.toString(),
      json: lexStringify(obj),
      indexedAt: timestamp,
    };
    if (createdAt) {
      updateData.createdAt = createdAt;
    }

    await this.db.models.Record.findOneAndUpdate(
      { uri: uriStr },
      updateData,
      { returnDocument: "after" },
    );

    // Refresh the duplicate info. A result equal to this record's own URI means
    // it is the canonical record (the same guard insertRecord uses), so it must
    // not get a self-referencing duplicate row.
    const dupe = await this.options.findDuplicate(this.db, uri, obj);
    if (dupe && dupe.toString() !== uriStr) {
      await this.db.models.DuplicateRecord.findOneAndUpdate(
        { uri: uriStr },
        {
          cid: cid.toString(),
          duplicateOf: dupe.toString(),
          indexedAt: timestamp,
        },
        { upsert: true, returnDocument: "after" },
      );
    } else {
      await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });
    }

    const deleted = await this.options.deleteFn(this.db, uri);
    if (!deleted) {
      // If a record was updated but hadn't been indexed yet, treat it like a
      // plain insert, and keep honoring the caller's notification options.
      return this.insertRecord(uri, cid, obj, timestamp, opts);
    }
    this.aggregateOnCommit(deleted);
    const inserted = await this.options.insertFn(
      this.db,
      uri,
      cid,
      obj,
      timestamp,
    );
    if (!inserted) {
      if (this.options.deleteRecordIfInsertReturnsNull) {
        await this.db.models.Record.deleteOne({ uri: uriStr });
        await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });
        if (!opts?.disableNotifs) {
          await this.handleNotifs({ deleted });
        }
        return;
      }
      throw new Error(
        "Record update failed: removed from index but could not be replaced",
      );
    }
    this.aggregateOnCommit(inserted);
    if (!opts?.disableNotifs) {
      await this.handleNotifs({ inserted, deleted });
    }
  }

  /**
   * Remove a record from the index.
   *
   * When `cascading` is true, every duplicate of the record is dropped as well.
   * Otherwise the oldest duplicate (by `indexedAt`) is promoted into the index,
   * using the raw JSON stored in `Record`. After a successful promotion, the
   * remaining duplicates are repointed at the promoted record.
   */
  async deleteRecord(uri: AtUri, cascading = false) {
    const uriStr = uri.toString();

    await this.db.models.Record.deleteOne({ uri: uriStr });
    await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });

    const deleted = await this.options.deleteFn(this.db, uri);
    if (!deleted) return;

    this.aggregateOnCommit(deleted);
    if (cascading) {
      await this.db.models.DuplicateRecord.deleteMany({ duplicateOf: uriStr });
      return this.handleNotifs({ deleted });
    }

    const found = await this.db.models.DuplicateRecord.findOne({
      duplicateOf: uriStr,
    })
      .sort({ indexedAt: 1 })
      .lean();
    if (!found) {
      return this.handleNotifs({ deleted });
    }

    // Get the duplicate's raw record from the Record model
    const recordDoc = await this.db.models.Record.findOne({ uri: found.uri })
      .lean();
    if (!recordDoc || !recordDoc.json) {
      return this.handleNotifs({ deleted });
    }

    const foundUri = new AtUri(found.uri);
    if (!this.matchesCollection(foundUri)) {
      return this.handleNotifs({ deleted });
    }

    let record: unknown;
    try {
      record = lexParse(recordDoc.json, { strict: false });
    } catch {
      // Unparseable stored JSON: give up on promotion, still emit delete notifs.
      return this.handleNotifs({ deleted });
    }
    if (!this.matchesSchema(record)) {
      return this.handleNotifs({ deleted });
    }

    const inserted = await this.options.insertFn(
      this.db,
      foundUri,
      parseCid(found.cid),
      record,
      found.indexedAt,
    );
    if (inserted) {
      this.aggregateOnCommit(inserted);
      // The promoted record is now canonical. Drop its duplicate entry and point
      // the remaining duplicates at it, so that deleting it later promotes the
      // next one instead of orphaning them.
      await this.db.models.DuplicateRecord.deleteOne({ uri: found.uri });
      await this.db.models.DuplicateRecord.updateMany(
        { duplicateOf: uriStr },
        { duplicateOf: found.uri },
      );
    }
    await this.handleNotifs({ deleted, inserted: inserted ?? undefined });
  }

  /**
   * Create or delete notifications for an index change, then queue push
   * notifications for the notifications that were actually stored.
   *
   * Deletions (from `notifsForDelete`) always run before insertions, so a
   * delete cannot clobber freshly inserted notifications.
   */
  async handleNotifs(op: { deleted?: TRow; inserted?: TRow }) {
    let notifs: Notif[] = [];
    const runOnCommit: ((db: Database) => Promise<void>)[] = [];
    // Notifications that survived thread-mute filtering and were stored;
    // only these are pushed.
    const stored: Notif[] = [];
    if (op.deleted) {
      const forDelete = this.options.notifsForDelete(
        op.deleted,
        op.inserted ?? null,
      );
      if (forDelete.toDelete.length > 0) {
        // Notifs can be deleted in background: they are expensive to delete and
        // listNotifications already excludes notifs with missing records.
        runOnCommit.push(async (db) => {
          await db.models.Notification.deleteMany({
            recordUri: { $in: forDelete.toDelete },
          });
        });
      }
      notifs = forDelete.notifs;
    } else if (op.inserted) {
      notifs = this.options.notifsForInsert(op.inserted);
    }
    for (const chunk of chunkArray(notifs, 500)) {
      runOnCommit.push(async (db) => {
        const filtered = await this.filterNotifsForThreadMutes(chunk);
        if (filtered.length > 0) {
          await db.models.Notification.insertMany(
            filtered.map((n) => ({
              did: n.did,
              recordUri: n.recordUri,
              recordCid: n.recordCid,
              author: n.author,
              reason: n.reason,
              reasonSubject: n.reasonSubject ?? null,
              sortAt: n.sortAt,
            })),
          );
          stored.push(...filtered);
        }
      });
    }
    // Need to ensure notif deletion always happens before creation, otherwise delete may clobber in a race.
    for (const fn of runOnCommit) {
      await fn(this.appDb); // these could be backgrounded
    }

    // Queue push notifications in the background
    if (this.pushService?.enabled && stored.length > 0) {
      for (const notif of stored) {
        this.background.add(async () => {
          await this.pushService?.sendPush(notif.did, {
            recipientDid: notif.did,
            reason: notif.reason,
            author: notif.author,
            recordUri: notif.recordUri,
            reasonSubject: notif.reasonSubject,
          });
        });
      }
    }
  }

  // Filter notifications for thread mutes (placeholder for future implementation)
  filterNotifsForThreadMutes(notifs: Notif[]): Promise<Notif[]> {
    // TODO: Implement thread mute filtering
    // For now, return all notifications unfiltered
    return Promise.resolve(notifs);
  }

  /**
   * Queue `updateAggregates` (if configured) for an indexed/unindexed row.
   * The update is not awaited here. It runs on the background queue, so it is
   * not atomic with the index write.
   */
  aggregateOnCommit(indexed: TRow) {
    const { updateAggregates } = this.options;
    if (!updateAggregates) return;
    this.background.add((db) => updateAggregates(db, indexed));
  }
}
399
// Default export kept for callers importing the processor without braces.
export { RecordProcessor as default };