atproto user agency toolkit for individuals and groups
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add firehose sync, MST path proofs, policy engine, and e2e networking tests

Four major features implemented in parallel:

- Real-time firehose sync: FirehoseSubscription subscribes to
com.atproto.sync.subscribeRepos with CBOR frame parsing, DID filtering,
cursor persistence in SQLite, and exponential backoff reconnection.
Integrated with ReplicationManager and configurable via FIREHOSE_URL
and FIREHOSE_ENABLED env vars.

- L3 MST path proof verification: generateMstProof() extracts the minimal
Merkle Search Tree node path from root to leaf; verifyMstProof()
validates it using only the proof bytes and a trusted commit CID. Supports
both existence and non-existence proofs.

- Policy engine MVP: Declarative, deterministic, transport-agnostic
policy system with PolicyEngine class, three presets (mutualAid, saas,
groupGovernance), and config integration via POLICY_FILE env var.

- E2E networking integration tests: Two real Helia nodes with TCP on
localhost verify bitswap block exchange, bidirectional transfer, and
peer discovery.

+3895 -2
+38 -1
src/config.ts
··· 1 - import { readFileSync } from "node:fs"; 1 + import { readFileSync, existsSync } from "node:fs"; 2 2 import { resolve } from "node:path"; 3 + import type { PolicySet } from "./policy/types.js"; 3 4 4 5 export interface Config { 5 6 DID: string; ··· 16 17 IPFS_ENABLED: boolean; 17 18 IPFS_NETWORKING: boolean; 18 19 REPLICATE_DIDS: string[]; 20 + POLICY_FILE?: string; 21 + FIREHOSE_URL: string; 22 + FIREHOSE_ENABLED: boolean; 19 23 } 20 24 21 25 const REQUIRED_KEYS = [ ··· 98 102 IPFS_ENABLED: process.env.IPFS_ENABLED !== "false", 99 103 IPFS_NETWORKING: process.env.IPFS_NETWORKING !== "false", 100 104 REPLICATE_DIDS: (process.env.REPLICATE_DIDS ?? "").split(",").map(s => s.trim()).filter(Boolean), 105 + POLICY_FILE: process.env.POLICY_FILE || undefined, 106 + FIREHOSE_URL: process.env.FIREHOSE_URL ?? "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos", 107 + FIREHOSE_ENABLED: process.env.FIREHOSE_ENABLED !== "false", 101 108 }; 102 109 } 110 + 111 + /** 112 + * Load policies from a JSON file. Returns null if no file is configured or found. 113 + * The file should contain a PolicySet JSON object. 114 + */ 115 + export function loadPolicies(config: Config): PolicySet | null { 116 + const policyPath = config.POLICY_FILE; 117 + if (!policyPath) return null; 118 + 119 + const resolved = resolve(policyPath); 120 + if (!existsSync(resolved)) { 121 + console.warn(`Policy file not found: ${resolved}`); 122 + return null; 123 + } 124 + 125 + try { 126 + const content = readFileSync(resolved, "utf-8"); 127 + const parsed = JSON.parse(content) as PolicySet; 128 + if (parsed.version !== 1) { 129 + throw new Error(`Unsupported policy version: ${parsed.version}`); 130 + } 131 + if (!Array.isArray(parsed.policies)) { 132 + throw new Error("Policy file must contain a 'policies' array"); 133 + } 134 + return parsed; 135 + } catch (err) { 136 + const message = err instanceof Error ? 
err.message : String(err); 137 + throw new Error(`Failed to load policy file ${resolved}: ${message}`); 138 + } 139 + }
+2
src/ipfs.test.ts
··· 41 41 IPFS_ENABLED: true, 42 42 IPFS_NETWORKING: false, 43 43 REPLICATE_DIDS: [], 44 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 45 + FIREHOSE_ENABLED: false, 44 46 }; 45 47 } 46 48
+724
src/policy/engine.test.ts
/**
 * Unit tests for the policy engine.
 *
 * Covers target matching, policy management (load/add/remove/export),
 * single-policy evaluation, the merge rules applied when several policies
 * match one DID, the three presets, and repeat-call determinism.
 */
import { describe, it, expect } from "vitest";

import { PolicyEngine, matchesTarget } from "./engine.js";
import type {
  Policy,
  PolicySet,
  PolicyTarget,
  EffectivePolicy,
} from "./types.js";
import {
  DEFAULT_REPLICATION,
  DEFAULT_SYNC,
  DEFAULT_RETENTION,
} from "./types.js";
import { mutualAid, saas, groupGovernance } from "./presets.js";

// ============================================
// Helpers
// ============================================

/**
 * Build a fully populated policy from a partial description.
 * Only `id` is required; everything else falls back to the shared defaults
 * (target "all", priority 50, enabled).
 */
function makePolicy(overrides: Partial<Policy> & { id: string }): Policy {
  return {
    name: overrides.id,
    target: { type: "all" },
    replication: { ...DEFAULT_REPLICATION },
    sync: { ...DEFAULT_SYNC },
    retention: { ...DEFAULT_RETENTION },
    priority: 50,
    enabled: true,
    ...overrides,
  };
}

/** Wrap a list of policies in a version-1 PolicySet. */
function makePolicySet(policies: Policy[]): PolicySet {
  return { version: 1, policies };
}

/**
 * Return a sorted copy of `values`.
 *
 * `Array.prototype.sort` sorts in place; calling it directly on an array
 * owned by an engine result would mutate the object under test and could
 * mask ordering problems in later assertions.
 */
function sorted(values: readonly string[]): string[] {
  return [...values].sort();
}

// ============================================
// matchesTarget
// ============================================

describe("matchesTarget", () => {
  it("type=all matches any DID", () => {
    const target: PolicyTarget = { type: "all" };
    expect(matchesTarget(target, "did:plc:abc")).toBe(true);
    expect(matchesTarget(target, "did:web:example.com")).toBe(true);
  });

  it("type=list matches only listed DIDs", () => {
    const target: PolicyTarget = {
      type: "list",
      dids: ["did:plc:aaa", "did:plc:bbb"],
    };
    expect(matchesTarget(target, "did:plc:aaa")).toBe(true);
    expect(matchesTarget(target, "did:plc:bbb")).toBe(true);
    expect(matchesTarget(target, "did:plc:ccc")).toBe(false);
  });

  it("type=pattern matches DID prefix", () => {
    const target: PolicyTarget = { type: "pattern", prefix: "did:web:" };
    expect(matchesTarget(target, "did:web:example.com")).toBe(true);
    expect(matchesTarget(target, "did:web:other.org")).toBe(true);
    expect(matchesTarget(target, "did:plc:abc")).toBe(false);
  });

  it("type=pattern with longer prefix", () => {
    const target: PolicyTarget = {
      type: "pattern",
      prefix: "did:web:example.com",
    };
    expect(matchesTarget(target, "did:web:example.com")).toBe(true);
    expect(matchesTarget(target, "did:web:example.com:extra")).toBe(true);
    expect(matchesTarget(target, "did:web:other.com")).toBe(false);
  });
});

// ============================================
// PolicyEngine: loading and management
// ============================================

describe("PolicyEngine: loading", () => {
  it("starts with no policies", () => {
    const engine = new PolicyEngine();
    expect(engine.getPolicies()).toEqual([]);
  });

  it("loads policies from a PolicySet", () => {
    const p1 = makePolicy({ id: "p1" });
    const p2 = makePolicy({ id: "p2" });
    const engine = new PolicyEngine(makePolicySet([p1, p2]));
    expect(engine.getPolicies()).toHaveLength(2);
  });

  it("constructor accepts PolicySet", () => {
    const p = makePolicy({ id: "test" });
    const engine = new PolicyEngine(makePolicySet([p]));
    expect(engine.getPolicies()).toHaveLength(1);
  });

  it("rejects duplicate policy IDs on load", () => {
    const p1 = makePolicy({ id: "dup" });
    const p2 = makePolicy({ id: "dup" });
    expect(() => new PolicyEngine(makePolicySet([p1, p2]))).toThrow(
      "Duplicate policy ID: dup",
    );
  });

  it("load replaces all existing policies", () => {
    const engine = new PolicyEngine(
      makePolicySet([makePolicy({ id: "old" })]),
    );
    engine.load(makePolicySet([makePolicy({ id: "new" })]));
    // Comparing the full ID list checks both the count and the content.
    expect(engine.getPolicies().map((p) => p.id)).toEqual(["new"]);
  });

  it("addPolicy appends", () => {
    const engine = new PolicyEngine();
    engine.addPolicy(makePolicy({ id: "a" }));
    engine.addPolicy(makePolicy({ id: "b" }));
    expect(engine.getPolicies()).toHaveLength(2);
  });

  it("addPolicy rejects duplicate IDs", () => {
    const engine = new PolicyEngine();
    engine.addPolicy(makePolicy({ id: "a" }));
    expect(() => engine.addPolicy(makePolicy({ id: "a" }))).toThrow(
      "Duplicate policy ID: a",
    );
  });

  it("removePolicy returns true on success", () => {
    const engine = new PolicyEngine();
    engine.addPolicy(makePolicy({ id: "a" }));
    expect(engine.removePolicy("a")).toBe(true);
    expect(engine.getPolicies()).toHaveLength(0);
  });

  it("removePolicy returns false when not found", () => {
    const engine = new PolicyEngine();
    expect(engine.removePolicy("nonexistent")).toBe(false);
  });

  it("export produces a valid PolicySet", () => {
    const p = makePolicy({ id: "test" });
    const engine = new PolicyEngine(makePolicySet([p]));
    const exported = engine.export();
    expect(exported.version).toBe(1);
    expect(exported.policies.map((policy) => policy.id)).toEqual(["test"]);
  });

  it("exported PolicySet is JSON-serializable", () => {
    const p = makePolicy({
      id: "json-test",
      replication: { minCopies: 3, preferredPeers: ["did:plc:a"] },
    });
    const engine = new PolicyEngine(makePolicySet([p]));
    const json = JSON.stringify(engine.export());
    const parsed = JSON.parse(json) as PolicySet;
    expect(parsed.version).toBe(1);
    expect(parsed.policies[0]?.replication.minCopies).toBe(3);
  });
});

// ============================================
// PolicyEngine: evaluation — single policy
// ============================================

describe("PolicyEngine: evaluate — single policy", () => {
  it("returns shouldReplicate=false when no policies match", () => {
    const engine = new PolicyEngine();
    const result = engine.evaluate("did:plc:orphan");
    expect(result.shouldReplicate).toBe(false);
    expect(result.sourcePolicyIds).toEqual([]);
    expect(result.priority).toBe(0);
  });

  it("disabled policies are ignored", () => {
    const p = makePolicy({
      id: "disabled",
      target: { type: "all" },
      enabled: false,
    });
    const engine = new PolicyEngine(makePolicySet([p]));
    const result = engine.evaluate("did:plc:any");
    expect(result.shouldReplicate).toBe(false);
  });

  it("single matching policy passes through config", () => {
    const p = makePolicy({
      id: "single",
      target: { type: "list", dids: ["did:plc:target"] },
      replication: { minCopies: 5, preferredPeers: ["did:plc:peer1"] },
      sync: { intervalSec: 120 },
      retention: { maxAgeSec: 86400, keepHistory: true },
      priority: 75,
    });
    const engine = new PolicyEngine(makePolicySet([p]));
    const result = engine.evaluate("did:plc:target");

    expect(result.shouldReplicate).toBe(true);
    expect(result.sourcePolicyIds).toEqual(["single"]);
    expect(result.replication.minCopies).toBe(5);
    expect(result.replication.preferredPeers).toEqual(["did:plc:peer1"]);
    expect(result.sync.intervalSec).toBe(120);
    expect(result.retention.maxAgeSec).toBe(86400);
    expect(result.retention.keepHistory).toBe(true);
    expect(result.priority).toBe(75);
  });

  it("all-target matches every DID", () => {
    const p = makePolicy({ id: "global", target: { type: "all" } });
    const engine = new PolicyEngine(makePolicySet([p]));
    expect(engine.shouldReplicate("did:plc:any")).toBe(true);
    expect(engine.shouldReplicate("did:web:example.com")).toBe(true);
  });

  it("pattern-target matches by prefix", () => {
    const p = makePolicy({
      id: "web-only",
      target: { type: "pattern", prefix: "did:web:" },
    });
    const engine = new PolicyEngine(makePolicySet([p]));
    expect(engine.shouldReplicate("did:web:example.com")).toBe(true);
    expect(engine.shouldReplicate("did:plc:abc")).toBe(false);
  });
});

// ============================================
// PolicyEngine: evaluation — merging multiple policies
// ============================================

describe("PolicyEngine: evaluate — merging", () => {
  it("minCopies takes the maximum", () => {
    const p1 = makePolicy({
      id: "low",
      target: { type: "all" },
      replication: { minCopies: 2 },
    });
    const p2 = makePolicy({
      id: "high",
      target: { type: "all" },
      replication: { minCopies: 5 },
    });
    const engine = new PolicyEngine(makePolicySet([p1, p2]));
    const result = engine.evaluate("did:plc:any");
    expect(result.replication.minCopies).toBe(5);
  });

  it("preferredPeers are unioned", () => {
    const p1 = makePolicy({
      id: "peers1",
      target: { type: "all" },
      replication: { minCopies: 1, preferredPeers: ["did:plc:a", "did:plc:b"] },
    });
    const p2 = makePolicy({
      id: "peers2",
      target: { type: "all" },
      replication: { minCopies: 1, preferredPeers: ["did:plc:b", "did:plc:c"] },
    });
    const engine = new PolicyEngine(makePolicySet([p1, p2]));
    const peers = engine.evaluate("did:plc:any").replication.preferredPeers;
    expect(peers).toBeDefined();
    // Sort a copy: the merged order is an implementation detail, and the
    // result object itself must not be mutated by the assertion.
    expect(sorted(peers ?? [])).toEqual([
      "did:plc:a",
      "did:plc:b",
      "did:plc:c",
    ]);
  });

  it("preferredPeers is undefined when no policy specifies them", () => {
    const p1 = makePolicy({
      id: "no-peers",
      target: { type: "all" },
      replication: { minCopies: 1 },
    });
    const engine = new PolicyEngine(makePolicySet([p1]));
    const result = engine.evaluate("did:plc:any");
    expect(result.replication.preferredPeers).toBeUndefined();
  });

  it("sync intervalSec takes the minimum (most frequent)", () => {
    const p1 = makePolicy({
      id: "slow",
      target: { type: "all" },
      sync: { intervalSec: 600 },
    });
    const p2 = makePolicy({
      id: "fast",
      target: { type: "all" },
      sync: { intervalSec: 60 },
    });
    const engine = new PolicyEngine(makePolicySet([p1, p2]));
    const result = engine.evaluate("did:plc:any");
    expect(result.sync.intervalSec).toBe(60);
  });

  it("retention maxAgeSec: 0 (forever) wins over any finite value", () => {
    const p1 = makePolicy({
      id: "finite",
      target: { type: "all" },
      retention: { maxAgeSec: 86400, keepHistory: false },
    });
    const p2 = makePolicy({
      id: "forever",
      target: { type: "all" },
      retention: { maxAgeSec: 0, keepHistory: false },
    });
    const engine = new PolicyEngine(makePolicySet([p1, p2]));
    const result = engine.evaluate("did:plc:any");
    expect(result.retention.maxAgeSec).toBe(0);
  });

  it("retention maxAgeSec: takes maximum when both finite", () => {
    const p1 = makePolicy({
      id: "short",
      target: { type: "all" },
      retention: { maxAgeSec: 3600, keepHistory: false },
    });
    const p2 = makePolicy({
      id: "long",
      target: { type: "all" },
      retention: { maxAgeSec: 86400, keepHistory: false },
    });
    const engine = new PolicyEngine(makePolicySet([p1, p2]));
    const result = engine.evaluate("did:plc:any");
    expect(result.retention.maxAgeSec).toBe(86400);
  });

  it("retention keepHistory: true if any policy says true", () => {
    const p1 = makePolicy({
      id: "no-hist",
      target: { type: "all" },
      retention: { maxAgeSec: 0, keepHistory: false },
    });
    const p2 = makePolicy({
      id: "hist",
      target: { type: "all" },
      retention: { maxAgeSec: 0, keepHistory: true },
    });
    const engine = new PolicyEngine(makePolicySet([p1, p2]));
    const result = engine.evaluate("did:plc:any");
    expect(result.retention.keepHistory).toBe(true);
  });

  it("priority takes the maximum", () => {
    const p1 = makePolicy({
      id: "low-pri",
      target: { type: "all" },
      priority: 20,
    });
    const p2 = makePolicy({
      id: "high-pri",
      target: { type: "all" },
      priority: 90,
    });
    const engine = new PolicyEngine(makePolicySet([p1, p2]));
    const result = engine.evaluate("did:plc:any");
    expect(result.priority).toBe(90);
  });

  it("sourcePolicyIds includes all matching policies", () => {
    const p1 = makePolicy({ id: "alpha", target: { type: "all" } });
    const p2 = makePolicy({
      id: "beta",
      target: { type: "list", dids: ["did:plc:target"] },
    });
    const p3 = makePolicy({
      id: "gamma",
      target: { type: "list", dids: ["did:plc:other"] },
    });
    const engine = new PolicyEngine(makePolicySet([p1, p2, p3]));
    const result = engine.evaluate("did:plc:target");
    expect(sorted(result.sourcePolicyIds)).toEqual(["alpha", "beta"]);
  });

  it("only enabled policies contribute to merge", () => {
    const p1 = makePolicy({
      id: "enabled-high",
      target: { type: "all" },
      replication: { minCopies: 2 },
      enabled: true,
    });
    const p2 = makePolicy({
      id: "disabled-higher",
      target: { type: "all" },
      replication: { minCopies: 10 },
      enabled: false,
    });
    const engine = new PolicyEngine(makePolicySet([p1, p2]));
    const result = engine.evaluate("did:plc:any");
    expect(result.replication.minCopies).toBe(2);
    expect(result.sourcePolicyIds).toEqual(["enabled-high"]);
  });
});

// ============================================
// PolicyEngine: shouldReplicate
// ============================================

describe("PolicyEngine: shouldReplicate", () => {
  it("returns false when no policies exist", () => {
    const engine = new PolicyEngine();
    expect(engine.shouldReplicate("did:plc:any")).toBe(false);
  });

  it("returns true when a matching policy exists", () => {
    const p = makePolicy({ id: "p", target: { type: "all" } });
    const engine = new PolicyEngine(makePolicySet([p]));
    expect(engine.shouldReplicate("did:plc:any")).toBe(true);
  });

  it("returns false for a DID not in any list", () => {
    const p = makePolicy({
      id: "p",
      target: { type: "list", dids: ["did:plc:included"] },
    });
    const engine = new PolicyEngine(makePolicySet([p]));
    expect(engine.shouldReplicate("did:plc:excluded")).toBe(false);
  });
});

// ============================================
// PolicyEngine: getReplicationConfig
// ============================================

describe("PolicyEngine: getReplicationConfig", () => {
  it("returns null when no policies match", () => {
    const engine = new PolicyEngine();
    expect(engine.getReplicationConfig("did:plc:any")).toBeNull();
  });

  it("returns config when policies match", () => {
    const p = makePolicy({
      id: "p",
      target: { type: "all" },
      replication: { minCopies: 3 },
      sync: { intervalSec: 120 },
      retention: { maxAgeSec: 7200, keepHistory: true },
      priority: 60,
    });
    const engine = new PolicyEngine(makePolicySet([p]));
    const config = engine.getReplicationConfig("did:plc:any");
    expect(config).not.toBeNull();
    expect(config?.replication.minCopies).toBe(3);
    expect(config?.sync.intervalSec).toBe(120);
    expect(config?.retention.maxAgeSec).toBe(7200);
    expect(config?.retention.keepHistory).toBe(true);
    expect(config?.priority).toBe(60);
  });
});

// ============================================
// PolicyEngine: getExplicitDids
// ============================================

describe("PolicyEngine: getExplicitDids", () => {
  it("returns empty array when no policies", () => {
    const engine = new PolicyEngine();
    expect(engine.getExplicitDids()).toEqual([]);
  });

  it("returns DIDs from list-type targets", () => {
    const p1 = makePolicy({
      id: "p1",
      target: { type: "list", dids: ["did:plc:a", "did:plc:b"] },
    });
    const p2 = makePolicy({
      id: "p2",
      target: { type: "list", dids: ["did:plc:b", "did:plc:c"] },
    });
    const engine = new PolicyEngine(makePolicySet([p1, p2]));
    expect(sorted(engine.getExplicitDids())).toEqual([
      "did:plc:a",
      "did:plc:b",
      "did:plc:c",
    ]);
  });

  it("ignores all-type and pattern-type targets", () => {
    const p1 = makePolicy({ id: "p1", target: { type: "all" } });
    const p2 = makePolicy({
      id: "p2",
      target: { type: "pattern", prefix: "did:web:" },
    });
    const engine = new PolicyEngine(makePolicySet([p1, p2]));
    expect(engine.getExplicitDids()).toEqual([]);
  });

  it("ignores disabled policies", () => {
    const p = makePolicy({
      id: "disabled",
      target: { type: "list", dids: ["did:plc:a"] },
      enabled: false,
    });
    const engine = new PolicyEngine(makePolicySet([p]));
    expect(engine.getExplicitDids()).toEqual([]);
  });
});

// ============================================
// Presets
// ============================================

describe("Presets: mutualAid", () => {
  it("creates a valid policy with defaults", () => {
    const policy = mutualAid({
      peerDids: ["did:plc:alice", "did:plc:bob", "did:plc:carol"],
    });
    expect(policy.id).toBe("mutual-aid");
    expect(policy.name).toBe("Mutual Aid");
    expect(policy.enabled).toBe(true);
    expect(policy.target.type).toBe("list");
    // Narrow the target union so `dids` is accessible to the type checker.
    if (policy.target.type === "list") {
      expect(policy.target.dids).toEqual([
        "did:plc:alice",
        "did:plc:bob",
        "did:plc:carol",
      ]);
    }
    expect(policy.replication.minCopies).toBe(2);
    expect(policy.replication.preferredPeers).toEqual([
      "did:plc:alice",
      "did:plc:bob",
      "did:plc:carol",
    ]);
    expect(policy.sync.intervalSec).toBe(600);
    expect(policy.retention.maxAgeSec).toBe(0);
    expect(policy.retention.keepHistory).toBe(false);
    expect(policy.priority).toBe(50);
  });

  it("accepts custom options", () => {
    const policy = mutualAid({
      id: "my-group",
      name: "My Group",
      peerDids: ["did:plc:a"],
      minCopies: 4,
      intervalSec: 300,
      priority: 70,
    });
    expect(policy.id).toBe("my-group");
    expect(policy.name).toBe("My Group");
    expect(policy.replication.minCopies).toBe(4);
    expect(policy.sync.intervalSec).toBe(300);
    expect(policy.priority).toBe(70);
  });
});

describe("Presets: saas", () => {
  it("creates a valid SaaS policy with defaults", () => {
    const policy = saas({
      accountDids: ["did:plc:customer1", "did:plc:customer2"],
    });
    expect(policy.id).toBe("saas");
    expect(policy.name).toBe("SaaS SLA");
    expect(policy.replication.minCopies).toBe(3);
    expect(policy.sync.intervalSec).toBe(60);
    expect(policy.retention.maxAgeSec).toBe(0);
    expect(policy.retention.keepHistory).toBe(true);
    expect(policy.priority).toBe(80);
  });

  it("accepts custom options", () => {
    const policy = saas({
      accountDids: ["did:plc:a"],
      minCopies: 5,
      intervalSec: 30,
      maxAgeSec: 365 * 24 * 3600,
      keepHistory: false,
      priority: 100,
    });
    expect(policy.replication.minCopies).toBe(5);
    expect(policy.sync.intervalSec).toBe(30);
    expect(policy.retention.maxAgeSec).toBe(365 * 24 * 3600);
    expect(policy.retention.keepHistory).toBe(false);
    expect(policy.priority).toBe(100);
  });
});

describe("Presets: groupGovernance", () => {
  it("creates a valid governance policy with defaults", () => {
    const policy = groupGovernance({
      approvedDids: ["did:plc:approved1"],
      memberDids: ["did:plc:member1", "did:plc:member2"],
    });
    expect(policy.id).toBe("group-governance");
    expect(policy.name).toBe("Group Governance");
    expect(policy.target.type).toBe("list");
    if (policy.target.type === "list") {
      expect(policy.target.dids).toEqual(["did:plc:approved1"]);
    }
    expect(policy.replication.minCopies).toBe(2);
    expect(policy.replication.preferredPeers).toEqual([
      "did:plc:member1",
      "did:plc:member2",
    ]);
    expect(policy.sync.intervalSec).toBe(300);
    expect(policy.priority).toBe(60);
  });

  it("works without memberDids", () => {
    const policy = groupGovernance({
      approvedDids: ["did:plc:a"],
    });
    expect(policy.replication.preferredPeers).toBeUndefined();
  });
});

// ============================================
// Integration: presets loaded into engine
// ============================================

describe("Integration: presets in engine", () => {
  it("mutual-aid group members replicate each other", () => {
    const group = ["did:plc:alice", "did:plc:bob", "did:plc:carol"];
    const engine = new PolicyEngine();
    engine.addPolicy(mutualAid({ peerDids: group }));

    for (const did of group) {
      expect(engine.shouldReplicate(did)).toBe(true);
      expect(engine.getReplicationConfig(did)?.replication.minCopies).toBe(2);
    }

    // Non-group member
    expect(engine.shouldReplicate("did:plc:outsider")).toBe(false);
  });

  it("saas customers get high-priority fast sync", () => {
    const engine = new PolicyEngine();
    engine.addPolicy(
      saas({ accountDids: ["did:plc:customer1", "did:plc:customer2"] }),
    );

    const config = engine.getReplicationConfig("did:plc:customer1");
    expect(config).not.toBeNull();
    expect(config?.sync.intervalSec).toBe(60);
    expect(config?.priority).toBe(80);
    expect(config?.retention.keepHistory).toBe(true);
  });

  it("combined: saas overrides mutual-aid for overlapping DID", () => {
    const engine = new PolicyEngine();
    engine.addPolicy(
      mutualAid({
        id: "aid",
        peerDids: ["did:plc:overlap", "did:plc:peer2"],
      }),
    );
    engine.addPolicy(
      saas({
        id: "sla",
        accountDids: ["did:plc:overlap"],
      }),
    );

    const result = engine.evaluate("did:plc:overlap");
    expect(sorted(result.sourcePolicyIds)).toEqual(["aid", "sla"]);
    // SaaS minCopies (3) > mutual-aid minCopies (2)
    expect(result.replication.minCopies).toBe(3);
    // SaaS intervalSec (60) < mutual-aid (600) → most frequent wins
    expect(result.sync.intervalSec).toBe(60);
    // SaaS priority (80) > mutual-aid (50)
    expect(result.priority).toBe(80);
    // SaaS keepHistory=true wins
    expect(result.retention.keepHistory).toBe(true);
    // preferred peers from mutual-aid
    expect(result.replication.preferredPeers).toBeDefined();
    expect(result.replication.preferredPeers).toContain("did:plc:overlap");
    expect(result.replication.preferredPeers).toContain("did:plc:peer2");
  });

  it("getExplicitDids across presets", () => {
    const engine = new PolicyEngine();
    engine.addPolicy(
      mutualAid({ id: "aid", peerDids: ["did:plc:a", "did:plc:b"] }),
    );
    engine.addPolicy(
      saas({ id: "sla", accountDids: ["did:plc:b", "did:plc:c"] }),
    );

    expect(sorted(engine.getExplicitDids())).toEqual([
      "did:plc:a",
      "did:plc:b",
      "did:plc:c",
    ]);
  });
});

// ============================================
// Determinism: same inputs → same outputs
// ============================================

describe("Determinism", () => {
  it("evaluate produces identical results on repeated calls", () => {
    const engine = new PolicyEngine();
    engine.addPolicy(
      mutualAid({
        id: "aid",
        peerDids: ["did:plc:a", "did:plc:b"],
        minCopies: 3,
      }),
    );
    engine.addPolicy(
      saas({
        id: "sla",
        accountDids: ["did:plc:a"],
        intervalSec: 30,
      }),
    );

    const r1: EffectivePolicy = engine.evaluate("did:plc:a");
    const r2: EffectivePolicy = engine.evaluate("did:plc:a");

    expect(r1.sourcePolicyIds).toEqual(r2.sourcePolicyIds);
    expect(r1.replication.minCopies).toBe(r2.replication.minCopies);
    expect(r1.sync.intervalSec).toBe(r2.sync.intervalSec);
    expect(r1.retention.maxAgeSec).toBe(r2.retention.maxAgeSec);
    expect(r1.retention.keepHistory).toBe(r2.retention.keepHistory);
    expect(r1.priority).toBe(r2.priority);
    expect(r1.shouldReplicate).toBe(r2.shouldReplicate);
    // Whole-object comparison also covers fields not listed above,
    // notably the order and content of replication.preferredPeers.
    expect(r1).toEqual(r2);
  });
});
+240
src/policy/engine.ts
/**
 * Declarative, deterministic, transport-agnostic policy engine.
 *
 * Loads a set of policies and evaluates them against DIDs to produce
 * effective (merged) replication configurations. Pure evaluation —
 * no side effects, no I/O.
 */

import type {
  Policy,
  PolicySet,
  PolicyTarget,
  EffectivePolicy,
  ReplicationGoals,
  SyncConfig,
  RetentionConfig,
} from "./types.js";
import {
  DEFAULT_REPLICATION,
  DEFAULT_SYNC,
  DEFAULT_RETENTION,
} from "./types.js";

/**
 * Holds an ordered list of policies and answers "what applies to this DID?".
 *
 * Policy objects are stored by reference (the containing array is copied,
 * the policies themselves are not), so mutating a policy object after
 * handing it to the engine changes what the engine evaluates.
 */
export class PolicyEngine {
  private policies: Policy[] = [];

  /**
   * @param policySet Optional initial policies; passed to {@link load}.
   * @throws Error if the set contains duplicate policy IDs.
   */
  constructor(policySet?: PolicySet) {
    if (policySet) {
      this.load(policySet);
    }
  }

  /**
   * Load (or replace) the full set of policies.
   * Validates that policy IDs are unique before anything is replaced, so a
   * rejected set leaves the previously loaded policies untouched.
   *
   * @throws Error `Duplicate policy ID: <id>` on the first repeated ID.
   */
  load(policySet: PolicySet): void {
    const seenIds = new Set<string>();
    for (const policy of policySet.policies) {
      if (seenIds.has(policy.id)) {
        throw new Error(`Duplicate policy ID: ${policy.id}`);
      }
      seenIds.add(policy.id);
    }
    this.policies = [...policySet.policies];
  }

  /**
   * Add a single policy. Throws if a policy with the same ID already exists.
   *
   * @throws Error `Duplicate policy ID: <id>`.
   */
  addPolicy(policy: Policy): void {
    if (this.policies.some((p) => p.id === policy.id)) {
      throw new Error(`Duplicate policy ID: ${policy.id}`);
    }
    this.policies.push(policy);
  }

  /**
   * Remove a policy by ID. Returns true if removed, false if not found.
   */
  removePolicy(id: string): boolean {
    const before = this.policies.length;
    this.policies = this.policies.filter((p) => p.id !== id);
    return this.policies.length < before;
  }

  /**
   * Get all loaded policies, in load/insertion order.
   *
   * Returns a snapshot rather than the internal array: `readonly` is only a
   * compile-time guard, and handing out the live array would let callers
   * push entries that bypass the duplicate-ID check. It would also behave
   * inconsistently — live after `addPolicy`, stale after `load` or
   * `removePolicy`, which reassign the internal array.
   */
  getPolicies(): readonly Policy[] {
    return [...this.policies];
  }

  /**
   * Export the current policies as a serializable PolicySet.
   */
  export(): PolicySet {
    return {
      version: 1,
      policies: [...this.policies],
    };
  }

  // ============================================
  // Evaluation
  // ============================================

  /**
   * Find all enabled policies that match the given DID, in load order.
   */
  matchingPolicies(did: string): Policy[] {
    return this.policies.filter((p) => appliesTo(p, did));
  }

  /**
   * Evaluate and return the effective (merged) policy for a DID.
   *
   * When nothing matches, the result carries the default
   * replication/sync/retention values, priority 0, an empty
   * `sourcePolicyIds`, and `shouldReplicate: false`.
   *
   * Merging rules:
   * - replication.minCopies: take the maximum (most protective)
   * - replication.preferredPeers: union of all
   * - sync.intervalSec: take the minimum (most frequent)
   * - retention.maxAgeSec: take the maximum (keep longer), with 0 = forever winning
   * - retention.keepHistory: true if any policy says true (most permissive)
   * - priority: take the maximum
   *
   * `sourcePolicyIds` lists the matching policies in load order.
   */
  evaluate(did: string): EffectivePolicy {
    const matching = this.matchingPolicies(did);

    if (matching.length === 0) {
      return {
        did,
        sourcePolicyIds: [],
        replication: { ...DEFAULT_REPLICATION },
        sync: { ...DEFAULT_SYNC },
        retention: { ...DEFAULT_RETENTION },
        priority: 0,
        shouldReplicate: false,
      };
    }

    const replication = mergeReplication(matching.map((p) => p.replication));
    const sync = mergeSync(matching.map((p) => p.sync));
    const retention = mergeRetention(matching.map((p) => p.retention));
    const priority = Math.max(...matching.map((p) => p.priority));

    return {
      did,
      sourcePolicyIds: matching.map((p) => p.id),
      replication,
      sync,
      retention,
      priority,
      shouldReplicate: true,
    };
  }

  /**
   * Whether this DID should be replicated based on policies.
   *
   * Equivalent to `evaluate(did).shouldReplicate` (true exactly when at
   * least one enabled policy matches), but stops at the first match instead
   * of building a full merged policy just to read one boolean.
   */
  shouldReplicate(did: string): boolean {
    return this.policies.some((p) => appliesTo(p, did));
  }

  /**
   * Get the replication configuration for a DID.
   * Returns null if no policies match (should not replicate).
   */
  getReplicationConfig(did: string): {
    replication: ReplicationGoals;
    sync: SyncConfig;
    retention: RetentionConfig;
    priority: number;
  } | null {
    const effective = this.evaluate(did);
    if (!effective.shouldReplicate) {
      return null;
    }
    return {
      replication: effective.replication,
      sync: effective.sync,
      retention: effective.retention,
      priority: effective.priority,
    };
  }

  /**
   * Get the list of all DIDs that should be replicated based on
   * policies with explicit DID lists. (Pattern/all targets cannot
   * enumerate DIDs — they are evaluated on-demand.)
   *
   * Disabled policies are skipped; each DID appears once, in first-seen order.
   */
  getExplicitDids(): string[] {
    const dids = new Set<string>();
    for (const p of this.policies) {
      if (!p.enabled) continue;
      if (p.target.type === "list") {
        for (const did of p.target.dids) {
          dids.add(did);
        }
      }
    }
    return [...dids];
  }
}

// ============================================
// Target matching (pure function)
// ============================================

/** True when the policy is enabled and its target covers the DID. */
function appliesTo(policy: Policy, did: string): boolean {
  return policy.enabled && matchesTarget(policy.target, did);
}

/**
 * Check if a DID matches a policy target.
 *
 * - `all`: always matches
 * - `list`: exact membership in `dids`
 * - `pattern`: `did` starts with `prefix`
 *
 * A target whose `type` is none of the above never matches. Types are
 * erased at runtime, so a policy parsed from JSON can carry such a value;
 * without the default branch the function would fall off the end of the
 * switch and return `undefined` despite its `boolean` signature.
 */
export function matchesTarget(target: PolicyTarget, did: string): boolean {
  switch (target.type) {
    case "all":
      return true;
    case "list":
      return target.dids.includes(did);
    case "pattern":
      return did.startsWith(target.prefix);
    default: {
      // Compile-time exhaustiveness check: adding a new PolicyTarget variant
      // without handling it above makes this assignment a type error.
      const unhandled: never = target;
      void unhandled;
      return false;
    }
  }
}

// ============================================
// Merging functions (pure)
// ============================================

/**
 * Merge replication goals: highest `minCopies`, union of `preferredPeers`
 * (first-seen order, `undefined` when no policy names any peer).
 * Callers pass a non-empty list.
 */
function mergeReplication(goals: ReplicationGoals[]): ReplicationGoals {
  const minCopies = Math.max(...goals.map((g) => g.minCopies));
  const allPeers = new Set<string>();
  for (const g of goals) {
    if (g.preferredPeers) {
      for (const peer of g.preferredPeers) {
        allPeers.add(peer);
      }
    }
  }
  return {
    minCopies,
    preferredPeers: allPeers.size > 0 ? [...allPeers] : undefined,
  };
}

/** Merge sync configs. Callers pass a non-empty list. */
function mergeSync(configs: SyncConfig[]): SyncConfig {
  // Most frequent wins (smallest interval)
  const intervalSec = Math.min(...configs.map((c) => c.intervalSec));
  return { intervalSec };
}

/** Merge retention configs. Callers pass a non-empty list. */
function mergeRetention(configs: RetentionConfig[]): RetentionConfig {
  // 0 means "forever" — if any config says forever, keep forever
  const hasForever = configs.some((c) => c.maxAgeSec === 0);
  const maxAgeSec = hasForever
    ? 0
    : Math.max(...configs.map((c) => c.maxAgeSec));
  // Most permissive: if any says keep history, keep it
  const keepHistory = configs.some((c) => c.keepHistory);
  return { maxAgeSec, keepHistory };
}
+163
src/policy/presets.ts
/**
 * Built-in policy presets for common replication scenarios.
 *
 * Each preset is a factory that accepts minimal configuration and returns
 * a full Policy object. Presets are convenience helpers — users can always
 * write policies from scratch.
 *
 * Every preset copies the DID arrays it receives, so the returned Policy
 * never shares array instances with the caller's options object or between
 * its own fields.
 */

import type { Policy } from "./types.js";
import {
  DEFAULT_REPLICATION,
  DEFAULT_SYNC,
  DEFAULT_RETENTION,
  DEFAULT_PRIORITY,
} from "./types.js";

// ============================================
// mutual-aid: N-of-M redundancy among a peer group
// ============================================

export interface MutualAidOptions {
  /** Unique policy ID. Defaults to "mutual-aid". */
  id?: string;
  /** Human-readable name. */
  name?: string;
  /** The DIDs of all peers in the mutual-aid group. */
  peerDids: string[];
  /** Minimum copies across the group. Default: 2. */
  minCopies?: number;
  /** Sync interval in seconds. Default: 600 (10 minutes). */
  intervalSec?: number;
  /** Priority. Default: 50. */
  priority?: number;
}

/**
 * Create a mutual-aid policy. Each peer in the group stores copies of
 * the others' data for collective resilience.
 *
 * The peer list is used both as the policy target and as the preferred
 * peers; each field receives its own copy of the array.
 *
 * @returns An enabled list-target policy that keeps data forever without history.
 */
export function mutualAid(options: MutualAidOptions): Policy {
  return {
    id: options.id ?? "mutual-aid",
    name: options.name ?? "Mutual Aid",
    target: {
      type: "list",
      dids: [...options.peerDids],
    },
    replication: {
      minCopies: options.minCopies ?? 2,
      // Separate copy: target.dids and preferredPeers must not alias.
      preferredPeers: [...options.peerDids],
    },
    sync: {
      intervalSec: options.intervalSec ?? 600,
    },
    retention: {
      maxAgeSec: 0, // keep forever
      keepHistory: false,
    },
    priority: options.priority ?? DEFAULT_PRIORITY,
    enabled: true,
  };
}

// ============================================
// saas: SLA compliance
// ============================================

export interface SaasOptions {
  /** Unique policy ID. Defaults to "saas". */
  id?: string;
  /** Human-readable name. */
  name?: string;
  /** The DIDs of serviced accounts. */
  accountDids: string[];
  /** Minimum copies for SLA guarantees. Default: 3. */
  minCopies?: number;
  /** Sync interval in seconds. Default: 60 (1 minute). */
  intervalSec?: number;
  /** Retention max age in seconds. Default: 0 (forever). */
  maxAgeSec?: number;
  /** Whether to keep history. Default: true. */
  keepHistory?: boolean;
  /** Priority. Default: 80 (high). */
  priority?: number;
}

/**
 * Create a SaaS SLA policy. Guarantees fast sync, high redundancy,
 * and defined retention for paying accounts.
 *
 * @returns An enabled list-target policy over a copy of `accountDids`.
 */
export function saas(options: SaasOptions): Policy {
  return {
    id: options.id ?? "saas",
    name: options.name ?? "SaaS SLA",
    target: {
      type: "list",
      dids: [...options.accountDids],
    },
    replication: {
      minCopies: options.minCopies ?? 3,
    },
    sync: {
      intervalSec: options.intervalSec ?? 60,
    },
    retention: {
      // `??` (not `||`) so an explicit 0 / false from the caller is respected.
      maxAgeSec: options.maxAgeSec ?? 0,
      keepHistory: options.keepHistory ?? true,
    },
    priority: options.priority ?? 80,
    enabled: true,
  };
}

// ============================================
// group-governance: quorum-based replication decisions
// ============================================

export interface GroupGovernanceOptions {
  /** Unique policy ID. Defaults to "group-governance". */
  id?: string;
  /** Human-readable name. */
  name?: string;
  /** The DIDs that the group has decided to replicate. */
  approvedDids: string[];
  /** Members of the governance group (for preferred peers). */
  memberDids?: string[];
  /** Minimum copies. Default: 2. */
  minCopies?: number;
  /** Sync interval in seconds. Default: 300 (5 minutes). */
  intervalSec?: number;
  /** Priority. Default: 60. */
  priority?: number;
}

/**
 * Create a group-governance policy. The group collectively decides
 * which accounts to replicate. The approved list is the outcome of
 * that governance process (quorum, vote, etc.) — the policy engine
 * simply enforces the result.
 *
 * @returns An enabled list-target policy over a copy of `approvedDids`;
 *   `preferredPeers` is a copy of `memberDids`, or undefined when omitted.
 */
export function groupGovernance(options: GroupGovernanceOptions): Policy {
  return {
    id: options.id ?? "group-governance",
    name: options.name ?? "Group Governance",
    target: {
      type: "list",
      dids: [...options.approvedDids],
    },
    replication: {
      minCopies: options.minCopies ?? 2,
      preferredPeers: options.memberDids ? [...options.memberDids] : undefined,
    },
    sync: {
      intervalSec: options.intervalSec ?? 300,
    },
    retention: {
      maxAgeSec: 0,
      keepHistory: false,
    },
    priority: options.priority ?? 60,
    enabled: true,
  };
}
+139
src/policy/types.ts
/**
 * Type definitions for the declarative policy engine.
 *
 * Policies are plain data (JSON-serializable), deterministic, and
 * transport-agnostic — they operate on atproto accounts (DIDs), not raw blocks.
 */

// ============================================
// Target: which accounts a policy applies to
// ============================================

/** Match all serviced accounts. */
export interface TargetAll {
  type: "all";
}

/** Match an explicit list of DIDs. */
export interface TargetList {
  type: "list";
  dids: string[];
}

/** Match DIDs whose string representation starts with a given prefix. */
export interface TargetPattern {
  type: "pattern";
  /** A prefix string to match against DIDs, e.g. "did:plc:" or "did:web:example.com". */
  prefix: string;
}

/** Discriminated union of all target kinds; discriminate on `type`. */
export type PolicyTarget = TargetAll | TargetList | TargetPattern;

// ============================================
// Replication goals
// ============================================

export interface ReplicationGoals {
  /** Minimum number of copies across peers (including this node). Default: 1. */
  minCopies: number;
  /** Preferred peer DIDs to replicate with. Optional. */
  preferredPeers?: string[];
}

// ============================================
// Sync configuration
// ============================================

export interface SyncConfig {
  /** How often to sync, in seconds. Default: 300 (5 minutes). */
  intervalSec: number;
}

// ============================================
// Retention configuration
// ============================================

export interface RetentionConfig {
  /** How long to keep data, in seconds. 0 = forever. Default: 0. */
  maxAgeSec: number;
  /** Whether to keep historical revisions. Default: false. */
  keepHistory: boolean;
}

// ============================================
// Policy definition
// ============================================

export interface Policy {
  /** Unique identifier for this policy. */
  id: string;
  /** Human-readable name. */
  name: string;
  /** Which accounts this policy applies to. */
  target: PolicyTarget;
  /** Replication goals. */
  replication: ReplicationGoals;
  /** Sync frequency. */
  sync: SyncConfig;
  /** Data retention. */
  retention: RetentionConfig;
  /** Priority (higher = more important). Range: 0-100. Default: 50. */
  priority: number;
  /** Whether this policy is currently active. */
  enabled: boolean;
}

// ============================================
// Effective policy: the resolved result for a single DID
// ============================================

/**
 * The effective (merged) policy for a single DID after evaluating
 * all matching policies.
 */
export interface EffectivePolicy {
  /** The DID this effective policy is for. */
  did: string;
  /** IDs of all policies that contributed to this result. */
  sourcePolicyIds: string[];
  /** Merged replication goals. */
  replication: ReplicationGoals;
  /** Merged sync config. */
  sync: SyncConfig;
  /** Merged retention config. */
  retention: RetentionConfig;
  /** Highest priority among matching policies. */
  priority: number;
  /** Whether replication is enabled (at least one enabled policy matches). */
  shouldReplicate: boolean;
}

// ============================================
// Policy set: what gets loaded from config / stored on disk
// ============================================

export interface PolicySet {
  /** Schema version for forward compatibility. */
  version: 1;
  /** All defined policies. */
  policies: Policy[];
}

// ============================================
// Defaults
// ============================================
//
// The default objects are frozen: they are shared module-level singletons,
// so an accidental write by one consumer would otherwise change the defaults
// seen by every other consumer. Spread them (`{ ...DEFAULT_SYNC }`) to get a
// mutable copy.

/** Default replication goals: a single copy, no preferred peers. */
export const DEFAULT_REPLICATION: ReplicationGoals = Object.freeze({
  minCopies: 1,
});

/** Default sync config: every 300 seconds (5 minutes). */
export const DEFAULT_SYNC: SyncConfig = Object.freeze({
  intervalSec: 300,
});

/** Default retention: keep forever (`maxAgeSec: 0`), no history. */
export const DEFAULT_RETENTION: RetentionConfig = Object.freeze({
  maxAgeSec: 0,
  keepHistory: false,
});

/** Default policy priority (mid-range of 0-100). */
export const DEFAULT_PRIORITY = 50;
+388
src/replication/e2e-networking.test.ts
/**
 * End-to-end networking integration test.
 *
 * Proves two p2pds nodes can discover each other and exchange data
 * over the network using libp2p/Helia bitswap.
 *
 * These tests create real Helia nodes with TCP networking on localhost,
 * connect them directly, and verify block exchange via bitswap.
 */

import { describe, it, expect, beforeEach, afterEach } from "vitest";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { CID } from "multiformats";
// @ts-ignore -- multiformats v9 subpath exports lack type declarations
import * as raw from "multiformats/codecs/raw";
// @ts-ignore -- multiformats v9 subpath exports lack type declarations
import { sha256 } from "multiformats/hashes/sha2";
import type { Helia } from "@helia/interface";
import { FsBlockstore } from "blockstore-fs";
import { FsDatastore } from "datastore-fs";

/**
 * Create a CID from raw bytes using SHA-256.
 */
async function cidFromBytes(bytes: Uint8Array): Promise<CID> {
  const hash = await sha256.digest(bytes);
  return CID.create(1, raw.code, hash);
}

/**
 * Collect an async generator of Uint8Array chunks into a single Uint8Array.
 * Helia/interface-blockstore's get() returns AsyncGenerator<Uint8Array>,
 * not a plain Uint8Array. The chunks may be Node.js Buffers, so we
 * normalize to a plain Uint8Array for consistent comparison.
 */
async function collectBytes(
  gen: AsyncIterable<Uint8Array>,
): Promise<Uint8Array> {
  const chunks: Uint8Array[] = [];
  for await (const chunk of gen) {
    chunks.push(chunk);
  }
  if (chunks.length === 0) return new Uint8Array(0);
  // Always return a plain Uint8Array (not a Buffer subclass)
  if (chunks.length === 1) {
    const c = chunks[0]!;
    return new Uint8Array(c.buffer, c.byteOffset, c.byteLength);
  }
  const total = chunks.reduce((acc, c) => acc + c.length, 0);
  const result = new Uint8Array(total);
  let offset = 0;
  for (const c of chunks) {
    result.set(c, offset);
    offset += c.length;
  }
  return result;
}

/**
 * Create a minimal Helia node with TCP-only networking on localhost.
 *
 * Strips out bootstrap peers, mDNS, delegated routing, autoNAT, autoTLS,
 * uPnP, circuit relay, WebRTC, and WebSockets to avoid any external
 * network dependencies. Nodes must be connected manually via dial().
 *
 * If Helia construction fails after libp2p has been created, the libp2p
 * node is stopped before the error is rethrown, since the caller never
 * receives a handle it could use to clean up.
 */
async function createTestHeliaNode(
  blocksPath: string,
  datastorePath: string,
): Promise<Helia> {
  const { createHelia } = await import("helia");
  const { noise } = await import("@chainsafe/libp2p-noise");
  const { yamux } = await import("@chainsafe/libp2p-yamux");
  const { tcp } = await import("@libp2p/tcp");
  const { identify } = await import("@libp2p/identify");
  const { bitswap } = await import("@helia/block-brokers");
  const { libp2pRouting } = await import("@helia/routers");
  const { createLibp2p } = await import("libp2p");

  const blockstore = new FsBlockstore(blocksPath);
  const datastore = new FsDatastore(datastorePath);

  const libp2p = await createLibp2p({
    addresses: {
      listen: ["/ip4/127.0.0.1/tcp/0"],
    },
    transports: [tcp()],
    connectionEncrypters: [noise()],
    streamMuxers: [yamux()],
    services: {
      identify: identify(),
    },
    // No peer discovery -- we connect manually
  });

  try {
    return await createHelia({
      libp2p,
      blockstore,
      datastore,
      blockBrokers: [bitswap()],
      routers: [libp2pRouting(libp2p)],
    });
  } catch (err) {
    // Best-effort cleanup; the construction error is the one worth reporting.
    await libp2p.stop().catch(() => {});
    throw err;
  }
}

/**
 * Wait for a condition to become true, with a timeout.
 *
 * The condition is polled every `intervalMs` and checked one final time once
 * the deadline has passed, so a condition that became true during the last
 * sleep is not reported as a timeout.
 *
 * @throws Error if the condition is still false after `timeoutMs`.
 */
async function waitFor(
  fn: () => Promise<boolean> | boolean,
  timeoutMs: number = 10_000,
  intervalMs: number = 200,
): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    if (await fn()) return;
    await new Promise((r) => setTimeout(r, intervalMs));
  }
  if (await fn()) return;
  throw new Error(`waitFor timed out after ${timeoutMs}ms`);
}

describe("E2E networking: two Helia nodes", () => {
  let tmpDir: string;
  let nodeA: Helia | null = null;
  let nodeB: Helia | null = null;

  beforeEach(() => {
    tmpDir = mkdtempSync(join(tmpdir(), "e2e-networking-test-"));
  });

  afterEach(async () => {
    // Stop nodes in parallel for faster cleanup
    const stops: Promise<void>[] = [];
    if (nodeA) stops.push(nodeA.stop().catch(() => {}));
    if (nodeB) stops.push(nodeB.stop().catch(() => {}));
    await Promise.all(stops);
    nodeA = null;
    nodeB = null;

    rmSync(tmpDir, { recursive: true, force: true });
  });

  it("nodes can connect and exchange blocks via bitswap", { timeout: 60_000 }, async () => {
    // 1. Create two Helia nodes with real TCP networking on localhost
    nodeA = await createTestHeliaNode(
      join(tmpDir, "a-blocks"),
      join(tmpDir, "a-datastore"),
    );
    nodeB = await createTestHeliaNode(
      join(tmpDir, "b-blocks"),
      join(tmpDir, "b-datastore"),
    );

    // 2. Verify both nodes are running and have addresses
    const addrsA = nodeA.libp2p.getMultiaddrs();
    const addrsB = nodeB.libp2p.getMultiaddrs();
    expect(addrsA.length).toBeGreaterThan(0);
    expect(addrsB.length).toBeGreaterThan(0);

    const peerIdA = nodeA.libp2p.peerId.toString();
    const peerIdB = nodeB.libp2p.peerId.toString();
    expect(peerIdA).toBeTruthy();
    expect(peerIdB).toBeTruthy();
    expect(peerIdA).not.toBe(peerIdB);

    // 3. Connect node B to node A
    await nodeB.libp2p.dial(addrsA[0]!);

    // Wait for the connection to be established in both directions
    await waitFor(
      () =>
        nodeA!.libp2p.getConnections().length > 0 &&
        nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    expect(nodeA.libp2p.getConnections().length).toBeGreaterThan(0);
    expect(nodeB.libp2p.getConnections().length).toBeGreaterThan(0);

    // 4. Store blocks on node A
    const testData = [
      new TextEncoder().encode("hello from node A"),
      new TextEncoder().encode("second block of data"),
      new TextEncoder().encode("third block for good measure"),
    ];

    const cids: CID[] = [];
    for (const bytes of testData) {
      const cid = await cidFromBytes(bytes);
      await nodeA.blockstore.put(cid, bytes);
      cids.push(cid);
    }

    // Verify blocks are on node A
    for (const cid of cids) {
      expect(await nodeA.blockstore.has(cid)).toBe(true);
    }

    // Verify blocks are NOT on node B yet
    for (const cid of cids) {
      expect(
        await nodeB.blockstore.has(cid, { offline: true } as any),
      ).toBe(false);
    }

    // 5. Retrieve blocks on node B via bitswap (network fetch)
    // blockstore.get() returns AsyncGenerator<Uint8Array>
    for (let i = 0; i < cids.length; i++) {
      const cid = cids[i]!;
      const signal = AbortSignal.timeout(15_000);
      const retrieved = await collectBytes(
        nodeB.blockstore.get(cid, { signal }) as any,
      );

      expect(retrieved).toBeDefined();
      expect(retrieved.length).toBe(testData[i]!.length);
      expect(retrieved).toEqual(testData[i]!);
    }

    // 6. Verify blocks are now cached on node B
    for (const cid of cids) {
      expect(await nodeB.blockstore.has(cid)).toBe(true);
    }
  });

  it("IpfsService instances can be connected and peer info is correct", { timeout: 60_000 }, async () => {
    // This test verifies that IpfsService with networking=true
    // exposes correct peer identity and multiaddr information.
    const { IpfsService } = await import("../ipfs.js");

    const serviceA = new IpfsService({
      blocksPath: join(tmpDir, "svc-a-blocks"),
      datastorePath: join(tmpDir, "svc-a-datastore"),
      networking: true,
    });
    const serviceB = new IpfsService({
      blocksPath: join(tmpDir, "svc-b-blocks"),
      datastorePath: join(tmpDir, "svc-b-datastore"),
      networking: true,
    });

    try {
      await serviceA.start();
      await serviceB.start();

      // Verify peer IDs are present and distinct
      const peerIdA = serviceA.getPeerId();
      const peerIdB = serviceB.getPeerId();
      expect(peerIdA).not.toBeNull();
      expect(peerIdB).not.toBeNull();
      expect(peerIdA).not.toBe(peerIdB);

      // Verify multiaddrs are available
      const addrsA = serviceA.getMultiaddrs();
      const addrsB = serviceB.getMultiaddrs();
      expect(addrsA.length).toBeGreaterThan(0);
      expect(addrsB.length).toBeGreaterThan(0);

      // Verify that IpfsService reports running
      expect(serviceA.isRunning()).toBe(true);
      expect(serviceB.isRunning()).toBe(true);
    } finally {
      // Attempt to stop every running service even if one stop() fails, so a
      // failure on A cannot leave B running; then surface the first failure.
      const outcomes = await Promise.allSettled(
        [serviceA, serviceB]
          .filter((svc) => svc.isRunning())
          .map(async (svc) => svc.stop()),
      );
      const failed = outcomes.find(
        (o): o is PromiseRejectedResult => o.status === "rejected",
      );
      if (failed) throw failed.reason;
    }
  });

  it("block stored on one node is retrievable from the other after connection", { timeout: 60_000 }, async () => {
    // A focused test: one block, two nodes, verify bitswap fetch.
    nodeA = await createTestHeliaNode(
      join(tmpDir, "single-a-blocks"),
      join(tmpDir, "single-a-datastore"),
    );
    nodeB = await createTestHeliaNode(
      join(tmpDir, "single-b-blocks"),
      join(tmpDir, "single-b-datastore"),
    );

    // Connect
    const addrsA = nodeA.libp2p.getMultiaddrs();
    await nodeB.libp2p.dial(addrsA[0]!);
    await waitFor(
      () => nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    // Store a single block on node A
    const data = new TextEncoder().encode(
      "single block e2e test payload",
    );
    const cid = await cidFromBytes(data);
    await nodeA.blockstore.put(cid, data);

    // Fetch from node B (will use bitswap to get from node A)
    const signal = AbortSignal.timeout(15_000);
    const fetched = await collectBytes(
      nodeB.blockstore.get(cid, { signal }) as any,
    );

    expect(fetched).toEqual(data);

    // Verify it is now cached locally on node B
    const cachedLocally = await collectBytes(
      nodeB.blockstore.get(cid, { offline: true }) as any,
    );
    expect(cachedLocally).toEqual(data);
  });

  it("nodes discover each other's peer IDs after connection", { timeout: 30_000 }, async () => {
    nodeA = await createTestHeliaNode(
      join(tmpDir, "disc-a-blocks"),
      join(tmpDir, "disc-a-datastore"),
    );
    nodeB = await createTestHeliaNode(
      join(tmpDir, "disc-b-blocks"),
      join(tmpDir, "disc-b-datastore"),
    );

    const peerIdA = nodeA.libp2p.peerId;
    const peerIdB = nodeB.libp2p.peerId;

    // Before connection, neither knows the other
    expect(nodeA.libp2p.getConnections(peerIdB)).toHaveLength(0);
    expect(nodeB.libp2p.getConnections(peerIdA)).toHaveLength(0);

    // Connect B -> A
    const addrsA = nodeA.libp2p.getMultiaddrs();
    await nodeB.libp2p.dial(addrsA[0]!);

    // After connection, both should see the connection
    await waitFor(
      () => nodeA!.libp2p.getConnections(peerIdB).length > 0,
      5_000,
    );

    expect(
      nodeA.libp2p.getConnections(peerIdB).length,
    ).toBeGreaterThan(0);
    expect(
      nodeB.libp2p.getConnections(peerIdA).length,
    ).toBeGreaterThan(0);
  });

  it("bidirectional block exchange works", { timeout: 60_000 }, async () => {
    nodeA = await createTestHeliaNode(
      join(tmpDir, "bidir-a-blocks"),
      join(tmpDir, "bidir-a-datastore"),
    );
    nodeB = await createTestHeliaNode(
      join(tmpDir, "bidir-b-blocks"),
      join(tmpDir, "bidir-b-datastore"),
    );

    // Connect
    await nodeB.libp2p.dial(nodeA.libp2p.getMultiaddrs()[0]!);
    await waitFor(
      () =>
        nodeA!.libp2p.getConnections().length > 0 &&
        nodeB!.libp2p.getConnections().length > 0,
      5_000,
    );

    // Store block on A, different block on B
    const dataA = new TextEncoder().encode("block from A");
    const dataB = new TextEncoder().encode("block from B");
    const cidA = await cidFromBytes(dataA);
    const cidB = await cidFromBytes(dataB);

    await nodeA.blockstore.put(cidA, dataA);
    await nodeB.blockstore.put(cidB, dataB);

    // Each fetch gets its own timeout signal: a single shared signal would
    // leave the second fetch only whatever time the first one did not use.

    // B fetches A's block
    const fetchedFromA = await collectBytes(
      nodeB.blockstore.get(cidA, { signal: AbortSignal.timeout(15_000) }) as any,
    );
    expect(fetchedFromA).toEqual(dataA);

    // A fetches B's block
    const fetchedFromB = await collectBytes(
      nodeA.blockstore.get(cidB, { signal: AbortSignal.timeout(15_000) }) as any,
    );
    expect(fetchedFromB).toEqual(dataB);
  });
});
+616
src/replication/firehose-subscription.test.ts
··· 1 + import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; 2 + import { mkdtempSync, rmSync } from "node:fs"; 3 + import { tmpdir } from "node:os"; 4 + import { join } from "node:path"; 5 + import Database from "better-sqlite3"; 6 + import { WebSocketServer, WebSocket } from "ws"; 7 + import { createServer, type Server } from "node:http"; 8 + import { encode as cborEncode } from "../cbor-compat.js"; 9 + import { SyncStorage } from "./sync-storage.js"; 10 + import { 11 + FirehoseSubscription, 12 + type FirehoseCommitEvent, 13 + } from "./firehose-subscription.js"; 14 + 15 + // ============================================ 16 + // Helpers 17 + // ============================================ 18 + 19 + /** Encode a firehose frame (header + body) as concatenated CBOR. */ 20 + function encodeFrame(header: object, body: object): Buffer { 21 + const headerBytes = cborEncode(header); 22 + const bodyBytes = cborEncode(body); 23 + const frame = new Uint8Array(headerBytes.length + bodyBytes.length); 24 + frame.set(headerBytes, 0); 25 + frame.set(bodyBytes, headerBytes.length); 26 + return Buffer.from(frame); 27 + } 28 + 29 + /** Create a mock commit frame for a given DID. */ 30 + function makeCommitFrame( 31 + seq: number, 32 + repo: string, 33 + ops: Array<{ action: string; path: string; cid: null }> = [], 34 + ): Buffer { 35 + const header = { op: 1, t: "#commit" }; 36 + const body = { 37 + seq, 38 + repo, 39 + rev: `rev${seq}`, 40 + since: null, 41 + blocks: new Uint8Array(0), 42 + ops, 43 + commit: null, 44 + time: new Date().toISOString(), 45 + tooBig: false, 46 + rebase: false, 47 + }; 48 + return encodeFrame(header, body); 49 + } 50 + 51 + /** Create a mock identity frame. 
*/ 52 + function makeIdentityFrame(seq: number, did: string): Buffer { 53 + const header = { op: 1, t: "#identity" }; 54 + const body = { seq, did, handle: "test.bsky.social", time: new Date().toISOString() }; 55 + return encodeFrame(header, body); 56 + } 57 + 58 + /** Create an error frame. */ 59 + function makeErrorFrame(error: string, message: string): Buffer { 60 + const header = { op: -1 }; 61 + const body = { error, message }; 62 + return encodeFrame(header, body); 63 + } 64 + 65 + /** Start a local WebSocket server that simulates a firehose relay. */ 66 + function startMockRelay(): Promise<{ 67 + server: Server; 68 + wss: WebSocketServer; 69 + url: string; 70 + getClients: () => Set<WebSocket>; 71 + close: () => Promise<void>; 72 + }> { 73 + return new Promise((resolve) => { 74 + const server = createServer(); 75 + const wss = new WebSocketServer({ server }); 76 + 77 + server.listen(0, "127.0.0.1", () => { 78 + const addr = server.address() as { port: number }; 79 + const url = `ws://127.0.0.1:${addr.port}/xrpc/com.atproto.sync.subscribeRepos`; 80 + resolve({ 81 + server, 82 + wss, 83 + url, 84 + getClients: () => wss.clients, 85 + close: () => 86 + new Promise<void>((res) => { 87 + for (const client of wss.clients) { 88 + client.close(); 89 + } 90 + wss.close(() => { 91 + server.close(() => res()); 92 + }); 93 + }), 94 + }); 95 + }); 96 + }); 97 + } 98 + 99 + /** Wait for a condition to be true, with timeout. 
*/ 100 + async function waitFor( 101 + condition: () => boolean, 102 + timeoutMs = 5000, 103 + intervalMs = 50, 104 + ): Promise<void> { 105 + const deadline = Date.now() + timeoutMs; 106 + while (!condition()) { 107 + if (Date.now() > deadline) { 108 + throw new Error(`waitFor timed out after ${timeoutMs}ms`); 109 + } 110 + await new Promise((r) => setTimeout(r, intervalMs)); 111 + } 112 + } 113 + 114 + // ============================================ 115 + // CBOR Frame Parsing 116 + // ============================================ 117 + 118 + describe("FirehoseSubscription: frame parsing", () => { 119 + let relay: Awaited<ReturnType<typeof startMockRelay>>; 120 + let sub: FirehoseSubscription; 121 + 122 + beforeEach(async () => { 123 + relay = await startMockRelay(); 124 + }); 125 + 126 + afterEach(async () => { 127 + sub?.stop(); 128 + await relay.close(); 129 + }); 130 + 131 + it("parses commit events for tracked DIDs", async () => { 132 + const events: FirehoseCommitEvent[] = []; 133 + 134 + sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 135 + sub.onCommit((event) => { 136 + events.push(event); 137 + }); 138 + 139 + const trackedDid = "did:plc:tracked1"; 140 + sub.start(new Set([trackedDid])); 141 + 142 + // Wait for client to connect 143 + await waitFor(() => relay.getClients().size > 0); 144 + 145 + // Send a commit for the tracked DID 146 + const frame = makeCommitFrame(1, trackedDid, [ 147 + { action: "create", path: "app.bsky.feed.post/abc", cid: null }, 148 + ]); 149 + for (const client of relay.getClients()) { 150 + client.send(frame); 151 + } 152 + 153 + // Wait for event to be processed 154 + await waitFor(() => events.length > 0); 155 + 156 + expect(events).toHaveLength(1); 157 + expect(events[0]!.seq).toBe(1); 158 + expect(events[0]!.repo).toBe(trackedDid); 159 + expect(events[0]!.ops).toHaveLength(1); 160 + expect(events[0]!.ops[0]!.action).toBe("create"); 161 + expect(events[0]!.ops[0]!.path).toBe("app.bsky.feed.post/abc"); 162 + }); 
163 + 164 + it("filters out events for non-tracked DIDs", async () => { 165 + const events: FirehoseCommitEvent[] = []; 166 + 167 + sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 168 + sub.onCommit((event) => { 169 + events.push(event); 170 + }); 171 + 172 + const trackedDid = "did:plc:tracked1"; 173 + const untrackedDid = "did:plc:untracked1"; 174 + sub.start(new Set([trackedDid])); 175 + 176 + await waitFor(() => relay.getClients().size > 0); 177 + 178 + // Send events for both tracked and untracked DIDs 179 + for (const client of relay.getClients()) { 180 + client.send(makeCommitFrame(1, untrackedDid)); 181 + client.send(makeCommitFrame(2, trackedDid)); 182 + client.send(makeCommitFrame(3, untrackedDid)); 183 + } 184 + 185 + await waitFor(() => events.length >= 1); 186 + // Small delay to ensure no extra events arrive 187 + await new Promise((r) => setTimeout(r, 200)); 188 + 189 + // Only the tracked DID's event should have been dispatched 190 + expect(events).toHaveLength(1); 191 + expect(events[0]!.repo).toBe(trackedDid); 192 + expect(events[0]!.seq).toBe(2); 193 + }); 194 + 195 + it("ignores identity events (only processes commits)", async () => { 196 + const events: FirehoseCommitEvent[] = []; 197 + 198 + sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 199 + sub.onCommit((event) => { 200 + events.push(event); 201 + }); 202 + 203 + const trackedDid = "did:plc:tracked1"; 204 + sub.start(new Set([trackedDid])); 205 + 206 + await waitFor(() => relay.getClients().size > 0); 207 + 208 + for (const client of relay.getClients()) { 209 + client.send(makeIdentityFrame(1, trackedDid)); 210 + client.send(makeCommitFrame(2, trackedDid)); 211 + } 212 + 213 + await waitFor(() => events.length >= 1); 214 + await new Promise((r) => setTimeout(r, 200)); 215 + 216 + // Only commit events should be dispatched 217 + expect(events).toHaveLength(1); 218 + expect(events[0]!.seq).toBe(2); 219 + }); 220 + 221 + it("handles error frames without crashing", 
async () => { 222 + const events: FirehoseCommitEvent[] = []; 223 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 224 + 225 + sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 226 + sub.onCommit((event) => { 227 + events.push(event); 228 + }); 229 + 230 + const trackedDid = "did:plc:tracked1"; 231 + sub.start(new Set([trackedDid])); 232 + 233 + await waitFor(() => relay.getClients().size > 0); 234 + 235 + for (const client of relay.getClients()) { 236 + client.send(makeErrorFrame("ConsumerTooSlow", "Consumer is too slow")); 237 + client.send(makeCommitFrame(1, trackedDid)); 238 + } 239 + 240 + await waitFor(() => events.length >= 1); 241 + 242 + // Error was logged (single string argument containing the prefix) 243 + expect(consoleSpy).toHaveBeenCalledWith( 244 + expect.stringContaining("[firehose-subscription] Error from relay"), 245 + ); 246 + 247 + // Commit event still processed 248 + expect(events).toHaveLength(1); 249 + 250 + consoleSpy.mockRestore(); 251 + }); 252 + }); 253 + 254 + // ============================================ 255 + // Cursor Management 256 + // ============================================ 257 + 258 + describe("FirehoseSubscription: cursor management", () => { 259 + let relay: Awaited<ReturnType<typeof startMockRelay>>; 260 + let sub: FirehoseSubscription; 261 + 262 + beforeEach(async () => { 263 + relay = await startMockRelay(); 264 + }); 265 + 266 + afterEach(async () => { 267 + sub?.stop(); 268 + await relay.close(); 269 + }); 270 + 271 + it("updates cursor as events are processed", async () => { 272 + const events: FirehoseCommitEvent[] = []; 273 + 274 + sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 275 + sub.onCommit((event) => { 276 + events.push(event); 277 + }); 278 + 279 + const trackedDid = "did:plc:tracked1"; 280 + sub.start(new Set([trackedDid])); 281 + 282 + await waitFor(() => relay.getClients().size > 0); 283 + 284 + expect(sub.getCursor()).toBeNull(); 285 + 286 + for 
(const client of relay.getClients()) { 287 + client.send(makeCommitFrame(10, trackedDid)); 288 + } 289 + await waitFor(() => events.length >= 1); 290 + expect(sub.getCursor()).toBe(10); 291 + 292 + for (const client of relay.getClients()) { 293 + client.send(makeCommitFrame(20, trackedDid)); 294 + } 295 + await waitFor(() => events.length >= 2); 296 + expect(sub.getCursor()).toBe(20); 297 + }); 298 + 299 + it("passes cursor to relay URL on connection", async () => { 300 + let receivedUrl: string | undefined; 301 + 302 + relay.wss.on("connection", (_ws, req) => { 303 + receivedUrl = req.url; 304 + }); 305 + 306 + sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 307 + sub.start(new Set(["did:plc:test"]), 12345); 308 + 309 + await waitFor(() => relay.getClients().size > 0); 310 + 311 + expect(receivedUrl).toContain("cursor=12345"); 312 + }); 313 + 314 + it("resumes from cursor on reconnect", async () => { 315 + const events: FirehoseCommitEvent[] = []; 316 + const urls: string[] = []; 317 + 318 + relay.wss.on("connection", (_ws, req) => { 319 + urls.push(req.url ?? 
""); 320 + }); 321 + 322 + sub = new FirehoseSubscription({ 323 + firehoseUrl: relay.url, 324 + reconnectDelayMs: 100, 325 + }); 326 + sub.onCommit((event) => { 327 + events.push(event); 328 + }); 329 + 330 + const trackedDid = "did:plc:tracked1"; 331 + sub.start(new Set([trackedDid])); 332 + 333 + await waitFor(() => relay.getClients().size > 0); 334 + 335 + // Send an event to set the cursor 336 + for (const client of relay.getClients()) { 337 + client.send(makeCommitFrame(42, trackedDid)); 338 + } 339 + await waitFor(() => events.length >= 1); 340 + expect(sub.getCursor()).toBe(42); 341 + 342 + // Close all clients to trigger reconnect 343 + for (const client of relay.getClients()) { 344 + client.close(); 345 + } 346 + 347 + // Wait for reconnect 348 + await waitFor(() => urls.length >= 2, 5000); 349 + 350 + // The reconnection URL should include the cursor 351 + expect(urls[1]).toContain("cursor=42"); 352 + }); 353 + }); 354 + 355 + // ============================================ 356 + // SyncStorage: Firehose Cursor Persistence 357 + // ============================================ 358 + 359 + describe("SyncStorage: firehose cursor", () => { 360 + let tmpDir: string; 361 + let db: InstanceType<typeof Database>; 362 + let storage: SyncStorage; 363 + 364 + beforeEach(() => { 365 + tmpDir = mkdtempSync(join(tmpdir(), "firehose-cursor-test-")); 366 + db = new Database(join(tmpDir, "test.db")); 367 + storage = new SyncStorage(db); 368 + storage.initSchema(); 369 + }); 370 + 371 + afterEach(() => { 372 + db.close(); 373 + rmSync(tmpDir, { recursive: true, force: true }); 374 + }); 375 + 376 + it("returns null when no cursor saved", () => { 377 + expect(storage.getFirehoseCursor()).toBeNull(); 378 + }); 379 + 380 + it("saves and retrieves cursor", () => { 381 + storage.saveFirehoseCursor(12345); 382 + expect(storage.getFirehoseCursor()).toBe(12345); 383 + }); 384 + 385 + it("updates cursor on subsequent saves", () => { 386 + storage.saveFirehoseCursor(100); 387 + 
expect(storage.getFirehoseCursor()).toBe(100); 388 + 389 + storage.saveFirehoseCursor(200); 390 + expect(storage.getFirehoseCursor()).toBe(200); 391 + }); 392 + 393 + it("clears cursor", () => { 394 + storage.saveFirehoseCursor(100); 395 + storage.clearFirehoseCursor(); 396 + expect(storage.getFirehoseCursor()).toBeNull(); 397 + }); 398 + }); 399 + 400 + // ============================================ 401 + // Reconnection Logic 402 + // ============================================ 403 + 404 + describe("FirehoseSubscription: reconnection", () => { 405 + let relay: Awaited<ReturnType<typeof startMockRelay>>; 406 + let sub: FirehoseSubscription; 407 + 408 + beforeEach(async () => { 409 + relay = await startMockRelay(); 410 + }); 411 + 412 + afterEach(async () => { 413 + sub?.stop(); 414 + await relay.close(); 415 + }); 416 + 417 + it("reconnects after disconnection", async () => { 418 + let connectionCount = 0; 419 + relay.wss.on("connection", () => { 420 + connectionCount++; 421 + }); 422 + 423 + sub = new FirehoseSubscription({ 424 + firehoseUrl: relay.url, 425 + reconnectDelayMs: 100, 426 + maxReconnectDelayMs: 500, 427 + }); 428 + sub.start(new Set(["did:plc:test"])); 429 + 430 + await waitFor(() => connectionCount >= 1); 431 + 432 + // Close server-side connections to trigger reconnect 433 + for (const client of relay.getClients()) { 434 + client.close(); 435 + } 436 + 437 + await waitFor(() => connectionCount >= 2, 5000); 438 + expect(connectionCount).toBeGreaterThanOrEqual(2); 439 + }); 440 + 441 + it("stop() prevents reconnection", async () => { 442 + let connectionCount = 0; 443 + relay.wss.on("connection", () => { 444 + connectionCount++; 445 + }); 446 + 447 + sub = new FirehoseSubscription({ 448 + firehoseUrl: relay.url, 449 + reconnectDelayMs: 100, 450 + }); 451 + sub.start(new Set(["did:plc:test"])); 452 + 453 + await waitFor(() => connectionCount >= 1); 454 + 455 + // Stop before disconnection 456 + sub.stop(); 457 + 458 + // Close server-side 
connections 459 + for (const client of relay.getClients()) { 460 + client.close(); 461 + } 462 + 463 + // Wait and verify no reconnection 464 + await new Promise((r) => setTimeout(r, 500)); 465 + expect(connectionCount).toBe(1); 466 + }); 467 + }); 468 + 469 + // ============================================ 470 + // Stats and DID Updates 471 + // ============================================ 472 + 473 + describe("FirehoseSubscription: stats and DID updates", () => { 474 + let relay: Awaited<ReturnType<typeof startMockRelay>>; 475 + let sub: FirehoseSubscription; 476 + 477 + beforeEach(async () => { 478 + relay = await startMockRelay(); 479 + }); 480 + 481 + afterEach(async () => { 482 + sub?.stop(); 483 + await relay.close(); 484 + }); 485 + 486 + it("reports correct stats", async () => { 487 + sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 488 + sub.onCommit(() => {}); 489 + 490 + const trackedDid = "did:plc:tracked1"; 491 + const otherDid = "did:plc:other"; 492 + sub.start(new Set([trackedDid])); 493 + 494 + await waitFor(() => relay.getClients().size > 0); 495 + 496 + // Send events 497 + for (const client of relay.getClients()) { 498 + client.send(makeCommitFrame(1, otherDid)); // filtered out 499 + client.send(makeCommitFrame(2, trackedDid)); // processed 500 + client.send(makeCommitFrame(3, trackedDid)); // processed 501 + } 502 + 503 + await waitFor(() => sub.getStats().eventsProcessed >= 2); 504 + 505 + const stats = sub.getStats(); 506 + expect(stats.connected).toBe(true); 507 + expect(stats.cursor).toBe(3); 508 + expect(stats.eventsReceived).toBe(3); // all commit events received 509 + expect(stats.eventsProcessed).toBe(2); // only tracked DID 510 + expect(stats.trackedDids).toBe(1); 511 + }); 512 + 513 + it("updateDids changes which DIDs are tracked", async () => { 514 + const events: FirehoseCommitEvent[] = []; 515 + 516 + sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 517 + sub.onCommit((event) => { 518 + events.push(event); 
519 + }); 520 + 521 + const did1 = "did:plc:first"; 522 + const did2 = "did:plc:second"; 523 + sub.start(new Set([did1])); 524 + 525 + await waitFor(() => relay.getClients().size > 0); 526 + 527 + // Send event for did1 528 + for (const client of relay.getClients()) { 529 + client.send(makeCommitFrame(1, did1)); 530 + } 531 + await waitFor(() => events.length >= 1); 532 + 533 + // Switch tracking to did2 534 + sub.updateDids(new Set([did2])); 535 + 536 + for (const client of relay.getClients()) { 537 + client.send(makeCommitFrame(2, did1)); // should be filtered 538 + client.send(makeCommitFrame(3, did2)); // should be processed 539 + } 540 + 541 + await waitFor(() => events.length >= 2); 542 + await new Promise((r) => setTimeout(r, 200)); 543 + 544 + expect(events).toHaveLength(2); 545 + expect(events[0]!.repo).toBe(did1); 546 + expect(events[1]!.repo).toBe(did2); 547 + }); 548 + }); 549 + 550 + // ============================================ 551 + // Multiple handlers 552 + // ============================================ 553 + 554 + describe("FirehoseSubscription: multiple handlers", () => { 555 + let relay: Awaited<ReturnType<typeof startMockRelay>>; 556 + let sub: FirehoseSubscription; 557 + 558 + beforeEach(async () => { 559 + relay = await startMockRelay(); 560 + }); 561 + 562 + afterEach(async () => { 563 + sub?.stop(); 564 + await relay.close(); 565 + }); 566 + 567 + it("dispatches to all registered handlers", async () => { 568 + const events1: FirehoseCommitEvent[] = []; 569 + const events2: FirehoseCommitEvent[] = []; 570 + 571 + sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 572 + sub.onCommit((event) => { events1.push(event); }); 573 + sub.onCommit((event) => { events2.push(event); }); 574 + 575 + const trackedDid = "did:plc:tracked1"; 576 + sub.start(new Set([trackedDid])); 577 + 578 + await waitFor(() => relay.getClients().size > 0); 579 + 580 + for (const client of relay.getClients()) { 581 + client.send(makeCommitFrame(1, trackedDid)); 
582 + } 583 + 584 + await waitFor(() => events1.length >= 1 && events2.length >= 1); 585 + 586 + expect(events1).toHaveLength(1); 587 + expect(events2).toHaveLength(1); 588 + }); 589 + 590 + it("handler errors do not prevent other handlers from running", async () => { 591 + const events: FirehoseCommitEvent[] = []; 592 + const consoleSpy = vi.spyOn(console, "error").mockImplementation(() => {}); 593 + 594 + sub = new FirehoseSubscription({ firehoseUrl: relay.url }); 595 + sub.onCommit(() => { 596 + throw new Error("Handler 1 fails"); 597 + }); 598 + sub.onCommit((event) => { events.push(event); }); 599 + 600 + const trackedDid = "did:plc:tracked1"; 601 + sub.start(new Set([trackedDid])); 602 + 603 + await waitFor(() => relay.getClients().size > 0); 604 + 605 + for (const client of relay.getClients()) { 606 + client.send(makeCommitFrame(1, trackedDid)); 607 + } 608 + 609 + await waitFor(() => events.length >= 1); 610 + 611 + expect(events).toHaveLength(1); 612 + expect(consoleSpy).toHaveBeenCalled(); 613 + 614 + consoleSpy.mockRestore(); 615 + }); 616 + });
+430
src/replication/firehose-subscription.ts
/**
 * Subscribe to the AT Protocol firehose (com.atproto.sync.subscribeRepos)
 * to receive real-time updates for serviced DIDs.
 *
 * The firehose streams CBOR-encoded frames over WebSocket. Each frame
 * consists of two concatenated CBOR values: a header and a body.
 * The header contains { op, t } where t identifies the event type.
 * Commit events contain blocks (as CAR bytes) and ops (record changes).
 */

import { WebSocket } from "ws";
import { decode as cborDecode } from "../cbor-compat.js";

/** Parsed commit operation from a firehose event. */
export interface FirehoseCommitOp {
  action: "create" | "update" | "delete";
  path: string;
  cid: unknown | null;
}

/** Parsed commit event from the firehose. */
export interface FirehoseCommitEvent {
  seq: number;
  repo: string;
  rev: string;
  since: string | null;
  blocks: Uint8Array;
  ops: FirehoseCommitOp[];
  commit: unknown;
  time: string;
  tooBig: boolean;
  rebase: boolean;
}

/** Callback for commit events. */
export type CommitHandler = (event: FirehoseCommitEvent) => void | Promise<void>;

/** Configuration for the firehose subscription. */
export interface FirehoseSubscriptionConfig {
  /** WebSocket URL for the firehose relay. */
  firehoseUrl: string;
  /** Initial reconnect delay in ms (doubles on each failure, capped at maxReconnectDelayMs). */
  reconnectDelayMs?: number;
  /** Maximum reconnect delay in ms. */
  maxReconnectDelayMs?: number;
}

const DEFAULT_RECONNECT_DELAY_MS = 1000;
const DEFAULT_MAX_RECONNECT_DELAY_MS = 60_000;

/**
 * FirehoseSubscription connects to an AT Protocol firehose relay and
 * dispatches commit events for a configured set of DIDs.
 *
 * Features:
 * - Filters events to only process commits for serviced DIDs (Set lookup)
 * - Cursor-based resumption: saves last processed seq for restart
 * - Auto-reconnect with exponential backoff
 * - Graceful shutdown
 */
export class FirehoseSubscription {
  private ws: WebSocket | null = null;
  private dids: Set<string> = new Set();
  private commitHandlers: CommitHandler[] = [];
  private cursor: number | null = null;
  private running = false;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private currentReconnectDelay: number;
  private config: Required<FirehoseSubscriptionConfig>;

  /** Count of commit events received (all DIDs, before filtering). */
  private _eventsReceived = 0;
  /** Count of events processed (matching serviced DIDs). */
  private _eventsProcessed = 0;

  constructor(config: FirehoseSubscriptionConfig) {
    this.config = {
      firehoseUrl: config.firehoseUrl,
      reconnectDelayMs: config.reconnectDelayMs ?? DEFAULT_RECONNECT_DELAY_MS,
      maxReconnectDelayMs: config.maxReconnectDelayMs ?? DEFAULT_MAX_RECONNECT_DELAY_MS,
    };
    this.currentReconnectDelay = this.config.reconnectDelayMs;
  }

  /**
   * Start subscribing to the firehose for the given set of DIDs.
   * If a cursor is provided, it is sent to the relay as the `cursor`
   * query parameter so the stream resumes from that point.
   *
   * Calling start() while already running is a no-op.
   */
  start(dids: Set<string>, cursor?: number | null): void {
    if (this.running) return;
    this.running = true;
    this.dids = dids;
    if (cursor !== undefined && cursor !== null) {
      this.cursor = cursor;
    }
    // A previous session may have left the backoff delay inflated.
    this.currentReconnectDelay = this.config.reconnectDelayMs;
    this.connect();
  }

  /**
   * Stop the firehose subscription and disconnect cleanly.
   * Cancels any pending reconnect and detaches from the current socket.
   */
  stop(): void {
    this.running = false;
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.ws) {
      const ws = this.ws;
      // Detach first so late events from this socket are treated as stale.
      this.ws = null;
      try {
        // Close with normal closure code
        ws.close(1000, "Shutting down");
      } catch {
        // Ignore errors during close
      }
    }
  }

  /**
   * Register a handler for commit events matching serviced DIDs.
   */
  onCommit(handler: CommitHandler): void {
    this.commitHandlers.push(handler);
  }

  /**
   * Update the set of tracked DIDs without reconnecting.
   */
  updateDids(dids: Set<string>): void {
    this.dids = dids;
  }

  /**
   * Get the last successfully processed cursor (sequence number),
   * or null if no tracked event has been processed and no initial
   * cursor was supplied.
   */
  getCursor(): number | null {
    return this.cursor;
  }

  /**
   * Get subscription statistics.
   */
  getStats(): {
    connected: boolean;
    cursor: number | null;
    eventsReceived: number;
    eventsProcessed: number;
    trackedDids: number;
  } {
    return {
      connected: this.ws !== null && this.ws.readyState === WebSocket.OPEN,
      cursor: this.cursor,
      eventsReceived: this._eventsReceived,
      eventsProcessed: this._eventsProcessed,
      trackedDids: this.dids.size,
    };
  }

  // ============================================
  // Internal: Connection management
  // ============================================

  /**
   * Open a WebSocket to the relay and wire up its event handlers.
   *
   * Every handler first checks that `ws` is still the active socket.
   * Without that check, a late `close` from a socket abandoned by stop()
   * could fire after a subsequent start(), null out the new socket and
   * schedule a second, concurrent connection.
   */
  private connect(): void {
    if (!this.running) return;

    const url = new URL(this.config.firehoseUrl);
    if (this.cursor !== null) {
      url.searchParams.set("cursor", String(this.cursor));
    }

    const ws = new WebSocket(url.toString());
    this.ws = ws;

    ws.binaryType = "nodebuffer";

    const isCurrent = (): boolean => this.ws === ws;

    ws.on("open", () => {
      if (!isCurrent()) return;
      // Reset reconnect delay on successful connection
      this.currentReconnectDelay = this.config.reconnectDelayMs;
    });

    ws.on("message", (data: Buffer | ArrayBuffer | Buffer[]) => {
      // Drop frames that arrive from a socket we already abandoned.
      if (!isCurrent()) return;
      try {
        const bytes = toUint8Array(data);
        this.handleFrame(bytes);
      } catch (err) {
        // Log but don't crash on individual frame errors
        console.error("[firehose-subscription] Frame parse error:", err);
      }
    });

    ws.on("close", (code: number, reason: Buffer) => {
      if (!isCurrent()) return;
      this.ws = null;
      if (this.running) {
        const reasonStr = reason.toString("utf8");
        console.warn(
          `[firehose-subscription] Disconnected (code=${code}, reason="${reasonStr}"). Reconnecting in ${this.currentReconnectDelay}ms...`,
        );
        this.scheduleReconnect();
      }
    });

    ws.on("error", (err: Error) => {
      // Errors on an abandoned socket (e.g. closed mid-handshake) are expected.
      if (!isCurrent()) return;
      console.error("[firehose-subscription] WebSocket error:", err.message);
      // The 'close' event will fire after this, triggering reconnection
    });
  }

  /**
   * Schedule a single reconnect attempt after the current backoff delay,
   * then double the delay (capped at maxReconnectDelayMs) for next time.
   */
  private scheduleReconnect(): void {
    if (!this.running) return;
    if (this.reconnectTimer) return;

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      if (this.running) {
        this.connect();
      }
    }, this.currentReconnectDelay);

    // Exponential backoff
    this.currentReconnectDelay = Math.min(
      this.currentReconnectDelay * 2,
      this.config.maxReconnectDelayMs,
    );
  }

  // ============================================
  // Internal: Frame parsing
  // ============================================

  /**
   * Parse a firehose frame (two concatenated CBOR values: header + body).
   *
   * The AT Protocol firehose sends binary WebSocket frames containing:
   * 1. A CBOR-encoded header: { op: number, t: string }
   *    - op=1 means a normal event, t identifies the type (#commit, #identity, etc.)
   *    - op=-1 means an error frame
   * 2. A CBOR-encoded body (the event payload)
   *
   * Throws on malformed frames; the caller logs and continues.
   */
  private handleFrame(bytes: Uint8Array): void {
    // Decode the header (first CBOR value)
    const { value: header, bytesConsumed } = decodeCborWithLength(bytes);
    if (typeof header !== "object" || header === null) {
      throw new Error("Firehose frame header is not a CBOR map");
    }
    const headerObj = header as Record<string, unknown>;
    const op = headerObj.op;
    const type = headerObj.t;

    // Error frame
    if (op === -1) {
      const body = cborDecode(bytes.subarray(bytesConsumed)) as Record<string, unknown>;
      console.error(
        `[firehose-subscription] Error from relay: ${body.error}: ${body.message}`,
      );
      return;
    }

    // Only process normal events of type #commit
    if (op !== 1 || type !== "#commit") return;

    const body = cborDecode(bytes.subarray(bytesConsumed)) as Record<string, unknown>;
    this._eventsReceived++;

    // Filter: only process events for our serviced DIDs
    const repo = body.repo;
    if (typeof repo !== "string" || !this.dids.has(repo)) return;

    // A commit without a usable seq must not reach the cursor: String(undefined)
    // would otherwise be sent to the relay as "cursor=undefined" on reconnect.
    const seq = toSequenceNumber(body.seq);
    if (seq === null) {
      throw new Error(`Commit frame for ${repo} has no valid seq`);
    }

    const event = buildCommitEvent(body, repo, seq);

    // Update cursor to this event's sequence number.
    // NOTE(review): the cursor only advances on commits for tracked DIDs, so a
    // reconnect replays every untracked event since the last tracked one.
    // Consider advancing on all received events — confirm with consumers first.
    this.cursor = event.seq;
    this._eventsProcessed++;

    this.dispatchCommit(event);
  }

  /**
   * Invoke every registered handler with the event. A handler that throws
   * or rejects is logged and does not stop the remaining handlers.
   * Async handlers are not awaited.
   */
  private dispatchCommit(event: FirehoseCommitEvent): void {
    for (const handler of this.commitHandlers) {
      try {
        const result = handler(event);
        // If handler returns a promise, catch errors but don't await
        if (result && typeof result === "object" && "catch" in result) {
          (result as Promise<void>).catch((err: unknown) => {
            console.error("[firehose-subscription] Commit handler error:", err);
          });
        }
      } catch (err) {
        console.error("[firehose-subscription] Commit handler error:", err);
      }
    }
  }
}

// ============================================
// Utility functions
// ============================================

/**
 * Normalize a decoded `seq` field to a number.
 * Accepts finite numbers and bigints; returns null for anything else.
 */
function toSequenceNumber(value: unknown): number | null {
  if (typeof value === "number") {
    return Number.isFinite(value) ? value : null;
  }
  if (typeof value === "bigint") {
    return Number(value);
  }
  return null;
}

/**
 * Build a FirehoseCommitEvent from a decoded #commit body, applying
 * defaults for optional or missing fields.
 */
function buildCommitEvent(
  body: Record<string, unknown>,
  repo: string,
  seq: number,
): FirehoseCommitEvent {
  const ops: FirehoseCommitOp[] = Array.isArray(body.ops)
    ? (body.ops as Array<Record<string, unknown>>).map((entry) => ({
        action: entry.action as "create" | "update" | "delete",
        path: entry.path as string,
        cid: entry.cid ?? null,
      }))
    : [];

  return {
    seq,
    repo,
    rev: body.rev as string,
    since: (body.since as string) ?? null,
    blocks: body.blocks instanceof Uint8Array ? body.blocks : new Uint8Array(0),
    ops,
    commit: body.commit ?? null,
    time: (body.time as string) ?? new Date().toISOString(),
    tooBig: (body.tooBig as boolean) ?? false,
    rebase: (body.rebase as boolean) ?? false,
  };
}

/**
 * Convert WebSocket message data to Uint8Array.
 */
function toUint8Array(data: Buffer | ArrayBuffer | Buffer[]): Uint8Array {
  // Buffer is a Uint8Array subclass, so this covers the common case.
  if (data instanceof Uint8Array) {
    return data;
  }
  if (data instanceof ArrayBuffer) {
    return new Uint8Array(data);
  }
  if (Array.isArray(data)) {
    // Buffer[] — concatenate
    const totalLength = data.reduce((acc, buf) => acc + buf.length, 0);
    const result = new Uint8Array(totalLength);
    let offset = 0;
    for (const buf of data) {
      result.set(buf, offset);
      offset += buf.length;
    }
    return result;
  }
  return new Uint8Array(data as ArrayBuffer);
}

/**
 * Decode a single CBOR value from the start of `bytes` and return the
 * value plus how many bytes it occupied.
 *
 * This is needed because firehose frames contain two concatenated CBOR
 * values and we need to split them. The length of the first item is
 * found by walking its CBOR structure (see cborItemLength); only that
 * slice is then handed to the CBOR decoder.
 *
 * Throws if the first item is truncated or uses an unsupported encoding.
 */
function decodeCborWithLength(bytes: Uint8Array): { value: unknown; bytesConsumed: number } {
  const consumed = cborItemLength(bytes, 0);
  const headerBytes = bytes.subarray(0, consumed);
  const value = cborDecode(headerBytes);
  return { value, bytesConsumed: consumed };
}

/**
 * Calculate the byte length of the CBOR item starting at `offset` in `bytes`.
 *
 * Supports definite-length items of every major type (integers, byte and
 * text strings, arrays, maps, tags, simple values and floats), with 1-, 2-,
 * 4- and 8-byte arguments. Indefinite-length and reserved encodings
 * (additional info 28-31) are rejected.
 *
 * Throws if the item runs past the end of `bytes`, so a truncated frame
 * is reported instead of yielding a bogus length.
 */
function cborItemLength(bytes: Uint8Array, offset: number): number {
  if (offset >= bytes.length) {
    throw new Error(`Truncated CBOR: no item at offset ${offset}`);
  }

  const initial = bytes[offset]!;
  const majorType = initial >> 5;
  const additionalInfo = initial & 0x1f;

  // Size of the initial byte plus its argument bytes.
  let headerSize: number;
  if (additionalInfo < 24) {
    headerSize = 1;
  } else if (additionalInfo === 24) {
    headerSize = 2;
  } else if (additionalInfo === 25) {
    headerSize = 3;
  } else if (additionalInfo === 26) {
    headerSize = 5;
  } else if (additionalInfo === 27) {
    headerSize = 9;
  } else {
    // 28-30 are reserved, 31 is indefinite-length / break
    throw new Error(`Unsupported CBOR additional info: ${additionalInfo}`);
  }

  if (offset + headerSize > bytes.length) {
    throw new Error(`Truncated CBOR: incomplete item head at offset ${offset}`);
  }

  // Read the argument as an unsigned big-endian integer. Only called for
  // major types where the argument is a length or count; multiplication
  // (not bit shifts) keeps 32-bit values unsigned.
  const readArgument = (): number => {
    if (headerSize === 1) return additionalInfo;
    let value = 0;
    for (let i = 1; i < headerSize; i++) {
      value = value * 256 + bytes[offset + i]!;
    }
    if (!Number.isSafeInteger(value)) {
      throw new Error(`CBOR length at offset ${offset} is too large`);
    }
    return value;
  };

  switch (majorType) {
    case 0: // unsigned integer
    case 1: // negative integer
    case 7: // simple value / float16 / float32 / float64
      return headerSize;
    case 2: // byte string
    case 3: { // text string
      const total = headerSize + readArgument();
      if (offset + total > bytes.length) {
        throw new Error(`Truncated CBOR: string at offset ${offset} exceeds buffer`);
      }
      return total;
    }
    case 4: { // array
      const count = readArgument();
      let pos = offset + headerSize;
      for (let i = 0; i < count; i++) {
        pos += cborItemLength(bytes, pos);
      }
      return pos - offset;
    }
    case 5: { // map
      const count = readArgument();
      let pos = offset + headerSize;
      for (let i = 0; i < count; i++) {
        pos += cborItemLength(bytes, pos); // key
        pos += cborItemLength(bytes, pos); // value
      }
      return pos - offset;
    }
    case 6: // tag: head followed by exactly one tagged item
      return headerSize + cborItemLength(bytes, offset + headerSize);
    default:
      throw new Error(`Unknown CBOR major type: ${majorType}`);
  }
}
+587
src/replication/mst-proof.test.ts
··· 1 + import { describe, it, expect, beforeEach, afterEach } from "vitest"; 2 + import { mkdtempSync, rmSync } from "node:fs"; 3 + import { tmpdir } from "node:os"; 4 + import { join } from "node:path"; 5 + import Database from "better-sqlite3"; 6 + import { IpfsService } from "../ipfs.js"; 7 + import { RepoManager } from "../repo-manager.js"; 8 + import type { Config } from "../config.js"; 9 + import { readCarWithRoot } from "@atproto/repo"; 10 + import { generateMstProof, verifyMstProof } from "./mst-proof.js"; 11 + 12 + function testConfig(dataDir: string): Config { 13 + return { 14 + DID: "did:plc:test123", 15 + HANDLE: "test.example.com", 16 + PDS_HOSTNAME: "test.example.com", 17 + AUTH_TOKEN: "test-auth-token", 18 + SIGNING_KEY: 19 + "0000000000000000000000000000000000000000000000000000000000000001", 20 + SIGNING_KEY_PUBLIC: 21 + "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 22 + JWT_SECRET: "test-jwt-secret", 23 + PASSWORD_HASH: "$2a$10$test", 24 + DATA_DIR: dataDir, 25 + PORT: 3000, 26 + IPFS_ENABLED: true, 27 + IPFS_NETWORKING: false, 28 + REPLICATE_DIDS: [], 29 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 30 + FIREHOSE_ENABLED: false, 31 + }; 32 + } 33 + 34 + describe("MST Path Proof", () => { 35 + let tmpDir: string; 36 + let db: InstanceType<typeof Database>; 37 + let ipfsService: IpfsService; 38 + let repoManager: RepoManager; 39 + 40 + beforeEach(async () => { 41 + tmpDir = mkdtempSync(join(tmpdir(), "mst-proof-test-")); 42 + const config = testConfig(tmpDir); 43 + 44 + db = new Database(join(tmpDir, "test.db")); 45 + ipfsService = new IpfsService({ 46 + blocksPath: join(tmpDir, "ipfs-blocks"), 47 + datastorePath: join(tmpDir, "ipfs-datastore"), 48 + networking: false, 49 + }); 50 + await ipfsService.start(); 51 + 52 + repoManager = new RepoManager(db, config); 53 + repoManager.init(undefined, ipfsService, ipfsService); 54 + }); 55 + 56 + afterEach(async () => { 57 + if (ipfsService.isRunning()) { 58 + await 
ipfsService.stop(); 59 + } 60 + db.close(); 61 + rmSync(tmpDir, { recursive: true, force: true }); 62 + }); 63 + 64 + /** 65 + * Helper: create records, export CAR, store blocks in IPFS, return root CID. 66 + */ 67 + async function getRepoRootCid(): Promise<string> { 68 + const carBytes = await repoManager.getRepoCar(); 69 + const { root, blocks } = await readCarWithRoot(carBytes); 70 + await ipfsService.putBlocks(blocks); 71 + return root.toString(); 72 + } 73 + 74 + // ============================================ 75 + // Existence proofs 76 + // ============================================ 77 + 78 + it("generates and verifies an existence proof for a single record", async () => { 79 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 80 + $type: "app.bsky.feed.post", 81 + text: "Hello, world!", 82 + createdAt: "2025-01-01T00:00:00.000Z", 83 + }); 84 + 85 + const rootCid = await getRepoRootCid(); 86 + 87 + // Get the record's rkey 88 + const records = await repoManager.listRecords("app.bsky.feed.post", { 89 + limit: 10, 90 + }); 91 + const rkey = records.records[0]!.uri.split("/").pop()!; 92 + const recordPath = `app.bsky.feed.post/${rkey}`; 93 + 94 + // Generate proof 95 + const proof = await generateMstProof(ipfsService, rootCid, recordPath); 96 + 97 + expect(proof.found).toBe(true); 98 + expect(proof.recordCid).not.toBeNull(); 99 + expect(proof.commitBlock.cid).toBe(rootCid); 100 + expect(proof.nodes.length).toBeGreaterThan(0); 101 + 102 + // Verify proof 103 + const verification = await verifyMstProof(proof, rootCid, recordPath); 104 + 105 + expect(verification.valid).toBe(true); 106 + expect(verification.found).toBe(true); 107 + expect(verification.recordCid).toBe(proof.recordCid); 108 + expect(verification.error).toBeUndefined(); 109 + }); 110 + 111 + it("generates and verifies proof with multiple records", async () => { 112 + // Create several records to build a deeper MST 113 + for (let i = 0; i < 10; i++) { 114 + await 
repoManager.createRecord("app.bsky.feed.post", undefined, { 115 + $type: "app.bsky.feed.post", 116 + text: `Post number ${i}`, 117 + createdAt: new Date().toISOString(), 118 + }); 119 + } 120 + 121 + const rootCid = await getRepoRootCid(); 122 + 123 + // Get all records and verify proofs for each 124 + const records = await repoManager.listRecords("app.bsky.feed.post", { 125 + limit: 100, 126 + }); 127 + 128 + for (const record of records.records) { 129 + const rkey = record.uri.split("/").pop()!; 130 + const recordPath = `app.bsky.feed.post/${rkey}`; 131 + 132 + const proof = await generateMstProof( 133 + ipfsService, 134 + rootCid, 135 + recordPath, 136 + ); 137 + 138 + expect(proof.found).toBe(true); 139 + expect(proof.recordCid).not.toBeNull(); 140 + 141 + const verification = await verifyMstProof( 142 + proof, 143 + rootCid, 144 + recordPath, 145 + ); 146 + expect(verification.valid).toBe(true); 147 + expect(verification.found).toBe(true); 148 + } 149 + }); 150 + 151 + it("generates and verifies proof for records in different collections", async () => { 152 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 153 + $type: "app.bsky.feed.post", 154 + text: "A post", 155 + createdAt: new Date().toISOString(), 156 + }); 157 + 158 + await repoManager.putRecord("app.bsky.actor.profile", "self", { 159 + $type: "app.bsky.actor.profile", 160 + displayName: "Test User", 161 + }); 162 + 163 + const rootCid = await getRepoRootCid(); 164 + 165 + // Verify post 166 + const posts = await repoManager.listRecords("app.bsky.feed.post", { 167 + limit: 10, 168 + }); 169 + const postRkey = posts.records[0]!.uri.split("/").pop()!; 170 + const postPath = `app.bsky.feed.post/${postRkey}`; 171 + 172 + const postProof = await generateMstProof( 173 + ipfsService, 174 + rootCid, 175 + postPath, 176 + ); 177 + expect(postProof.found).toBe(true); 178 + const postVerification = await verifyMstProof( 179 + postProof, 180 + rootCid, 181 + postPath, 182 + ); 183 + 
expect(postVerification.valid).toBe(true); 184 + expect(postVerification.found).toBe(true); 185 + 186 + // Verify profile 187 + const profilePath = "app.bsky.actor.profile/self"; 188 + const profileProof = await generateMstProof( 189 + ipfsService, 190 + rootCid, 191 + profilePath, 192 + ); 193 + expect(profileProof.found).toBe(true); 194 + const profileVerification = await verifyMstProof( 195 + profileProof, 196 + rootCid, 197 + profilePath, 198 + ); 199 + expect(profileVerification.valid).toBe(true); 200 + expect(profileVerification.found).toBe(true); 201 + }); 202 + 203 + // ============================================ 204 + // Non-existence proofs 205 + // ============================================ 206 + 207 + it("generates and verifies a non-existence proof", async () => { 208 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 209 + $type: "app.bsky.feed.post", 210 + text: "Only post", 211 + createdAt: new Date().toISOString(), 212 + }); 213 + 214 + const rootCid = await getRepoRootCid(); 215 + 216 + // Use a path that definitely does not exist 217 + const nonExistentPath = "app.bsky.feed.post/nonexistent-rkey-12345"; 218 + 219 + const proof = await generateMstProof( 220 + ipfsService, 221 + rootCid, 222 + nonExistentPath, 223 + ); 224 + 225 + expect(proof.found).toBe(false); 226 + expect(proof.recordCid).toBeNull(); 227 + 228 + const verification = await verifyMstProof( 229 + proof, 230 + rootCid, 231 + nonExistentPath, 232 + ); 233 + 234 + expect(verification.valid).toBe(true); 235 + expect(verification.found).toBe(false); 236 + expect(verification.recordCid).toBeNull(); 237 + }); 238 + 239 + it("non-existence proof for nonexistent collection", async () => { 240 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 241 + $type: "app.bsky.feed.post", 242 + text: "Post in a real collection", 243 + createdAt: new Date().toISOString(), 244 + }); 245 + 246 + const rootCid = await getRepoRootCid(); 247 + 248 + const nonExistentPath 
= "com.example.nonexistent/abc123"; 249 + 250 + const proof = await generateMstProof( 251 + ipfsService, 252 + rootCid, 253 + nonExistentPath, 254 + ); 255 + 256 + expect(proof.found).toBe(false); 257 + expect(proof.recordCid).toBeNull(); 258 + 259 + const verification = await verifyMstProof( 260 + proof, 261 + rootCid, 262 + nonExistentPath, 263 + ); 264 + expect(verification.valid).toBe(true); 265 + expect(verification.found).toBe(false); 266 + }); 267 + 268 + // ============================================ 269 + // Proof compactness 270 + // ============================================ 271 + 272 + it("proof is compact: fewer blocks than total MST", async () => { 273 + // Create enough records to build a multi-level MST 274 + for (let i = 0; i < 20; i++) { 275 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 276 + $type: "app.bsky.feed.post", 277 + text: `Post ${i} for compactness test`, 278 + createdAt: new Date().toISOString(), 279 + }); 280 + } 281 + 282 + const rootCid = await getRepoRootCid(); 283 + 284 + const records = await repoManager.listRecords("app.bsky.feed.post", { 285 + limit: 100, 286 + }); 287 + const rkey = records.records[0]!.uri.split("/").pop()!; 288 + const recordPath = `app.bsky.feed.post/${rkey}`; 289 + 290 + const proof = await generateMstProof( 291 + ipfsService, 292 + rootCid, 293 + recordPath, 294 + ); 295 + 296 + // The proof should have: 297 + // - 1 commit block 298 + // - N MST node blocks (path from root to leaf) 299 + // For a tree with 20+ entries, this should be fewer blocks than the total tree 300 + const totalProofBlocks = 1 + proof.nodes.length; // commit + MST nodes 301 + // 20 records => several MST nodes in total; proof should be a subset 302 + expect(totalProofBlocks).toBeLessThanOrEqual(10); 303 + expect(proof.found).toBe(true); 304 + }); 305 + 306 + // ============================================ 307 + // Verification failure cases 308 + // ============================================ 309 + 310 + 
it("verification fails with wrong commit CID", async () => { 311 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 312 + $type: "app.bsky.feed.post", 313 + text: "Wrong CID test", 314 + createdAt: new Date().toISOString(), 315 + }); 316 + 317 + const rootCid = await getRepoRootCid(); 318 + 319 + const records = await repoManager.listRecords("app.bsky.feed.post", { 320 + limit: 10, 321 + }); 322 + const rkey = records.records[0]!.uri.split("/").pop()!; 323 + const recordPath = `app.bsky.feed.post/${rkey}`; 324 + 325 + const proof = await generateMstProof( 326 + ipfsService, 327 + rootCid, 328 + recordPath, 329 + ); 330 + 331 + // Verify with a wrong commit CID 332 + const fakeCommitCid = 333 + "bafyreig6mxqmjlb7yjbhhhz6vqmtiw4kgipvhqoowdkggjlpzpd5tcm4"; 334 + const verification = await verifyMstProof( 335 + proof, 336 + fakeCommitCid, 337 + recordPath, 338 + ); 339 + 340 + expect(verification.valid).toBe(false); 341 + expect(verification.error).toContain("Commit block CID mismatch"); 342 + }); 343 + 344 + it("verification fails with tampered node bytes", async () => { 345 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 346 + $type: "app.bsky.feed.post", 347 + text: "Tamper test", 348 + createdAt: new Date().toISOString(), 349 + }); 350 + 351 + const rootCid = await getRepoRootCid(); 352 + 353 + const records = await repoManager.listRecords("app.bsky.feed.post", { 354 + limit: 10, 355 + }); 356 + const rkey = records.records[0]!.uri.split("/").pop()!; 357 + const recordPath = `app.bsky.feed.post/${rkey}`; 358 + 359 + const proof = await generateMstProof( 360 + ipfsService, 361 + rootCid, 362 + recordPath, 363 + ); 364 + 365 + // Tamper with the first MST node 366 + const tamperedProof = { 367 + ...proof, 368 + nodes: proof.nodes.map((node, i) => { 369 + if (i === 0) { 370 + // Flip a byte 371 + const tampered = new Uint8Array(node.bytes); 372 + tampered[tampered.length - 1] = 373 + (tampered[tampered.length - 1]! 
^ 0xff) & 0xff; 374 + return { ...node, bytes: tampered }; 375 + } 376 + return node; 377 + }), 378 + }; 379 + 380 + const verification = await verifyMstProof( 381 + tamperedProof, 382 + rootCid, 383 + recordPath, 384 + ); 385 + 386 + expect(verification.valid).toBe(false); 387 + }); 388 + 389 + it("verification fails with wrong record path", async () => { 390 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 391 + $type: "app.bsky.feed.post", 392 + text: "Wrong path test", 393 + createdAt: new Date().toISOString(), 394 + }); 395 + 396 + const rootCid = await getRepoRootCid(); 397 + 398 + const records = await repoManager.listRecords("app.bsky.feed.post", { 399 + limit: 10, 400 + }); 401 + const rkey = records.records[0]!.uri.split("/").pop()!; 402 + const recordPath = `app.bsky.feed.post/${rkey}`; 403 + 404 + const proof = await generateMstProof( 405 + ipfsService, 406 + rootCid, 407 + recordPath, 408 + ); 409 + 410 + // Verify against a different path — the proof says found=true but 411 + // the verification will walk the nodes with the wrong path 412 + const wrongPath = "app.bsky.feed.post/totally-wrong-rkey"; 413 + const verification = await verifyMstProof( 414 + proof, 415 + rootCid, 416 + wrongPath, 417 + ); 418 + 419 + // The proof was generated for a different path, so either: 420 + // - The verifier will not find the record at the wrong path (found mismatch) 421 + // - Or the node chain won't be valid 422 + expect(verification.valid).toBe(false); 423 + }); 424 + 425 + it("verification fails with empty nodes array", async () => { 426 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 427 + $type: "app.bsky.feed.post", 428 + text: "Empty nodes test", 429 + createdAt: new Date().toISOString(), 430 + }); 431 + 432 + const rootCid = await getRepoRootCid(); 433 + 434 + const records = await repoManager.listRecords("app.bsky.feed.post", { 435 + limit: 10, 436 + }); 437 + const rkey = records.records[0]!.uri.split("/").pop()!; 
438 + const recordPath = `app.bsky.feed.post/${rkey}`; 439 + 440 + const proof = await generateMstProof( 441 + ipfsService, 442 + rootCid, 443 + recordPath, 444 + ); 445 + 446 + const emptyProof = { ...proof, nodes: [] }; 447 + const verification = await verifyMstProof( 448 + emptyProof, 449 + rootCid, 450 + recordPath, 451 + ); 452 + 453 + expect(verification.valid).toBe(false); 454 + expect(verification.error).toContain("no MST nodes"); 455 + }); 456 + 457 + // ============================================ 458 + // Edge cases 459 + // ============================================ 460 + 461 + it("handles a repo with a single record", async () => { 462 + await repoManager.putRecord("app.bsky.actor.profile", "self", { 463 + $type: "app.bsky.actor.profile", 464 + displayName: "Solo", 465 + }); 466 + 467 + const rootCid = await getRepoRootCid(); 468 + const recordPath = "app.bsky.actor.profile/self"; 469 + 470 + const proof = await generateMstProof( 471 + ipfsService, 472 + rootCid, 473 + recordPath, 474 + ); 475 + 476 + expect(proof.found).toBe(true); 477 + expect(proof.nodes.length).toBeGreaterThanOrEqual(1); 478 + 479 + const verification = await verifyMstProof(proof, rootCid, recordPath); 480 + expect(verification.valid).toBe(true); 481 + expect(verification.found).toBe(true); 482 + }); 483 + 484 + it("handles many records to create a deeper tree", async () => { 485 + // Create 50 records to ensure multi-level MST 486 + for (let i = 0; i < 50; i++) { 487 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 488 + $type: "app.bsky.feed.post", 489 + text: `Deep tree post ${i}`, 490 + createdAt: new Date().toISOString(), 491 + }); 492 + } 493 + 494 + const rootCid = await getRepoRootCid(); 495 + 496 + const records = await repoManager.listRecords("app.bsky.feed.post", { 497 + limit: 100, 498 + }); 499 + 500 + // Test the first, middle, and last record 501 + const indicesToTest = [ 502 + 0, 503 + Math.floor(records.records.length / 2), 504 + 
records.records.length - 1, 505 + ]; 506 + 507 + for (const idx of indicesToTest) { 508 + const record = records.records[idx]!; 509 + const rkey = record.uri.split("/").pop()!; 510 + const recordPath = `app.bsky.feed.post/${rkey}`; 511 + 512 + const proof = await generateMstProof( 513 + ipfsService, 514 + rootCid, 515 + recordPath, 516 + ); 517 + 518 + expect(proof.found).toBe(true); 519 + expect(proof.recordCid).not.toBeNull(); 520 + 521 + const verification = await verifyMstProof( 522 + proof, 523 + rootCid, 524 + recordPath, 525 + ); 526 + 527 + expect(verification.valid).toBe(true); 528 + expect(verification.found).toBe(true); 529 + expect(verification.recordCid).toBe(proof.recordCid); 530 + } 531 + }); 532 + 533 + it("generate throws when commit block is missing", async () => { 534 + await expect( 535 + generateMstProof( 536 + ipfsService, 537 + "bafyreig6mxqmjlb7yjbhhhz6vqmtiw4kgipvhqoowdkggjlpzpd5tcm4", 538 + "app.bsky.feed.post/abc", 539 + ), 540 + ).rejects.toThrow("Commit block not found"); 541 + }); 542 + 543 + it("proof roundtrip: generate then verify maintains consistency", async () => { 544 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 545 + $type: "app.bsky.feed.post", 546 + text: "Roundtrip test", 547 + createdAt: new Date().toISOString(), 548 + }); 549 + 550 + const rootCid = await getRepoRootCid(); 551 + 552 + const records = await repoManager.listRecords("app.bsky.feed.post", { 553 + limit: 10, 554 + }); 555 + const rkey = records.records[0]!.uri.split("/").pop()!; 556 + const existingPath = `app.bsky.feed.post/${rkey}`; 557 + const missingPath = "app.bsky.feed.post/zzz-does-not-exist"; 558 + 559 + // Existence proof roundtrip 560 + const existProof = await generateMstProof( 561 + ipfsService, 562 + rootCid, 563 + existingPath, 564 + ); 565 + const existVerify = await verifyMstProof( 566 + existProof, 567 + rootCid, 568 + existingPath, 569 + ); 570 + expect(existVerify.valid).toBe(true); 571 + 
expect(existVerify.found).toBe(true); 572 + 573 + // Non-existence proof roundtrip 574 + const missingProof = await generateMstProof( 575 + ipfsService, 576 + rootCid, 577 + missingPath, 578 + ); 579 + const missingVerify = await verifyMstProof( 580 + missingProof, 581 + rootCid, 582 + missingPath, 583 + ); 584 + expect(missingVerify.valid).toBe(true); 585 + expect(missingVerify.found).toBe(false); 586 + }); 587 + });
+412
src/replication/mst-proof.ts
/**
 * MST Path Proof: Generate and verify compact Merkle Search Tree proofs.
 *
 * An MST path proof demonstrates that a specific record exists (or does not
 * exist) in an atproto repo by providing only the MST nodes along the path
 * from the root to the leaf, plus the commit block that binds the MST root
 * to a signed commit CID.
 *
 * Proof structure:
 * - commitBlock: the CBOR-encoded commit object (contains `data` field = MST root CID)
 * - nodes: ordered list of { cid, bytes } for each MST node on the path from root to leaf
 * - recordCid: the CID of the record value if found, or null for non-existence proofs
 * - found: whether the record was found in the MST
 *
 * Trust model: the verifier trusts only the commit CID passed in by the
 * caller. The `cid` labels carried inside a proof are informational; every
 * CID that matters is recomputed from the block bytes. The commit signature
 * is NOT checked here.
 */

import { createHash } from "node:crypto";
import { CID } from "multiformats";
import { create as createDigest } from "multiformats/hashes/digest";
import { cborDecode } from "@atproto/common";
import type { BlockStore } from "../ipfs.js";

// ---------- Types ----------

/** A single block in the proof: its CID and raw CBOR bytes. */
export interface ProofBlock {
  cid: string;
  bytes: Uint8Array;
}

/** A compact MST path proof for a single record path. */
export interface MstProof {
  /** The CBOR-encoded commit block. */
  commitBlock: ProofBlock;
  /** Ordered MST node blocks from root to the deepest node on the path. */
  nodes: ProofBlock[];
  /** CID of the record value if found, null if record does not exist. */
  recordCid: string | null;
  /** Whether the record was found in the MST. */
  found: boolean;
}

/** Decoded MST node data (matches @atproto/repo's NodeData shape). */
interface NodeData {
  /** Subtree holding keys that sort before the first entry, if any. */
  l: CID | null;
  e: Array<{
    /** Number of leading characters shared with the previous entry's key. */
    p: number;
    /** Remainder of the key after the shared prefix. */
    k: Uint8Array;
    /** CID of the record value. */
    v: CID;
    /** Subtree holding keys that sort after this entry, if any. */
    t: CID | null;
  }>;
}

/** Result of verifying an MST proof. */
export interface MstProofVerification {
  /** Whether the proof is valid. */
  valid: boolean;
  /** The record CID if the proof shows existence. */
  recordCid: string | null;
  /** Whether the record was found (matches proof.found). */
  found: boolean;
  /** Error message if verification failed. */
  error?: string;
}

/** Outcome of looking for a record path inside one MST node. */
type PathStep =
  | { kind: "found"; recordCid: CID }
  | { kind: "descend"; subtree: CID }
  | { kind: "absent" };

// ---------- Helpers ----------

/** dag-cbor multicodec code */
const DAG_CBOR_CODE = 0x71;

/** sha2-256 multicodec code */
const SHA2_256_CODE = 0x12;

/** Build the uniform "verification failed" result for a given reason. */
function invalid(error: string): MstProofVerification {
  return { valid: false, recordCid: null, found: false, error };
}

/**
 * Compute the CID for raw CBOR bytes using dag-cbor codec + sha-256.
 * Uses Node.js built-in crypto for the hash so the computation stays
 * synchronous.
 */
function cidForBytes(bytes: Uint8Array): CID {
  const hash = createHash("sha256").update(bytes).digest();
  // Let multiformats assemble the multihash instead of hand-packing the
  // varint header and casting the result into a digest type.
  return CID.create(1, DAG_CBOR_CODE, createDigest(SHA2_256_CODE, hash));
}

/**
 * Decode the commit block and return its `data` link (the MST root CID),
 * or null when the decoded value carries no such field.
 */
function readMstRoot(commitBytes: Uint8Array): CID | null {
  const commit = cborDecode(commitBytes) as { data?: CID | null } | null;
  return commit?.data ?? null;
}

/**
 * Decode CBOR bytes and return the MST NodeData structure.
 *
 * @throws Error if the decoded value does not have an `e` entries array,
 *   i.e. the block is not an MST node.
 */
function decodeNodeData(bytes: Uint8Array): NodeData {
  const raw: unknown = cborDecode(bytes);
  if (
    typeof raw !== "object" ||
    raw === null ||
    !Array.isArray((raw as { e?: unknown }).e)
  ) {
    throw new Error("Malformed MST node: missing entries array");
  }
  return raw as NodeData;
}

/**
 * Reconstruct the full keys from a node's compressed entries.
 * Each entry stores only the suffix that differs from the previous key.
 * Returns an array of { key, value (record CID), subtree (CID or null) }.
 */
function expandEntries(
  data: NodeData,
): Array<{ key: string; value: CID; subtree: CID | null }> {
  const result: Array<{ key: string; value: CID; subtree: CID | null }> = [];
  let lastKey = "";
  for (const entry of data.e) {
    const keyStr = Buffer.from(entry.k).toString("ascii");
    const key = lastKey.slice(0, entry.p) + keyStr;
    result.push({ key, value: entry.v, subtree: entry.t });
    lastKey = key;
  }
  return result;
}

/**
 * Look for `recordPath` in a single MST node.
 *
 * Shared by generation and verification so both walk the tree by exactly
 * the same rule:
 * - exact key match                      => "found"
 * - recordPath sorts before every key    => follow the left pointer (`l`)
 * - recordPath sorts between/after keys  => follow the subtree of the
 *                                           closest smaller entry
 * - the chosen pointer is empty          => "absent"
 */
function stepThroughNode(nodeData: NodeData, recordPath: string): PathStep {
  const entries = expandEntries(nodeData);

  // First entry whose key is >= recordPath (-1 when every key is smaller).
  const index = entries.findIndex((e) => e.key >= recordPath);
  const candidate = index >= 0 ? entries[index] : undefined;
  if (candidate && candidate.key === recordPath) {
    return { kind: "found", recordCid: candidate.value };
  }

  let next: CID | null;
  if (index === 0 || entries.length === 0) {
    // Nothing in this node sorts before recordPath.
    next = nodeData.l;
  } else {
    // Closest smaller entry: the one before `index`, or the last entry
    // when every key in the node is smaller than recordPath.
    const predecessor = entries[(index < 0 ? entries.length : index) - 1];
    next = predecessor?.subtree ?? null;
  }

  return next ? { kind: "descend", subtree: next } : { kind: "absent" };
}

// ---------- Generation ----------

/**
 * Generate a compact MST path proof for a record path.
 *
 * @param blockStore - Block storage containing the repo blocks
 * @param commitCid - CID of the commit block (repo head)
 * @param recordPath - Record path in the form "collection/rkey"
 * @returns An MstProof containing just the blocks needed to verify the path
 * @throws Error if the commit block or any MST node on the path is missing
 *   from the block store, if the commit has no `data` field, or if a node
 *   block is malformed
 */
export async function generateMstProof(
  blockStore: BlockStore,
  commitCid: string,
  recordPath: string,
): Promise<MstProof> {
  // 1. Fetch the commit block and read the MST root from it
  const commitBytes = await blockStore.getBlock(commitCid);
  if (!commitBytes) {
    throw new Error(`Commit block not found: ${commitCid}`);
  }

  const mstRootCid = readMstRoot(commitBytes);
  if (!mstRootCid) {
    throw new Error("Commit block has no data field (MST root)");
  }

  const commitBlock: ProofBlock = { cid: commitCid, bytes: commitBytes };

  // 2. Walk down the MST collecting nodes on the path
  const nodes: ProofBlock[] = [];
  let currentCid: CID = mstRootCid;
  let found = false;
  let recordCid: string | null = null;

  for (;;) {
    const cidStr = currentCid.toString();
    const nodeBytes = await blockStore.getBlock(cidStr);
    if (!nodeBytes) {
      throw new Error(`MST node block not found: ${cidStr}`);
    }

    nodes.push({ cid: cidStr, bytes: nodeBytes });

    const step = stepThroughNode(decodeNodeData(nodeBytes), recordPath);
    if (step.kind === "found") {
      found = true;
      recordCid = step.recordCid.toString();
      break;
    }
    if (step.kind === "absent") {
      // No subtree to descend into — record does not exist
      break;
    }
    currentCid = step.subtree;
  }

  return { commitBlock, nodes, recordCid, found };
}

// ---------- Verification ----------

/**
 * Verify an MST path proof against a commit CID and record path.
 *
 * Verification checks:
 * 1. The commit block's CID matches the expected commitCid
 * 2. The commit's `data` field gives the MST root CID
 * 3. Each MST node's CID matches its content hash
 * 4. The path through the MST nodes correctly leads to the claimed record
 *    (or correctly demonstrates non-existence)
 * 5. The proof's `found` / `recordCid` claims agree with what the walk shows
 *
 * Never throws: any failure, including malformed blocks, is reported as
 * `{ valid: false, error }`.
 *
 * @param proof - The MST path proof to verify
 * @param commitCid - Expected commit CID
 * @param recordPath - Record path in the form "collection/rkey"
 * @returns Verification result
 */
export async function verifyMstProof(
  proof: MstProof,
  commitCid: string,
  recordPath: string,
): Promise<MstProofVerification> {
  try {
    // 1. Verify the commit block CID
    const actualCommitCid = cidForBytes(proof.commitBlock.bytes).toString();
    if (actualCommitCid !== commitCid) {
      return invalid(
        `Commit block CID mismatch: expected ${commitCid}, got ${actualCommitCid}`,
      );
    }

    // 2. Decode the commit and extract the MST root CID
    const mstRootCid = readMstRoot(proof.commitBlock.bytes);
    if (!mstRootCid) {
      return invalid("Commit block has no data field (MST root)");
    }

    // 3. Verify each node block's CID and walk the path
    const firstNode = proof.nodes[0];
    if (!firstNode) {
      return invalid("Proof contains no MST nodes");
    }

    // First node must be the MST root
    const firstNodeCid = cidForBytes(firstNode.bytes).toString();
    if (firstNodeCid !== mstRootCid.toString()) {
      return invalid(
        `First proof node CID (${firstNodeCid}) does not match MST root (${mstRootCid.toString()})`,
      );
    }

    // Walk through nodes verifying the chain
    const lastIndex = proof.nodes.length - 1;
    let expectedCid = mstRootCid.toString();
    let found = false;
    let recordCid: string | null = null;

    for (const [i, node] of proof.nodes.entries()) {
      // Verify block CID matches content
      const actualCid = cidForBytes(node.bytes).toString();
      if (actualCid !== expectedCid) {
        return invalid(
          `Node ${i} CID mismatch: expected ${expectedCid}, got ${actualCid}`,
        );
      }

      const step = stepThroughNode(decodeNodeData(node.bytes), recordPath);

      if (step.kind === "found") {
        // A hit must terminate the proof; trailing nodes are unverifiable padding.
        if (i !== lastIndex) {
          return invalid(
            `Record found at node ${i} but proof has ${proof.nodes.length} nodes`,
          );
        }
        found = true;
        recordCid = step.recordCid.toString();
        break;
      }

      if (i < lastIndex) {
        // There are more nodes — the next one must hang off this node
        if (step.kind !== "descend") {
          return invalid(
            `Node ${i} has no subtree for the path, but proof has more nodes`,
          );
        }
        expectedCid = step.subtree.toString();
      } else if (step.kind === "descend") {
        // Last node and record not found: a non-existence proof is only
        // sound if there is nowhere further down the record could live.
        return invalid(
          `Last proof node has subtree for the path — proof is incomplete`,
        );
      }
    }

    // Verify the proof's claims match what we verified
    if (found !== proof.found) {
      return invalid(
        `Proof claims found=${proof.found} but verification found found=${found}`,
      );
    }

    if (found && recordCid !== proof.recordCid) {
      return invalid(
        `Proof claims recordCid=${proof.recordCid} but verification found ${recordCid}`,
      );
    }

    // A non-existence proof must not smuggle in a record CID.
    if (!found && proof.recordCid != null) {
      return invalid(
        `Proof claims recordCid=${proof.recordCid} but the record was not found`,
      );
    }

    return { valid: true, recordCid: found ? recordCid : null, found };
  } catch (err) {
    return invalid(err instanceof Error ? err.message : String(err));
  }
}
+102 -1
src/replication/replication-manager.ts
··· 27 27 import { RepoFetcher } from "./repo-fetcher.js"; 28 28 import { PeerDiscovery } from "./peer-discovery.js"; 29 29 import { BlockVerifier, RemoteVerifier } from "./verification.js"; 30 + import { 31 + FirehoseSubscription, 32 + type FirehoseCommitEvent, 33 + } from "./firehose-subscription.js"; 30 34 31 35 /** How old cached peer info can be before re-fetching (1 hour). */ 32 36 const PEER_INFO_TTL_MS = 60 * 60 * 1000; ··· 42 46 private verificationConfig: VerificationConfig; 43 47 private lastVerificationResults: Map<string, LayeredVerificationResult> = 44 48 new Map(); 49 + private firehoseSubscription: FirehoseSubscription | null = null; 50 + private firehoseCursorSaveTimer: ReturnType<typeof setInterval> | null = null; 45 51 private stopped = false; 46 52 47 53 constructor( ··· 311 317 } 312 318 313 319 /** 314 - * Stop periodic sync and verification. 320 + * Start the firehose subscription for real-time updates. 321 + * The firehose provides streaming updates for serviced DIDs, 322 + * complementing the periodic polling sync. 
323 + */ 324 + startFirehose(): void { 325 + if (this.firehoseSubscription) return; 326 + if (!this.config.FIREHOSE_ENABLED) return; 327 + if (this.config.REPLICATE_DIDS.length === 0) return; 328 + 329 + this.firehoseSubscription = new FirehoseSubscription({ 330 + firehoseUrl: this.config.FIREHOSE_URL, 331 + }); 332 + 333 + // Register handler for commit events 334 + this.firehoseSubscription.onCommit((event) => { 335 + this.handleFirehoseCommit(event).catch((err) => { 336 + console.error(`[replication] Firehose commit handler error for ${event.repo}:`, err); 337 + }); 338 + }); 339 + 340 + // Load saved cursor for resumption 341 + const savedCursor = this.syncStorage.getFirehoseCursor(); 342 + 343 + // Start with the set of configured DIDs 344 + const dids = new Set(this.config.REPLICATE_DIDS); 345 + this.firehoseSubscription.start(dids, savedCursor); 346 + 347 + // Periodically save the cursor to SQLite (every 30 seconds) 348 + this.firehoseCursorSaveTimer = setInterval(() => { 349 + this.saveFirehoseCursor(); 350 + }, 30_000); 351 + 352 + console.log( 353 + `[replication] Firehose subscription started` + 354 + (savedCursor !== null ? ` (resuming from cursor ${savedCursor})` : "") + 355 + ` — tracking ${dids.size} DIDs`, 356 + ); 357 + } 358 + 359 + /** 360 + * Handle a commit event from the firehose. 361 + * Triggers an incremental sync for the affected DID. 362 + */ 363 + private async handleFirehoseCommit(event: FirehoseCommitEvent): Promise<void> { 364 + const did = event.repo; 365 + 366 + // Only process if this DID is one we are tracking 367 + if (!this.config.REPLICATE_DIDS.includes(did)) return; 368 + 369 + try { 370 + // Trigger a sync for this specific DID to pull the latest changes. 371 + // The syncDid method already handles incremental sync via `since` parameter. 372 + await this.syncDid(did); 373 + } catch (err) { 374 + const message = err instanceof Error ? 
err.message : String(err); 375 + this.syncStorage.updateStatus(did, "error", message); 376 + } 377 + } 378 + 379 + /** 380 + * Save the current firehose cursor to persistent storage. 381 + */ 382 + private saveFirehoseCursor(): void { 383 + if (!this.firehoseSubscription) return; 384 + const cursor = this.firehoseSubscription.getCursor(); 385 + if (cursor !== null) { 386 + this.syncStorage.saveFirehoseCursor(cursor); 387 + } 388 + } 389 + 390 + /** 391 + * Get firehose subscription statistics. 392 + */ 393 + getFirehoseStats(): { 394 + connected: boolean; 395 + cursor: number | null; 396 + eventsReceived: number; 397 + eventsProcessed: number; 398 + trackedDids: number; 399 + } | null { 400 + if (!this.firehoseSubscription) return null; 401 + return this.firehoseSubscription.getStats(); 402 + } 403 + 404 + /** 405 + * Stop periodic sync, verification, and firehose subscription. 315 406 */ 316 407 stop(): void { 317 408 this.stopped = true; ··· 322 413 if (this.verificationTimer) { 323 414 clearInterval(this.verificationTimer); 324 415 this.verificationTimer = null; 416 + } 417 + // Save cursor and stop firehose 418 + if (this.firehoseCursorSaveTimer) { 419 + clearInterval(this.firehoseCursorSaveTimer); 420 + this.firehoseCursorSaveTimer = null; 421 + } 422 + if (this.firehoseSubscription) { 423 + this.saveFirehoseCursor(); 424 + this.firehoseSubscription.stop(); 425 + this.firehoseSubscription = null; 325 426 } 326 427 } 327 428
+2
src/replication/replication.test.ts
··· 53 53 IPFS_ENABLED: true, 54 54 IPFS_NETWORKING: false, 55 55 REPLICATE_DIDS: replicateDids, 56 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 57 + FIREHOSE_ENABLED: false, 56 58 }; 57 59 } 58 60
+48
src/replication/sync-storage.ts
··· 34 34 ); 35 35 `); 36 36 37 + // Firehose cursor table: stores the last-seen sequence number 38 + // for resumption after restart. 39 + this.db.exec(` 40 + CREATE TABLE IF NOT EXISTS firehose_cursor ( 41 + key TEXT PRIMARY KEY DEFAULT 'cursor', 42 + seq INTEGER NOT NULL, 43 + updated_at TEXT NOT NULL DEFAULT (datetime('now')) 44 + ); 45 + `); 46 + 37 47 // Migration: add root_cid column if missing (for existing databases) 38 48 const columns = this.db 39 49 .prepare("PRAGMA table_info(replication_state)") ··· 208 218 this.db 209 219 .prepare("DELETE FROM replication_blocks WHERE did = ?") 210 220 .run(did); 221 + } 222 + 223 + // ============================================ 224 + // Firehose cursor persistence 225 + // ============================================ 226 + 227 + /** 228 + * Save the firehose cursor (last processed sequence number). 229 + */ 230 + saveFirehoseCursor(seq: number): void { 231 + this.db 232 + .prepare( 233 + `INSERT INTO firehose_cursor (key, seq, updated_at) 234 + VALUES ('cursor', ?, datetime('now')) 235 + ON CONFLICT(key) DO UPDATE SET 236 + seq = excluded.seq, 237 + updated_at = datetime('now')`, 238 + ) 239 + .run(seq); 240 + } 241 + 242 + /** 243 + * Get the saved firehose cursor, or null if none exists. 244 + */ 245 + getFirehoseCursor(): number | null { 246 + const row = this.db 247 + .prepare("SELECT seq FROM firehose_cursor WHERE key = 'cursor'") 248 + .get() as { seq: number } | undefined; 249 + return row?.seq ?? null; 250 + } 251 + 252 + /** 253 + * Clear the firehose cursor (e.g., on full re-sync). 254 + */ 255 + clearFirehoseCursor(): void { 256 + this.db 257 + .prepare("DELETE FROM firehose_cursor WHERE key = 'cursor'") 258 + .run(); 211 259 } 212 260 213 261 private rowToState(row: Record<string, unknown>): SyncState {
+4
src/server.ts
··· 151 151 await replicationManager.init(); 152 152 replicationManager.startPeriodicSync(); 153 153 console.log(pc.dim(` Replication: tracking ${config.REPLICATE_DIDS.length} DIDs`)); 154 + // Start firehose subscription for real-time updates 155 + if (config.FIREHOSE_ENABLED) { 156 + replicationManager.startFirehose(); 157 + } 154 158 } catch (err) { 155 159 console.error(pc.red(` Replication startup failed:`), err); 156 160 }