Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

collection trim: just directly delete

Batching doesn't buy us anything here, since we have to commit intermediate batches anyway in case they grow too big.

Performance seems about the same, but this is a little less code and doesn't lose progress if interrupted.

phil af662ead 7874c9ba

+34 -29
+1 -1
ufos/src/storage.rs
··· 58 58 collection: &Nsid, 59 59 limit: usize, 60 60 full_scan: bool, 61 - ) -> StorageResult<(usize, usize)>; 61 + ) -> StorageResult<(usize, usize, bool)>; 62 62 63 63 fn delete_account(&mut self, did: &Did) -> StorageResult<usize>; 64 64 }
+31 -26
ufos/src/storage_fjall.rs
··· 30 30 }; 31 31 use std::time::{Duration, Instant, SystemTime}; 32 32 33 - const MAX_BATCHED_CLEANUP_SIZE: usize = 1024; // try to commit progress for longer feeds 34 33 const MAX_BATCHED_ACCOUNT_DELETE_RECORDS: usize = 1024; 35 34 const MAX_BATCHED_ROLLUP_COUNTS: usize = 256; 36 35 ··· 1220 1219 collection: &Nsid, 1221 1220 limit: usize, 1222 1221 full_scan: bool, 1223 - ) -> StorageResult<(usize, usize)> { 1222 + ) -> StorageResult<(usize, usize, bool)> { 1224 1223 let mut dangling_feed_keys_cleaned = 0; 1225 1224 let mut records_deleted = 0; 1226 1225 ··· 1243 1242 let mut live_records_found = 0; 1244 1243 let mut candidate_new_feed_lower_cursor = None; 1245 1244 let mut ended_early = false; 1246 - let mut batch = self.keyspace.batch(); 1247 1245 for (i, kv) in self.feeds.range(live_range).rev().enumerate() { 1246 + if i > 0 && i % 500_000 == 0 { 1247 + log::info!("trim: at {i} for {:?}", collection.to_string()); 1248 + } 1248 1249 if !full_scan && i > 10_000_000 { 1249 - log::info!("stopping collection trim early: already scanned 10M elements"); 1250 + log::info!( 1251 + "stopping trim early for {:?}: already scanned 10M elements", 1252 + collection.to_string() 1253 + ); 1250 1254 ended_early = true; 1251 1255 break; 1252 1256 } ··· 1258 1262 1259 1263 let Some(location_val_bytes) = self.records.get(&location_key_bytes)? else { 1260 1264 // record was deleted (hopefully) 1261 - batch.remove(&self.feeds, &*key_bytes); 1265 + self.feeds.remove(&*key_bytes)?; 1262 1266 dangling_feed_keys_cleaned += 1; 1263 1267 continue; 1264 1268 }; ··· 1267 1271 1268 1272 if meta.cursor() != feed_key.cursor() { 1269 1273 // older/different version 1270 - batch.remove(&self.feeds, &*key_bytes); 1274 + self.feeds.remove(&*key_bytes)?; 1271 1275 dangling_feed_keys_cleaned += 1; 1272 1276 continue; 1273 1277 } 1274 1278 if meta.rev != feed_val.rev() { 1275 1279 // weird... 1276 1280 log::warn!("record lookup: cursor match but rev did not...? 
removing."); 1277 - batch.remove(&self.feeds, &*key_bytes); 1278 - batch.remove(&self.records, &location_key_bytes); 1281 + self.records.remove(&location_key_bytes)?; 1282 + self.feeds.remove(&*key_bytes)?; 1279 1283 dangling_feed_keys_cleaned += 1; 1280 1284 continue; 1281 1285 } 1282 1286 1283 - if batch.len() >= MAX_BATCHED_CLEANUP_SIZE { 1284 - batch.commit()?; 1285 - batch = self.keyspace.batch(); 1286 - } 1287 - 1288 1287 live_records_found += 1; 1289 1288 if live_records_found <= limit { 1290 1289 continue; ··· 1293 1292 candidate_new_feed_lower_cursor = Some(feed_key.cursor()); 1294 1293 } 1295 1294 1296 - batch.remove(&self.feeds, key_bytes); 1297 - batch.remove(&self.records, &location_key_bytes); 1295 + self.feeds.remove(&location_key_bytes)?; 1296 + self.feeds.remove(key_bytes)?; 1298 1297 records_deleted += 1; 1299 1298 } 1300 1299 1301 1300 if !ended_early { 1302 1301 if let Some(new_cursor) = candidate_new_feed_lower_cursor { 1303 - batch.insert( 1304 - &self.global, 1302 + self.global.insert( 1305 1303 &TrimCollectionCursorKey::new(collection.clone()).to_db_bytes()?, 1306 1304 &new_cursor.to_db_bytes()?, 1307 - ); 1305 + )?; 1308 1306 } 1309 1307 } 1310 1308 1311 - batch.commit()?; 1312 - 1313 1309 log::trace!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records (ended early? 
{ended_early})"); 1314 - Ok((dangling_feed_keys_cleaned, records_deleted)) 1310 + Ok((dangling_feed_keys_cleaned, records_deleted, ended_early)) 1315 1311 } 1316 1312 1317 1313 fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> { ··· 1340 1336 let mut dirty_nsids = HashSet::new(); 1341 1337 1342 1338 let mut rollup = 1343 - tokio::time::interval(Duration::from_micros(if backfill { 100 } else { 81_000 })); 1339 + tokio::time::interval(Duration::from_micros(if backfill { 1_000 } else { 81_000 })); 1344 1340 rollup.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); 1345 1341 1346 1342 let mut trim = ··· 1350 1346 loop { 1351 1347 tokio::select! { 1352 1348 _ = rollup.tick() => { 1353 - let (n, dirty) = tokio::task::block_in_place(|| self.0.step_rollup())?; 1349 + let mut db = self.0.clone(); 1350 + let (n, dirty) = tokio::task::spawn_blocking(move || db.step_rollup()).await??; 1354 1351 if n == 0 { 1355 1352 rollup.reset_after(Duration::from_millis(1_200)); // we're caught up, take a break 1356 1353 } ··· 1362 1359 log::trace!("trimming {n} nsids: {dirty_nsids:?}"); 1363 1360 let t0 = Instant::now(); 1364 1361 let (mut total_danglers, mut total_deleted) = (0, 0); 1362 + let mut completed = HashSet::new(); 1365 1363 for collection in &dirty_nsids { 1366 - let (danglers, deleted) = tokio::task::block_in_place(|| self.0.trim_collection(collection, 512, false))?; 1364 + let mut db = self.0.clone(); 1365 + let c = collection.clone(); 1366 + let (danglers, deleted, ended_early) = tokio::task::spawn_blocking(move || db.trim_collection(&c, 512, false)).await??; 1367 1367 total_danglers += danglers; 1368 1368 total_deleted += deleted; 1369 - if total_deleted > 100_000_000 { 1369 + if !ended_early { 1370 + completed.insert(collection.clone()); 1371 + } 1372 + if total_deleted > 10_000_000 { 1370 1373 log::info!("trim stopped early, more than 100M records already deleted."); 1371 1374 break; 1372 1375 } 1373 1376 } 1377 + for c in completed { 
1378 + dirty_nsids.remove(&c); 1379 + } 1374 1380 log::info!("finished trimming {n} nsids in {:?}: {total_danglers} dangling and {total_deleted} total removed.", t0.elapsed()); 1375 - dirty_nsids.clear(); 1376 1381 }, 1377 1382 }; 1378 1383 }
+2 -2
ufos/src/storage_mem.rs
··· 928 928 limit: usize, 929 929 _full_scan: bool, 930 930 // TODO: could add a start cursor limit to avoid iterating deleted stuff at the start (/end) 931 - ) -> StorageResult<(usize, usize)> { 931 + ) -> StorageResult<(usize, usize, bool)> { 932 932 let mut dangling_feed_keys_cleaned = 0; 933 933 let mut records_deleted = 0; 934 934 ··· 984 984 batch.commit()?; 985 985 986 986 log::info!("trim_collection ({collection:?}) removed {dangling_feed_keys_cleaned} dangling feed entries and {records_deleted} records"); 987 - Ok((dangling_feed_keys_cleaned, records_deleted)) 987 + Ok((dangling_feed_keys_cleaned, records_deleted, false)) 988 988 } 989 989 990 990 fn delete_account(&mut self, did: &Did) -> Result<usize, StorageError> {