[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 3b73895e29748ca524bbe040b656ddb4e167104b 395 lines 12 kB view raw
import { CID } from "multiformats/cid";
import { AtpAgent, ComAtprotoSyncGetLatestCommit } from "@atproto/api";
import { DAY, HOUR } from "@atp/common";
import { getPds, IdResolver } from "@atp/identity";
import { RepoRecord, ValidationError } from "@atp/lexicon";
import {
  getAndParseRecord,
  readCarWithRoot,
  VerifiedRepo,
  verifyRepo,
  WriteOpAction,
} from "@atp/repo";
import { AtUri } from "@atp/syntax";
import { retryXrpc } from "../../utils/retry.ts";
import { BackgroundQueue } from "../background.ts";
import { Database } from "../db/index.ts";
import { ActorDocument } from "../db/models.ts";
import * as Block from "./plugins/block.ts";
import * as Generator from "./plugins/generator.ts";
import * as Follow from "./plugins/follow.ts";
import * as Like from "./plugins/like.ts";
import * as Post from "./plugins/post.ts";
import * as Reply from "./plugins/reply.ts";
import * as Profile from "./plugins/profile.ts";
import * as Repost from "./plugins/repost.ts";
import * as Story from "./plugins/story.ts";
import * as Audio from "./plugins/audio.ts";
import * as Labeler from "./plugins/labeler.ts";
import * as CrosspostReply from "./plugins/crosspost/reply.ts";
import { RecordProcessor } from "./processor.ts";
import { ServerConfig } from "../../config.ts";
import { PushService } from "../../utils/push.ts";

/**
 * Maximum number of repo diff operations applied concurrently by
 * `IndexingService.indexRepo`. Bounds the number of in-flight database
 * operations when a large repository is (re)indexed.
 */
const INDEX_BATCH_SIZE = 500;

/**
 * Routes repository records to the per-collection record processors and
 * maintains actor-level state (handle, upstream status, last seen commit).
 */
export class IndexingService {
  /** One record processor per supported collection. */
  records: {
    post: Post.PluginType;
    reply: Reply.PluginType;
    like: Like.PluginType;
    repost: Repost.PluginType;
    follow: Follow.PluginType;
    profile: Profile.PluginType;
    block: Block.PluginType;
    generator: Generator.PluginType;
    story: Story.PluginType;
    audio: Audio.PluginType;
    labeler: Labeler.PluginType;
    crosspostReply: CrosspostReply.PluginType;
  };
  private pushService?: PushService;

  /**
   * @param db Database handle passed to every record processor.
   * @param cfg Server configuration.
   * @param idResolver Resolver used for DID documents and atproto data.
   * @param background Background queue passed to every record processor.
   * @param pushService Optional push service; when given it is set on every
   *   record processor.
   */
  constructor(
    public db: Database,
    public cfg: ServerConfig,
    public idResolver: IdResolver,
    public background: BackgroundQueue,
    pushService?: PushService,
  ) {
    this.pushService = pushService;
    this.records = {
      post: Post.makePlugin(this.db, this.background),
      reply: Reply.makePlugin(this.db, this.background),
      like: Like.makePlugin(this.db, this.background),
      repost: Repost.makePlugin(this.db, this.background),
      follow: Follow.makePlugin(this.db, this.background),
      profile: Profile.makePlugin(this.db, this.background),
      block: Block.makePlugin(this.db, this.background),
      generator: Generator.makePlugin(this.db, this.background),
      story: Story.makePlugin(this.db, this.background),
      audio: Audio.makePlugin(this.db, this.background),
      labeler: Labeler.makePlugin(this.db, this.background),
      crosspostReply: CrosspostReply.makePlugin(this.db, this.background),
    };

    // Set push service on all processors
    if (pushService) {
      Object.values(this.records).forEach((processor) => {
        processor.setPushService(pushService);
      });
    }
  }

  /**
   * Creates a new service bound to `txn` that shares this instance's
   * configuration, resolver, background queue and push service.
   */
  transact(txn: Database) {
    return new IndexingService(
      txn,
      this.cfg,
      this.idResolver,
      this.background,
      this.pushService,
    );
  }

  /**
   * Indexes a created or updated record with the processor registered for
   * its collection. Records of unknown collections are ignored.
   *
   * @param uri AT-URI of the record.
   * @param cid CID of the record.
   * @param obj The record contents.
   * @param action Create inserts the record; any other action updates it.
   * @param timestamp Indexing timestamp forwarded to the processor.
   * @param opts Options forwarded to the processor on insert only.
   */
  async indexRecord(
    uri: AtUri,
    cid: CID,
    obj: RepoRecord,
    action: WriteOpAction.Create | WriteOpAction.Update,
    timestamp: string,
    opts?: { disableNotifs?: boolean; disableLabels?: boolean },
  ) {
    const indexer = this.findIndexerForCollection(uri.collection);
    if (!indexer) return;
    if (action === WriteOpAction.Create) {
      await indexer.insertRecord(uri, cid, obj, timestamp, opts);
    } else {
      await indexer.updateRecord(uri, cid, obj, timestamp);
    }
  }

  /**
   * Removes a record through the processor registered for its collection.
   * Records of unknown collections are ignored.
   */
  async deleteRecord(uri: AtUri, cascading = false) {
    const indexer = this.findIndexerForCollection(uri.collection);
    if (!indexer) return;
    await indexer.deleteRecord(uri, cascading);
  }

  /**
   * Resolves the handle for `did` and stores it on the actor document.
   *
   * Skipped when the actor was indexed recently (see `needsHandleReindex`)
   * unless `force` is set. If another actor currently holds the resolved
   * handle, that actor's handle is cleared first. Resolution or storage
   * failures are logged, never thrown; in that case the actor is stored with
   * a `null` handle.
   *
   * @param did DID of the actor.
   * @param timestamp Timestamp written as the actor's `indexedAt`.
   * @param force Re-resolve even if the actor was indexed recently.
   */
  async indexHandle(did: string, timestamp: string, force = false) {
    const actor = await this.db.models.Actor.findOne({ did });
    if (!force && !needsHandleReindex(actor, timestamp)) {
      return;
    }

    try {
      const atpData = await this.idResolver.did.resolveAtprotoData(did, true);

      // Always a string here, so the holder lookup is unconditional.
      const handle = atpData.handle.toLowerCase();
      const actorWithHandle = await this.db.models.Actor.findOne({ handle })
        .lean();

      // handle contention
      if (handle && actorWithHandle && did !== actorWithHandle.did) {
        await this.db.models.Actor.updateOne(
          { did: actorWithHandle.did },
          { handle: null },
        );
      }

      await this.upsertActorHandle(did, handle, timestamp);
    } catch (err) {
      // Log the error but don't throw - this prevents the firehose from crashing
      console.warn(
        "Failed to index handle, skipping",
        { err, did, timestamp },
      );

      // Still update the actor record with null handle to prevent repeated attempts
      try {
        await this.upsertActorHandle(did, null, timestamp);
      } catch (dbErr) {
        console.error(
          "Failed to update actor record after handle resolution failure",
          { err: dbErr, did },
        );
      }
    }
  }

  /** Upserts the actor document for `did` with the given handle and timestamp. */
  private async upsertActorHandle(
    did: string,
    handle: string | null,
    timestamp: string,
  ) {
    await this.db.models.Actor.findOneAndUpdate(
      { did },
      { did, handle, indexedAt: timestamp },
      { upsert: true, returnDocument: "after" },
    );
  }

  /**
   * Fetches the full repository for `did` from its PDS, verifies it, and
   * applies the difference between the repo contents and the currently
   * stored records (creates, updates and deletes).
   *
   * Failures on individual records are logged and skipped. Failures while
   * resolving, fetching or verifying the repo are thrown.
   *
   * @param did DID whose repository is indexed.
   * @param commit Optional value stored as the actor's repo rev and included
   *   in log output.
   */
  async indexRepo(did: string, commit?: string) {
    const now = new Date().toISOString();

    const actorExists = await this.db.models.Actor.findOne({ did }).lean();
    if (!actorExists) {
      console.info(
        `indexRepo: No actor record found for ${did}, indexing handle first`,
      );
      await this.indexHandle(did, now);
    }

    const { pds, signingKey } = await this.idResolver.did.resolveAtprotoData(
      did,
      true,
    );
    const agent = new AtpAgent({ service: pds });

    const { data: car } = await retryXrpc(() =>
      agent.com.atproto.sync.getRepo({ did })
    );
    const { root, blocks } = await readCarWithRoot(car);
    const verifiedRepo = await verifyRepo(blocks, root, did, signingKey);

    const currRecords = await this.getCurrentRecords(did);
    const repoRecords = formatCheckout(did, verifiedRepo);
    const diff = findDiffFromCheckout(currRecords, repoRecords);

    console.info(`Indexing ${diff.length} records for ${did}:`);

    // Applies a single diff operation; errors are logged so one bad record
    // does not abort the rest of the repo.
    const applyOp = async (op: IndexOp) => {
      const { uri, cid } = op;
      try {
        if (op.op === "delete") {
          await this.deleteRecord(uri);
        } else {
          const parsed = getAndParseRecord(blocks, cid);
          await this.indexRecord(
            uri,
            cid,
            parsed.record,
            op.op === "create" ? WriteOpAction.Create : WriteOpAction.Update,
            now,
          );
        }
      } catch (err) {
        if (err instanceof ValidationError) {
          console.warn(
            "skipping indexing of invalid record",
            { did, commit, uri: uri.toString(), cid: cid.toString() },
          );
        } else {
          console.error(
            "skipping indexing due to error processing record",
            { err, did, commit, uri: uri.toString(), cid: cid.toString() },
          );
        }
      }
    };

    // Work through the diff in bounded batches so a large repo does not
    // start every database operation at once.
    for (let start = 0; start < diff.length; start += INDEX_BATCH_SIZE) {
      const batch = diff.slice(start, start + INDEX_BATCH_SIZE);
      await Promise.all(batch.map(applyOp));
    }

    // Update the last seen commit for this actor
    await this.setCommitLastSeen(did, root, commit || "");
  }

  /**
   * Loads the stored records of `did` and returns them keyed by URI string,
   * with the URI and CID parsed.
   */
  async getCurrentRecords(did: string) {
    const rows = await this.db.models.Record.find({ did })
      .select(["uri", "cid"])
      .lean();
    const byUri: Record<string, { uri: AtUri; cid: CID }> = {};
    for (const row of rows) {
      byUri[row.uri] = {
        uri: new AtUri(row.uri),
        cid: CID.parse(row.cid),
      };
    }
    return byUri;
  }

  /**
   * Upserts the sync state of `did`.
   *
   * @param did DID of the actor.
   * @param commit CID stored (stringified) as `commitCid`.
   * @param rev Stored as `repoRev`; an empty string is stored as `null`.
   */
  async setCommitLastSeen(did: string, commit: CID, rev: string) {
    await this.db.models.ActorSync.findOneAndUpdate(
      { did },
      {
        did,
        commitCid: commit.toString(),
        // Callers pass "" when no rev is known; record that as "no rev".
        repoRev: rev || null,
      },
      { upsert: true, returnDocument: "after" },
    );
  }

  /**
   * Returns the record processor whose `collection` equals `collection`,
   * or `undefined` when none is registered.
   */
  findIndexerForCollection(collection: string) {
    const indexers = Object.values(
      this.records as Record<string, RecordProcessor<unknown, unknown>>,
    );
    return indexers.find((indexer) => indexer.collection === collection);
  }

  /**
   * Stores the upstream account status on the actor document.
   *
   * @param did DID of the actor.
   * @param active When true the stored status is cleared (`null`).
   * @param status Used only when `active` is false; must be one of
   *   `deactivated`, `suspended` or `takendown`.
   * @throws Error when the account is inactive and `status` is not recognized.
   */
  async updateActorStatus(did: string, active: boolean, status: string = "") {
    let upstreamStatus: string | null;
    if (active) {
      upstreamStatus = null;
    } else if (["deactivated", "suspended", "takendown"].includes(status)) {
      upstreamStatus = status;
    } else {
      throw new Error(`Unrecognized account status: ${status}`);
    }
    await this.db.models.Actor.updateOne(
      { did },
      { upstreamStatus },
    );
  }

  /**
   * Deletes the actor document and all indexed data for `did`, but only when
   * the hosting check positively reports the account as not hosted. An
   * undetermined result (`null`) leaves everything in place.
   */
  async deleteActor(did: string) {
    const actorIsHosted = await this.getActorIsHosted(did);
    if (actorIsHosted === false) {
      await this.db.models.Actor.deleteOne({ did });
      await this.unindexActor(did);
      // Note: Notification model not present in current schemas
    }
  }

  /**
   * Checks whether the repo of `did` is still available on its PDS.
   *
   * @returns `true` when the PDS returns a latest commit; `false` when the
   *   DID document has no PDS, the PDS reports the repo as not found, or the
   *   DID could not be resolved; `null` when the PDS request failed for any
   *   other reason.
   */
  private async getActorIsHosted(did: string) {
    try {
      const doc = await this.idResolver.did.resolve(did, true);
      const pds = doc && getPds(doc);
      if (!pds) return false;
      const agent = new AtpAgent({ service: pds });
      try {
        await retryXrpc(() => agent.com.atproto.sync.getLatestCommit({ did }));
        return true;
      } catch (err) {
        if (err instanceof ComAtprotoSyncGetLatestCommit.RepoNotFoundError) {
          return false;
        }
        return null;
      }
    } catch (err) {
      // NOTE(review): returning false here lets deleteActor remove the
      // actor's data when DID resolution fails for any reason, including a
      // possibly transient one - confirm this is intended.
      console.warn(
        "Failed to check if actor is hosted, assuming not hosted",
        { err, did },
      );
      return false;
    }
  }

  /**
   * Deletes every indexed document authored by `did`, including the generic
   * record rows that `indexRepo` diffs against, so that a later re-index of
   * the same DID recreates the per-collection documents.
   */
  async unindexActor(did: string) {
    // per-collection indexes
    await this.db.models.Profile.deleteMany({ authorDid: did });
    await this.db.models.Follow.deleteMany({ authorDid: did });
    await this.db.models.Repost.deleteMany({ authorDid: did });
    await this.db.models.Like.deleteMany({ authorDid: did });
    await this.db.models.Generator.deleteMany({ authorDid: did });
    await this.db.models.Story.deleteMany({ authorDid: did });
    await this.db.models.Audio.deleteMany({ authorDid: did });
    await this.db.models.Block.deleteMany({ authorDid: did });
    await this.db.models.Post.deleteMany({ authorDid: did });
    await this.db.models.Reply.deleteMany({ authorDid: did });
    await this.db.models.Labeler.deleteMany({ authorDid: did });
    await this.db.models.CrosspostReply.deleteMany({ authorDid: did });
    // generic record rows read by getCurrentRecords
    await this.db.models.Record.deleteMany({ did });
  }
}

/** A record reference: its AT-URI together with its CID. */
type UriAndCid = {
  uri: AtUri;
  cid: CID;
};

/** A single operation needed to bring the index in line with a repo checkout. */
type IndexOp =
  | ({
    op: "create" | "update";
  } & UriAndCid)
  | ({ op: "delete" } & UriAndCid);

/**
 * Computes the operations that turn `curr` into `checkout`.
 *
 * Emits, in checkout key order, a create for every URI missing from `curr`
 * and an update for every URI whose CID differs; then, in `curr` key order,
 * a delete for every URI missing from `checkout`.
 */
const findDiffFromCheckout = (
  curr: Record<string, UriAndCid>,
  checkout: Record<string, UriAndCid>,
): IndexOp[] => {
  const ops: IndexOp[] = [];
  for (const [uri, record] of Object.entries(checkout)) {
    const existing = curr[uri];
    if (!existing) {
      ops.push({ op: "create", ...record });
    } else if (!existing.cid.equals(record.cid)) {
      ops.push({ op: "update", ...record });
    }
    // identical CID: nothing to do
  }
  for (const [uri, record] of Object.entries(curr)) {
    if (!checkout[uri]) {
      ops.push({ op: "delete", ...record });
    }
  }
  return ops;
};

/**
 * Builds a URI-keyed map of the records listed in `verifiedRepo.creates`,
 * constructing each URI from `did`, the collection and the record key.
 */
const formatCheckout = (
  did: string,
  verifiedRepo: VerifiedRepo,
): Record<string, UriAndCid> => {
  const records: Record<string, UriAndCid> = {};
  for (const create of verifiedRepo.creates) {
    const uri = AtUri.make(did, create.collection, create.rkey);
    records[uri.toString()] = {
      uri,
      cid: create.cid,
    };
  }
  return records;
};

/**
 * Decides whether an actor's handle should be resolved again.
 *
 * Returns true when there is no actor document, when the time between
 * `actor.indexedAt` and `timestamp` exceeds `DAY`, or when the stored handle
 * is `null` and that time exceeds `HOUR`.
 * NOTE(review): assumes `DAY` and `HOUR` are expressed in milliseconds, since
 * they are compared against a `getTime()` difference - confirm.
 */
const needsHandleReindex = (
  actor: ActorDocument | null,
  timestamp: string,
) => {
  if (!actor) return true;
  const timeDiff = new Date(timestamp).getTime() -
    new Date(actor.indexedAt).getTime();
  // revalidate daily
  if (timeDiff > DAY) return true;
  // revalidate more aggressively for invalidated handles
  if (actor.handle === null && timeDiff > HOUR) return true;
  return false;
};