WIP! A BB-style forum, on the ATmosphere! We're still working... we'll be back soon when we have something to show off!
node typescript hono htmx atproto
4
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 1036 lines 37 kB view raw
/**
 * Unit tests for BackfillManager.
 *
 * The database, the PDS agent and the indexer are all replaced with vitest
 * mocks. The database mocks imitate a fluent query builder, so each helper
 * below builds exactly the chain a test needs (and nothing more): a chain
 * method that is absent from a mock is absent on purpose.
 */
import { describe, it, expect, beforeEach, vi, afterEach } from "vitest";
import { BackfillManager, BackfillStatus } from "../backfill-manager.js";
import type { Database } from "@atbb/db";
import type { AppConfig } from "../config.js";
import { AtpAgent } from "@atproto/api";
import type { Indexer } from "../indexer.js";
import { createMockLogger } from "./mock-logger.js";

// NOTE: vi.mock is hoisted above the imports, so this factory must stay
// self-contained and must not reference the helpers defined further down.
vi.mock("@atproto/api", () => ({
  AtpAgent: vi.fn().mockImplementation(() => ({
    com: {
      atproto: {
        repo: {
          listRecords: vi.fn(),
        },
      },
    },
  })),
}));

// ---------------------------------------------------------------------------
// Shared fixtures and mock builders
// ---------------------------------------------------------------------------

/** A vitest mock function. */
type Stub = ReturnType<typeof vi.fn>;

const PDS_URL = "https://pds.example.com";

/** Forum-owned collections, in the order the CatchUp tests expect Phase 1 to list them. */
const FORUM_OWNED_COLLECTIONS = [
  "space.atbb.forum.forum",
  "space.atbb.forum.category",
  "space.atbb.forum.board",
  "space.atbb.forum.role",
  "space.atbb.modAction",
  "space.atbb.forum.theme",
  "space.atbb.forum.themePolicy",
] as const;

/** Every Indexer handler stubbed by the performBackfill / resumeBackfill suites. */
const ALL_INDEXER_HANDLERS = [
  "handleForumCreate",
  "handleCategoryCreate",
  "handleBoardCreate",
  "handleRoleCreate",
  "handleMembershipCreate",
  "handlePostCreate",
  "handleModActionCreate",
  "handleThemeCreate",
  "handleThemePolicyCreate",
] as const;

/** Minimal mock config; `overrides` replaces individual fields. */
function mockConfig(overrides: Partial<AppConfig> = {}): AppConfig {
  return {
    port: 3000,
    forumDid: "did:plc:testforum",
    pdsUrl: PDS_URL,
    databaseUrl: "postgres://test",
    jetstreamUrl: "wss://jetstream.example.com",
    oauthPublicUrl: "https://example.com",
    sessionSecret: "a".repeat(32),
    sessionTtlDays: 7,
    backfillRateLimit: 10,
    backfillConcurrency: 10,
    backfillCursorMaxAgeHours: 48,
    ...overrides,
  } as AppConfig;
}

/** Mock that synchronously returns `value` (one link of a fluent chain). */
const stubReturning = (value: unknown): Stub => vi.fn().mockReturnValue(value);

/** Mock that resolves to `value` (the awaited end of a chain). */
const stubResolving = (value: unknown): Stub => vi.fn().mockResolvedValue(value);

/** Mock that rejects with `new Error(message)`. */
const stubRejecting = (message: string): Stub =>
  vi.fn().mockRejectedValue(new Error(message));

/** Mock that resolves to `value` after `ms` milliseconds of real time. */
const stubResolvingAfter = (ms: number, value: unknown): Stub =>
  vi.fn().mockImplementation(
    () => new Promise((resolve) => setTimeout(() => resolve(value), ms))
  );

/** `select().from().where().limit()` */
const selectWhereLimit = (limit: Stub): Stub =>
  stubReturning({ from: stubReturning({ where: stubReturning({ limit }) }) });

/** `select().from().where().orderBy()` */
const selectWhereOrderBy = (orderBy: Stub): Stub =>
  stubReturning({ from: stubReturning({ where: stubReturning({ orderBy }) }) });

/** `select().from().orderBy()` */
const selectOrderBy = (orderBy: Stub): Stub =>
  stubReturning({ from: stubReturning({ orderBy }) });

/** A select chain on which every supported terminal resolves to an empty list. */
const selectAlwaysEmpty = (): Stub =>
  stubReturning({
    from: stubReturning({
      where: stubReturning({
        limit: stubResolving([]),
        orderBy: stubResolving([]),
      }),
      orderBy: stubResolving([]),
    }),
  });

/** `insert().values().returning()` with the given terminal mock. */
const insertChain = (returning: Stub): Stub =>
  stubReturning({ values: stubReturning({ returning }) });

/** `update().set().where()` whose terminal resolves to undefined. */
const updateChain = (): Stub =>
  stubReturning({ set: stubReturning({ where: stubResolving(undefined) }) });

/** Replaces `db.select` with a `from().where().limit()` chain ending in `limit`. */
function stubSelectWhereLimit(db: Database, limit: Stub): void {
  // `as any`: the stub only implements the slice of the query builder used here.
  vi.spyOn(db, "select").mockReturnValue({
    from: stubReturning({ where: stubReturning({ limit }) }),
  } as any);
}

/** Makes `target.createAgentForPds()` return an agent exposing only `listRecords`. */
function stubAgent(target: BackfillManager, listRecords: Stub): void {
  // `as any`: the test reaches createAgentForPds through a cast, as the original did.
  vi.spyOn(target as any, "createAgentForPds").mockReturnValue({
    com: { atproto: { repo: { listRecords } } },
  });
}

/** Indexer stub whose listed handlers all resolve to `true`. */
function makeIndexer(handlers: readonly string[] = ALL_INDEXER_HANDLERS): Indexer {
  return Object.fromEntries(
    handlers.map((name) => [name, vi.fn().mockResolvedValue(true)])
  ) as unknown as Indexer;
}

/** Instance of the module-mocked AtpAgent. */
const newAgent = (): AtpAgent => new AtpAgent({ service: PDS_URL });

/** The mocked `listRecords` function of an agent created by {@link newAgent}. */
const listRecordsOf = (agent: AtpAgent): Stub =>
  agent.com.atproto.repo.listRecords as unknown as Stub;

/** One listRecords response page. */
const page = (records: unknown[] = [], cursor?: string) => ({
  data: { records, cursor },
});

/** A `space.atbb.post` record as it appears in a listRecords page. */
const postRecord = (
  did: string,
  rkey: string,
  cid: string,
  text: string,
  createdAt = "2026-01-01T00:00:00Z"
) => ({
  uri: `at://${did}/space.atbb.post/${rkey}`,
  cid,
  value: { $type: "space.atbb.post", text, createdAt },
});

/** Cursor value (microseconds since epoch) for a moment `hours` hours in the past. */
const cursorHoursAgo = (hours: number): bigint =>
  BigInt((Date.now() - hours * 60 * 60 * 1000) * 1000);

/** Queues one empty page per forum-owned collection, i.e. an empty Phase 1. */
function queueEmptyForumPhase(listRecords: Stub, emptyPage: unknown): Stub {
  FORUM_OWNED_COLLECTIONS.forEach(() => listRecords.mockResolvedValueOnce(emptyPage));
  return listRecords;
}

/** An in-progress backfill row; defaults to the catch_up type. */
function interruptedRow(fields: {
  id: bigint;
  lastProcessedDid: string | null;
  didsTotal: number;
  didsProcessed: number;
  recordsIndexed: number;
  backfillType?: string;
}) {
  return {
    status: "in_progress" as const,
    backfillType: "catch_up",
    startedAt: new Date(),
    completedAt: null,
    errorMessage: null,
    ...fields,
  };
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

describe("BackfillManager", () => {
  let mockDb: Database;
  let manager: BackfillManager;
  let mockLogger: ReturnType<typeof createMockLogger>;

  beforeEach(() => {
    mockDb = {
      select: selectWhereLimit(stubResolving([])),
    } as unknown as Database;

    mockLogger = createMockLogger();
    manager = new BackfillManager(mockDb, mockConfig(), mockLogger);
  });

  afterEach(() => {
    vi.clearAllMocks();
  });

  describe("checkIfNeeded", () => {
    const existingForum = [{ id: 1n, rkey: "self" }];

    it("returns FullSync when cursor is null (no cursor)", async () => {
      const status = await manager.checkIfNeeded(null);
      expect(status).toBe(BackfillStatus.FullSync);
    });

    it("returns FullSync when cursor exists but forums table is empty", async () => {
      // Forums query returns empty
      stubSelectWhereLimit(mockDb, stubResolving([]));

      // Cursor from 1 hour ago (fresh)
      const status = await manager.checkIfNeeded(cursorHoursAgo(1));
      expect(status).toBe(BackfillStatus.FullSync);
    });

    it("returns CatchUp when cursor age exceeds threshold", async () => {
      // Forums query returns a forum (DB not empty)
      stubSelectWhereLimit(mockDb, stubResolving(existingForum));

      // Cursor from 72 hours ago (stale; mockConfig sets the max age to 48 hours)
      const status = await manager.checkIfNeeded(cursorHoursAgo(72));
      expect(status).toBe(BackfillStatus.CatchUp);
    });

    it("returns NotNeeded when cursor is fresh and DB has data", async () => {
      // Forums query returns a forum
      stubSelectWhereLimit(mockDb, stubResolving(existingForum));

      // Cursor from 1 hour ago (fresh)
      const status = await manager.checkIfNeeded(cursorHoursAgo(1));
      expect(status).toBe(BackfillStatus.NotNeeded);
    });

    it("returns FullSync when DB query fails (fail safe)", async () => {
      stubSelectWhereLimit(mockDb, stubRejecting("DB connection lost"));

      const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
      const status = await manager.checkIfNeeded(cursorHoursAgo(1));
      expect(status).toBe(BackfillStatus.FullSync);
      consoleSpy.mockRestore();
    });
  });

  describe("syncRepoRecords", () => {
    let mockIndexer: Indexer;

    beforeEach(() => {
      mockIndexer = makeIndexer([
        "handlePostCreate",
        "handleForumCreate",
        "handleThemeCreate",
        "handleThemePolicyCreate",
      ]);
    });

    it("fetches records and calls indexer for each one", async () => {
      const mockAgent = newAgent();
      listRecordsOf(mockAgent).mockResolvedValueOnce(
        page([
          postRecord("did:plc:user1", "abc123", "bafyabc", "Hello", "2026-01-01T00:00:00Z"),
          postRecord("did:plc:user1", "def456", "bafydef", "World", "2026-01-01T01:00:00Z"),
        ])
      );

      manager.setIndexer(mockIndexer);
      const stats = await manager.syncRepoRecords(
        "did:plc:user1",
        "space.atbb.post",
        mockAgent
      );

      expect(stats.recordsFound).toBe(2);
      expect(stats.recordsIndexed).toBe(2);
      expect(stats.errors).toBe(0);
      expect(mockIndexer.handlePostCreate).toHaveBeenCalledTimes(2);
      expect(mockIndexer.handlePostCreate).toHaveBeenCalledWith(
        expect.objectContaining({
          did: "did:plc:user1",
          commit: expect.objectContaining({
            rkey: "abc123",
            cid: "bafyabc",
            record: expect.objectContaining({ text: "Hello" }),
          }),
        })
      );
    });

    it("paginates through multiple pages", async () => {
      const mockAgent = newAgent();
      listRecordsOf(mockAgent)
        .mockResolvedValueOnce(
          page(
            [postRecord("did:plc:user1", "page1", "bafyp1", "Page 1", "2026-01-01T00:00:00Z")],
            "next_page"
          )
        )
        .mockResolvedValueOnce(
          page([postRecord("did:plc:user1", "page2", "bafyp2", "Page 2", "2026-01-02T00:00:00Z")])
        );

      manager.setIndexer(mockIndexer);
      const stats = await manager.syncRepoRecords(
        "did:plc:user1",
        "space.atbb.post",
        mockAgent
      );

      expect(stats.recordsFound).toBe(2);
      expect(stats.recordsIndexed).toBe(2);
      expect(mockAgent.com.atproto.repo.listRecords).toHaveBeenCalledTimes(2);
    });

    it("continues on indexer errors and tracks error count", async () => {
      const mockAgent = newAgent();
      listRecordsOf(mockAgent).mockResolvedValueOnce(
        page([
          postRecord("did:plc:user1", "good", "bafygood", "Good", "2026-01-01T00:00:00Z"),
          postRecord("did:plc:user1", "bad", "bafybad", "Bad", "2026-01-01T01:00:00Z"),
        ])
      );

      // First record indexes fine, second one blows up inside the handler.
      (mockIndexer.handlePostCreate as unknown as Stub)
        .mockResolvedValueOnce(true)
        .mockRejectedValueOnce(new Error("FK missing"));

      const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
      manager.setIndexer(mockIndexer);
      const stats = await manager.syncRepoRecords(
        "did:plc:user1",
        "space.atbb.post",
        mockAgent
      );

      expect(stats.recordsFound).toBe(2);
      expect(stats.recordsIndexed).toBe(1);
      expect(stats.errors).toBe(1);
      consoleSpy.mockRestore();
    });

    it("returns error stats when indexer is not set", async () => {
      const mockAgent = newAgent();
      // No setIndexer call — indexer is null
      const stats = await manager.syncRepoRecords("did:plc:user", "space.atbb.post", mockAgent);
      expect(stats.errors).toBe(1);
      expect(mockLogger.error).toHaveBeenCalledWith(
        "backfill.sync_skipped",
        expect.objectContaining({ reason: "indexer_not_set" })
      );
    });

    it("handles PDS connection failure gracefully", async () => {
      const mockAgent = newAgent();
      listRecordsOf(mockAgent).mockRejectedValueOnce(new Error("fetch failed"));

      const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
      manager.setIndexer(mockIndexer);
      const stats = await manager.syncRepoRecords(
        "did:plc:user1",
        "space.atbb.post",
        mockAgent
      );

      expect(stats.recordsFound).toBe(0);
      expect(stats.recordsIndexed).toBe(0);
      expect(stats.errors).toBe(1);
      consoleSpy.mockRestore();
    });

    it("dispatches handleThemeCreate for space.atbb.forum.theme records", async () => {
      const mockAgent = newAgent();
      listRecordsOf(mockAgent).mockResolvedValueOnce(
        page([
          {
            uri: "at://did:web:atbb.space/space.atbb.forum.theme/neobrutal-dark",
            cid: "bafytheme1",
            value: {
              $type: "space.atbb.forum.theme",
              name: "Neobrutal Dark",
              colorScheme: "dark",
              tokens: { "color-bg": "#1a1a1a" },
              createdAt: "2026-01-01T00:00:00Z",
            },
          },
        ])
      );

      manager.setIndexer(mockIndexer);
      const stats = await manager.syncRepoRecords(
        "did:web:atbb.space",
        "space.atbb.forum.theme",
        mockAgent
      );

      expect(stats.recordsFound).toBe(1);
      expect(stats.recordsIndexed).toBe(1);
      expect(stats.errors).toBe(0);
      expect(mockIndexer.handleThemeCreate).toHaveBeenCalledTimes(1);
      expect(mockIndexer.handleThemeCreate).toHaveBeenCalledWith(
        expect.objectContaining({
          did: "did:web:atbb.space",
          commit: expect.objectContaining({
            rkey: "neobrutal-dark",
            cid: "bafytheme1",
            record: expect.objectContaining({ name: "Neobrutal Dark", colorScheme: "dark" }),
          }),
        })
      );
    });

    it("dispatches handleThemePolicyCreate for space.atbb.forum.themePolicy records", async () => {
      const mockAgent = newAgent();
      listRecordsOf(mockAgent).mockResolvedValueOnce(
        page([
          {
            uri: "at://did:web:atbb.space/space.atbb.forum.themePolicy/self",
            cid: "bafypolicy1",
            value: {
              $type: "space.atbb.forum.themePolicy",
              availableThemes: [
                { uri: "at://did:web:atbb.space/space.atbb.forum.theme/neobrutal-dark" },
              ],
              defaultLightTheme: { uri: "at://did:web:atbb.space/space.atbb.forum.theme/neobrutal-light" },
              defaultDarkTheme: { uri: "at://did:web:atbb.space/space.atbb.forum.theme/neobrutal-dark" },
              allowUserChoice: true,
            },
          },
        ])
      );

      manager.setIndexer(mockIndexer);
      const stats = await manager.syncRepoRecords(
        "did:web:atbb.space",
        "space.atbb.forum.themePolicy",
        mockAgent
      );

      expect(stats.recordsFound).toBe(1);
      expect(stats.recordsIndexed).toBe(1);
      expect(stats.errors).toBe(0);
      expect(mockIndexer.handleThemePolicyCreate).toHaveBeenCalledTimes(1);
      expect(mockIndexer.handleThemePolicyCreate).toHaveBeenCalledWith(
        expect.objectContaining({
          did: "did:web:atbb.space",
          commit: expect.objectContaining({
            rkey: "self",
            cid: "bafypolicy1",
          }),
        })
      );
    });

    // Title fixed: the assertion below expects a rejection, not returned error stats.
    it("throws TypeError when handler method is missing on Indexer (as-any cast gap)", async () => {
      // COLLECTION_HANDLER_MAP entry exists but the method is absent on the indexer.
      // .bind() on undefined throws TypeError which propagates out of syncRepoRecords
      // and fails performBackfill's outer catch rather than being silently swallowed.
      const brokenIndexer = {} as unknown as Indexer;
      manager.setIndexer(brokenIndexer);

      const mockAgent = newAgent();
      // listRecords would never be called — the TypeError fires before the do-while
      await expect(
        manager.syncRepoRecords("did:web:atbb.space", "space.atbb.forum.theme", mockAgent)
      ).rejects.toThrow(TypeError);
    });
  });

  describe("performBackfill", () => {
    let mockIndexer: Indexer;
    let consoleSpies: Array<{ mockRestore: () => void }>;

    beforeEach(() => {
      // Silence console output for the duration of each test.
      consoleSpies = [
        vi.spyOn(console, "log").mockImplementation(() => {}),
        vi.spyOn(console, "error").mockImplementation(() => {}),
        vi.spyOn(console, "warn").mockImplementation(() => {}),
      ];

      mockIndexer = makeIndexer();
    });

    afterEach(() => {
      // Restore every console spy (not just `log`) so none of them leaks into
      // later suites; vi.clearAllMocks() in the outer hook does not restore spies.
      consoleSpies.forEach((spy) => spy.mockRestore());
    });

    /**
     * Wires up a manager for a FullSync run against a database where every
     * select is empty. Returns the insert mock so callers can assert on it.
     */
    function setUpFullSync(listRecords: Stub): Stub {
      const mockInsert = insertChain(stubResolving([{ id: 1n }]));

      mockDb = {
        select: selectAlwaysEmpty(),
        insert: mockInsert,
        update: updateChain(),
      } as unknown as Database;

      manager = new BackfillManager(mockDb, mockConfig(), createMockLogger());
      manager.setIndexer(mockIndexer);
      stubAgent(manager, listRecords);

      return mockInsert;
    }

    /** Wires up a manager for a CatchUp run over user1 and user2. */
    function setUpCatchUp(insert: Stub, listRecords: Stub, backfillConcurrency: number): void {
      mockDb = {
        select: selectOrderBy(
          stubResolving([{ did: "did:plc:user1" }, { did: "did:plc:user2" }])
        ),
        insert,
        update: updateChain(),
      } as unknown as Database;

      manager = new BackfillManager(
        mockDb,
        mockConfig({ backfillConcurrency }),
        createMockLogger()
      );
      manager.setIndexer(mockIndexer);
      stubAgent(manager, listRecords);
    }

    it("creates a backfill_progress row on start", async () => {
      const mockInsert = setUpFullSync(stubResolving(page()));

      await manager.performBackfill(BackfillStatus.FullSync);

      expect(mockInsert).toHaveBeenCalled();
    });

    it("sets isRunning flag during backfill", async () => {
      setUpFullSync(stubResolving(page()));

      expect(manager.getIsRunning()).toBe(false);
      const promise = manager.performBackfill(BackfillStatus.FullSync);
      expect(manager.getIsRunning()).toBe(true);
      await promise;
      expect(manager.getIsRunning()).toBe(false);
    });

    it("rejects concurrent backfill attempts", async () => {
      // Slow listRecords keeps the first run in flight while the second is attempted.
      setUpFullSync(stubResolvingAfter(100, page()));

      const first = manager.performBackfill(BackfillStatus.FullSync);

      await expect(manager.performBackfill(BackfillStatus.FullSync))
        .rejects.toThrow("Backfill is already in progress");

      await first;
    });

    it("CatchUp: syncs user-owned collections and aggregates counts", async () => {
      // Phase 1 (7 FORUM_OWNED_COLLECTIONS) must return empty so its records don't
      // pollute the count. Phase 2: 2 users × 2 USER_OWNED_COLLECTIONS × 1 record = 4.
      const emptyPage = page();
      const recordPage = page([
        postRecord("did:plc:u", "r1", "bafyr1", "hi", "2026-01-01T00:00:00Z"),
      ]);

      const mockListRecords = queueEmptyForumPhase(vi.fn(), emptyPage)
        .mockResolvedValue(recordPage); // all Phase 2 user collection calls

      setUpCatchUp(insertChain(stubResolving([{ id: 42n }])), mockListRecords, 5);

      const result = await manager.performBackfill(BackfillStatus.CatchUp);

      // Phase 1: 0 records (forum collections empty)
      // Phase 2: 2 users × 2 collections × 1 record each = 4 records indexed
      expect(result.recordsIndexed).toBe(4);
      expect(result.errors).toBe(0);
      expect(result.didsProcessed).toBe(2);
      expect(result.backfillId).toBe(42n);
    });

    it("CatchUp: rejected user batch increments totalErrors and is not swallowed", async () => {
      // syncRepoRecords never throws — it catches PDS errors internally and returns errors:1.
      // For the batch callback to reject (tested by the allSettled handling), the
      // backfillErrors DB insert must fail, which propagates the rejection out of the callback.
      const emptyPage = page();

      const mockListRecords = queueEmptyForumPhase(vi.fn(), emptyPage)
        // user1: both collections succeed, 1 record each
        .mockResolvedValueOnce(
          page([
            {
              uri: "at://did:plc:user1/space.atbb.membership/self",
              cid: "bafymem",
              value: { $type: "space.atbb.membership", createdAt: "2026-01-01T00:00:00Z" },
            },
          ])
        )
        .mockResolvedValueOnce(
          page([postRecord("did:plc:user1", "p1", "bafyp1", "hi", "2026-01-01T00:00:00Z")])
        )
        // user2/membership: PDS error → syncRepoRecords catches → returns errors:1 →
        // triggers backfillErrors insert (which rejects below) → callback rejects
        .mockRejectedValueOnce(new Error("PDS unreachable"));

      const mockInsert = vi.fn()
        .mockReturnValueOnce({
          // backfillProgress insert — must succeed
          values: stubReturning({ returning: stubResolving([{ id: 7n }]) }),
        })
        .mockReturnValueOnce({
          // backfillErrors insert for user2 — rejects to make callback throw
          values: stubReturning({
            returning: stubRejecting("backfillErrors insert failed"),
          }),
        });

      setUpCatchUp(mockInsert, mockListRecords, 1);

      const result = await manager.performBackfill(BackfillStatus.CatchUp);

      // user1 batch (concurrency=1): fulfilled, 2 records indexed (membership + post)
      // user2 batch: callback rejects → allSettled rejected branch → totalErrors++ = 1
      expect(result.recordsIndexed).toBe(2);
      expect(result.errors).toBe(1);
    });

    it("clears isRunning flag even when backfill fails", async () => {
      // console.error is already silenced by beforeEach and restored by afterEach.
      mockDb = {
        insert: insertChain(stubRejecting("DB insert failed")),
        update: updateChain(),
      } as unknown as Database;

      manager = new BackfillManager(mockDb, mockConfig(), createMockLogger());
      manager.setIndexer(mockIndexer);

      await expect(manager.performBackfill(BackfillStatus.FullSync))
        .rejects.toThrow("DB insert failed");

      expect(manager.getIsRunning()).toBe(false);
    });
  });

  describe("checkForInterruptedBackfill", () => {
    it("returns null when no interrupted backfill exists", async () => {
      stubSelectWhereLimit(mockDb, stubResolving([]));

      const result = await manager.checkForInterruptedBackfill();
      expect(result).toBeNull();
    });

    it("returns null and logs error when DB query fails", async () => {
      stubSelectWhereLimit(mockDb, stubRejecting("DB connection lost"));

      const result = await manager.checkForInterruptedBackfill();
      expect(result).toBeNull();
      expect(mockLogger.error).toHaveBeenCalled();
    });

    it("returns interrupted backfill row when one exists", async () => {
      const row = interruptedRow({
        id: 5n,
        lastProcessedDid: "did:plc:halfway",
        didsTotal: 100,
        didsProcessed: 50,
        recordsIndexed: 250,
      });

      stubSelectWhereLimit(mockDb, stubResolving([row]));

      const result = await manager.checkForInterruptedBackfill();
      expect(result).toEqual(row);
    });
  });

  describe("resumeBackfill", () => {
    let mockIndexer: Indexer;

    beforeEach(() => {
      vi.spyOn(console, "log").mockImplementation(() => {});
      vi.spyOn(console, "error").mockImplementation(() => {});

      mockIndexer = makeIndexer();
    });

    afterEach(() => {
      vi.restoreAllMocks();
    });

    /** Builds the manager under test around `db` and stubs its PDS agent. */
    function setUpResume(
      db: Record<string, Stub>,
      listRecords: Stub,
      overrides: Partial<AppConfig> = {}
    ): void {
      mockDb = db as unknown as Database;
      manager = new BackfillManager(mockDb, mockConfig(overrides), createMockLogger());
      manager.setIndexer(mockIndexer);
      stubAgent(manager, listRecords);
    }

    it("resumes from lastProcessedDid and processes remaining users", async () => {
      // Interrupted at user1 (didsProcessed=1), user2 and user3 remain
      const interrupted = interruptedRow({
        id: 5n,
        lastProcessedDid: "did:plc:user1",
        didsTotal: 3,
        didsProcessed: 1,
        recordsIndexed: 2,
      });

      // user2 and user3: 1 record each per collection (2 collections = 4 total)
      const recordPage = page([
        postRecord("did:plc:u", "r1", "bafyr1", "hi", "2026-01-01T00:00:00Z"),
      ]);

      setUpResume(
        {
          select: selectWhereOrderBy(
            stubResolving([{ did: "did:plc:user2" }, { did: "did:plc:user3" }])
          ),
          insert: insertChain(stubResolving([])),
          update: updateChain(),
        },
        stubResolving(recordPage),
        { backfillConcurrency: 5 }
      );

      const result = await manager.resumeBackfill(interrupted);

      // Starts from interrupted.recordsIndexed=2, adds 2 users × 2 collections × 1 record = 4
      expect(result.recordsIndexed).toBe(6);
      expect(result.errors).toBe(0);
      expect(result.didsProcessed).toBe(3); // 1 (prior) + 2 (resumed)
      expect(result.backfillId).toBe(5n);
    });

    it("marks completed even when no remaining users", async () => {
      // Interrupted at the last user — no users with DID > lastProcessedDid
      const interrupted = interruptedRow({
        id: 3n,
        lastProcessedDid: "did:plc:last",
        didsTotal: 2,
        didsProcessed: 2,
        recordsIndexed: 10,
      });

      setUpResume(
        {
          select: selectWhereOrderBy(stubResolving([])), // no remaining users
          update: updateChain(),
        },
        vi.fn()
      );

      const result = await manager.resumeBackfill(interrupted);

      // No new records — just marks completed with existing counts
      expect(result.recordsIndexed).toBe(10);
      expect(result.didsProcessed).toBe(2);
      expect(result.backfillId).toBe(3n);

      // DB row should be updated to completed
      expect(mockDb.update).toHaveBeenCalled();
    });

    it("clears isRunning flag even when resume fails", async () => {
      const interrupted = interruptedRow({
        id: 9n,
        lastProcessedDid: "did:plc:checkpoint",
        didsTotal: 5,
        didsProcessed: 3,
        recordsIndexed: 15,
      });

      setUpResume(
        {
          select: selectWhereOrderBy(stubRejecting("DB query failed")),
          update: updateChain(),
        },
        vi.fn()
      );

      await expect(manager.resumeBackfill(interrupted))
        .rejects.toThrow("DB query failed");

      expect(manager.getIsRunning()).toBe(false);
    });

    it("marks full_sync interrupted backfill as failed (cannot resume FullSync)", async () => {
      const interrupted = interruptedRow({
        id: 10n,
        backfillType: "full_sync",
        lastProcessedDid: null,
        didsTotal: 0,
        didsProcessed: 0,
        recordsIndexed: 0,
      });

      const mockUpdate = updateChain();
      mockDb = {
        update: mockUpdate,
      } as unknown as Database;

      manager = new BackfillManager(mockDb, mockConfig(), createMockLogger());
      manager.setIndexer(mockIndexer);

      await expect(manager.resumeBackfill(interrupted))
        .rejects.toThrow("Interrupted FullSync cannot be resumed");

      // Verify the row was marked as failed
      expect(mockUpdate).toHaveBeenCalled();
      const setCall = mockUpdate.mock.results[0].value.set;
      expect(setCall).toHaveBeenCalledWith(
        expect.objectContaining({ status: "failed" })
      );
    });

    it("rejects concurrent resume attempts", async () => {
      const interrupted = interruptedRow({
        id: 2n,
        lastProcessedDid: "did:plc:check",
        didsTotal: 2,
        didsProcessed: 1,
        recordsIndexed: 5,
      });

      setUpResume(
        {
          // Slow user query keeps the first resume in flight while the second is attempted.
          select: selectWhereOrderBy(stubResolvingAfter(200, [])),
          update: updateChain(),
        },
        vi.fn()
      );

      const first = manager.resumeBackfill(interrupted);

      await expect(manager.resumeBackfill(interrupted))
        .rejects.toThrow("Backfill is already in progress");

      await first;
    });
  });
});