Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

buggy prefix ranging sketchy stuff

phil 2e8fb71f 3bf1f2b7

+120 -12
+5 -2
ufos/src/db_types.rs
··· 75 75 pub fn to_prefix_db_bytes(&self) -> Result<Vec<u8>, EncodingError> { 76 76 self.prefix.to_db_bytes() 77 77 } 78 - pub fn range_end(&self) -> Result<Vec<u8>, EncodingError> { 79 - let prefix_bytes = self.prefix.to_db_bytes()?; 78 + pub fn prefix_range_end(prefix: &P) -> Result<Vec<u8>, EncodingError> { 79 + let prefix_bytes = prefix.to_db_bytes()?; 80 80 let (_, Bound::Excluded(range_end)) = prefix_to_range(&prefix_bytes) else { 81 81 return Err(EncodingError::BadRangeBound); 82 82 }; 83 83 Ok(range_end.to_vec()) 84 + } 85 + pub fn range_end(&self) -> Result<Vec<u8>, EncodingError> { 86 + Self::prefix_range_end(&self.prefix) 84 87 } 85 88 pub fn range(&self) -> Result<Range<Vec<u8>>, EncodingError> { 86 89 let prefix_bytes = self.prefix.to_db_bytes()?;
+2
ufos/src/error.rs
··· 15 15 pub enum StorageError { 16 16 #[error("Failed to initialize: {0}")] 17 17 InitError(String), 18 + #[error("DB seems to be in a bad state: {0}")] 19 + BadStateError(String), 18 20 #[error("Fjall error")] 19 21 FjallError(#[from] fjall::Error), 20 22 #[error("Bytes encoding error")]
+63 -4
ufos/src/storage_fjall.rs
··· 7 7 NsidRecordFeedKey, NsidRecordFeedVal, RecordLocationKey, RecordLocationVal, 8 8 LiveRecordsKey, LiveRecordsValue, LiveDidsKey, LiveDidsValue, 9 9 DeleteAccountQueueKey, DeleteAccountQueueVal, 10 + NewRollupCursorKey, NewRollupCursorValue, 11 + TakeoffKey, TakeoffValue, 10 12 }; 11 13 use crate::{ 12 14 DeleteAccount, Did, EventBatch, Nsid, RecordKey, CommitAction, ··· 18 20 use jetstream::events::Cursor; 19 21 use std::collections::HashMap; 20 22 use std::path::{Path, PathBuf}; 21 - use std::time::{Duration, Instant}; 23 + use std::time::{Duration, Instant, SystemTime}; 22 24 use tokio::sync::mpsc::Receiver; 23 25 use tokio::time::{interval_at, sleep}; 24 26 ··· 53 55 * - Jetstream server endpoint (persisted because the cursor can't be used on another instance without data loss) 54 56 * key: "js_endpoint" (literal) 55 57 * val: string (URL of the instance) 58 + * 59 + * - Launch date 60 + * key: "takeoff" (literal) 61 + * val: u64 (micros timestamp, not from jetstream for now so not precise) 56 62 * 57 63 * - Rollup cursor (bg work: roll stats into hourlies, delete accounts, old record deletes) 58 64 * key: "rollup_cursor" (literal) ··· 163 169 &global, 164 170 JetstreamEndpointValue(endpoint.to_string()), 165 171 )?; 172 + insert_static_neu::<TakeoffKey>( 173 + &global, 174 + Cursor::at(SystemTime::now()), 175 + )?; 176 + insert_static_neu::<NewRollupCursorKey>( 177 + &global, 178 + Cursor::from_start(), 179 + )?; 166 180 } 167 181 168 182 let reader = FjallReader { ··· 197 211 198 212 impl FjallWriter { 199 213 pub fn step_rollup(&mut self) -> Result<(), StorageError> { 200 - let mut batch = self.keyspace.batch(); 214 + // let mut batch = self.keyspace.batch(); 215 + 216 + let rollup_cursor = get_static_neu::<NewRollupCursorKey, NewRollupCursorValue>(&self.global)? 217 + .ok_or(StorageError::BadStateError("Could not find current rollup cursor".to_string()))?; 201 218 202 219 // timelies 203 - // trim records 220 + let live_records_range = LiveRecordsKey::range_from_cursor(rollup_cursor)?; 221 + // let live_dids_range = LiveDidsKey::range_from_cursor(rollup_cursor)?; // shoudl be in sync with live records range. we could keep both value under same key? 222 + let mut timely_iter = self.rollups.range(live_records_range); 223 + 224 + let next_timely = timely_iter 225 + .next() 226 + .transpose()? 227 + .map(|(key_bytes, val_bytes)| 228 + db_complete::<LiveRecordsKey>(&key_bytes) 229 + .map(|k| (k, val_bytes))) 230 + .transpose()?; 231 + 204 232 // delete accounts 233 + let delete_accounts_range = DeleteAccountQueueKey::new(rollup_cursor).range_to_prefix_end()?; 205 234 206 - batch.commit()?; 235 + let next_delete = self.queues.range(delete_accounts_range) 236 + .next() 237 + .transpose()? 238 + .map(|(key_bytes, val_bytes)| 239 + db_complete::<DeleteAccountQueueKey>(&key_bytes) 240 + .map(|k| (k.suffix, val_bytes))) 241 + .transpose()?; 242 + 243 + match (next_timely, next_delete) { 244 + (Some((k, timely_val_bytes)), Some((cursor, delete_val_bytes))) => { 245 + if k.cursor() < cursor { 246 + eprintln!("rollup until delete cursor"); 247 + } else { 248 + eprintln!("delete then come back for rollups"); 249 + } 250 + } 251 + (Some((k, timely_val_bytes)), None) => { 252 + eprintln!("do as much rollup as we want"); 253 + } 254 + (None, Some((cursor, delete_val_bytes))) => { 255 + eprintln!("just delete an account"); 256 + } 257 + (None, None) => { 258 + eprintln!("do nothing."); 259 + } 260 + } 261 + 262 + // batch.commit()?; 207 263 Ok(()) 208 264 } 209 265 } ··· 275 331 ); 276 332 277 333 batch.commit()?; 334 + 335 + eprintln!("ok stepping rollup now..."); 336 + self.step_rollup()?; 278 337 Ok(()) 279 338 } 280 339 }
+50 -6
ufos/src/store_types.rs
··· 37 37 /// value format: [rollup_cursor(Cursor)|collection(Nsid)] 38 38 pub type RollupCursorValue = DbConcat<Cursor, Nsid>; 39 39 40 + 41 + /// key format: ["rollup_cursor"] 42 + #[derive(Debug, PartialEq)] 43 + pub struct NewRollupCursorKey {} 44 + impl StaticStr for NewRollupCursorKey { 45 + fn static_str() -> &'static str { 46 + "rollup_cursor" 47 + } 48 + } 49 + // pub type NewRollupCursorKey = DbStaticStr<_NewRollupCursorKey>; 50 + /// value format: [rollup_cursor(Cursor)|collection(Nsid)] 51 + pub type NewRollupCursorValue = Cursor; 52 + 53 + 54 + /// key format: ["js_endpoint"] 55 + #[derive(Debug, PartialEq)] 56 + pub struct TakeoffKey {} 57 + impl StaticStr for TakeoffKey { 58 + fn static_str() -> &'static str { 59 + "takeoff" 60 + } 61 + } 62 + pub type TakeoffValue = Cursor; 63 + 64 + 40 65 /// key format: ["js_endpoint"] 41 66 #[derive(Debug, PartialEq)] 42 67 pub struct JetstreamEndpointKey {} ··· 114 139 } 115 140 } 116 141 type LiveRecordsStaticPrefix = DbStaticStr<_LiveRecordsStaticStr>; 117 - pub type LiveRecordsKey = DbConcat<LiveRecordsStaticPrefix, DbConcat<Cursor, Nsid>>; 142 + type LiveRecordsCursorPrefix = DbConcat<LiveRecordsStaticPrefix, Cursor>; 143 + pub type LiveRecordsKey = DbConcat<LiveRecordsCursorPrefix, Nsid>; 144 + impl LiveRecordsKey { 145 + pub fn range_from_cursor(cursor: Cursor) -> Result<Range<Vec<u8>>, EncodingError> { 146 + let prefix = LiveRecordsCursorPrefix::from_pair(Default::default(), cursor); 147 + let end = Self::prefix_range_end(&prefix)?; 148 + Ok(prefix.to_db_bytes()?..end.to_db_bytes()?) 149 + } 150 + pub fn cursor(&self) -> Cursor { 151 + self.prefix.suffix 152 + } 153 + } 118 154 impl From<(Cursor, &Nsid)> for LiveRecordsKey { 119 155 fn from((cursor, collection): (Cursor, &Nsid)) -> Self { 120 156 Self::from_pair( 121 - Default::default(), 122 - DbConcat::from_pair(cursor, collection.clone()), 157 + LiveRecordsCursorPrefix::from_pair(Default::default(), cursor), 158 + collection.clone(), 123 159 ) 124 160 } 125 161 } ··· 135 171 } 136 172 } 137 173 pub type LiveDidsStaticPrefix = DbStaticStr<_LiveDidsStaticStr>; 138 - pub type LiveDidsKey = DbConcat<LiveDidsStaticPrefix, DbConcat<Cursor, Nsid>>; 174 + pub type LiveDidsCursorPrefix = DbConcat<LiveDidsStaticPrefix, Cursor>; 175 + pub type LiveDidsKey = DbConcat<LiveDidsCursorPrefix, Nsid>; 176 + impl LiveDidsKey { 177 + pub fn range_from_cursor(cursor: Cursor) -> Result<Range<Vec<u8>>, EncodingError> { 178 + let prefix = LiveDidsCursorPrefix::from_pair(Default::default(), cursor); 179 + let end = Self::prefix_range_end(&prefix)?; 180 + Ok(prefix.to_db_bytes()?..end.to_db_bytes()?) 181 + } 182 + } 139 183 impl From<(Cursor, &Nsid)> for LiveDidsKey { 140 184 fn from((cursor, collection): (Cursor, &Nsid)) -> Self { 141 185 Self::from_pair( 142 - Default::default(), 143 - DbConcat::from_pair(cursor, collection.clone()), 186 + LiveDidsCursorPrefix::from_pair(Default::default(), cursor), 187 + collection.clone(), 144 188 ) 145 189 } 146 190 }