WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 663 lines 22 kB view raw
import type { Database } from "@atbb/db";
import { forums, backfillProgress, backfillErrors, users } from "@atbb/db";
import { eq, asc, gt } from "drizzle-orm";
import { AtpAgent } from "@atproto/api";
import { CursorManager } from "./cursor-manager.js";
import type { AppConfig } from "./config.js";
import type { Indexer } from "./indexer.js";
import { isProgrammingError } from "./errors.js";
import type { Logger } from "@atbb/logger";

/**
 * Collections read from the forum's own repo (Phase 1 of a backfill).
 * The array order is the sync order: forum-owned records are synced first
 * because of FK dependencies between the indexed rows.
 */
export const FORUM_OWNED_COLLECTIONS = [
  "space.atbb.forum.forum",
  "space.atbb.forum.category",
  "space.atbb.forum.board",
  "space.atbb.forum.role",
  "space.atbb.modAction",
  "space.atbb.forum.theme",
  "space.atbb.forum.themePolicy",
] as const;

/**
 * Collections read from each known user's repo (Phase 2 of a CatchUp
 * backfill), in sync order.
 */
export const USER_OWNED_COLLECTIONS = [
  "space.atbb.membership",
  "space.atbb.post",
] as const;

/** Maps AT Proto collection NSIDs to Indexer handler method names. */
const COLLECTION_HANDLER_MAP: Record<string, string> = {
  "space.atbb.post": "handlePostCreate",
  "space.atbb.forum.forum": "handleForumCreate",
  "space.atbb.forum.category": "handleCategoryCreate",
  "space.atbb.forum.board": "handleBoardCreate",
  "space.atbb.forum.role": "handleRoleCreate",
  "space.atbb.membership": "handleMembershipCreate",
  "space.atbb.modAction": "handleModActionCreate",
  "space.atbb.forum.theme": "handleThemeCreate",
  "space.atbb.forum.themePolicy": "handleThemePolicyCreate",
};

/** Outcome of the "is a backfill needed?" decision; also stored as the backfill type. */
export enum BackfillStatus {
  NotNeeded = "not_needed",
  CatchUp = "catch_up",
  FullSync = "full_sync",
}

/** Summary returned by performBackfill() and resumeBackfill(). */
export interface BackfillResult {
  backfillId: bigint;
  type: BackfillStatus;
  didsProcessed: number;
  recordsIndexed: number;
  errors: number;
  durationMs: number;
}

/** Counters produced by syncing one (DID, collection) pair. */
export interface SyncStats {
  recordsFound: number;
  recordsIndexed: number;
  errors: number;
}

/** Shape of the synthetic event handed to an Indexer handler for each listed record. */
interface BackfillEvent {
  did: string;
  commit: { rkey: string; cid: string; record: unknown };
}

/** Mutable running totals shared between a backfill entry point and its batch helper. */
interface RunTotals {
  recordsIndexed: number;
  errors: number;
  didsProcessed: number;
}

/** Log event names that differ between a fresh run and a resumed run. */
interface BatchEventNames {
  userFailed: string;
  checkpointFailed: string;
}

const FRESH_RUN_EVENTS: BatchEventNames = {
  userFailed: "backfill.batch_user_failed",
  checkpointFailed: "backfill.checkpoint_failed",
};

const RESUME_EVENTS: BatchEventNames = {
  userFailed: "backfill.resume.batch_user_failed",
  checkpointFailed: "backfill.resume.checkpoint_failed",
};

/** Render any thrown value as a log-friendly message. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

/**
 * Re-indexes records by listing them from the PDS and feeding them through
 * the Indexer, tracking progress in the backfill_progress / backfill_errors
 * tables. Only one backfill (fresh or resumed) runs at a time per instance.
 */
export class BackfillManager {
  private readonly cursorManager: CursorManager;
  private isRunning = false;
  private indexer: Indexer | null = null;

  constructor(
    private readonly db: Database,
    private readonly config: AppConfig,
    private readonly logger: Logger,
  ) {
    this.cursorManager = new CursorManager(db, logger);
  }

  /**
   * Inject the Indexer instance. Called during AppContext wiring.
   */
  setIndexer(indexer: Indexer): void {
    this.indexer = indexer;
  }

  /**
   * Sync all records from a single (DID, collection) pair via listRecords.
   * Feeds each record through the matching Indexer handler.
   *
   * Per-record and per-page failures are counted in the returned stats and
   * logged; they do not reject the promise.
   *
   * @returns Counters for the pair. `errors` is 1 (and nothing is fetched)
   *   when the collection has no handler mapping or no Indexer was injected.
   * @throws Any error for which `isProgrammingError` returns true, and a
   *   TypeError when the mapped handler is not a method on the Indexer.
   */
  async syncRepoRecords(
    did: string,
    collection: string,
    agent: AtpAgent
  ): Promise<SyncStats> {
    const stats: SyncStats = { recordsFound: 0, recordsIndexed: 0, errors: 0 };
    const handlerName = COLLECTION_HANDLER_MAP[collection];
    const indexer = this.indexer;

    if (!handlerName || !indexer) {
      this.logger.error("backfill.sync_skipped", {
        event: "backfill.sync_skipped",
        did,
        collection,
        reason: !handlerName ? "unknown_collection" : "indexer_not_set",
      });
      stats.errors = 1;
      return stats;
    }

    // Look the handler up dynamically, but fail with a descriptive TypeError
    // (instead of "cannot read properties of undefined") if the map and the
    // Indexer have drifted apart.
    const candidate: unknown = Reflect.get(indexer, handlerName);
    if (typeof candidate !== "function") {
      throw new TypeError(
        `Indexer has no handler method "${handlerName}" for collection "${collection}"`
      );
    }
    const handler: (event: BackfillEvent) => unknown = candidate.bind(indexer);

    // A non-positive or non-finite rate limit would yield an invalid timer
    // delay (Infinity / negative); in that case skip throttling entirely.
    const rateLimit = this.config.backfillRateLimit;
    const delayMs = Number.isFinite(rateLimit) && rateLimit > 0 ? 1000 / rateLimit : 0;
    let cursor: string | undefined;

    try {
      do {
        const response = await agent.com.atproto.repo.listRecords({
          repo: did,
          collection,
          limit: 100,
          cursor,
        });

        const records = response.data.records;
        stats.recordsFound += records.length;

        for (const record of records) {
          try {
            // The rkey is the last path segment of the record's at:// URI.
            const rkey = record.uri.slice(record.uri.lastIndexOf("/") + 1);
            const event: BackfillEvent = {
              did,
              commit: { rkey, cid: record.cid, record: record.value },
            };
            await handler(event);
            stats.recordsIndexed++;
          } catch (error) {
            if (isProgrammingError(error)) throw error;
            stats.errors++;
            this.logger.error("backfill.record_error", {
              event: "backfill.record_error",
              did,
              collection,
              uri: record.uri,
              error: describeError(error),
            });
          }
        }

        cursor = response.data.cursor;

        // Rate limiting: delay between page fetches
        if (cursor && delayMs > 0) {
          await new Promise((resolve) => setTimeout(resolve, delayMs));
        }
      } while (cursor);
    } catch (error) {
      if (isProgrammingError(error)) throw error;
      // A failed page fetch ends the sync for this pair; records already
      // indexed stay counted.
      stats.errors++;
      this.logger.error("backfill.pds_error", {
        event: "backfill.pds_error",
        did,
        collection,
        error: describeError(error),
      });
    }

    return stats;
  }

  /**
   * Determine if backfill is needed based on cursor state and DB contents.
   *
   * Decision order: no cursor → FullSync; forum row missing or the lookup
   * failed → FullSync; cursor older than `backfillCursorMaxAgeHours` →
   * CatchUp; otherwise NotNeeded. Every decision is logged.
   */
  async checkIfNeeded(cursor: bigint | null): Promise<BackfillStatus> {
    // No cursor at all → first startup or wiped cursor
    if (cursor === null) {
      this.logger.info("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.FullSync,
        reason: "no_cursor",
      });
      return BackfillStatus.FullSync;
    }

    // Check if DB has forum data (consistency check)
    let forum: { rkey: string } | undefined;
    try {
      const results = await this.db
        .select()
        .from(forums)
        .where(eq(forums.rkey, "self"))
        .limit(1);
      forum = results[0];
    } catch (error) {
      this.logger.error("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.FullSync,
        reason: "db_query_failed",
        error: describeError(error),
      });
      return BackfillStatus.FullSync;
    }

    if (!forum) {
      this.logger.info("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.FullSync,
        reason: "db_inconsistency",
        cursorTimestamp: cursor.toString(),
      });
      return BackfillStatus.FullSync;
    }

    // Check cursor age. If the age cannot be determined, fall back to 0 so the
    // cursor is treated as fresh (same outcome as comparing a null age).
    const ageHours = this.cursorManager.getCursorAgeHours(cursor) ?? 0;
    if (ageHours > this.config.backfillCursorMaxAgeHours) {
      this.logger.info("backfill.decision", {
        event: "backfill.decision",
        status: BackfillStatus.CatchUp,
        reason: "cursor_too_old",
        cursorAgeHours: Math.round(ageHours),
        thresholdHours: this.config.backfillCursorMaxAgeHours,
        cursorTimestamp: cursor.toString(),
      });
      return BackfillStatus.CatchUp;
    }

    this.logger.info("backfill.decision", {
      event: "backfill.decision",
      status: BackfillStatus.NotNeeded,
      reason: "cursor_fresh",
      cursorAgeHours: Math.round(ageHours),
    });
    return BackfillStatus.NotNeeded;
  }

  /**
   * Check if a backfill is currently running.
   */
  getIsRunning(): boolean {
    return this.isRunning;
  }

  /**
   * Create an AtpAgent pointed at the forum's PDS.
   * Extracted as a private method for test mocking.
   */
  private createAgentForPds(): AtpAgent {
    return new AtpAgent({ service: this.config.pdsUrl });
  }

  /**
   * Create a progress row and return its ID.
   * Use this before performBackfill when you need the ID immediately (e.g., for a 202 response).
   * Pass the returned ID as existingRowId to performBackfill to skip duplicate row creation.
   *
   * @throws Error if the insert returns no row.
   */
  async prepareBackfillRow(type: BackfillStatus): Promise<bigint> {
    const [row] = await this.db
      .insert(backfillProgress)
      .values({
        status: "in_progress",
        backfillType: type,
        startedAt: new Date(),
      })
      .returning({ id: backfillProgress.id });

    if (!row) {
      throw new Error("Failed to create backfill progress row");
    }
    return row.id;
  }

  /**
   * Query the backfill_progress table for any row with status = 'in_progress'.
   * Returns the first such row, or null if none exists. A failed query is
   * logged and also reported as null.
   */
  async checkForInterruptedBackfill() {
    try {
      const [row] = await this.db
        .select()
        .from(backfillProgress)
        .where(eq(backfillProgress.status, "in_progress"))
        .limit(1);

      return row ?? null;
    } catch (error) {
      if (isProgrammingError(error)) throw error;
      this.logger.error("backfill.check_interrupted.failed", {
        event: "backfill.check_interrupted.failed",
        error: describeError(error),
        note: "Could not check for interrupted backfills — assuming none",
      });
      return null;
    }
  }

  /**
   * Resume a CatchUp backfill from its last checkpoint (lastProcessedDid).
   * Only processes users with DID > lastProcessedDid.
   * Does NOT re-run Phase 1 (forum-owned collections).
   *
   * @param interrupted - The in-progress row, e.g. from checkForInterruptedBackfill().
   * @returns Totals that continue from the counters stored on the row.
   * @throws Error if a backfill is already running, or if the row is not a
   *   CatchUp backfill (the row is then marked failed). Any other failure
   *   is rethrown after a best-effort attempt to mark the row failed.
   */
  async resumeBackfill(interrupted: typeof backfillProgress.$inferSelect): Promise<BackfillResult> {
    if (this.isRunning) {
      throw new Error("Backfill is already in progress");
    }

    this.isRunning = true;
    const startTime = Date.now();
    const backfillId = interrupted.id;
    const totals: RunTotals = {
      recordsIndexed: interrupted.recordsIndexed,
      errors: 0,
      didsProcessed: interrupted.didsProcessed,
    };

    this.logger.info("backfill.resuming", {
      event: "backfill.resuming",
      backfillId: backfillId.toString(),
      lastProcessedDid: interrupted.lastProcessedDid,
      didsProcessed: interrupted.didsProcessed,
      didsTotal: interrupted.didsTotal,
    });

    try {
      if (interrupted.backfillType !== BackfillStatus.CatchUp) {
        // FullSync cannot be resumed from a checkpoint — it must re-run from scratch
        throw new Error(
          "Interrupted FullSync cannot be resumed. Re-trigger via /api/admin/backfill?force=full_sync."
        );
      }

      const agent = this.createAgentForPds();

      // NOTE(review): when no checkpoint was ever saved (lastProcessedDid is
      // empty) no users are synced and the row is still marked completed
      // below. Confirm this is intended rather than "process all users".
      if (interrupted.lastProcessedDid) {
        // Resume: fetch users after lastProcessedDid
        // TODO(ATB-13): Paginate for large forums
        const remainingUsers = await this.db
          .select({ did: users.did })
          .from(users)
          .where(gt(users.did, interrupted.lastProcessedDid))
          .orderBy(asc(users.did));

        await this.syncUsersInBatches(remainingUsers, agent, backfillId, totals, RESUME_EVENTS);
      }

      await this.markCompleted(backfillId, totals);

      const result: BackfillResult = {
        backfillId,
        type: BackfillStatus.CatchUp,
        didsProcessed: totals.didsProcessed,
        recordsIndexed: totals.recordsIndexed,
        errors: totals.errors,
        durationMs: Date.now() - startTime,
      };

      const resumeEvent =
        totals.errors > 0 ? "backfill.resume.completed_with_errors" : "backfill.resume.completed";
      this.logger.info(resumeEvent, {
        event: resumeEvent,
        ...result,
        backfillId: result.backfillId.toString(),
      });

      return result;
    } catch (error) {
      // Best-effort: mark as failed
      await this.markFailed(backfillId, error, "backfill.resume.failed_status_update_error");

      this.logger.error("backfill.resume.failed", {
        event: "backfill.resume.failed",
        backfillId: backfillId.toString(),
        error: describeError(error),
      });
      throw error;
    } finally {
      this.isRunning = false;
    }
  }

  /**
   * Execute a backfill operation.
   * Phase 1: Syncs forum-owned collections from the Forum DID.
   * Phase 2 (CatchUp only): Syncs user-owned collections from all known users.
   *
   * @param type - Kind of backfill to run; stored on the progress row.
   * @param existingRowId - If provided (from prepareBackfillRow), skips creating a new progress row.
   * @returns Totals for this run.
   * @throws Error if a backfill is already running. Any other failure is
   *   rethrown after a best-effort attempt to mark the progress row failed.
   */
  async performBackfill(type: BackfillStatus, existingRowId?: bigint): Promise<BackfillResult> {
    if (this.isRunning) {
      throw new Error("Backfill is already in progress");
    }

    this.isRunning = true;
    const startTime = Date.now();
    // Kept outside the try so the catch block can tell whether a row exists.
    let backfillId: bigint | undefined = existingRowId;
    const totals: RunTotals = { recordsIndexed: 0, errors: 0, didsProcessed: 0 };

    try {
      // Create progress row only if not pre-created by prepareBackfillRow
      const activeId: bigint = backfillId ?? (await this.prepareBackfillRow(type));
      backfillId = activeId;

      const agent = this.createAgentForPds();

      // Phase 1: Sync forum-owned collections from Forum DID
      for (const collection of FORUM_OWNED_COLLECTIONS) {
        const stats = await this.syncRepoRecords(this.config.forumDid, collection, agent);
        totals.recordsIndexed += stats.recordsIndexed;
        totals.errors += stats.errors;
        if (stats.errors > 0) {
          await this.recordCollectionErrors(activeId, this.config.forumDid, collection, stats.errors);
        }
      }

      // Phase 2: For CatchUp, sync user-owned records from known DIDs
      if (type === BackfillStatus.CatchUp) {
        // TODO(ATB-13): Paginate for large forums — currently loads all DIDs into memory
        const knownUsers = await this.db
          .select({ did: users.did })
          .from(users)
          .orderBy(asc(users.did));

        const didsTotal = knownUsers.length;

        await this.db
          .update(backfillProgress)
          .set({ didsTotal })
          .where(eq(backfillProgress.id, activeId));

        await this.syncUsersInBatches(knownUsers, agent, activeId, totals, FRESH_RUN_EVENTS, () => {
          this.logger.info("backfill.progress", {
            event: "backfill.progress",
            backfillId: activeId.toString(),
            type,
            didsProcessed: totals.didsProcessed,
            didsTotal,
            recordsIndexed: totals.recordsIndexed,
            elapsedMs: Date.now() - startTime,
          });
        });
      }

      await this.markCompleted(activeId, totals);

      const result: BackfillResult = {
        backfillId: activeId,
        type,
        didsProcessed: totals.didsProcessed,
        recordsIndexed: totals.recordsIndexed,
        errors: totals.errors,
        durationMs: Date.now() - startTime,
      };

      const completedEvent =
        totals.errors > 0 ? "backfill.completed_with_errors" : "backfill.completed";
      this.logger.info(completedEvent, {
        event: completedEvent,
        ...result,
        backfillId: result.backfillId.toString(),
      });

      return result;
    } catch (error) {
      // Best-effort: mark progress row as failed (if it was created)
      if (backfillId !== undefined) {
        await this.markFailed(backfillId, error, "backfill.failed_status_update_error");
      }

      this.logger.error("backfill.failed", {
        event: "backfill.failed",
        backfillId: backfillId !== undefined ? backfillId.toString() : "not_created",
        error: describeError(error),
      });
      throw error;
    } finally {
      this.isRunning = false;
    }
  }

  /**
   * Sync user-owned collections for `userRows` in batches of
   * `backfillConcurrency`, folding results into `totals` and saving a
   * checkpoint (didsProcessed / recordsIndexed / lastProcessedDid) after
   * every batch. `onBatchDone`, if given, runs after each checkpoint attempt.
   */
  private async syncUsersInBatches(
    userRows: ReadonlyArray<{ did: string }>,
    agent: AtpAgent,
    backfillId: bigint,
    totals: RunTotals,
    eventNames: BatchEventNames,
    onBatchDone?: () => void,
  ): Promise<void> {
    // Clamp to at least 1: a zero, negative or NaN step would never advance
    // the loop below.
    const configured = this.config.backfillConcurrency;
    const batchSize = configured >= 1 ? Math.floor(configured) : 1;

    for (let offset = 0; offset < userRows.length; offset += batchSize) {
      const batch = userRows.slice(offset, offset + batchSize);

      const settled = await Promise.allSettled(
        batch.map((user) => this.syncUserCollections(user.did, agent, backfillId))
      );

      // Aggregate results after settlement, including DID for debuggability
      settled.forEach((outcome, index) => {
        if (outcome.status === "fulfilled") {
          totals.recordsIndexed += outcome.value.indexed;
          totals.errors += outcome.value.errors;
        } else {
          totals.errors++;
          this.logger.error(eventNames.userFailed, {
            event: eventNames.userFailed,
            backfillId: backfillId.toString(),
            did: batch[index].did,
            error: describeError(outcome.reason),
          });
        }
      });

      totals.didsProcessed += batch.length;

      try {
        await this.db
          .update(backfillProgress)
          .set({
            didsProcessed: totals.didsProcessed,
            recordsIndexed: totals.recordsIndexed,
            lastProcessedDid: batch[batch.length - 1].did,
          })
          .where(eq(backfillProgress.id, backfillId));
      } catch (checkpointError) {
        if (isProgrammingError(checkpointError)) throw checkpointError;
        this.logger.warn(eventNames.checkpointFailed, {
          event: eventNames.checkpointFailed,
          backfillId: backfillId.toString(),
          didsProcessed: totals.didsProcessed,
          error: describeError(checkpointError),
          note: "Checkpoint save failed — continuing backfill. Resume may reprocess this batch.",
        });
      }

      onBatchDone?.();
    }
  }

  /**
   * Sync every user-owned collection for one DID, recording a
   * backfill_errors row for each collection that reported failures.
   */
  private async syncUserCollections(
    did: string,
    agent: AtpAgent,
    backfillId: bigint,
  ): Promise<{ indexed: number; errors: number }> {
    let indexed = 0;
    let errors = 0;

    for (const collection of USER_OWNED_COLLECTIONS) {
      const stats = await this.syncRepoRecords(did, collection, agent);
      indexed += stats.recordsIndexed;
      if (stats.errors > 0) {
        errors += stats.errors;
        await this.recordCollectionErrors(backfillId, did, collection, stats.errors);
      }
    }

    return { indexed, errors };
  }

  /** Insert one backfill_errors row summarising failures for a (DID, collection) pair. */
  private async recordCollectionErrors(
    backfillId: bigint,
    did: string,
    collection: string,
    errorCount: number,
  ): Promise<void> {
    await this.db.insert(backfillErrors).values({
      backfillId,
      did,
      collection,
      errorMessage: `${errorCount} record(s) failed`,
      createdAt: new Date(),
    });
  }

  /** Mark the progress row completed with the final counters. */
  private async markCompleted(backfillId: bigint, totals: RunTotals): Promise<void> {
    await this.db
      .update(backfillProgress)
      .set({
        status: "completed",
        didsProcessed: totals.didsProcessed,
        recordsIndexed: totals.recordsIndexed,
        completedAt: new Date(),
      })
      .where(eq(backfillProgress.id, backfillId));
  }

  /**
   * Best-effort: mark the progress row failed. A failure of the update itself
   * is logged under `updateFailedEvent` and swallowed so the original error
   * can still be rethrown by the caller.
   */
  private async markFailed(
    backfillId: bigint,
    error: unknown,
    updateFailedEvent: string,
  ): Promise<void> {
    try {
      await this.db
        .update(backfillProgress)
        .set({
          status: "failed",
          errorMessage: describeError(error),
          completedAt: new Date(),
        })
        .where(eq(backfillProgress.id, backfillId));
    } catch (updateError) {
      this.logger.error(updateFailedEvent, {
        event: updateFailedEvent,
        backfillId: backfillId.toString(),
        error: describeError(updateError),
      });
    }
  }
}