atproto user agency toolkit for individuals and groups
8
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add sync trigger tracking and dashboard metrics

Track what initiated each sync event (firehose, gossipsub, periodic,
manual, gc, tombstone-recovery, firehose-resync) separately from the
transport source type. Adds trigger column to sync_history with schema
migration, threads trigger through all syncDid() call sites, and
displays colored trigger badges in dashboard tables with per-DID
breakdown summaries.

+101 -24
+5 -5
src/replication/firehose-incremental.test.ts
··· 285 285 286 286 await (replManager as any).handleFirehoseCommit(event); 287 287 288 - expect(syncDidSpy).toHaveBeenCalledWith(trackedDid); 288 + expect(syncDidSpy).toHaveBeenCalledWith(trackedDid, "firehose-resync"); 289 289 syncDidSpy.mockRestore(); 290 290 }); 291 291 ··· 301 301 302 302 await (replManager as any).handleFirehoseCommit(event); 303 303 304 - expect(syncDidSpy).toHaveBeenCalledWith(trackedDid); 304 + expect(syncDidSpy).toHaveBeenCalledWith(trackedDid, "firehose-resync"); 305 305 syncDidSpy.mockRestore(); 306 306 }); 307 307 ··· 315 315 316 316 await (replManager as any).handleFirehoseCommit(event); 317 317 318 - expect(syncDidSpy).toHaveBeenCalledWith(trackedDid); 318 + expect(syncDidSpy).toHaveBeenCalledWith(trackedDid, "firehose-resync"); 319 319 syncDidSpy.mockRestore(); 320 320 }); 321 321 ··· 337 337 await (replManager as any).handleFirehoseCommit(event); 338 338 339 339 // Should have fallen back to full sync 340 - expect(syncDidSpy).toHaveBeenCalledWith(trackedDid); 340 + expect(syncDidSpy).toHaveBeenCalledWith(trackedDid, "firehose-resync"); 341 341 342 342 // Should have logged the gap warning 343 343 expect(consoleSpy).toHaveBeenCalledWith( ··· 391 391 await (replManager as any).handleFirehoseCommit(event); 392 392 393 393 // Should have fallen back to full sync 394 - expect(syncDidSpy).toHaveBeenCalledWith(trackedDid); 394 + expect(syncDidSpy).toHaveBeenCalledWith(trackedDid, "firehose-resync"); 395 395 396 396 // Should have logged the fallback warning 397 397 expect(consoleSpy).toHaveBeenCalledWith(
+13 -12
src/replication/replication-manager.ts
··· 21 21 didToRkey, 22 22 type ManifestRecord, 23 23 type SyncState, 24 + type SyncTrigger, 24 25 type VerificationConfig, 25 26 type LayeredVerificationResult, 26 27 DEFAULT_VERIFICATION_CONFIG, ··· 241 242 this.updateFirehoseDids(); 242 243 243 244 // Trigger initial sync in background (fire-and-forget) 244 - this.syncDid(did).catch((err) => { 245 + this.syncDid(did, "manual").catch((err) => { 245 246 console.error(`[replication] Initial sync for admin-added ${did} failed:`, err); 246 247 }); 247 248 ··· 479 480 } 480 481 481 482 try { 482 - await this.syncDid(did); 483 + await this.syncDid(did, "periodic"); 483 484 // Record successful sync timestamp 484 485 this.lastSyncTimestamps.set(did, Date.now()); 485 486 } catch (err) { ··· 551 552 /** 552 553 * Sync a single DID: fetch repo, store blocks in IPFS, verify, update state. 553 554 */ 554 - async syncDid(did: string): Promise<void> { 555 + async syncDid(did: string, trigger: SyncTrigger = "unknown"): Promise<void> { 555 556 this.syncStorage.updateStatus(did, "syncing"); 556 557 const syncStart = Date.now(); 557 558 let sourceType = "pds"; ··· 663 664 } 664 665 665 666 // Start sync event after source is determined 666 - const syncEventId = this.syncStorage.startSyncEvent(did, sourceType); 667 + const syncEventId = this.syncStorage.startSyncEvent(did, sourceType, trigger); 667 668 668 669 try { 669 670 // 4. Parse CAR and store blocks ··· 887 888 this.syncStorage.updateStatus(state.did, "pending"); 888 889 this.syncStorage.upsertState({ did: state.did, pdsEndpoint }); 889 890 console.log(`[replication] Tombstoned DID ${state.did} is alive again, resuming`); 890 - this.syncDid(state.did).catch((err) => { 891 + this.syncDid(state.did, "tombstone-recovery").catch((err) => { 891 892 const message = err instanceof Error ? err.message : String(err); 892 893 this.syncStorage.updateStatus(state.did, "error", message); 893 894 }); ··· 936 937 for (const did of didsNeedingGc) { 937 938 if (this.stopped) break; 938 939 try { 939 - await this.syncDid(did); 940 + await this.syncDid(did, "gc"); 940 941 this.lastSyncTimestamps.set(did, Date.now()); 941 942 } catch (err) { 942 943 const message = err instanceof Error ? err.message : String(err); ··· 1091 1092 // - blocks are empty (nothing to apply) 1092 1093 if (event.tooBig || event.rebase || event.blocks.length === 0) { 1093 1094 try { 1094 - await this.syncDid(did); 1095 + await this.syncDid(did, "firehose-resync"); 1095 1096 this.lastSyncTimestamps.set(did, Date.now()); 1096 1097 } catch (err) { 1097 1098 const message = err instanceof Error ? err.message : String(err); ··· 1111 1112 `[replication] Gap detected for ${did}: local rev=${state.lastSyncRev}, event.since=${event.since}. Falling back to full sync.`, 1112 1113 ); 1113 1114 try { 1114 - await this.syncDid(did); 1115 + await this.syncDid(did, "firehose-resync"); 1115 1116 this.lastSyncTimestamps.set(did, Date.now()); 1116 1117 } catch (err) { 1117 1118 const message = err instanceof Error ? err.message : String(err); ··· 1132 1133 err instanceof Error ? err.message : String(err), 1133 1134 ); 1134 1135 try { 1135 - await this.syncDid(did); 1136 + await this.syncDid(did, "firehose-resync"); 1136 1137 this.lastSyncTimestamps.set(did, Date.now()); 1137 1138 } catch (syncErr) { 1138 1139 const message = syncErr instanceof Error ? syncErr.message : String(syncErr); ··· 1179 1180 if (state?.status === "tombstoned") { 1180 1181 this.syncStorage.updateStatus(did, "pending"); 1181 1182 console.log(`[replication] Account ${did} re-activated, triggering sync`); 1182 - this.syncDid(did).catch((err) => { 1183 + this.syncDid(did, "tombstone-recovery").catch((err) => { 1183 1184 const message = err instanceof Error ? err.message : String(err); 1184 1185 this.syncStorage.updateStatus(did, "error", message); 1185 1186 }); ··· 1197 1198 event: FirehoseCommitEvent, 1198 1199 ): Promise<void> { 1199 1200 const syncStart = Date.now(); 1200 - const syncEventId = this.syncStorage.startSyncEvent(did, "firehose"); 1201 + const syncEventId = this.syncStorage.startSyncEvent(did, "firehose", "firehose"); 1201 1202 1202 1203 try { 1203 1204 // 1. Parse the CAR bytes from the firehose event ··· 1417 1418 if (state?.lastSyncRev === notification.rev) return; 1418 1419 1419 1420 try { 1420 - await this.syncDid(did); 1421 + await this.syncDid(did, "gossipsub"); 1421 1422 this.lastSyncTimestamps.set(did, Date.now()); 1422 1423 } catch (err) { 1423 1424 const message = err instanceof Error ? err.message : String(err);
+37 -5
src/replication/sync-storage.ts
··· 4 4 */ 5 5 6 6 import type Database from "better-sqlite3"; 7 - import type { SyncState, SyncHistoryRow, AggregateMetrics, DidMetrics } from "./types.js"; 7 + import type { SyncState, SyncHistoryRow, SyncTrigger, AggregateMetrics, DidMetrics } from "./types.js"; 8 8 9 9 export class SyncStorage { 10 10 constructor(private db: Database.Database) {} ··· 90 90 id INTEGER PRIMARY KEY AUTOINCREMENT, 91 91 did TEXT NOT NULL, 92 92 source_type TEXT NOT NULL, 93 + trigger TEXT NOT NULL DEFAULT 'unknown', 93 94 started_at TEXT NOT NULL, 94 95 completed_at TEXT, 95 96 status TEXT NOT NULL DEFAULT 'in_progress', ··· 144 145 if (!columns.some((c) => c.name === "needs_gc")) { 145 146 this.db.exec( 146 147 "ALTER TABLE replication_state ADD COLUMN needs_gc INTEGER NOT NULL DEFAULT 0", 148 + ); 149 + } 150 + 151 + // Migration: add trigger column to sync_history 152 + const syncHistoryCols = this.db 153 + .prepare("PRAGMA table_info(sync_history)") 154 + .all() as Array<{ name: string }>; 155 + if (!syncHistoryCols.some((c) => c.name === "trigger")) { 156 + this.db.exec( 157 + "ALTER TABLE sync_history ADD COLUMN trigger TEXT NOT NULL DEFAULT 'unknown'", 147 158 ); 148 159 } 149 160 } ··· 750 761 /** 751 762 * Start a sync event, returning its ID for later completion. 752 763 */ 753 - startSyncEvent(did: string, sourceType: string): number { 764 + startSyncEvent(did: string, sourceType: string, trigger: SyncTrigger = "unknown"): number { 754 765 const result = this.db 755 766 .prepare( 756 - `INSERT INTO sync_history (did, source_type, started_at, status) 757 - VALUES (?, ?, datetime('now'), 'in_progress')`, 767 + `INSERT INTO sync_history (did, source_type, trigger, started_at, status) 768 + VALUES (?, ?, ?, datetime('now'), 'in_progress')`, 758 769 ) 759 - .run(did, sourceType); 770 + .run(did, sourceType, trigger); 760 771 return Number(result.lastInsertRowid); 761 772 } 762 773 ··· 927 938 batch(entries); 928 939 } 929 940 941 + /** 942 + * Get trigger breakdown for a DID's recent syncs. 943 + * Returns a map of trigger → count for the most recent N sync events. 944 + */ 945 + getTriggerBreakdown(did: string, limit: number = 20): Record<string, number> { 946 + const rows = this.db 947 + .prepare( 948 + `SELECT trigger, COUNT(*) as count FROM ( 949 + SELECT trigger FROM sync_history WHERE did = ? ORDER BY started_at DESC LIMIT ? 950 + ) GROUP BY trigger`, 951 + ) 952 + .all(did, limit) as Array<{ trigger: string; count: number }>; 953 + 954 + const result: Record<string, number> = {}; 955 + for (const row of rows) { 956 + result[row.trigger] = row.count; 957 + } 958 + return result; 959 + } 960 + 930 961 private rowToSyncHistory(row: Record<string, unknown>): SyncHistoryRow { 931 962 return { 932 963 id: row.id as number, 933 964 did: row.did as string, 934 965 sourceType: row.source_type as string, 966 + trigger: (row.trigger as SyncTrigger) ?? "unknown", 935 967 startedAt: row.started_at as string, 936 968 completedAt: (row.completed_at as string) ?? null, 937 969 status: row.status as string,
+12
src/replication/types.ts
··· 101 101 reliabilityThreshold: 0.8, 102 102 }; 103 103 104 + /** What initiated a sync event. */ 105 + export type SyncTrigger = 106 + | "firehose" 107 + | "firehose-resync" 108 + | "gossipsub" 109 + | "periodic" 110 + | "manual" 111 + | "tombstone-recovery" 112 + | "gc" 113 + | "unknown"; 114 + 104 115 /** A row from the sync_history table. */ 105 116 export interface SyncHistoryRow { 106 117 id: number; 107 118 did: string; 108 119 sourceType: string; 120 + trigger: SyncTrigger; 109 121 startedAt: string; 110 122 completedAt: string | null; 111 123 status: string;
+34 -2
src/xrpc/app.ts
··· 99 99 const syncState = syncStorage.getState(did); 100 100 const didMetrics = syncStorage.getDidMetrics(did); 101 101 const peerEndpoints = syncStorage.getPeerEndpoints(did); 102 + const triggerBreakdown = syncStorage.getTriggerBreakdown(did); 102 103 103 104 const verificationResults = replicationManager.getVerificationResults(); 104 105 const verification = verificationResults.get(did) ?? null; ··· 115 116 recordCount: didMetrics.records, 116 117 bytesHeld: didMetrics.bytesHeld, 117 118 recentSyncs: didMetrics.recentSyncs, 119 + triggerBreakdown, 118 120 peerEndpoints, 119 121 verification, 120 122 effectivePolicy, ··· 222 224 .source-pds { background: #dbeafe; color: #1e40af; } 223 225 .source-firehose { background: #fef3c7; color: #92400e; } 224 226 .source-peer_fallback { background: #fce7f3; color: #9d174d; } 227 + .trigger-badge { font-size: 0.65rem; padding: 1px 5px; border-radius: 3px; } 228 + .trigger-firehose { background: #fef3c7; color: #92400e; } 229 + .trigger-firehose-resync { background: #ffedd5; color: #9a3412; } 230 + .trigger-gossipsub { background: #ede9fe; color: #5b21b6; } 231 + .trigger-periodic { background: #dbeafe; color: #1e40af; } 232 + .trigger-manual { background: #d1fae5; color: #065f46; } 233 + .trigger-tombstone-recovery { background: #fce7f3; color: #9d174d; } 234 + .trigger-gc { background: #f3f4f6; color: #374151; } 235 + .trigger-unknown { background: var(--metric-bg); color: var(--muted); } 225 236 .policy-list { list-style: none; font-size: 0.8rem; } 226 237 .policy-list li { padding: 0.25rem 0; border-bottom: 1px solid var(--border); } 227 238 .verify-pass { color: #22c55e; font-weight: 600; } ··· 333 344 .did-source-config { background: #312e81; color: #a5b4fc; } 334 345 .did-source-admin { background: #064e3b; color: #6ee7b7; } 335 346 .did-source-policy { background: #422006; color: #fcd34d; } 347 + .trigger-firehose { background: #422006; color: #fcd34d; } 348 + .trigger-firehose-resync { background: #431407; color: #fdba74; } 349 + .trigger-gossipsub { background: #2e1065; color: #c4b5fd; } 350 + .trigger-periodic { background: #1e3a5f; color: #93c5fd; } 351 + .trigger-manual { background: #064e3b; color: #6ee7b7; } 352 + .trigger-tombstone-recovery { background: #4a1035; color: #f9a8d4; } 353 + .trigger-gc { background: #1f2937; color: #9ca3af; } 336 354 } 337 355 </style> 338 356 </head> ··· 447 465 return '<span class="source-badge ' + cls + '">' + esc(sourceType || "pds") + '</span>'; 448 466 } 449 467 468 + function triggerBadge(trigger) { 469 + var t = trigger || "unknown"; 470 + var cls = "trigger-" + t; 471 + return '<span class="trigger-badge ' + cls + '">' + esc(t) + '</span>'; 472 + } 473 + 450 474 function didSourceBadge(source) { 451 475 var cls = "did-source did-source-" + (source || "unknown"); 452 476 return '<span class="' + cls + '">' + esc(source || "unknown") + '</span>'; ··· 582 606 + '<div class="metric-box"><div class="value">' + formatNumber(d.blobCount) + '</div><div class="label">Blobs</div></div>' 583 607 + '<div class="metric-box"><div class="value">' + formatBytes(d.bytesHeld) + '</div><div class="label">Held</div></div>' 584 608 + '</div>'; 609 + var tb = d.triggerBreakdown || {}; 610 + var tbParts = []; 611 + for (var tk in tb) { if (tb.hasOwnProperty(tk)) tbParts.push(triggerBadge(tk) + ' \\u00d7' + tb[tk]); } 612 + if (tbParts.length > 0) { 613 + h += '<div style="margin-bottom:0.4rem;font-size:0.75rem;color:var(--muted)">Last 20 syncs: ' + tbParts.join(', ') + '</div>'; 614 + } 585 615 var syncs = d.recentSyncs || []; 586 616 if (syncs.length > 0) { 587 - h += '<table><thead><tr><th>Time</th><th>Source</th><th>Status</th><th>Blocks+</th><th>Duration</th><th>Bytes</th></tr></thead><tbody>'; 617 + h += '<table><thead><tr><th>Time</th><th>Source</th><th>Trigger</th><th>Status</th><th>Blocks+</th><th>Duration</th><th>Bytes</th></tr></thead><tbody>'; 588 618 for (var i = 0; i < syncs.length; i++) { 589 619 var sy = syncs[i]; 590 620 h += '<tr><td>' + timeAgo(sy.startedAt) + '</td>' 591 621 + '<td>' + sourceBadge(sy.sourceType) + '</td>' 622 + + '<td>' + triggerBadge(sy.trigger) + '</td>' 592 623 + '<td>' + esc(sy.status) + '</td>' 593 624 + '<td>' + formatNumber(sy.blocksAdded) + '</td>' 594 625 + '<td>' + (sy.durationMs != null ? sy.durationMs + 'ms' : '-') + '</td>' ··· 626 657 const el = document.getElementById("sync-history-content"); 627 658 var items = (history && history.history) || []; 628 659 if (items.length === 0) { el.innerHTML = "No sync events recorded"; return; } 629 - let html = '<table><thead><tr><th>Time</th><th>DID</th><th>Source</th><th>Status</th><th>Blocks+</th><th>Duration</th><th>Bytes</th></tr></thead><tbody>'; 660 + let html = '<table><thead><tr><th>Time</th><th>DID</th><th>Source</th><th>Trigger</th><th>Status</th><th>Blocks+</th><th>Duration</th><th>Bytes</th></tr></thead><tbody>'; 630 661 for (var i = 0; i < items.length; i++) { 631 662 var s = items[i]; 632 663 html += '<tr>' 633 664 + '<td>' + timeAgo(s.startedAt) + '</td>' 634 665 + '<td>' + esc(s.did) + '</td>' 635 666 + '<td>' + sourceBadge(s.sourceType) + '</td>' 667 + + '<td>' + triggerBadge(s.trigger) + '</td>' 636 668 + '<td>' + esc(s.status) + '</td>' 637 669 + '<td>' + formatNumber(s.blocksAdded) + '</td>' 638 670 + '<td>' + (s.durationMs != null ? s.durationMs + 'ms' : '-') + '</td>'