Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

even more backfill tweaks

it's done. maybe we won't do it again. that would be nice.

phil b59da61b af662ead

+46 -38
+24
ufos/src/lib.rs
/// Render a `Duration` as a short human-readable string, e.g. `"250ms"`,
/// `"12.34s"`, `"5m7s"`, `"3h2m1s"`, `"2d1h0m0s"`.
///
/// Sub-second durations show whole milliseconds; durations under one minute
/// show seconds with two decimals; anything longer is broken down into
/// days/hours/minutes/seconds components.
pub fn nice_duration(dt: Duration) -> String {
    let secs = dt.as_secs_f64();
    if secs < 1. {
        // `as_millis` truncates, so 0.9996s renders as "999ms", never "1000ms"
        // (the old `{:.0}` float formatting rounded up past the branch limit).
        return format!("{}ms", dt.as_millis());
    }
    if secs < 60. {
        return format!("{secs:.02}s");
    }
    // From one minute up, use integer arithmetic: formatting a float
    // remainder with `{:.0}` *rounds to nearest*, so e.g. 119.7s used to
    // render as "1m60s" (59.7 -> "60"). Integer div/mod cannot overflow
    // a component past its carry boundary.
    let total = dt.as_secs();
    let mins = total / 60;
    let rsecs = total % 60;
    if mins < 60 {
        return format!("{mins}m{rsecs}s");
    }
    let hrs = mins / 60;
    let rmins = mins % 60;
    if hrs < 24 {
        return format!("{hrs}h{rmins}m{rsecs}s");
    }
    let days = hrs / 24;
    let rhrs = hrs % 24;
    format!("{days}d{rhrs}h{rmins}m{rsecs}s")
}
+1 -24
ufos/src/main.rs
··· 9 9 use ufos::storage_fjall::FjallStorage; 10 10 use ufos::storage_mem::MemStorage; 11 11 use ufos::store_types::SketchSecretPrefix; 12 - use ufos::ConsumerInfo; 12 + use ufos::{nice_duration, ConsumerInfo}; 13 13 14 14 #[cfg(not(target_env = "msvc"))] 15 15 use tikv_jemallocator::Jemalloc; ··· 210 210 started_at: SystemTime, 211 211 now: SystemTime, 212 212 ) { 213 - let nice_duration = |dt: Duration| { 214 - let secs = dt.as_secs_f64(); 215 - if secs < 1. { 216 - return format!("{:.0}ms", secs * 1000.); 217 - } 218 - if secs < 60. { 219 - return format!("{secs:.02}s"); 220 - } 221 - let mins = (secs / 60.).floor(); 222 - let rsecs = secs - (mins * 60.); 223 - if mins < 60. { 224 - return format!("{mins:.0}m{rsecs:.0}s"); 225 - } 226 - let hrs = (mins / 60.).floor(); 227 - let rmins = mins - (hrs * 60.); 228 - if hrs < 24. { 229 - return format!("{hrs:.0}h{rmins:.0}m{rsecs:.0}s"); 230 - } 231 - let days = (hrs / 24.).floor(); 232 - let rhrs = hrs - (days * 24.); 233 - format!("{days:.0}d{rhrs:.0}h{rmins:.0}m{rsecs:.0}s") 234 - }; 235 - 236 213 let nice_dt_two_maybes = |earlier: Option<Cursor>, later: Option<Cursor>| match (earlier, later) 237 214 { 238 215 (Some(earlier), Some(later)) => match later.duration_since(&earlier) {
+21 -14
ufos/src/storage_fjall.rs
··· 13 13 WEEK_IN_MICROS, 14 14 }; 15 15 use crate::{ 16 - CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, OrderCollectionsBy, UFOsRecord, 16 + nice_duration, CommitAction, ConsumerInfo, Did, EventBatch, Nsid, NsidCount, 17 + OrderCollectionsBy, UFOsRecord, 17 18 }; 18 19 use async_trait::async_trait; 19 20 use fjall::{ ··· 1241 1242 1242 1243 let mut live_records_found = 0; 1243 1244 let mut candidate_new_feed_lower_cursor = None; 1244 - let mut ended_early = false; 1245 + let ended_early = false; 1246 + let mut current_cursor: Option<Cursor> = None; 1245 1247 for (i, kv) in self.feeds.range(live_range).rev().enumerate() { 1246 1248 if i > 0 && i % 500_000 == 0 { 1247 - log::info!("trim: at {i} for {:?}", collection.to_string()); 1248 - } 1249 - if !full_scan && i > 10_000_000 { 1250 1249 log::info!( 1251 - "stopping trim early for {:?}: already scanned 10M elements", 1252 - collection.to_string() 1250 + "trim: at {i} for {:?} (now at {})", 1251 + collection.to_string(), 1252 + current_cursor 1253 + .map(|c| c 1254 + .elapsed() 1255 + .map(nice_duration) 1256 + .unwrap_or("[not past]".into())) 1257 + .unwrap_or("??".into()), 1253 1258 ); 1254 - ended_early = true; 1255 - break; 1256 1259 } 1257 1260 let (key_bytes, val_bytes) = kv?; 1258 1261 let feed_key = db_complete::<NsidRecordFeedKey>(&key_bytes)?; ··· 1268 1271 }; 1269 1272 1270 1273 let (meta, _) = RecordLocationMeta::from_db_bytes(&location_val_bytes)?; 1274 + current_cursor = Some(meta.cursor()); 1271 1275 1272 1276 if meta.cursor() != feed_key.cursor() { 1273 1277 // older/different version ··· 1335 1339 async fn run(mut self, backfill: bool) -> StorageResult<()> { 1336 1340 let mut dirty_nsids = HashSet::new(); 1337 1341 1342 + // backfill condition here is iffy -- longer is good when doing the main ingest and then collection trims 1343 + // shorter once those are done helps things catch up 1344 + // the best setting for non-backfill is non-obvious.. 
it can be pretty slow and still be fine 1338 1345 let mut rollup = 1339 - tokio::time::interval(Duration::from_micros(if backfill { 1_000 } else { 81_000 })); 1340 - rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 1346 + tokio::time::interval(Duration::from_micros(if backfill { 100 } else { 32_000 })); 1347 + rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); 1341 1348 1342 - let mut trim = 1343 - tokio::time::interval(Duration::from_millis(if backfill { 500 } else { 6_000 })); 1349 + // backfill condition again iffy. collection trims should probably happen in their own phase. 1350 + let mut trim = tokio::time::interval(Duration::from_secs(if backfill { 18 } else { 9 })); 1344 1351 trim.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 1345 1352 1346 1353 loop { ··· 1370 1377 completed.insert(collection.clone()); 1371 1378 } 1372 1379 if total_deleted > 10_000_000 { 1373 - log::info!("trim stopped early, more than 100M records already deleted."); 1380 + log::info!("trim stopped early, more than 10M records already deleted."); 1374 1381 break; 1375 1382 } 1376 1383 }