Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

add a target stat gathering script

phil 74f5ff45 72177718

+236 -32
+1
constellation/Cargo.toml
··· 2 2 name = "constellation" 3 3 version = "0.1.0" 4 4 edition = "2021" 5 + default-run = "main" 5 6 6 7 [dependencies] 7 8 anyhow = "1.0.95"
+192
constellation/src/bin/rocks-target-stats.rs
··· 1 + use bincode::config::Options; 2 + use clap::Parser; 3 + use std::collections::HashMap; 4 + use std::path::PathBuf; 5 + 6 + use constellation::storage::rocks_store::{ 7 + Collection, DidId, RKey, RPath, Target, TargetKey, TargetLinkers, _bincode_opts, 8 + }; 9 + use constellation::storage::RocksStorage; 10 + use constellation::Did; 11 + 12 + use links::parse_any_link; 13 + use rocksdb::IteratorMode; 14 + use std::time; 15 + 16 + /// Aggregate links in the at-mosphere 17 + #[derive(Parser, Debug)] 18 + #[command(version, about, long_about = None)] 19 + struct Args { 20 + /// where is rocksdb's data 21 + #[arg(short, long)] 22 + data: PathBuf, 23 + } 24 + 25 + type LinkType = String; 26 + 27 + #[derive(Debug, Eq, Hash, PartialEq)] 28 + struct SourceLink(Collection, RPath, LinkType); 29 + 30 + #[derive(Debug, Default)] 31 + struct Buckets([u64; 23]); 32 + 33 + const BUCKETS: Buckets = Buckets([ 34 + 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16384, 65535, 262144, 35 + 1048576, 36 + ]); 37 + 38 + // b1, b2, b3, b4, b5, b6, b7, b8, b9, b10, b12, b16, b32, b64, b128, b256, b512, b1024, b4096, b16384, b65535, b262144, bmax 39 + 40 + static DID_IDS_CF: &str = "did_ids"; 41 + static TARGET_IDS_CF: &str = "target_ids"; 42 + static TARGET_LINKERS_CF: &str = "target_links"; 43 + 44 + const REPORT_INTERVAL: usize = 50_000; 45 + 46 + type Stats = HashMap<SourceLink, (String, String, Buckets)>; 47 + 48 + #[derive(Debug, Default)] 49 + struct ErrStats { 50 + failed_to_get_sample: usize, 51 + failed_to_read_target_id: usize, 52 + failed_to_deserialize_target_key: usize, 53 + failed_to_parse_target_as_link: usize, 54 + failed_to_get_links: usize, 55 + failed_to_deserialize_linkers: usize, 56 + } 57 + 58 + fn thousands(n: usize) -> String { 59 + n.to_string() 60 + .as_bytes() 61 + .rchunks(3) 62 + .rev() 63 + .map(std::str::from_utf8) 64 + .collect::<Result<Vec<&str>, _>>() 65 + .unwrap() 66 + .join(",") 67 + } 68 + 69 + fn main() { 70 + let args = Args::parse(); 71 + 72 + println!("starting rocksdb..."); 73 + let rocks = RocksStorage::open_readonly(args.data).unwrap(); 74 + println!("rocks ready."); 75 + 76 + let RocksStorage { ref db, .. } = rocks; 77 + 78 + let mut stats = Stats::new(); 79 + let mut err_stats: ErrStats = Default::default(); 80 + 81 + let did_ids_cf = db.cf_handle(DID_IDS_CF).unwrap(); 82 + let target_id_cf = db.cf_handle(TARGET_IDS_CF).unwrap(); 83 + let target_links_cf = db.cf_handle(TARGET_LINKERS_CF).unwrap(); 84 + 85 + let t0 = time::Instant::now(); 86 + let mut t_prev = t0; 87 + 88 + let mut i = 0; 89 + for item in db.iterator_cf(&target_id_cf, IteratorMode::Start) { 90 + if i > 0 && i % REPORT_INTERVAL == 0 { 91 + let now = time::Instant::now(); 92 + let rate = (REPORT_INTERVAL as f32) / (now.duration_since(t_prev).as_secs_f32()); 93 + eprintln!( 94 + "{i}\t({}k)\t{:.2}\t{rate:.1}/s", 95 + thousands(i / 1000), 96 + t0.elapsed().as_secs_f32() 97 + ); 98 + t_prev = now; 99 + } 100 + i += 1; 101 + 102 + let Ok((target_key, target_id)) = item else { 103 + err_stats.failed_to_read_target_id += 1; 104 + continue; 105 + }; 106 + 107 + let Ok(TargetKey(Target(target), collection, rpath)) = 108 + _bincode_opts().deserialize(&target_key) 109 + else { 110 + err_stats.failed_to_deserialize_target_key += 1; 111 + continue; 112 + }; 113 + 114 + let source = { 115 + let Some(parsed) = parse_any_link(&target) else { 116 + err_stats.failed_to_parse_target_as_link += 1; 117 + continue; 118 + }; 119 + SourceLink(collection, rpath, parsed.name().into()) 120 + }; 121 + 122 + let Ok(Some(links_raw)) = db.get_cf(&target_links_cf, &target_id) else { 123 + err_stats.failed_to_get_links += 1; 124 + continue; 125 + }; 126 + let Ok(linkers) = _bincode_opts().deserialize::<TargetLinkers>(&links_raw) else { 127 + err_stats.failed_to_deserialize_linkers += 1; 128 + continue; 129 + }; 130 + let (n, _) = linkers.count(); 131 + 132 + if n == 0 { 133 + continue; 134 + } 135 + 136 + let mut bucket = 0; 137 + for edge in BUCKETS.0 { 138 + if n <= edge || bucket == 22 { 139 + break; 140 + } 141 + bucket += 1; 142 + } 143 + 144 + stats 145 + .entry(source) 146 + .or_insert_with(|| { 147 + let (DidId(did_id), RKey(k)) = &linkers.0[(n - 1) as usize]; 148 + if let Ok(Some(did_bytes)) = db.get_cf(&did_ids_cf, did_id.to_be_bytes()) { 149 + if let Ok(Did(did)) = _bincode_opts().deserialize(&did_bytes) { 150 + return (did, k.clone(), Default::default()); 151 + } 152 + } 153 + err_stats.failed_to_get_sample += 1; 154 + ("".into(), "".into(), Default::default()) 155 + }) 156 + .2 157 + .0[bucket] += 1; 158 + 159 + // if i >= 400_000 { break } 160 + } 161 + 162 + eprintln!( 163 + "FINISHED summarizing {} link targets in {:.1}s", 164 + thousands(i), 165 + t0.elapsed().as_secs_f32() 166 + ); 167 + eprintln!("{err_stats:?}"); 168 + 169 + for (SourceLink(Collection(c), RPath(p), t), (d, r, Buckets(b))) in stats { 170 + let sample_at_uri = if !(d.is_empty() || r.is_empty()) { 171 + format!("at://{d}/{c}/{r}") 172 + } else { 173 + "".into() 174 + }; 175 + println!( 176 + "{c:?}, {p:?}, {t:?}, {sample_at_uri:?}, {}", 177 + b.map(|n| n.to_string()).join(", ") 178 + ); 179 + } 180 + 181 + println!("sup"); 182 + } 183 + 184 + // scan plan 185 + 186 + // buckets (backlink count) 187 + // 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 16, 32, 64, 128, 256, 512, 1024, 4096, 16384, 65535, 262144, 1048576+ 188 + // by 189 + // - collection 190 + // - json path 191 + // - link type 192 + // samples for each bucket for each variation
+43 -32
constellation/src/storage/rocks_store.rs
··· 46 46 47 47 #[derive(Debug, Clone)] 48 48 pub struct RocksStorage { 49 - db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun) 49 + pub db: Arc<DBWithThreadMode<MultiThreaded>>, // TODO: mov seqs here (concat merge op will be fun) 50 50 did_id_table: IdTable<Did, DidIdValue, true>, 51 51 target_id_table: IdTable<TargetKey, TargetId, false>, 52 52 is_writer: bool, ··· 242 242 impl RocksStorage { 243 243 pub fn new(path: impl AsRef<Path>) -> Result<Self> { 244 244 Self::describe_metrics(); 245 + RocksStorage::open_readmode(path, false) 246 + } 247 + 248 + pub fn open_readonly(path: impl AsRef<Path>) -> Result<Self> { 249 + RocksStorage::open_readmode(path, true) 250 + } 251 + 252 + fn open_readmode(path: impl AsRef<Path>, readonly: bool) -> Result<Self> { 245 253 let did_id_table = IdTable::<_, _, true>::setup(DID_IDS_CF); 246 254 let target_id_table = IdTable::<_, _, false>::setup(TARGET_IDS_CF); 247 255 248 - let db = DBWithThreadMode::open_cf_descriptors( 249 - &get_db_opts(), 250 - path, 251 - vec![ 252 - // id reference tables 253 - did_id_table.cf_descriptor(), 254 - target_id_table.cf_descriptor(), 255 - // the reverse links: 256 - ColumnFamilyDescriptor::new(TARGET_LINKERS_CF, { 257 - let mut opts = rocks_opts_base(); 258 - opts.set_merge_operator_associative( 259 - "merge_op_extend_did_ids", 260 - Self::merge_op_extend_did_ids, 261 - ); 262 - opts 263 - }), 264 - // unfortunately we also need forward links to handle deletes 265 - ColumnFamilyDescriptor::new(LINK_TARGETS_CF, rocks_opts_base()), 266 - ], 267 - )?; 256 + let cfs = vec![ 257 + // id reference tables 258 + did_id_table.cf_descriptor(), 259 + target_id_table.cf_descriptor(), 260 + // the reverse links: 261 + ColumnFamilyDescriptor::new(TARGET_LINKERS_CF, { 262 + let mut opts = rocks_opts_base(); 263 + opts.set_merge_operator_associative( 264 + "merge_op_extend_did_ids", 265 + Self::merge_op_extend_did_ids, 266 + ); 267 + opts 268 + }), 269 + // unfortunately we also need forward links to handle deletes 270 + ColumnFamilyDescriptor::new(LINK_TARGETS_CF, rocks_opts_base()), 271 + ]; 272 + 273 + let db = if readonly { 274 + DBWithThreadMode::open_cf_descriptors_read_only(&get_db_opts(), path, cfs, false)? 275 + } else { 276 + DBWithThreadMode::open_cf_descriptors(&get_db_opts(), path, cfs)? 277 + }; 278 + 268 279 let db = Arc::new(db); 269 280 let did_id_table = did_id_table.init(&db)?; 270 281 let target_id_table = target_id_table.init(&db)?; ··· 933 944 impl KeyFromRocks for RecordLinkKey {} 934 945 impl ValueFromRocks for RecordLinkTargets {} 935 946 936 - fn _bincode_opts() -> impl BincodeOptions { 947 + pub fn _bincode_opts() -> impl BincodeOptions { 937 948 bincode::DefaultOptions::new().with_big_endian() // happier db -- numeric prefixes in lsm 938 949 } 939 950 fn _rk(k: impl AsRocksKey) -> Vec<u8> { ··· 952 963 Ok(_bincode_opts().deserialize(bytes)?) 953 964 } 954 965 955 - #[derive(Debug, Clone, Serialize, Deserialize)] 956 - struct Collection(String); 966 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] 967 + pub struct Collection(pub String); 957 968 958 - #[derive(Debug, Clone, Serialize, Deserialize)] 959 - struct RPath(String); 969 + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] 970 + pub struct RPath(pub String); 960 971 961 972 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] 962 - struct RKey(String); 973 + pub struct RKey(pub String); 963 974 964 975 impl RKey { 965 976 fn empty() -> Self { ··· 969 980 970 981 // did ids 971 982 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] 972 - struct DidId(u64); 983 + pub struct DidId(pub u64); 973 984 974 985 impl DidId { 975 986 fn empty() -> Self { ··· 995 1006 struct TargetId(u64); // key 996 1007 997 1008 #[derive(Debug, Clone, Serialize, Deserialize)] 998 - struct Target(String); // the actual target/uri 1009 + pub struct Target(pub String); // the actual target/uri 999 1010 1000 1011 // targets (uris, dids, etc.): the reverse index 1001 1012 #[derive(Debug, Clone, Serialize, Deserialize)] 1002 - struct TargetKey(Target, Collection, RPath); 1013 + pub struct TargetKey(pub Target, pub Collection, pub RPath); 1003 1014 1004 1015 #[derive(Debug, Default, Serialize, Deserialize)] 1005 - struct TargetLinkers(Vec<(DidId, RKey)>); 1016 + pub struct TargetLinkers(pub Vec<(DidId, RKey)>); 1006 1017 1007 1018 impl TargetLinkers { 1008 1019 fn remove_linker(&mut self, did: &DidId, rkey: &RKey) -> bool { ··· 1013 1024 false 1014 1025 } 1015 1026 } 1016 - fn count(&self) -> (u64, u64) { 1027 + pub fn count(&self) -> (u64, u64) { 1017 1028 // (linkers, deleted links) 1018 1029 let total = self.0.len() as u64; 1019 1030 let alive = self.0.iter().filter(|(DidId(id), _)| *id != 0).count() as u64;