[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 400 lines 12 kB view raw
import { Cid, l, lexParse, lexStringify } from "@atp/lex";
import { parseCid } from "@atp/lex/data";
import { AtUri } from "@atp/syntax";
import { BackgroundQueue } from "../background.ts";
import { Database } from "../db/index.ts";
import { chunkArray } from "@atp/common";
import { PushService } from "../../utils/push.ts";

// @NOTE re: insertions and deletions. Due to how record updates are handled,
// (insertFn) should have the same effect as (insertFn -> deleteFn -> insertFn).
/**
 * Per-collection hooks that plug a concrete index into {@link RecordProcessor}.
 */
type RecordProcessorOptions<TSchema extends l.RecordSchema, TRow> = {
  /** Lexicon record schema used to validate records and to derive the collection id. */
  schema: TSchema;
  /** Indexes a record. A `null` result means nothing was indexed. */
  insertFn: (
    db: Database,
    uri: AtUri,
    cid: Cid,
    obj: l.Infer<TSchema>,
    timestamp: string,
  ) => Promise<TRow | null>;
  /** Returns the URI of an already-indexed equivalent record, or `null` if none. */
  findDuplicate: (
    db: Database,
    uri: AtUri,
    obj: l.Infer<TSchema>,
  ) => Promise<AtUri | null> | AtUri | null;
  /** Removes a record from the index. A `null` result means it was not indexed. */
  deleteFn: (db: Database, uri: AtUri) => Promise<TRow | null>;
  /** Notifications to create for a freshly indexed row. */
  notifsForInsert: (obj: TRow) => Notif[];
  /**
   * Notifications to create, and record URIs whose notifications should be
   * removed, when `prev` leaves the index (optionally replaced by `replacedBy`).
   */
  notifsForDelete: (
    prev: TRow,
    replacedBy: TRow | null,
  ) => { notifs: Notif[]; toDelete: string[] };
  /** Optional aggregate refresh, queued on the background queue per indexed/removed row. */
  updateAggregates?: (db: Database, obj: TRow) => Promise<void>;
  /** When set, a `null` from `insertFn` also removes the stored Record/DuplicateRecord docs. */
  deleteRecordIfInsertReturnsNull?: boolean;
};

/** Shape of a notification produced by the per-collection notif hooks. */
type Notif = {
  did: string;
  reason: string;
  author: string;
  recordUri: string;
  recordCid: string;
  sortAt: string;
  reasonSubject?: string;
};

/**
 * RecordProcessor for handling a single AT Protocol collection.
 *
 * This processor handles exactly one generated record schema:
 * - Validates records against the specific schema (e.g., so.sprk.graph.follow)
 * - Only processes records that match the exact collection NSID
 * - Rejects records from other collections, even similar ones
 *
 * Example usage:
 * ```typescript
 * const processor = new RecordProcessor(db, background, {
 *   schema: so.sprk.graph.follow.main,
 *   // ... other params
 * });
 *
 * // This will only process records with collection "so.sprk.graph.follow"
 * await processor.insertRecord(uri, cid, obj, timestamp);
 * ```
 */
export class RecordProcessor<TSchema extends l.RecordSchema, TRow> {
  db: Database;
  private pushService: PushService | null = null;

  /**
   * @param appDb Database handle used for all reads and writes.
   * @param background Queue used for aggregate updates and push delivery.
   * @param options Per-collection schema and index hooks.
   */
  constructor(
    private appDb: Database,
    private background: BackgroundQueue,
    private options: RecordProcessorOptions<TSchema, TRow>,
  ) {
    this.db = appDb;
  }

  /** Collection id (`$type`) of the schema this processor was built with. */
  get collection(): TSchema["$type"] {
    return this.options.schema.$type;
  }

  /** Enables push delivery for notifications created by this processor. */
  setPushService(pushService: PushService) {
    this.pushService = pushService;
  }

  /** True when `uri` points into the collection handled by this processor. */
  matchesCollection(uri: AtUri): boolean {
    return uri.collection === this.collection;
  }

  /** Non-throwing schema check; narrows `obj` to the schema's record type. */
  matchesSchema<I>(obj: I): obj is I & l.Infer<TSchema> {
    return this.options.schema.matches(obj);
  }

  /** Schema check that delegates to `schema.assert`. */
  assertValidRecord(obj: unknown): asserts obj is l.Infer<TSchema> {
    this.options.schema.assert(obj);
  }

  // Helper method to get the collection this processor handles
  getLexId(): TSchema["$type"] {
    return this.collection;
  }

  /**
   * Validates and indexes a new record.
   *
   * If `findDuplicate` reports a different URI, only a DuplicateRecord doc is
   * written and no index row, aggregates or notifications are produced.
   *
   * @param uri AT URI of the record.
   * @param cid CID of the record.
   * @param obj Raw record body; validated against the schema first.
   * @param timestamp Indexing time; also used as `createdAt` when the record
   *   carries no string `createdAt`.
   * @param opts `disableNotifs` suppresses notification handling.
   */
  async insertRecord(
    uri: AtUri,
    cid: Cid,
    obj: unknown,
    timestamp: string,
    opts?: { disableNotifs?: boolean },
  ) {
    this.assertValidRecord(obj);
    const uriStr = uri.toString();

    // Extract createdAt from the record object if available
    const recordObj = obj as Record<string, unknown>;
    const createdAt = typeof recordObj.createdAt === "string"
      ? recordObj.createdAt
      : timestamp;

    // Check for duplicates first before attempting insert
    // NOTE(review): a duplicate returns before the Record upsert below, yet
    // deleteRecord() reads Record.json to promote a duplicate. A duplicate
    // that was only ever seen by this method has no Record doc to promote
    // from - confirm whether that is intended.
    const found = await this.options.findDuplicate(this.db, uri, obj);
    if (found && found.toString() !== uriStr) {
      // Duplicate exists with different URI, store in duplicates table with no events
      await this.db.models.DuplicateRecord.findOneAndUpdate(
        { uri: uriStr },
        {
          uri: uriStr,
          cid: cid.toString(),
          duplicateOf: found.toString(),
          indexedAt: timestamp,
        },
        { upsert: true, returnDocument: "after" },
      );
      return;
    }

    // Insert or update record
    await this.db.models.Record.findOneAndUpdate(
      { uri: uriStr },
      {
        uri: uriStr,
        cid: cid.toString(),
        did: uri.host,
        collectionName: uri.collection,
        rkey: uri.rkey,
        json: lexStringify(obj),
        createdAt,
        indexedAt: timestamp,
      },
      { upsert: true, returnDocument: "after" },
    );

    const inserted = await this.options.insertFn(
      this.db,
      uri,
      cid,
      obj,
      timestamp,
    );
    if (!inserted) {
      if (this.options.deleteRecordIfInsertReturnsNull) {
        await this.db.models.Record.deleteOne({ uri: uriStr });
        await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });
      }
      return;
    }

    this.aggregateOnCommit(inserted);
    if (!opts?.disableNotifs) {
      await this.handleNotifs({ inserted });
    }
  }

  // Currently using a very simple strategy for updates: purge the existing index
  // for the uri then replace it. The main upside is that this allows the indexer
  // for each collection to avoid bespoke logic for in-place updates, which isn't
  // straightforward in the general case. We still get nice control over notifications.
  /**
   * Validates and re-indexes an updated record.
   *
   * @param uri AT URI of the record.
   * @param cid CID of the new record version.
   * @param obj Raw record body; validated against the schema first.
   * @param timestamp Indexing time for the new version.
   * @param opts `disableNotifs` suppresses notification handling, including
   *   when the update falls back to a plain insert.
   * @throws Error when the old index row was removed, `insertFn` returned
   *   `null`, and `deleteRecordIfInsertReturnsNull` is not set.
   */
  async updateRecord(
    uri: AtUri,
    cid: Cid,
    obj: unknown,
    timestamp: string,
    opts?: { disableNotifs?: boolean },
  ) {
    this.assertValidRecord(obj);
    const uriStr = uri.toString();

    // Extract createdAt from the record object if available
    const recordObj = obj as Record<string, unknown>;
    const createdAt = typeof recordObj.createdAt === "string"
      ? recordObj.createdAt
      : undefined;

    // Update record
    const updateData: {
      cid: string;
      json: string;
      indexedAt: string;
      createdAt?: string;
    } = {
      cid: cid.toString(),
      json: lexStringify(obj),
      indexedAt: timestamp,
    };
    if (createdAt) {
      updateData.createdAt = createdAt;
    }

    await this.db.models.Record.findOneAndUpdate(
      { uri: uriStr },
      updateData,
      { returnDocument: "after" },
    );

    // If the updated record was a dupe, update dupe info for it.
    // A match on the record's own URI is not a duplicate (same guard as
    // insertRecord); without it the upsert below would record the canonical
    // row as a duplicate of itself.
    const dupe = await this.options.findDuplicate(this.db, uri, obj);
    if (dupe && dupe.toString() !== uriStr) {
      await this.db.models.DuplicateRecord.findOneAndUpdate(
        { uri: uriStr },
        {
          cid: cid.toString(),
          duplicateOf: dupe.toString(),
          indexedAt: timestamp,
        },
        { upsert: true, returnDocument: "after" },
      );
    } else {
      await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });
    }

    const deleted = await this.options.deleteFn(this.db, uri);
    if (!deleted) {
      // If a record was updated but hadn't been indexed yet, treat it like a plain insert.
      // Forward opts so disableNotifs is honoured on this path as well.
      return this.insertRecord(uri, cid, obj, timestamp, opts);
    }
    this.aggregateOnCommit(deleted);
    const inserted = await this.options.insertFn(
      this.db,
      uri,
      cid,
      obj,
      timestamp,
    );
    if (!inserted) {
      if (this.options.deleteRecordIfInsertReturnsNull) {
        await this.db.models.Record.deleteOne({ uri: uriStr });
        await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });
        if (!opts?.disableNotifs) {
          await this.handleNotifs({ deleted });
        }
        return;
      }
      throw new Error(
        "Record update failed: removed from index but could not be replaced",
      );
    }
    this.aggregateOnCommit(inserted);
    if (!opts?.disableNotifs) {
      await this.handleNotifs({ inserted, deleted });
    }
  }

  /**
   * Removes a record from storage and from the index.
   *
   * @param uri AT URI of the record to remove.
   * @param cascading When true, DuplicateRecord docs pointing at `uri` are
   *   dropped too. When false, the oldest such duplicate (by `indexedAt`) is
   *   re-indexed in its place if its stored JSON is available and valid.
   */
  async deleteRecord(uri: AtUri, cascading = false) {
    const uriStr = uri.toString();

    await this.db.models.Record.deleteOne({ uri: uriStr });
    await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });

    const deleted = await this.options.deleteFn(this.db, uri);
    if (!deleted) return;

    this.aggregateOnCommit(deleted);
    if (cascading) {
      await this.db.models.DuplicateRecord.deleteMany({
        duplicateOf: uriStr,
      });
      return this.handleNotifs({ deleted });
    } else {
      const found = await this.db.models.DuplicateRecord.findOne({
        duplicateOf: uriStr,
      })
        .sort({ indexedAt: 1 })
        .lean();

      if (!found) {
        return this.handleNotifs({ deleted });
      }

      // Get the actual record from the Record model
      const recordDoc = await this.db.models.Record.findOne({ uri: found.uri })
        .lean();
      if (!recordDoc || !recordDoc.json) {
        return this.handleNotifs({ deleted });
      }

      const foundUri = new AtUri(found.uri);
      if (!this.matchesCollection(foundUri)) {
        return this.handleNotifs({ deleted });
      }

      // Unparseable or schema-invalid JSON is handled as "no replacement".
      let record: unknown;
      try {
        record = lexParse(recordDoc.json, { strict: false });
      } catch {
        return this.handleNotifs({ deleted });
      }
      if (!this.matchesSchema(record)) {
        return this.handleNotifs({ deleted });
      }

      const inserted = await this.options.insertFn(
        this.db,
        foundUri,
        parseCid(found.cid),
        record,
        found.indexedAt,
      );
      if (inserted) {
        this.aggregateOnCommit(inserted);
      }
      await this.handleNotifs({ deleted, inserted: inserted ?? undefined });
    }
  }

  /**
   * Creates and removes notifications for an index change.
   *
   * When `op.deleted` is set, `notifsForDelete` decides both the notifications
   * to create and the record URIs whose notifications are removed; otherwise
   * `notifsForInsert` is used for `op.inserted`. Notifications are inserted in
   * chunks of 500, then one push job per notification is queued when a push
   * service is set and enabled.
   */
  async handleNotifs(op: { deleted?: TRow; inserted?: TRow }) {
    let notifs: Notif[] = [];
    const runOnCommit: ((db: Database) => Promise<void>)[] = [];
    if (op.deleted) {
      const forDelete = this.options.notifsForDelete(
        op.deleted,
        op.inserted ?? null,
      );
      if (forDelete.toDelete.length > 0) {
        // Notifs can be deleted in background: they are expensive to delete and
        // listNotifications already excludes notifs with missing records.
        runOnCommit.push(async (db) => {
          await db.models.Notification.deleteMany({
            recordUri: { $in: forDelete.toDelete },
          });
        });
      }
      notifs = forDelete.notifs;
    } else if (op.inserted) {
      notifs = this.options.notifsForInsert(op.inserted);
    }
    for (const chunk of chunkArray(notifs, 500)) {
      runOnCommit.push(async (db) => {
        const filtered = await this.filterNotifsForThreadMutes(chunk);
        if (filtered.length > 0) {
          await db.models.Notification.insertMany(
            filtered.map((n) => ({
              did: n.did,
              recordUri: n.recordUri,
              recordCid: n.recordCid,
              author: n.author,
              reason: n.reason,
              reasonSubject: n.reasonSubject ?? null,
              sortAt: n.sortAt,
            })),
          );
        }
      });
    }
    // Need to ensure notif deletion always happens before creation, otherwise delete may clobber in a race.
    for (const fn of runOnCommit) {
      await fn(this.appDb); // these could be backgrounded
    }

    // Queue push notifications in the background
    if (this.pushService?.enabled && notifs.length > 0) {
      for (const notif of notifs) {
        this.background.add(async () => {
          await this.pushService?.sendPush(notif.did, {
            recipientDid: notif.did,
            reason: notif.reason,
            author: notif.author,
            recordUri: notif.recordUri,
            reasonSubject: notif.reasonSubject,
          });
        });
      }
    }
  }

  // Filter notifications for thread mutes (placeholder for future implementation)
  filterNotifsForThreadMutes(notifs: Notif[]): Promise<Notif[]> {
    // TODO: Implement thread mute filtering
    // For now, return all notifications unfiltered
    return Promise.resolve(notifs);
  }

  /**
   * Queues `updateAggregates` for `indexed` on the background queue.
   * Does nothing when the collection defines no `updateAggregates` hook.
   */
  aggregateOnCommit(indexed: TRow) {
    const { updateAggregates } = this.options;
    if (!updateAggregates) return;
    // Despite the name, nothing here is tied to a transaction commit: the
    // aggregate update is handed straight to the background queue.
    // In a production system, you might want to use MongoDB transactions or a different approach
    this.background.add((db) => updateAggregates(db, indexed));
  }
}

export default RecordProcessor;