[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eb947da8a3a8a7d485ff132b3ff0db4a8baaac19 319 lines 9.0 kB view raw
1import { CID } from "multiformats/cid"; 2import { jsonStringToLex, stringifyLex } from "@atp/lexicon"; 3import { AtUri } from "@atp/syntax"; 4import { lexicons } from "../../lex/lexicons.ts"; 5import { BackgroundQueue } from "../background.ts"; 6import { Database } from "../db/index.ts"; 7 8// @NOTE re: insertions and deletions. Due to how record updates are handled, 9// (insertFn) should have the same effect as (insertFn -> deleteFn -> insertFn). 10type RecordProcessorParams<T, S> = { 11 lexId: string; 12 insertFn: ( 13 db: Database, 14 uri: AtUri, 15 cid: CID, 16 obj: T, 17 timestamp: string, 18 ) => Promise<S | null>; 19 findDuplicate: ( 20 db: Database, 21 uri: AtUri, 22 obj: T, 23 ) => Promise<AtUri | null> | AtUri | null; 24 deleteFn: (db: Database, uri: AtUri) => Promise<S | null>; 25 notifsForInsert: (obj: S) => Notif[]; 26 notifsForDelete: ( 27 prev: S, 28 replacedBy: S | null, 29 ) => { notifs: Notif[]; toDelete: string[] }; 30 updateAggregates?: (db: Database, obj: S) => Promise<void>; 31}; 32 33type Notif = { 34 did: string; 35 reason: string; 36 author: string; 37 recordUri: string; 38 recordCid: string; 39 sortAt: string; 40 reasonSubject?: string; 41}; 42 43export class RecordProcessor<T, S> { 44 collection: string; 45 db: Database; 46 47 /** 48 * RecordProcessor for handling a single AT Protocol collection. 49 * 50 * This processor handles exactly one lexId: 51 * - Validates records against the specific lexId (e.g., "app.bsky.graph.follow") 52 * - Only processes records that match the exact collection NSID 53 * - Rejects records from other collections, even similar ones 54 * 55 * Example usage: 56 * ```typescript 57 * const processor = new RecordProcessor(db, background, { 58 * lexId: "app.bsky.graph.follow", 59 * // ... other params 60 * }); 61 * 62 * // This will only process records with collection "app.bsky.graph.follow" 63 * await processor.insertRecord(uri, cid, obj, timestamp); 64 * ``` 65 */ 66 constructor( 67 private appDb: Database, 68 private background: BackgroundQueue, 69 private params: RecordProcessorParams<T, S>, 70 ) { 71 this.db = appDb; 72 this.collection = this.params.lexId; 73 } 74 75 matchesCollection(uri: AtUri): boolean { 76 return uri.collection === this.collection; 77 } 78 79 matchesSchema(obj: unknown): obj is T { 80 try { 81 lexicons.assertValidRecord(this.collection, obj); 82 return true; 83 } catch { 84 return false; 85 } 86 } 87 88 assertValidRecord(obj: unknown): asserts obj is T { 89 lexicons.assertValidRecord(this.collection, obj); 90 } 91 92 // Helper method to get the lexId this processor handles 93 getLexId(): string { 94 return this.collection; 95 } 96 97 async insertRecord( 98 uri: AtUri, 99 cid: CID, 100 obj: unknown, 101 timestamp: string, 102 opts?: { disableNotifs?: boolean }, 103 ) { 104 this.assertValidRecord(obj); 105 106 // Extract createdAt from the record object if available 107 const recordObj = obj as Record<string, unknown>; 108 const createdAt = typeof recordObj.createdAt === "string" 109 ? recordObj.createdAt 110 : timestamp; 111 112 // Check for duplicates first before attempting insert 113 const found = await this.params.findDuplicate(this.db, uri, obj); 114 if (found && found.toString() !== uri.toString()) { 115 // Duplicate exists with different URI, store in duplicates table with no events 116 await this.db.models.DuplicateRecord.findOneAndUpdate( 117 { uri: uri.toString() }, 118 { 119 uri: uri.toString(), 120 cid: cid.toString(), 121 duplicateOf: found.toString(), 122 indexedAt: timestamp, 123 }, 124 { upsert: true, new: true }, 125 ); 126 return; 127 } 128 129 // Insert or update record 130 await this.db.models.Record.findOneAndUpdate( 131 { uri: uri.toString() }, 132 { 133 uri: uri.toString(), 134 cid: cid.toString(), 135 did: uri.host, 136 collectionName: uri.collection, 137 rkey: uri.rkey, 138 json: stringifyLex(obj), 139 createdAt, 140 indexedAt: timestamp, 141 }, 142 { upsert: true, new: true }, 143 ); 144 145 const inserted = await this.params.insertFn( 146 this.db, 147 uri, 148 cid, 149 obj, 150 timestamp, 151 ); 152 if (inserted) { 153 this.aggregateOnCommit(inserted); 154 if (!opts?.disableNotifs) { 155 this.handleNotifs({ inserted }); 156 } 157 } 158 } 159 160 // Currently using a very simple strategy for updates: purge the existing index 161 // for the uri then replace it. The main upside is that this allows the indexer 162 // for each collection to avoid bespoke logic for in-place updates, which isn't 163 // straightforward in the general case. We still get nice control over notifications. 164 async updateRecord( 165 uri: AtUri, 166 cid: CID, 167 obj: unknown, 168 timestamp: string, 169 opts?: { disableNotifs?: boolean }, 170 ) { 171 this.assertValidRecord(obj); 172 173 // Extract createdAt from the record object if available 174 const recordObj = obj as Record<string, unknown>; 175 const createdAt = typeof recordObj.createdAt === "string" 176 ? recordObj.createdAt 177 : undefined; 178 179 // Update record 180 const updateData: { 181 cid: string; 182 json: string; 183 indexedAt: string; 184 createdAt?: string; 185 } = { 186 cid: cid.toString(), 187 json: stringifyLex(obj), 188 indexedAt: timestamp, 189 }; 190 if (createdAt) { 191 updateData.createdAt = createdAt; 192 } 193 194 await this.db.models.Record.findOneAndUpdate( 195 { uri: uri.toString() }, 196 updateData, 197 { new: true }, 198 ); 199 200 // If the updated record was a dupe, update dupe info for it 201 const dupe = await this.params.findDuplicate(this.db, uri, obj); 202 if (dupe) { 203 await this.db.models.DuplicateRecord.findOneAndUpdate( 204 { uri: uri.toString() }, 205 { 206 cid: cid.toString(), 207 duplicateOf: dupe.toString(), 208 indexedAt: timestamp, 209 }, 210 { upsert: true, new: true }, 211 ); 212 } else { 213 await this.db.models.DuplicateRecord.deleteOne({ uri: uri.toString() }); 214 } 215 216 const deleted = await this.params.deleteFn(this.db, uri); 217 if (!deleted) { 218 // If a record was updated but hadn't been indexed yet, treat it like a plain insert. 219 return this.insertRecord(uri, cid, obj, timestamp); 220 } 221 this.aggregateOnCommit(deleted); 222 const inserted = await this.params.insertFn( 223 this.db, 224 uri, 225 cid, 226 obj, 227 timestamp, 228 ); 229 if (!inserted) { 230 throw new Error( 231 "Record update failed: removed from index but could not be replaced", 232 ); 233 } 234 this.aggregateOnCommit(inserted); 235 if (!opts?.disableNotifs) { 236 this.handleNotifs({ inserted, deleted }); 237 } 238 } 239 240 async deleteRecord(uri: AtUri, cascading = false) { 241 await this.db.models.Record.deleteOne({ uri: uri.toString() }); 242 await this.db.models.DuplicateRecord.deleteOne({ uri: uri.toString() }); 243 244 const deleted = await this.params.deleteFn(this.db, uri); 245 if (!deleted) return; 246 247 this.aggregateOnCommit(deleted); 248 if (cascading) { 249 await this.db.models.DuplicateRecord.deleteMany({ 250 duplicateOf: uri.toString(), 251 }); 252 return this.handleNotifs({ deleted }); 253 } else { 254 const found = await this.db.models.DuplicateRecord.findOne({ 255 duplicateOf: uri.toString(), 256 }) 257 .sort({ indexedAt: 1 }) 258 .lean(); 259 260 if (!found) { 261 return this.handleNotifs({ deleted }); 262 } 263 264 // Get the actual record from the Record model 265 const recordDoc = await this.db.models.Record.findOne({ uri: found.uri }) 266 .lean(); 267 if (!recordDoc || !recordDoc.json) { 268 return this.handleNotifs({ deleted }); 269 } 270 271 const foundUri = new AtUri(found.uri); 272 if (!this.matchesCollection(foundUri)) { 273 return this.handleNotifs({ deleted }); 274 } 275 276 const record = jsonStringToLex(recordDoc.json); 277 if (!this.matchesSchema(record)) { 278 return this.handleNotifs({ deleted }); 279 } 280 281 const inserted = await this.params.insertFn( 282 this.db, 283 foundUri, 284 CID.parse(found.cid), 285 record, 286 found.indexedAt, 287 ); 288 if (inserted) { 289 this.aggregateOnCommit(inserted); 290 } 291 this.handleNotifs({ deleted, inserted: inserted ?? undefined }); 292 } 293 } 294 295 handleNotifs(op: { deleted?: S; inserted?: S }) { 296 let _notifs: Notif[] = []; 297 if (op.deleted) { 298 const forDelete = this.params.notifsForDelete( 299 op.deleted, 300 op.inserted ?? null, 301 ); 302 _notifs = forDelete.notifs; 303 } else if (op.inserted) { 304 _notifs = this.params.notifsForInsert(op.inserted); 305 } 306 307 // TODO: Implement notification handling 308 } 309 310 aggregateOnCommit(indexed: S) { 311 const { updateAggregates } = this.params; 312 if (!updateAggregates) return; 313 // Note: MongoDB doesn't have transactions in the same way, so we'll run aggregates immediately 314 // In a production system, you might want to use MongoDB transactions or a different approach 315 this.background.add((db) => updateAggregates(db, indexed)); 316 } 317} 318 319export default RecordProcessor;