Constellation, Spacedust, Slingshot, UFOs: atproto crates and services for microcosm
75
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 161 lines 5.3 kB view raw
//! Storage abstraction traits for the UFOs service: a reader, a writer, a
//! background-task driver, and an initializer that ties them together.

use crate::store_types::{CountsValue, HourTruncatedCursor, SketchSecretPrefix};
use crate::{
    error::StorageError, ConsumerInfo, Cursor, EventBatch, JustCount, NsidCount, NsidPrefix,
    OrderCollectionsBy, PrefixChild, UFOsRecord,
};
use async_trait::async_trait;
use jetstream::exports::{Did, Nsid};
use metrics::{describe_histogram, histogram, Unit};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::Receiver;
use tokio_util::sync::CancellationToken;

/// Convenience alias: all storage operations report failure via [`StorageError`].
pub type StorageResult<T> = Result<T, StorageError>;

/// Factory for a concrete storage backend. `C` is the backend-specific config type.
pub trait StorageWhatever<R: StoreReader, W: StoreWriter<B>, B: StoreBackground, C> {
    /// Open (or create) storage at `path` and return the reader/writer pair,
    /// the resume cursor if one was persisted, and the sketch-secret prefix.
    /// `force_endpoint` presumably overrides a previously stored `endpoint` —
    /// NOTE(review): exact semantics live in the implementations; confirm there.
    fn init(
        path: impl AsRef<Path>,
        endpoint: String,
        force_endpoint: bool,
        config: C,
    ) -> StorageResult<(R, W, Option<Cursor>, SketchSecretPrefix)>
    where
        Self: Sized;
}

/// Write half of the store. `Clone + Send + Sync + 'static` is required so the
/// default `receive_batches` loop can hand clones of `self` to blocking tasks.
#[async_trait]
pub trait StoreWriter<B: StoreBackground>: Clone + Send + Sync
where
    Self: 'static,
{
    /// Build the background-task driver for this writer. `reroll` is passed
    /// through to the implementation (meaning is backend-specific).
    fn background_tasks(&mut self, reroll: bool) -> StorageResult<B>;

    /// Consume event batches from `batches` until the channel closes, inserting
    /// each via [`Self::insert_batch`] on a blocking thread.
    ///
    /// For every batch, a watchdog task is spawned that logs a warning if the
    /// insert takes longer than 3 seconds; a `CancellationToken` drop-guard tied
    /// to the blocking closure cancels the watchdog when the insert finishes
    /// (or panics/fails — the guard fires on drop either way). Timing metrics
    /// are recorded for every batch, plus a separate histogram for slow ones.
    ///
    /// Returns `Err(StorageError::BatchSenderExited)` once the sender side of
    /// the channel is dropped — the loop is never expected to end normally.
    async fn receive_batches<const LIMIT: usize>(
        self,
        mut batches: Receiver<EventBatch<LIMIT>>,
    ) -> StorageResult<()> {
        describe_histogram!(
            "storage_slow_batches",
            Unit::Microseconds,
            "batches that took more than 3s to insert"
        );
        describe_histogram!(
            "storage_batch_insert_time",
            Unit::Microseconds,
            "total time to insert one commit batch"
        );
        while let Some(event_batch) = batches.recv().await {
            let token = CancellationToken::new();
            let cancelled = token.clone();
            // Watchdog: warn once after 3s, then note the total elapsed time
            // when the insert finally completes and it was slow.
            tokio::spawn(async move {
                let started = Instant::now();
                let mut concerned = false;
                loop {
                    tokio::select! {
                        _ = tokio::time::sleep(Duration::from_secs(3)) => {
                            if !concerned {
                                log::warn!("taking a long time to insert an event batch...");
                            }
                            concerned = true;
                        }
                        _ = cancelled.cancelled() => {
                            if concerned {
                                log::warn!("finally inserted slow event batch (or failed) after {:?}", started.elapsed());
                                histogram!("storage_slow_batches").record(started.elapsed().as_micros() as f64);
                            }
                            break
                        }
                    }
                }
            });
            // Run the (synchronous) insert off the async runtime. The drop
            // guard cancels the watchdog no matter how the closure exits.
            tokio::task::spawn_blocking({
                let mut me = self.clone();
                move || {
                    let _guard = token.drop_guard();
                    let t0 = Instant::now();
                    let r = me.insert_batch(event_batch);
                    histogram!("storage_batch_insert_time").record(t0.elapsed().as_micros() as f64);
                    r
                }
            })
            .await??; // outer ?: join error (panic/cancel); inner ?: insert_batch error
        }

        // recv() returned None: all senders dropped — treat as an error so the
        // caller knows the pipeline stopped feeding us.
        Err(StorageError::BatchSenderExited)
    }

    /// Synchronously insert one batch of events. Called from a blocking thread
    /// by `receive_batches`; implementations may block freely.
    fn insert_batch<const LIMIT: usize>(
        &mut self,
        event_batch: EventBatch<LIMIT>,
    ) -> StorageResult<()>;

    /// Advance rollup/aggregation by one step. Returns a count plus the set of
    /// NSIDs touched — NOTE(review): exact meaning of the `usize` is
    /// implementation-defined; confirm against the backend.
    fn step_rollup(&mut self) -> StorageResult<(usize, HashSet<Nsid>)>;

    /// Trim a collection down toward `limit` records. `full_scan` requests an
    /// exhaustive pass. The `(usize, usize, bool)` return is backend-defined
    /// (presumably counts scanned/removed and a completion flag — TODO confirm).
    fn trim_collection(
        &mut self,
        collection: &Nsid,
        limit: usize,
        full_scan: bool,
    ) -> StorageResult<(usize, usize, bool)>;

    /// Delete all records for `did`, returning how many were removed.
    fn delete_account(&mut self, did: &Did) -> StorageResult<usize>;
}

/// Long-running background maintenance driver produced by
/// [`StoreWriter::background_tasks`]. Consumes itself; expected to run until
/// shutdown or error.
#[async_trait]
pub trait StoreBackground: Send + Sync {
    async fn run(mut self, backfill: bool) -> StorageResult<()>;
}

/// Read half of the store: queries over collections, counts, time series and
/// records. All methods are `&self`; implementations handle their own interior
/// synchronization.
#[async_trait]
pub trait StoreReader: Send + Sync {
    /// Human-readable backend name (for diagnostics).
    fn name(&self) -> String;

    /// Optional hook to publish gauge/counter metrics; default is a no-op.
    fn update_metrics(&self) {}

    /// Backend-specific storage statistics as free-form JSON.
    async fn get_storage_stats(&self) -> StorageResult<serde_json::Value>;

    /// State of the upstream consumer (cursor position etc.).
    async fn get_consumer_info(&self) -> StorageResult<ConsumerInfo>;

    /// List collections ordered by `order`, optionally bounded to the
    /// `[since, until]` hour window. The `Option<Vec<u8>>` is an opaque
    /// pagination cursor for fetching the next page.
    async fn get_collections(
        &self,
        limit: usize,
        order: OrderCollectionsBy,
        since: Option<HourTruncatedCursor>,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<(Vec<NsidCount>, Option<Vec<u8>>)>;

    /// Counts and children under an NSID prefix, same windowing/paging
    /// conventions as [`Self::get_collections`].
    async fn get_prefix(
        &self,
        prefix: NsidPrefix,
        limit: usize,
        order: OrderCollectionsBy,
        since: Option<HourTruncatedCursor>,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<(JustCount, Vec<PrefixChild>, Option<Vec<u8>>)>;

    /// Per-collection time series from `since` (to `until`, or now) sampled at
    /// `step` — NOTE(review): step units (hours? seconds?) are not visible
    /// here; confirm in the implementation. Returns the shared time axis plus
    /// one series of counts per requested collection.
    async fn get_timeseries(
        &self,
        collections: Vec<Nsid>,
        since: HourTruncatedCursor,
        until: Option<HourTruncatedCursor>,
        step: u64,
    ) -> StorageResult<(Vec<HourTruncatedCursor>, HashMap<Nsid, Vec<CountsValue>>)>;

    /// Aggregate counts for one collection over the given hour window.
    async fn get_collection_counts(
        &self,
        collection: &Nsid,
        since: HourTruncatedCursor,
        until: Option<HourTruncatedCursor>,
    ) -> StorageResult<JustCount>;

    /// Fetch up to `limit` records for the given collections.
    /// `expand_each_collection` presumably applies the limit per-collection
    /// rather than overall — TODO confirm against the implementation.
    async fn get_records_by_collections(
        &self,
        collections: HashSet<Nsid>,
        limit: usize,
        expand_each_collection: bool,
    ) -> StorageResult<Vec<UFOsRecord>>;

    /// Search collections by the given terms, returning matches with counts.
    async fn search_collections(&self, terms: Vec<String>) -> StorageResult<Vec<NsidCount>>;
}