atproto user agency toolkit for individuals and groups
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add replication observability metrics and redesigned dashboard

- sync_history table tracks every sync event with source type, block/blob
counts, byte sizes, duration, and status
- size_bytes column on replication_blocks and replication_blobs for
accurate storage accounting
- Aggregate metrics API: total blocks/blobs/records/bytes held, syncs,
24h transfer volume
- Per-DID metrics: record count, bytes held, recent sync history
- New getSyncHistory endpoint for global sync event log
- Instrumented syncDid(), applyFirehoseBlocks(), syncBlobs() to record
events with full metrics
- Redesigned dashboard: metrics summary grid, enriched DID table with
expandable per-DID details, sync history card, source type badges,
formatBytes/timeAgo helpers

+989 -189
+3
src/index.ts
··· 563 563 app.get("/xrpc/org.p2pds.admin.getPolicies", requireAuth, (c) => 564 564 admin.getPolicies(c, replicationManager), 565 565 ); 566 + app.get("/xrpc/org.p2pds.admin.getSyncHistory", requireAuth, (c) => 567 + admin.getSyncHistory(c, replicationManager), 568 + ); 566 569 app.get("/xrpc/org.p2pds.admin.dashboard", (c) => 567 570 admin.getDashboard(c, networkService, replicationManager), 568 571 );
+223 -170
src/replication/replication-manager.ts
··· 439 439 */ 440 440 async syncDid(did: string): Promise<void> { 441 441 this.syncStorage.updateStatus(did, "syncing"); 442 + const syncStart = Date.now(); 443 + let sourceType = "pds"; 442 444 443 445 // 1. Resolve PDS endpoint 444 446 let state = this.syncStorage.getState(did); ··· 483 485 484 486 // 3. Fetch repo (with incremental sync if we have a previous rev) 485 487 const since = state?.lastSyncRev ?? undefined; 488 + const syncEventId = this.syncStorage.startSyncEvent(did, "pds"); 486 489 let carBytes: Uint8Array; 487 490 try { 488 491 carBytes = await this.repoFetcher.fetchRepo( ··· 504 507 try { 505 508 carBytes = await this.repoFetcher.fetchRepo(pdsEndpoint, did); 506 509 } catch { 510 + sourceType = "peer_fallback"; 507 511 carBytes = await this.fetchFromPeersOrThrow(did, since, err); 508 512 } 509 513 } else { 514 + sourceType = "peer_fallback"; 510 515 carBytes = await this.fetchFromPeersOrThrow(did, undefined, err); 511 516 } 512 517 } 513 518 514 - // 4. Parse CAR and store blocks 515 - const { root, blocks } = await readCarWithRoot(carBytes); 519 + try { 520 + // 4. Parse CAR and store blocks 521 + const { root, blocks } = await readCarWithRoot(carBytes); 516 522 517 - await this.blockStore.putBlocks(blocks); 523 + await this.blockStore.putBlocks(blocks); 518 524 519 - // 5. Collect CID strings for DHT announcement + verification 520 - const cidStrs: string[] = []; 521 - const internalMap = ( 522 - blocks as unknown as { map: Map<string, Uint8Array> } 523 - ).map; 524 - if (internalMap) { 525 - for (const cidStr of internalMap.keys()) { 526 - cidStrs.push(cidStr); 525 + // 5. Collect CID strings + sizes for DHT announcement + verification 526 + const cidStrs: string[] = []; 527 + const blockEntries: Array<{ cid: string; sizeBytes: number }> = []; 528 + const internalMap = ( 529 + blocks as unknown as { map: Map<string, Uint8Array> } 530 + ).map; 531 + if (internalMap) { 532 + for (const [cidStr, blockBytes] of internalMap.entries()) { 533 + cidStrs.push(cidStr); 534 + blockEntries.push({ cid: cidStr, sizeBytes: blockBytes.length }); 535 + } 527 536 } 528 - } 529 537 530 - // 5b. Track block CIDs for remote verification 531 - if (cidStrs.length > 0) { 532 - this.syncStorage.trackBlocks(did, cidStrs); 533 - } 538 + // 5b. Track block CIDs with sizes for remote verification 539 + if (blockEntries.length > 0) { 540 + this.syncStorage.trackBlocksWithSize(did, blockEntries); 541 + } 534 542 535 - // 6. Announce to DHT (fire-and-forget) 536 - this.networkService.provideBlocks(cidStrs).catch(() => {}); 543 + // 6. Announce to DHT (fire-and-forget) 544 + this.networkService.provideBlocks(cidStrs).catch(() => {}); 537 545 538 - // 7. Verify local block availability 539 - const verification = 540 - await this.verifier.verifyBlockAvailability(cidStrs); 541 - if (verification.missing.length > 0) { 542 - console.warn( 543 - `Local verification: ${verification.missing.length}/${verification.checked} blocks missing for ${did}`, 544 - ); 545 - } 546 - this.syncStorage.updateVerifiedAt(did); 546 + // 7. Verify local block availability 547 + const verification = 548 + await this.verifier.verifyBlockAvailability(cidStrs); 549 + if (verification.missing.length > 0) { 550 + console.warn( 551 + `Local verification: ${verification.missing.length}/${verification.checked} blocks missing for ${did}`, 552 + ); 553 + } 554 + this.syncStorage.updateVerifiedAt(did); 547 555 548 - // 8. Extract actual rev from the commit block 549 - const rootCidStr = root.toString(); 550 - let rev = rootCidStr; // fallback 551 - const commitBytes = internalMap?.get(rootCidStr); 552 - if (commitBytes) { 553 - try { 554 - const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 555 - if (typeof commitObj.rev === "string") { 556 - rev = commitObj.rev; 556 + // 8. Extract actual rev from the commit block 557 + const rootCidStr = root.toString(); 558 + let rev = rootCidStr; // fallback 559 + const commitBytes = internalMap?.get(rootCidStr); 560 + if (commitBytes) { 561 + try { 562 + const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 563 + if (typeof commitObj.rev === "string") { 564 + rev = commitObj.rev; 565 + } 566 + } catch { 567 + // If CBOR decode fails, fall back to root CID as rev 557 568 } 558 - } catch { 559 - // If CBOR decode fails, fall back to root CID as rev 560 569 } 561 - } 562 570 563 - // 9. Update sync state with both rev and root CID 564 - this.syncStorage.updateSyncProgress(did, rev, rootCidStr); 571 + // 9. Update sync state with both rev and root CID 572 + this.syncStorage.updateSyncProgress(did, rev, rootCidStr); 565 573 566 - // 9a. Publish gossipsub notification (fire-and-forget) 567 - this.networkService.publishCommitNotification(did, rootCidStr, rev).catch(() => {}); 574 + // 9a. Publish gossipsub notification (fire-and-forget) 575 + this.networkService.publishCommitNotification(did, rootCidStr, rev).catch(() => {}); 568 576 569 - // 9b. Invalidate cached ReadableRepo so it reloads with new root 570 - this.replicatedRepoReader?.invalidateCache(did); 577 + // 9b. Invalidate cached ReadableRepo so it reloads with new root 578 + this.replicatedRepoReader?.invalidateCache(did); 579 + 580 + // 9c. Track record paths for challenge generation (full MST walk) 581 + try { 582 + const recordPaths = await extractAllRecordPaths( 583 + this.blockStore, 584 + rootCidStr, 585 + ); 586 + this.syncStorage.clearRecordPaths(did); 587 + this.syncStorage.trackRecordPaths(did, recordPaths); 588 + } catch { 589 + // Non-fatal: path extraction is best-effort 590 + } 571 591 572 - // 9c. Track record paths for challenge generation (full MST walk) 573 - try { 574 - const recordPaths = await extractAllRecordPaths( 575 - this.blockStore, 576 - rootCidStr, 592 + // 10. Update manifest record 593 + const rkey = didToRkey(did); 594 + const existingManifest = await this.repoManager.getRecord( 595 + MANIFEST_NSID, 596 + rkey, 577 597 ); 578 - this.syncStorage.clearRecordPaths(did); 579 - this.syncStorage.trackRecordPaths(did, recordPaths); 580 - } catch { 581 - // Non-fatal: path extraction is best-effort 582 - } 598 + if (existingManifest) { 599 + const manifest: ManifestRecord = { 600 + $type: MANIFEST_NSID, 601 + subject: did, 602 + status: "active", 603 + lastSyncRev: rev, 604 + lastSyncAt: new Date().toISOString(), 605 + createdAt: 606 + (existingManifest.record as Record<string, unknown>) 607 + ?.createdAt as string ?? new Date().toISOString(), 608 + }; 609 + await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 610 + } 583 611 584 - // 10. Update manifest record 585 - const rkey = didToRkey(did); 586 - const existingManifest = await this.repoManager.getRecord( 587 - MANIFEST_NSID, 588 - rkey, 589 - ); 590 - if (existingManifest) { 591 - const manifest: ManifestRecord = { 592 - $type: MANIFEST_NSID, 593 - subject: did, 594 - status: "active", 595 - lastSyncRev: rev, 596 - lastSyncAt: new Date().toISOString(), 597 - createdAt: 598 - (existingManifest.record as Record<string, unknown>) 599 - ?.createdAt as string ?? new Date().toISOString(), 600 - }; 601 - await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 602 - } 612 + // 11. Sync blobs and capture result 613 + const blobResult = await this.syncBlobs(did); 603 614 604 - // 11. Sync blobs (fire-and-forget) 605 - this.syncBlobs(did).catch((err) => { 606 - console.warn( 607 - `[replication] Blob sync error for ${did}:`, 608 - err instanceof Error ? err.message : String(err), 609 - ); 610 - }); 615 + // 12. Complete sync event with metrics 616 + this.syncStorage.completeSyncEvent(syncEventId, { 617 + status: "success", 618 + blocksAdded: blockEntries.length, 619 + blobsAdded: blobResult.fetched, 620 + carBytes: carBytes.length, 621 + blobBytes: blobResult.totalBytes, 622 + durationMs: Date.now() - syncStart, 623 + rev, 624 + rootCid: rootCidStr, 625 + incremental: !!since, 626 + }); 627 + } catch (err) { 628 + this.syncStorage.completeSyncEvent(syncEventId, { 629 + status: "error", 630 + errorMessage: err instanceof Error ? err.message : String(err), 631 + durationMs: Date.now() - syncStart, 632 + incremental: !!since, 633 + }); 634 + throw err; 635 + } 611 636 } 612 637 613 638 /** ··· 808 833 did: string, 809 834 event: FirehoseCommitEvent, 810 835 ): Promise<void> { 811 - // 1. Parse the CAR bytes from the firehose event 812 - const { root, blocks } = await readCarWithRoot(event.blocks); 836 + const syncStart = Date.now(); 837 + const syncEventId = this.syncStorage.startSyncEvent(did, "firehose"); 813 838 814 - // 2. Store blocks in our blockstore 815 - await this.blockStore.putBlocks(blocks); 839 + try { 840 + // 1. Parse the CAR bytes from the firehose event 841 + const { root, blocks } = await readCarWithRoot(event.blocks); 842 + 843 + // 2. Store blocks in our blockstore 844 + await this.blockStore.putBlocks(blocks); 816 845 817 - // 3. Collect CID strings for DHT announcement + block tracking 818 - const cidStrs: string[] = []; 819 - const internalMap = ( 820 - blocks as unknown as { map: Map<string, Uint8Array> } 821 - ).map; 822 - if (internalMap) { 823 - for (const cidStr of internalMap.keys()) { 824 - cidStrs.push(cidStr); 846 + // 3. Collect CID strings + sizes for DHT announcement + block tracking 847 + const cidStrs: string[] = []; 848 + const blockEntries: Array<{ cid: string; sizeBytes: number }> = []; 849 + const internalMap = ( 850 + blocks as unknown as { map: Map<string, Uint8Array> } 851 + ).map; 852 + if (internalMap) { 853 + for (const [cidStr, blockBytes] of internalMap.entries()) { 854 + cidStrs.push(cidStr); 855 + blockEntries.push({ cid: cidStr, sizeBytes: blockBytes.length }); 856 + } 825 857 } 826 - } 827 858 828 - // 4. Track block CIDs 829 - if (cidStrs.length > 0) { 830 - this.syncStorage.trackBlocks(did, cidStrs); 831 - } 859 + // 4. Track block CIDs with sizes 860 + if (blockEntries.length > 0) { 861 + this.syncStorage.trackBlocksWithSize(did, blockEntries); 862 + } 832 863 833 - // 5. Announce to DHT (fire-and-forget) 834 - this.networkService.provideBlocks(cidStrs).catch(() => {}); 864 + // 5. Announce to DHT (fire-and-forget) 865 + this.networkService.provideBlocks(cidStrs).catch(() => {}); 835 866 836 - // 6. Determine rev and root CID 837 - const rootCidStr = root.toString(); 838 - let rev = event.rev; // Use the rev directly from the firehose event 867 + // 6. Determine rev and root CID 868 + const rootCidStr = root.toString(); 869 + let rev = event.rev; // Use the rev directly from the firehose event 839 870 840 - // If no rev in the event, try to extract from commit block 841 - if (!rev) { 842 - const commitBytes = internalMap?.get(rootCidStr); 843 - if (commitBytes) { 844 - try { 845 - const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 846 - if (typeof commitObj.rev === "string") { 847 - rev = commitObj.rev; 871 + // If no rev in the event, try to extract from commit block 872 + if (!rev) { 873 + const commitBytes = internalMap?.get(rootCidStr); 874 + if (commitBytes) { 875 + try { 876 + const commitObj = cborDecode(commitBytes) as Record<string, unknown>; 877 + if (typeof commitObj.rev === "string") { 878 + rev = commitObj.rev; 879 + } 880 + } catch { 881 + // Fall back to root CID as rev 882 + rev = rootCidStr; 848 883 } 849 - } catch { 850 - // Fall back to root CID as rev 884 + } else { 851 885 rev = rootCidStr; 852 886 } 853 - } else { 854 - rev = rootCidStr; 855 887 } 856 - } 857 888 858 - // 7. Update sync state 859 - this.syncStorage.updateSyncProgress(did, rev, rootCidStr); 889 + // 7. Update sync state 890 + this.syncStorage.updateSyncProgress(did, rev, rootCidStr); 860 891 861 - // 7a. Publish gossipsub notification (fire-and-forget) 862 - this.networkService.publishCommitNotification(did, rootCidStr, rev).catch(() => {}); 892 + // 7a. Publish gossipsub notification (fire-and-forget) 893 + this.networkService.publishCommitNotification(did, rootCidStr, rev).catch(() => {}); 863 894 864 - // 8. Invalidate cached ReadableRepo so it reloads with new root 865 - this.replicatedRepoReader?.invalidateCache(did); 895 + // 8. Invalidate cached ReadableRepo so it reloads with new root 896 + this.replicatedRepoReader?.invalidateCache(did); 866 897 867 - // 8b. Track record paths incrementally from firehose ops 868 - try { 869 - const createdPaths = event.ops 870 - .filter((op) => op.action === "create" || op.action === "update") 871 - .map((op) => op.path); 872 - const deletedPaths = event.ops 873 - .filter((op) => op.action === "delete") 874 - .map((op) => op.path); 875 - if (createdPaths.length > 0) { 876 - this.syncStorage.trackRecordPaths(did, createdPaths); 898 + // 8b. Track record paths incrementally from firehose ops 899 + try { 900 + const createdPaths = event.ops 901 + .filter((op) => op.action === "create" || op.action === "update") 902 + .map((op) => op.path); 903 + const deletedPaths = event.ops 904 + .filter((op) => op.action === "delete") 905 + .map((op) => op.path); 906 + if (createdPaths.length > 0) { 907 + this.syncStorage.trackRecordPaths(did, createdPaths); 908 + } 909 + if (deletedPaths.length > 0) { 910 + this.syncStorage.removeRecordPaths(did, deletedPaths); 911 + } 912 + } catch { 913 + // Non-fatal: path tracking is best-effort 877 914 } 878 - if (deletedPaths.length > 0) { 879 - this.syncStorage.removeRecordPaths(did, deletedPaths); 915 + 916 + // 9. Update manifest record 917 + const rkey = didToRkey(did); 918 + const existingManifest = await this.repoManager.getRecord( 919 + MANIFEST_NSID, 920 + rkey, 921 + ); 922 + if (existingManifest) { 923 + const manifest: ManifestRecord = { 924 + $type: MANIFEST_NSID, 925 + subject: did, 926 + status: "active", 927 + lastSyncRev: rev, 928 + lastSyncAt: new Date().toISOString(), 929 + createdAt: 930 + (existingManifest.record as Record<string, unknown>) 931 + ?.createdAt as string ?? new Date().toISOString(), 932 + }; 933 + await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 880 934 } 881 - } catch { 882 - // Non-fatal: path tracking is best-effort 883 - } 884 935 885 - // 9. Update manifest record 886 - const rkey = didToRkey(did); 887 - const existingManifest = await this.repoManager.getRecord( 888 - MANIFEST_NSID, 889 - rkey, 890 - ); 891 - if (existingManifest) { 892 - const manifest: ManifestRecord = { 893 - $type: MANIFEST_NSID, 894 - subject: did, 895 - status: "active", 896 - lastSyncRev: rev, 897 - lastSyncAt: new Date().toISOString(), 898 - createdAt: 899 - (existingManifest.record as Record<string, unknown>) 900 - ?.createdAt as string ?? new Date().toISOString(), 901 - }; 902 - await this.repoManager.putRecord(MANIFEST_NSID, rkey, manifest); 903 - } 936 + // 10. Sync blobs for firehose ops (fire-and-forget) 937 + const pdsEndpoint = this.syncStorage.getState(did)?.pdsEndpoint; 938 + if (pdsEndpoint) { 939 + this.syncBlobsForOps(did, pdsEndpoint, event.ops).catch((err) => { 940 + console.warn( 941 + `[replication] Blob sync error for firehose ops (${did}):`, 942 + err instanceof Error ? err.message : String(err), 943 + ); 944 + }); 945 + } 904 946 905 - // 10. Sync blobs for firehose ops (fire-and-forget) 906 - const pdsEndpoint = this.syncStorage.getState(did)?.pdsEndpoint; 907 - if (pdsEndpoint) { 908 - this.syncBlobsForOps(did, pdsEndpoint, event.ops).catch((err) => { 909 - console.warn( 910 - `[replication] Blob sync error for firehose ops (${did}):`, 911 - err instanceof Error ? err.message : String(err), 912 - ); 947 + // 11. Complete sync event 948 + this.syncStorage.completeSyncEvent(syncEventId, { 949 + status: "success", 950 + blocksAdded: blockEntries.length, 951 + carBytes: event.blocks.length, 952 + durationMs: Date.now() - syncStart, 953 + rev, 954 + rootCid: rootCidStr, 955 + incremental: true, 913 956 }); 957 + } catch (err) { 958 + this.syncStorage.completeSyncEvent(syncEventId, { 959 + status: "error", 960 + errorMessage: err instanceof Error ? err.message : String(err), 961 + durationMs: Date.now() - syncStart, 962 + incremental: true, 963 + }); 964 + throw err; 914 965 } 915 966 } 916 967 ··· 1147 1198 /** 1148 1199 * Sync blobs for a DID: walk all records, extract blob CIDs, fetch new ones. 1149 1200 */ 1150 - async syncBlobs(did: string): Promise<{ fetched: number; skipped: number; errors: number }> { 1201 + async syncBlobs(did: string): Promise<{ fetched: number; skipped: number; errors: number; totalBytes: number }> { 1151 1202 const state = this.syncStorage.getState(did); 1152 1203 if (!state?.pdsEndpoint || !state.rootCid) { 1153 - return { fetched: 0, skipped: 0, errors: 0 }; 1204 + return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 }; 1154 1205 } 1155 1206 1156 1207 if (!this.replicatedRepoReader) { 1157 - return { fetched: 0, skipped: 0, errors: 0 }; 1208 + return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 }; 1158 1209 } 1159 1210 1160 1211 // Walk all records and collect blob CIDs 1161 1212 const allBlobCids = new Set<string>(); 1162 1213 const repo = await this.replicatedRepoReader.getRepo(did); 1163 1214 if (!repo) { 1164 - return { fetched: 0, skipped: 0, errors: 0 }; 1215 + return { fetched: 0, skipped: 0, errors: 0, totalBytes: 0 }; 1165 1216 } 1166 1217 1167 1218 for await (const entry of repo.walkRecords()) { ··· 1175 1226 let fetched = 0; 1176 1227 let skipped = 0; 1177 1228 let errors = 0; 1229 + let totalBytes = 0; 1178 1230 1179 1231 for (const blobCid of allBlobCids) { 1180 1232 if (this.syncStorage.hasBlobCid(did, blobCid)) { ··· 1200 1252 } 1201 1253 1202 1254 await this.blockStore.putBlock(blobCid, bytes); 1203 - this.syncStorage.trackBlobs(did, [blobCid]); 1255 + this.syncStorage.trackBlobsWithSize(did, [{ cid: blobCid, sizeBytes: bytes.length }]); 1204 1256 this.networkService.provideBlocks([blobCid]).catch(() => {}); 1205 1257 fetched++; 1258 + totalBytes += bytes.length; 1206 1259 } catch (err) { 1207 1260 console.warn( 1208 1261 `[replication] Failed to fetch blob ${blobCid} for ${did}:`, ··· 1212 1265 } 1213 1266 } 1214 1267 1215 - return { fetched, skipped, errors }; 1268 + return { fetched, skipped, errors, totalBytes }; 1216 1269 } 1217 1270 1218 1271 /**
+246 -1
src/replication/sync-storage.ts
··· 4 4 */ 5 5 6 6 import type Database from "better-sqlite3"; 7 - import type { SyncState } from "./types.js"; 7 + import type { SyncState, SyncHistoryRow, AggregateMetrics, DidMetrics } from "./types.js"; 8 8 9 9 export class SyncStorage { 10 10 constructor(private db: Database.Database) {} ··· 75 75 ); 76 76 `); 77 77 78 + // Sync history table: logs each sync event with metrics. 79 + this.db.exec(` 80 + CREATE TABLE IF NOT EXISTS sync_history ( 81 + id INTEGER PRIMARY KEY AUTOINCREMENT, 82 + did TEXT NOT NULL, 83 + source_type TEXT NOT NULL, 84 + started_at TEXT NOT NULL, 85 + completed_at TEXT, 86 + status TEXT NOT NULL DEFAULT 'in_progress', 87 + error_message TEXT, 88 + blocks_added INTEGER NOT NULL DEFAULT 0, 89 + blobs_added INTEGER NOT NULL DEFAULT 0, 90 + car_bytes INTEGER NOT NULL DEFAULT 0, 91 + blob_bytes INTEGER NOT NULL DEFAULT 0, 92 + duration_ms INTEGER, 93 + rev TEXT, 94 + root_cid TEXT, 95 + incremental INTEGER NOT NULL DEFAULT 0 96 + ); 97 + CREATE INDEX IF NOT EXISTS idx_sync_history_did ON sync_history(did, started_at DESC); 98 + `); 99 + 78 100 // Migrations: add columns if missing (for existing databases) 79 101 const columns = this.db 80 102 .prepare("PRAGMA table_info(replication_state)") ··· 87 109 if (!columns.some((c) => c.name === "peer_multiaddrs")) { 88 110 this.db.exec( 89 111 "ALTER TABLE replication_state ADD COLUMN peer_multiaddrs TEXT", 112 + ); 113 + } 114 + 115 + // Migration: add size_bytes to replication_blocks and replication_blobs 116 + const blockCols = this.db 117 + .prepare("PRAGMA table_info(replication_blocks)") 118 + .all() as Array<{ name: string }>; 119 + if (!blockCols.some((c) => c.name === "size_bytes")) { 120 + this.db.exec( 121 + "ALTER TABLE replication_blocks ADD COLUMN size_bytes INTEGER NOT NULL DEFAULT 0", 122 + ); 123 + } 124 + 125 + const blobCols = this.db 126 + .prepare("PRAGMA table_info(replication_blobs)") 127 + .all() as Array<{ name: string }>; 128 + if (!blobCols.some((c) => c.name === "size_bytes")) { 129 + this.db.exec( 130 + "ALTER TABLE replication_blobs ADD COLUMN size_bytes INTEGER NOT NULL DEFAULT 0", 90 131 ); 91 132 } 92 133 } ··· 488 529 } 489 530 } 490 531 return null; 532 + } 533 + 534 + // ============================================ 535 + // Sync history tracking 536 + // ============================================ 537 + 538 + /** 539 + * Start a sync event, returning its ID for later completion. 540 + */ 541 + startSyncEvent(did: string, sourceType: string): number { 542 + const result = this.db 543 + .prepare( 544 + `INSERT INTO sync_history (did, source_type, started_at, status) 545 + VALUES (?, ?, datetime('now'), 'in_progress')`, 546 + ) 547 + .run(did, sourceType); 548 + return Number(result.lastInsertRowid); 549 + } 550 + 551 + /** 552 + * Complete a sync event with final metrics. 553 + */ 554 + completeSyncEvent( 555 + id: number, 556 + data: { 557 + status: "success" | "error"; 558 + errorMessage?: string; 559 + blocksAdded?: number; 560 + blobsAdded?: number; 561 + carBytes?: number; 562 + blobBytes?: number; 563 + durationMs?: number; 564 + rev?: string; 565 + rootCid?: string; 566 + incremental?: boolean; 567 + }, 568 + ): void { 569 + this.db 570 + .prepare( 571 + `UPDATE sync_history SET 572 + completed_at = datetime('now'), 573 + status = ?, 574 + error_message = ?, 575 + blocks_added = ?, 576 + blobs_added = ?, 577 + car_bytes = ?, 578 + blob_bytes = ?, 579 + duration_ms = ?, 580 + rev = ?, 581 + root_cid = ?, 582 + incremental = ? 583 + WHERE id = ?`, 584 + ) 585 + .run( 586 + data.status, 587 + data.errorMessage ?? null, 588 + data.blocksAdded ?? 0, 589 + data.blobsAdded ?? 0, 590 + data.carBytes ?? 0, 591 + data.blobBytes ?? 0, 592 + data.durationMs ?? null, 593 + data.rev ?? null, 594 + data.rootCid ?? null, 595 + data.incremental ? 1 : 0, 596 + id, 597 + ); 598 + } 599 + 600 + /** 601 + * Get sync history, optionally filtered by DID. 602 + */ 603 + getSyncHistory(did?: string, limit: number = 50): SyncHistoryRow[] { 604 + const query = did 605 + ? "SELECT * FROM sync_history WHERE did = ? ORDER BY started_at DESC LIMIT ?" 606 + : "SELECT * FROM sync_history ORDER BY started_at DESC LIMIT ?"; 607 + const params = did ? [did, limit] : [limit]; 608 + const rows = this.db.prepare(query).all(...params) as Array<Record<string, unknown>>; 609 + return rows.map((r) => this.rowToSyncHistory(r)); 610 + } 611 + 612 + /** 613 + * Get the count of tracked records for a DID. 614 + */ 615 + getRecordCount(did: string): number { 616 + const row = this.db 617 + .prepare( 618 + "SELECT COUNT(*) as count FROM replication_record_paths WHERE did = ?", 619 + ) 620 + .get(did) as { count: number }; 621 + return row.count; 622 + } 623 + 624 + /** 625 + * Get aggregate metrics across all replicated DIDs. 626 + */ 627 + getAggregateMetrics(): AggregateMetrics { 628 + const dids = this.db 629 + .prepare("SELECT COUNT(DISTINCT did) as count FROM replication_state") 630 + .get() as { count: number }; 631 + const blocks = this.db 632 + .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blocks") 633 + .get() as { count: number; bytes: number }; 634 + const blobs = this.db 635 + .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blobs") 636 + .get() as { count: number; bytes: number }; 637 + const records = this.db 638 + .prepare("SELECT COUNT(*) as count FROM replication_record_paths") 639 + .get() as { count: number }; 640 + const syncs = this.db 641 + .prepare("SELECT COUNT(*) as count FROM sync_history") 642 + .get() as { count: number }; 643 + const recentTransfer = this.db 644 + .prepare( 645 + `SELECT COALESCE(SUM(car_bytes + blob_bytes), 0) as bytes 646 + FROM sync_history 647 + WHERE started_at >= datetime('now', '-24 hours') AND status = 'success'`, 648 + ) 649 + .get() as { bytes: number }; 650 + 651 + return { 652 + totalDids: dids.count, 653 + totalBlocks: blocks.count, 654 + totalBlobs: blobs.count, 655 + totalRecords: records.count, 656 + totalBytesHeld: blocks.bytes + blobs.bytes, 657 + totalSyncs: syncs.count, 658 + recentTransferredBytes: recentTransfer.bytes, 659 + }; 660 + } 661 + 662 + /** 663 + * Get per-DID metrics summary. 664 + */ 665 + getDidMetrics(did: string): DidMetrics { 666 + const blocks = this.db 667 + .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blocks WHERE did = ?") 668 + .get(did) as { count: number; bytes: number }; 669 + const blobs = this.db 670 + .prepare("SELECT COUNT(*) as count, COALESCE(SUM(size_bytes), 0) as bytes FROM replication_blobs WHERE did = ?") 671 + .get(did) as { count: number; bytes: number }; 672 + const records = this.getRecordCount(did); 673 + const recentSyncs = this.getSyncHistory(did, 10); 674 + 675 + return { 676 + blocks: blocks.count, 677 + blobs: blobs.count, 678 + records, 679 + bytesHeld: blocks.bytes + blobs.bytes, 680 + recentSyncs, 681 + }; 682 + } 683 + 684 + /** 685 + * Track blocks with their sizes (batch upsert). 686 + */ 687 + trackBlocksWithSize(did: string, entries: Array<{ cid: string; sizeBytes: number }>): void { 688 + if (entries.length === 0) return; 689 + const insert = this.db.prepare( 690 + `INSERT INTO replication_blocks (did, cid, size_bytes) VALUES (?, ?, ?) 691 + ON CONFLICT(did, cid) DO UPDATE SET size_bytes = excluded.size_bytes`, 692 + ); 693 + const batch = this.db.transaction((items: Array<{ cid: string; sizeBytes: number }>) => { 694 + for (const entry of items) { 695 + insert.run(did, entry.cid, entry.sizeBytes); 696 + } 697 + }); 698 + batch(entries); 699 + } 700 + 701 + /** 702 + * Track blobs with their sizes (batch upsert). 703 + */ 704 + trackBlobsWithSize(did: string, entries: Array<{ cid: string; sizeBytes: number }>): void { 705 + if (entries.length === 0) return; 706 + const insert = this.db.prepare( 707 + `INSERT INTO replication_blobs (did, cid, size_bytes) VALUES (?, ?, ?) 708 + ON CONFLICT(did, cid) DO UPDATE SET size_bytes = excluded.size_bytes`, 709 + ); 710 + const batch = this.db.transaction((items: Array<{ cid: string; sizeBytes: number }>) => { 711 + for (const entry of items) { 712 + insert.run(did, entry.cid, entry.sizeBytes); 713 + } 714 + }); 715 + batch(entries); 716 + } 717 + 718 + private rowToSyncHistory(row: Record<string, unknown>): SyncHistoryRow { 719 + return { 720 + id: row.id as number, 721 + did: row.did as string, 722 + sourceType: row.source_type as string, 723 + startedAt: row.started_at as string, 724 + completedAt: (row.completed_at as string) ?? null, 725 + status: row.status as string, 726 + errorMessage: (row.error_message as string) ?? null, 727 + blocksAdded: row.blocks_added as number, 728 + blobsAdded: row.blobs_added as number, 729 + carBytes: row.car_bytes as number, 730 + blobBytes: row.blob_bytes as number, 731 + durationMs: (row.duration_ms as number) ?? null, 732 + rev: (row.rev as string) ?? null, 733 + rootCid: (row.root_cid as string) ?? null, 734 + incremental: (row.incremental as number) === 1, 735 + }; 491 736 } 492 737 493 738 private rowToState(row: Record<string, unknown>): SyncState {
+39
src/replication/types.ts
··· 100 100 reliabilityThreshold: 0.8, 101 101 }; 102 102 103 + /** A row from the sync_history table. */ 104 + export interface SyncHistoryRow { 105 + id: number; 106 + did: string; 107 + sourceType: string; 108 + startedAt: string; 109 + completedAt: string | null; 110 + status: string; 111 + errorMessage: string | null; 112 + blocksAdded: number; 113 + blobsAdded: number; 114 + carBytes: number; 115 + blobBytes: number; 116 + durationMs: number | null; 117 + rev: string | null; 118 + rootCid: string | null; 119 + incremental: boolean; 120 + } 121 + 122 + /** Aggregate metrics across all replicated DIDs. */ 123 + export interface AggregateMetrics { 124 + totalDids: number; 125 + totalBlocks: number; 126 + totalBlobs: number; 127 + totalRecords: number; 128 + totalBytesHeld: number; 129 + totalSyncs: number; 130 + recentTransferredBytes: number; 131 + } 132 + 133 + /** Per-DID metrics summary. */ 134 + export interface DidMetrics { 135 + blocks: number; 136 + blobs: number; 137 + records: number; 138 + bytesHeld: number; 139 + recentSyncs: SyncHistoryRow[]; 140 + } 141 + 103 142 /** Result of a single verification layer. */ 104 143 export interface LayerResult { 105 144 layer: number;
+310
src/xrpc/admin-e2e.test.ts
··· 1 + /** 2 + * E2E test: two real HTTP servers, replication between them, 3 + * then verify admin dashboard and APIs show accurate live state. 4 + * 5 + * Node A — "source PDS": has a repo with records, serves getRepo over HTTP. 6 + * Node B — "replicator": replicates Node A's DID, runs the admin dashboard. 7 + */ 8 + 9 + import { describe, it, expect, beforeEach, afterEach } from "vitest"; 10 + import { mkdtempSync, rmSync } from "node:fs"; 11 + import { tmpdir } from "node:os"; 12 + import { join } from "node:path"; 13 + import { createServer, type Server } from "node:http"; 14 + import Database from "better-sqlite3"; 15 + import { getRequestListener } from "@hono/node-server"; 16 + 17 + import { IpfsService } from "../ipfs.js"; 18 + import { RepoManager } from "../repo-manager.js"; 19 + import { ReplicationManager } from "../replication/replication-manager.js"; 20 + import { Firehose } from "../firehose.js"; 21 + import { createApp } from "../index.js"; 22 + import type { Config } from "../config.js"; 23 + import type { NetworkService } from "../ipfs.js"; 24 + 25 + function makeConfig( 26 + dataDir: string, 27 + did: string, 28 + replicateDids: string[] = [], 29 + ): Config { 30 + return { 31 + DID: did, 32 + HANDLE: "test.example.com", 33 + PDS_HOSTNAME: "test.example.com", 34 + AUTH_TOKEN: "test-auth-token", 35 + SIGNING_KEY: 36 + "0000000000000000000000000000000000000000000000000000000000000001", 37 + SIGNING_KEY_PUBLIC: 38 + "zQ3shP2mWsZYWgvZM9GJ3EvMfRXQJwuTh6BdXLvJB9gFhT3Lr", 39 + JWT_SECRET: "test-jwt-secret", 40 + PASSWORD_HASH: "$2a$10$test", 41 + DATA_DIR: dataDir, 42 + PORT: 0, 43 + IPFS_ENABLED: true, 44 + IPFS_NETWORKING: false, 45 + REPLICATE_DIDS: replicateDids, 46 + FIREHOSE_URL: "wss://localhost/xrpc/com.atproto.sync.subscribeRepos", 47 + FIREHOSE_ENABLED: false, 48 + }; 49 + } 50 + 51 + const mockNetworkService: NetworkService = { 52 + provideBlocks: async () => {}, 53 + publishCommitNotification: async () => {}, 54 + onCommitNotification: () => {}, 55 + subscribeCommitTopics: () => {}, 56 + unsubscribeCommitTopics: () => {}, 57 + getPeerId: () => null, 58 + getMultiaddrs: () => [], 59 + getConnectionCount: () => 0, 60 + getRemoteAddrs: () => [], 61 + publishIdentityNotification: async () => {}, 62 + onIdentityNotification: () => {}, 63 + subscribeIdentityTopics: () => {}, 64 + unsubscribeIdentityTopics: () => {}, 65 + }; 66 + 67 + function startServer( 68 + app: ReturnType<typeof createApp>, 69 + ): Promise<{ server: Server; port: number }> { 70 + return new Promise((resolve) => { 71 + const listener = getRequestListener(app.fetch); 72 + const server = createServer(listener); 73 + server.listen(0, "127.0.0.1", () => { 74 + const addr = server.address() as { port: number }; 75 + resolve({ server, port: addr.port }); 76 + }); 77 + }); 78 + } 79 + 80 + function closeServer(server: Server): Promise<void> { 81 + return new Promise((res) => server.close(() => res())); 82 + } 83 + 84 + const NODE_A_DID = "did:plc:nodea123"; 85 + 86 + describe("Admin E2E: two-node replication + dashboard", () => { 87 + let tmpDir: string; 88 + let dbA: InstanceType<typeof Database>; 89 + let dbB: InstanceType<typeof Database>; 90 + let ipfsB: IpfsService; 91 + let serverA: Server; 92 + let serverB: Server; 93 + let portA: number; 94 + let portB: number; 95 + let replicationManager: ReplicationManager; 96 + 97 + beforeEach(async () => { 98 + tmpDir = mkdtempSync(join(tmpdir(), "admin-e2e-test-")); 99 + 100 + // ---- Node A: source PDS with records ---- 101 + dbA = new Database(join(tmpDir, "a.db")); 102 + const configA = makeConfig(tmpDir, NODE_A_DID); 103 + const repoManagerA = new RepoManager(dbA, configA); 104 + repoManagerA.init(); 105 + 106 + // Create 5 test records on Node A 107 + for (let i = 0; i < 5; i++) { 108 + await repoManagerA.createRecord("app.bsky.feed.post", undefined, { 109 + $type: "app.bsky.feed.post", 110 + text: `Admin E2E test post ${i}`, 111 + createdAt: new Date().toISOString(), 112 + }); 113 + } 114 + 115 + const firehoseA = new Firehose(repoManagerA); 116 + const appA = createApp(configA, repoManagerA, firehoseA); 117 + ({ server: serverA, port: portA } = await startServer(appA)); 118 + 119 + // ---- Node B: replicator with admin dashboard ---- 120 + dbB = new Database(join(tmpDir, "b.db")); 121 + const configB = makeConfig(join(tmpDir, "b-data"), "did:plc:nodeb456", [ 122 + NODE_A_DID, 123 + ]); 124 + 125 + ipfsB = new IpfsService({ 126 + blocksPath: join(tmpDir, "b-ipfs-blocks"), 127 + datastorePath: join(tmpDir, "b-ipfs-datastore"), 128 + networking: false, 129 + }); 130 + await ipfsB.start(); 131 + 132 + const repoManagerB = new RepoManager(dbB, configB); 133 + repoManagerB.init(undefined, ipfsB, ipfsB); 134 + 135 + // DID resolver stub: returns Node A's HTTP endpoint 136 + const didResolver = { 137 + resolve: async (did: string) => { 138 + if (did === NODE_A_DID) { 139 + return { 140 + id: NODE_A_DID, 141 + service: [ 142 + { 143 + id: "#atproto_pds", 144 + type: "AtprotoPersonalDataServer", 145 + serviceEndpoint: `http://127.0.0.1:${portA}`, 146 + }, 147 + ], 148 + }; 149 + } 150 + return null; 151 + }, 152 + }; 153 + 154 + replicationManager = new ReplicationManager( 155 + dbB, 156 + configB, 157 + repoManagerB, 158 + ipfsB, 159 + mockNetworkService, 160 + didResolver as any, 161 + ); 162 + await replicationManager.init(); 163 + 164 + const firehoseB = new Firehose(repoManagerB); 165 + const appB = createApp( 166 + configB, 167 + repoManagerB, 168 + firehoseB, 169 + ipfsB, 170 + mockNetworkService, 171 + undefined, 172 + replicationManager, 173 + ); 174 + ({ server: serverB, port: portB } = await startServer(appB)); 175 + 176 + // Trigger sync: Node B replicates Node A's repo over HTTP 177 + await replicationManager.syncAll(); 178 + }, 30_000); 179 + 180 + afterEach(async () => { 181 + replicationManager.stop(); 182 + await closeServer(serverB); 183 + await closeServer(serverA); 184 + if (ipfsB.isRunning()) await ipfsB.stop(); 185 + dbB.close(); 186 + dbA.close(); 187 + rmSync(tmpDir, { recursive: true, force: true }); 188 + }); 189 + 190 + const authHeaders = { Authorization: "Bearer test-auth-token" }; 191 + 192 + function fetchB(path: string, opts?: RequestInit) { 193 + return fetch(`http://127.0.0.1:${portB}${path}`, { 194 + ...opts, 195 + headers: { ...authHeaders, ...opts?.headers }, 196 + }); 197 + } 198 + 199 + it("getOverview shows synced replication state with aggregate metrics", async () => { 200 + const res = await fetchB("/xrpc/org.p2pds.admin.getOverview"); 201 + expect(res.status).toBe(200); 202 + 203 + const json = (await res.json()) as Record<string, unknown>; 204 + expect(json.version).toBe("0.1.0"); 205 + 206 + const repl = json.replication as { 207 + enabled: boolean; 208 + trackedDids: string[]; 209 + syncStates: Array<{ 210 + did: string; 211 + status: string; 212 + lastSyncRev: string | null; 213 + rootCid: string | null; 214 + }>; 215 + aggregate: { 216 + totalDids: number; 217 + totalBlocks: number; 218 + totalBlobs: number; 219 + totalRecords: number; 220 + totalBytesHeld: number; 221 + totalSyncs: number; 222 + recentTransferredBytes: number; 223 + }; 224 + }; 225 + expect(repl.enabled).toBe(true); 226 + expect(repl.syncStates).toHaveLength(1); 227 + 228 + const state = repl.syncStates[0]!; 229 + expect(state.did).toBe(NODE_A_DID); 230 + expect(state.status).toBe("synced"); 231 + expect(state.lastSyncRev).not.toBeNull(); 232 + expect(state.rootCid).not.toBeNull(); 233 + 234 + // Aggregate metrics should reflect the synced data 235 + expect(repl.aggregate).toBeDefined(); 236 + expect(repl.aggregate.totalDids).toBeGreaterThanOrEqual(1); 237 + expect(repl.aggregate.totalBlocks).toBeGreaterThan(0); 238 + expect(repl.aggregate.totalBytesHeld).toBeGreaterThan(0); 239 + expect(repl.aggregate.totalSyncs).toBeGreaterThan(0); 240 + expect(repl.aggregate.recentTransferredBytes).toBeGreaterThan(0); 241 + }); 242 + 243 + it("getDidStatus shows blocks, records, and sync history for replicated DID", async () => { 244 + const res = await fetchB( 245 + `/xrpc/org.p2pds.admin.getDidStatus?did=${NODE_A_DID}`, 246 + ); 247 + expect(res.status).toBe(200); 248 + 249 + const json = (await res.json()) as Record<string, unknown>; 250 + expect(json.did).toBe(NODE_A_DID); 251 + expect((json.blockCount as number) > 0).toBe(true); 252 + expect(json.recordCount).toBeGreaterThan(0); 253 + expect(json.bytesHeld).toBeGreaterThan(0); 254 + 255 + const recentSyncs = json.recentSyncs as Array<Record<string, unknown>>; 256 + expect(recentSyncs.length).toBeGreaterThan(0); 257 + expect(recentSyncs[0]!.status).toBe("success"); 258 + expect(recentSyncs[0]!.blocksAdded).toBeGreaterThan(0); 259 + expect(recentSyncs[0]!.carBytes).toBeGreaterThan(0); 260 + expect(recentSyncs[0]!.durationMs).toBeGreaterThanOrEqual(0); 261 + 262 + const syncState = json.syncState as Record<string, unknown>; 263 + expect(syncState.status).toBe("synced"); 264 + }); 265 + 266 + it("dashboard returns HTML with expected structure", async () => { 267 + // Dashboard doesn't require auth header 268 + const res = await fetch( 269 + `http://127.0.0.1:${portB}/xrpc/org.p2pds.admin.dashboard`, 270 + ); 271 + expect(res.status).toBe(200); 272 + expect(res.headers.get("content-type")).toContain("text/html"); 273 + 274 + const html = await res.text(); 275 + expect(html).toContain("P2PDS Admin"); 276 + expect(html).toContain('id="section-overview"'); 277 + expect(html).toContain('id="section-metrics"'); 278 + expect(html).toContain('id="section-replication"'); 279 + expect(html).toContain('id="section-sync-history"'); 280 + expect(html).toContain('id="section-network"'); 281 + expect(html).toContain("test-auth-token"); 282 + }); 283 + 284 + it("getSyncHistory returns sync events after replication", async () => { 285 + const res = await fetchB("/xrpc/org.p2pds.admin.getSyncHistory"); 286 + expect(res.status).toBe(200); 287 + 288 + const json = (await res.json()) as { history: Array<Record<string, unknown>> }; 289 + expect(json.history.length).toBeGreaterThan(0); 290 + 291 + const event = json.history[0]!; 292 + expect(event.did).toBe(NODE_A_DID); 293 + expect(event.sourceType).toBe("pds"); 294 + expect(event.status).toBe("success"); 295 + expect((event.blocksAdded as number)).toBeGreaterThan(0); 296 + expect((event.carBytes as number)).toBeGreaterThan(0); 297 + expect(event.durationMs).not.toBeNull(); 298 + }); 299 + 300 + it("getNetworkStatus returns valid response", async () => { 301 + const res = await fetchB("/xrpc/org.p2pds.admin.getNetworkStatus"); 302 + expect(res.status).toBe(200); 303 + 304 + const json = (await res.json()) as Record<string, unknown>; 305 + // networking=false → peerId is null 306 + expect(json.peerId).toBeNull(); 307 + expect(json.multiaddrs).toEqual([]); 308 + expect(json.connections).toBe(0); 309 + }); 310 + });
+14
src/xrpc/admin.test.ts
··· 76 76 "/xrpc/org.p2pds.admin.getDidStatus?did=did:plc:test", 77 77 "/xrpc/org.p2pds.admin.getNetworkStatus", 78 78 "/xrpc/org.p2pds.admin.getPolicies", 79 + "/xrpc/org.p2pds.admin.getSyncHistory", 79 80 ]; 80 81 81 82 for (const endpoint of endpoints) { ··· 186 187 }); 187 188 const repl = json.replication as Record<string, unknown>; 188 189 expect(repl.enabled).toBe(true); 190 + expect(repl.aggregate).toBeDefined(); 191 + const agg = repl.aggregate as Record<string, unknown>; 192 + expect(typeof agg.totalBlocks).toBe("number"); 193 + expect(typeof agg.totalBlobs).toBe("number"); 194 + expect(typeof agg.totalRecords).toBe("number"); 195 + expect(typeof agg.totalBytesHeld).toBe("number"); 196 + expect(typeof agg.totalSyncs).toBe("number"); 197 + expect(typeof agg.recentTransferredBytes).toBe("number"); 189 198 expect(json.firehose).toBeNull(); 190 199 expect(json.policy).toBeNull(); 191 200 }); ··· 283 292 expect(syncState.lastSyncRev).toBe("rev1"); 284 293 expect(json.blockCount).toBe(0); 285 294 expect(json.blobCount).toBe(0); 295 + expect(json.recordCount).toBe(0); 296 + expect(json.bytesHeld).toBe(0); 297 + expect(json.recentSyncs).toEqual([]); 286 298 expect(json.peerEndpoints).toEqual([]); 287 299 expect(json.verification).toBeNull(); 288 300 expect(json.effectivePolicy).toBeNull(); ··· 567 579 const html = await res.text(); 568 580 expect(html).toContain("P2PDS Admin"); 569 581 expect(html).toContain('id="section-overview"'); 582 + expect(html).toContain('id="section-metrics"'); 570 583 expect(html).toContain('id="section-replication"'); 584 + expect(html).toContain('id="section-sync-history"'); 571 585 expect(html).toContain('id="section-network"'); 572 586 expect(html).toContain('id="section-policies"'); 573 587 expect(html).toContain('id="section-verification"');
+154 -18
src/xrpc/admin.ts
··· 25 25 26 26 if (replicationManager) { 27 27 const syncStates = replicationManager.getSyncStates(); 28 + const syncStorage = replicationManager.getSyncStorage(); 29 + const aggregate = syncStorage.getAggregateMetrics(); 28 30 replication = { 29 31 enabled: true, 30 32 trackedDids: syncStates.map((s) => s.did), 31 33 syncStates, 34 + aggregate, 32 35 }; 33 36 34 37 firehose = replicationManager.getFirehoseStats() ?? null; ··· 81 84 82 85 const syncStorage = replicationManager.getSyncStorage(); 83 86 const syncState = syncStorage.getState(did); 84 - const blockCount = syncStorage.getBlockCount(did); 85 - const blobCount = syncStorage.getBlobCount(did); 87 + const didMetrics = syncStorage.getDidMetrics(did); 86 88 const peerEndpoints = syncStorage.getPeerEndpoints(did); 87 89 88 90 const verificationResults = replicationManager.getVerificationResults(); ··· 95 97 did, 96 98 syncState, 97 99 effectiveSyncIntervalMs: replicationManager.getEffectiveSyncIntervalMs(did), 98 - blockCount, 99 - blobCount, 100 + blockCount: didMetrics.blocks, 101 + blobCount: didMetrics.blobs, 102 + recordCount: didMetrics.records, 103 + bytesHeld: didMetrics.bytesHeld, 104 + recentSyncs: didMetrics.recentSyncs, 100 105 peerEndpoints, 101 106 verification, 102 107 effectivePolicy, ··· 161 166 .kv { display: grid; grid-template-columns: 160px 1fr; gap: 0.3rem 1rem; font-size: 0.85rem; } 162 167 .kv dt { color: #666; } 163 168 .kv dd { word-break: break-all; } 169 + .metrics-grid { 170 + display: grid; grid-template-columns: repeat(auto-fit, minmax(140px, 1fr)); gap: 0.8rem; font-size: 0.85rem; 171 + } 172 + .metric-box { 173 + background: #f8f8f8; border-radius: 4px; padding: 0.7rem; text-align: center; 174 + } 175 + .metric-box .value { font-size: 1.3rem; font-weight: 700; } 176 + .metric-box .label { color: #666; font-size: 0.72rem; margin-top: 0.2rem; } 164 177 table { width: 100%; border-collapse: collapse; font-size: 0.82rem; } 165 178 th { text-align: left; padding: 0.4rem 0.6rem; border-bottom: 2px solid #eee; color: #666; font-weight: 600; } 166 179 td { padding: 0.4rem 0.6rem; border-bottom: 1px solid #f0f0f0; } ··· 172 185 .dot-pending { background: #9ca3af; } 173 186 .dot-error { background: #ef4444; } 174 187 .detail-row td { padding: 0.8rem; background: #fafafa; font-size: 0.8rem; } 175 - .detail-row pre { white-space: pre-wrap; word-break: break-all; max-height: 300px; overflow: auto; } 188 + .detail-inner table { margin-top: 0.5rem; } 189 + .source-badge { font-size: 0.7rem; padding: 1px 6px; border-radius: 3px; } 190 + .source-pds { background: #dbeafe; color: #1e40af; } 191 + .source-firehose { background: #fef3c7; color: #92400e; } 192 + .source-peer_fallback { background: #fce7f3; color: #9d174d; } 176 193 .policy-list { list-style: none; font-size: 0.85rem; } 177 194 .policy-list li { padding: 0.4rem 0; border-bottom: 1px solid #f0f0f0; } 178 195 .verify-pass { color: #22c55e; font-weight: 600; } ··· 195 212 <div id="overview-content" class="loading">Loading...</div> 196 213 </section> 197 214 215 + <section class="card" id="section-metrics"> 216 + <h2>Replication Summary</h2> 217 + <div id="metrics-content" class="loading">Loading...</div> 218 + </section> 219 + 198 220 <section class="card" id="section-replication"> 199 - <h2>Replication</h2> 221 + <h2>Replicated DIDs</h2> 200 222 <div id="replication-content" class="loading">Loading...</div> 223 + </section> 224 + 225 + <section class="card" id="section-sync-history"> 226 + <h2>Sync History</h2> 227 + <div id="sync-history-content" class="loading">Loading...</div> 201 228 </section> 202 229 203 230 <section class="card" id="section-network"> ··· 221 248 222 249 function esc(s) { const d = document.createElement("div"); d.textContent = String(s ?? "-"); return d.innerHTML; } 223 250 251 + function formatBytes(n) { 252 + if (n == null || n === 0) return "0 B"; 253 + var units = ["B", "KB", "MB", "GB", "TB"]; 254 + var i = Math.floor(Math.log(n) / Math.log(1024)); 255 + if (i >= units.length) i = units.length - 1; 256 + return (n / Math.pow(1024, i)).toFixed(i > 0 ? 1 : 0) + " " + units[i]; 257 + } 258 + 259 + function formatNumber(n) { 260 + if (n == null) return "-"; 261 + return Number(n).toLocaleString(); 262 + } 263 + 264 + function timeAgo(iso) { 265 + if (!iso) return "-"; 266 + var diff = Date.now() - new Date(iso + "Z").getTime(); 267 + if (diff < 0) diff = 0; 268 + if (diff < 60000) return Math.floor(diff / 1000) + "s ago"; 269 + if (diff < 3600000) return Math.floor(diff / 60000) + "m ago"; 270 + if (diff < 86400000) return Math.floor(diff / 3600000) + "h ago"; 271 + return Math.floor(diff / 86400000) + "d ago"; 272 + } 273 + 224 274 async function apiFetch(endpoint, params) { 225 275 const url = new URL("/xrpc/" + endpoint, location.origin); 226 276 if (params) Object.entries(params).forEach(([k,v]) => url.searchParams.set(k, v)); ··· 236 286 return '<span class="dot ' + cls + '"></span>' + esc(status); 237 287 } 238 288 289 + function sourceBadge(sourceType) { 290 + var cls = "source-" + (sourceType || "pds"); 291 + return '<span class="source-badge ' + cls + '">' + esc(sourceType || "pds") + '</span>'; 292 + } 293 + 239 294 function renderOverview(data) { 240 295 const el = document.getElementById("overview-content"); 241 296 const net = data.network || {}; ··· 251 306 document.getElementById("version-badge").textContent = "v" + data.version; 252 307 } 253 308 309 + function renderMetrics(data) { 310 + const el = document.getElementById("metrics-content"); 311 + const repl = data.replication; 312 + if (!repl || !repl.enabled) { el.innerHTML = "Replication disabled"; return; } 313 + const a = repl.aggregate || {}; 314 + el.innerHTML = '<div class="metrics-grid">' 315 + + '<div class="metric-box"><div class="value">' + formatNumber(a.totalDids) + '</div><div class="label">Tracked DIDs</div></div>' 316 + + '<div class="metric-box"><div class="value">' + formatNumber(a.totalRecords) + '</div><div class="label">Records</div></div>' 317 + + '<div class="metric-box"><div class="value">' + formatNumber(a.totalBlocks) + '</div><div class="label">Blocks</div></div>' 318 + + '<div class="metric-box"><div class="value">' + formatNumber(a.totalBlobs) + '</div><div class="label">Blobs</div></div>' 319 + + '<div class="metric-box"><div class="value">' + formatBytes(a.totalBytesHeld) + '</div><div class="label">Total Held</div></div>' 320 + + '<div class="metric-box"><div class="value">' + formatBytes(a.recentTransferredBytes) + '</div><div class="label">Transferred (24h)</div></div>' 321 + + '<div class="metric-box"><div class="value">' + formatNumber(a.totalSyncs) + '</div><div class="label">Total Syncs</div></div>' 322 + + '</div>'; 323 + } 324 + 254 325 function renderReplication(data) { 255 326 const el = document.getElementById("replication-content"); 256 327 const repl = data.replication; 257 328 if (!repl || !repl.enabled) { el.innerHTML = "Replication disabled"; return; } 258 329 const states = repl.syncStates || []; 259 330 if (states.length === 0) { el.innerHTML = "No tracked DIDs"; return; } 260 - let html = "<table><thead><tr><th>DID</th><th>Status</th><th>Rev</th><th>Root CID</th><th>Last Sync</th><th>Error</th></tr></thead><tbody>"; 331 + let html = "<table><thead><tr><th>DID</th><th>Status</th><th>Last Sync</th><th>Error</th></tr></thead><tbody>"; 261 332 for (const s of states) { 262 333 const st = s.status || "pending"; 263 334 const rid = "detail-" + s.did.replace(/[^a-zA-Z0-9]/g, "_"); 264 335 html += '<tr class="clickable" data-did="' + esc(s.did) + '" data-rid="' + rid + '">' 265 336 + "<td>" + esc(s.did) + "</td>" 266 337 + "<td>" + statusDot(st) + "</td>" 267 - + "<td>" + esc(s.lastSyncRev) + "</td>" 268 - + "<td>" + esc(s.rootCid ? s.rootCid.substring(0, 16) + "..." : "-") + "</td>" 269 - + "<td>" + esc(s.lastSyncAt) + "</td>" 270 - + "<td>" + esc(s.lastError || "-") + "</td>" 338 + + "<td>" + timeAgo(s.lastSyncAt) + "</td>" 339 + + "<td>" + esc(s.errorMessage || "-") + "</td>" 271 340 + "</tr>"; 272 - html += '<tr class="detail-row" id="' + rid + '" style="display:none"><td colspan="6"><pre class="loading">Click to load...</pre></td></tr>'; 341 + html += '<tr class="detail-row" id="' + rid + '" style="display:none"><td colspan="4"><div class="detail-inner loading">Click to load...</div></td></tr>'; 273 342 } 274 343 html += "</tbody></table>"; 275 344 el.innerHTML = html; ··· 280 349 const detailRow = document.getElementById(this.dataset.rid); 281 350 if (detailRow.style.display === "none") { 282 351 detailRow.style.display = ""; 283 - const pre = detailRow.querySelector("pre"); 284 - pre.textContent = "Loading..."; 352 + const inner = detailRow.querySelector(".detail-inner"); 353 + inner.innerHTML = "Loading..."; 285 354 try { 286 - const detail = await apiFetch("org.p2pds.admin.getDidStatus", { did: did }); 287 - pre.textContent = JSON.stringify(detail, null, 2); 288 - } catch (e) { pre.textContent = "Error: " + e.message; } 355 + const d = await apiFetch("org.p2pds.admin.getDidStatus", { did: did }); 356 + let h = '<div class="metrics-grid" style="margin-bottom:0.6rem">' 357 + + '<div class="metric-box"><div class="value">' + formatNumber(d.recordCount) + '</div><div class="label">Records</div></div>' 358 + + '<div class="metric-box"><div class="value">' + formatNumber(d.blockCount) + '</div><div class="label">Blocks</div></div>' 359 + + '<div class="metric-box"><div class="value">' + formatNumber(d.blobCount) + '</div><div class="label">Blobs</div></div>' 360 + + '<div class="metric-box"><div class="value">' + formatBytes(d.bytesHeld) + '</div><div class="label">Held</div></div>' 361 + + '</div>'; 362 + var syncs = d.recentSyncs || []; 363 + if (syncs.length > 0) { 364 + h += '<table><thead><tr><th>Time</th><th>Source</th><th>Status</th><th>Blocks+</th><th>Duration</th><th>Bytes</th></tr></thead><tbody>'; 365 + for (var i = 0; i < syncs.length; i++) { 366 + var sy = syncs[i]; 367 + h += '<tr><td>' + timeAgo(sy.startedAt) + '</td>' 368 + + '<td>' + sourceBadge(sy.sourceType) + '</td>' 369 + + '<td>' + esc(sy.status) + '</td>' 370 + + '<td>' + formatNumber(sy.blocksAdded) + '</td>' 371 + + '<td>' + (sy.durationMs != null ? sy.durationMs + 'ms' : '-') + '</td>' 372 + + '<td>' + formatBytes((sy.carBytes || 0) + (sy.blobBytes || 0)) + '</td>' 373 + + '</tr>'; 374 + } 375 + h += '</tbody></table>'; 376 + } 377 + inner.innerHTML = h; 378 + inner.classList.remove("loading"); 379 + } catch (e) { inner.textContent = "Error: " + e.message; } 289 380 } else { 290 381 detailRow.style.display = "none"; 291 382 } ··· 293 384 }); 294 385 } 295 386 387 + function renderSyncHistory(history) { 388 + const el = document.getElementById("sync-history-content"); 389 + var items = (history && history.history) || []; 390 + if (items.length === 0) { el.innerHTML = "No sync events recorded"; return; } 391 + let html = '<table><thead><tr><th>Time</th><th>DID</th><th>Source</th><th>Status</th><th>Blocks+</th><th>Duration</th><th>Bytes</th></tr></thead><tbody>'; 392 + for (var i = 0; i < items.length; i++) { 393 + var s = items[i]; 394 + html += '<tr>' 395 + + '<td>' + timeAgo(s.startedAt) + '</td>' 396 + + '<td>' + esc(s.did) + '</td>' 397 + + '<td>' + sourceBadge(s.sourceType) + '</td>' 398 + + '<td>' + esc(s.status) + '</td>' 399 + + '<td>' + formatNumber(s.blocksAdded) + '</td>' 400 + + '<td>' + (s.durationMs != null ? s.durationMs + 'ms' : '-') + '</td>' 401 + + '<td>' + formatBytes((s.carBytes || 0) + (s.blobBytes || 0)) + '</td>' 402 + + '</tr>'; 403 + } 404 + html += '</tbody></table>'; 405 + el.innerHTML = html; 406 + } 407 + 296 408 function renderNetwork(data) { 297 409 const el = document.getElementById("network-content"); 298 410 el.innerHTML = '<dl class="kv">' ··· 342 454 343 455 async function refresh() { 344 456 try { 345 - const [overview, network, policies] = await Promise.all([ 457 + const [overview, network, policies, syncHistory] = await Promise.all([ 346 458 apiFetch("org.p2pds.admin.getOverview"), 347 459 apiFetch("org.p2pds.admin.getNetworkStatus"), 348 460 apiFetch("org.p2pds.admin.getPolicies"), 461 + apiFetch("org.p2pds.admin.getSyncHistory", { limit: "20" }), 349 462 ]); 350 463 renderOverview(overview); 464 + renderMetrics(overview); 351 465 renderReplication(overview); 466 + renderSyncHistory(syncHistory); 352 467 renderNetwork(network); 353 468 renderPolicies(policies); 354 469 renderVerification(overview); ··· 370 485 </html>`; 371 486 372 487 return c.html(html); 488 + } 489 + 490 + export function getSyncHistory( 491 + c: Context<AuthedAppEnv>, 492 + replicationManager: ReplicationManager | undefined, 493 + ): Response { 494 + if (!replicationManager) { 495 + return c.json( 496 + { error: "ReplicationNotEnabled", message: "Replication is not enabled" }, 497 + 400, 498 + ); 499 + } 500 + 501 + const did = c.req.query("did") || undefined; 502 + const limitStr = c.req.query("limit"); 503 + const limit = limitStr ? Math.min(Math.max(parseInt(limitStr, 10) || 50, 1), 200) : 50; 504 + 505 + const syncStorage = replicationManager.getSyncStorage(); 506 + const history = syncStorage.getSyncHistory(did, limit); 507 + 508 + return c.json({ history }); 373 509 } 374 510 375 511 export function getPolicies(