Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

neat, a failing basic test

phil 16adc37c 7c1417e9

+28 -5
+3 -1
ufos/src/storage.rs
··· 1 + use std::path::Path; 2 + use jetstream::exports::Nsid; 1 3 use crate::{error::StorageError, Cursor, EventBatch}; 2 - use std::path::Path; 3 4 4 5 pub trait StorageWhatever<R: StoreReader, W: StoreWriter, C> { // TODO: extract this 5 6 fn init( ··· 15 16 } 16 17 17 18 pub trait StoreReader: Clone { 19 + fn get_total_by_collection(&self, collection: &Nsid) -> Result<u64, StorageError>; 18 20 }
+21 -4
ufos/src/storage_fjall.rs
··· 204 204 rollups: PartitionHandle, 205 205 } 206 206 207 - impl StoreReader for FjallReader {} 207 + impl StoreReader for FjallReader { 208 + fn get_total_by_collection(&self, collection: &jetstream::exports::Nsid) -> Result<u64, StorageError> { 209 + // TODO: start from rollup 210 + let full_range = LiveRecordsKey::range_from_cursor(Cursor::from_start())?; 211 + let mut total = 0; 212 + for kv in self.rollups.range(full_range) { 213 + let (key_bytes, val_bytes) = kv?; 214 + let key = db_complete::<LiveRecordsKey>(&key_bytes)?; 215 + if key.collection() == collection { 216 + let LiveRecordsValue(n) = db_complete(&val_bytes)?; 217 + total += n; 218 + } 219 + } 220 + Ok(total) 221 + } 222 + } 208 223 209 224 pub struct FjallWriter { 210 225 keyspace: Keyspace, ··· 1137 1152 1138 1153 #[test] 1139 1154 fn test_insert_one() -> anyhow::Result<()> { 1140 - // let db_path = tempfile::tempdir()?; 1141 - let (_read, mut write, _) = FjallStorage::init( 1155 + let (read, mut write, _) = FjallStorage::init( 1142 1156 tempfile::tempdir()?, 1143 1157 "offline test (no real jetstream endpoint)".to_string(), 1144 1158 false, ··· 1162 1176 commits.truncating_insert(commit, 1); 1163 1177 1164 1178 let mut commits_by_nsid = HashMap::new(); 1165 - commits_by_nsid.insert(collection, commits); 1179 + commits_by_nsid.insert(collection.clone(), commits); 1166 1180 1167 1181 write.insert_batch(EventBatch { 1168 1182 commits_by_nsid, 1169 1183 ..Default::default() 1170 1184 })?; 1185 + 1186 + let total = read.get_total_by_collection(&collection)?; 1187 + assert_eq!(total, 1); 1171 1188 1172 1189 Ok(()) 1173 1190 }
+4
ufos/src/store_types.rs
··· 138 138 "live_records" 139 139 } 140 140 } 141 + // TODO: merge counts with hlls 141 142 type LiveRecordsStaticPrefix = DbStaticStr<_LiveRecordsStaticStr>; 142 143 type LiveRecordsCursorPrefix = DbConcat<LiveRecordsStaticPrefix, Cursor>; 143 144 pub type LiveRecordsKey = DbConcat<LiveRecordsCursorPrefix, Nsid>; ··· 149 150 } 150 151 pub fn cursor(&self) -> Cursor { 151 152 self.prefix.suffix 153 + } 154 + pub fn collection(&self) -> &Nsid { 155 + &self.suffix 152 156 } 153 157 } 154 158 impl From<(Cursor, &Nsid)> for LiveRecordsKey {