atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add per-DID PLC log status in UI, kadDHT client mode, and cross-node PLC sharing

- Show PLC log status (archived/validated/tombstoned) in DID detail view
- Enable kadDHT client mode for content routing and PLC log discovery
- Add provideForDid/findProvidersForDid to NetworkService for DHT announcements
- Add computeDiscoveryCid for deterministic DID→CID mapping
- Add public /xrpc/org.p2pds.plc.getLog endpoint for cross-node PLC exchange
- Fall back to DHT peer discovery when plc.directory is unavailable
- Re-announce mirrored DIDs to DHT on periodic refresh

+265 -4
+34
src/identity/plc-mirror.ts
··· 275 275 return rows.map((r) => r.did); 276 276 } 277 277 278 + /** 279 + * Compute a deterministic DHT discovery CID from a DID string. 280 + * SHA-256 hash of the DID bytes → CIDv1 with raw codec. 281 + * Used to announce/discover PLC log providers via the DHT. 282 + */ 283 + export async function computeDiscoveryCid(did: string): Promise<string> { 284 + const RAW_CODEC = 0x55; 285 + const didBytes = new TextEncoder().encode(did); 286 + const hash = await sha256.digest(didBytes); 287 + const cid = CID.createV1(RAW_CODEC, hash); 288 + return cid.toString(base32); 289 + } 290 + 291 + /** 292 + * Publish DHT provider records for all mirrored DIDs. 293 + * Called periodically alongside the PLC mirror refresh timer. 294 + */ 295 + export async function publishPlcProviders( 296 + routing: { provide(cid: unknown): Promise<void> }, 297 + dids: string[], 298 + ): Promise<void> { 299 + const RAW_CODEC = 0x55; 300 + for (const did of dids) { 301 + try { 302 + const didBytes = new TextEncoder().encode(did); 303 + const hash = await sha256.digest(didBytes); 304 + const cid = CID.createV1(RAW_CODEC, hash); 305 + await routing.provide(cid); 306 + } catch { 307 + // fire-and-forget per-DID 308 + } 309 + } 310 + } 311 + 278 312 // ============================================ 279 313 // SQLite storage helpers 280 314 // ============================================
+14
src/index.ts
··· 178 178 }), 179 179 ); 180 180 181 + // Public PLC log endpoint (unauthenticated, rate-limited) 182 + app.use( 183 + "/xrpc/org.p2pds.plc.getLog", 184 + rateLimitMiddleware(rateLimiter, { 185 + pool: "plcLog", 186 + rule: { maxRequests: config.RATE_LIMIT_CHALLENGE_PER_MIN, windowMs: w }, 187 + }), 188 + ); 189 + 181 190 // App endpoints 182 191 const appRL = rateLimitMiddleware(rateLimiter, { 183 192 pool: "app", ··· 687 696 // Incoming revocation notification (unauthenticated) 688 697 app.post("/xrpc/org.p2pds.replication.notifyRevoke", (c) => 689 698 app_routes.notifyRevoke(c, getConfigDid(), replicationManager), 699 + ); 700 + 701 + // Public PLC log endpoint (unauthenticated — PLC logs are public data) 702 + app.get("/xrpc/org.p2pds.plc.getLog", (c) => 703 + app_routes.getPlcLogPublic(c, db), 690 704 ); 691 705 692 706 app.post("/xrpc/org.p2pds.replication.revokeOffer", requireAuth, async (c) => {
+49 -3
src/ipfs.ts
··· 84 84 onIdentityNotification(handler: IdentityNotificationHandler): void; 85 85 subscribeIdentityTopics(dids: string[]): void; 86 86 unsubscribeIdentityTopics(dids: string[]): void; 87 + provideForDid(did: string): Promise<void>; 88 + findProvidersForDid(did: string): Promise<string[]>; 87 89 } 88 90 89 91 export interface IpfsConfig { ··· 133 135 const { yamux } = await import("@chainsafe/libp2p-yamux"); 134 136 const { identify } = await import("@libp2p/identify"); 135 137 const { autoNAT } = await import("@libp2p/autonat"); 136 - // kadDHT removed — not needed for minimal networking 138 + const { kadDHT, removePrivateAddressesMapper } = await import("@libp2p/kad-dht"); 137 139 const { bootstrap } = await import("@libp2p/bootstrap"); 138 140 const { ping } = await import("@libp2p/ping"); 139 141 const datastore = new SqliteDatastore(this.config.db); 140 142 141 143 // libp2p config: direct peer connections + lightweight diagnostics. 142 - // - Amino DHT in client mode: connects to public IPFS bootstrap peers 143 - // for routing but doesn't serve DHT queries. Lightweight. 144 + // - Amino DHT in client mode: queries the public IPFS DHT for content 145 + // routing and PLC log discovery, but doesn't serve DHT queries. 144 146 // - AutoNAT: asks connected peers to dial us back, confirming reachability. 145 147 // - Bootstrap: connects to standard IPFS bootstrap peers on startup. 146 148 // Cast SQLite stores to Helia's expected interfaces. ··· 156 158 identify: identify(), 157 159 ping: ping(), 158 160 autoNAT: autoNAT(), 161 + aminoDHT: kadDHT({ 162 + protocol: "/ipfs/kad/1.0.0", 163 + peerInfoMapper: removePrivateAddressesMapper, 164 + clientMode: true, 165 + }), 159 166 }, 160 167 peerDiscovery: [ 161 168 bootstrap({ ··· 521 528 .getConnections() 522 529 .filter((conn) => conn.remotePeer.toString() === peerId) 523 530 .map((conn) => conn.remoteAddr.toString()); 531 + } 532 + 533 + /** 534 + * Announce this node as a provider for a DID's PLC log via the DHT. 535 + * Computes a deterministic CID from the DID and calls routing.provide(). 536 + */ 537 + async provideForDid(did: string): Promise<void> { 538 + if (!this.helia) return; 539 + try { 540 + const { computeDiscoveryCid } = await import("./identity/plc-mirror.js"); 541 + const cidStr = await computeDiscoveryCid(did); 542 + const cid = CID.parse(cidStr); 543 + await this.helia.routing.provide(cid); 544 + } catch { 545 + // fire-and-forget 546 + } 547 + } 548 + 549 + /** 550 + * Find providers for a DID's PLC log via the DHT. 551 + * Returns multiaddrs of peers that have announced they hold this DID's log. 552 + */ 553 + async findProvidersForDid(did: string): Promise<string[]> { 554 + if (!this.helia) return []; 555 + try { 556 + const { computeDiscoveryCid } = await import("./identity/plc-mirror.js"); 557 + const cidStr = await computeDiscoveryCid(did); 558 + const cid = CID.parse(cidStr); 559 + const providers: string[] = []; 560 + for await (const provider of this.helia.routing.findProviders(cid)) { 561 + for (const ma of provider.multiaddrs) { 562 + providers.push(ma.toString()); 563 + } 564 + if (providers.length >= 10) break; 565 + } 566 + return providers; 567 + } catch { 568 + return []; 569 + } 524 570 } 525 571 526 572 /**
+2
src/replication/e2e-sync.test.ts
··· 70 70 onIdentityNotification: () => {}, 71 71 subscribeIdentityTopics: () => {}, 72 72 unsubscribeIdentityTopics: () => {}, 73 + provideForDid: async () => {}, 74 + findProvidersForDid: async () => [], 73 75 }; 74 76 75 77 describe("E2E sync integration", () => {
+2
src/replication/gossipsub-notifications.test.ts
··· 255 255 onIdentityNotification: vi.fn(), 256 256 subscribeIdentityTopics: vi.fn(), 257 257 unsubscribeIdentityTopics: vi.fn(), 258 + provideForDid: vi.fn().mockResolvedValue(undefined), 259 + findProvidersForDid: vi.fn().mockResolvedValue([]), 258 260 // Test helpers 259 261 _handlers: handlers, 260 262 _subscribedDids: subscribedDids,
+2
src/replication/peer-freshness.test.ts
··· 98 98 getMultiaddrs: vi.fn().mockImplementation(() => multiaddrs), 99 99 getConnectionCount: vi.fn().mockReturnValue(0), 100 100 getRemoteAddrs: vi.fn().mockImplementation((peerId: string) => remoteAddrsMap[peerId] ?? []), 101 + provideForDid: vi.fn().mockResolvedValue(undefined), 102 + findProvidersForDid: vi.fn().mockResolvedValue([]), 101 103 // Test helpers 102 104 _commitHandlers: commitHandlers, 103 105 _identityHandlers: identityHandlers,
+88 -1
src/replication/replication-manager.ts
··· 2337 2337 private async fetchPlcLog(did: string): Promise<void> { 2338 2338 const { fetchAndValidateLog } = await import("../identity/plc-mirror.js"); 2339 2339 await fetchAndValidateLog(this.db, did); 2340 + // Announce via DHT that we hold this DID's PLC log 2341 + this.networkService.provideForDid(did).catch(() => {}); 2340 2342 } 2341 2343 2342 2344 /** 2343 2345 * Refresh PLC logs for all tracked did:plc DIDs. 2344 2346 */ 2345 2347 private async refreshAllPlcLogs(): Promise<void> { 2346 - const { getAllPlcMirrorDids, fetchAndValidateLog } = await import("../identity/plc-mirror.js"); 2348 + const { getAllPlcMirrorDids, fetchAndValidateLog, publishPlcProviders } = await import("../identity/plc-mirror.js"); 2347 2349 2348 2350 // Refresh existing mirrored DIDs 2349 2351 const mirroredDids = getAllPlcMirrorDids(this.db); ··· 2356 2358 `[replication] PLC log refresh for ${did} failed:`, 2357 2359 err instanceof Error ? err.message : String(err), 2358 2360 ); 2361 + // Fallback: try discovering PLC log from DHT peers 2362 + await this.fetchPlcLogFromPeers(did).catch(() => {}); 2359 2363 } 2360 2364 } 2361 2365 ··· 2372 2376 `[replication] PLC log fetch for ${did} failed:`, 2373 2377 err instanceof Error ? err.message : String(err), 2374 2378 ); 2379 + // Fallback: try discovering PLC log from DHT peers 2380 + await this.fetchPlcLogFromPeers(did).catch(() => {}); 2381 + } 2382 + } 2383 + 2384 + // Re-announce all mirrored DIDs to the DHT 2385 + const allMirrored = getAllPlcMirrorDids(this.db); 2386 + if (allMirrored.length > 0) { 2387 + const helia = this.networkService as unknown as { getLibp2p?(): unknown }; 2388 + if (typeof helia.getLibp2p === "function" && helia.getLibp2p()) { 2389 + // Use the IpfsService routing directly via provideForDid 2390 + for (const did of allMirrored) { 2391 + this.networkService.provideForDid(did).catch(() => {}); 2392 + } 2393 + } 2394 + } 2395 + } 2396 + 2397 + /** 2398 + * Fallback: discover PLC log providers via DHT and fetch from a peer. 2399 + * Used when plc.directory is unavailable. 2400 + */ 2401 + private async fetchPlcLogFromPeers(did: string): Promise<void> { 2402 + const providers = await this.networkService.findProvidersForDid(did); 2403 + if (providers.length === 0) return; 2404 + 2405 + const { validateOperationChain, getStoredLog } = await import("../identity/plc-mirror.js"); 2406 + 2407 + for (const multiaddr of providers) { 2408 + try { 2409 + // Extract the host:port from the multiaddr for an HTTP fetch 2410 + const hostMatch = multiaddr.match(/\/ip[46]\/([^/]+)\/tcp\/(\d+)/); 2411 + if (!hostMatch) continue; 2412 + const [, host, port] = hostMatch; 2413 + 2414 + const url = `http://${host}:${port}/xrpc/org.p2pds.plc.getLog?did=${encodeURIComponent(did)}`; 2415 + const res = await fetch(url, { signal: AbortSignal.timeout(10000) }); 2416 + if (!res.ok) continue; 2417 + 2418 + const data = await res.json() as { operations?: unknown[]; status?: { validated?: boolean } }; 2419 + if (!data.operations || !Array.isArray(data.operations)) continue; 2420 + 2421 + // Validate the returned chain independently 2422 + const validation = await validateOperationChain(data.operations as any, did); 2423 + if (!validation.valid) continue; 2424 + 2425 + // Store it — we verified it ourselves 2426 + const existingLog = getStoredLog(this.db, did); 2427 + const newOpCount = data.operations.length; 2428 + 2429 + // Accept if we don't have it yet, or if the new chain is longer 2430 + if (!existingLog || newOpCount > existingLog.status.opCount) { 2431 + const isTombstoned = (data.operations as any[]).some( 2432 + (op: any) => !op.nullified && op.operation?.type === "plc_tombstone", 2433 + ); 2434 + const lastOpCreatedAt = newOpCount > 0 2435 + ? (data.operations as any[])[newOpCount - 1]!.createdAt ?? null 2436 + : null; 2437 + 2438 + this.db.prepare( 2439 + `INSERT INTO plc_mirror (did, operations_json, op_count, last_fetched_at, last_op_created_at, validated, is_tombstoned) 2440 + VALUES (?, ?, ?, ?, ?, ?, ?) 2441 + ON CONFLICT(did) DO UPDATE SET 2442 + operations_json = excluded.operations_json, 2443 + op_count = excluded.op_count, 2444 + last_fetched_at = excluded.last_fetched_at, 2445 + last_op_created_at = excluded.last_op_created_at, 2446 + validated = excluded.validated, 2447 + is_tombstoned = excluded.is_tombstoned`, 2448 + ).run( 2449 + did, 2450 + JSON.stringify(data.operations), 2451 + newOpCount, 2452 + new Date().toISOString(), 2453 + lastOpCreatedAt, 2454 + validation.valid ? 1 : 0, 2455 + isTombstoned ? 1 : 0, 2456 + ); 2457 + console.log(`[replication] PLC log for ${did} fetched from peer (${newOpCount} ops)`); 2458 + return; 2459 + } 2460 + } catch { 2461 + // Try next provider 2375 2462 } 2376 2463 } 2377 2464 }
+17
src/ui/components/replication-row.ts
··· 199 199 ` 200 200 : html`<p style="color:var(--muted);font-size:0.8rem">No recent syncs</p>`} 201 201 202 + ${d.plcLogStatus 203 + ? html` 204 + <div style="margin-top:0.75rem;margin-bottom:0.75rem"> 205 + <strong style="font-size:0.75rem">Identity Log</strong> 206 + <div style="margin-top:0.25rem;font-size:0.8rem"> 207 + ${d.plcLogStatus.archived 208 + ? html` 209 + ${statusDot(d.plcLogStatus.validated ? "synced" : "error")} 210 + <span>${d.plcLogStatus.opCount} operation${d.plcLogStatus.opCount !== 1 ? "s" : ""} archived</span> 211 + ${d.plcLogStatus.isTombstoned ? html`<span style="color:var(--danger);margin-left:0.5rem">tombstoned</span>` : ""} 212 + ` 213 + : html`${statusDot("syncing")}<span>Not yet archived</span>`} 214 + </div> 215 + </div> 216 + ` 217 + : ""} 218 + 202 219 ${canRevoke 203 220 ? html` 204 221 <button
+2
src/xrpc/app-e2e.test.ts
··· 71 71 onIdentityNotification: () => {}, 72 72 subscribeIdentityTopics: () => {}, 73 73 unsubscribeIdentityTopics: () => {}, 74 + provideForDid: async () => {}, 75 + findProvidersForDid: async () => [], 74 76 }; 75 77 76 78 function startServer(
+12
src/xrpc/app.test.ts
··· 182 182 onIdentityNotification: () => {}, 183 183 subscribeIdentityTopics: () => {}, 184 184 unsubscribeIdentityTopics: () => {}, 185 + provideForDid: async () => {}, 186 + findProvidersForDid: async () => [], 185 187 }; 186 188 187 189 const mockDidResolver = { ··· 272 274 onIdentityNotification: () => {}, 273 275 subscribeIdentityTopics: () => {}, 274 276 unsubscribeIdentityTopics: () => {}, 277 + provideForDid: async () => {}, 278 + findProvidersForDid: async () => [], 275 279 }; 276 280 277 281 replicationManager = new ReplicationManager( ··· 406 410 onIdentityNotification: () => {}, 407 411 subscribeIdentityTopics: () => {}, 408 412 unsubscribeIdentityTopics: () => {}, 413 + provideForDid: async () => {}, 414 + findProvidersForDid: async () => [], 409 415 }; 410 416 411 417 const firehose = new Firehose(repoManager); ··· 483 489 onIdentityNotification: () => {}, 484 490 subscribeIdentityTopics: () => {}, 485 491 unsubscribeIdentityTopics: () => {}, 492 + provideForDid: async () => {}, 493 + findProvidersForDid: async () => [], 486 494 }; 487 495 488 496 const replicationManager = new ReplicationManager( ··· 550 558 onIdentityNotification: () => {}, 551 559 subscribeIdentityTopics: () => {}, 552 560 unsubscribeIdentityTopics: () => {}, 561 + provideForDid: async () => {}, 562 + findProvidersForDid: async () => [], 553 563 }; 554 564 555 565 const replicationManager = new ReplicationManager( ··· 663 673 onIdentityNotification: () => {}, 664 674 subscribeIdentityTopics: () => {}, 665 675 unsubscribeIdentityTopics: () => {}, 676 + provideForDid: async () => {}, 677 + findProvidersForDid: async () => [], 666 678 }; 667 679 668 680 const mockDidResolver = {
+43
src/xrpc/app.ts
··· 9 9 import { exportRepoAsCar, exportBlobs } from "../replication/car-export.js"; 10 10 import { detectContentType } from "../format.js"; 11 11 import * as rotationKeys from "../identity/rotation-keys.js"; 12 + import { getStoredLog } from "../identity/plc-mirror.js"; 13 + import type Database from "better-sqlite3"; 12 14 13 15 const VERSION = "0.1.0"; 14 16 ··· 126 128 .filter((s) => s.status === "success" && s.startedAt >= cutoff) 127 129 .reduce((sum, s) => sum + (s.carBytes ?? 0) + (s.blobBytes ?? 0), 0); 128 130 131 + const plcLog = replicationManager.getPlcLogStatus(did); 132 + 129 133 return c.json({ 130 134 did, 131 135 syncState, ··· 140 144 peerEndpoints, 141 145 verification, 142 146 effectivePolicy, 147 + plcLogStatus: plcLog ? { archived: true, ...plcLog } : { archived: false }, 143 148 }); 144 149 } 145 150 ··· 1016 1021 } 1017 1022 1018 1023 return c.json(replicationManager.getPlcMirrorStatus()); 1024 + } 1025 + 1026 + /** 1027 + * Public (unauthenticated) endpoint to serve a stored PLC log. 1028 + * PLC logs are public data — this enables cross-node PLC log sharing. 1029 + */ 1030 + export function getPlcLogPublic( 1031 + c: Context<AppEnv>, 1032 + db: Database.Database | undefined, 1033 + ): Response { 1034 + const did = c.req.query("did"); 1035 + if (!did) { 1036 + return c.json( 1037 + { error: "MissingParameter", message: "did is required" }, 1038 + 400, 1039 + ); 1040 + } 1041 + 1042 + if (!db) { 1043 + return c.json( 1044 + { error: "NotAvailable", message: "Database not available" }, 1045 + 400, 1046 + ); 1047 + } 1048 + 1049 + const stored = getStoredLog(db, did); 1050 + if (!stored) { 1051 + return c.json( 1052 + { error: "NotFound", message: `No PLC log stored for ${did}` }, 1053 + 404, 1054 + ); 1055 + } 1056 + 1057 + return c.json({ 1058 + did, 1059 + operations: stored.operations, 1060 + status: stored.status, 1061 + }); 1019 1062 } 1020 1063 1021 1064 // ============================================