Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

rewrite ufos consumer

make it make sense

phil 7b87bd38 28d0874d

+380 -354
+21
Cargo.lock
··· 555 555 ] 556 556 557 557 [[package]] 558 + name = "cardinality-estimator" 559 + version = "1.0.2" 560 + source = "registry+https://github.com/rust-lang/crates.io-index" 561 + checksum = "6ae5e12c435064f9e8ec53c5a782ca9a362702a4863fe1b6448f524ecede8fe3" 562 + dependencies = [ 563 + "enum_dispatch", 564 + "serde", 565 + "wyhash", 566 + ] 567 + 568 + [[package]] 558 569 name = "cc" 559 570 version = "1.2.17" 560 571 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 3752 3763 dependencies = [ 3753 3764 "anyhow", 3754 3765 "bincode 2.0.1", 3766 + "cardinality-estimator", 3755 3767 "clap", 3756 3768 "dropshot", 3757 3769 "env_logger", ··· 4225 4237 version = "0.5.5" 4226 4238 source = "registry+https://github.com/rust-lang/crates.io-index" 4227 4239 checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" 4240 + 4241 + [[package]] 4242 + name = "wyhash" 4243 + version = "0.5.0" 4244 + source = "registry+https://github.com/rust-lang/crates.io-index" 4245 + checksum = "baf6e163c25e3fac820b4b453185ea2dea3b6a3e0a721d4d23d75bd33734c295" 4246 + dependencies = [ 4247 + "rand_core 0.6.4", 4248 + ] 4228 4249 4229 4250 [[package]] 4230 4251 name = "xxhash-rust"
+1 -1
jetstream/src/events.rs
··· 12 12 use crate::exports; 13 13 14 14 /// Opaque wrapper for the time_us cursor used by jetstream 15 - #[derive(Deserialize, Debug, Clone, PartialEq, PartialOrd)] 15 + #[derive(Deserialize, Debug, Copy, Clone, PartialEq, PartialOrd)] 16 16 pub struct Cursor(u64); 17 17 18 18 #[derive(Debug, Deserialize)]
+1
ufos/Cargo.toml
··· 6 6 [dependencies] 7 7 anyhow = "1.0.97" 8 8 bincode = { version = "2.0.1", features = ["serde"] } 9 + cardinality-estimator = { version = "1.0.2", features = ["with_serde"] } 9 10 clap = { version = "4.5.31", features = ["derive"] } 10 11 dropshot = "0.16.0" 11 12 env_logger = "0.11.7"
+64 -129
ufos/src/consumer.rs
··· 1 1 use jetstream::{ 2 - events::{CommitEvent, CommitOp, Cursor, EventKind, JetstreamEvent}, 3 - exports::Did, 2 + events::{Cursor, EventKind, JetstreamEvent}, 3 + exports::{Did, Nsid}, 4 4 DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector, 5 5 JetstreamReceiver, 6 6 }; ··· 8 8 use std::time::Duration; 9 9 use tokio::sync::mpsc::{channel, Receiver, Sender}; 10 10 11 - use crate::{CreateRecord, DeleteAccount, DeleteRecord, EventBatch, ModifyRecord, UpdateRecord}; 11 + use crate::{DeleteAccount, EventBatch, UFOsCommit}; 12 + use crate::error::FirehoseEventError; 12 13 13 - const MAX_BATCHED_RECORDS: usize = 64; // *non-blocking* limit. drops oldest batched record per collection once reached. 14 - const MAX_BATCHED_MODIFIES: usize = 32; // hard limit, total updates and deletes across all collections. 15 - const MAX_ACCOUNT_REMOVES: usize = 128; // hard limit, total account deletions. actually the least frequent event, but tiny. 16 - const MAX_BATCHED_COLLECTIONS: usize = 32; // hard limit, MAX_BATCHED_RECORDS applies per collection 17 - const MIN_BATCH_SPAN_SECS: f64 = 2.; // try to get a bit of rest a bit. 18 - const MAX_BATCH_SPAN_SECS: f64 = 60.; // hard limit of duration from oldest to latest event cursor within a batch, in seconds. 14 + const MAX_BATCHED_RECORDS: usize = 128; // *non-blocking* limit. drops oldest batched record per collection once reached. 15 + const MAX_ACCOUNT_REMOVES: usize = 1024; // hard limit, extremely unlikely to reach, but just in case 16 + const MAX_BATCHED_COLLECTIONS: usize = 64; // hard limit, MAX_BATCHED_RECORDS applies per-collection 17 + const MIN_BATCH_SPAN_SECS: f64 = 2.; // breathe 18 + const MAX_BATCH_SPAN_SECS: f64 = 60.; // hard limit, pause consumer if we're unable to send by now 19 + const SEND_TIMEOUT_S: f64 = 15.; // if the channel is blocked longer than this, something is probably up 20 + const BATCH_QUEUE_SIZE: usize = 1; // nearly-rendez-vous 19 21 20 - const SEND_TIMEOUT_S: f64 = 60.; 21 - const BATCH_QUEUE_SIZE: usize = 64; // 4096 got OOM'd. update: 1024 also got OOM'd during L0 compaction blocking 22 + #[derive(Debug, Default)] 23 + struct CurrentBatch { 24 + initial_cursor: Option<Cursor>, 25 + batch: EventBatch, 26 + } 22 27 23 28 #[derive(Debug)] 24 29 struct Batcher { 25 30 jetstream_receiver: JetstreamReceiver, 26 31 batch_sender: Sender<EventBatch>, 27 - current_batch: EventBatch, 32 + current_batch: CurrentBatch, 28 33 } 29 34 30 35 pub async fn consume( ··· 34 39 ) -> anyhow::Result<Receiver<EventBatch>> { 35 40 let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint); 36 41 if endpoint == jetstream_endpoint { 37 - eprintln!("connecting to jetstream at {endpoint}"); 42 + log::info!("connecting to jetstream at {endpoint}"); 38 43 } else { 39 - eprintln!("connecting to jetstream at {jetstream_endpoint} => {endpoint}"); 44 + log::info!("connecting to jetstream at {jetstream_endpoint} => {endpoint}"); 40 45 } 41 46 let config: JetstreamConfig = JetstreamConfig { 42 47 endpoint, ··· 46 51 JetstreamCompression::Zstd 47 52 }, 48 53 replay_on_reconnect: true, 49 - channel_size: 64, // small because we expect to be fast....? 54 + channel_size: 1024, // buffer up to ~1s of jetstream events 50 55 ..Default::default() 51 56 }; 52 57 let jetstream_receiver = JetstreamConnector::new(config)? ··· 78 83 } 79 84 80 85 async fn handle_event(&mut self, event: JetstreamEvent) -> anyhow::Result<()> { 81 - let event_cursor = event.cursor; 82 - 83 - if let Some(earliest) = &self.current_batch.first_jetstream_cursor { 84 - if event_cursor.duration_since(earliest)? > Duration::from_secs_f64(MAX_BATCH_SPAN_SECS) 86 + if let Some(earliest) = &self.current_batch.initial_cursor { 87 + if event.cursor.duration_since(earliest)? > Duration::from_secs_f64(MAX_BATCH_SPAN_SECS) 85 88 { 86 89 self.send_current_batch_now().await?; 87 90 } 88 91 } else { 89 - self.current_batch.first_jetstream_cursor = Some(event_cursor.clone()); 92 + self.current_batch.initial_cursor = Some(event.cursor); 90 93 } 91 94 92 95 match event.kind { 93 - EventKind::Commit if event.commit.is_some() => { 94 - let commit = event.commit.unwrap(); 95 - match commit.operation { 96 - CommitOp::Create => { 97 - self.handle_create_record(event.did, commit, event_cursor.clone()) 98 - .await?; 99 - } 100 - CommitOp::Update => { 101 - self.handle_modify_record(modify_update( 102 - event.did, 103 - commit, 104 - event_cursor.clone(), 105 - )) 106 - .await?; 107 - } 108 - CommitOp::Delete => { 109 - self.handle_modify_record(modify_delete( 110 - event.did, 111 - commit, 112 - event_cursor.clone(), 113 - )) 114 - .await?; 115 - } 116 - } 96 + EventKind::Commit => { 97 + let commit = event.commit.ok_or(FirehoseEventError::CommitEventMissingCommit)?; 98 + let (commit, nsid) = UFOsCommit::from_commit_info(commit, event.did, event.cursor)?; 99 + self.handle_commit(commit, nsid).await?; 117 100 } 118 - EventKind::Account if event.account.is_some() => { 119 - let account = event.account.unwrap(); 101 + EventKind::Account => { 102 + let account = event.account.ok_or(FirehoseEventError::AccountEventMissingAccount)?; 120 103 if !account.active { 121 - self.handle_remove_account(account.did, event_cursor.clone()) 122 - .await?; 104 + self.handle_delete_account(event.did, event.cursor).await?; 123 105 } 124 106 } 125 107 _ => {} 126 - }; 127 - self.current_batch.last_jetstream_cursor = Some(event_cursor.clone()); 108 + } 128 109 129 110 // if the queue is empty and we have enough, send immediately. otherewise, let the current batch fill up. 130 - if let Some(earliest) = &self.current_batch.first_jetstream_cursor { 131 - if event_cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS 132 - && self.batch_sender.capacity() == BATCH_QUEUE_SIZE 133 - { 134 - log::trace!("queue empty: immediately sending batch."); 135 - if let Err(send_err) = self 136 - .batch_sender 137 - .send(mem::take(&mut self.current_batch)) 138 - .await 139 - { 140 - anyhow::bail!("Could not send batch, likely because the receiver closed or dropped: {send_err:?}"); 141 - } 111 + if let Some(earliest) = &self.current_batch.initial_cursor { 112 + if event.cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS && 113 + self.batch_sender.capacity() == BATCH_QUEUE_SIZE { 114 + log::info!("queue empty: immediately sending batch."); 115 + self.send_current_batch_now().await?; 142 116 } 143 117 } 144 118 Ok(()) 145 119 } 146 120 147 - // holds up all consumer progress until it can send to the channel 148 - // use this when the current batch is too full to add more to it 149 - async fn send_current_batch_now(&mut self) -> anyhow::Result<()> { 150 - log::warn!( 151 - "attempting to send batch now (capacity: {})", 152 - self.batch_sender.capacity() 153 - ); 154 - self.batch_sender 155 - .send_timeout( 156 - mem::take(&mut self.current_batch), 157 - Duration::from_secs_f64(SEND_TIMEOUT_S), 158 - ) 159 - .await?; 160 - Ok(()) 161 - } 162 - 163 - async fn handle_create_record( 164 - &mut self, 165 - did: Did, 166 - commit: CommitEvent, 167 - cursor: Cursor, 168 - ) -> anyhow::Result<()> { 121 + async fn handle_commit(&mut self, commit: UFOsCommit, nsid: Nsid) -> anyhow::Result<()> { 169 122 if !self 170 123 .current_batch 171 - .record_creates 172 - .contains_key(&commit.collection) 173 - && self.current_batch.record_creates.len() >= MAX_BATCHED_COLLECTIONS 124 + .batch 125 + .commits_by_nsid 126 + .contains_key(&nsid) 127 + && self.current_batch.batch.commits_by_nsid.len() >= MAX_BATCHED_COLLECTIONS 174 128 { 175 129 self.send_current_batch_now().await?; 176 130 } 177 - let record = serde_json::from_str(commit.record.unwrap().get())?; 178 - let record = CreateRecord { 179 - did, 180 - rkey: commit.rkey, 181 - record, 182 - cursor, 183 - }; 184 - let collection = self 131 + 132 + self 185 133 .current_batch 186 - .record_creates 187 - .entry(commit.collection) 188 - .or_default(); 189 - collection.total_seen += 1; 190 - collection.samples.push_front(record); 191 - collection.samples.truncate(MAX_BATCHED_RECORDS); 192 - Ok(()) 193 - } 134 + .batch 135 + .commits_by_nsid 136 + .entry(nsid) 137 + .or_default() 138 + .truncating_insert(commit, MAX_BATCHED_RECORDS); 194 139 195 - async fn handle_modify_record(&mut self, modify_record: ModifyRecord) -> anyhow::Result<()> { 196 - if self.current_batch.record_modifies.len() >= MAX_BATCHED_MODIFIES { 197 - self.send_current_batch_now().await?; 198 - } 199 - self.current_batch.record_modifies.push(modify_record); 200 140 Ok(()) 201 141 } 202 142 203 - async fn handle_remove_account(&mut self, did: Did, cursor: Cursor) -> anyhow::Result<()> { 204 - if self.current_batch.account_removes.len() >= MAX_ACCOUNT_REMOVES { 143 + async fn handle_delete_account(&mut self, did: Did, cursor: Cursor) -> anyhow::Result<()> { 144 + if self.current_batch.batch.account_removes.len() >= MAX_ACCOUNT_REMOVES { 205 145 self.send_current_batch_now().await?; 206 146 } 207 147 self.current_batch 148 + .batch 208 149 .account_removes 209 150 .push(DeleteAccount { did, cursor }); 210 151 Ok(()) 211 152 } 212 - } 213 153 214 - fn modify_update(did: Did, commit: CommitEvent, cursor: Cursor) -> ModifyRecord { 215 - let record = serde_json::from_str(commit.record.unwrap().get()).unwrap(); 216 - ModifyRecord::Update(UpdateRecord { 217 - did, 218 - collection: commit.collection, 219 - rkey: commit.rkey, 220 - record, 221 - cursor, 222 - }) 223 - } 224 - 225 - fn modify_delete(did: Did, commit: CommitEvent, cursor: Cursor) -> ModifyRecord { 226 - ModifyRecord::Delete(DeleteRecord { 227 - did, 228 - collection: commit.collection, 229 - rkey: commit.rkey, 230 - cursor, 231 - }) 154 + // holds up all consumer progress until it can send to the channel 155 + // use this when the current batch is too full to add more to it 156 + async fn send_current_batch_now(&mut self) -> anyhow::Result<()> { 157 + log::info!( 158 + "attempting to send batch now (capacity: {})", 159 + self.batch_sender.capacity() 160 + ); 161 + let current = mem::take(&mut self.current_batch); 162 + self.batch_sender 163 + .send_timeout(current.batch, Duration::from_secs_f64(SEND_TIMEOUT_S)) 164 + .await?; 165 + Ok(()) 166 + } 232 167 }
+11
ufos/src/error.rs
··· 1 + use thiserror::Error; 2 + 3 + #[derive(Debug, Error)] 4 + pub enum FirehoseEventError { 5 + #[error("Create/Update commit operation missing record data")] 6 + CruMissingRecord, 7 + #[error("Account event missing account info")] 8 + AccountEventMissingAccount, 9 + #[error("Commit event missing commit info")] 10 + CommitEventMissingCommit, 11 + }
+92 -33
ufos/src/lib.rs
··· 1 1 pub mod consumer; 2 2 pub mod db_types; 3 + pub mod error; 3 4 pub mod server; 4 - // pub mod storage; 5 5 pub mod storage_fjall; 6 6 pub mod store_types; 7 7 8 - use jetstream::events::Cursor; 8 + use jetstream::events::{Cursor, CommitEvent, CommitOp}; 9 9 use jetstream::exports::{Did, Nsid, RecordKey}; 10 10 use std::collections::{HashMap, VecDeque}; 11 - 12 - #[derive(Debug, Clone)] 13 - pub struct CreateRecord { 14 - pub did: Did, 15 - // collection omitted because the batch keys off it 16 - pub rkey: RecordKey, 17 - pub record: serde_json::Value, 18 - pub cursor: Cursor, 19 - } 11 + use serde_json::value::RawValue; 12 + use cardinality_estimator::CardinalityEstimator; 13 + use error::FirehoseEventError; 20 14 21 15 #[derive(Debug, Default, Clone)] 22 - pub struct CollectionSamples { 16 + pub struct CollectionCommits { 23 17 pub total_seen: usize, 24 - pub samples: VecDeque<CreateRecord>, 18 + pub dids_estimate: CardinalityEstimator<Did>, 19 + pub commits: VecDeque<UFOsCommit>, 20 + } 21 + 22 + impl CollectionCommits { 23 + pub fn truncating_insert(&mut self, commit: UFOsCommit, limit: usize) { 24 + self.total_seen += 1; 25 + self.dids_estimate.insert(&commit.did); 26 + self.commits.truncate(limit - 1); 27 + self.commits.push_front(commit); 28 + } 25 29 } 26 30 27 31 #[derive(Debug, Clone)] 28 - pub struct UpdateRecord { 32 + pub struct DeleteAccount { 29 33 pub did: Did, 30 - pub collection: Nsid, 31 - pub rkey: RecordKey, 32 - pub record: serde_json::Value, 33 34 pub cursor: Cursor, 34 35 } 35 36 36 37 #[derive(Debug, Clone)] 37 - pub struct DeleteRecord { 38 - pub did: Did, 39 - pub collection: Nsid, 40 - pub rkey: RecordKey, 41 - pub cursor: Cursor, 38 + pub enum CommitAction { 39 + Put { record: Box<RawValue>, is_update: bool }, 40 + Cut, 42 41 } 43 42 44 43 #[derive(Debug, Clone)] 45 - pub enum ModifyRecord { 46 - Update(UpdateRecord), 47 - Delete(DeleteRecord), 44 + pub struct UFOsCommit { 45 + cursor: Cursor, 46 + did: Did, 47 + rkey: RecordKey, 48 + rev: String, 49 + action: CommitAction, 48 50 } 49 51 50 - #[derive(Debug, Clone)] 51 - pub struct DeleteAccount { 52 - pub did: Did, 53 - pub cursor: Cursor, 52 + impl UFOsCommit { 53 + pub fn from_commit_info( 54 + commit: CommitEvent, 55 + did: Did, 56 + cursor: Cursor 57 + ) -> Result<(Self, Nsid), FirehoseEventError> { 58 + let action = match commit.operation { 59 + CommitOp::Delete => CommitAction::Cut, 60 + cru @ _ => CommitAction::Put { 61 + record: commit.record.ok_or(FirehoseEventError::CruMissingRecord)?, 62 + is_update: cru == CommitOp::Update, 63 + } 64 + }; 65 + let batched = Self { 66 + cursor, 67 + did, 68 + rkey: commit.rkey, 69 + rev: commit.rev, 70 + action, 71 + }; 72 + Ok((batched, commit.collection)) 73 + } 54 74 } 55 75 56 76 #[derive(Debug, Default, Clone)] 57 77 pub struct EventBatch { 58 - pub record_creates: HashMap<Nsid, CollectionSamples>, 59 - pub record_modifies: Vec<ModifyRecord>, 78 + pub commits_by_nsid: HashMap<Nsid, CollectionCommits>, 60 79 pub account_removes: Vec<DeleteAccount>, 61 - pub first_jetstream_cursor: Option<Cursor>, 62 - pub last_jetstream_cursor: Option<Cursor>, 80 + } 81 + 82 + impl EventBatch { 83 + pub fn total_records(&self) -> usize { 84 + self.commits_by_nsid.values().map(|v| v.commits.len()).sum() 85 + } 86 + pub fn total_seen(&self) -> usize { 87 + self.commits_by_nsid.values().map(|v| v.total_seen).sum() 88 + } 89 + pub fn total_collections(&self) -> usize { 90 + self.commits_by_nsid.len() 91 + } 92 + pub fn account_removes(&self) -> usize { 93 + self.account_removes.len() 94 + } 95 + pub fn estimate_dids(&self) -> usize { 96 + let mut estimator = CardinalityEstimator::<Did>::new(); 97 + for commits in self.commits_by_nsid.values() { 98 + estimator.merge(&commits.dids_estimate); 99 + } 100 + estimator.estimate() 101 + } 102 + pub fn latest_cursor(&self) -> Option<Cursor> { 103 + let mut oldest = Cursor::from_start(); 104 + for commits in self.commits_by_nsid.values() { 105 + if let Some(commit) = commits.commits.front() { 106 + if commit.cursor > oldest { 107 + oldest = commit.cursor; 108 + } 109 + } 110 + } 111 + if let Some(del) = self.account_removes.last() { 112 + if del.cursor > oldest { 113 + oldest = del.cursor; 114 + } 115 + } 116 + if oldest > Cursor::from_start() { 117 + Some(oldest) 118 + } else { 119 + None 120 + } 121 + } 63 122 }
+11 -11
ufos/src/main.rs
··· 54 54 }); 55 55 56 56 let t2: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::task::spawn({ 57 - let storage = storage.clone(); 57 + let _storage = storage.clone(); 58 58 async move { 59 59 if !args.pause_writer { 60 60 println!( ··· 72 72 } 73 73 }); 74 74 75 - let t3 = tokio::task::spawn(async move { 76 - if !args.pause_rw { 77 - let r = storage.rw_loop().await; 78 - log::warn!("storage.rw_loop ended with: {r:?}"); 79 - } else { 80 - log::info!("not starting rw loop."); 81 - } 82 - }); 75 + // let t3 = tokio::task::spawn(async move { 76 + // if !args.pause_rw { 77 + // let r = storage.rw_loop().await; 78 + // log::warn!("storage.rw_loop ended with: {r:?}"); 79 + // } else { 80 + // log::info!("not starting rw loop."); 81 + // } 82 + // }); 83 83 84 84 // tokio::select! { 85 85 // // v = serving => eprintln!("serving ended: {v:?}"), ··· 92 92 log::trace!("serve task ended."); 93 93 t2.await??; 94 94 log::trace!("storage receive task ended."); 95 - t3.await?; 96 - log::trace!("storage rw task ended."); 95 + // t3.await?; 96 + // log::trace!("storage rw task ended."); 97 97 98 98 println!("bye!"); 99 99
+58 -58
ufos/src/server.rs
··· 1 1 use crate::storage_fjall::{Storage, StorageInfo}; 2 - use crate::{CreateRecord, Nsid}; 2 + use crate::{Nsid}; 3 3 use dropshot::endpoint; 4 4 use dropshot::ApiDescription; 5 5 use dropshot::ConfigDropshot; ··· 97 97 Ok(out) 98 98 } 99 99 } 100 - #[derive(Debug, Serialize, JsonSchema)] 101 - struct ApiRecord { 102 - did: String, 103 - collection: String, 104 - rkey: String, 105 - record: serde_json::Value, 106 - time_us: u64, 107 - } 108 - impl ApiRecord { 109 - fn from_create_record(create_record: CreateRecord, collection: &Nsid) -> Self { 110 - let CreateRecord { 111 - did, 112 - rkey, 113 - record, 114 - cursor, 115 - } = create_record; 116 - Self { 117 - did: did.to_string(), 118 - collection: collection.to_string(), 119 - rkey: rkey.to_string(), 120 - record, 121 - time_us: cursor.to_raw_u64(), 122 - } 123 - } 124 - } 125 - /// Get recent records by collection 126 - /// 127 - /// Multiple collections are supported. they will be delivered in one big array with no 128 - /// specified order. 129 - #[endpoint { 130 - method = GET, 131 - path = "/records", 132 - }] 133 - async fn get_records_by_collection( 134 - ctx: RequestContext<Context>, 135 - collection_query: Query<CollectionsQuery>, 136 - ) -> OkCorsResponse<Vec<ApiRecord>> { 137 - let Context { storage, .. } = ctx.context(); 100 + // #[derive(Debug, Serialize, JsonSchema)] 101 + // struct ApiRecord { 102 + // did: String, 103 + // collection: String, 104 + // rkey: String, 105 + // record: serde_json::Value, 106 + // time_us: u64, 107 + // } 108 + // impl ApiRecord { 109 + // fn from_create_record(create_record: CreateRecord, collection: &Nsid) -> Self { 110 + // let CreateRecord { 111 + // did, 112 + // rkey, 113 + // record, 114 + // cursor, 115 + // } = create_record; 116 + // Self { 117 + // did: did.to_string(), 118 + // collection: collection.to_string(), 119 + // rkey: rkey.to_string(), 120 + // record, 121 + // time_us: cursor.to_raw_u64(), 122 + // } 123 + // } 124 + // } 125 + // /// Get recent records by collection 126 + // /// 127 + // /// Multiple collections are supported. they will be delivered in one big array with no 128 + // /// specified order. 129 + // #[endpoint { 130 + // method = GET, 131 + // path = "/records", 132 + // }] 133 + // async fn get_records_by_collection( 134 + // ctx: RequestContext<Context>, 135 + // collection_query: Query<CollectionsQuery>, 136 + // ) -> OkCorsResponse<Vec<ApiRecord>> { 137 + // let Context { storage, .. } = ctx.context(); 138 138 139 - let collections = collection_query 140 - .into_inner() 141 - .to_multiple_nsids() 142 - .map_err(|reason| HttpError::for_bad_request(None, reason))?; 139 + // let collections = collection_query 140 + // .into_inner() 141 + // .to_multiple_nsids() 142 + // .map_err(|reason| HttpError::for_bad_request(None, reason))?; 143 143 144 - let mut api_records = Vec::new(); 144 + // let mut api_records = Vec::new(); 145 145 146 - // TODO: set up multiple db iterators and iterate them together with merge sort 147 - for collection in &collections { 148 - let records = storage 149 - .get_collection_records(collection, 100) 150 - .await 151 - .map_err(|e| HttpError::for_internal_error(e.to_string()))?; 146 + // // TODO: set up multiple db iterators and iterate them together with merge sort 147 + // for collection in &collections { 148 + // let records = storage 149 + // .get_collection_records(collection, 100) 150 + // .await 151 + // .map_err(|e| HttpError::for_internal_error(e.to_string()))?; 152 152 153 - for record in records { 154 - let api_record = ApiRecord::from_create_record(record, collection); 155 - api_records.push(api_record); 156 - } 157 - } 153 + // for record in records { 154 + // let api_record = ApiRecord::from_create_record(record, collection); 155 + // api_records.push(api_record); 156 + // } 157 + // } 158 158 159 - ok_cors(api_records) 160 - } 159 + // ok_cors(api_records) 160 + // } 161 161 162 162 /// Get total records seen by collection 163 163 #[endpoint { ··· 215 215 216 216 api.register(get_openapi).unwrap(); 217 217 api.register(get_meta_info).unwrap(); 218 - api.register(get_records_by_collection).unwrap(); 218 + // api.register(get_records_by_collection).unwrap(); 219 219 api.register(get_records_total_seen).unwrap(); 220 220 api.register(get_top_collections).unwrap(); 221 221
+121 -122
ufos/src/storage_fjall.rs
··· 6 6 RollupCursorKey, RollupCursorValue, SeenCounter, 7 7 }; 8 8 use crate::{ 9 - CollectionSamples, CreateRecord, DeleteAccount, Did, EventBatch, ModifyRecord, Nsid, RecordKey, 9 + DeleteAccount, Did, EventBatch, Nsid, RecordKey, 10 10 }; 11 11 use fjall::{ 12 12 Batch as FjallBatch, CompressionType, Config, Keyspace, PartitionCreateOptions, PartitionHandle, ··· 118 118 loop { 119 119 let t_sleep = Instant::now(); 120 120 sleep(Duration::from_secs_f64(0.08)).await; // TODO: minimize during replay 121 - let slept_for = t_sleep.elapsed(); 122 - let queue_size = receiver.len(); 121 + let _slept_for = t_sleep.elapsed(); 122 + let _queue_size = receiver.len(); 123 123 124 124 if let Some(event_batch) = receiver.recv().await { 125 - log::trace!("write: received write batch"); 125 + log::info!("write: received write batch"); 126 126 let batch_summary = summarize_batch(&event_batch); 127 + log::info!("{}", batch_summary); 127 128 128 - let last = event_batch.last_jetstream_cursor.clone(); // TODO: get this from the data. track last in consumer. compute or track first. 129 + // todo!(); 130 + // let last = event_batch.last_jetstream_cursor.clone(); // TODO: get this from the data. track last in consumer. compute or track first. 129 131 130 - let db = &self.db; 131 - let keyspace = db.keyspace.clone(); 132 - let partition = db.partition.clone(); 132 + // let db = &self.db; 133 + // let keyspace = db.keyspace.clone(); 134 + // let partition = db.partition.clone(); 133 135 134 - let writer_t0 = Instant::now(); 135 - log::trace!("spawn_blocking for write batch"); 136 - tokio::task::spawn_blocking(move || { 137 - DBWriter { 138 - keyspace, 139 - partition, 140 - } 141 - .write_batch(event_batch, last) 142 - }) 143 - .await??; 144 - log::trace!("write: back from blocking task, successfully wrote batch"); 145 - let wrote_for = writer_t0.elapsed(); 136 + // let writer_t0 = Instant::now(); 137 + // log::trace!("spawn_blocking for write batch"); 138 + // tokio::task::spawn_blocking(move || { 139 + // DBWriter { 140 + // keyspace, 141 + // partition, 142 + // } 143 + // .write_batch(event_batch, last) 144 + // }) 145 + // .await??; 146 + // log::trace!("write: back from blocking task, successfully wrote batch"); 147 + // let wrote_for = writer_t0.elapsed(); 146 148 147 - println!("{batch_summary}, slept {slept_for: <12?}, wrote {wrote_for: <11?}, queue: {queue_size}"); 149 + // println!("{batch_summary}, slept {slept_for: <12?}, wrote {wrote_for: <11?}, queue: {queue_size}"); 148 150 } else { 149 151 log::error!("store consumer: receive channel failed (dropped/closed?)"); 150 152 anyhow::bail!("receive channel closed"); ··· 284 286 285 287 pub async fn get_collection_records( 286 288 &self, 287 - collection: &Nsid, 288 - limit: usize, 289 - ) -> anyhow::Result<Vec<CreateRecord>> { 290 - let partition = self.db.partition.clone(); 291 - let prefix = ByCollectionKey::prefix_from_collection(collection.clone())?; 292 - tokio::task::spawn_blocking(move || { 293 - let mut output = Vec::new(); 289 + _collection: &Nsid, 290 + _limit: usize, 291 + ) -> anyhow::Result<Vec<()>> { 292 + todo!(); 293 + // let partition = self.db.partition.clone(); 294 + // let prefix = ByCollectionKey::prefix_from_collection(collection.clone())?; 295 + // tokio::task::spawn_blocking(move || { 296 + // let mut output = Vec::new(); 294 297 295 - for pair in partition.prefix(&prefix).rev().take(limit) { 296 - let (k_bytes, v_bytes) = pair?; 297 - let (_, cursor) = db_complete::<ByCollectionKey>(&k_bytes)?.into(); 298 - let (did, rkey, record) = db_complete::<ByCollectionValue>(&v_bytes)?.into(); 299 - output.push(CreateRecord { 300 - did, 301 - rkey, 302 - record, 303 - cursor, 304 - }) 305 - } 306 - Ok(output) 307 - }) 308 - .await? 298 + // for pair in partition.prefix(&prefix).rev().take(limit) { 299 + // let (k_bytes, v_bytes) = pair?; 300 + // let (_, cursor) = db_complete::<ByCollectionKey>(&k_bytes)?.into(); 301 + // let (did, rkey, record) = db_complete::<ByCollectionValue>(&v_bytes)?.into(); 302 + // output.push(CreateRecord { 303 + // did, 304 + // rkey, 305 + // record, 306 + // cursor, 307 + // }) 308 + // } 309 + // Ok(output) 310 + // }) 311 + // .await? 309 312 } 310 313 311 314 pub async fn get_meta_info(&self) -> anyhow::Result<StorageInfo> { ··· 481 484 } 482 485 483 486 impl DBWriter { 484 - fn write_batch(self, event_batch: EventBatch, last: Option<Cursor>) -> anyhow::Result<()> { 485 - let mut db_batch = self.keyspace.batch(); 486 - self.add_record_creates(&mut db_batch, event_batch.record_creates)?; 487 - self.add_record_modifies(&mut db_batch, event_batch.record_modifies)?; 488 - self.add_account_removes(&mut db_batch, event_batch.account_removes)?; 489 - if let Some(cursor) = last { 490 - insert_batch_static::<JetstreamCursorKey>(&mut db_batch, &self.partition, cursor)?; 491 - } 492 - log::info!("write: committing write batch..."); 493 - let r = db_batch.commit(); 494 - log::info!("write: commit result: {r:?}"); 495 - r?; 496 - Ok(()) 487 + fn write_batch(self, _event_batch: EventBatch, _last: Option<Cursor>) -> anyhow::Result<()> { 488 + todo!(); 489 + // let mut db_batch = self.keyspace.batch(); 490 + // self.add_record_creates(&mut db_batch, event_batch.record_creates)?; 491 + // self.add_record_modifies(&mut db_batch, event_batch.record_modifies)?; 492 + // self.add_account_removes(&mut db_batch, event_batch.account_removes)?; 493 + // if let Some(cursor) = last { 494 + // insert_batch_static::<JetstreamCursorKey>(&mut db_batch, &self.partition, cursor)?; 495 + // } 496 + // log::info!("write: committing write batch..."); 497 + // let r = db_batch.commit(); 498 + // log::info!("write: commit result: {r:?}"); 499 + // r?; 500 + // Ok(()) 497 501 } 498 502 499 503 fn write_rw( ··· 665 669 666 670 fn add_record_creates( 667 671 &self, 668 - db_batch: &mut FjallBatch, 669 - record_creates: HashMap<Nsid, CollectionSamples>, 672 + _db_batch: &mut FjallBatch, 673 + _record_creates: HashMap<Nsid, ()>, 670 674 ) -> anyhow::Result<()> { 671 - for ( 672 - collection, 673 - CollectionSamples { 674 - total_seen, 675 - samples, 676 - }, 677 - ) in record_creates.into_iter() 678 - { 679 - if let Some(last_record) = &samples.back() { 680 - db_batch.insert( 681 - &self.partition, 682 - ByCursorSeenKey::new(last_record.cursor.clone(), collection.clone()) 683 - .to_db_bytes()?, 684 - ByCursorSeenValue::new(total_seen as u64).to_db_bytes()?, 685 - ); 686 - } else { 687 - log::error!( 688 - "collection samples should only exist when at least one sample has been added" 689 - ); 690 - } 675 + todo!(); 676 + // for ( 677 + // collection, 678 + // CollectionSamples { 679 + // total_seen, 680 + // samples, 681 + // }, 682 + // ) in record_creates.into_iter() 683 + // { 684 + // if let Some(last_record) = &samples.back() { 685 + // db_batch.insert( 686 + // &self.partition, 687 + // ByCursorSeenKey::new(last_record.cursor.clone(), collection.clone()) 688 + // .to_db_bytes()?, 689 + // ByCursorSeenValue::new(total_seen as u64).to_db_bytes()?, 690 + // ); 691 + // } else { 692 + // log::error!( 693 + // "collection samples should only exist when at least one sample has been added" 694 + // ); 695 + // } 691 696 692 - for CreateRecord { 693 - did, 694 - rkey, 695 - cursor, 696 - record, 697 - } in samples.into_iter().rev() 698 - { 699 - self.add_record(db_batch, cursor, did, collection.clone(), rkey, record)?; 700 - } 701 - } 702 - Ok(()) 697 + // for CreateRecord { 698 + // did, 699 + // rkey, 700 + // cursor, 701 + // record, 702 + // } in samples.into_iter().rev() 703 + // { 704 + // self.add_record(db_batch, cursor, did, collection.clone(), rkey, record)?; 705 + // } 706 + // } 707 + // Ok(()) 703 708 } 704 709 705 710 fn add_record( ··· 730 735 731 736 fn add_record_modifies( 732 737 &self, 733 - db_batch: &mut FjallBatch, 734 - record_modifies: Vec<ModifyRecord>, 738 + _db_batch: &mut FjallBatch, 739 + _record_modifies: Vec<()>, 735 740 ) -> anyhow::Result<()> { 736 - for modification in record_modifies { 737 - let (cursor, db_val) = match modification { 738 - ModifyRecord::Update(u) => ( 739 - u.cursor, 740 - ModQueueItemValue::UpdateRecord(u.did, u.collection, u.rkey, u.record), 741 - ), 742 - ModifyRecord::Delete(d) => ( 743 - d.cursor, 744 - ModQueueItemValue::DeleteRecord(d.did, d.collection, d.rkey), 745 - ), 746 - }; 747 - db_batch.insert( 748 - &self.partition, 749 - ModQueueItemKey::new(cursor).to_db_bytes()?, 750 - db_val.to_db_bytes()?, 751 - ); 752 - } 753 - Ok(()) 741 + todo!(); 742 + // for modification in record_modifies { 743 + // let (cursor, db_val) = match modification { 744 + // ModifyRecord::Update(u) => ( 745 + // u.cursor, 746 + // ModQueueItemValue::UpdateRecord(u.did, u.collection, u.rkey, u.record), 747 + // ), 748 + // ModifyRecord::Delete(d) => ( 749 + // d.cursor, 750 + // ModQueueItemValue::DeleteRecord(d.did, d.collection, d.rkey), 751 + // ), 752 + // }; 753 + // db_batch.insert( 754 + // &self.partition, 755 + // ModQueueItemKey::new(cursor).to_db_bytes()?, 756 + // db_val.to_db_bytes()?, 757 + // ); 758 + // } 759 + // Ok(()) 754 760 } 755 761 756 762 fn add_account_removes( ··· 785 791 ////////// temp stuff to remove: 786 792 787 793 fn summarize_batch(batch: &EventBatch) -> String { 788 - let EventBatch { 789 - record_creates, 790 - record_modifies, 791 - account_removes, 792 - last_jetstream_cursor, 793 - .. 794 - } = batch; 795 - let total_records: usize = record_creates.values().map(|v| v.total_seen).sum(); 796 - let total_samples: usize = record_creates.values().map(|v| v.samples.len()).sum(); 797 794 format!( 798 - "batch of {total_samples: >3} samples from {total_records: >4} records in {: >2} collections, {: >3} modifies, {} acct removes, cursor {: <12?}", 799 - record_creates.len(), 800 - record_modifies.len(), 801 - account_removes.len(), 802 - last_jetstream_cursor.clone().map(|c| c.elapsed()) 795 + "batch of {: >3} samples from {: >4} records in {: >2} collections from ~{: >4} DIDs, {} acct removes, cursor {: <12?}", 796 + batch.total_records(), 797 + batch.total_seen(), 798 + batch.total_collections(), 799 + batch.estimate_dids(), 800 + batch.account_removes(), 801 + batch.latest_cursor().map(|c| c.elapsed()), 803 802 ) 804 803 }