atproto user agency toolkit for individuals and groups
8 forks

Configure Feed

Select the types of activity you want to include in your feed.

Wire policy engine into replication and optimize firehose to apply blocks directly

Firehose incremental pipeline: handleFirehoseCommit() now applies blocks
directly from the firehose event via applyFirehoseBlocks(), skipping the
HTTP round-trip to the source PDS. Falls back to full syncDid() for edge
cases (tooBig, rebase, empty block payloads, sequence gaps, CAR parse
failures). 14 new tests.

Policy engine integration: ReplicationManager optionally accepts a
PolicyEngine to drive which DIDs get replicated, at what frequency, and
in what priority order. Per-DID sync intervals replace the fixed 5-min
global timer. Server startup loads policies from POLICY_FILE and wires
them through. Fully backward compatible when no engine is provided.
20 new tests.

Also fix flaky temp dir cleanup in replication tests (ENOTEMPTY race).

+1899 -27
+640
src/replication/firehose-incremental.test.ts
··· 1 + /** 2 + * Tests for the incremental firehose block application optimization. 3 + * 4 + * When a firehose commit event arrives with CAR-encoded blocks, the 5 + * ReplicationManager should apply them directly to the blockstore 6 + * instead of making an HTTP round-trip to the source PDS via syncDid(). 7 + * 8 + * These tests verify: 9 + * - Incremental apply stores blocks and updates sync state 10 + * - Fallback to syncDid() on tooBig, rebase, empty blocks 11 + * - Gap detection triggers full sync 12 + * - CAR parse failure triggers fallback 13 + */ 14 + 15 + import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; 16 + import { mkdtempSync, rmSync } from "node:fs"; 17 + import { tmpdir } from "node:os"; 18 + import { join } from "node:path"; 19 + import Database from "better-sqlite3"; 20 + import { 21 + BlockMap, 22 + readCarWithRoot, 23 + blocksToCarFile, 24 + } from "@atproto/repo"; 25 + import { CID } from "multiformats"; 26 + import { 27 + create as createCid, 28 + CODEC_RAW, 29 + toString as cidToString, 30 + } from "@atcute/cid"; 31 + 32 + import { IpfsService } from "../ipfs.js"; 33 + import { RepoManager } from "../repo-manager.js"; 34 + import type { Config } from "../config.js"; 35 + import { SyncStorage } from "./sync-storage.js"; 36 + import { ReplicationManager } from "./replication-manager.js"; 37 + import { 38 + MANIFEST_NSID, 39 + didToRkey, 40 + } from "./types.js"; 41 + import type { FirehoseCommitEvent } from "./firehose-subscription.js"; 42 + 43 + // ============================================ 44 + // Helpers 45 + // ============================================ 46 + 47 + function testConfig(dataDir: string, replicateDids: string[] = []): Config { 48 + return { 49 + DID: "did:plc:localnode", 50 + HANDLE: "local.example.com", 51 + PDS_HOSTNAME: "local.example.com", 52 + AUTH_TOKEN: "test-auth-token", 53 + SIGNING_KEY: 54 + "0000000000000000000000000000000000000000000000000000000000000001", 55 + SIGNING_KEY_PUBLIC: 
"zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 56 + JWT_SECRET: "test-jwt-secret", 57 + PASSWORD_HASH: "$2a$10$test", 58 + DATA_DIR: dataDir, 59 + PORT: 3000, 60 + IPFS_ENABLED: true, 61 + IPFS_NETWORKING: false, 62 + REPLICATE_DIDS: replicateDids, 63 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 64 + FIREHOSE_ENABLED: false, 65 + }; 66 + } 67 + 68 + /** Create a CID string from raw bytes using SHA-256. */ 69 + async function makeCidStr(bytes: Uint8Array): Promise<string> { 70 + const cid = await createCid(CODEC_RAW, bytes); 71 + return cidToString(cid); 72 + } 73 + 74 + /** 75 + * Create a minimal valid CAR file with some blocks. 76 + * Returns the CAR bytes and the CID strings of the blocks stored. 77 + */ 78 + async function createTestCarBytes(): Promise<{ 79 + carBytes: Uint8Array; 80 + rootCidStr: string; 81 + blockCidStrs: string[]; 82 + }> { 83 + const blockMap = new BlockMap(); 84 + 85 + // Create some test blocks and add them to the BlockMap 86 + const block1Bytes = new TextEncoder().encode("test-block-1"); 87 + const block2Bytes = new TextEncoder().encode("test-block-2"); 88 + 89 + const cid1 = await createCid(CODEC_RAW, block1Bytes); 90 + const cid2 = await createCid(CODEC_RAW, block2Bytes); 91 + 92 + const mfCid1 = CID.parse(cidToString(cid1)); 93 + const mfCid2 = CID.parse(cidToString(cid2)); 94 + 95 + blockMap.set(mfCid1, block1Bytes); 96 + blockMap.set(mfCid2, block2Bytes); 97 + 98 + const carBytes = await blocksToCarFile(mfCid1, blockMap); 99 + 100 + return { 101 + carBytes, 102 + rootCidStr: mfCid1.toString(), 103 + blockCidStrs: [mfCid1.toString(), mfCid2.toString()], 104 + }; 105 + } 106 + 107 + /** 108 + * Create a FirehoseCommitEvent from CAR bytes. 
109 + */ 110 + function makeFirehoseEvent( 111 + overrides: Partial<FirehoseCommitEvent> & { repo: string }, 112 + ): FirehoseCommitEvent { 113 + return { 114 + seq: 1, 115 + rev: "abc1234567890", 116 + since: null, 117 + blocks: new Uint8Array(0), 118 + ops: [], 119 + commit: null, 120 + time: new Date().toISOString(), 121 + tooBig: false, 122 + rebase: false, 123 + ...overrides, 124 + }; 125 + } 126 + 127 + /** Mock DidResolver that returns a test DID document. */ 128 + function mockDidResolver() { 129 + return { 130 + resolve: async (did: string) => ({ 131 + id: did, 132 + service: [ 133 + { 134 + id: "#atproto_pds", 135 + type: "AtprotoPersonalDataServer", 136 + serviceEndpoint: "https://pds.example.com", 137 + }, 138 + ], 139 + }), 140 + }; 141 + } 142 + 143 + // ============================================ 144 + // Incremental firehose block application 145 + // ============================================ 146 + 147 + describe("Firehose incremental block application", () => { 148 + let tmpDir: string; 149 + let db: InstanceType<typeof Database>; 150 + let ipfsService: IpfsService; 151 + let repoManager: RepoManager; 152 + let replManager: ReplicationManager; 153 + let syncStorage: SyncStorage; 154 + const trackedDid = "did:plc:remote1"; 155 + 156 + beforeEach(async () => { 157 + tmpDir = mkdtempSync(join(tmpdir(), "firehose-incremental-test-")); 158 + const config = testConfig(tmpDir, [trackedDid]); 159 + 160 + db = new Database(join(tmpDir, "test.db")); 161 + ipfsService = new IpfsService({ 162 + blocksPath: join(tmpDir, "ipfs-blocks"), 163 + datastorePath: join(tmpDir, "ipfs-datastore"), 164 + networking: false, 165 + }); 166 + await ipfsService.start(); 167 + 168 + repoManager = new RepoManager(db, config); 169 + repoManager.init(undefined, ipfsService, ipfsService); 170 + 171 + replManager = new ReplicationManager( 172 + db, 173 + config, 174 + repoManager, 175 + ipfsService, 176 + ipfsService, 177 + mockDidResolver() as any, 178 + ); 179 + 180 + // 
Initialize schema 181 + syncStorage = replManager.getSyncStorage(); 182 + syncStorage.initSchema(); 183 + 184 + // Set up sync state for the tracked DID 185 + syncStorage.upsertState({ 186 + did: trackedDid, 187 + pdsEndpoint: "https://pds.example.com", 188 + }); 189 + }); 190 + 191 + afterEach(async () => { 192 + replManager.stop(); 193 + if (ipfsService.isRunning()) { 194 + await ipfsService.stop(); 195 + } 196 + db.close(); 197 + rmSync(tmpDir, { recursive: true, force: true }); 198 + }); 199 + 200 + it("applies blocks from firehose event directly to blockstore", async () => { 201 + const { carBytes, rootCidStr, blockCidStrs } = await createTestCarBytes(); 202 + 203 + const event = makeFirehoseEvent({ 204 + repo: trackedDid, 205 + seq: 1, 206 + rev: "abc1234567890", 207 + since: null, 208 + blocks: carBytes, 209 + ops: [ 210 + { action: "create", path: "app.bsky.feed.post/abc", cid: null }, 211 + ], 212 + }); 213 + 214 + // Access the private method via any cast for testing 215 + await (replManager as any).handleFirehoseCommit(event); 216 + 217 + // Verify blocks were stored in IPFS 218 + for (const cidStr of blockCidStrs) { 219 + const has = await ipfsService.hasBlock(cidStr); 220 + expect(has).toBe(true); 221 + } 222 + 223 + // Verify sync state was updated 224 + const state = syncStorage.getState(trackedDid); 225 + expect(state).not.toBeNull(); 226 + expect(state!.lastSyncRev).toBe("abc1234567890"); 227 + expect(state!.rootCid).toBe(rootCidStr); 228 + expect(state!.status).toBe("synced"); 229 + expect(state!.lastSyncAt).not.toBeNull(); 230 + }); 231 + 232 + it("tracks block CIDs after incremental apply", async () => { 233 + const { carBytes, blockCidStrs } = await createTestCarBytes(); 234 + 235 + const event = makeFirehoseEvent({ 236 + repo: trackedDid, 237 + rev: "rev0000000001", 238 + blocks: carBytes, 239 + }); 240 + 241 + await (replManager as any).handleFirehoseCommit(event); 242 + 243 + // Verify block CIDs were tracked in the sync storage 244 + const 
trackedCids = syncStorage.getBlockCids(trackedDid); 245 + expect(trackedCids.length).toBe(blockCidStrs.length); 246 + for (const cid of blockCidStrs) { 247 + expect(trackedCids).toContain(cid); 248 + } 249 + }); 250 + 251 + it("uses event.rev for sync state instead of parsing commit block", async () => { 252 + const { carBytes } = await createTestCarBytes(); 253 + const expectedRev = "myrev12345678"; 254 + 255 + const event = makeFirehoseEvent({ 256 + repo: trackedDid, 257 + rev: expectedRev, 258 + blocks: carBytes, 259 + }); 260 + 261 + await (replManager as any).handleFirehoseCommit(event); 262 + 263 + const state = syncStorage.getState(trackedDid); 264 + expect(state!.lastSyncRev).toBe(expectedRev); 265 + }); 266 + 267 + it("falls back to syncDid when tooBig is set", async () => { 268 + const { carBytes } = await createTestCarBytes(); 269 + const syncDidSpy = vi.spyOn(replManager, "syncDid").mockResolvedValue(); 270 + 271 + const event = makeFirehoseEvent({ 272 + repo: trackedDid, 273 + blocks: carBytes, 274 + tooBig: true, 275 + }); 276 + 277 + await (replManager as any).handleFirehoseCommit(event); 278 + 279 + expect(syncDidSpy).toHaveBeenCalledWith(trackedDid); 280 + syncDidSpy.mockRestore(); 281 + }); 282 + 283 + it("falls back to syncDid when rebase is set", async () => { 284 + const { carBytes } = await createTestCarBytes(); 285 + const syncDidSpy = vi.spyOn(replManager, "syncDid").mockResolvedValue(); 286 + 287 + const event = makeFirehoseEvent({ 288 + repo: trackedDid, 289 + blocks: carBytes, 290 + rebase: true, 291 + }); 292 + 293 + await (replManager as any).handleFirehoseCommit(event); 294 + 295 + expect(syncDidSpy).toHaveBeenCalledWith(trackedDid); 296 + syncDidSpy.mockRestore(); 297 + }); 298 + 299 + it("falls back to syncDid when blocks are empty", async () => { 300 + const syncDidSpy = vi.spyOn(replManager, "syncDid").mockResolvedValue(); 301 + 302 + const event = makeFirehoseEvent({ 303 + repo: trackedDid, 304 + blocks: new Uint8Array(0), 305 + 
}); 306 + 307 + await (replManager as any).handleFirehoseCommit(event); 308 + 309 + expect(syncDidSpy).toHaveBeenCalledWith(trackedDid); 310 + syncDidSpy.mockRestore(); 311 + }); 312 + 313 + it("falls back to syncDid on gap detection (since mismatch)", async () => { 314 + // Set a known rev in sync state 315 + syncStorage.updateSyncProgress(trackedDid, "known_rev_0001"); 316 + 317 + const { carBytes } = await createTestCarBytes(); 318 + const syncDidSpy = vi.spyOn(replManager, "syncDid").mockResolvedValue(); 319 + const consoleSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); 320 + 321 + const event = makeFirehoseEvent({ 322 + repo: trackedDid, 323 + rev: "newrev0000002", 324 + since: "different_rev_", // does not match known_rev_0001 325 + blocks: carBytes, 326 + }); 327 + 328 + await (replManager as any).handleFirehoseCommit(event); 329 + 330 + // Should have fallen back to full sync 331 + expect(syncDidSpy).toHaveBeenCalledWith(trackedDid); 332 + 333 + // Should have logged the gap warning 334 + expect(consoleSpy).toHaveBeenCalledWith( 335 + expect.stringContaining("[replication] Gap detected"), 336 + ); 337 + 338 + syncDidSpy.mockRestore(); 339 + consoleSpy.mockRestore(); 340 + }); 341 + 342 + it("proceeds with incremental apply when since matches last sync rev", async () => { 343 + const knownRev = "known_rev_0001"; 344 + syncStorage.updateSyncProgress(trackedDid, knownRev); 345 + 346 + const { carBytes } = await createTestCarBytes(); 347 + const syncDidSpy = vi.spyOn(replManager, "syncDid"); 348 + 349 + const event = makeFirehoseEvent({ 350 + repo: trackedDid, 351 + rev: "newrev0000002", 352 + since: knownRev, // matches our known rev 353 + blocks: carBytes, 354 + }); 355 + 356 + await (replManager as any).handleFirehoseCommit(event); 357 + 358 + // Should NOT have called syncDid (incremental apply should work) 359 + expect(syncDidSpy).not.toHaveBeenCalled(); 360 + 361 + // Verify blocks were stored 362 + const state = 
syncStorage.getState(trackedDid); 363 + expect(state!.lastSyncRev).toBe("newrev0000002"); 364 + expect(state!.status).toBe("synced"); 365 + 366 + syncDidSpy.mockRestore(); 367 + }); 368 + 369 + it("falls back to syncDid when CAR parsing fails", async () => { 370 + const syncDidSpy = vi.spyOn(replManager, "syncDid").mockResolvedValue(); 371 + const consoleSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); 372 + 373 + // Create invalid CAR bytes (not parseable) 374 + const invalidCarBytes = new TextEncoder().encode("this is not a valid CAR file"); 375 + 376 + const event = makeFirehoseEvent({ 377 + repo: trackedDid, 378 + rev: "newrev0000001", 379 + blocks: invalidCarBytes, 380 + }); 381 + 382 + await (replManager as any).handleFirehoseCommit(event); 383 + 384 + // Should have fallen back to full sync 385 + expect(syncDidSpy).toHaveBeenCalledWith(trackedDid); 386 + 387 + // Should have logged the fallback warning 388 + expect(consoleSpy).toHaveBeenCalledWith( 389 + expect.stringContaining("[replication] Incremental apply failed"), 390 + expect.any(String), 391 + ); 392 + 393 + syncDidSpy.mockRestore(); 394 + consoleSpy.mockRestore(); 395 + }); 396 + 397 + it("skips events for non-tracked DIDs", async () => { 398 + const { carBytes } = await createTestCarBytes(); 399 + const syncDidSpy = vi.spyOn(replManager, "syncDid"); 400 + 401 + const event = makeFirehoseEvent({ 402 + repo: "did:plc:untracked", 403 + blocks: carBytes, 404 + }); 405 + 406 + await (replManager as any).handleFirehoseCommit(event); 407 + 408 + // Should not have attempted anything 409 + expect(syncDidSpy).not.toHaveBeenCalled(); 410 + 411 + syncDidSpy.mockRestore(); 412 + }); 413 + 414 + it("allows incremental apply when since is null (first commit or no prior state)", async () => { 415 + const { carBytes, blockCidStrs } = await createTestCarBytes(); 416 + const syncDidSpy = vi.spyOn(replManager, "syncDid"); 417 + 418 + const event = makeFirehoseEvent({ 419 + repo: trackedDid, 420 + rev: 
"firstrev00001", 421 + since: null, // first commit 422 + blocks: carBytes, 423 + }); 424 + 425 + await (replManager as any).handleFirehoseCommit(event); 426 + 427 + // Should use incremental path, not syncDid 428 + expect(syncDidSpy).not.toHaveBeenCalled(); 429 + 430 + // Blocks should be stored 431 + for (const cidStr of blockCidStrs) { 432 + const has = await ipfsService.hasBlock(cidStr); 433 + expect(has).toBe(true); 434 + } 435 + 436 + syncDidSpy.mockRestore(); 437 + }); 438 + 439 + it("updates manifest record after incremental apply", async () => { 440 + // Create a manifest record for the tracked DID first 441 + const rkey = didToRkey(trackedDid); 442 + await repoManager.putRecord(MANIFEST_NSID, rkey, { 443 + $type: MANIFEST_NSID, 444 + subject: trackedDid, 445 + status: "active", 446 + lastSyncRev: null, 447 + lastSyncAt: null, 448 + createdAt: "2025-01-01T00:00:00.000Z", 449 + }); 450 + 451 + const { carBytes } = await createTestCarBytes(); 452 + const event = makeFirehoseEvent({ 453 + repo: trackedDid, 454 + rev: "manifest_rev01", 455 + blocks: carBytes, 456 + }); 457 + 458 + await (replManager as any).handleFirehoseCommit(event); 459 + 460 + // Verify manifest was updated 461 + const manifest = await repoManager.getRecord(MANIFEST_NSID, rkey); 462 + expect(manifest).not.toBeNull(); 463 + const record = manifest!.record as Record<string, unknown>; 464 + expect(record.lastSyncRev).toBe("manifest_rev01"); 465 + expect(record.lastSyncAt).not.toBeNull(); 466 + // createdAt should be preserved 467 + expect(record.createdAt).toBe("2025-01-01T00:00:00.000Z"); 468 + }); 469 + }); 470 + 471 + // ============================================ 472 + // Using real CAR data from a source repo 473 + // ============================================ 474 + 475 + describe("Firehose incremental: real repo CAR roundtrip", () => { 476 + let tmpDir: string; 477 + let sourceDb: InstanceType<typeof Database>; 478 + let replicaDb: InstanceType<typeof Database>; 479 + let sourceIpfs: 
IpfsService; 480 + let replicaIpfs: IpfsService; 481 + let sourceRepo: RepoManager; 482 + let replicaRepo: RepoManager; 483 + let replManager: ReplicationManager; 484 + let syncStorage: SyncStorage; 485 + const sourceDid = "did:plc:test123"; 486 + const replicaDid = "did:plc:replica456"; 487 + 488 + beforeEach(async () => { 489 + tmpDir = mkdtempSync(join(tmpdir(), "firehose-real-car-test-")); 490 + 491 + // Source setup 492 + const sourceConfig = testConfig(join(tmpDir, "source"), []); 493 + sourceConfig.DID = sourceDid; 494 + sourceDb = new Database(join(tmpDir, "source.db")); 495 + sourceIpfs = new IpfsService({ 496 + blocksPath: join(tmpDir, "source-ipfs-blocks"), 497 + datastorePath: join(tmpDir, "source-ipfs-datastore"), 498 + networking: false, 499 + }); 500 + await sourceIpfs.start(); 501 + sourceRepo = new RepoManager(sourceDb, sourceConfig); 502 + sourceRepo.init(undefined, sourceIpfs, sourceIpfs); 503 + 504 + // Replica setup 505 + const replicaConfig = testConfig(join(tmpDir, "replica"), [sourceDid]); 506 + replicaConfig.DID = replicaDid; 507 + replicaConfig.SIGNING_KEY = 508 + "0000000000000000000000000000000000000000000000000000000000000002"; 509 + replicaDb = new Database(join(tmpDir, "replica.db")); 510 + replicaIpfs = new IpfsService({ 511 + blocksPath: join(tmpDir, "replica-ipfs-blocks"), 512 + datastorePath: join(tmpDir, "replica-ipfs-datastore"), 513 + networking: false, 514 + }); 515 + await replicaIpfs.start(); 516 + replicaRepo = new RepoManager(replicaDb, replicaConfig); 517 + replicaRepo.init(undefined, replicaIpfs, replicaIpfs); 518 + 519 + replManager = new ReplicationManager( 520 + replicaDb, 521 + replicaConfig, 522 + replicaRepo, 523 + replicaIpfs, 524 + replicaIpfs, 525 + mockDidResolver() as any, 526 + ); 527 + 528 + syncStorage = replManager.getSyncStorage(); 529 + syncStorage.initSchema(); 530 + syncStorage.upsertState({ 531 + did: sourceDid, 532 + pdsEndpoint: "https://pds.example.com", 533 + }); 534 + }); 535 + 536 + 
afterEach(async () => { 537 + replManager.stop(); 538 + if (sourceIpfs.isRunning()) await sourceIpfs.stop(); 539 + if (replicaIpfs.isRunning()) await replicaIpfs.stop(); 540 + sourceDb.close(); 541 + replicaDb.close(); 542 + rmSync(tmpDir, { recursive: true, force: true }); 543 + }); 544 + 545 + it("incremental apply of real repo CAR stores all blocks correctly", async () => { 546 + // 1. Create records in source 547 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 548 + $type: "app.bsky.feed.post", 549 + text: "Firehose incremental test!", 550 + createdAt: new Date().toISOString(), 551 + }); 552 + 553 + // 2. Export as CAR (simulating what the firehose would send) 554 + const carBytes = await sourceRepo.getRepoCar(); 555 + expect(carBytes.length).toBeGreaterThan(0); 556 + 557 + // 3. Create a firehose event with the CAR bytes 558 + const event = makeFirehoseEvent({ 559 + repo: sourceDid, 560 + seq: 42, 561 + rev: "firehose_rev01", 562 + since: null, 563 + blocks: carBytes, 564 + ops: [ 565 + { action: "create", path: "app.bsky.feed.post/abc", cid: null }, 566 + ], 567 + }); 568 + 569 + // 4. Apply via the firehose handler 570 + await (replManager as any).handleFirehoseCommit(event); 571 + 572 + // 5. Verify all blocks from the CAR are in the replica's blockstore 573 + const { blocks } = await readCarWithRoot(carBytes); 574 + const internalMap = ( 575 + blocks as unknown as { map: Map<string, Uint8Array> } 576 + ).map; 577 + 578 + for (const cidStr of internalMap.keys()) { 579 + const has = await replicaIpfs.hasBlock(cidStr); 580 + expect(has).toBe(true); 581 + } 582 + 583 + // 6. Verify sync state 584 + const state = syncStorage.getState(sourceDid); 585 + expect(state!.lastSyncRev).toBe("firehose_rev01"); 586 + expect(state!.status).toBe("synced"); 587 + 588 + // 7. 
Verify block tracking 589 + const trackedCids = syncStorage.getBlockCids(sourceDid); 590 + expect(trackedCids.length).toBe(internalMap.size); 591 + }); 592 + 593 + it("sequential incremental applies accumulate blocks", async () => { 594 + // First commit 595 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 596 + $type: "app.bsky.feed.post", 597 + text: "First post", 598 + createdAt: new Date().toISOString(), 599 + }); 600 + 601 + const carBytes1 = await sourceRepo.getRepoCar(); 602 + const event1 = makeFirehoseEvent({ 603 + repo: sourceDid, 604 + seq: 1, 605 + rev: "rev0000000001", 606 + since: null, 607 + blocks: carBytes1, 608 + }); 609 + 610 + await (replManager as any).handleFirehoseCommit(event1); 611 + const blockCount1 = syncStorage.getBlockCids(sourceDid).length; 612 + expect(blockCount1).toBeGreaterThan(0); 613 + 614 + // Second commit (builds on first) 615 + await sourceRepo.createRecord("app.bsky.feed.post", undefined, { 616 + $type: "app.bsky.feed.post", 617 + text: "Second post", 618 + createdAt: new Date().toISOString(), 619 + }); 620 + 621 + const carBytes2 = await sourceRepo.getRepoCar(); 622 + const event2 = makeFirehoseEvent({ 623 + repo: sourceDid, 624 + seq: 2, 625 + rev: "rev0000000002", 626 + since: "rev0000000001", 627 + blocks: carBytes2, 628 + }); 629 + 630 + await (replManager as any).handleFirehoseCommit(event2); 631 + 632 + // Should have accumulated more blocks 633 + const blockCount2 = syncStorage.getBlockCids(sourceDid).length; 634 + expect(blockCount2).toBeGreaterThanOrEqual(blockCount1); 635 + 636 + // Rev should be updated 637 + const state = syncStorage.getState(sourceDid); 638 + expect(state!.lastSyncRev).toBe("rev0000000002"); 639 + }); 640 + });
+914
src/replication/policy-integration.test.ts
··· 1 + /** 2 + * Tests for PolicyEngine integration with ReplicationManager. 3 + * 4 + * Verifies that: 5 + * - Policy-driven DID list merges correctly with config DIDs 6 + * - Per-DID sync intervals are respected 7 + * - Priority ordering works 8 + * - shouldReplicate=false skips DIDs 9 + * - Backward compatibility: no PolicyEngine = identical to old behavior 10 + */ 11 + 12 + import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; 13 + import { mkdtempSync, rmSync } from "node:fs"; 14 + import { tmpdir } from "node:os"; 15 + import { join } from "node:path"; 16 + import Database from "better-sqlite3"; 17 + 18 + import { IpfsService } from "../ipfs.js"; 19 + import { RepoManager } from "../repo-manager.js"; 20 + import type { Config } from "../config.js"; 21 + import { PolicyEngine } from "../policy/engine.js"; 22 + import { mutualAid, saas } from "../policy/presets.js"; 23 + import type { Policy, PolicySet } from "../policy/types.js"; 24 + import { 25 + DEFAULT_REPLICATION, 26 + DEFAULT_SYNC, 27 + DEFAULT_RETENTION, 28 + } from "../policy/types.js"; 29 + import { ReplicationManager } from "./replication-manager.js"; 30 + import { DidResolver } from "../did-resolver.js"; 31 + import { InMemoryDidCache } from "../did-cache.js"; 32 + 33 + // ============================================ 34 + // Helpers 35 + // ============================================ 36 + 37 + function testConfig(dataDir: string, replicateDids: string[] = []): Config { 38 + return { 39 + DID: "did:plc:test123", 40 + HANDLE: "test.example.com", 41 + PDS_HOSTNAME: "test.example.com", 42 + AUTH_TOKEN: "test-auth-token", 43 + SIGNING_KEY: 44 + "0000000000000000000000000000000000000000000000000000000000000001", 45 + SIGNING_KEY_PUBLIC: "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 46 + JWT_SECRET: "test-jwt-secret", 47 + PASSWORD_HASH: "$2a$10$test", 48 + DATA_DIR: dataDir, 49 + PORT: 3000, 50 + IPFS_ENABLED: true, 51 + IPFS_NETWORKING: false, 52 + REPLICATE_DIDS: 
replicateDids, 53 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 54 + FIREHOSE_ENABLED: false, 55 + }; 56 + } 57 + 58 + function makePolicy(overrides: Partial<Policy> & { id: string }): Policy { 59 + return { 60 + name: overrides.id, 61 + target: { type: "all" }, 62 + replication: { ...DEFAULT_REPLICATION }, 63 + sync: { ...DEFAULT_SYNC }, 64 + retention: { ...DEFAULT_RETENTION }, 65 + priority: 50, 66 + enabled: true, 67 + ...overrides, 68 + }; 69 + } 70 + 71 + function makePolicySet(policies: Policy[]): PolicySet { 72 + return { version: 1, policies }; 73 + } 74 + 75 + // ============================================ 76 + // DID list merging 77 + // ============================================ 78 + 79 + describe("PolicyEngine + ReplicationManager: DID list merging", () => { 80 + let tmpDir: string; 81 + let db: InstanceType<typeof Database>; 82 + let ipfsService: IpfsService; 83 + let repoManager: RepoManager; 84 + let didResolver: DidResolver; 85 + 86 + beforeEach(async () => { 87 + tmpDir = mkdtempSync(join(tmpdir(), "policy-integration-test-")); 88 + db = new Database(join(tmpDir, "test.db")); 89 + ipfsService = new IpfsService({ 90 + blocksPath: join(tmpDir, "ipfs-blocks"), 91 + datastorePath: join(tmpDir, "ipfs-datastore"), 92 + networking: false, 93 + }); 94 + await ipfsService.start(); 95 + 96 + const config = testConfig(tmpDir, []); 97 + repoManager = new RepoManager(db, config); 98 + repoManager.init(undefined, ipfsService, ipfsService); 99 + 100 + didResolver = new DidResolver({ 101 + didCache: new InMemoryDidCache(), 102 + }); 103 + }); 104 + 105 + afterEach(async () => { 106 + if (ipfsService.isRunning()) await ipfsService.stop(); 107 + db.close(); 108 + rmSync(tmpDir, { recursive: true, force: true }); 109 + }); 110 + 111 + it("without PolicyEngine, returns only config DIDs", () => { 112 + const config = testConfig(tmpDir, ["did:plc:a", "did:plc:b"]); 113 + const rm = new ReplicationManager( 114 + db, 115 + config, 116 + 
repoManager, 117 + ipfsService, 118 + ipfsService, 119 + didResolver, 120 + ); 121 + expect(rm.getReplicateDids().sort()).toEqual([ 122 + "did:plc:a", 123 + "did:plc:b", 124 + ]); 125 + }); 126 + 127 + it("with PolicyEngine, merges config DIDs and policy explicit DIDs", () => { 128 + const config = testConfig(tmpDir, ["did:plc:config1"]); 129 + const engine = new PolicyEngine( 130 + makePolicySet([ 131 + makePolicy({ 132 + id: "p1", 133 + target: { type: "list", dids: ["did:plc:policy1", "did:plc:policy2"] }, 134 + }), 135 + ]), 136 + ); 137 + 138 + const rm = new ReplicationManager( 139 + db, 140 + config, 141 + repoManager, 142 + ipfsService, 143 + ipfsService, 144 + didResolver, 145 + undefined, 146 + undefined, 147 + engine, 148 + ); 149 + 150 + const dids = rm.getReplicateDids().sort(); 151 + expect(dids).toEqual([ 152 + "did:plc:config1", 153 + "did:plc:policy1", 154 + "did:plc:policy2", 155 + ]); 156 + }); 157 + 158 + it("deduplicates DIDs present in both config and policy", () => { 159 + const config = testConfig(tmpDir, ["did:plc:shared", "did:plc:config-only"]); 160 + const engine = new PolicyEngine( 161 + makePolicySet([ 162 + makePolicy({ 163 + id: "p1", 164 + target: { type: "list", dids: ["did:plc:shared", "did:plc:policy-only"] }, 165 + }), 166 + ]), 167 + ); 168 + 169 + const rm = new ReplicationManager( 170 + db, 171 + config, 172 + repoManager, 173 + ipfsService, 174 + ipfsService, 175 + didResolver, 176 + undefined, 177 + undefined, 178 + engine, 179 + ); 180 + 181 + const dids = rm.getReplicateDids().sort(); 182 + expect(dids).toEqual([ 183 + "did:plc:config-only", 184 + "did:plc:policy-only", 185 + "did:plc:shared", 186 + ]); 187 + }); 188 + 189 + it("config DIDs are always included even without matching policy", () => { 190 + const config = testConfig(tmpDir, ["did:plc:config-only"]); 191 + // Policy only targets did:plc:policy-only, not did:plc:config-only 192 + const engine = new PolicyEngine( 193 + makePolicySet([ 194 + makePolicy({ 195 + 
id: "p1", 196 + target: { type: "list", dids: ["did:plc:policy-only"] }, 197 + }), 198 + ]), 199 + ); 200 + 201 + const rm = new ReplicationManager( 202 + db, 203 + config, 204 + repoManager, 205 + ipfsService, 206 + ipfsService, 207 + didResolver, 208 + undefined, 209 + undefined, 210 + engine, 211 + ); 212 + 213 + const dids = rm.getReplicateDids(); 214 + expect(dids).toContain("did:plc:config-only"); 215 + expect(dids).toContain("did:plc:policy-only"); 216 + }); 217 + 218 + it("disabled policy DIDs are not included (unless in config)", () => { 219 + const config = testConfig(tmpDir, []); 220 + const engine = new PolicyEngine( 221 + makePolicySet([ 222 + makePolicy({ 223 + id: "disabled", 224 + target: { type: "list", dids: ["did:plc:disabled"] }, 225 + enabled: false, 226 + }), 227 + makePolicy({ 228 + id: "enabled", 229 + target: { type: "list", dids: ["did:plc:enabled"] }, 230 + enabled: true, 231 + }), 232 + ]), 233 + ); 234 + 235 + const rm = new ReplicationManager( 236 + db, 237 + config, 238 + repoManager, 239 + ipfsService, 240 + ipfsService, 241 + didResolver, 242 + undefined, 243 + undefined, 244 + engine, 245 + ); 246 + 247 + const dids = rm.getReplicateDids(); 248 + expect(dids).toContain("did:plc:enabled"); 249 + expect(dids).not.toContain("did:plc:disabled"); 250 + }); 251 + 252 + it("getPolicyEngine returns the engine when set", () => { 253 + const config = testConfig(tmpDir, []); 254 + const engine = new PolicyEngine(); 255 + 256 + const rm = new ReplicationManager( 257 + db, 258 + config, 259 + repoManager, 260 + ipfsService, 261 + ipfsService, 262 + didResolver, 263 + undefined, 264 + undefined, 265 + engine, 266 + ); 267 + 268 + expect(rm.getPolicyEngine()).toBe(engine); 269 + }); 270 + 271 + it("getPolicyEngine returns null when not set", () => { 272 + const config = testConfig(tmpDir, []); 273 + const rm = new ReplicationManager( 274 + db, 275 + config, 276 + repoManager, 277 + ipfsService, 278 + ipfsService, 279 + didResolver, 280 + ); 281 + 
282 + expect(rm.getPolicyEngine()).toBeNull(); 283 + }); 284 + }); 285 + 286 + // ============================================ 287 + // Per-DID sync intervals 288 + // ============================================ 289 + 290 + describe("PolicyEngine + ReplicationManager: per-DID sync intervals", () => { 291 + let tmpDir: string; 292 + let db: InstanceType<typeof Database>; 293 + let ipfsService: IpfsService; 294 + let repoManager: RepoManager; 295 + let didResolver: DidResolver; 296 + 297 + beforeEach(async () => { 298 + tmpDir = mkdtempSync(join(tmpdir(), "policy-interval-test-")); 299 + db = new Database(join(tmpDir, "test.db")); 300 + ipfsService = new IpfsService({ 301 + blocksPath: join(tmpDir, "ipfs-blocks"), 302 + datastorePath: join(tmpDir, "ipfs-datastore"), 303 + networking: false, 304 + }); 305 + await ipfsService.start(); 306 + 307 + const config = testConfig(tmpDir, []); 308 + repoManager = new RepoManager(db, config); 309 + repoManager.init(undefined, ipfsService, ipfsService); 310 + 311 + didResolver = new DidResolver({ 312 + didCache: new InMemoryDidCache(), 313 + }); 314 + }); 315 + 316 + afterEach(async () => { 317 + if (ipfsService.isRunning()) await ipfsService.stop(); 318 + db.close(); 319 + rmSync(tmpDir, { recursive: true, force: true }); 320 + }); 321 + 322 + it("getEffectiveSyncIntervalMs returns default without policy engine", () => { 323 + const config = testConfig(tmpDir, ["did:plc:a"]); 324 + const rm = new ReplicationManager( 325 + db, 326 + config, 327 + repoManager, 328 + ipfsService, 329 + ipfsService, 330 + didResolver, 331 + ); 332 + 333 + // Default is 5 minutes 334 + expect(rm.getEffectiveSyncIntervalMs("did:plc:a")).toBe(5 * 60 * 1000); 335 + }); 336 + 337 + it("getEffectiveSyncIntervalMs returns policy interval when engine is set", () => { 338 + const config = testConfig(tmpDir, []); 339 + const engine = new PolicyEngine( 340 + makePolicySet([ 341 + makePolicy({ 342 + id: "fast", 343 + target: { type: "list", dids: 
["did:plc:fast"] }, 344 + sync: { intervalSec: 60 }, 345 + }), 346 + makePolicy({ 347 + id: "slow", 348 + target: { type: "list", dids: ["did:plc:slow"] }, 349 + sync: { intervalSec: 600 }, 350 + }), 351 + ]), 352 + ); 353 + 354 + const rm = new ReplicationManager( 355 + db, 356 + config, 357 + repoManager, 358 + ipfsService, 359 + ipfsService, 360 + didResolver, 361 + undefined, 362 + undefined, 363 + engine, 364 + ); 365 + 366 + expect(rm.getEffectiveSyncIntervalMs("did:plc:fast")).toBe(60 * 1000); 367 + expect(rm.getEffectiveSyncIntervalMs("did:plc:slow")).toBe(600 * 1000); 368 + }); 369 + 370 + it("getEffectiveSyncIntervalMs returns default for DIDs without matching policy", () => { 371 + const config = testConfig(tmpDir, ["did:plc:no-policy"]); 372 + const engine = new PolicyEngine( 373 + makePolicySet([ 374 + makePolicy({ 375 + id: "targeted", 376 + target: { type: "list", dids: ["did:plc:targeted"] }, 377 + sync: { intervalSec: 60 }, 378 + }), 379 + ]), 380 + ); 381 + 382 + const rm = new ReplicationManager( 383 + db, 384 + config, 385 + repoManager, 386 + ipfsService, 387 + ipfsService, 388 + didResolver, 389 + undefined, 390 + undefined, 391 + engine, 392 + ); 393 + 394 + // did:plc:no-policy has no matching policy, so gets default 395 + expect(rm.getEffectiveSyncIntervalMs("did:plc:no-policy")).toBe( 396 + 5 * 60 * 1000, 397 + ); 398 + }); 399 + 400 + it("merged policy intervals take the minimum (most frequent)", () => { 401 + const config = testConfig(tmpDir, []); 402 + const engine = new PolicyEngine( 403 + makePolicySet([ 404 + makePolicy({ 405 + id: "slow", 406 + target: { type: "all" }, 407 + sync: { intervalSec: 600 }, 408 + }), 409 + makePolicy({ 410 + id: "fast", 411 + target: { type: "list", dids: ["did:plc:fast"] }, 412 + sync: { intervalSec: 30 }, 413 + }), 414 + ]), 415 + ); 416 + 417 + const rm = new ReplicationManager( 418 + db, 419 + config, 420 + repoManager, 421 + ipfsService, 422 + ipfsService, 423 + didResolver, 424 + undefined, 425 + 
undefined, 426 + engine, 427 + ); 428 + 429 + // did:plc:fast matches both policies; minimum wins 430 + expect(rm.getEffectiveSyncIntervalMs("did:plc:fast")).toBe(30 * 1000); 431 + }); 432 + }); 433 + 434 + // ============================================ 435 + // Priority ordering 436 + // ============================================ 437 + 438 + describe("PolicyEngine + ReplicationManager: priority ordering", () => { 439 + let tmpDir: string; 440 + let db: InstanceType<typeof Database>; 441 + let ipfsService: IpfsService; 442 + let repoManager: RepoManager; 443 + let didResolver: DidResolver; 444 + 445 + beforeEach(async () => { 446 + tmpDir = mkdtempSync(join(tmpdir(), "policy-priority-test-")); 447 + db = new Database(join(tmpDir, "test.db")); 448 + ipfsService = new IpfsService({ 449 + blocksPath: join(tmpDir, "ipfs-blocks"), 450 + datastorePath: join(tmpDir, "ipfs-datastore"), 451 + networking: false, 452 + }); 453 + await ipfsService.start(); 454 + 455 + const config = testConfig(tmpDir, []); 456 + repoManager = new RepoManager(db, config); 457 + repoManager.init(undefined, ipfsService, ipfsService); 458 + 459 + didResolver = new DidResolver({ 460 + didCache: new InMemoryDidCache(), 461 + }); 462 + }); 463 + 464 + afterEach(async () => { 465 + if (ipfsService.isRunning()) await ipfsService.stop(); 466 + db.close(); 467 + rmSync(tmpDir, { recursive: true, force: true }); 468 + }); 469 + 470 + it("syncAll processes higher-priority DIDs first", async () => { 471 + const config = testConfig(tmpDir, []); 472 + const engine = new PolicyEngine( 473 + makePolicySet([ 474 + makePolicy({ 475 + id: "low-pri", 476 + target: { type: "list", dids: ["did:plc:low"] }, 477 + priority: 10, 478 + }), 479 + makePolicy({ 480 + id: "high-pri", 481 + target: { type: "list", dids: ["did:plc:high"] }, 482 + priority: 90, 483 + }), 484 + makePolicy({ 485 + id: "mid-pri", 486 + target: { type: "list", dids: ["did:plc:mid"] }, 487 + priority: 50, 488 + }), 489 + ]), 490 + ); 491 + 492 + 
const rm = new ReplicationManager( 493 + db, 494 + config, 495 + repoManager, 496 + ipfsService, 497 + ipfsService, 498 + didResolver, 499 + undefined, 500 + undefined, 501 + engine, 502 + ); 503 + 504 + // Track the order in which syncDid is called 505 + const syncOrder: string[] = []; 506 + const origSyncDid = rm.syncDid.bind(rm); 507 + rm.syncDid = async (did: string) => { 508 + syncOrder.push(did); 509 + // Don't actually sync (would fail without real PDS) 510 + }; 511 + 512 + rm.getSyncStorage().initSchema(); 513 + await rm.syncAll(); 514 + 515 + expect(syncOrder).toEqual([ 516 + "did:plc:high", 517 + "did:plc:mid", 518 + "did:plc:low", 519 + ]); 520 + }); 521 + 522 + it("without policy engine, DIDs are synced in config order", async () => { 523 + const config = testConfig(tmpDir, [ 524 + "did:plc:first", 525 + "did:plc:second", 526 + "did:plc:third", 527 + ]); 528 + 529 + const rm = new ReplicationManager( 530 + db, 531 + config, 532 + repoManager, 533 + ipfsService, 534 + ipfsService, 535 + didResolver, 536 + ); 537 + 538 + const syncOrder: string[] = []; 539 + rm.syncDid = async (did: string) => { 540 + syncOrder.push(did); 541 + }; 542 + 543 + rm.getSyncStorage().initSchema(); 544 + await rm.syncAll(); 545 + 546 + expect(syncOrder).toEqual([ 547 + "did:plc:first", 548 + "did:plc:second", 549 + "did:plc:third", 550 + ]); 551 + }); 552 + }); 553 + 554 + // ============================================ 555 + // shouldReplicate filtering 556 + // ============================================ 557 + 558 + describe("PolicyEngine + ReplicationManager: shouldReplicate filtering", () => { 559 + let tmpDir: string; 560 + let db: InstanceType<typeof Database>; 561 + let ipfsService: IpfsService; 562 + let repoManager: RepoManager; 563 + let didResolver: DidResolver; 564 + 565 + beforeEach(async () => { 566 + tmpDir = mkdtempSync(join(tmpdir(), "policy-filter-test-")); 567 + db = new Database(join(tmpDir, "test.db")); 568 + ipfsService = new IpfsService({ 569 + 
blocksPath: join(tmpDir, "ipfs-blocks"), 570 + datastorePath: join(tmpDir, "ipfs-datastore"), 571 + networking: false, 572 + }); 573 + await ipfsService.start(); 574 + 575 + const config = testConfig(tmpDir, []); 576 + repoManager = new RepoManager(db, config); 577 + repoManager.init(undefined, ipfsService, ipfsService); 578 + 579 + didResolver = new DidResolver({ 580 + didCache: new InMemoryDidCache(), 581 + }); 582 + }); 583 + 584 + afterEach(async () => { 585 + if (ipfsService.isRunning()) await ipfsService.stop(); 586 + db.close(); 587 + rmSync(tmpDir, { recursive: true, force: true }); 588 + }); 589 + 590 + it("policy-only DIDs with disabled policy are excluded from sync", async () => { 591 + const config = testConfig(tmpDir, []); 592 + const engine = new PolicyEngine( 593 + makePolicySet([ 594 + makePolicy({ 595 + id: "enabled", 596 + target: { type: "list", dids: ["did:plc:enabled"] }, 597 + enabled: true, 598 + }), 599 + // This disabled policy won't add did:plc:disabled to explicit DIDs 600 + makePolicy({ 601 + id: "disabled", 602 + target: { type: "list", dids: ["did:plc:disabled"] }, 603 + enabled: false, 604 + }), 605 + ]), 606 + ); 607 + 608 + const rm = new ReplicationManager( 609 + db, 610 + config, 611 + repoManager, 612 + ipfsService, 613 + ipfsService, 614 + didResolver, 615 + undefined, 616 + undefined, 617 + engine, 618 + ); 619 + 620 + const syncedDids: string[] = []; 621 + rm.syncDid = async (did: string) => { 622 + syncedDids.push(did); 623 + }; 624 + 625 + rm.getSyncStorage().initSchema(); 626 + await rm.syncAll(); 627 + 628 + expect(syncedDids).toContain("did:plc:enabled"); 629 + expect(syncedDids).not.toContain("did:plc:disabled"); 630 + }); 631 + 632 + it("per-DID interval skips DIDs not yet due", async () => { 633 + const config = testConfig(tmpDir, []); 634 + const engine = new PolicyEngine( 635 + makePolicySet([ 636 + makePolicy({ 637 + id: "fast", 638 + target: { type: "list", dids: ["did:plc:fast"] }, 639 + sync: { intervalSec: 60 }, 
640 + }), 641 + makePolicy({ 642 + id: "slow", 643 + target: { type: "list", dids: ["did:plc:slow"] }, 644 + sync: { intervalSec: 3600 }, 645 + }), 646 + ]), 647 + ); 648 + 649 + const rm = new ReplicationManager( 650 + db, 651 + config, 652 + repoManager, 653 + ipfsService, 654 + ipfsService, 655 + didResolver, 656 + undefined, 657 + undefined, 658 + engine, 659 + ); 660 + 661 + const syncedDids: string[] = []; 662 + rm.syncDid = async (did: string) => { 663 + syncedDids.push(did); 664 + }; 665 + 666 + rm.getSyncStorage().initSchema(); 667 + 668 + // First sync: both should sync (never synced before) 669 + await rm.syncAll(); 670 + expect(syncedDids).toContain("did:plc:fast"); 671 + expect(syncedDids).toContain("did:plc:slow"); 672 + 673 + // Reset tracking 674 + syncedDids.length = 0; 675 + 676 + // Second sync immediately: neither should sync (not enough time passed) 677 + await rm.syncAll(); 678 + expect(syncedDids).toEqual([]); 679 + }); 680 + }); 681 + 682 + // ============================================ 683 + // Preset integration 684 + // ============================================ 685 + 686 + describe("PolicyEngine + ReplicationManager: presets", () => { 687 + let tmpDir: string; 688 + let db: InstanceType<typeof Database>; 689 + let ipfsService: IpfsService; 690 + let repoManager: RepoManager; 691 + let didResolver: DidResolver; 692 + 693 + beforeEach(async () => { 694 + tmpDir = mkdtempSync(join(tmpdir(), "policy-preset-test-")); 695 + db = new Database(join(tmpDir, "test.db")); 696 + ipfsService = new IpfsService({ 697 + blocksPath: join(tmpDir, "ipfs-blocks"), 698 + datastorePath: join(tmpDir, "ipfs-datastore"), 699 + networking: false, 700 + }); 701 + await ipfsService.start(); 702 + 703 + const config = testConfig(tmpDir, []); 704 + repoManager = new RepoManager(db, config); 705 + repoManager.init(undefined, ipfsService, ipfsService); 706 + 707 + didResolver = new DidResolver({ 708 + didCache: new InMemoryDidCache(), 709 + }); 710 + }); 711 + 712 + 
afterEach(async () => { 713 + if (ipfsService.isRunning()) await ipfsService.stop(); 714 + db.close(); 715 + rmSync(tmpDir, { recursive: true, force: true }); 716 + }); 717 + 718 + it("mutualAid preset drives DID list and sync interval", () => { 719 + const config = testConfig(tmpDir, []); 720 + const engine = new PolicyEngine(); 721 + engine.addPolicy( 722 + mutualAid({ 723 + peerDids: ["did:plc:alice", "did:plc:bob", "did:plc:carol"], 724 + intervalSec: 600, 725 + }), 726 + ); 727 + 728 + const rm = new ReplicationManager( 729 + db, 730 + config, 731 + repoManager, 732 + ipfsService, 733 + ipfsService, 734 + didResolver, 735 + undefined, 736 + undefined, 737 + engine, 738 + ); 739 + 740 + const dids = rm.getReplicateDids().sort(); 741 + expect(dids).toEqual([ 742 + "did:plc:alice", 743 + "did:plc:bob", 744 + "did:plc:carol", 745 + ]); 746 + 747 + for (const did of dids) { 748 + expect(rm.getEffectiveSyncIntervalMs(did)).toBe(600 * 1000); 749 + } 750 + }); 751 + 752 + it("saas preset gets higher priority than mutualAid", async () => { 753 + const config = testConfig(tmpDir, []); 754 + const engine = new PolicyEngine(); 755 + engine.addPolicy( 756 + mutualAid({ 757 + id: "aid", 758 + peerDids: ["did:plc:peer1", "did:plc:peer2"], 759 + }), 760 + ); 761 + engine.addPolicy( 762 + saas({ 763 + id: "sla", 764 + accountDids: ["did:plc:customer"], 765 + }), 766 + ); 767 + 768 + const rm = new ReplicationManager( 769 + db, 770 + config, 771 + repoManager, 772 + ipfsService, 773 + ipfsService, 774 + didResolver, 775 + undefined, 776 + undefined, 777 + engine, 778 + ); 779 + 780 + // SaaS has priority 80, mutualAid has priority 50 781 + const syncOrder: string[] = []; 782 + rm.syncDid = async (did: string) => { 783 + syncOrder.push(did); 784 + }; 785 + 786 + rm.getSyncStorage().initSchema(); 787 + await rm.syncAll(); 788 + 789 + // Customer (priority 80) should be synced before peers (priority 50) 790 + const customerIdx = syncOrder.indexOf("did:plc:customer"); 791 + const 
peer1Idx = syncOrder.indexOf("did:plc:peer1"); 792 + const peer2Idx = syncOrder.indexOf("did:plc:peer2"); 793 + 794 + expect(customerIdx).toBeLessThan(peer1Idx); 795 + expect(customerIdx).toBeLessThan(peer2Idx); 796 + }); 797 + 798 + it("saas preset uses shorter sync interval than mutualAid", () => { 799 + const config = testConfig(tmpDir, []); 800 + const engine = new PolicyEngine(); 801 + engine.addPolicy( 802 + mutualAid({ 803 + id: "aid", 804 + peerDids: ["did:plc:peer1"], 805 + }), 806 + ); 807 + engine.addPolicy( 808 + saas({ 809 + id: "sla", 810 + accountDids: ["did:plc:customer"], 811 + }), 812 + ); 813 + 814 + const rm = new ReplicationManager( 815 + db, 816 + config, 817 + repoManager, 818 + ipfsService, 819 + ipfsService, 820 + didResolver, 821 + undefined, 822 + undefined, 823 + engine, 824 + ); 825 + 826 + // SaaS: 60s, mutualAid: 600s 827 + expect(rm.getEffectiveSyncIntervalMs("did:plc:customer")).toBe( 828 + 60 * 1000, 829 + ); 830 + expect(rm.getEffectiveSyncIntervalMs("did:plc:peer1")).toBe( 831 + 600 * 1000, 832 + ); 833 + }); 834 + }); 835 + 836 + // ============================================ 837 + // Backward compatibility 838 + // ============================================ 839 + 840 + describe("PolicyEngine + ReplicationManager: backward compatibility", () => { 841 + let tmpDir: string; 842 + let db: InstanceType<typeof Database>; 843 + let ipfsService: IpfsService; 844 + let repoManager: RepoManager; 845 + let didResolver: DidResolver; 846 + 847 + beforeEach(async () => { 848 + tmpDir = mkdtempSync(join(tmpdir(), "policy-compat-test-")); 849 + db = new Database(join(tmpDir, "test.db")); 850 + ipfsService = new IpfsService({ 851 + blocksPath: join(tmpDir, "ipfs-blocks"), 852 + datastorePath: join(tmpDir, "ipfs-datastore"), 853 + networking: false, 854 + }); 855 + await ipfsService.start(); 856 + 857 + const config = testConfig(tmpDir, []); 858 + repoManager = new RepoManager(db, config); 859 + repoManager.init(undefined, ipfsService, 
ipfsService); 860 + 861 + didResolver = new DidResolver({ 862 + didCache: new InMemoryDidCache(), 863 + }); 864 + }); 865 + 866 + afterEach(async () => { 867 + if (ipfsService.isRunning()) await ipfsService.stop(); 868 + db.close(); 869 + rmSync(tmpDir, { recursive: true, force: true }); 870 + }); 871 + 872 + it("without PolicyEngine, syncAll syncs all config DIDs every tick", async () => { 873 + const config = testConfig(tmpDir, ["did:plc:a", "did:plc:b"]); 874 + const rm = new ReplicationManager( 875 + db, 876 + config, 877 + repoManager, 878 + ipfsService, 879 + ipfsService, 880 + didResolver, 881 + ); 882 + 883 + const syncedDids: string[] = []; 884 + rm.syncDid = async (did: string) => { 885 + syncedDids.push(did); 886 + }; 887 + 888 + rm.getSyncStorage().initSchema(); 889 + 890 + // First sync 891 + await rm.syncAll(); 892 + expect(syncedDids.sort()).toEqual(["did:plc:a", "did:plc:b"]); 893 + 894 + // Second sync (should still sync all, no per-DID interval tracking) 895 + syncedDids.length = 0; 896 + await rm.syncAll(); 897 + expect(syncedDids.sort()).toEqual(["did:plc:a", "did:plc:b"]); 898 + }); 899 + 900 + it("constructor works without policyEngine parameter", () => { 901 + const config = testConfig(tmpDir, ["did:plc:a"]); 902 + const rm = new ReplicationManager( 903 + db, 904 + config, 905 + repoManager, 906 + ipfsService, 907 + ipfsService, 908 + didResolver, 909 + ); 910 + 911 + expect(rm.getPolicyEngine()).toBeNull(); 912 + expect(rm.getReplicateDids()).toEqual(["did:plc:a"]); 913 + }); 914 + });
+312 -14
src/replication/replication-manager.ts
··· 1 1 /** 2 2 * Main replication orchestrator. 3 3 * Publishes peer identity + manifest records, syncs remote repos to IPFS. 4 + * Optionally driven by a PolicyEngine for per-DID intervals, priority, and filtering. 4 5 */ 5 6 6 7 import type Database from "better-sqlite3"; ··· 11 12 import { readCarWithRoot } from "@atproto/repo"; 12 13 import { decode as cborDecode } from "../cbor-compat.js"; 13 14 import type { ReplicatedRepoReader } from "./replicated-repo-reader.js"; 15 + import type { PolicyEngine } from "../policy/engine.js"; 14 16 15 17 import { 16 18 PEER_NSID, ··· 35 37 /** How old cached peer info can be before re-fetching (1 hour). */ 36 38 const PEER_INFO_TTL_MS = 60 * 60 * 1000; 37 39 40 + /** Default sync interval when no policy engine is present (5 minutes). */ 41 + const DEFAULT_SYNC_INTERVAL_MS = 5 * 60 * 1000; 42 + 43 + /** Minimum tick interval to prevent excessive polling (10 seconds). */ 44 + const MIN_TICK_INTERVAL_MS = 10 * 1000; 45 + 38 46 export class ReplicationManager { 39 47 private syncStorage: SyncStorage; 40 48 private repoFetcher: RepoFetcher; ··· 49 57 private firehoseSubscription: FirehoseSubscription | null = null; 50 58 private firehoseCursorSaveTimer: ReturnType<typeof setInterval> | null = null; 51 59 private stopped = false; 60 + private policyEngine: PolicyEngine | null = null; 61 + /** Per-DID last-sync timestamps (epoch ms) for policy-driven interval tracking. 
*/ 62 + private lastSyncTimestamps: Map<string, number> = new Map(); 52 63 53 64 constructor( 54 65 db: Database.Database, ··· 59 70 private didResolver: DidResolver, 60 71 verificationConfig?: Partial<VerificationConfig>, 61 72 private replicatedRepoReader?: ReplicatedRepoReader, 73 + policyEngine?: PolicyEngine, 62 74 ) { 63 75 this.syncStorage = new SyncStorage(db); 64 76 this.repoFetcher = new RepoFetcher(didResolver); ··· 72 84 blockStore, 73 85 this.verificationConfig, 74 86 ); 87 + if (policyEngine) { 88 + this.policyEngine = policyEngine; 89 + } 90 + } 91 + 92 + /** 93 + * Get the PolicyEngine, if one is configured. 94 + */ 95 + getPolicyEngine(): PolicyEngine | null { 96 + return this.policyEngine; 75 97 } 76 98 77 99 /** ··· 102 124 } 103 125 104 126 /** 127 + * Get the merged list of DIDs to replicate. 128 + * Combines config.REPLICATE_DIDS with policy engine explicit DIDs (deduplicated). 129 + * When a policy engine is present, filters out DIDs where shouldReplicate is false. 130 + */ 131 + getReplicateDids(): string[] { 132 + const configDids = new Set(this.config.REPLICATE_DIDS); 133 + 134 + if (this.policyEngine) { 135 + // Add DIDs from policy engine explicit lists 136 + for (const did of this.policyEngine.getExplicitDids()) { 137 + configDids.add(did); 138 + } 139 + 140 + // Filter: only include DIDs where policy says to replicate, 141 + // but always include config DIDs (they replicate even without a matching policy) 142 + const result: string[] = []; 143 + for (const did of configDids) { 144 + if ( 145 + this.config.REPLICATE_DIDS.includes(did) || 146 + this.policyEngine.shouldReplicate(did) 147 + ) { 148 + result.push(did); 149 + } 150 + } 151 + return result; 152 + } 153 + 154 + return [...configDids]; 155 + } 156 + 157 + /** 105 158 * Ensure a manifest record exists for each configured DID. 106 159 * Creates new manifests or updates existing ones. 
107 160 */ 108 161 async syncManifests(): Promise<void> { 109 - for (const did of this.config.REPLICATE_DIDS) { 162 + for (const did of this.getReplicateDids()) { 110 163 const rkey = didToRkey(did); 111 164 112 165 // Check if manifest already exists ··· 141 194 142 195 /** 143 196 * Sync all configured DIDs. 197 + * 198 + * When a PolicyEngine is present: 199 + * - Merges config DIDs with policy explicit DIDs 200 + * - Filters out DIDs where shouldReplicate is false 201 + * - Sorts by priority (highest first) 202 + * - Respects per-DID sync intervals (skips DIDs not yet due) 144 203 */ 145 204 async syncAll(): Promise<void> { 146 - for (const did of this.config.REPLICATE_DIDS) { 205 + const dids = this.getReplicateDids(); 206 + 207 + // Sort by priority (highest first) when policy engine is present 208 + const sortedDids = this.policyEngine 209 + ? this.sortDidsByPriority(dids) 210 + : dids; 211 + 212 + for (const did of sortedDids) { 147 213 if (this.stopped) break; 214 + 215 + // Check per-DID interval when policy engine is present 216 + if (this.policyEngine && !this.isDidDueForSync(did)) { 217 + continue; 218 + } 219 + 148 220 try { 149 221 await this.syncDid(did); 222 + // Record successful sync timestamp 223 + this.lastSyncTimestamps.set(did, Date.now()); 150 224 } catch (err) { 151 225 const message = 152 226 err instanceof Error ? err.message : String(err); ··· 156 230 } 157 231 158 232 /** 233 + * Sort DIDs by their effective policy priority (highest first). 234 + * DIDs without a matching policy get priority 0. 235 + */ 236 + private sortDidsByPriority(dids: string[]): string[] { 237 + if (!this.policyEngine) return dids; 238 + return [...dids].sort((a, b) => { 239 + const pa = this.policyEngine!.evaluate(a).priority; 240 + const pb = this.policyEngine!.evaluate(b).priority; 241 + return pb - pa; 242 + }); 243 + } 244 + 245 + /** 246 + * Check whether a DID is due for sync based on its policy interval. 
247 + * DIDs that have never been synced are always due. 248 + */ 249 + private isDidDueForSync(did: string): boolean { 250 + if (!this.policyEngine) return true; 251 + 252 + const lastSync = this.lastSyncTimestamps.get(did); 253 + if (lastSync === undefined) return true; // never synced 254 + 255 + const config = this.policyEngine.getReplicationConfig(did); 256 + // Config DIDs without a matching policy use the default interval 257 + const intervalMs = config 258 + ? config.sync.intervalSec * 1000 259 + : DEFAULT_SYNC_INTERVAL_MS; 260 + 261 + return Date.now() - lastSync >= intervalMs; 262 + } 263 + 264 + /** 265 + * Get the effective sync interval for a DID in milliseconds. 266 + * Returns the policy-driven interval if a policy engine is present, 267 + * otherwise returns the default 5-minute interval. 268 + */ 269 + getEffectiveSyncIntervalMs(did: string): number { 270 + if (!this.policyEngine) return DEFAULT_SYNC_INTERVAL_MS; 271 + 272 + const config = this.policyEngine.getReplicationConfig(did); 273 + return config 274 + ? config.sync.intervalSec * 1000 275 + : DEFAULT_SYNC_INTERVAL_MS; 276 + } 277 + 278 + /** 159 279 * Sync a single DID: fetch repo, store blocks in IPFS, verify, update state. 160 280 */ 161 281 async syncDid(did: string): Promise<void> { ··· 283 403 } 284 404 285 405 /** 406 + * Compute the tick interval for periodic sync. 407 + * 408 + * When a PolicyEngine is present, uses the minimum sync interval 409 + * across all DIDs so that no DID misses its window. Each DID's 410 + * individual interval is checked inside syncAll(). 411 + * 412 + * Falls back to the provided intervalMs (default 5 minutes). 
413 + */ 414 + private computeTickIntervalMs(fallbackMs: number): number { 415 + if (!this.policyEngine) return fallbackMs; 416 + 417 + const dids = this.getReplicateDids(); 418 + if (dids.length === 0) return fallbackMs; 419 + 420 + let minInterval = Infinity; 421 + for (const did of dids) { 422 + const config = this.policyEngine.getReplicationConfig(did); 423 + const intervalMs = config 424 + ? config.sync.intervalSec * 1000 425 + : DEFAULT_SYNC_INTERVAL_MS; 426 + if (intervalMs < minInterval) { 427 + minInterval = intervalMs; 428 + } 429 + } 430 + 431 + // Clamp to minimum tick interval to prevent excessive polling 432 + return Math.max( 433 + minInterval === Infinity ? fallbackMs : minInterval, 434 + MIN_TICK_INTERVAL_MS, 435 + ); 436 + } 437 + 438 + /** 286 439 * Start periodic sync and verification at their respective intervals. 440 + * 441 + * When a PolicyEngine is present, the tick rate is derived from the 442 + * minimum sync interval across all DIDs. Each tick, syncAll() checks 443 + * which DIDs are actually due for sync based on their individual intervals. 
287 444 */ 288 445 startPeriodicSync(intervalMs: number = 5 * 60 * 1000): void { 289 446 if (this.syncTimer) return; 290 447 this.stopped = false; 448 + 449 + const tickMs = this.computeTickIntervalMs(intervalMs); 291 450 292 451 // Run first sync after a short delay to let startup complete 293 452 setTimeout(() => { ··· 304 463 console.error("Periodic sync error:", err); 305 464 }); 306 465 } 307 - }, intervalMs); 466 + }, tickMs); 308 467 309 468 // Start verification on a separate timer 310 469 this.verificationTimer = setInterval(() => { ··· 324 483 startFirehose(): void { 325 484 if (this.firehoseSubscription) return; 326 485 if (!this.config.FIREHOSE_ENABLED) return; 327 - if (this.config.REPLICATE_DIDS.length === 0) return; 486 + 487 + const replicateDids = this.getReplicateDids(); 488 + if (replicateDids.length === 0) return; 328 489 329 490 this.firehoseSubscription = new FirehoseSubscription({ 330 491 firehoseUrl: this.config.FIREHOSE_URL, ··· 340 501 // Load saved cursor for resumption 341 502 const savedCursor = this.syncStorage.getFirehoseCursor(); 342 503 343 - // Start with the set of configured DIDs 344 - const dids = new Set(this.config.REPLICATE_DIDS); 504 + // Start with the merged set of DIDs 505 + const dids = new Set(replicateDids); 345 506 this.firehoseSubscription.start(dids, savedCursor); 346 507 347 508 // Periodically save the cursor to SQLite (every 30 seconds) ··· 358 519 359 520 /** 360 521 * Handle a commit event from the firehose. 361 - * Triggers an incremental sync for the affected DID. 522 + * 523 + * Optimization: the firehose event already contains the blocks (as CAR bytes) 524 + * and the commit rev/CID. We apply them directly to our blockstore, avoiding 525 + * an HTTP round-trip to the source PDS. 526 + * 527 + * Falls back to full syncDid() if incremental apply fails (e.g., tooBig event, 528 + * empty blocks, gap in sequence, or CAR parsing error). 
362 529 */ 363 530 private async handleFirehoseCommit(event: FirehoseCommitEvent): Promise<void> { 364 531 const did = event.repo; 365 532 366 533 // Only process if this DID is one we are tracking 367 - if (!this.config.REPLICATE_DIDS.includes(did)) return; 534 + const replicateDids = this.getReplicateDids(); 535 + if (!replicateDids.includes(did)) return; 536 + 537 + // Determine if we should attempt incremental apply or fall back to full sync. 538 + // Fall back when: 539 + // - tooBig is set (blocks were too large to include in the event) 540 + // - rebase occurred (repo structure changed, need full sync) 541 + // - blocks are empty (nothing to apply) 542 + if (event.tooBig || event.rebase || event.blocks.length === 0) { 543 + try { 544 + await this.syncDid(did); 545 + this.lastSyncTimestamps.set(did, Date.now()); 546 + } catch (err) { 547 + const message = err instanceof Error ? err.message : String(err); 548 + this.syncStorage.updateStatus(did, "error", message); 549 + } 550 + return; 551 + } 368 552 553 + // Check for sequence gaps: if we have a `since` (previous rev) in the event 554 + // and it doesn't match our last sync rev, we may have missed events. 555 + const state = this.syncStorage.getState(did); 556 + if (event.since !== null && state !== null && state.lastSyncRev !== null) { 557 + if (state.lastSyncRev !== event.since) { 558 + // Gap detected: our last known rev doesn't match the event's `since`. 559 + // Fall back to full sync to catch up. 560 + console.warn( 561 + `[replication] Gap detected for ${did}: local rev=${state.lastSyncRev}, event.since=${event.since}. Falling back to full sync.`, 562 + ); 563 + try { 564 + await this.syncDid(did); 565 + this.lastSyncTimestamps.set(did, Date.now()); 566 + } catch (err) { 567 + const message = err instanceof Error ? 
err.message : String(err); 568 + this.syncStorage.updateStatus(did, "error", message); 569 + } 570 + return; 571 + } 572 + } 573 + 574 + // Attempt incremental block application from firehose event 369 575 try { 370 - // Trigger a sync for this specific DID to pull the latest changes. 371 - // The syncDid method already handles incremental sync via `since` parameter. 372 - await this.syncDid(did); 576 + await this.applyFirehoseBlocks(did, event); 577 + this.lastSyncTimestamps.set(did, Date.now()); 373 578 } catch (err) { 374 - const message = err instanceof Error ? err.message : String(err); 375 - this.syncStorage.updateStatus(did, "error", message); 579 + // Incremental apply failed — fall back to full sync 580 + console.warn( 581 + `[replication] Incremental apply failed for ${did}, falling back to full sync:`, 582 + err instanceof Error ? err.message : String(err), 583 + ); 584 + try { 585 + await this.syncDid(did); 586 + this.lastSyncTimestamps.set(did, Date.now()); 587 + } catch (syncErr) { 588 + const message = syncErr instanceof Error ? syncErr.message : String(syncErr); 589 + this.syncStorage.updateStatus(did, "error", message); 590 + } 591 + } 592 + } 593 + 594 + /** 595 + * Apply blocks from a firehose commit event directly to the blockstore. 596 + * The event's `blocks` field contains CAR-encoded bytes that can be parsed 597 + * and stored without fetching from the source PDS. 598 + */ 599 + private async applyFirehoseBlocks( 600 + did: string, 601 + event: FirehoseCommitEvent, 602 + ): Promise<void> { 603 + // 1. Parse the CAR bytes from the firehose event 604 + const { root, blocks } = await readCarWithRoot(event.blocks); 605 + 606 + // 2. Store blocks in our blockstore 607 + await this.blockStore.putBlocks(blocks); 608 + 609 + // 3. 
Collect CID strings for DHT announcement + block tracking 610 + const cidStrs: string[] = []; 611 + const internalMap = ( 612 + blocks as unknown as { map: Map<string, Uint8Array> } 613 + ).map; 614 + if (internalMap) { 615 + for (const cidStr of internalMap.keys()) { 616 + cidStrs.push(cidStr); 617 + } 618 + } 619 + 620 + // 4. Track block CIDs 621 + if (cidStrs.length > 0) { 622 + this.syncStorage.trackBlocks(did, cidStrs); 623 + } 624 + 625 + // 5. Announce to DHT (fire-and-forget) 626 + this.networkService.provideBlocks(cidStrs).catch(() => {}); 627 + 628 + // 6. Determine rev and root CID 629 + const rootCidStr = root.toString(); 630 + let rev = event.rev; // Use the rev directly from the firehose event 631 + 632 + // If no rev in the event, try to extract from commit block 633 + if (!rev) { 634 + const commitBytes = internalMap?.get(rootCidStr); 635 + if (commitBytes) { 636 + try { 637 + const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 638 + if (typeof commitObj.rev === "string") { 639 + rev = commitObj.rev; 640 + } 641 + } catch { 642 + // Fall back to root CID as rev 643 + rev = rootCidStr; 644 + } 645 + } else { 646 + rev = rootCidStr; 647 + } 648 + } 649 + 650 + // 7. Update sync state 651 + this.syncStorage.updateSyncProgress(did, rev, rootCidStr); 652 + 653 + // 8. Invalidate cached ReadableRepo so it reloads with new root 654 + this.replicatedRepoReader?.invalidateCache(did); 655 + 656 + // 9. Update manifest record 657 + const rkey = didToRkey(did); 658 + const existingManifest = await this.repoManager.getRecord( 659 + MANIFEST_NSID, 660 + rkey, 661 + ); 662 + if (existingManifest) { 663 + const manifest: ManifestRecord = { 664 + $type: MANIFEST_NSID, 665 + subject: did, 666 + status: "active", 667 + lastSyncRev: rev, 668 + lastSyncAt: new Date().toISOString(), 669 + createdAt: 670 + (existingManifest.record as Record<string, unknown>) 671 + ?.createdAt as string ?? 
new Date().toISOString(), 672 + }; 673 + await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 376 674 } 377 675 } 378 676 ··· 432 730 async runVerification(): Promise<LayeredVerificationResult[]> { 433 731 const results: LayeredVerificationResult[] = []; 434 732 435 - for (const did of this.config.REPLICATE_DIDS) { 733 + for (const did of this.getReplicateDids()) { 436 734 if (this.stopped) break; 437 735 try { 438 736 const result = await this.verifyDid(did);
+10 -10
src/replication/replication.test.ts
···
  97   97
  98   98   afterEach(() => {
  99   99     db.close();
 100       -   rmSync(tmpDir, { recursive: true, force: true });
      100  +   try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
 101  101   });
 102  102
 103  103   it("creates table and retrieves empty states", () => {
···
 219  219       await ipfsService.stop();
 220  220     }
 221  221     db.close();
 222       -   rmSync(tmpDir, { recursive: true, force: true });
      222  +   try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
 223  223   });
 224  224
 225  225   it("publishPeerIdentity is no-op when networking is off", async () => {
···
 437  437     if (ipfsService.isRunning()) {
 438  438       await ipfsService.stop();
 439  439     }
 440       -   rmSync(tmpDir, { recursive: true, force: true });
      440  +   try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
 441  441   });
 442  442
 443  443   it("all blocks available reports 100%", async () => {
···
 550  550     if (replicaIpfs.isRunning()) await replicaIpfs.stop();
 551  551     sourceDb.close();
 552  552     replicaDb.close();
 553       -   rmSync(tmpDir, { recursive: true, force: true });
      553  +   try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
 554  554   });
 555  555
 556  556   it("source records can be replicated via CAR export/import to IPFS", async () => {
···
 703  703
 704  704   afterEach(() => {
 705  705     db.close();
 706       -   rmSync(tmpDir, { recursive: true, force: true });
      706  +   try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
 707  707   });
 708  708
 709  709   it("tracks blocks for a DID", () => {
···
 773  773     if (ipfsService.isRunning()) {
 774  774       await ipfsService.stop();
 775  775     }
 776       -   rmSync(tmpDir, { recursive: true, force: true });
      776  +   try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
 777  777   });
 778  778
 779  779   it("Layer 0: passes when commit root is served via RASL", async () => {
···
1148 1148
1149 1149   afterEach(async () => {
1150 1150     if (ipfsService.isRunning()) await ipfsService.stop();
1151       -   rmSync(tmpDir, { recursive: true, force: true });
     1151  +   try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
1152 1152   });
1153 1153
1154 1154   it("getBytes roundtrip", async () => {
···
1250 1250     if (replicaIpfs.isRunning()) await replicaIpfs.stop();
1251 1251     sourceDb.close();
1252 1252     replicaDb.close();
1253       -   rmSync(tmpDir, { recursive: true, force: true });
     1253  +   try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
1254 1254   });
1255 1255
1256 1256   /** Helper: create records in source, export CAR, store blocks in replica IPFS, record root_cid. */
···
1495 1495     if (replicaIpfs.isRunning()) await replicaIpfs.stop();
1496 1496     sourceDb.close();
1497 1497     replicaDb.close();
1498       -   rmSync(tmpDir, { recursive: true, force: true });
     1498  +   try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
1499 1499   });
1500 1500
1501 1501   async function replicateSource(): Promise<void> {
···
1682 1682     if (replicaIpfs.isRunning()) await replicaIpfs.stop();
1683 1683     sourceDb.close();
1684 1684     replicaDb.close();
1685       -   rmSync(tmpDir, { recursive: true, force: true });
     1685  +   try { rmSync(tmpDir, { recursive: true, force: true }); } catch {}
1686 1686   });
1687 1687
1688 1688   it("after sync, replication_state has both root_cid and last_sync_rev (actual TID)", async () => {
+23 -3
src/server.ts
···
   6    6 import { WebSocketServer } from "ws";
   7    7 import pc from "picocolors";
   8    8
   9      - import { loadConfig } from "./config.js";
        9 + import { loadConfig, loadPolicies } from "./config.js";
  10   10 import { RepoManager } from "./repo-manager.js";
  11   11 import { BlobStore } from "./blobs.js";
  12   12 import { Firehose } from "./firehose.js";
···
  16   16 import { InMemoryDidCache } from "./did-cache.js";
  17   17 import { ReplicationManager } from "./replication/replication-manager.js";
  18   18 import { ReplicatedRepoReader } from "./replication/replicated-repo-reader.js";
       19 + import { PolicyEngine } from "./policy/engine.js";
  19   20
  20   21 // Load configuration
  21   22 const config = loadConfig();
···
  58   59   didCache: new InMemoryDidCache(),
  59   60 });
  60   61
       62 + // Load policy engine if configured
       63 + let policyEngine: PolicyEngine | undefined;
       64 + if (config.POLICY_FILE) {
       65 +   const policySet = loadPolicies(config);
       66 +   if (policySet) {
       67 +     policyEngine = new PolicyEngine(policySet);
       68 +     console.log(pc.dim(` Policies: loaded ${policySet.policies.length} from ${config.POLICY_FILE}`));
       69 +   }
       70 + }
       71 +
       72 + // Determine if we have DIDs to replicate (from config and/or policies)
       73 + const hasReplicateDids =
       74 +   config.REPLICATE_DIDS.length > 0 ||
       75 +   (policyEngine && policyEngine.getExplicitDids().length > 0);
       76 +
  61   77 // Initialize replication manager and replicated repo reader (if IPFS enabled and DIDs configured)
  62   78 let replicationManager: ReplicationManager | undefined;
  63   79 let replicatedRepoReader: ReplicatedRepoReader | undefined;
  64      - if (ipfsService && config.REPLICATE_DIDS.length > 0) {
       80 + if (ipfsService && hasReplicateDids) {
  65   81   replicationManager = new ReplicationManager(
  66   82     db,
  67   83     config,
···
  69   85     ipfsService,
  70   86     ipfsService,
  71   87     didResolver,
       88 +     undefined,
       89 +     undefined,
       90 +     policyEngine,
  72   91   );
  73   92   replicatedRepoReader = new ReplicatedRepoReader(
  74   93     ipfsService,
···
 150  169   try {
 151  170     await replicationManager.init();
 152  171     replicationManager.startPeriodicSync();
 153       -     console.log(pc.dim(` Replication: tracking ${config.REPLICATE_DIDS.length} DIDs`));
      172  +     const trackedDids = replicationManager.getReplicateDids();
      173  +     console.log(pc.dim(` Replication: tracking ${trackedDids.length} DIDs`));
 154  174     // Start firehose subscription for real-time updates
 155  175     if (config.FIREHOSE_ENABLED) {
 156  176       replicationManager.startFirehose();