[READ ONLY MIRROR] Spark Social AppView Server
github.com/sprksocial/server
atproto
deno
hono
lexicon
1import { CID } from "multiformats/cid";
2import { jsonStringToLex, stringifyLex } from "@atp/lexicon";
3import { AtUri } from "@atp/syntax";
4import { lexicons } from "../../lex/lexicons.ts";
5import { BackgroundQueue } from "../background.ts";
6import { Database } from "../db/index.ts";
7
// @NOTE re: insertions and deletions. Because an update is handled as a purge
// followed by a re-insert, running (insertFn) once must leave the index in the
// same state as running (insertFn -> deleteFn -> insertFn).

/**
 * Collection-specific hooks that drive a `RecordProcessor`.
 *
 * `T` is the validated record shape for the collection; `S` is whatever the
 * index callbacks return to describe an indexed row.
 */
type RecordProcessorParams<T, S> = {
  /** Lexicon id (collection NSID) handled by the processor. */
  lexId: string;

  /**
   * Writes the record into the collection's index.
   * Resolves to the indexed row, or `null` when nothing was indexed.
   */
  insertFn: (
    db: Database,
    uri: AtUri,
    cid: CID,
    obj: T,
    timestamp: string,
  ) => Promise<S | null>;

  /**
   * Looks up an already-indexed record that `obj` duplicates.
   * May answer synchronously or asynchronously; `null` means no duplicate.
   */
  findDuplicate: (
    db: Database,
    uri: AtUri,
    obj: T,
  ) => Promise<AtUri | null> | AtUri | null;

  /**
   * Removes the record at `uri` from the collection's index.
   * Resolves to the removed row, or `null` when there was nothing to remove.
   */
  deleteFn: (db: Database, uri: AtUri) => Promise<S | null>;

  /** Builds the notifications to emit for a freshly indexed row. */
  notifsForInsert: (obj: S) => Notif[];

  /**
   * Builds the notification changes for a removed row. `replacedBy` is the row
   * that took its place (an update, or a promoted duplicate), if any.
   */
  notifsForDelete: (
    prev: S,
    replacedBy: S | null,
  ) => { notifs: Notif[]; toDelete: string[] };

  /** Optional hook for refreshing aggregates affected by an indexed/removed row. */
  updateAggregates?: (db: Database, obj: S) => Promise<void>;
};
32
/**
 * A single notification produced by a collection's `notifsForInsert` /
 * `notifsForDelete` hooks.
 *
 * NOTE(review): the field meanings below are inferred from their names only;
 * confirm against the notification consumer before relying on them.
 */
type Notif = {
  /** Presumably the DID of the account being notified. */
  did: string;
  /** Presumably the kind of notification (e.g. a like or follow). */
  reason: string;
  /** Presumably the DID of the account whose record triggered it. */
  author: string;
  /** URI of the record associated with the notification. */
  recordUri: string;
  /** CID of the record associated with the notification. */
  recordCid: string;
  /** Presumably the timestamp used for ordering notifications. */
  sortAt: string;
  /** Optional subject reference for the reason. */
  reasonSubject?: string;
};
42
/**
 * Indexes the records of exactly one AT Protocol collection.
 *
 * The collection is fixed at construction time from `params.lexId`. Records
 * are validated against that lexicon id via `lexicons.assertValidRecord`, and
 * all collection-specific work (index writes, duplicate detection,
 * notifications, aggregates) is delegated to the callbacks in `params`.
 *
 * Example usage:
 * ```typescript
 * const processor = new RecordProcessor(db, background, {
 *   lexId: "app.bsky.graph.follow",
 *   // ... other params
 * });
 *
 * // Validates `obj` against "app.bsky.graph.follow" before indexing it.
 * await processor.insertRecord(uri, cid, obj, timestamp);
 * ```
 */
export class RecordProcessor<T, S> {
  /** Lexicon id (collection NSID) this processor handles. */
  collection: string;
  /** Database handle used for the `Record` / `DuplicateRecord` models and passed to the hooks. */
  db: Database;

  /**
   * @param appDb - database handle; also exposed as `db`
   * @param background - queue on which aggregate updates are scheduled
   * @param params - collection-specific hooks; `params.lexId` becomes `collection`
   */
  constructor(
    private appDb: Database,
    private background: BackgroundQueue,
    private params: RecordProcessorParams<T, S>,
  ) {
    this.db = appDb;
    this.collection = this.params.lexId;
  }

  /** True when `uri` points into the collection this processor handles. */
  matchesCollection(uri: AtUri): boolean {
    return uri.collection === this.collection;
  }

  /** Non-throwing variant of `assertValidRecord`: reports validity as a boolean. */
  matchesSchema(obj: unknown): obj is T {
    try {
      lexicons.assertValidRecord(this.collection, obj);
      return true;
    } catch {
      return false;
    }
  }

  /**
   * Validates `obj` against this processor's lexicon id.
   *
   * @throws whatever `lexicons.assertValidRecord` throws for an invalid record
   */
  assertValidRecord(obj: unknown): asserts obj is T {
    lexicons.assertValidRecord(this.collection, obj);
  }

  /** Returns the lexicon id this processor handles. */
  getLexId(): string {
    return this.collection;
  }

  /**
   * Stores a record and indexes it, unless it duplicates an already-indexed
   * record with a different URI.
   *
   * @param uri - AT URI of the record
   * @param cid - CID of the record
   * @param obj - raw record; validated against the lexicon before use
   * @param timestamp - indexing time; also used as `createdAt` when the record
   *   carries no string `createdAt`
   * @param opts - `disableNotifs` suppresses notification handling
   * @throws when `obj` does not validate against the lexicon
   */
  async insertRecord(
    uri: AtUri,
    cid: CID,
    obj: unknown,
    timestamp: string,
    opts?: { disableNotifs?: boolean },
  ) {
    this.assertValidRecord(obj);

    // Prefer the record's own createdAt; fall back to the indexing timestamp.
    const recordObj = obj as Record<string, unknown>;
    const createdAt = typeof recordObj.createdAt === "string"
      ? recordObj.createdAt
      : timestamp;

    // Persist the raw record unconditionally, *before* the duplicate check.
    // deleteRecord() promotes a duplicate by reading its JSON back from the
    // Record model, so duplicates must be stored here as well.
    await this.db.models.Record.findOneAndUpdate(
      { uri: uri.toString() },
      {
        uri: uri.toString(),
        cid: cid.toString(),
        did: uri.host,
        collectionName: uri.collection,
        rkey: uri.rkey,
        json: stringifyLex(obj),
        createdAt,
        indexedAt: timestamp,
      },
      { upsert: true, new: true },
    );

    const found = await this.params.findDuplicate(this.db, uri, obj);
    if (found && found.toString() !== uri.toString()) {
      // Duplicate of a different URI: remember the relationship, emit no events.
      await this.db.models.DuplicateRecord.findOneAndUpdate(
        { uri: uri.toString() },
        {
          uri: uri.toString(),
          cid: cid.toString(),
          duplicateOf: found.toString(),
          indexedAt: timestamp,
        },
        { upsert: true, new: true },
      );
      return;
    }

    const inserted = await this.params.insertFn(
      this.db,
      uri,
      cid,
      obj,
      timestamp,
    );
    if (inserted) {
      this.aggregateOnCommit(inserted);
      if (!opts?.disableNotifs) {
        this.handleNotifs({ inserted });
      }
    }
  }

  /**
   * Applies an update to a record.
   *
   * Uses a deliberately simple strategy: purge the existing index entry for
   * the URI, then insert the new version. This spares each collection's
   * indexer from bespoke in-place update logic while still giving control
   * over notifications (the hooks see both the removed and the new row).
   *
   * @param uri - AT URI of the record
   * @param cid - CID of the new record version
   * @param obj - raw record; validated against the lexicon before use
   * @param timestamp - indexing time
   * @param opts - `disableNotifs` suppresses notification handling
   * @throws when `obj` is invalid, or when the old index entry was removed but
   *   the new one could not be inserted
   */
  async updateRecord(
    uri: AtUri,
    cid: CID,
    obj: unknown,
    timestamp: string,
    opts?: { disableNotifs?: boolean },
  ) {
    this.assertValidRecord(obj);

    // Only overwrite the stored createdAt when the record provides one.
    const recordObj = obj as Record<string, unknown>;
    const createdAt = typeof recordObj.createdAt === "string"
      ? recordObj.createdAt
      : undefined;

    const updateData: {
      cid: string;
      json: string;
      indexedAt: string;
      createdAt?: string;
    } = {
      cid: cid.toString(),
      json: stringifyLex(obj),
      indexedAt: timestamp,
    };
    if (createdAt) {
      updateData.createdAt = createdAt;
    }

    await this.db.models.Record.findOneAndUpdate(
      { uri: uri.toString() },
      updateData,
      { new: true },
    );

    // Keep the duplicate bookkeeping in sync. A record is never a duplicate of
    // itself: findDuplicate may return this very URI when the record is the
    // indexed one, which must not create a self-referential row.
    const dupe = await this.params.findDuplicate(this.db, uri, obj);
    if (dupe && dupe.toString() !== uri.toString()) {
      await this.db.models.DuplicateRecord.findOneAndUpdate(
        { uri: uri.toString() },
        {
          cid: cid.toString(),
          duplicateOf: dupe.toString(),
          indexedAt: timestamp,
        },
        { upsert: true, new: true },
      );
    } else {
      await this.db.models.DuplicateRecord.deleteOne({ uri: uri.toString() });
    }

    const deleted = await this.params.deleteFn(this.db, uri);
    if (!deleted) {
      // The record was updated before it was ever indexed: treat it as a
      // plain insert, forwarding opts so disableNotifs is still honoured.
      return this.insertRecord(uri, cid, obj, timestamp, opts);
    }
    this.aggregateOnCommit(deleted);
    const inserted = await this.params.insertFn(
      this.db,
      uri,
      cid,
      obj,
      timestamp,
    );
    if (!inserted) {
      throw new Error(
        "Record update failed: removed from index but could not be replaced",
      );
    }
    this.aggregateOnCommit(inserted);
    if (!opts?.disableNotifs) {
      this.handleNotifs({ inserted, deleted });
    }
  }

  /**
   * Removes a record from storage and from the index.
   *
   * When the removed record had duplicates, the oldest one (by `indexedAt`) is
   * promoted into the index in its place, unless `cascading` is set, in which
   * case all of its duplicate entries are dropped instead.
   *
   * @param uri - AT URI of the record to remove
   * @param cascading - drop duplicates of this record rather than promoting one
   */
  async deleteRecord(uri: AtUri, cascading = false) {
    const uriStr = uri.toString();
    await this.db.models.Record.deleteOne({ uri: uriStr });
    await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });

    const deleted = await this.params.deleteFn(this.db, uri);
    if (!deleted) return;

    this.aggregateOnCommit(deleted);
    if (cascading) {
      await this.db.models.DuplicateRecord.deleteMany({
        duplicateOf: uriStr,
      });
      return this.handleNotifs({ deleted });
    } else {
      // Oldest duplicate of the removed record is the promotion candidate.
      const found = await this.db.models.DuplicateRecord.findOne({
        duplicateOf: uriStr,
      })
        .sort({ indexedAt: 1 })
        .lean();

      if (!found) {
        return this.handleNotifs({ deleted });
      }

      // The candidate's content lives in the Record model.
      const recordDoc = await this.db.models.Record.findOne({ uri: found.uri })
        .lean();
      if (!recordDoc || !recordDoc.json) {
        return this.handleNotifs({ deleted });
      }

      const foundUri = new AtUri(found.uri);
      if (!this.matchesCollection(foundUri)) {
        return this.handleNotifs({ deleted });
      }

      // Re-validate the stored JSON; an invalid candidate is not promoted.
      const record = jsonStringToLex(recordDoc.json);
      if (!this.matchesSchema(record)) {
        return this.handleNotifs({ deleted });
      }

      const inserted = await this.params.insertFn(
        this.db,
        foundUri,
        CID.parse(found.cid),
        record,
        found.indexedAt,
      );
      if (inserted) {
        this.aggregateOnCommit(inserted);
      }
      this.handleNotifs({ deleted, inserted: inserted ?? undefined });
    }
  }

  /**
   * Computes the notifications for an index change via the `params` hooks.
   * A removal (optionally with its replacement) takes precedence over a plain
   * insert. The computed notifications are not delivered yet.
   */
  handleNotifs(op: { deleted?: S; inserted?: S }) {
    let _notifs: Notif[] = [];
    if (op.deleted) {
      const forDelete = this.params.notifsForDelete(
        op.deleted,
        op.inserted ?? null,
      );
      _notifs = forDelete.notifs;
    } else if (op.inserted) {
      _notifs = this.params.notifsForInsert(op.inserted);
    }

    // TODO: Implement notification handling
  }

  /**
   * Schedules the optional `updateAggregates` hook for an indexed or removed
   * row. Does nothing when the hook is not configured.
   */
  aggregateOnCommit(indexed: S) {
    const { updateAggregates } = this.params;
    if (!updateAggregates) return;
    // No surrounding transaction to wait for here, so the aggregate update is
    // enqueued on the background queue straight away rather than on commit.
    // A production system may want MongoDB transactions or another approach.
    this.background.add((db) => updateAggregates(db, indexed));
  }
}

export default RecordProcessor;