[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 401 lines 12 kB view raw
1import { DAY, HOUR } from "@atp/common"; 2import { getPds, IdResolver } from "@atp/identity"; 3import { Cid, type DidString, l } from "@atp/lex"; 4import { parseCid } from "@atp/lex/data"; 5import { Client, XRPCError } from "@atp/xrpc"; 6 7import { 8 getAndParseRecord, 9 readCarWithRoot, 10 type RepoRecord, 11 VerifiedRepo, 12 verifyRepo, 13 WriteOpAction, 14} from "@atp/repo"; 15import { AtUri } from "@atp/syntax"; 16import { retryXrpc } from "../../utils/retry.ts"; 17import * as com from "../../lex/com.ts"; 18import { BackgroundQueue } from "../background.ts"; 19import { Database } from "../db/index.ts"; 20import { ActorDocument } from "../db/models.ts"; 21import * as Block from "./plugins/block.ts"; 22import * as Generator from "./plugins/generator.ts"; 23import * as Follow from "./plugins/follow.ts"; 24import * as Like from "./plugins/like.ts"; 25import * as Post from "./plugins/post.ts"; 26import * as Reply from "./plugins/reply.ts"; 27import * as Profile from "./plugins/profile.ts"; 28import * as Repost from "./plugins/repost.ts"; 29import * as Story from "./plugins/story.ts"; 30import * as Audio from "./plugins/audio.ts"; 31import * as Labeler from "./plugins/labeler.ts"; 32import { RecordProcessor } from "./processor.ts"; 33import { ServerConfig } from "../../config.ts"; 34import { PushService } from "../../utils/push.ts"; 35 36export class IndexingService { 37 records: { 38 post: Post.PluginType; 39 reply: Reply.PluginType; 40 like: Like.PluginType; 41 repost: Repost.PluginType; 42 follow: Follow.PluginType; 43 profile: Profile.PluginType; 44 block: Block.PluginType; 45 generator: Generator.PluginType; 46 story: Story.PluginType; 47 audio: Audio.PluginType; 48 labeler: Labeler.PluginType; 49 }; 50 private pushService?: PushService; 51 52 constructor( 53 public db: Database, 54 public cfg: ServerConfig, 55 public idResolver: IdResolver, 56 public background: BackgroundQueue, 57 pushService?: PushService, 58 ) { 59 this.pushService = pushService; 60 this.records = { 61 post: Post.makePlugin(this.db, this.background), 62 reply: Reply.makePlugin(this.db, this.background), 63 like: Like.makePlugin(this.db, this.background), 64 repost: Repost.makePlugin(this.db, this.background), 65 follow: Follow.makePlugin(this.db, this.background), 66 profile: Profile.makePlugin(this.db, this.background), 67 block: Block.makePlugin(this.db, this.background), 68 generator: Generator.makePlugin(this.db, this.background), 69 story: Story.makePlugin(this.db, this.background), 70 audio: Audio.makePlugin(this.db, this.background), 71 labeler: Labeler.makePlugin(this.db, this.background), 72 }; 73 74 // Set push service on all processors 75 if (pushService) { 76 Object.values(this.records).forEach((processor) => { 77 processor.setPushService(pushService); 78 }); 79 } 80 } 81 82 transact(txn: Database) { 83 return new IndexingService( 84 txn, 85 this.cfg, 86 this.idResolver, 87 this.background, 88 this.pushService, 89 ); 90 } 91 92 async indexRecord( 93 uri: AtUri, 94 cid: Cid, 95 obj: RepoRecord, 96 action: WriteOpAction.Create | WriteOpAction.Update, 97 timestamp: string, 98 opts?: { disableNotifs?: boolean; disableLabels?: boolean }, 99 ) { 100 const indexer = this.findIndexerForCollection(uri.collection); 101 if (!indexer) return; 102 if (action === WriteOpAction.Create) { 103 await indexer.insertRecord(uri, cid, obj, timestamp, opts); 104 } else { 105 await indexer.updateRecord(uri, cid, obj, timestamp); 106 } 107 } 108 109 async deleteRecord(uri: AtUri, cascading = false) { 110 const indexer = this.findIndexerForCollection(uri.collection); 111 if (!indexer) return; 112 await indexer.deleteRecord(uri, cascading); 113 } 114 115 async indexHandle(did: string, timestamp: string, force = false) { 116 const actor = await this.db.models.Actor.findOne({ did }); 117 if (!force && !needsHandleReindex(actor, timestamp)) { 118 return; 119 } 120 121 try { 122 const atpData = await this.idResolver.did.resolveAtprotoData(did, true); 123 124 const handle = atpData.handle.toLowerCase(); 125 126 const actorWithHandle = handle !== null 127 ? await this.db.models.Actor.findOne({ handle }).lean() 128 : null; 129 130 // handle contention 131 if (handle && actorWithHandle && did !== actorWithHandle.did) { 132 await this.db.models.Actor.updateOne( 133 { did: actorWithHandle.did }, 134 { handle: null }, 135 ); 136 } 137 138 const actorInfo = { handle, indexedAt: timestamp }; 139 await this.db.models.Actor.findOneAndUpdate( 140 { did }, 141 { did, ...actorInfo }, 142 { upsert: true, returnDocument: "after" }, 143 ); 144 } catch (err) { 145 // Log the error but don't throw - this prevents the firehose from crashing 146 console.warn( 147 "Failed to index handle, skipping", 148 { err, did, timestamp }, 149 ); 150 151 // Still update the actor record with null handle to prevent repeated attempts 152 const actorInfo = { handle: null, indexedAt: timestamp }; 153 try { 154 await this.db.models.Actor.findOneAndUpdate( 155 { did }, 156 { did, ...actorInfo }, 157 { upsert: true, returnDocument: "after" }, 158 ); 159 } catch (dbErr) { 160 console.error( 161 "Failed to update actor record after handle resolution failure", 162 { err: dbErr, did }, 163 ); 164 } 165 } 166 } 167 168 async indexRepo(did: string, commit?: string) { 169 const now = new Date().toISOString(); 170 171 const actorExists = await this.db.models.Actor.findOne({ did }).lean(); 172 if (!actorExists) { 173 console.info( 174 `indexRepo: No actor record found for ${did}, indexing handle first`, 175 ); 176 await this.indexHandle(did, now); 177 } 178 179 const { pds, signingKey } = await this.idResolver.did.resolveAtprotoData( 180 did, 181 true, 182 ); 183 const client = new Client(pds); 184 185 const { data: car } = await retryXrpc(() => 186 client.call(com.atproto.sync.getRepo, { 187 params: { did: did as DidString }, 188 }) 189 ); 190 const { root, blocks } = await readCarWithRoot(car); 191 const verifiedRepo = await verifyRepo(blocks, root, did, signingKey); 192 193 const currRecords = await this.getCurrentRecords(did); 194 const repoRecords = formatCheckout(did, verifiedRepo); 195 const diff = findDiffFromCheckout(currRecords, repoRecords); 196 197 console.info(`Indexing ${diff.length} records for ${did}:`); 198 199 await Promise.all( 200 diff.map(async (op) => { 201 const { uri, cid } = op; 202 try { 203 if (op.op === "delete") { 204 await this.deleteRecord(uri); 205 } else { 206 const parsed = getAndParseRecord(blocks, cid); 207 await this.indexRecord( 208 uri, 209 cid, 210 parsed.record, 211 op.op === "create" ? WriteOpAction.Create : WriteOpAction.Update, 212 now, 213 ); 214 } 215 } catch (err) { 216 if (err instanceof l.ValidationError) { 217 console.warn( 218 "skipping indexing of invalid record", 219 { did, commit, uri: uri.toString(), cid: cid.toString() }, 220 ); 221 } else { 222 console.error( 223 "skipping indexing due to error processing record", 224 { err, did, commit, uri: uri.toString(), cid: cid.toString() }, 225 ); 226 } 227 } 228 }), 229 ); 230 231 // Update the last seen commit for this actor 232 await this.setCommitLastSeen(did, root, commit || ""); 233 } 234 235 async getCurrentRecords(did: string) { 236 const res = await this.db.models.Record.find({ did }).select(["uri", "cid"]) 237 .lean(); 238 return res.reduce( 239 (acc, cur) => { 240 acc[cur.uri] = { 241 uri: new AtUri(cur.uri), 242 cid: parseCid(cur.cid), 243 }; 244 return acc; 245 }, 246 {} as Record<string, { uri: AtUri; cid: Cid }>, 247 ); 248 } 249 250 async setCommitLastSeen(did: string, commit: Cid, rev: string) { 251 await this.db.models.ActorSync.findOneAndUpdate( 252 { did }, 253 { 254 did, 255 commitCid: commit.toString(), 256 repoRev: rev ?? null, 257 }, 258 { upsert: true, returnDocument: "after" }, 259 ); 260 } 261 262 findIndexerForCollection(collection: string) { 263 const indexers = Object.values( 264 this.records as Record<string, RecordProcessor<l.RecordSchema, unknown>>, 265 ); 266 return indexers.find((indexer) => indexer.collection === collection); 267 } 268 269 async updateActorStatus(did: string, active: boolean, status: string = "") { 270 let upstreamStatus: string | null; 271 if (active) { 272 upstreamStatus = null; 273 } else if (["deactivated", "suspended", "takendown"].includes(status)) { 274 upstreamStatus = status; 275 } else { 276 throw new Error(`Unrecognized account status: ${status}`); 277 } 278 await this.db.models.Actor.updateOne( 279 { did }, 280 { upstreamStatus }, 281 ); 282 } 283 284 async deleteActor(did: string) { 285 const actorIsHosted = await this.getActorIsHosted(did); 286 if (actorIsHosted === false) { 287 await this.db.models.Actor.deleteOne({ did }); 288 await this.unindexActor(did); 289 // Note: Notification model not present in current schemas 290 } 291 } 292 293 private async getActorIsHosted(did: string) { 294 try { 295 const doc = await this.idResolver.did.resolve(did, true); 296 const pds = doc && getPds(doc); 297 if (!pds) return false; 298 const client = new Client(pds); 299 try { 300 await retryXrpc(() => 301 client.call(com.atproto.sync.getLatestCommit, { 302 params: { did: did as DidString }, 303 }) 304 ); 305 return true; 306 } catch (err) { 307 if (err instanceof XRPCError && err.error === "RepoNotFound") { 308 return false; 309 } 310 return null; 311 } 312 } catch (err) { 313 console.warn( 314 "Failed to check if actor is hosted, assuming not hosted", 315 { err, did }, 316 ); 317 return false; 318 } 319 } 320 321 async unindexActor(did: string) { 322 await this.db.models.Profile.deleteMany({ authorDid: did }); 323 await this.db.models.Follow.deleteMany({ authorDid: did }); 324 await this.db.models.Repost.deleteMany({ authorDid: did }); 325 await this.db.models.Like.deleteMany({ authorDid: did }); 326 await this.db.models.Generator.deleteMany({ authorDid: did }); 327 await this.db.models.Story.deleteMany({ authorDid: did }); 328 await this.db.models.Audio.deleteMany({ authorDid: did }); 329 await this.db.models.Block.deleteMany({ authorDid: did }); 330 await this.db.models.Post.deleteMany({ authorDid: did }); 331 await this.db.models.Reply.deleteMany({ authorDid: did }); 332 await this.db.models.Labeler.deleteMany({ authorDid: did }); 333 await this.db.models.CrosspostReply.deleteMany({ authorDid: did }); 334 } 335} 336 337type UriAndCid = { 338 uri: AtUri; 339 cid: Cid; 340}; 341 342type IndexOp = 343 | ({ 344 op: "create" | "update"; 345 } & UriAndCid) 346 | ({ op: "delete" } & UriAndCid); 347 348const findDiffFromCheckout = ( 349 curr: Record<string, UriAndCid>, 350 checkout: Record<string, UriAndCid>, 351): IndexOp[] => { 352 const ops: IndexOp[] = []; 353 for (const uri of Object.keys(checkout)) { 354 const record = checkout[uri]; 355 if (!curr[uri]) { 356 ops.push({ op: "create", ...record }); 357 } else { 358 if (curr[uri].cid.equals(record.cid)) { 359 // no-op 360 continue; 361 } 362 ops.push({ op: "update", ...record }); 363 } 364 } 365 for (const uri of Object.keys(curr)) { 366 const record = curr[uri]; 367 if (!checkout[uri]) { 368 ops.push({ op: "delete", ...record }); 369 } 370 } 371 return ops; 372}; 373 374const formatCheckout = ( 375 did: string, 376 verifiedRepo: VerifiedRepo, 377): Record<string, UriAndCid> => { 378 const records: Record<string, UriAndCid> = {}; 379 for (const create of verifiedRepo.creates) { 380 const uri = AtUri.make(did, create.collection, create.rkey); 381 records[uri.toString()] = { 382 uri, 383 cid: create.cid, 384 }; 385 } 386 return records; 387}; 388 389const needsHandleReindex = ( 390 actor: ActorDocument | null, 391 timestamp: string, 392) => { 393 if (!actor) return true; 394 const timeDiff = new Date(timestamp).getTime() - 395 new Date(actor.indexedAt).getTime(); 396 // revalidate daily 397 if (timeDiff > DAY) return true; 398 // revalidate more aggressively for invalidated handles 399 if (actor.handle === null && timeDiff > HOUR) return true; 400 return false; 401};