lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

admin page fixups

phil aca6a616 d2854cb3

+123 -61
+8 -4
src/server/admin.rs
··· 24 24 first_startup_ms: u64, 25 25 startup_count: u64, 26 26 repos_queued_total: u64, 27 - collection_births_total: u64, 28 - collection_deaths_total: u64, 27 + collection_births_firehose: u64, 28 + collection_deaths_firehose: u64, 29 + collection_births_resync: u64, 30 + collection_deaths_resync: u64, 29 31 resyncs_completed_total: u64, 30 32 resync_queue_depth: i64, 31 33 resync_buffer_count: i64, ··· 106 108 first_startup_ms: s.first_startup_ms.load(Ordering::Relaxed), 107 109 startup_count: s.startup_count.load(Ordering::Relaxed), 108 110 repos_queued_total: s.repos_queued_total.load(Ordering::Relaxed), 109 - collection_births_total: s.collection_births_total.load(Ordering::Relaxed), 110 - collection_deaths_total: s.collection_deaths_total.load(Ordering::Relaxed), 111 + collection_births_firehose: s.collection_births_firehose.load(Ordering::Relaxed), 112 + collection_deaths_firehose: s.collection_deaths_firehose.load(Ordering::Relaxed), 113 + collection_births_resync: s.collection_births_resync.load(Ordering::Relaxed), 114 + collection_deaths_resync: s.collection_deaths_resync.load(Ordering::Relaxed), 111 115 resyncs_completed_total: s.resyncs_completed_total.load(Ordering::Relaxed), 112 116 resync_queue_depth: s.resync_queue_depth.load(Ordering::Relaxed), 113 117 resync_buffer_count: s.resync_buffer_count.load(Ordering::Relaxed),
+63 -36
src/server/admin_page.rs
··· 51 51 .dot.on{background:#4caf50} 52 52 .empty{color:#bbb;font-style:italic} 53 53 .sec-head{margin-top:0.6rem;font-size:0.75rem;color:#999;text-transform:uppercase;letter-spacing:0.04em;padding-bottom:0.2rem;border-bottom:1px solid #eee} 54 + details summary{cursor:pointer;font-size:0.82rem;color:#555} 55 + details summary:hover{color:#222} 56 + details[open] summary{margin-bottom:0.3rem} 57 + details table{margin-top:0.15rem} 58 + .detail-paths{padding:0.2rem 0 0.2rem 1rem;font-size:0.78rem;color:#666} 59 + .detail-paths div{padding:0.1rem 0;font-family:ui-monospace,'SF Mono',monospace} 54 60 </style> 55 61 </head> 56 62 <body> ··· 61 67 <div class="grid" id="g"></div> 62 68 <script> 63 69 var $=function(s){return document.getElementById(s)}; 64 - var N=function(n){return typeof n==='number'?n.toLocaleString():'—'}; 70 + var N=function(n){return typeof n==='number'?n.toLocaleString():'\u2014'}; 65 71 66 72 function dur(ms){ 67 73 var s=Math.floor(ms/1000),d=Math.floor(s/86400),h=Math.floor(s%86400/3600),m=Math.floor(s%3600/60); ··· 82 88 function render(d){ 83 89 var now=Date.now(); 84 90 $('info').textContent=d.first_startup_ms 85 - ?'up '+dur(now-d.first_startup_ms)+' \u00b7 '+d.startup_count+' start'+(d.startup_count!==1?'s':'') 91 + ?'first started '+dur(now-d.first_startup_ms)+' ago \u00b7 '+d.startup_count+' start'+(d.startup_count!==1?'s':'') 86 92 :''; 87 93 88 94 document.title='lightrail \u00b7 Q:'+N(d.resync_queue_depth); 89 95 90 96 var h=''; 91 97 92 - /* Backfill */ 98 + /* Indexing */ 93 99 var bfBadge=d.upstream_backfill_complete 94 100 ?'<span class="badge bg">complete</span>' 95 101 :'<span class="badge by">in progress</span>'; 96 - var bf=row('Status',bfBadge)+row('Repos queued',N(d.repos_queued_total)); 97 - if(d.upstream_backfill_completed_at)bf+=row('Completed',new Date(d.upstream_backfill_completed_at).toLocaleString()); 98 - h+=card('Backfill',bf); 102 + var idx=row('Backfill',bfBadge) 103 + +row('Repos found',N(d.repos_queued_total)) 104 + +row('Resyncs completed',N(d.resyncs_completed_total)) 105 + +row('Resync queue depth',N(d.resync_queue_depth)) 106 + +row('Replay buffer',N(d.resync_buffer_count)); 107 + if(d.upstream_backfill_completed_at)idx+=row('Backfill completed',new Date(d.upstream_backfill_completed_at).toLocaleString()); 108 + h+=card('Indexing',idx); 99 109 100 - /* Pipeline */ 101 - h+=card('Pipeline', 102 - row('Resyncs completed',N(d.resyncs_completed_total)) 103 - +row('Collection births',N(d.collection_births_total)) 104 - +row('Collection deaths',N(d.collection_deaths_total)) 105 - +row('Queue depth',N(d.resync_queue_depth)) 106 - +row('Buffer count',N(d.resync_buffer_count)) 110 + /* Collections */ 111 + var birthTotal=d.collection_births_firehose+d.collection_births_resync; 112 + var deathTotal=d.collection_deaths_firehose+d.collection_deaths_resync; 113 + h+=card('Collections', 114 + row('Distinct (estimate)',N(d.distinct_collections)) 115 + +row('Births',N(birthTotal) 116 + +' <span style="font-weight:400;color:#888;font-size:0.78rem">(firehose\u00a0'+N(d.collection_births_firehose)+' \u00b7 resync\u00a0'+N(d.collection_births_resync)+')</span>') 117 + +row('Deaths',N(deathTotal) 118 + +' <span style="font-weight:400;color:#888;font-size:0.78rem">(firehose\u00a0'+N(d.collection_deaths_firehose)+' \u00b7 resync\u00a0'+N(d.collection_deaths_resync)+')</span>') 107 119 ); 108 120 109 - /* Cardinality */ 110 - var aMax=Math.max(d.distinct_accounts_all,1); 111 - var pct=function(v){return Math.min(100,v/aMax*100)}; 112 - h+=card('Cardinality Estimates', 113 - row('Collections',N(d.distinct_collections)) 114 - +row('PDS hosts',N(d.distinct_pds_hosts)) 115 - +'<div style="margin-top:0.5rem">' 116 - +barRow('All accounts',d.distinct_accounts_all,pct(d.distinct_accounts_all),'#64b5f6') 117 - +barRow('Resynced',d.distinct_accounts_resynced,pct(d.distinct_accounts_resynced),'#81c784') 118 - +barRow('Commit (strict)',d.distinct_accounts_commit_strict,pct(d.distinct_accounts_commit_strict),'#4a90d9') 119 - +barRow('Commit (lenient)',d.distinct_accounts_commit_lenient,pct(d.distinct_accounts_commit_lenient),'#ffb74d') 120 - +barRow('Desynced',d.distinct_accounts_desynced,pct(d.distinct_accounts_desynced),'#e57373') 121 - +'</div>' 121 + /* Accounts — sync group */ 122 + var syncMax=Math.max(d.distinct_accounts_all,1); 123 + var syncPct=function(v){return Math.min(100,v/syncMax*100)}; 124 + h+=card('Accounts', 125 + '<div class="sec-head">Synced</div>' 126 + +barRow('All synced accounts',d.distinct_accounts_all,syncPct(d.distinct_accounts_all),'#64b5f6') 127 + +barRow('Resynced (full fetch)',d.distinct_accounts_resynced,syncPct(d.distinct_accounts_resynced),'#81c784') 128 + +'<div class="sec-head" style="margin-top:0.6rem">Firehose</div>' 129 + +(function(){ 130 + var fhMax=Math.max(d.distinct_accounts_commit_strict,d.distinct_accounts_commit_lenient,d.distinct_accounts_desynced,1); 131 + var fhPct=function(v){return Math.min(100,v/fhMax*100)}; 132 + return barRow('Events processed (strict)',d.distinct_accounts_commit_strict,fhPct(d.distinct_accounts_commit_strict),'#4a90d9') 133 + +barRow('Events processed (lenient)',d.distinct_accounts_commit_lenient,fhPct(d.distinct_accounts_commit_lenient),'#ffb74d') 134 + +barRow('Became desynced',d.distinct_accounts_desynced,fhPct(d.distinct_accounts_desynced),'#e57373'); 135 + })() 136 + +row('PDS hosts seen (firehose)','~'+N(d.distinct_pds_hosts)) 122 137 ); 123 138 124 139 /* Dispatcher */ 125 140 if(d.dispatcher){ 126 141 var dp=d.dispatcher; 127 142 var body=row('Workers',N(dp.worker_count)) 128 - +row('Busy',N(dp.busy.length)) 129 143 +row('Cooling',N(dp.cooling.length)); 130 144 145 + if(dp.busy.length>0){ 146 + body+='<details><summary>Busy accounts ('+dp.busy.length+')</summary>'; 147 + body+='<table><tr><th>DID</th><th style="text-align:right">Host</th></tr>'; 148 + dp.busy.forEach(function(e){ 149 + body+='<tr><td>'+esc(e.did)+'</td><td class="num" style="font-family:ui-monospace,monospace;font-size:0.78rem">'+esc(e.host)+'</td></tr>'; 150 + }); 151 + body+='</table></details>'; 152 + }else{ 153 + body+=row('Busy','0'); 154 + } 155 + 131 156 if(dp.hosts.length>0){ 132 - body+='<div class="sec-head">Active hosts</div>'; 157 + body+='<div class="sec-head">Active hosts ('+dp.hosts.length+')</div>'; 133 158 body+='<table><tr><th>Host</th><th style="text-align:right">Workers</th></tr>'; 134 159 dp.hosts.slice(0,20).forEach(function(e){ 135 160 body+='<tr><td>'+esc(e.host)+'</td><td class="num">'+e.workers+'</td></tr>'; 136 161 }); 137 - if(dp.hosts.length>20)body+='<tr><td colspan="2" class="empty">+' +(dp.hosts.length-20)+' more</td></tr>'; 162 + if(dp.hosts.length>20)body+='<tr><td colspan="2" class="empty">+'+(dp.hosts.length-20)+' more</td></tr>'; 138 163 body+='</table>'; 139 164 } 140 165 ··· 159 184 var tb=Object.values(b[1]).reduce(function(s,n){return s+n},0); 160 185 return tb-ta; 161 186 }); 162 - var body='<table><tr><th>Host</th><th style="text-align:right">Blocked reqs</th></tr>'; 163 - hosts.slice(0,20).forEach(function(pair){ 164 - var total=Object.values(pair[1]).reduce(function(s,n){return s+n},0); 165 - body+='<tr><td>'+esc(pair[0])+'</td><td class="num">'+N(total)+'</td></tr>'; 187 + var body=''; 188 + hosts.forEach(function(pair){ 189 + var paths=Object.entries(pair[1]).sort(function(a,b){return b[1]-a[1]}); 190 + var total=paths.reduce(function(s,p){return s+p[1]},0); 191 + body+='<details><summary>'+esc(pair[0])+' \u2014 '+N(total)+' blocked</summary>'; 192 + body+='<div class="detail-paths">'; 193 + paths.forEach(function(p){body+='<div>'+esc(p[0])+': '+N(p[1])+'</div>'}); 194 + body+='</div></details>'; 166 195 }); 167 - if(hosts.length>20)body+='<tr><td colspan="2" class="empty">+'+(hosts.length-20)+' more</td></tr>'; 168 - body+='</table>'; 169 - h+=card('Throttled Hosts',body); 196 + h+=card('Throttled Hosts (' + Object.keys(d.throttled_hosts).length + ')',body); 170 197 } 171 198 } 172 199
+40 -16
src/storage/meta.rs
··· 25 25 pub startup_count: AtomicU64, 26 26 /// New repos ever enqueued for resync. 27 27 pub repos_queued_total: AtomicU64, 28 - pub collection_births_total: AtomicU64, 29 - pub collection_deaths_total: AtomicU64, 28 + pub collection_births_firehose: AtomicU64, 29 + pub collection_deaths_firehose: AtomicU64, 30 + /// Collection births discovered during resyncs (not included in _total). 31 + pub collection_births_resync: AtomicU64, 32 + /// Collection deaths discovered during resyncs (not included in _total). 33 + pub collection_deaths_resync: AtomicU64, 30 34 /// Successful full resyncs completed. 31 35 pub resyncs_completed_total: AtomicU64, 32 36 /// Approximate queue depth; may drift slightly after a crash. ··· 57 61 first_startup_ms: AtomicU64::new(0), 58 62 startup_count: AtomicU64::new(0), 59 63 repos_queued_total: AtomicU64::new(0), 60 - collection_births_total: AtomicU64::new(0), 61 - collection_deaths_total: AtomicU64::new(0), 64 + collection_births_firehose: AtomicU64::new(0), 65 + collection_deaths_firehose: AtomicU64::new(0), 66 + collection_births_resync: AtomicU64::new(0), 67 + collection_deaths_resync: AtomicU64::new(0), 62 68 resyncs_completed_total: AtomicU64::new(0), 63 69 resync_queue_depth: AtomicI64::new(0), 64 70 resync_buffer_count: AtomicI64::new(0), ··· 90 96 const K_FIRST_STARTUP: &[u8] = b"first_startup"; 91 97 const K_STARTUP_COUNT: &[u8] = b"startup_count"; 92 98 const K_REPOS_QUEUED: &[u8] = b"repos_queued_total"; 93 - const K_BIRTHS: &[u8] = b"births_total"; 94 - const K_DEATHS: &[u8] = b"deaths_total"; 99 + const K_BIRTHS_FIREHOSE: &[u8] = b"births_firehose"; 100 + const K_DEATHS_FIREHOSE: &[u8] = b"deaths_firehose"; 101 + const K_BIRTHS_RESYNC: &[u8] = b"births_resync"; 102 + const K_DEATHS_RESYNC: &[u8] = b"deaths_resync"; 95 103 const K_RESYNCS_DONE: &[u8] = b"resyncs_completed_total"; 96 104 const K_QUEUE_DEPTH: &[u8] = b"queue_depth"; 97 105 const K_BUFFER_COUNT: &[u8] = b"buffer_count"; ··· 186 194 first_startup_ms: AtomicU64::new(read_u64(ks, K_FIRST_STARTUP)?), 187 195 startup_count: AtomicU64::new(read_u64(ks, K_STARTUP_COUNT)?), 188 196 repos_queued_total: AtomicU64::new(read_u64(ks, K_REPOS_QUEUED)?), 189 - collection_births_total: AtomicU64::new(read_u64(ks, K_BIRTHS)?), 190 - collection_deaths_total: AtomicU64::new(read_u64(ks, K_DEATHS)?), 197 + collection_births_firehose: AtomicU64::new(read_u64(ks, K_BIRTHS_FIREHOSE)?), 198 + collection_deaths_firehose: AtomicU64::new(read_u64(ks, K_DEATHS_FIREHOSE)?), 199 + collection_births_resync: AtomicU64::new(read_u64(ks, K_BIRTHS_RESYNC)?), 200 + collection_deaths_resync: AtomicU64::new(read_u64(ks, K_DEATHS_RESYNC)?), 191 201 resyncs_completed_total: AtomicU64::new(read_u64(ks, K_RESYNCS_DONE)?), 192 202 resync_queue_depth: AtomicI64::new(read_i64(ks, K_QUEUE_DEPTH)?), 193 203 resync_buffer_count: AtomicI64::new(read_i64(ks, K_BUFFER_COUNT)?), ··· 232 242 )?; 233 243 write_u64( 234 244 ks, 235 - K_BIRTHS, 236 - s.collection_births_total.load(Ordering::Relaxed), 245 + K_BIRTHS_FIREHOSE, 246 + s.collection_births_firehose.load(Ordering::Relaxed), 237 247 )?; 238 248 write_u64( 239 249 ks, 240 - K_DEATHS, 241 - s.collection_deaths_total.load(Ordering::Relaxed), 250 + K_DEATHS_FIREHOSE, 251 + s.collection_deaths_firehose.load(Ordering::Relaxed), 252 + )?; 253 + write_u64( 254 + ks, 255 + K_BIRTHS_RESYNC, 256 + s.collection_births_resync.load(Ordering::Relaxed), 257 + )?; 258 + write_u64( 259 + ks, 260 + K_DEATHS_RESYNC, 261 + s.collection_deaths_resync.load(Ordering::Relaxed), 242 262 )?; 243 263 write_u64( 244 264 ks, ··· 303 323 let db = open_temporary().unwrap(); 304 324 305 325 db.stats.repos_queued_total.store(42, Ordering::Relaxed); 306 - db.stats.collection_births_total.store(7, Ordering::Relaxed); 307 - db.stats.collection_deaths_total.store(3, Ordering::Relaxed); 326 + db.stats 327 + .collection_births_firehose 328 + .store(7, Ordering::Relaxed); 329 + db.stats 330 + .collection_deaths_firehose 331 + .store(3, Ordering::Relaxed); 308 332 db.stats 309 333 .resyncs_completed_total 310 334 .store(100, Ordering::Relaxed); ··· 314 338 315 339 let s2 = load(&db.ks).unwrap(); 316 340 assert_eq!(s2.repos_queued_total.load(Ordering::Relaxed), 42); 317 - assert_eq!(s2.collection_births_total.load(Ordering::Relaxed), 7); 318 - assert_eq!(s2.collection_deaths_total.load(Ordering::Relaxed), 3); 341 + assert_eq!(s2.collection_births_firehose.load(Ordering::Relaxed), 7); 342 + assert_eq!(s2.collection_deaths_firehose.load(Ordering::Relaxed), 3); 319 343 assert_eq!(s2.resyncs_completed_total.load(Ordering::Relaxed), 100); 320 344 assert_eq!(s2.resync_queue_depth.load(Ordering::Relaxed), -5); 321 345 }
+2 -2
src/sync/firehose/commit_event.rs
··· 539 539 540 540 if n_born > 0 { 541 541 db.stats 542 - .collection_births_total 542 + .collection_births_firehose 543 543 .fetch_add(n_born, Ordering::Relaxed); 544 544 metrics::counter!("lightrail_collection_births_total", "source" => "firehose") 545 545 .increment(n_born); 546 546 } 547 547 if n_died > 0 { 548 548 db.stats 549 - .collection_deaths_total 549 + .collection_deaths_firehose 550 550 .fetch_add(n_died, Ordering::Relaxed); 551 551 metrics::counter!("lightrail_collection_deaths_total", "source" => "firehose") 552 552 .increment(n_died);
+10 -3
src/sync/resync/mod.rs
··· 21 21 22 22 use std::collections::BTreeSet; 23 23 use std::sync::Arc; 24 + use std::sync::atomic::Ordering; 24 25 use std::time::Duration; 25 26 26 27 use cid::Cid as RawCid; ··· 174 175 Err(e) => return Err(ResyncError::Fetch(e)), 175 176 }; 176 177 177 - let db = db.clone(); 178 + let db_w = db.clone(); 178 179 let collections = repo_snapshot.collections; 179 180 let rev = repo_snapshot.rev; 180 181 let prev_data = repo_snapshot.data.to_bytes(); 181 182 let (n_inserted, n_removed) = tokio::task::spawn_blocking(move || { 182 - crate::storage::repo::put_prev(&db, &did, &RepoPrev { rev, prev_data })?; 183 - crate::storage::collection_index::sync_collections(&db, &did, &collections) 183 + crate::storage::repo::put_prev(&db_w, &did, &RepoPrev { rev, prev_data })?; 184 + crate::storage::collection_index::sync_collections(&db_w, &did, &collections) 184 185 }) 185 186 .await 186 187 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; 187 188 if n_inserted > 0 { 189 + db.stats 190 + .collection_births_resync 191 + .fetch_add(n_inserted as u64, Ordering::Relaxed); 188 192 metrics::counter!("lightrail_collection_births_total", "source" => "resync") 189 193 .increment(n_inserted as u64); 190 194 } 191 195 if n_removed > 0 { 196 + db.stats 197 + .collection_deaths_resync 198 + .fetch_add(n_removed as u64, Ordering::Relaxed); 192 199 metrics::counter!("lightrail_collection_deaths_total", "source" => "resync") 193 200 .increment(n_removed as u64); 194 201 }