atproto user agency toolkit for individuals and groups
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Populate record paths during sync for challenge generation

Extract all record paths via full MST walk after syncDid() and track
paths incrementally from firehose ops, enabling the challenge scheduler
to generate MST proof challenges against replicated repos.

+157 -1
+57 -1
src/replication/mst-proof.test.ts
··· 7 7 import { RepoManager } from "../repo-manager.js"; 8 8 import type { Config } from "../config.js"; 9 9 import { readCarWithRoot } from "@atproto/repo"; 10 - import { generateMstProof, verifyMstProof } from "./mst-proof.js"; 10 + import { generateMstProof, verifyMstProof, extractAllRecordPaths } from "./mst-proof.js"; 11 11 12 12 function testConfig(dataDir: string): Config { 13 13 return { ··· 583 583 ); 584 584 expect(missingVerify.valid).toBe(true); 585 585 expect(missingVerify.found).toBe(false); 586 + }); 587 + 588 + // ============================================ 589 + // extractAllRecordPaths 590 + // ============================================ 591 + 592 + it("extractAllRecordPaths returns all record paths", async () => { 593 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 594 + $type: "app.bsky.feed.post", 595 + text: "Post one", 596 + createdAt: new Date().toISOString(), 597 + }); 598 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 599 + $type: "app.bsky.feed.post", 600 + text: "Post two", 601 + createdAt: new Date().toISOString(), 602 + }); 603 + await repoManager.putRecord("app.bsky.actor.profile", "self", { 604 + $type: "app.bsky.actor.profile", 605 + displayName: "Test User", 606 + }); 607 + 608 + const rootCid = await getRepoRootCid(); 609 + 610 + const paths = await extractAllRecordPaths(ipfsService, rootCid); 611 + 612 + // Should find all three records 613 + expect(paths.length).toBe(3); 614 + expect(paths.filter((p) => p.startsWith("app.bsky.feed.post/")).length).toBe(2); 615 + expect(paths).toContain("app.bsky.actor.profile/self"); 616 + }); 617 + 618 + it("extractAllRecordPaths returns sorted paths for a large repo", async () => { 619 + for (let i = 0; i < 30; i++) { 620 + await repoManager.createRecord("app.bsky.feed.post", undefined, { 621 + $type: "app.bsky.feed.post", 622 + text: `Post ${i}`, 623 + createdAt: new Date().toISOString(), 624 + }); 625 + } 626 + 627 + const rootCid = await getRepoRootCid(); 628 + const paths = await extractAllRecordPaths(ipfsService, rootCid); 629 + 630 + expect(paths.length).toBe(30); 631 + // MST in-order walk produces sorted keys 632 + const sorted = [...paths].sort(); 633 + expect(paths).toEqual(sorted); 634 + }); 635 + 636 + it("extractAllRecordPaths returns empty for missing commit", async () => { 637 + const paths = await extractAllRecordPaths( 638 + ipfsService, 639 + "bafyreig6mxqmjlb7yjbhhhz6vqmtiw4kgipvhqoowdkggjlpzpd5tcm4", 640 + ); 641 + expect(paths).toEqual([]); 586 642 }); 587 643 });
+53
src/replication/mst-proof.ts
··· 120 120 return result; 121 121 } 122 122 123 + // ---------- Full MST walk ---------- 124 + 125 + /** 126 + * Walk an MST node recursively, collecting all record paths (keys). 127 + */ 128 + async function walkMstNode( 129 + blockStore: BlockStore, 130 + nodeCid: CID, 131 + paths: string[], 132 + ): Promise<void> { 133 + const bytes = await blockStore.getBlock(nodeCid.toString()); 134 + if (!bytes) return; 135 + 136 + const nodeData = decodeNodeData(bytes); 137 + const entries = expandEntries(nodeData); 138 + 139 + // Visit left subtree first 140 + if (nodeData.l) { 141 + await walkMstNode(blockStore, nodeData.l, paths); 142 + } 143 + 144 + // Visit each entry: collect key, then recurse into right subtree 145 + for (const entry of entries) { 146 + paths.push(entry.key); 147 + if (entry.subtree) { 148 + await walkMstNode(blockStore, entry.subtree, paths); 149 + } 150 + } 151 + } 152 + 153 + /** 154 + * Extract all record paths from a repo by walking the full MST. 155 + * 156 + * @param blockStore - Block storage containing the repo blocks 157 + * @param commitCid - CID of the commit block (repo head) 158 + * @returns All record paths in the repo (e.g. "app.bsky.feed.post/abc123") 159 + */ 160 + export async function extractAllRecordPaths( 161 + blockStore: BlockStore, 162 + commitCid: string, 163 + ): Promise<string[]> { 164 + const commitBytes = await blockStore.getBlock(commitCid); 165 + if (!commitBytes) return []; 166 + 167 + const commitObj = cborDecode(commitBytes) as { data: CID }; 168 + const mstRootCid = commitObj.data; 169 + if (!mstRootCid) return []; 170 + 171 + const paths: string[] = []; 172 + await walkMstNode(blockStore, mstRootCid, paths); 173 + return paths; 174 + } 175 + 123 176 // ---------- Generation ---------- 124 177 125 178 /**
+31
src/replication/replication-manager.ts
··· 29 29 import { RepoFetcher } from "./repo-fetcher.js"; 30 30 import { PeerDiscovery } from "./peer-discovery.js"; 31 31 import { BlockVerifier, RemoteVerifier } from "./verification.js"; 32 + import { extractAllRecordPaths } from "./mst-proof.js"; 32 33 import { 33 34 FirehoseSubscription, 34 35 type FirehoseCommitEvent, ··· 388 389 // 9b. Invalidate cached ReadableRepo so it reloads with new root 389 390 this.replicatedRepoReader?.invalidateCache(did); 390 391 392 + // 9c. Track record paths for challenge generation (full MST walk) 393 + try { 394 + const recordPaths = await extractAllRecordPaths( 395 + this.blockStore, 396 + rootCidStr, 397 + ); 398 + this.syncStorage.clearRecordPaths(did); 399 + this.syncStorage.trackRecordPaths(did, recordPaths); 400 + } catch { 401 + // Non-fatal: path extraction is best-effort 402 + } 403 + 391 404 // 10. Update manifest record 392 405 const rkey = didToRkey(did); 393 406 const existingManifest = await this.repoManager.getRecord( ··· 659 672 660 673 // 8. Invalidate cached ReadableRepo so it reloads with new root 661 674 this.replicatedRepoReader?.invalidateCache(did); 675 + 676 + // 8b. Track record paths incrementally from firehose ops 677 + try { 678 + const createdPaths = event.ops 679 + .filter((op) => op.action === "create" || op.action === "update") 680 + .map((op) => op.path); 681 + const deletedPaths = event.ops 682 + .filter((op) => op.action === "delete") 683 + .map((op) => op.path); 684 + if (createdPaths.length > 0) { 685 + this.syncStorage.trackRecordPaths(did, createdPaths); 686 + } 687 + if (deletedPaths.length > 0) { 688 + this.syncStorage.removeRecordPaths(did, deletedPaths); 689 + } 690 + } catch { 691 + // Non-fatal: path tracking is best-effort 692 + } 662 693 663 694 // 9. Update manifest record 664 695 const rkey = didToRkey(did);
+16
src/replication/sync-storage.ts
··· 300 300 } 301 301 302 302 /** 303 + * Remove specific record paths for a DID (batch delete). 304 + */ 305 + removeRecordPaths(did: string, paths: string[]): void { 306 + if (paths.length === 0) return; 307 + const remove = this.db.prepare( 308 + "DELETE FROM replication_record_paths WHERE did = ? AND record_path = ?", 309 + ); 310 + const batch = this.db.transaction((items: string[]) => { 311 + for (const path of items) { 312 + remove.run(did, path); 313 + } 314 + }); 315 + batch(paths); 316 + } 317 + 318 + /** 303 319 * Clear all tracked record paths for a DID. 304 320 */ 305 321 clearRecordPaths(did: string): void {