[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 3b73895e29748ca524bbe040b656ddb4e167104b 380 lines 11 kB view raw
import { CID } from "multiformats/cid";
import { jsonStringToLex, stringifyLex } from "@atp/lexicon";
import { AtUri } from "@atp/syntax";
import { lexicons } from "../../lex/lexicons.ts";
import { BackgroundQueue } from "../background.ts";
import { Database } from "../db/index.ts";
import { chunkArray } from "@atp/common";
import { PushService } from "../../utils/push.ts";

// @NOTE re: insertions and deletions. Due to how record updates are handled,
// (insertFn) should have the same effect as (insertFn -> deleteFn -> insertFn).

/**
 * Collection-specific hooks that a {@link RecordProcessor} delegates to.
 *
 * `T` is the validated lexicon record type; `S` is whatever the collection's
 * index functions return for an indexed record.
 */
type RecordProcessorParams<T, S> = {
  /** Lexicon id used both for schema validation and collection matching. */
  lexId: string;
  /** Indexes a record; a `null` result is treated as "nothing was indexed". */
  insertFn: (
    db: Database,
    uri: AtUri,
    cid: CID,
    obj: T,
    timestamp: string,
  ) => Promise<S | null>;
  /** Returns the URI of an already-indexed equivalent record, if any. */
  findDuplicate: (
    db: Database,
    uri: AtUri,
    obj: T,
  ) => Promise<AtUri | null> | AtUri | null;
  /** Removes a record from the index; `null` means it was not indexed. */
  deleteFn: (db: Database, uri: AtUri) => Promise<S | null>;
  /** Notifications to create for a freshly indexed record. */
  notifsForInsert: (obj: S) => Notif[];
  /**
   * Notifications to create (`notifs`) and record URIs whose notifications
   * should be removed (`toDelete`) when `prev` leaves the index, optionally
   * being replaced by `replacedBy`.
   */
  notifsForDelete: (
    prev: S,
    replacedBy: S | null,
  ) => { notifs: Notif[]; toDelete: string[] };
  /** Optional aggregate refresh, scheduled on the background queue. */
  updateAggregates?: (db: Database, obj: S) => Promise<void>;
};

/** A notification row as produced by the collection hooks. */
type Notif = {
  did: string;
  reason: string;
  author: string;
  recordUri: string;
  recordCid: string;
  sortAt: string;
  reasonSubject?: string;
};

export class RecordProcessor<T, S> {
  collection: string;
  db: Database;
  private pushService: PushService | null = null;

  /**
   * RecordProcessor for handling a single AT Protocol collection.
   *
   * This processor handles exactly one lexId:
   * - Validates records against the specific lexId (e.g., "app.bsky.graph.follow")
   * - Only processes records that match the exact collection NSID
   * - Rejects records from other collections, even similar ones
   *
   * Example usage:
   * ```typescript
   * const processor = new RecordProcessor(db, background, {
   *   lexId: "app.bsky.graph.follow",
   *   // ... other params
   * });
   *
   * // This will only process records with collection "app.bsky.graph.follow"
   * await processor.insertRecord(uri, cid, obj, timestamp);
   * ```
   */
  constructor(
    private appDb: Database,
    private background: BackgroundQueue,
    private params: RecordProcessorParams<T, S>,
  ) {
    this.db = appDb;
    this.collection = this.params.lexId;
  }

  /** Attaches the push service used by {@link handleNotifs}. */
  setPushService(pushService: PushService) {
    this.pushService = pushService;
  }

  /** True when `uri` belongs to the collection this processor handles. */
  matchesCollection(uri: AtUri): boolean {
    return uri.collection === this.collection;
  }

  /** Non-throwing schema check against this processor's lexicon. */
  matchesSchema(obj: unknown): obj is T {
    try {
      lexicons.assertValidRecord(this.collection, obj);
      return true;
    } catch {
      return false;
    }
  }

  /** Throwing schema check; propagates whatever the lexicon validator throws. */
  assertValidRecord(obj: unknown): asserts obj is T {
    lexicons.assertValidRecord(this.collection, obj);
  }

  // Helper method to get the lexId this processor handles
  getLexId(): string {
    return this.collection;
  }

  /**
   * Validates, stores and indexes a newly created record.
   *
   * @param uri - AT URI of the record.
   * @param cid - CID of the record.
   * @param obj - Raw record; must validate against this processor's lexicon.
   * @param timestamp - Indexing time; also the `createdAt` fallback.
   * @param opts - `disableNotifs` suppresses notification handling.
   * @throws If `obj` fails lexicon validation.
   */
  async insertRecord(
    uri: AtUri,
    cid: CID,
    obj: unknown,
    timestamp: string,
    opts?: { disableNotifs?: boolean },
  ) {
    this.assertValidRecord(obj);

    const uriStr = uri.toString();
    const cidStr = cid.toString();

    // Extract createdAt from the record object if available
    const recordObj = obj as Record<string, unknown>;
    const createdAt = typeof recordObj.createdAt === "string"
      ? recordObj.createdAt
      : timestamp;

    // Persist the raw record before the duplicate check. deleteRecord reads
    // the duplicate's JSON from this collection to promote it when the
    // canonical record goes away, so duplicates must be stored here as well.
    await this.db.models.Record.findOneAndUpdate(
      { uri: uriStr },
      {
        uri: uriStr,
        cid: cidStr,
        did: uri.host,
        collectionName: uri.collection,
        rkey: uri.rkey,
        json: stringifyLex(obj),
        createdAt,
        indexedAt: timestamp,
      },
      { upsert: true, returnDocument: "after" },
    );

    const found = await this.params.findDuplicate(this.db, uri, obj);
    if (found && found.toString() !== uriStr) {
      // Duplicate exists with different URI, store in duplicates table with no events
      await this.db.models.DuplicateRecord.findOneAndUpdate(
        { uri: uriStr },
        {
          uri: uriStr,
          cid: cidStr,
          duplicateOf: found.toString(),
          indexedAt: timestamp,
        },
        { upsert: true, returnDocument: "after" },
      );
      return;
    }

    const inserted = await this.params.insertFn(
      this.db,
      uri,
      cid,
      obj,
      timestamp,
    );
    if (!inserted) return;

    this.aggregateOnCommit(inserted);
    if (!opts?.disableNotifs) {
      // Awaited so a failure surfaces to the caller instead of becoming an
      // unhandled rejection.
      await this.handleNotifs({ inserted });
    }
  }

  // Currently using a very simple strategy for updates: purge the existing index
  // for the uri then replace it. The main upside is that this allows the indexer
  // for each collection to avoid bespoke logic for in-place updates, which isn't
  // straightforward in the general case. We still get nice control over notifications.
  /**
   * Validates and re-indexes an updated record.
   *
   * @param uri - AT URI of the record.
   * @param cid - CID of the new record version.
   * @param obj - Raw record; must validate against this processor's lexicon.
   * @param timestamp - Indexing time.
   * @param opts - `disableNotifs` suppresses notification handling.
   * @throws If `obj` fails validation, or if the old index entry was removed
   *   but `insertFn` returned nothing to replace it.
   */
  async updateRecord(
    uri: AtUri,
    cid: CID,
    obj: unknown,
    timestamp: string,
    opts?: { disableNotifs?: boolean },
  ) {
    this.assertValidRecord(obj);

    const uriStr = uri.toString();
    const cidStr = cid.toString();

    // Extract createdAt from the record object if available
    const recordObj = obj as Record<string, unknown>;
    const createdAt = typeof recordObj.createdAt === "string"
      ? recordObj.createdAt
      : undefined;

    // Update record; createdAt is only overwritten when the record carries one.
    const updateData: {
      cid: string;
      json: string;
      indexedAt: string;
      createdAt?: string;
    } = {
      cid: cidStr,
      json: stringifyLex(obj),
      indexedAt: timestamp,
    };
    if (createdAt) {
      updateData.createdAt = createdAt;
    }

    await this.db.models.Record.findOneAndUpdate(
      { uri: uriStr },
      updateData,
      { returnDocument: "after" },
    );

    // If the updated record was a dupe, update dupe info for it
    // NOTE(review): unlike insertRecord, a match equal to `uri` itself is not
    // excluded here; confirm findDuplicate cannot return the record being updated.
    const dupe = await this.params.findDuplicate(this.db, uri, obj);
    if (dupe) {
      await this.db.models.DuplicateRecord.findOneAndUpdate(
        { uri: uriStr },
        {
          cid: cidStr,
          duplicateOf: dupe.toString(),
          indexedAt: timestamp,
        },
        { upsert: true, returnDocument: "after" },
      );
    } else {
      await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });
    }

    const deleted = await this.params.deleteFn(this.db, uri);
    if (!deleted) {
      // If a record was updated but hadn't been indexed yet, treat it like a
      // plain insert. `opts` is forwarded so disableNotifs is still honoured.
      return this.insertRecord(uri, cid, obj, timestamp, opts);
    }
    this.aggregateOnCommit(deleted);

    const inserted = await this.params.insertFn(
      this.db,
      uri,
      cid,
      obj,
      timestamp,
    );
    if (!inserted) {
      throw new Error(
        "Record update failed: removed from index but could not be replaced",
      );
    }
    this.aggregateOnCommit(inserted);
    if (!opts?.disableNotifs) {
      await this.handleNotifs({ inserted, deleted });
    }
  }

  /**
   * Removes a record from storage and from the index.
   *
   * @param uri - AT URI of the record to delete.
   * @param cascading - When true, duplicates pointing at `uri` are dropped too.
   *   When false, the oldest recorded duplicate (by `indexedAt`) is indexed in
   *   its place if its stored JSON is present and still valid.
   */
  async deleteRecord(uri: AtUri, cascading = false) {
    const uriStr = uri.toString();

    await this.db.models.Record.deleteOne({ uri: uriStr });
    await this.db.models.DuplicateRecord.deleteOne({ uri: uriStr });

    const deleted = await this.params.deleteFn(this.db, uri);
    if (!deleted) return;

    this.aggregateOnCommit(deleted);

    if (cascading) {
      await this.db.models.DuplicateRecord.deleteMany({ duplicateOf: uriStr });
      await this.handleNotifs({ deleted });
      return;
    }

    // Oldest duplicate of the deleted record, if one was recorded.
    const found = await this.db.models.DuplicateRecord.findOne({
      duplicateOf: uriStr,
    })
      .sort({ indexedAt: 1 })
      .lean();
    if (!found) {
      await this.handleNotifs({ deleted });
      return;
    }

    // Get the actual record from the Record model
    const recordDoc = await this.db.models.Record.findOne({ uri: found.uri })
      .lean();
    if (!recordDoc || !recordDoc.json) {
      await this.handleNotifs({ deleted });
      return;
    }

    const foundUri = new AtUri(found.uri);
    if (!this.matchesCollection(foundUri)) {
      await this.handleNotifs({ deleted });
      return;
    }

    const record = jsonStringToLex(recordDoc.json);
    if (!this.matchesSchema(record)) {
      await this.handleNotifs({ deleted });
      return;
    }

    // Promote the duplicate into the index under its own uri/cid/indexedAt.
    const inserted = await this.params.insertFn(
      this.db,
      foundUri,
      CID.parse(found.cid),
      record,
      found.indexedAt,
    );
    if (inserted) {
      this.aggregateOnCommit(inserted);
    }
    await this.handleNotifs({ deleted, inserted: inserted ?? undefined });
  }

  /**
   * Applies the notification side effects of an index change.
   *
   * When `op.deleted` is set, `notifsForDelete` decides both what to delete
   * and what to create; otherwise `notifsForInsert` is used for `op.inserted`.
   * Deletions run before insertions. Push notifications are queued on the
   * background queue for the notifications that were actually stored.
   */
  async handleNotifs(op: { deleted?: S; inserted?: S }) {
    let notifs: Notif[] = [];
    const runOnCommit: ((db: Database) => Promise<void>)[] = [];
    // Notifications that passed filtering and were written; only these are pushed.
    const delivered: Notif[] = [];

    if (op.deleted) {
      const forDelete = this.params.notifsForDelete(
        op.deleted,
        op.inserted ?? null,
      );
      if (forDelete.toDelete.length > 0) {
        // Notifs can be deleted in background: they are expensive to delete and
        // listNotifications already excludes notifs with missing records.
        runOnCommit.push(async (db) => {
          await db.models.Notification.deleteMany({
            recordUri: { $in: forDelete.toDelete },
          });
        });
      }
      notifs = forDelete.notifs;
    } else if (op.inserted) {
      notifs = this.params.notifsForInsert(op.inserted);
    }

    for (const chunk of chunkArray(notifs, 500)) {
      runOnCommit.push(async (db) => {
        const filtered = await this.filterNotifsForThreadMutes(chunk);
        if (filtered.length === 0) return;
        await db.models.Notification.insertMany(
          filtered.map((n) => ({
            did: n.did,
            recordUri: n.recordUri,
            recordCid: n.recordCid,
            author: n.author,
            reason: n.reason,
            reasonSubject: n.reasonSubject ?? null,
            sortAt: n.sortAt,
          })),
        );
        delivered.push(...filtered);
      });
    }

    // Need to ensure notif deletion always happens before creation, otherwise delete may clobber in a race.
    for (const fn of runOnCommit) {
      await fn(this.appDb); // these could be backgrounded
    }

    // Queue push notifications in the background
    const pushService = this.pushService;
    if (pushService?.enabled && delivered.length > 0) {
      for (const notif of delivered) {
        this.background.add(async () => {
          await pushService.sendPush(notif.did, {
            recipientDid: notif.did,
            reason: notif.reason,
            author: notif.author,
            recordUri: notif.recordUri,
            reasonSubject: notif.reasonSubject,
          });
        });
      }
    }
  }

  // Filter notifications for thread mutes (placeholder for future implementation)
  filterNotifsForThreadMutes(notifs: Notif[]): Promise<Notif[]> {
    // TODO: Implement thread mute filtering
    // For now, return all notifications unfiltered
    return Promise.resolve(notifs);
  }

  /**
   * Schedules the optional `updateAggregates` hook for `indexed` on the
   * background queue; a no-op when the hook is not configured.
   */
  aggregateOnCommit(indexed: S) {
    const { updateAggregates } = this.params;
    if (!updateAggregates) return;
    // Note: MongoDB doesn't have transactions in the same way, so we'll run aggregates immediately
    // In a production system, you might want to use MongoDB transactions or a different approach
    this.background.add((db) => updateAggregates(db, indexed));
  }
}

export default RecordProcessor;