[READ ONLY MIRROR] Spark Social AppView Server github.com/sprksocial/server
atproto deno hono lexicon
5
fork

Configure Feed

Select the types of activity you want to include in your feed.

at eb947da8a3a8a7d485ff132b3ff0db4a8baaac19 380 lines 12 kB view raw
import { CID } from "multiformats/cid";
import { AtpAgent, ComAtprotoSyncGetLatestCommit } from "@atproto/api";
import { DAY, HOUR } from "@atp/common";
import { getPds, IdResolver } from "@atp/identity";
import { RepoRecord, ValidationError } from "@atp/lexicon";
import {
  getAndParseRecord,
  readCarWithRoot,
  VerifiedRepo,
  verifyRepo,
  WriteOpAction,
} from "@atp/repo";
import { AtUri } from "@atp/syntax";
import { retryXrpc } from "../../utils/retry.ts";
import { BackgroundQueue } from "../background.ts";
import { Database } from "../db/index.ts";
import { ActorDocument } from "../db/models.ts";
import * as Block from "./plugins/block.ts";
import * as Generator from "./plugins/generator.ts";
import * as Follow from "./plugins/follow.ts";
import * as Like from "./plugins/like.ts";
import * as Post from "./plugins/post.ts";
import * as Reply from "./plugins/reply.ts";
import * as Profile from "./plugins/profile.ts";
import * as Repost from "./plugins/repost.ts";
import * as Story from "./plugins/story.ts";
import * as Audio from "./plugins/audio.ts";
import { RecordProcessor } from "./processor.ts";
import { getLogger, Logger } from "@logtape/logtape";
import { ServerConfig } from "../../config.ts";

/**
 * Indexes repository records and actor metadata into the database.
 *
 * Each supported collection is handled by a plugin (a `RecordProcessor`) held
 * in `records`; incoming writes are routed to the plugin whose `collection`
 * matches the record URI.
 */
export class IndexingService {
  records: {
    post: Post.PluginType;
    reply: Reply.PluginType;
    like: Like.PluginType;
    repost: Repost.PluginType;
    follow: Follow.PluginType;
    profile: Profile.PluginType;
    block: Block.PluginType;
    generator: Generator.PluginType;
    story: Story.PluginType;
    audio: Audio.PluginType;
  };
  logger: Logger;

  constructor(
    public db: Database,
    public cfg: ServerConfig,
    public idResolver: IdResolver,
    public background: BackgroundQueue,
  ) {
    this.logger = getLogger(["appview", "indexer"]);
    this.records = {
      post: Post.makePlugin(this.db, this.background),
      reply: Reply.makePlugin(this.db, this.background),
      like: Like.makePlugin(this.db, this.background),
      repost: Repost.makePlugin(this.db, this.background),
      follow: Follow.makePlugin(this.db, this.background),
      profile: Profile.makePlugin(this.db, this.background),
      block: Block.makePlugin(this.db, this.background),
      generator: Generator.makePlugin(this.db, this.background),
      story: Story.makePlugin(this.db, this.background),
      audio: Audio.makePlugin(this.db, this.background),
    };
  }

  /**
   * Creates a sibling service bound to `txn` instead of `this.db`, sharing the
   * same config, identity resolver and background queue.
   */
  transact(txn: Database) {
    return new IndexingService(
      txn,
      this.cfg,
      this.idResolver,
      this.background,
    );
  }

  /**
   * Routes a created or updated record to the plugin for its collection.
   * Records in collections without a plugin are ignored.
   *
   * @param opts Forwarded to the plugin on create only; updates do not
   *   receive it.
   */
  async indexRecord(
    uri: AtUri,
    cid: CID,
    obj: RepoRecord,
    action: WriteOpAction.Create | WriteOpAction.Update,
    timestamp: string,
    opts?: { disableNotifs?: boolean; disableLabels?: boolean },
  ) {
    const indexer = this.findIndexerForCollection(uri.collection);
    if (!indexer) return;
    if (action === WriteOpAction.Create) {
      await indexer.insertRecord(uri, cid, obj, timestamp, opts);
    } else {
      await indexer.updateRecord(uri, cid, obj, timestamp);
    }
  }

  /**
   * Routes a record deletion to the plugin for its collection.
   * Records in collections without a plugin are ignored.
   */
  async deleteRecord(uri: AtUri, cascading = false) {
    const indexer = this.findIndexerForCollection(uri.collection);
    if (!indexer) return;
    await indexer.deleteRecord(uri, cascading);
  }

  /**
   * Resolves the handle for `did` and stores it on the actor document.
   *
   * Skipped when the actor was indexed recently (see `needsHandleReindex`)
   * unless `force` is set. Never throws: on any failure the actor is stored
   * with a `null` handle so the caller (e.g. a firehose consumer) keeps
   * running and the lookup is not retried on every event.
   */
  async indexHandle(did: string, timestamp: string, force = false) {
    const actor = await this.db.models.Actor.findOne({ did });
    if (!force && !needsHandleReindex(actor, timestamp)) {
      return;
    }

    try {
      const atpData = await this.idResolver.did.resolveAtprotoData(did, true);
      const handle = atpData.handle.toLowerCase();

      const actorWithHandle = await this.db.models.Actor
        .findOne({ handle })
        .lean();

      // Handle contention: another actor currently holds this handle, so
      // release it from them before assigning it here.
      if (handle && actorWithHandle && did !== actorWithHandle.did) {
        await this.db.models.Actor.updateOne(
          { did: actorWithHandle.did },
          { handle: null },
        );
      }

      await this.upsertActorHandle(did, handle, timestamp);
    } catch (err) {
      // Log the error but don't throw - this prevents the firehose from crashing
      this.logger.warn(
        "Failed to index handle, skipping",
        { err, did, timestamp },
      );

      // Still update the actor record with null handle to prevent repeated attempts
      try {
        await this.upsertActorHandle(did, null, timestamp);
      } catch (dbErr) {
        this.logger.error(
          "Failed to update actor record after handle resolution failure",
          { err: dbErr, did },
        );
      }
    }
  }

  /** Upserts the actor document for `did` with the given handle and index time. */
  private async upsertActorHandle(
    did: string,
    handle: string | null,
    timestamp: string,
  ) {
    const uri = `at://${did}/so.sprk.actor.profile`;
    await this.db.models.Actor.findOneAndUpdate(
      { did },
      { did, uri, handle, indexedAt: timestamp },
      { upsert: true, new: true },
    );
  }

  /**
   * Fetches the full repository for `did` from its PDS, verifies it, and
   * reconciles the index with it: records missing from the index are created,
   * records whose CID changed are updated, and indexed records no longer in
   * the repo are deleted.
   *
   * Failures on individual records are logged and skipped; failures resolving
   * the DID, fetching, or verifying the repo propagate to the caller.
   *
   * @param commit Stored as the actor's `repoRev` and included in log context.
   */
  async indexRepo(did: string, commit?: string) {
    const now = new Date().toISOString();

    const actorExists = await this.db.models.Actor.findOne({ did }).lean();
    if (!actorExists) {
      this.logger.info(
        `indexRepo: No actor record found for ${did}, indexing handle first`,
      );
      await this.indexHandle(did, now);
    }

    const { pds, signingKey } = await this.idResolver.did.resolveAtprotoData(
      did,
      true,
    );
    const agent = new AtpAgent({ service: pds });

    const { data: car } = await retryXrpc(() =>
      agent.com.atproto.sync.getRepo({ did })
    );
    const { root, blocks } = await readCarWithRoot(car);
    const verifiedRepo = await verifyRepo(blocks, root, did, signingKey);

    const currRecords = await this.getCurrentRecords(did);
    const repoRecords = formatCheckout(did, verifiedRepo);
    const diff = findDiffFromCheckout(currRecords, repoRecords);

    this.logger.info(`Indexing ${diff.length} records for ${did}:`);

    await Promise.all(
      diff.map(async (op) => {
        const { uri, cid } = op;
        try {
          if (op.op === "delete") {
            await this.deleteRecord(uri);
          } else {
            // Awaited so the parsed record is used even if the helper returns
            // a promise; awaiting a plain value is a no-op.
            const parsed = await getAndParseRecord(blocks, cid);
            await this.indexRecord(
              uri,
              cid,
              parsed.record,
              op.op === "create" ? WriteOpAction.Create : WriteOpAction.Update,
              now,
            );
          }
        } catch (err) {
          if (err instanceof ValidationError) {
            this.logger.warn(
              "skipping indexing of invalid record",
              { did, commit, uri: uri.toString(), cid: cid.toString() },
            );
          } else {
            this.logger.error(
              "skipping indexing due to error processing record",
              { err, did, commit, uri: uri.toString(), cid: cid.toString() },
            );
          }
        }
      }),
    );

    // Update the last seen commit for this actor.
    // NOTE(review): when `commit` is omitted an empty string is stored as
    // repoRev rather than null — confirm that is what readers of ActorSync
    // expect.
    await this.setCommitLastSeen(did, root, commit || "");
  }

  /**
   * Loads the `uri` and `cid` of every `Record` document stored for `did`,
   * keyed by URI string.
   */
  async getCurrentRecords(did: string) {
    const res = await this.db.models.Record.find({ did }).select(["uri", "cid"])
      .lean();
    return res.reduce(
      (acc, cur) => {
        acc[cur.uri] = {
          uri: new AtUri(cur.uri),
          cid: CID.parse(cur.cid),
        };
        return acc;
      },
      {} as Record<string, { uri: AtUri; cid: CID }>,
    );
  }

  /** Upserts the `ActorSync` document for `did` with the given commit CID and rev. */
  async setCommitLastSeen(did: string, commit: CID, rev: string) {
    await this.db.models.ActorSync.findOneAndUpdate(
      { did },
      {
        did,
        commitCid: commit.toString(),
        repoRev: rev,
      },
      { upsert: true, new: true },
    );
  }

  /** Returns the plugin whose `collection` equals `collection`, if any. */
  findIndexerForCollection(collection: string) {
    const indexers = Object.values(
      this.records as Record<string, RecordProcessor<unknown, unknown>>,
    );
    return indexers.find((indexer) => indexer.collection === collection);
  }

  /**
   * Records the upstream account status on the actor document.
   *
   * An active account clears the status; an inactive one must carry one of
   * `deactivated`, `suspended` or `takendown`.
   *
   * @throws Error if the account is inactive and `status` is not recognized.
   */
  async updateActorStatus(did: string, active: boolean, status: string = "") {
    let upstreamStatus: string | null;
    if (active) {
      upstreamStatus = null;
    } else if (["deactivated", "suspended", "takendown"].includes(status)) {
      upstreamStatus = status;
    } else {
      throw new Error(`Unrecognized account status: ${status}`);
    }
    await this.db.models.Actor.updateOne(
      { did },
      { upstreamStatus },
    );
  }

  /**
   * Removes the actor and everything indexed for it, but only when
   * `getActorIsHosted` reports `false`. A `true` or `null` (undetermined)
   * result leaves the data untouched.
   */
  async deleteActor(did: string) {
    const actorIsHosted = await this.getActorIsHosted(did);
    if (actorIsHosted === false) {
      await this.db.models.Actor.deleteOne({ did });
      await this.unindexActor(did);
      // Note: Notification model not present in current schemas
    }
  }

  /**
   * Checks whether the actor's repo is still served by its PDS.
   *
   * @returns `true` if the PDS returned a latest commit; `false` if the DID
   *   document has no PDS, the PDS reports the repo as not found, or the DID
   *   could not be resolved; `null` if the PDS request failed for any other
   *   reason.
   */
  private async getActorIsHosted(did: string) {
    try {
      const doc = await this.idResolver.did.resolve(did, true);
      const pds = doc && getPds(doc);
      if (!pds) return false;
      const agent = new AtpAgent({ service: pds });
      try {
        await retryXrpc(() => agent.com.atproto.sync.getLatestCommit({ did }));
        return true;
      } catch (err) {
        if (err instanceof ComAtprotoSyncGetLatestCommit.RepoNotFoundError) {
          return false;
        }
        return null;
      }
    } catch (err) {
      // NOTE(review): returning false here lets deleteActor remove all of the
      // actor's data when DID resolution merely failed (e.g. a transient
      // resolver error). Returning null would make that case non-destructive;
      // confirm which is intended.
      this.logger.warn(
        "Failed to check if actor is hosted, assuming not hosted",
        { err, did },
      );
      return false;
    }
  }

  /**
   * Deletes every indexed document authored by `did`, along with the `Record`
   * documents that `getCurrentRecords` uses to diff the repo.
   *
   * Deletes run one at a time rather than in parallel so the method stays
   * usable on a transactional `Database` (see `transact`).
   */
  async unindexActor(did: string) {
    await this.db.models.Profile.deleteMany({ authorDid: did });
    await this.db.models.Follow.deleteMany({ authorDid: did });
    await this.db.models.Repost.deleteMany({ authorDid: did });
    await this.db.models.Like.deleteMany({ authorDid: did });
    await this.db.models.Generator.deleteMany({ authorDid: did });
    await this.db.models.Story.deleteMany({ authorDid: did });
    await this.db.models.Audio.deleteMany({ authorDid: did });
    await this.db.models.Block.deleteMany({ authorDid: did });
    await this.db.models.Post.deleteMany({ authorDid: did });
    await this.db.models.Reply.deleteMany({ authorDid: did });
    // Drop the Record documents last. If they were left behind, a later
    // indexRepo would treat the removed records as already indexed (same
    // uri and cid) and never restore them.
    await this.db.models.Record.deleteMany({ did });
  }
}

/** A record's location paired with the CID of its content. */
type UriAndCid = {
  uri: AtUri;
  cid: CID;
};

/** A single reconciliation step produced by `findDiffFromCheckout`. */
type IndexOp =
  | ({
    op: "create" | "update";
  } & UriAndCid)
  | ({ op: "delete" } & UriAndCid);

/**
 * Computes the operations needed to bring the indexed state `curr` in line
 * with the repo state `checkout`. Both maps are keyed by record URI string.
 *
 * Creates and updates (carrying the checkout's CID) come first, followed by
 * deletes (carrying the indexed CID). Records with an equal CID on both sides
 * produce no operation.
 */
const findDiffFromCheckout = (
  curr: Record<string, UriAndCid>,
  checkout: Record<string, UriAndCid>,
): IndexOp[] => {
  const ops: IndexOp[] = [];
  for (const uri of Object.keys(checkout)) {
    const record = checkout[uri];
    if (!curr[uri]) {
      ops.push({ op: "create", ...record });
    } else {
      if (curr[uri].cid.equals(record.cid)) {
        // no-op
        continue;
      }
      ops.push({ op: "update", ...record });
    }
  }
  for (const uri of Object.keys(curr)) {
    const record = curr[uri];
    if (!checkout[uri]) {
      ops.push({ op: "delete", ...record });
    }
  }
  return ops;
};

/**
 * Converts the `creates` of a verified repo into a map from record URI string
 * to its URI and CID.
 */
const formatCheckout = (
  did: string,
  verifiedRepo: VerifiedRepo,
): Record<string, UriAndCid> => {
  const records: Record<string, UriAndCid> = {};
  for (const create of verifiedRepo.creates) {
    const uri = AtUri.make(did, create.collection, create.rkey);
    records[uri.toString()] = {
      uri,
      cid: create.cid,
    };
  }
  return records;
};

/**
 * Decides whether an actor's handle should be resolved again at `timestamp`.
 *
 * Returns `true` when there is no actor document, when more than `DAY` has
 * passed since `actor.indexedAt`, or when the stored handle is `null` and
 * more than `HOUR` has passed.
 */
const needsHandleReindex = (
  actor: ActorDocument | null,
  timestamp: string,
) => {
  if (!actor) return true;
  const timeDiff = new Date(timestamp).getTime() -
    new Date(actor.indexedAt).getTime();
  // revalidate daily
  if (timeDiff > DAY) return true;
  // revalidate more aggressively for invalidated handles
  if (actor.handle === null && timeDiff > HOUR) return true;
  return false;
};