atproto user agency toolkit for individuals and groups
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add P2P replication offer negotiation via lexicon records

Peers publish org.p2pds.replication.offer records declaring willingness
to replicate specific DIDs. Mutual offers are detected during sync and
automatically converted into PolicyEngine policies, driving the existing
replication and challenge machinery. Revoking an offer removes the
derived policy on the next discovery cycle.

+827 -1
+63
src/index.ts
··· 485 485 }); 486 486 487 487 // ============================================ 488 + // Replication offers 489 + // ============================================ 490 + 491 + app.post("/xrpc/org.p2pds.replication.publishOffer", requireAuth, async (c) => { 492 + if (!replicationManager) { 493 + return c.json({ error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 400); 494 + } 495 + const offerManager = replicationManager.getOfferManager(); 496 + if (!offerManager) { 497 + return c.json({ error: "PolicyNotEnabled", message: "Policy engine is required for offers" }, 400); 498 + } 499 + const body = await c.req.json<{ 500 + subject: string; 501 + minCopies?: number; 502 + intervalSec?: number; 503 + priority?: number; 504 + }>(); 505 + if (!body.subject) { 506 + return c.json({ error: "MissingParameter", message: "subject is required" }, 400); 507 + } 508 + const offer = await offerManager.publishOffer(body.subject, { 509 + minCopies: body.minCopies, 510 + intervalSec: body.intervalSec, 511 + priority: body.priority, 512 + }); 513 + return c.json({ offer }); 514 + }); 515 + 516 + app.get("/xrpc/org.p2pds.replication.listOffers", requireAuth, async (c) => { 517 + if (!replicationManager) { 518 + return c.json({ error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 400); 519 + } 520 + const offerManager = replicationManager.getOfferManager(); 521 + if (!offerManager) { 522 + return c.json({ error: "PolicyNotEnabled", message: "Policy engine is required for offers" }, 400); 523 + } 524 + const offers = await offerManager.getLocalOffers(); 525 + // Discover current agreements from tracked peers 526 + const states = replicationManager.getSyncStates(); 527 + const peers = states 528 + .filter((s) => s.pdsEndpoint) 529 + .map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint })); 530 + const agreements = await offerManager.discoverAgreements(peers); 531 + return c.json({ offers, agreements }); 532 + }); 533 + 534 + app.post("/xrpc/org.p2pds.replication.revokeOffer", requireAuth, async (c) => { 535 + if (!replicationManager) { 536 + return c.json({ error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 400); 537 + } 538 + const offerManager = replicationManager.getOfferManager(); 539 + if (!offerManager) { 540 + return c.json({ error: "PolicyNotEnabled", message: "Policy engine is required for offers" }, 400); 541 + } 542 + const body = await c.req.json<{ subject: string }>(); 543 + if (!body.subject) { 544 + return c.json({ error: "MissingParameter", message: "subject is required" }, 400); 545 + } 546 + await offerManager.revokeOffer(body.subject); 547 + return c.json({ message: "Offer revoked" }); 548 + }); 549 + 550 + // ============================================ 488 551 // MST Proof serving 489 552 // ============================================ 490 553
+458
src/replication/offer-manager.test.ts
··· 1 + import { describe, it, expect, beforeEach, afterEach, vi } from "vitest"; 2 + import { mkdtempSync, rmSync } from "node:fs"; 3 + import { tmpdir } from "node:os"; 4 + import { join } from "node:path"; 5 + import Database from "better-sqlite3"; 6 + import { IpfsService } from "../ipfs.js"; 7 + import { RepoManager } from "../repo-manager.js"; 8 + import type { Config } from "../config.js"; 9 + import { PolicyEngine } from "../policy/engine.js"; 10 + import { OfferManager, type Agreement } from "./offer-manager.js"; 11 + import { PeerDiscovery } from "./peer-discovery.js"; 12 + import { OFFER_NSID, didToRkey, type OfferRecord } from "./types.js"; 13 + 14 + function testConfig(dataDir: string, did = "did:plc:local"): Config { 15 + return { 16 + DID: did, 17 + HANDLE: "test.example.com", 18 + PDS_HOSTNAME: "test.example.com", 19 + AUTH_TOKEN: "test-auth-token", 20 + SIGNING_KEY: 21 + "0000000000000000000000000000000000000000000000000000000000000001", 22 + SIGNING_KEY_PUBLIC: 23 + "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 24 + JWT_SECRET: "test-jwt-secret", 25 + PASSWORD_HASH: "$2a$10$test", 26 + DATA_DIR: dataDir, 27 + PORT: 3000, 28 + IPFS_ENABLED: true, 29 + IPFS_NETWORKING: false, 30 + REPLICATE_DIDS: [], 31 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 32 + FIREHOSE_ENABLED: false, 33 + }; 34 + } 35 + 36 + function makeOffer(subject: string, params?: Partial<OfferRecord>): OfferRecord { 37 + return { 38 + $type: OFFER_NSID, 39 + subject, 40 + minCopies: params?.minCopies ?? 2, 41 + intervalSec: params?.intervalSec ?? 600, 42 + priority: params?.priority ?? 50, 43 + createdAt: params?.createdAt ?? new Date().toISOString(), 44 + }; 45 + } 46 + 47 + describe("OfferManager", () => { 48 + let tmpDir: string; 49 + let db: InstanceType<typeof Database>; 50 + let ipfsService: IpfsService; 51 + let repoManager: RepoManager; 52 + let policyEngine: PolicyEngine; 53 + let offerManager: OfferManager; 54 + let mockPeerDiscovery: PeerDiscovery; 55 + 56 + const LOCAL_DID = "did:plc:local"; 57 + const PEER_A_DID = "did:plc:peerA"; 58 + const PEER_B_DID = "did:plc:peerB"; 59 + const PEER_C_DID = "did:plc:peerC"; 60 + 61 + beforeEach(async () => { 62 + tmpDir = mkdtempSync(join(tmpdir(), "offer-mgr-test-")); 63 + const config = testConfig(tmpDir, LOCAL_DID); 64 + 65 + db = new Database(join(tmpDir, "test.db")); 66 + ipfsService = new IpfsService({ 67 + blocksPath: join(tmpDir, "ipfs-blocks"), 68 + datastorePath: join(tmpDir, "ipfs-datastore"), 69 + networking: false, 70 + }); 71 + await ipfsService.start(); 72 + 73 + repoManager = new RepoManager(db, config); 74 + repoManager.init(undefined, ipfsService, ipfsService); 75 + 76 + policyEngine = new PolicyEngine(); 77 + 78 + // Mock PeerDiscovery — we control what remote peers "offer" 79 + mockPeerDiscovery = { 80 + discoverOffers: vi.fn().mockResolvedValue([]), 81 + } as unknown as PeerDiscovery; 82 + 83 + offerManager = new OfferManager( 84 + repoManager, 85 + mockPeerDiscovery, 86 + policyEngine, 87 + LOCAL_DID, 88 + ); 89 + }); 90 + 91 + afterEach(async () => { 92 + if (ipfsService.isRunning()) { 93 + await ipfsService.stop(); 94 + } 95 + db.close(); 96 + rmSync(tmpDir, { recursive: true, force: true }); 97 + }); 98 + 99 + // ============================================ 100 + // Unit tests 101 + // ============================================ 102 + 103 + it("publishOffer creates record with correct NSID and rkey", async () => { 104 + const offer = await offerManager.publishOffer(PEER_A_DID); 105 + 106 + expect(offer.$type).toBe(OFFER_NSID); 107 + expect(offer.subject).toBe(PEER_A_DID); 108 + expect(offer.minCopies).toBe(2); 109 + expect(offer.intervalSec).toBe(600); 110 + expect(offer.priority).toBe(50); 111 + 112 + // Verify it's actually in the repo 113 + const record = await repoManager.getRecord( 114 + OFFER_NSID, 115 + didToRkey(PEER_A_DID), 116 + ); 117 + expect(record).not.toBeNull(); 118 + expect((record!.record as OfferRecord).subject).toBe(PEER_A_DID); 119 + }); 120 + 121 + it("publishOffer uses custom params", async () => { 122 + const offer = await offerManager.publishOffer(PEER_A_DID, { 123 + minCopies: 5, 124 + intervalSec: 120, 125 + priority: 90, 126 + }); 127 + 128 + expect(offer.minCopies).toBe(5); 129 + expect(offer.intervalSec).toBe(120); 130 + expect(offer.priority).toBe(90); 131 + }); 132 + 133 + it("revokeOffer deletes record and removes derived policy", async () => { 134 + // Publish an offer 135 + await offerManager.publishOffer(PEER_A_DID); 136 + 137 + // Manually add a P2P policy (simulating what syncPolicies does) 138 + policyEngine.addPolicy({ 139 + id: `p2p:${PEER_A_DID}`, 140 + name: "test", 141 + target: { type: "list", dids: [PEER_A_DID] }, 142 + replication: { minCopies: 2 }, 143 + sync: { intervalSec: 600 }, 144 + retention: { maxAgeSec: 0, keepHistory: false }, 145 + priority: 50, 146 + enabled: true, 147 + }); 148 + 149 + await offerManager.revokeOffer(PEER_A_DID); 150 + 151 + // Record should be deleted 152 + const record = await repoManager.getRecord( 153 + OFFER_NSID, 154 + didToRkey(PEER_A_DID), 155 + ); 156 + expect(record).toBeNull(); 157 + 158 + // Policy should be removed 159 + const policies = policyEngine.getPolicies(); 160 + expect(policies.find((p) => p.id === `p2p:${PEER_A_DID}`)).toBeUndefined(); 161 + }); 162 + 163 + it("getLocalOffers returns all offers", async () => { 164 + await offerManager.publishOffer(PEER_A_DID); 165 + await offerManager.publishOffer(PEER_B_DID); 166 + 167 + const offers = await offerManager.getLocalOffers(); 168 + 169 + expect(offers).toHaveLength(2); 170 + const subjects = offers.map((o) => o.subject).sort(); 171 + expect(subjects).toEqual([PEER_A_DID, PEER_B_DID].sort()); 172 + }); 173 + 174 + it("discoverAgreements detects mutual offers as agreement", async () => { 175 + // We offer to replicate peer A 176 + await offerManager.publishOffer(PEER_A_DID); 177 + 178 + // Peer A offers to replicate our data 179 + const remoteOffer = makeOffer(LOCAL_DID); 180 + vi.mocked(mockPeerDiscovery.discoverOffers).mockResolvedValue([ 181 + remoteOffer, 182 + ]); 183 + 184 + const agreements = await offerManager.discoverAgreements([ 185 + { did: PEER_A_DID, pdsEndpoint: "https://pds-a.example.com" }, 186 + ]); 187 + 188 + expect(agreements).toHaveLength(1); 189 + expect(agreements[0].counterpartyDid).toBe(PEER_A_DID); 190 + expect(agreements[0].localOffer.subject).toBe(PEER_A_DID); 191 + expect(agreements[0].remoteOffer.subject).toBe(LOCAL_DID); 192 + }); 193 + 194 + it("discoverAgreements ignores one-sided offers (no agreement)", async () => { 195 + // We offer to replicate peer A 196 + await offerManager.publishOffer(PEER_A_DID); 197 + 198 + // Peer A does NOT offer to replicate our data (offers something else) 199 + vi.mocked(mockPeerDiscovery.discoverOffers).mockResolvedValue([ 200 + makeOffer("did:plc:someoneelse"), 201 + ]); 202 + 203 + const agreements = await offerManager.discoverAgreements([ 204 + { did: PEER_A_DID, pdsEndpoint: "https://pds-a.example.com" }, 205 + ]); 206 + 207 + expect(agreements).toHaveLength(0); 208 + }); 209 + 210 + it("discoverAgreements ignores peers we have no offer for", async () => { 211 + // We have NO local offers 212 + // Peer A offers to replicate our data 213 + vi.mocked(mockPeerDiscovery.discoverOffers).mockResolvedValue([ 214 + makeOffer(LOCAL_DID), 215 + ]); 216 + 217 + const agreements = await offerManager.discoverAgreements([ 218 + { did: PEER_A_DID, pdsEndpoint: "https://pds-a.example.com" }, 219 + ]); 220 + 221 + expect(agreements).toHaveLength(0); 222 + }); 223 + 224 + it("syncPolicies adds P2P policies to engine with correct params", () => { 225 + const agreements: Agreement[] = [ 226 + { 227 + counterpartyDid: PEER_A_DID, 228 + localOffer: makeOffer(PEER_A_DID, { minCopies: 3, intervalSec: 300, priority: 70 }), 229 + remoteOffer: makeOffer(LOCAL_DID, { minCopies: 2, intervalSec: 600, priority: 50 }), 230 + effectiveParams: { minCopies: 3, intervalSec: 300, priority: 70 }, 231 + }, 232 + ]; 233 + 234 + offerManager.syncPolicies(agreements); 235 + 236 + const policies = policyEngine.getPolicies(); 237 + expect(policies).toHaveLength(1); 238 + 239 + const policy = policies[0]; 240 + expect(policy.id).toBe(`p2p:${PEER_A_DID}`); 241 + expect(policy.target).toEqual({ type: "list", dids: [PEER_A_DID] }); 242 + expect(policy.replication.minCopies).toBe(3); 243 + expect(policy.sync.intervalSec).toBe(300); 244 + expect(policy.priority).toBe(70); 245 + expect(policy.enabled).toBe(true); 246 + }); 247 + 248 + it("syncPolicies removes stale policies when agreement revoked", () => { 249 + // First sync: agreement with peer A 250 + offerManager.syncPolicies([ 251 + { 252 + counterpartyDid: PEER_A_DID, 253 + localOffer: makeOffer(PEER_A_DID), 254 + remoteOffer: makeOffer(LOCAL_DID), 255 + effectiveParams: { minCopies: 2, intervalSec: 600, priority: 50 }, 256 + }, 257 + ]); 258 + expect(policyEngine.getPolicies()).toHaveLength(1); 259 + 260 + // Second sync: no agreements (offer revoked) 261 + offerManager.syncPolicies([]); 262 + expect(policyEngine.getPolicies()).toHaveLength(0); 263 + }); 264 + 265 + it("syncPolicies does not remove non-P2P policies", () => { 266 + // Add a manual policy 267 + policyEngine.addPolicy({ 268 + id: "manual-policy", 269 + name: "Manual", 270 + target: { type: "all" }, 271 + replication: { minCopies: 1 }, 272 + sync: { intervalSec: 300 }, 273 + retention: { maxAgeSec: 0, keepHistory: false }, 274 + priority: 50, 275 + enabled: true, 276 + }); 277 + 278 + // Sync with no agreements 279 + offerManager.syncPolicies([]); 280 + 281 + // Manual policy should remain 282 + expect(policyEngine.getPolicies()).toHaveLength(1); 283 + expect(policyEngine.getPolicies()[0].id).toBe("manual-policy"); 284 + }); 285 + 286 + it("parameter merging: max(minCopies), min(intervalSec), max(priority)", async () => { 287 + await offerManager.publishOffer(PEER_A_DID, { 288 + minCopies: 2, 289 + intervalSec: 300, 290 + priority: 40, 291 + }); 292 + 293 + vi.mocked(mockPeerDiscovery.discoverOffers).mockResolvedValue([ 294 + makeOffer(LOCAL_DID, { 295 + minCopies: 5, 296 + intervalSec: 600, 297 + priority: 80, 298 + }), 299 + ]); 300 + 301 + const agreements = await offerManager.discoverAgreements([ 302 + { did: PEER_A_DID, pdsEndpoint: "https://pds-a.example.com" }, 303 + ]); 304 + 305 + expect(agreements).toHaveLength(1); 306 + expect(agreements[0].effectiveParams).toEqual({ 307 + minCopies: 5, // max(2, 5) 308 + intervalSec: 300, // min(300, 600) 309 + priority: 80, // max(40, 80) 310 + }); 311 + }); 312 + 313 + it("multiple agreements: three-peer mutual-aid group", async () => { 314 + await offerManager.publishOffer(PEER_A_DID); 315 + await offerManager.publishOffer(PEER_B_DID); 316 + await offerManager.publishOffer(PEER_C_DID); 317 + 318 + // All three peers offer to replicate our data 319 + vi.mocked(mockPeerDiscovery.discoverOffers) 320 + .mockResolvedValueOnce([makeOffer(LOCAL_DID)]) // peer A 321 + .mockResolvedValueOnce([makeOffer(LOCAL_DID)]) // peer B 322 + .mockResolvedValueOnce([makeOffer(LOCAL_DID)]); // peer C 323 + 324 + const agreements = await offerManager.discoverAgreements([ 325 + { did: PEER_A_DID, pdsEndpoint: "https://pds-a.example.com" }, 326 + { did: PEER_B_DID, pdsEndpoint: "https://pds-b.example.com" }, 327 + { did: PEER_C_DID, pdsEndpoint: "https://pds-c.example.com" }, 328 + ]); 329 + 330 + expect(agreements).toHaveLength(3); 331 + const counterparties = agreements.map((a) => a.counterpartyDid).sort(); 332 + expect(counterparties).toEqual([PEER_A_DID, PEER_B_DID, PEER_C_DID].sort()); 333 + 334 + // Sync all three to policies 335 + offerManager.syncPolicies(agreements); 336 + expect(policyEngine.getPolicies()).toHaveLength(3); 337 + }); 338 + 339 + it("idempotent: re-running discoverAndSync doesn't create duplicates", async () => { 340 + await offerManager.publishOffer(PEER_A_DID); 341 + 342 + vi.mocked(mockPeerDiscovery.discoverOffers).mockResolvedValue([ 343 + makeOffer(LOCAL_DID), 344 + ]); 345 + 346 + const peers = [{ did: PEER_A_DID, pdsEndpoint: "https://pds-a.example.com" }]; 347 + 348 + // Run twice 349 + await offerManager.discoverAndSync(peers); 350 + await offerManager.discoverAndSync(peers); 351 + 352 + // Should still have exactly one policy 353 + expect(policyEngine.getPolicies()).toHaveLength(1); 354 + expect(policyEngine.getPolicies()[0].id).toBe(`p2p:${PEER_A_DID}`); 355 + }); 356 + 357 + it("syncPolicies updates policy when params change", async () => { 358 + // First sync with initial params 359 + offerManager.syncPolicies([ 360 + { 361 + counterpartyDid: PEER_A_DID, 362 + localOffer: makeOffer(PEER_A_DID, { minCopies: 2 }), 363 + remoteOffer: makeOffer(LOCAL_DID, { minCopies: 2 }), 364 + effectiveParams: { minCopies: 2, intervalSec: 600, priority: 50 }, 365 + }, 366 + ]); 367 + expect(policyEngine.getPolicies()[0].replication.minCopies).toBe(2); 368 + 369 + // Second sync with changed params 370 + offerManager.syncPolicies([ 371 + { 372 + counterpartyDid: PEER_A_DID, 373 + localOffer: makeOffer(PEER_A_DID, { minCopies: 5 }), 374 + remoteOffer: makeOffer(LOCAL_DID, { minCopies: 5 }), 375 + effectiveParams: { minCopies: 5, intervalSec: 300, priority: 80 }, 376 + }, 377 + ]); 378 + 379 + expect(policyEngine.getPolicies()).toHaveLength(1); 380 + expect(policyEngine.getPolicies()[0].replication.minCopies).toBe(5); 381 + expect(policyEngine.getPolicies()[0].sync.intervalSec).toBe(300); 382 + expect(policyEngine.getPolicies()[0].priority).toBe(80); 383 + }); 384 + 385 + // ============================================ 386 + // Integration test 387 + // ============================================ 388 + 389 + it("integration: two repos with mutual offers result in active replication policy", async () => { 390 + // Setup: create a second repo to represent peer A 391 + const tmpDir2 = mkdtempSync(join(tmpdir(), "offer-mgr-test-peer-")); 392 + const config2 = testConfig(tmpDir2, PEER_A_DID); 393 + const db2 = new Database(join(tmpDir2, "test.db")); 394 + const ipfsService2 = new IpfsService({ 395 + blocksPath: join(tmpDir2, "ipfs-blocks"), 396 + datastorePath: join(tmpDir2, "ipfs-datastore"), 397 + networking: false, 398 + }); 399 + await ipfsService2.start(); 400 + const repoManager2 = new RepoManager(db2, config2); 401 + repoManager2.init(undefined, ipfsService2, ipfsService2); 402 + 403 + try { 404 + // Peer A's policy engine and offer manager 405 + const policyEngine2 = new PolicyEngine(); 406 + const mockPeerDiscovery2 = { 407 + discoverOffers: vi.fn(), 408 + } as unknown as PeerDiscovery; 409 + const offerManager2 = new OfferManager( 410 + repoManager2, 411 + mockPeerDiscovery2, 412 + policyEngine2, 413 + PEER_A_DID, 414 + ); 415 + 416 + // Local publishes offer for peer A 417 + await offerManager.publishOffer(PEER_A_DID); 418 + 419 + // Peer A publishes offer for local 420 + await offerManager2.publishOffer(LOCAL_DID); 421 + 422 + // Now simulate discovery: local discovers peer A's offers 423 + // by reading peer A's repo directly 424 + const peerAOffers = await offerManager2.getLocalOffers(); 425 + vi.mocked(mockPeerDiscovery.discoverOffers).mockResolvedValue(peerAOffers); 426 + 427 + // And peer A discovers local's offers 428 + const localOffers = await offerManager.getLocalOffers(); 429 + vi.mocked(mockPeerDiscovery2.discoverOffers).mockResolvedValue(localOffers); 430 + 431 + // Both run discoverAndSync 432 + const agreementsLocal = await offerManager.discoverAndSync([ 433 + { did: PEER_A_DID, pdsEndpoint: "https://pds-a.example.com" }, 434 + ]); 435 + const agreementsPeerA = await offerManager2.discoverAndSync([ 436 + { did: LOCAL_DID, pdsEndpoint: "https://pds-local.example.com" }, 437 + ]); 438 + 439 + // Both should have detected the agreement 440 + expect(agreementsLocal).toHaveLength(1); 441 + expect(agreementsPeerA).toHaveLength(1); 442 + 443 + // Local's policy engine now has a policy for peer A 444 + expect(policyEngine.shouldReplicate(PEER_A_DID)).toBe(true); 445 + expect(policyEngine.getExplicitDids()).toContain(PEER_A_DID); 446 + 447 + // Peer A's policy engine now has a policy for local 448 + expect(policyEngine2.shouldReplicate(LOCAL_DID)).toBe(true); 449 + expect(policyEngine2.getExplicitDids()).toContain(LOCAL_DID); 450 + } finally { 451 + if (ipfsService2.isRunning()) { 452 + await ipfsService2.stop(); 453 + } 454 + db2.close(); 455 + rmSync(tmpDir2, { recursive: true, force: true }); 456 + } 457 + }); 458 + });
+223
src/replication/offer-manager.ts
··· 1 + /** 2 + * Manages replication offers: publishing, discovery, agreement detection, 3 + * and policy generation from mutual agreements. 4 + */ 5 + 6 + import type { RepoManager } from "../repo-manager.js"; 7 + import type { PeerDiscovery } from "./peer-discovery.js"; 8 + import type { PolicyEngine } from "../policy/engine.js"; 9 + import type { Policy } from "../policy/types.js"; 10 + import { 11 + OFFER_NSID, 12 + didToRkey, 13 + type OfferRecord, 14 + } from "./types.js"; 15 + 16 + /** A detected mutual replication agreement between two peers. */ 17 + export interface Agreement { 18 + counterpartyDid: string; 19 + localOffer: OfferRecord; 20 + remoteOffer: OfferRecord; 21 + effectiveParams: { minCopies: number; intervalSec: number; priority: number }; 22 + } 23 + 24 + /** Prefix for policy IDs generated from P2P agreements. */ 25 + const P2P_POLICY_PREFIX = "p2p:"; 26 + 27 + export class OfferManager { 28 + constructor( 29 + private repoManager: RepoManager, 30 + private peerDiscovery: PeerDiscovery, 31 + private policyEngine: PolicyEngine, 32 + private localDid: string, 33 + ) {} 34 + 35 + /** 36 + * Publish (or update) a replication offer for a subject DID. 37 + */ 38 + async publishOffer( 39 + subject: string, 40 + params?: { minCopies?: number; intervalSec?: number; priority?: number }, 41 + ): Promise<OfferRecord> { 42 + const record: OfferRecord = { 43 + $type: OFFER_NSID, 44 + subject, 45 + minCopies: params?.minCopies ?? 2, 46 + intervalSec: params?.intervalSec ?? 600, 47 + priority: params?.priority ?? 50, 48 + createdAt: new Date().toISOString(), 49 + }; 50 + 51 + await this.repoManager.putRecord(OFFER_NSID, didToRkey(subject), record); 52 + return record; 53 + } 54 + 55 + /** 56 + * Revoke a replication offer and remove any derived policy. 57 + */ 58 + async revokeOffer(subject: string): Promise<void> { 59 + await this.repoManager.deleteRecord(OFFER_NSID, didToRkey(subject)); 60 + // Remove the P2P policy derived from this offer 61 + this.policyEngine.removePolicy(`${P2P_POLICY_PREFIX}${subject}`); 62 + } 63 + 64 + /** 65 + * List all local offers from our repo. 66 + */ 67 + async getLocalOffers(): Promise<OfferRecord[]> { 68 + const result = await this.repoManager.listRecords(OFFER_NSID, { 69 + limit: 100, 70 + }); 71 + return result.records 72 + .map((r) => r.value) 73 + .filter( 74 + (v): v is OfferRecord => 75 + typeof v === "object" && 76 + v !== null && 77 + (v as Record<string, unknown>).$type === OFFER_NSID, 78 + ); 79 + } 80 + 81 + /** 82 + * Discover mutual agreements with a set of peers. 83 + * 84 + * A mutual agreement exists when: 85 + * 1. The remote peer has an offer where subject === our localDid 86 + * 2. We have a local offer where subject === the remote peer's DID 87 + */ 88 + async discoverAgreements( 89 + peers: Array<{ did: string; pdsEndpoint: string }>, 90 + ): Promise<Agreement[]> { 91 + const localOffers = await this.getLocalOffers(); 92 + const localOffersBySubject = new Map<string, OfferRecord>(); 93 + for (const offer of localOffers) { 94 + localOffersBySubject.set(offer.subject, offer); 95 + } 96 + 97 + const agreements: Agreement[] = []; 98 + 99 + for (const peer of peers) { 100 + // Check if we have an offer for this peer's DID 101 + const localOffer = localOffersBySubject.get(peer.did); 102 + if (!localOffer) continue; 103 + 104 + // Discover their offers 105 + let remoteOffers: OfferRecord[]; 106 + try { 107 + remoteOffers = await this.peerDiscovery.discoverOffers( 108 + peer.did, 109 + peer.pdsEndpoint, 110 + ); 111 + } catch { 112 + continue; // Skip peers we can't reach 113 + } 114 + 115 + // Check if they have an offer for our DID 116 + const remoteOffer = remoteOffers.find( 117 + (o) => o.subject === this.localDid, 118 + ); 119 + if (!remoteOffer) continue; 120 + 121 + // Mutual agreement detected 122 + agreements.push({ 123 + counterpartyDid: peer.did, 124 + localOffer, 125 + remoteOffer, 126 + effectiveParams: mergeOfferParams(localOffer, remoteOffer), 127 + }); 128 + } 129 + 130 + return agreements; 131 + } 132 + 133 + /** 134 + * Sync policies from detected agreements. 135 + * 136 + * - Adds P2P policies for new agreements 137 + * - Updates existing P2P policies if params changed 138 + * - Removes stale P2P policies whose agreements no longer exist 139 + */ 140 + syncPolicies(agreements: Agreement[]): void { 141 + const activeAgreementIds = new Set<string>(); 142 + 143 + for (const agreement of agreements) { 144 + const policyId = `${P2P_POLICY_PREFIX}${agreement.counterpartyDid}`; 145 + activeAgreementIds.add(policyId); 146 + 147 + const policy: Policy = { 148 + id: policyId, 149 + name: `P2P agreement with ${agreement.counterpartyDid}`, 150 + target: { type: "list", dids: [agreement.counterpartyDid] }, 151 + replication: { 152 + minCopies: agreement.effectiveParams.minCopies, 153 + }, 154 + sync: { 155 + intervalSec: agreement.effectiveParams.intervalSec, 156 + }, 157 + retention: { 158 + maxAgeSec: 0, 159 + keepHistory: false, 160 + }, 161 + priority: agreement.effectiveParams.priority, 162 + enabled: true, 163 + }; 164 + 165 + // Check if policy already exists with same params 166 + const existing = this.policyEngine 167 + .getPolicies() 168 + .find((p) => p.id === policyId); 169 + 170 + if (existing) { 171 + // Update only if params changed 172 + if ( 173 + existing.replication.minCopies !== policy.replication.minCopies || 174 + existing.sync.intervalSec !== policy.sync.intervalSec || 175 + existing.priority !== policy.priority 176 + ) { 177 + this.policyEngine.removePolicy(policyId); 178 + this.policyEngine.addPolicy(policy); 179 + } 180 + } else { 181 + this.policyEngine.addPolicy(policy); 182 + } 183 + } 184 + 185 + // Remove stale P2P policies 186 + for (const existing of this.policyEngine.getPolicies()) { 187 + if ( 188 + existing.id.startsWith(P2P_POLICY_PREFIX) && 189 + !activeAgreementIds.has(existing.id) 190 + ) { 191 + this.policyEngine.removePolicy(existing.id); 192 + } 193 + } 194 + } 195 + 196 + /** 197 + * Run the full discover-and-sync cycle: discover agreements, then sync policies. 198 + */ 199 + async discoverAndSync( 200 + peers: Array<{ did: string; pdsEndpoint: string }>, 201 + ): Promise<Agreement[]> { 202 + const agreements = await this.discoverAgreements(peers); 203 + this.syncPolicies(agreements); 204 + return agreements; 205 + } 206 + } 207 + 208 + /** 209 + * Merge parameters from two offers into effective params. 210 + * - minCopies: max (most protective) 211 + * - intervalSec: min (most frequent) 212 + * - priority: max (highest importance) 213 + */ 214 + function mergeOfferParams( 215 + a: OfferRecord, 216 + b: OfferRecord, 217 + ): { minCopies: number; intervalSec: number; priority: number } { 218 + return { 219 + minCopies: Math.max(a.minCopies, b.minCopies), 220 + intervalSec: Math.min(a.intervalSec, b.intervalSec), 221 + priority: Math.max(a.priority, b.priority), 222 + }; 223 + }
+29 -1
src/replication/peer-discovery.ts
··· 3 3 */ 4 4 5 5 import type { RepoFetcher } from "./repo-fetcher.js"; 6 - import { PEER_NSID, MANIFEST_NSID, type ManifestRecord } from "./types.js"; 6 + import { PEER_NSID, MANIFEST_NSID, OFFER_NSID, type ManifestRecord, type OfferRecord } from "./types.js"; 7 7 8 8 export interface PeerInfo { 9 9 pdsEndpoint: string; ··· 73 73 typeof v === "object" && 74 74 v !== null && 75 75 (v as Record<string, unknown>).$type === MANIFEST_NSID, 76 + ); 77 + } 78 + 79 + /** 80 + * Discover a peer's replication offers. 81 + * Fetches all org.p2pds.replication.offer records from their PDS. 82 + */ 83 + async discoverOffers( 84 + did: string, 85 + pdsEndpoint: string, 86 + ): Promise<OfferRecord[]> { 87 + const records = await this.repoFetcher.listRecords( 88 + pdsEndpoint, 89 + did, 90 + OFFER_NSID, 91 + ); 92 + 93 + return records 94 + .map((r) => r.value) 95 + .filter( 96 + (v): v is OfferRecord => 97 + typeof v === "object" && 98 + v !== null && 99 + (v as Record<string, unknown>).$type === OFFER_NSID && 100 + typeof (v as Record<string, unknown>).subject === "string" && 101 + typeof (v as Record<string, unknown>).minCopies === "number" && 102 + typeof (v as Record<string, unknown>).intervalSec === "number" && 103 + typeof (v as Record<string, unknown>).priority === "number", 76 104 ); 77 105 } 78 106 }
+43
src/replication/replication-manager.ts
··· 37 37 import { ChallengeScheduler } from "./challenge-response/challenge-scheduler.js"; 38 38 import { ChallengeStorage, type ChallengeHistoryRow, type PeerReliabilityRow } from "./challenge-response/challenge-storage.js"; 39 39 import type { ChallengeTransport } from "./challenge-response/transport.js"; 40 + import { OfferManager } from "./offer-manager.js"; 40 41 41 42 /** How old cached peer info can be before re-fetching (1 hour). */ 42 43 const PEER_INFO_TTL_MS = 60 * 60 * 1000; ··· 64 65 private challengeScheduler: ChallengeScheduler | null = null; 65 66 private stopped = false; 66 67 private policyEngine: PolicyEngine | null = null; 68 + private offerManager: OfferManager | null = null; 67 69 /** Per-DID last-sync timestamps (epoch ms) for policy-driven interval tracking. */ 68 70 private lastSyncTimestamps: Map<string, number> = new Map(); 69 71 ··· 93 95 ); 94 96 if (policyEngine) { 95 97 this.policyEngine = policyEngine; 98 + this.offerManager = new OfferManager( 99 + repoManager, 100 + this.peerDiscovery, 101 + policyEngine, 102 + config.DID, 103 + ); 96 104 } 97 105 } 98 106 ··· 104 112 } 105 113 106 114 /** 115 + * Get the OfferManager, if one is configured (requires PolicyEngine). 116 + */ 117 + getOfferManager(): OfferManager | null { 118 + return this.offerManager; 119 + } 120 + 121 + /** 107 122 * Initialize replication: create tables, publish identity, sync manifests. 108 123 */ 109 124 async init(): Promise<void> { ··· 111 126 this.challengeStorage.initSchema(); 112 127 await this.publishPeerIdentity(); 113 128 await this.syncManifests(); 129 + await this.runOfferDiscovery(); 114 130 } 115 131 116 132 /** ··· 201 217 } 202 218 203 219 /** 220 + * Run offer discovery: gather peers from sync state and discover agreements. 221 + * Non-fatal: errors are logged but don't block sync. 222 + */ 223 + private async runOfferDiscovery(): Promise<void> { 224 + if (!this.offerManager) return; 225 + 226 + try { 227 + const states = this.syncStorage.getAllStates(); 228 + const peers = states 229 + .filter((s) => s.pdsEndpoint) 230 + .map((s) => ({ did: s.did, pdsEndpoint: s.pdsEndpoint })); 231 + 232 + if (peers.length > 0) { 233 + await this.offerManager.discoverAndSync(peers); 234 + } 235 + } catch (err) { 236 + console.error( 237 + "[replication] Offer discovery error:", 238 + err instanceof Error ? err.message : String(err), 239 + ); 240 + } 241 + } 242 + 243 + /** 204 244 * Sync all configured DIDs. 205 245 * 206 246 * When a PolicyEngine is present: ··· 235 275 this.syncStorage.updateStatus(did, "error", message); 236 276 } 237 277 } 278 + 279 + // Re-run offer discovery to pick up new/revoked offers 280 + await this.runOfferDiscovery(); 238 281 } 239 282 240 283 /**
+11
src/replication/types.ts
··· 5 5 /** Lexicon NSIDs */ 6 6 export const PEER_NSID = "org.p2pds.peer"; 7 7 export const MANIFEST_NSID = "org.p2pds.manifest"; 8 + export const OFFER_NSID = "org.p2pds.replication.offer"; 8 9 9 10 /** Peer identity record — binds a DID to an IPFS PeerID. */ 10 11 export interface PeerIdentityRecord { ··· 22 23 lastSyncRev: string | null; 23 24 lastSyncAt: string | null; 24 25 createdAt: string; 26 + } 27 + 28 + /** Replication offer — declares willingness to replicate a given DID's data. */ 29 + export interface OfferRecord { 30 + $type: typeof OFFER_NSID; 31 + subject: string; // DID whose data I will replicate 32 + minCopies: number; // Desired redundancy (default 2) 33 + intervalSec: number; // Sync frequency in seconds (default 600) 34 + priority: number; // 0-100 (default 50) 35 + createdAt: string; // ISO 8601 25 36 } 26 37 27 38 /** Operational sync state tracked in SQLite (not in repo). */