WIP! A BB-style forum, on the ATmosphere!
We're still working... we'll be back soon when we have something to show off!
node
typescript
hono
htmx
atproto
1import { describe, it, expect, beforeEach, vi, afterEach } from "vitest";
2import { BackfillManager, BackfillStatus } from "../backfill-manager.js";
3import type { Database } from "@atbb/db";
4import type { AppConfig } from "../config.js";
5import { AtpAgent } from "@atproto/api";
6import type { Indexer } from "../indexer.js";
7import { createMockLogger } from "./mock-logger.js";
8
9vi.mock("@atproto/api", () => ({
10 AtpAgent: vi.fn().mockImplementation(() => ({
11 com: {
12 atproto: {
13 repo: {
14 listRecords: vi.fn(),
15 },
16 },
17 },
18 })),
19}));
20
21// Minimal mock config
22function mockConfig(overrides: Partial<AppConfig> = {}): AppConfig {
23 return {
24 port: 3000,
25 forumDid: "did:plc:testforum",
26 pdsUrl: "https://pds.example.com",
27 databaseUrl: "postgres://test",
28 jetstreamUrl: "wss://jetstream.example.com",
29 oauthPublicUrl: "https://example.com",
30 sessionSecret: "a".repeat(32),
31 sessionTtlDays: 7,
32 backfillRateLimit: 10,
33 backfillConcurrency: 10,
34 backfillCursorMaxAgeHours: 48,
35 ...overrides,
36 } as AppConfig;
37}
38
39describe("BackfillManager", () => {
40 let mockDb: Database;
41 let manager: BackfillManager;
42 let mockLogger: ReturnType<typeof createMockLogger>;
43
44 beforeEach(() => {
45 mockDb = {
46 select: vi.fn().mockReturnValue({
47 from: vi.fn().mockReturnValue({
48 where: vi.fn().mockReturnValue({
49 limit: vi.fn().mockResolvedValue([]),
50 }),
51 }),
52 }),
53 } as unknown as Database;
54
55 mockLogger = createMockLogger();
56 manager = new BackfillManager(mockDb, mockConfig(), mockLogger);
57 });
58
59 afterEach(() => {
60 vi.clearAllMocks();
61 });
62
63 describe("checkIfNeeded", () => {
64 it("returns FullSync when cursor is null (no cursor)", async () => {
65 const status = await manager.checkIfNeeded(null);
66 expect(status).toBe(BackfillStatus.FullSync);
67 });
68
69 it("returns FullSync when cursor exists but forums table is empty", async () => {
70 // Forums query returns empty
71 vi.spyOn(mockDb, "select").mockReturnValue({
72 from: vi.fn().mockReturnValue({
73 where: vi.fn().mockReturnValue({
74 limit: vi.fn().mockResolvedValue([]),
75 }),
76 }),
77 } as any);
78
79 // Cursor from 1 hour ago (fresh)
80 const cursor = BigInt((Date.now() - 1 * 60 * 60 * 1000) * 1000);
81 const status = await manager.checkIfNeeded(cursor);
82 expect(status).toBe(BackfillStatus.FullSync);
83 });
84
85 it("returns CatchUp when cursor age exceeds threshold", async () => {
86 // Forums query returns a forum (DB not empty)
87 vi.spyOn(mockDb, "select").mockReturnValue({
88 from: vi.fn().mockReturnValue({
89 where: vi.fn().mockReturnValue({
90 limit: vi.fn().mockResolvedValue([{ id: 1n, rkey: "self" }]),
91 }),
92 }),
93 } as any);
94
95 // Cursor from 72 hours ago (stale)
96 const cursor = BigInt((Date.now() - 72 * 60 * 60 * 1000) * 1000);
97 const status = await manager.checkIfNeeded(cursor);
98 expect(status).toBe(BackfillStatus.CatchUp);
99 });
100
101 it("returns NotNeeded when cursor is fresh and DB has data", async () => {
102 // Forums query returns a forum
103 vi.spyOn(mockDb, "select").mockReturnValue({
104 from: vi.fn().mockReturnValue({
105 where: vi.fn().mockReturnValue({
106 limit: vi.fn().mockResolvedValue([{ id: 1n, rkey: "self" }]),
107 }),
108 }),
109 } as any);
110
111 // Cursor from 1 hour ago (fresh)
112 const cursor = BigInt((Date.now() - 1 * 60 * 60 * 1000) * 1000);
113 const status = await manager.checkIfNeeded(cursor);
114 expect(status).toBe(BackfillStatus.NotNeeded);
115 });
116
117 it("returns FullSync when DB query fails (fail safe)", async () => {
118 vi.spyOn(mockDb, "select").mockReturnValue({
119 from: vi.fn().mockReturnValue({
120 where: vi.fn().mockReturnValue({
121 limit: vi.fn().mockRejectedValue(new Error("DB connection lost")),
122 }),
123 }),
124 } as any);
125
126 const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
127 const cursor = BigInt((Date.now() - 1 * 60 * 60 * 1000) * 1000);
128 const status = await manager.checkIfNeeded(cursor);
129 expect(status).toBe(BackfillStatus.FullSync);
130 consoleSpy.mockRestore();
131 });
132 });
133
134 describe("syncRepoRecords", () => {
135 let mockIndexer: Indexer;
136
137 beforeEach(() => {
138 mockIndexer = {
139 handlePostCreate: vi.fn().mockResolvedValue(true),
140 handleForumCreate: vi.fn().mockResolvedValue(true),
141 handleThemeCreate: vi.fn().mockResolvedValue(true),
142 handleThemePolicyCreate: vi.fn().mockResolvedValue(true),
143 } as unknown as Indexer;
144 });
145
146 it("fetches records and calls indexer for each one", async () => {
147 const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
148 (mockAgent.com.atproto.repo.listRecords as any).mockResolvedValueOnce({
149 data: {
150 records: [
151 {
152 uri: "at://did:plc:user1/space.atbb.post/abc123",
153 cid: "bafyabc",
154 value: { $type: "space.atbb.post", text: "Hello", createdAt: "2026-01-01T00:00:00Z" },
155 },
156 {
157 uri: "at://did:plc:user1/space.atbb.post/def456",
158 cid: "bafydef",
159 value: { $type: "space.atbb.post", text: "World", createdAt: "2026-01-01T01:00:00Z" },
160 },
161 ],
162 cursor: undefined,
163 },
164 });
165
166 manager.setIndexer(mockIndexer);
167 const stats = await manager.syncRepoRecords(
168 "did:plc:user1",
169 "space.atbb.post",
170 mockAgent
171 );
172
173 expect(stats.recordsFound).toBe(2);
174 expect(stats.recordsIndexed).toBe(2);
175 expect(stats.errors).toBe(0);
176 expect(mockIndexer.handlePostCreate).toHaveBeenCalledTimes(2);
177 expect(mockIndexer.handlePostCreate).toHaveBeenCalledWith(
178 expect.objectContaining({
179 did: "did:plc:user1",
180 commit: expect.objectContaining({
181 rkey: "abc123",
182 cid: "bafyabc",
183 record: expect.objectContaining({ text: "Hello" }),
184 }),
185 })
186 );
187 });
188
189 it("paginates through multiple pages", async () => {
190 const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
191 (mockAgent.com.atproto.repo.listRecords as any)
192 .mockResolvedValueOnce({
193 data: {
194 records: [{
195 uri: "at://did:plc:user1/space.atbb.post/page1",
196 cid: "bafyp1",
197 value: { $type: "space.atbb.post", text: "Page 1", createdAt: "2026-01-01T00:00:00Z" },
198 }],
199 cursor: "next_page",
200 },
201 })
202 .mockResolvedValueOnce({
203 data: {
204 records: [{
205 uri: "at://did:plc:user1/space.atbb.post/page2",
206 cid: "bafyp2",
207 value: { $type: "space.atbb.post", text: "Page 2", createdAt: "2026-01-02T00:00:00Z" },
208 }],
209 cursor: undefined,
210 },
211 });
212
213 manager.setIndexer(mockIndexer);
214 const stats = await manager.syncRepoRecords(
215 "did:plc:user1",
216 "space.atbb.post",
217 mockAgent
218 );
219
220 expect(stats.recordsFound).toBe(2);
221 expect(stats.recordsIndexed).toBe(2);
222 expect(mockAgent.com.atproto.repo.listRecords).toHaveBeenCalledTimes(2);
223 });
224
225 it("continues on indexer errors and tracks error count", async () => {
226 const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
227 (mockAgent.com.atproto.repo.listRecords as any).mockResolvedValueOnce({
228 data: {
229 records: [
230 {
231 uri: "at://did:plc:user1/space.atbb.post/good",
232 cid: "bafygood",
233 value: { $type: "space.atbb.post", text: "Good", createdAt: "2026-01-01T00:00:00Z" },
234 },
235 {
236 uri: "at://did:plc:user1/space.atbb.post/bad",
237 cid: "bafybad",
238 value: { $type: "space.atbb.post", text: "Bad", createdAt: "2026-01-01T01:00:00Z" },
239 },
240 ],
241 cursor: undefined,
242 },
243 });
244
245 (mockIndexer.handlePostCreate as any)
246 .mockResolvedValueOnce(true)
247 .mockRejectedValueOnce(new Error("FK missing"));
248
249 const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
250 manager.setIndexer(mockIndexer);
251 const stats = await manager.syncRepoRecords(
252 "did:plc:user1",
253 "space.atbb.post",
254 mockAgent
255 );
256
257 expect(stats.recordsFound).toBe(2);
258 expect(stats.recordsIndexed).toBe(1);
259 expect(stats.errors).toBe(1);
260 consoleSpy.mockRestore();
261 });
262
263 it("returns error stats when indexer is not set", async () => {
264 const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
265 // No setIndexer call — indexer is null
266 const stats = await manager.syncRepoRecords("did:plc:user", "space.atbb.post", mockAgent);
267 expect(stats.errors).toBe(1);
268 expect(mockLogger.error).toHaveBeenCalledWith(
269 "backfill.sync_skipped",
270 expect.objectContaining({ reason: "indexer_not_set" })
271 );
272 });
273
274 it("handles PDS connection failure gracefully", async () => {
275 const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
276 (mockAgent.com.atproto.repo.listRecords as any)
277 .mockRejectedValueOnce(new Error("fetch failed"));
278
279 const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
280 manager.setIndexer(mockIndexer);
281 const stats = await manager.syncRepoRecords(
282 "did:plc:user1",
283 "space.atbb.post",
284 mockAgent
285 );
286
287 expect(stats.recordsFound).toBe(0);
288 expect(stats.recordsIndexed).toBe(0);
289 expect(stats.errors).toBe(1);
290 consoleSpy.mockRestore();
291 });
292
293 it("dispatches handleThemeCreate for space.atbb.forum.theme records", async () => {
294 const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
295 (mockAgent.com.atproto.repo.listRecords as any).mockResolvedValueOnce({
296 data: {
297 records: [{
298 uri: "at://did:web:atbb.space/space.atbb.forum.theme/neobrutal-dark",
299 cid: "bafytheme1",
300 value: {
301 $type: "space.atbb.forum.theme",
302 name: "Neobrutal Dark",
303 colorScheme: "dark",
304 tokens: { "color-bg": "#1a1a1a" },
305 createdAt: "2026-01-01T00:00:00Z",
306 },
307 }],
308 cursor: undefined,
309 },
310 });
311
312 manager.setIndexer(mockIndexer);
313 const stats = await manager.syncRepoRecords(
314 "did:web:atbb.space",
315 "space.atbb.forum.theme",
316 mockAgent
317 );
318
319 expect(stats.recordsFound).toBe(1);
320 expect(stats.recordsIndexed).toBe(1);
321 expect(stats.errors).toBe(0);
322 expect(mockIndexer.handleThemeCreate).toHaveBeenCalledTimes(1);
323 expect(mockIndexer.handleThemeCreate).toHaveBeenCalledWith(
324 expect.objectContaining({
325 did: "did:web:atbb.space",
326 commit: expect.objectContaining({
327 rkey: "neobrutal-dark",
328 cid: "bafytheme1",
329 record: expect.objectContaining({ name: "Neobrutal Dark", colorScheme: "dark" }),
330 }),
331 })
332 );
333 });
334
335 it("dispatches handleThemePolicyCreate for space.atbb.forum.themePolicy records", async () => {
336 const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
337 (mockAgent.com.atproto.repo.listRecords as any).mockResolvedValueOnce({
338 data: {
339 records: [{
340 uri: "at://did:web:atbb.space/space.atbb.forum.themePolicy/self",
341 cid: "bafypolicy1",
342 value: {
343 $type: "space.atbb.forum.themePolicy",
344 availableThemes: [
345 { uri: "at://did:web:atbb.space/space.atbb.forum.theme/neobrutal-dark" },
346 ],
347 defaultLightTheme: { uri: "at://did:web:atbb.space/space.atbb.forum.theme/neobrutal-light" },
348 defaultDarkTheme: { uri: "at://did:web:atbb.space/space.atbb.forum.theme/neobrutal-dark" },
349 allowUserChoice: true,
350 },
351 }],
352 cursor: undefined,
353 },
354 });
355
356 manager.setIndexer(mockIndexer);
357 const stats = await manager.syncRepoRecords(
358 "did:web:atbb.space",
359 "space.atbb.forum.themePolicy",
360 mockAgent
361 );
362
363 expect(stats.recordsFound).toBe(1);
364 expect(stats.recordsIndexed).toBe(1);
365 expect(stats.errors).toBe(0);
366 expect(mockIndexer.handleThemePolicyCreate).toHaveBeenCalledTimes(1);
367 expect(mockIndexer.handleThemePolicyCreate).toHaveBeenCalledWith(
368 expect.objectContaining({
369 did: "did:web:atbb.space",
370 commit: expect.objectContaining({
371 rkey: "self",
372 cid: "bafypolicy1",
373 }),
374 })
375 );
376 });
377
378 it("returns error stats when handler method is missing on Indexer (as-any cast gap)", async () => {
379 // COLLECTION_HANDLER_MAP entry exists but the method is absent on the indexer.
380 // .bind() on undefined throws TypeError which propagates out of syncRepoRecords
381 // and fails performBackfill's outer catch rather than being silently swallowed.
382 const brokenIndexer = {} as unknown as Indexer;
383 manager.setIndexer(brokenIndexer);
384
385 const mockAgent = new AtpAgent({ service: "https://pds.example.com" });
386 // listRecords would never be called — the TypeError fires before the do-while
387 await expect(
388 manager.syncRepoRecords("did:web:atbb.space", "space.atbb.forum.theme", mockAgent)
389 ).rejects.toThrow(TypeError);
390 });
391 });
392
393 describe("performBackfill", () => {
394 let mockIndexer: Indexer;
395 let consoleSpy: any;
396
397 beforeEach(() => {
398 consoleSpy = vi.spyOn(console, "log").mockImplementation(() => {});
399 vi.spyOn(console, "error").mockImplementation(() => {});
400 vi.spyOn(console, "warn").mockImplementation(() => {});
401
402 mockIndexer = {
403 handleForumCreate: vi.fn().mockResolvedValue(true),
404 handleCategoryCreate: vi.fn().mockResolvedValue(true),
405 handleBoardCreate: vi.fn().mockResolvedValue(true),
406 handleRoleCreate: vi.fn().mockResolvedValue(true),
407 handleMembershipCreate: vi.fn().mockResolvedValue(true),
408 handlePostCreate: vi.fn().mockResolvedValue(true),
409 handleModActionCreate: vi.fn().mockResolvedValue(true),
410 handleThemeCreate: vi.fn().mockResolvedValue(true),
411 handleThemePolicyCreate: vi.fn().mockResolvedValue(true),
412 } as unknown as Indexer;
413 });
414
415 afterEach(() => {
416 consoleSpy.mockRestore();
417 });
418
419 it("creates a backfill_progress row on start", async () => {
420 const mockInsert = vi.fn().mockReturnValue({
421 values: vi.fn().mockReturnValue({
422 returning: vi.fn().mockResolvedValue([{ id: 1n }]),
423 }),
424 });
425
426 const mockSelectEmpty = vi.fn().mockReturnValue({
427 from: vi.fn().mockReturnValue({
428 where: vi.fn().mockReturnValue({
429 limit: vi.fn().mockResolvedValue([]),
430 orderBy: vi.fn().mockResolvedValue([]),
431 }),
432 orderBy: vi.fn().mockResolvedValue([]),
433 }),
434 });
435
436 mockDb = {
437 select: mockSelectEmpty,
438 insert: mockInsert,
439 update: vi.fn().mockReturnValue({
440 set: vi.fn().mockReturnValue({
441 where: vi.fn().mockResolvedValue(undefined),
442 }),
443 }),
444 } as unknown as Database;
445
446 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger());
447 manager.setIndexer(mockIndexer);
448
449 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({
450 com: {
451 atproto: {
452 repo: {
453 listRecords: vi.fn().mockResolvedValue({
454 data: { records: [], cursor: undefined },
455 }),
456 },
457 },
458 },
459 });
460
461 await manager.performBackfill(BackfillStatus.FullSync);
462
463 expect(mockInsert).toHaveBeenCalled();
464 });
465
466 it("sets isRunning flag during backfill", async () => {
467 const mockInsert = vi.fn().mockReturnValue({
468 values: vi.fn().mockReturnValue({
469 returning: vi.fn().mockResolvedValue([{ id: 1n }]),
470 }),
471 });
472
473 mockDb = {
474 select: vi.fn().mockReturnValue({
475 from: vi.fn().mockReturnValue({
476 where: vi.fn().mockReturnValue({
477 limit: vi.fn().mockResolvedValue([]),
478 orderBy: vi.fn().mockResolvedValue([]),
479 }),
480 orderBy: vi.fn().mockResolvedValue([]),
481 }),
482 }),
483 insert: mockInsert,
484 update: vi.fn().mockReturnValue({
485 set: vi.fn().mockReturnValue({
486 where: vi.fn().mockResolvedValue(undefined),
487 }),
488 }),
489 } as unknown as Database;
490
491 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger());
492 manager.setIndexer(mockIndexer);
493
494 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({
495 com: {
496 atproto: {
497 repo: {
498 listRecords: vi.fn().mockResolvedValue({
499 data: { records: [], cursor: undefined },
500 }),
501 },
502 },
503 },
504 });
505
506 expect(manager.getIsRunning()).toBe(false);
507 const promise = manager.performBackfill(BackfillStatus.FullSync);
508 expect(manager.getIsRunning()).toBe(true);
509 await promise;
510 expect(manager.getIsRunning()).toBe(false);
511 });
512
513 it("rejects concurrent backfill attempts", async () => {
514 const mockInsert = vi.fn().mockReturnValue({
515 values: vi.fn().mockReturnValue({
516 returning: vi.fn().mockResolvedValue([{ id: 1n }]),
517 }),
518 });
519
520 mockDb = {
521 select: vi.fn().mockReturnValue({
522 from: vi.fn().mockReturnValue({
523 where: vi.fn().mockReturnValue({
524 limit: vi.fn().mockResolvedValue([]),
525 orderBy: vi.fn().mockResolvedValue([]),
526 }),
527 orderBy: vi.fn().mockResolvedValue([]),
528 }),
529 }),
530 insert: mockInsert,
531 update: vi.fn().mockReturnValue({
532 set: vi.fn().mockReturnValue({
533 where: vi.fn().mockResolvedValue(undefined),
534 }),
535 }),
536 } as unknown as Database;
537
538 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger());
539 manager.setIndexer(mockIndexer);
540
541 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({
542 com: {
543 atproto: {
544 repo: {
545 listRecords: vi.fn().mockImplementation(
546 () => new Promise((resolve) =>
547 setTimeout(() => resolve({ data: { records: [], cursor: undefined } }), 100)
548 )
549 ),
550 },
551 },
552 },
553 });
554
555 const first = manager.performBackfill(BackfillStatus.FullSync);
556
557 await expect(manager.performBackfill(BackfillStatus.FullSync))
558 .rejects.toThrow("Backfill is already in progress");
559
560 await first;
561 });
562
563 it("CatchUp: syncs user-owned collections and aggregates counts", async () => {
564 // Phase 1 (7 FORUM_OWNED_COLLECTIONS) must return empty so its records don't
565 // pollute the count. Phase 2: 2 users × 2 USER_OWNED_COLLECTIONS × 1 record = 4.
566 const emptyPage = { data: { records: [], cursor: undefined } };
567 const recordPage = {
568 data: {
569 records: [{
570 uri: "at://did:plc:u/space.atbb.post/r1",
571 cid: "bafyr1",
572 value: { $type: "space.atbb.post", text: "hi", createdAt: "2026-01-01T00:00:00Z" },
573 }],
574 cursor: undefined,
575 },
576 };
577
578 const mockListRecords = vi.fn()
579 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.forum (Phase 1 call 1)
580 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.category (Phase 1 call 2)
581 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.board (Phase 1 call 3)
582 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.role (Phase 1 call 4)
583 .mockResolvedValueOnce(emptyPage) // space.atbb.modAction (Phase 1 call 5)
584 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.theme (Phase 1 call 6)
585 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.themePolicy (Phase 1 call 7)
586 .mockResolvedValue(recordPage); // all Phase 2 user collection calls
587
588 mockDb = {
589 select: vi.fn().mockReturnValue({
590 from: vi.fn().mockReturnValue({
591 orderBy: vi.fn().mockResolvedValue([
592 { did: "did:plc:user1" },
593 { did: "did:plc:user2" },
594 ]),
595 }),
596 }),
597 insert: vi.fn().mockReturnValue({
598 values: vi.fn().mockReturnValue({
599 returning: vi.fn().mockResolvedValue([{ id: 42n }]),
600 }),
601 }),
602 update: vi.fn().mockReturnValue({
603 set: vi.fn().mockReturnValue({
604 where: vi.fn().mockResolvedValue(undefined),
605 }),
606 }),
607 } as unknown as Database;
608
609 manager = new BackfillManager(mockDb, mockConfig({ backfillConcurrency: 5 }), createMockLogger());
610 manager.setIndexer(mockIndexer);
611 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({
612 com: { atproto: { repo: { listRecords: mockListRecords } } },
613 });
614
615 const result = await manager.performBackfill(BackfillStatus.CatchUp);
616
617 // Phase 1: 0 records (forum collections empty)
618 // Phase 2: 2 users × 2 collections × 1 record each = 4 records indexed
619 expect(result.recordsIndexed).toBe(4);
620 expect(result.errors).toBe(0);
621 expect(result.didsProcessed).toBe(2);
622 expect(result.backfillId).toBe(42n);
623 });
624
625 it("CatchUp: rejected user batch increments totalErrors and is not swallowed", async () => {
626 // syncRepoRecords never throws — it catches PDS errors internally and returns errors:1.
627 // For the batch callback to reject (tested by the allSettled handling), the
628 // backfillErrors DB insert must fail, which propagates the rejection out of the callback.
629 const emptyPage = { data: { records: [], cursor: undefined } };
630
631 const mockListRecords = vi.fn()
632 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.forum (Phase 1 call 1)
633 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.category (Phase 1 call 2)
634 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.board (Phase 1 call 3)
635 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.role (Phase 1 call 4)
636 .mockResolvedValueOnce(emptyPage) // space.atbb.modAction (Phase 1 call 5)
637 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.theme (Phase 1 call 6)
638 .mockResolvedValueOnce(emptyPage) // space.atbb.forum.themePolicy (Phase 1 call 7)
639 // user1: both collections succeed, 1 record each
640 .mockResolvedValueOnce({ data: { records: [{
641 uri: "at://did:plc:user1/space.atbb.membership/self",
642 cid: "bafymem",
643 value: { $type: "space.atbb.membership", createdAt: "2026-01-01T00:00:00Z" },
644 }], cursor: undefined } })
645 .mockResolvedValueOnce({ data: { records: [{
646 uri: "at://did:plc:user1/space.atbb.post/p1",
647 cid: "bafyp1",
648 value: { $type: "space.atbb.post", text: "hi", createdAt: "2026-01-01T00:00:00Z" },
649 }], cursor: undefined } })
650 // user2/membership: PDS error → syncRepoRecords catches → returns errors:1 →
651 // triggers backfillErrors insert (which rejects below) → callback rejects
652 .mockRejectedValueOnce(new Error("PDS unreachable"));
653
654 mockDb = {
655 select: vi.fn().mockReturnValue({
656 from: vi.fn().mockReturnValue({
657 orderBy: vi.fn().mockResolvedValue([
658 { did: "did:plc:user1" },
659 { did: "did:plc:user2" },
660 ]),
661 }),
662 }),
663 insert: vi.fn()
664 .mockReturnValueOnce({ // backfillProgress insert — must succeed
665 values: vi.fn().mockReturnValue({
666 returning: vi.fn().mockResolvedValue([{ id: 7n }]),
667 }),
668 })
669 .mockReturnValueOnce({ // backfillErrors insert for user2 — rejects to make callback throw
670 values: vi.fn().mockReturnValue({
671 returning: vi.fn().mockRejectedValue(new Error("backfillErrors insert failed")),
672 }),
673 }),
674 update: vi.fn().mockReturnValue({
675 set: vi.fn().mockReturnValue({
676 where: vi.fn().mockResolvedValue(undefined),
677 }),
678 }),
679 } as unknown as Database;
680
681 manager = new BackfillManager(mockDb, mockConfig({ backfillConcurrency: 1 }), createMockLogger());
682 manager.setIndexer(mockIndexer);
683 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({
684 com: { atproto: { repo: { listRecords: mockListRecords } } },
685 });
686
687 const result = await manager.performBackfill(BackfillStatus.CatchUp);
688
689 // user1 batch (concurrency=1): fulfilled, 2 records indexed (membership + post)
690 // user2 batch: callback rejects → allSettled rejected branch → totalErrors++ = 1
691 expect(result.recordsIndexed).toBe(2);
692 expect(result.errors).toBe(1);
693 });
694
695 it("clears isRunning flag even when backfill fails", async () => {
696 const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {});
697
698 mockDb = {
699 insert: vi.fn().mockReturnValue({
700 values: vi.fn().mockReturnValue({
701 returning: vi.fn().mockRejectedValue(new Error("DB insert failed")),
702 }),
703 }),
704 update: vi.fn().mockReturnValue({
705 set: vi.fn().mockReturnValue({
706 where: vi.fn().mockResolvedValue(undefined),
707 }),
708 }),
709 } as unknown as Database;
710
711 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger());
712 manager.setIndexer(mockIndexer);
713
714 await expect(manager.performBackfill(BackfillStatus.FullSync))
715 .rejects.toThrow("DB insert failed");
716
717 expect(manager.getIsRunning()).toBe(false);
718 consoleSpy.mockRestore();
719 });
720 });
721
722 describe("checkForInterruptedBackfill", () => {
723 it("returns null when no interrupted backfill exists", async () => {
724 vi.spyOn(mockDb, "select").mockReturnValue({
725 from: vi.fn().mockReturnValue({
726 where: vi.fn().mockReturnValue({
727 limit: vi.fn().mockResolvedValue([]),
728 }),
729 }),
730 } as any);
731
732 const result = await manager.checkForInterruptedBackfill();
733 expect(result).toBeNull();
734 });
735
736 it("returns null and logs error when DB query fails", async () => {
737 vi.spyOn(mockDb, "select").mockReturnValue({
738 from: vi.fn().mockReturnValue({
739 where: vi.fn().mockReturnValue({
740 limit: vi.fn().mockRejectedValue(new Error("DB connection lost")),
741 }),
742 }),
743 } as any);
744
745 const result = await manager.checkForInterruptedBackfill();
746 expect(result).toBeNull();
747 expect(mockLogger.error).toHaveBeenCalled();
748 });
749
750 it("returns interrupted backfill row when one exists", async () => {
751 const interruptedRow = {
752 id: 5n,
753 status: "in_progress",
754 backfillType: "catch_up",
755 lastProcessedDid: "did:plc:halfway",
756 didsTotal: 100,
757 didsProcessed: 50,
758 recordsIndexed: 250,
759 startedAt: new Date(),
760 completedAt: null,
761 errorMessage: null,
762 };
763
764 vi.spyOn(mockDb, "select").mockReturnValue({
765 from: vi.fn().mockReturnValue({
766 where: vi.fn().mockReturnValue({
767 limit: vi.fn().mockResolvedValue([interruptedRow]),
768 }),
769 }),
770 } as any);
771
772 const result = await manager.checkForInterruptedBackfill();
773 expect(result).toEqual(interruptedRow);
774 });
775 });
776
777 describe("resumeBackfill", () => {
778 let mockIndexer: Indexer;
779
780 beforeEach(() => {
781 vi.spyOn(console, "log").mockImplementation(() => {});
782 vi.spyOn(console, "error").mockImplementation(() => {});
783
784 mockIndexer = {
785 handleForumCreate: vi.fn().mockResolvedValue(true),
786 handleCategoryCreate: vi.fn().mockResolvedValue(true),
787 handleBoardCreate: vi.fn().mockResolvedValue(true),
788 handleRoleCreate: vi.fn().mockResolvedValue(true),
789 handleMembershipCreate: vi.fn().mockResolvedValue(true),
790 handlePostCreate: vi.fn().mockResolvedValue(true),
791 handleModActionCreate: vi.fn().mockResolvedValue(true),
792 handleThemeCreate: vi.fn().mockResolvedValue(true),
793 handleThemePolicyCreate: vi.fn().mockResolvedValue(true),
794 } as unknown as Indexer;
795 });
796
797 afterEach(() => {
798 vi.restoreAllMocks();
799 });
800
801 it("resumes from lastProcessedDid and processes remaining users", async () => {
802 // Interrupted at user1 (didsProcessed=1), user2 and user3 remain
803 const interrupted = {
804 id: 5n,
805 status: "in_progress" as const,
806 backfillType: "catch_up",
807 lastProcessedDid: "did:plc:user1",
808 didsTotal: 3,
809 didsProcessed: 1,
810 recordsIndexed: 2,
811 startedAt: new Date(),
812 completedAt: null,
813 errorMessage: null,
814 };
815
816 // user2 and user3: 1 record each per collection (2 collections = 4 total)
817 const recordPage = {
818 data: {
819 records: [{ uri: "at://did:plc:u/space.atbb.post/r1", cid: "bafyr1",
820 value: { $type: "space.atbb.post", text: "hi", createdAt: "2026-01-01T00:00:00Z" } }],
821 cursor: undefined,
822 },
823 };
824
825 const mockListRecords = vi.fn().mockResolvedValue(recordPage);
826
827 mockDb = {
828 select: vi.fn().mockReturnValue({
829 from: vi.fn().mockReturnValue({
830 where: vi.fn().mockReturnValue({
831 orderBy: vi.fn().mockResolvedValue([
832 { did: "did:plc:user2" },
833 { did: "did:plc:user3" },
834 ]),
835 }),
836 }),
837 }),
838 insert: vi.fn().mockReturnValue({
839 values: vi.fn().mockReturnValue({
840 returning: vi.fn().mockResolvedValue([]),
841 }),
842 }),
843 update: vi.fn().mockReturnValue({
844 set: vi.fn().mockReturnValue({
845 where: vi.fn().mockResolvedValue(undefined),
846 }),
847 }),
848 } as unknown as Database;
849
850 manager = new BackfillManager(mockDb, mockConfig({ backfillConcurrency: 5 }), createMockLogger());
851 manager.setIndexer(mockIndexer);
852 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({
853 com: { atproto: { repo: { listRecords: mockListRecords } } },
854 });
855
856 const result = await manager.resumeBackfill(interrupted);
857
858 // Starts from interrupted.recordsIndexed=2, adds 2 users × 2 collections × 1 record = 4
859 expect(result.recordsIndexed).toBe(6);
860 expect(result.errors).toBe(0);
861 expect(result.didsProcessed).toBe(3); // 1 (prior) + 2 (resumed)
862 expect(result.backfillId).toBe(5n);
863 });
864
865 it("marks completed even when no remaining users", async () => {
866 // Interrupted at the last user — no users with DID > lastProcessedDid
867 const interrupted = {
868 id: 3n,
869 status: "in_progress" as const,
870 backfillType: "catch_up",
871 lastProcessedDid: "did:plc:last",
872 didsTotal: 2,
873 didsProcessed: 2,
874 recordsIndexed: 10,
875 startedAt: new Date(),
876 completedAt: null,
877 errorMessage: null,
878 };
879
880 mockDb = {
881 select: vi.fn().mockReturnValue({
882 from: vi.fn().mockReturnValue({
883 where: vi.fn().mockReturnValue({
884 orderBy: vi.fn().mockResolvedValue([]), // no remaining users
885 }),
886 }),
887 }),
888 update: vi.fn().mockReturnValue({
889 set: vi.fn().mockReturnValue({
890 where: vi.fn().mockResolvedValue(undefined),
891 }),
892 }),
893 } as unknown as Database;
894
895 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger());
896 manager.setIndexer(mockIndexer);
897 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({
898 com: { atproto: { repo: { listRecords: vi.fn() } } },
899 });
900
901 const result = await manager.resumeBackfill(interrupted);
902
903 // No new records — just marks completed with existing counts
904 expect(result.recordsIndexed).toBe(10);
905 expect(result.didsProcessed).toBe(2);
906 expect(result.backfillId).toBe(3n);
907
908 // DB row should be updated to completed
909 const updateMock = mockDb.update as any;
910 expect(updateMock).toHaveBeenCalled();
911 });
912
913 it("clears isRunning flag even when resume fails", async () => {
914 const interrupted = {
915 id: 9n,
916 status: "in_progress" as const,
917 backfillType: "catch_up",
918 lastProcessedDid: "did:plc:checkpoint",
919 didsTotal: 5,
920 didsProcessed: 3,
921 recordsIndexed: 15,
922 startedAt: new Date(),
923 completedAt: null,
924 errorMessage: null,
925 };
926
927 mockDb = {
928 select: vi.fn().mockReturnValue({
929 from: vi.fn().mockReturnValue({
930 where: vi.fn().mockReturnValue({
931 orderBy: vi.fn().mockRejectedValue(new Error("DB query failed")),
932 }),
933 }),
934 }),
935 update: vi.fn().mockReturnValue({
936 set: vi.fn().mockReturnValue({
937 where: vi.fn().mockResolvedValue(undefined),
938 }),
939 }),
940 } as unknown as Database;
941
942 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger());
943 manager.setIndexer(mockIndexer);
944 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({
945 com: { atproto: { repo: { listRecords: vi.fn() } } },
946 });
947
948 await expect(manager.resumeBackfill(interrupted))
949 .rejects.toThrow("DB query failed");
950
951 expect(manager.getIsRunning()).toBe(false);
952 });
953
954 it("marks full_sync interrupted backfill as failed (cannot resume FullSync)", async () => {
955 const interrupted = {
956 id: 10n,
957 status: "in_progress" as const,
958 backfillType: "full_sync",
959 lastProcessedDid: null,
960 didsTotal: 0,
961 didsProcessed: 0,
962 recordsIndexed: 0,
963 startedAt: new Date(),
964 completedAt: null,
965 errorMessage: null,
966 };
967
968 const mockUpdate = vi.fn().mockReturnValue({
969 set: vi.fn().mockReturnValue({
970 where: vi.fn().mockResolvedValue(undefined),
971 }),
972 });
973 mockDb = {
974 update: mockUpdate,
975 } as unknown as Database;
976
977 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger());
978 manager.setIndexer(mockIndexer);
979
980 await expect(manager.resumeBackfill(interrupted))
981 .rejects.toThrow("Interrupted FullSync cannot be resumed");
982
983 // Verify the row was marked as failed
984 expect(mockUpdate).toHaveBeenCalled();
985 const setCall = mockUpdate.mock.results[0].value.set;
986 expect(setCall).toHaveBeenCalledWith(
987 expect.objectContaining({ status: "failed" })
988 );
989 });
990
991 it("rejects concurrent resume attempts", async () => {
992 const interrupted = {
993 id: 2n,
994 status: "in_progress" as const,
995 backfillType: "catch_up",
996 lastProcessedDid: "did:plc:check",
997 didsTotal: 2,
998 didsProcessed: 1,
999 recordsIndexed: 5,
1000 startedAt: new Date(),
1001 completedAt: null,
1002 errorMessage: null,
1003 };
1004
1005 mockDb = {
1006 select: vi.fn().mockReturnValue({
1007 from: vi.fn().mockReturnValue({
1008 where: vi.fn().mockReturnValue({
1009 orderBy: vi.fn().mockImplementation(
1010 () => new Promise((resolve) => setTimeout(() => resolve([]), 200))
1011 ),
1012 }),
1013 }),
1014 }),
1015 update: vi.fn().mockReturnValue({
1016 set: vi.fn().mockReturnValue({
1017 where: vi.fn().mockResolvedValue(undefined),
1018 }),
1019 }),
1020 } as unknown as Database;
1021
1022 manager = new BackfillManager(mockDb, mockConfig(), createMockLogger());
1023 manager.setIndexer(mockIndexer);
1024 vi.spyOn(manager as any, "createAgentForPds").mockReturnValue({
1025 com: { atproto: { repo: { listRecords: vi.fn() } } },
1026 });
1027
1028 const first = manager.resumeBackfill(interrupted);
1029
1030 await expect(manager.resumeBackfill(interrupted))
1031 .rejects.toThrow("Backfill is already in progress");
1032
1033 await first;
1034 });
1035 });
1036});