···1616cross build --release --target arm-unknown-linux-gnueabihf && scp ../target/arm-unknown-linux-gnueabihf/release/ufos angel-hair.local:ufos
1717```
18181919+for bonilla (rp4)
2020+2121+```bash
2222+cross build --release --target armv7-unknown-linux-gnueabihf && scp ../target/armv7-unknown-linux-gnueabihf/release/ufos pi@bonilla.local:ufos
2323+```
2424+2525+glibc will cause problems when switching between build targets (`GLIBC_2.25` message). clean up first (next build will be slowww):
2626+2727+```bash
2828+cargo clean
2929+```
3030+1931nginx forward proxy for websocket (run this on another host):
20322133```nginx
···95107```
9610897109try without info-level logs for better perf
110110+111111+running on bonilla
112112+113113+```bash
114114+./ufos --jetstream us-west-2 --jetstream-force --data /mnt/ufos-data-no-compression-2/
115115+```
116116+117117+(reusing data dir from angel-hair)
118118+119119+120120+ipv6 is having some trouble. but also maybe there's a deadlock somewhere
121121+122122+```bash
123123+sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1
124124+```
+15-8
ufos/src/consumer.rs
···1919const MAX_ACCOUNT_REMOVES: usize = 512; // hard limit, total account deletions. actually the least frequent event, but tiny.
2020const MAX_BATCHED_COLLECTIONS: usize = 64; // hard limit, MAX_BATCHED_RECORDS applies per collection
2121const MIN_BATCH_SPAN_SECS: f64 = 2.; // try to get a bit of rest a bit.
2222-const MAX_BATCH_SPAN_SECS: f64 = 10.; // hard limit of duration from oldest to latest event cursor within a batch, in seconds.
2323-2424-const SEND_TIMEOUT_S: f64 = 6.;
2222+const MAX_BATCH_SPAN_SECS: f64 = 60.; // hard limit of duration from oldest to latest event cursor within a batch, in seconds.
25232626-const BATCH_QUEUE_SIZE: usize = 32;
2727-// const BATCH_QUEUE_SIZE: usize = 4096;
2424+const SEND_TIMEOUT_S: f64 = 60.;
2525+const BATCH_QUEUE_SIZE: usize = 1024; // 4096 got OOM'd
28262927#[derive(Debug)]
3028struct Batcher {
···3836 cursor: Option<Cursor>,
3937 no_compress: bool,
4038) -> anyhow::Result<Receiver<EventBatch>> {
3939+ let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint);
4040+ if endpoint == jetstream_endpoint {
4141+ eprintln!("connecting to jetstream at {endpoint}");
4242+ } else {
4343+ eprintln!("connecting to jetstream at {jetstream_endpoint} => {endpoint}");
4444+ }
4145 let config: JetstreamConfig<serde_json::Value> = JetstreamConfig {
4242- endpoint: DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint),
4646+ endpoint,
4347 compression: if no_compress {
4448 JetstreamCompression::None
4549 } else {
···124128 if event_cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS
125129 && self.batch_sender.capacity() == BATCH_QUEUE_SIZE
126130 {
127127- log::warn!("queue empty: immediately sending batch.");
131131+ log::trace!("queue empty: immediately sending batch.");
128132 if let Err(send_err) = self
129133 .batch_sender
130134 .send(mem::take(&mut self.current_batch))
···140144 // holds up all consumer progress until it can send to the channel
141145 // use this when the current batch is too full to add more to it
142146 async fn send_current_batch_now(&mut self) -> anyhow::Result<()> {
143143- log::warn!("attempting to send batch now");
147147+ log::warn!(
148148+ "attempting to send batch now (capacity: {})",
149149+ self.batch_sender.capacity()
150150+ );
144151 self.batch_sender
145152 .send_timeout(
146153 mem::take(&mut self.current_batch),
+54-13
ufos/src/main.rs
···2828 /// Location to store persist data to disk
2929 #[arg(long)]
3030 data: PathBuf,
3131+ /// DEBUG: don't start the jetstream consumer or its write loop
3232+ #[arg(long, action)]
3333+ pause_writer: bool,
3434+ /// DEBUG: force the rw loop to fall behind by pausing it
3535+ #[arg(long, action)]
3636+ pause_rw: bool,
3137}
32383333-// #[tokio::main]
3434-#[tokio::main(flavor = "current_thread")] // TODO: move this to config via args
3939+// #[tokio::main(flavor = "current_thread")] // TODO: move this to config via args
4040+#[tokio::main]
3541async fn main() -> anyhow::Result<()> {
3642 env_logger::init();
3743···3945 let (storage, cursor) =
4046 store::Storage::open(args.data, &args.jetstream, args.jetstream_force).await?;
41474242- println!(
4343- "starting consumer with cursor: {cursor:?} from {:?} ago",
4444- cursor.clone().map(|c| c.elapsed())
4545- );
4646- let batches = consumer::consume(&args.jetstream, cursor, args.jetstream_no_zstd).await?;
4747-4848 println!("starting server with storage...");
4949 let serving = server::serve(storage.clone());
50505151- tokio::select! {
5252- v = serving => eprintln!("serving ended: {v:?}"),
5353- v = storage.receive(batches) => eprintln!("storage consumer ended: {v:?}"),
5454- v = storage.rw_loop() => eprintln!("storage rw-loop ended: {v:?}"),
5555- };
5151+ let t1 = tokio::task::spawn(async {
5252+ let r = serving.await;
5353+ log::warn!("serving ended with: {r:?}");
5454+ });
5555+5656+ let t2: tokio::task::JoinHandle<anyhow::Result<()>> = tokio::task::spawn({
5757+ let storage = storage.clone();
5858+ async move {
5959+ if !args.pause_writer {
6060+ println!(
6161+ "starting consumer with cursor: {cursor:?} from {:?} ago",
6262+ cursor.clone().map(|c| c.elapsed())
6363+ );
6464+ let batches =
6565+ consumer::consume(&args.jetstream, cursor, args.jetstream_no_zstd).await?;
6666+ let r = storage.receive(batches).await;
6767+ log::warn!("storage.receive ended with: {r:?}");
6868+ } else {
6969+ log::info!("not starting jetstream or the write loop.");
7070+ }
7171+ Ok(())
7272+ }
7373+ });
7474+7575+ let t3 = tokio::task::spawn(async move {
7676+ if !args.pause_rw {
7777+ let r = storage.rw_loop().await;
7878+ log::warn!("storage.rw_loop ended with: {r:?}");
7979+ } else {
8080+ log::info!("not starting rw loop.");
8181+ }
8282+ });
8383+8484+ // tokio::select! {
8585+ // // v = serving => eprintln!("serving ended: {v:?}"),
8686+ // v = storage.receive(batches) => eprintln!("storage consumer ended: {v:?}"),
8787+ // v = storage.rw_loop() => eprintln!("storage rw-loop ended: {v:?}"),
8888+ // };
8989+9090+ log::trace!("tasks running. waiting.");
9191+ t1.await?;
9292+ log::trace!("serve task ended.");
9393+ t2.await??;
9494+ log::trace!("storage receive task ended.");
9595+ t3.await?;
9696+ log::trace!("storage rw task ended.");
56975798 println!("bye!");
5899
+88-46
ufos/src/server.rs
···66use dropshot::ConfigLogging;
77use dropshot::ConfigLoggingLevel;
88use dropshot::HttpError;
99+use dropshot::HttpResponseHeaders;
910use dropshot::HttpResponseOk;
1011use dropshot::Query;
1112use dropshot::RequestContext;
1213use dropshot::ServerBuilder;
1314use schemars::JsonSchema;
1415use serde::{Deserialize, Serialize};
1616+use std::collections::HashMap;
1517use std::sync::Arc;
16181719#[derive(Clone)]
···2527 method = GET,
2628 path = "/openapi",
2729}]
2828-async fn get_openapi(
2929- ctx: RequestContext<Context>,
3030-) -> Result<HttpResponseOk<serde_json::Value>, HttpError> {
3030+async fn get_openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> {
3131 let spec = (*ctx.context().spec).clone();
3232- Ok(HttpResponseOk(spec))
3232+ ok_cors(spec)
3333}
34343535#[derive(Debug, Serialize, JsonSchema)]
···4444 method = GET,
4545 path = "/meta"
4646}]
4747-async fn get_meta_info(
4848- ctx: RequestContext<Context>,
4949-) -> Result<HttpResponseOk<MetaInfo>, HttpError> {
4747+async fn get_meta_info(ctx: RequestContext<Context>) -> OkCorsResponse<MetaInfo> {
5048 let Context { storage, .. } = ctx.context();
51495250 let failed_to_get =
···7573 .map_err(failed_to_get("jetstream cursor"))?
7674 .map(|c| c.to_raw_u64());
77757878- Ok(HttpResponseOk(MetaInfo {
7676+ ok_cors(MetaInfo {
7977 storage_info,
8078 jetstream_endpoint,
8179 jetstream_cursor,
8280 mod_cursor,
8383- }))
8181+ })
8482}
85838684#[derive(Debug, Deserialize, JsonSchema)]
8787-struct CollectionQuery {
8585+struct CollectionsQuery {
8886 collection: String, // JsonSchema not implemented for Nsid :(
8787+}
8888+impl CollectionsQuery {
8989+ fn to_multiple_nsids(&self) -> Result<Vec<Nsid>, String> {
9090+ let mut out = Vec::with_capacity(self.collection.len());
9191+ for collection in self.collection.split(',') {
9292+ let Ok(nsid) = Nsid::new(collection.to_string()) else {
9393+ return Err(format!("collection {collection:?} was not a valid NSID"));
9494+ };
9595+ out.push(nsid);
9696+ }
9797+ Ok(out)
9898+ }
8999}
90100#[derive(Debug, Serialize, JsonSchema)]
91101struct ApiRecord {
···113123 }
114124}
115125/// Get recent records by collection
126126+///
127127+/// Multiple collections are supported. they will be delivered in one big array with no
128128+/// specified order.
116129#[endpoint {
117130 method = GET,
118131 path = "/records",
119132}]
120133async fn get_records_by_collection(
121134 ctx: RequestContext<Context>,
122122- collection_query: Query<CollectionQuery>,
123123-) -> Result<HttpResponseOk<Vec<ApiRecord>>, HttpError> {
124124- let Ok(collection) = Nsid::new(collection_query.into_inner().collection) else {
125125- return Err(HttpError::for_bad_request(
126126- None,
127127- "collection must be an NSID".to_string(),
128128- ));
129129- };
135135+ collection_query: Query<CollectionsQuery>,
136136+) -> OkCorsResponse<Vec<ApiRecord>> {
130137 let Context { storage, .. } = ctx.context();
131131- let records = storage
132132- .get_collection_records(&collection, 100)
133133- .await
134134- .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
135138136136- if records.is_empty() {
137137- return Err(HttpError::for_not_found(
138138- None,
139139- format!("no saved records for collection {collection:?}"),
140140- ));
141141- }
139139+ let collections = collection_query
140140+ .into_inner()
141141+ .to_multiple_nsids()
142142+ .map_err(|reason| HttpError::for_bad_request(None, reason))?;
142143143143- let api_records = records
144144- .into_iter()
145145- .map(|r| ApiRecord::from_create_record(r, &collection))
146146- .collect();
144144+ let mut api_records = Vec::new();
147145148148- Ok(HttpResponseOk(api_records))
146146+ // TODO: set up multiple db iterators and iterate them together with merge sort
147147+ for collection in &collections {
148148+ let records = storage
149149+ .get_collection_records(collection, 100)
150150+ .await
151151+ .map_err(|e| HttpError::for_internal_error(e.to_string()))?;
152152+153153+ for record in records {
154154+ let api_record = ApiRecord::from_create_record(record, collection);
155155+ api_records.push(api_record);
156156+ }
157157+ }
158158+159159+ ok_cors(api_records)
149160}
150161151162/// Get total records seen by collection
···153164 method = GET,
154165 path = "/records/total-seen"
155166}]
156156-async fn get_asdf(
167167+async fn get_records_total_seen(
157168 ctx: RequestContext<Context>,
158158- collection_query: Query<CollectionQuery>,
159159-) -> Result<HttpResponseOk<u64>, HttpError> {
160160- let Ok(collection) = Nsid::new(collection_query.into_inner().collection) else {
161161- return Err(HttpError::for_bad_request(
162162- None,
163163- "collection must be an NSID".to_string(),
164164- ));
165165- };
169169+ collection_query: Query<CollectionsQuery>,
170170+) -> OkCorsResponse<HashMap<String, u64>> {
171171+ let Context { storage, .. } = ctx.context();
172172+173173+ let collections = collection_query
174174+ .into_inner()
175175+ .to_multiple_nsids()
176176+ .map_err(|reason| HttpError::for_bad_request(None, reason))?;
177177+178178+ let mut seen_by_collection = HashMap::with_capacity(collections.len());
179179+180180+ for collection in &collections {
181181+ let total = storage
182182+ .get_collection_total_seen(collection)
183183+ .await
184184+ .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?;
185185+186186+ seen_by_collection.insert(collection.to_string(), total);
187187+ }
188188+189189+ ok_cors(seen_by_collection)
190190+}
191191+192192+/// Get top collections
193193+#[endpoint {
194194+ method = GET,
195195+ path = "/collections"
196196+}]
197197+async fn get_top_collections(ctx: RequestContext<Context>) -> OkCorsResponse<HashMap<String, u64>> {
166198 let Context { storage, .. } = ctx.context();
167167- let total = storage
168168- .get_collection_total_seen(&collection)
199199+ let collections = storage
200200+ .get_top_collections()
169201 .await
170202 .map_err(|e| HttpError::for_internal_error(format!("boooo: {e:?}")))?;
171203172172- Ok(HttpResponseOk(total))
204204+ ok_cors(collections)
173205}
174206175207pub async fn serve(storage: Storage) -> Result<(), String> {
···184216 api.register(get_openapi).unwrap();
185217 api.register(get_meta_info).unwrap();
186218 api.register(get_records_by_collection).unwrap();
187187- api.register(get_asdf).unwrap();
219219+ api.register(get_records_total_seen).unwrap();
220220+ api.register(get_top_collections).unwrap();
188221189222 let context = Context {
190223 spec: Arc::new(
···204237 .map_err(|error| format!("failed to start server: {}", error))?
205238 .await
206239}
240240+241241+/// awkward helpers
242242+type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;
243243+fn ok_cors<T: Send + Sync + Serialize + JsonSchema>(t: T) -> OkCorsResponse<T> {
244244+ let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(t));
245245+ res.headers_mut()
246246+ .insert("access-control-allow-origin", "*".parse().unwrap());
247247+ Ok(res)
248248+}
+257-111
ufos/src/store.rs
···1414use jetstream::events::Cursor;
1515use std::collections::HashMap;
1616use std::path::{Path, PathBuf};
1717+use std::sync::Arc;
1718use std::time::{Duration, Instant};
1818-use tokio::{sync::mpsc::Receiver, time::sleep};
1919+use tokio::sync::mpsc::Receiver;
2020+use tokio::time::sleep;
19212020-/// Commit the RW batch immediately if this nubmer of events have been read off the mod queue
2222+/// Commit the RW batch immediately if this number of events have been read off the mod queue
2123const MAX_BATCHED_RW_EVENTS: usize = 18;
22242325/// Commit the RW batch immediately if this number of records is reached
···2729/// - doing more work whenever scheduled means getting more CPU time in general
2830///
2931/// this is higher than [MAX_BATCHED_RW_EVENTS] because account-deletes can have lots of items
3030-const MAX_BATCHED_RW_ITEMS: usize = 36;
3232+const MAX_BATCHED_RW_ITEMS: usize = 24;
3333+3434+#[derive(Clone)]
3535+struct SerialDb {
3636+ keyspace: Keyspace,
3737+ partition: PartitionHandle,
3838+}
3939+4040+struct FakeMutex<T> {
4141+ thing: T,
4242+}
4343+impl<T: Clone> FakeMutex<T> {
4444+ pub fn new(thing: T) -> Self {
4545+ Self { thing }
4646+ }
4747+ pub async fn lock(&self) -> T {
4848+ self.thing.clone()
4949+ }
5050+}
31513252/**
3353 * data format, roughly:
···5676 **/
5777#[derive(Clone)]
5878pub struct Storage {
5959- keyspace: Keyspace,
6060- partition: PartitionHandle,
7979+ /// horrible: gate all db access behind this to force global serialization to avoid deadlock
8080+ db: Arc<FakeMutex<SerialDb>>,
6181}
62826383impl Storage {
···6888 PartitionCreateOptions::default().compression(CompressionType::None),
6989 )?;
7090 Ok(Self {
7171- keyspace,
7272- partition,
9191+ db: Arc::new(FakeMutex::new(SerialDb {
9292+ keyspace,
9393+ partition,
9494+ })),
7395 })
7496 }
7597···108130 // TODO: see rw_loop: enforce single-thread.
109131 loop {
110132 let t_sleep = Instant::now();
111111- sleep(Duration::from_secs_f64(0.3)).await; // TODO: minimize during replay
133133+ sleep(Duration::from_secs_f64(0.8)).await; // TODO: minimize during replay
112134 let slept_for = t_sleep.elapsed();
113135 let queue_size = receiver.len();
114136115137 if let Some(event_batch) = receiver.recv().await {
138138+ log::trace!("write: received write batch");
116139 let batch_summary = summarize_batch(&event_batch);
117140118141 let last = event_batch.last_jetstream_cursor.clone(); // TODO: get this from the data. track last in consumer. compute or track first.
119142120120- let keyspace = self.keyspace.clone();
121121- let partition = self.partition.clone();
143143+ let db = self.db.lock().await;
144144+ let keyspace = db.keyspace.clone();
145145+ let partition = db.partition.clone();
122146123147 let writer_t0 = Instant::now();
148148+ log::trace!("spawn_blocking for write batch");
124149 tokio::task::spawn_blocking(move || {
125150 DBWriter {
126151 keyspace,
···129154 .write_batch(event_batch, last)
130155 })
131156 .await??;
157157+ log::trace!("write: back from blocking task, successfully wrote batch");
132158 let wrote_for = writer_t0.elapsed();
159159+ drop(db);
133160134161 println!("{batch_summary}, slept {slept_for: <12?}, wrote {wrote_for: <11?}, queue: {queue_size}");
135162 } else {
163163+ log::error!("store consumer: receive channel failed (dropped/closed?)");
136164 anyhow::bail!("receive channel closed");
137165 }
138166 }
···142170 pub async fn rw_loop(&self) -> anyhow::Result<()> {
143171 // TODO: lock so that only one rw loop can possibly be run. or even better, take a mutable resource thing to enforce at compile time.
144172 loop {
145145- sleep(Duration::from_secs_f64(0.001)).await; // todo: interval rate-limit instead
146146- let keyspace = self.keyspace.clone();
147147- let partition = self.partition.clone();
173173+ sleep(Duration::from_secs_f64(0.1)).await; // todo: interval rate-limit instead
174174+175175+ let db = self.db.lock().await;
176176+ let keyspace = db.keyspace.clone();
177177+ let partition = db.partition.clone();
178178+179179+ log::trace!("rw: spawn blocking for batch...");
148180 tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
181181+ log::trace!("rw: getting rw cursor...");
149182 let mod_cursor = get_static::<ModCursorKey, ModCursorValue>(&partition)?
150183 .unwrap_or(Cursor::from_start());
151184 let range = ModQueueItemKey::new(mod_cursor.clone()).range_to_prefix_end()?;
152185153186 let mut db_batch = keyspace.batch();
154187 let mut batched_rw_items = 0;
188188+ let mut any_tasks_found = false;
155189156156- for (i, pair) in partition.range(range.clone()).enumerate() {
157157- if i >= MAX_BATCHED_RW_EVENTS {
158158- break;
159159- }
190190+ log::trace!("rw: iterating newer rw items...");
160191161161- let (key_bytes, val_bytes) = pair?;
162162- let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) {
163163- Ok(k) => k,
164164- Err(EncodingError::WrongStaticPrefix(_, _)) => {
165165- panic!("wsp: mod queue empty.");
192192+193193+ //// ITER
194194+195195+ {
196196+ let iterator = partition.range(range.clone()).enumerate();
197197+198198+ for (i, pair) in iterator {
199199+ log::trace!("rw: iterating {i}");
200200+ any_tasks_found = true;
201201+202202+ if i >= MAX_BATCHED_RW_EVENTS {
203203+ break;
166204 }
167167- otherwise => otherwise?,
168168- };
169205170170- let mod_value: ModQueueItemValue =
171171- db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?;
206206+ let (key_bytes, val_bytes) = pair?;
207207+ let mod_key = match db_complete::<ModQueueItemKey>(&key_bytes) {
208208+ Ok(k) => k,
209209+ Err(EncodingError::WrongStaticPrefix(_, _)) => {
210210+ panic!("wsp: mod queue empty.");
211211+ }
212212+ otherwise => otherwise?,
213213+ };
172214173173- batched_rw_items += DBWriter {
174174- keyspace: keyspace.clone(),
175175- partition: partition.clone(),
176176- }
177177- .write_rw(&mut db_batch, mod_key, mod_value)?;
215215+ let mod_value: ModQueueItemValue =
216216+ db_complete::<ModQueueItemStringValue>(&val_bytes)?.try_into()?;
217217+218218+ log::trace!("rw: iterating {i}: sending to batcher {mod_key:?} => {mod_value:?}");
219219+ batched_rw_items += DBWriter {
220220+ keyspace: keyspace.clone(),
221221+ partition: partition.clone(),
222222+ }
223223+ .write_rw(&mut db_batch, mod_key, mod_value)?;
224224+ log::trace!("rw: iterating {i}: back from batcher.");
178225179179- if batched_rw_items >= MAX_BATCHED_RW_ITEMS {
180180- break;
226226+ if batched_rw_items >= MAX_BATCHED_RW_ITEMS {
227227+ log::trace!("rw: iterating {i}: batch big enough, breaking out.");
228228+ break;
229229+ }
181230 }
231231+ // drop(iterator); // moved -- must be dropped hopefully
182232 }
183233184184- db_batch.commit()?;
234234+ if !any_tasks_found {
235235+ log::trace!("rw: skipping batch commit since apparently no items were added (this is normal, skipping is new)");
236236+ return Ok(());
237237+ }
238238+239239+ log::info!("rw: committing rw batch with {batched_rw_items} items (items != total inserts/deletes)...");
240240+ let r = db_batch.commit();
241241+ log::info!("rw: commit result: {r:?}");
242242+ r?;
185243 Ok(())
186244 })
187245 .await??;
246246+ log::trace!("rw: back from blocking for rw...");
188247 }
248248+ // log::warn!("exited rw loop (rw task)");
189249 }
190250191251 pub async fn get_collection_records(
···193253 collection: &Nsid,
194254 limit: usize,
195255 ) -> anyhow::Result<Vec<CreateRecord>> {
196196- let partition = self.partition.clone();
256256+ let partition = self.db.lock().await.partition.clone();
197257 let prefix = ByCollectionKey::prefix_from_collection(collection.clone())?;
198258 tokio::task::spawn_blocking(move || {
199259 let mut output = Vec::new();
200200- for pair in partition.prefix(&prefix).rev().take(limit) {
201201- let (k_bytes, v_bytes) = pair?;
202202- let (_, cursor) = db_complete::<ByCollectionKey>(&k_bytes)?.into();
203203- let (did, rkey, record) = db_complete::<ByCollectionValue>(&v_bytes)?.into();
204204- output.push(CreateRecord {
205205- did,
206206- rkey,
207207- record,
208208- cursor,
209209- })
260260+261261+ ////// ITER
262262+ {
263263+ for pair in partition.prefix(&prefix).rev().take(limit) {
264264+ let (k_bytes, v_bytes) = pair?;
265265+ let (_, cursor) = db_complete::<ByCollectionKey>(&k_bytes)?.into();
266266+ let (did, rkey, record) = db_complete::<ByCollectionValue>(&v_bytes)?.into();
267267+ output.push(CreateRecord {
268268+ did,
269269+ rkey,
270270+ record,
271271+ cursor,
272272+ })
273273+ }
210274 }
211275 Ok(output)
212276 })
···214278 }
215279216280 pub async fn get_meta_info(&self) -> anyhow::Result<StorageInfo> {
217217- let keyspace = self.keyspace.clone();
218218- let partition = self.partition.clone();
281281+ let db = self.db.lock().await;
282282+ let keyspace = db.keyspace.clone();
283283+ let partition = db.partition.clone();
219284 tokio::task::spawn_blocking(move || {
220285 Ok(StorageInfo {
221286 keyspace_disk_space: keyspace.disk_space(),
···228293 }
229294230295 pub async fn get_collection_total_seen(&self, collection: &Nsid) -> anyhow::Result<u64> {
231231- let partition = self.partition.clone();
296296+ let partition = self.db.lock().await.partition.clone();
232297 let collection = collection.clone();
233233- tokio::task::spawn_blocking(move || get_unrolled_asdf(&partition, collection)).await?
298298+ tokio::task::spawn_blocking(move || get_unrolled_collection_seen(&partition, collection))
299299+ .await?
300300+ }
301301+302302+ pub async fn get_top_collections(&self) -> anyhow::Result<HashMap<String, u64>> {
303303+ let partition = self.db.lock().await.partition.clone();
304304+ tokio::task::spawn_blocking(move || get_unrolled_top_collections(&partition)).await?
234305 }
235306236307 pub async fn get_jetstream_endpoint(&self) -> anyhow::Result<Option<JetstreamEndpointValue>> {
237237- let partition = self.partition.clone();
308308+ let partition = self.db.lock().await.partition.clone();
238309 tokio::task::spawn_blocking(move || {
239310 get_static::<JetstreamEndpointKey, JetstreamEndpointValue>(&partition)
240311 })
···242313 }
243314244315 async fn set_jetstream_endpoint(&self, endpoint: &str) -> anyhow::Result<()> {
245245- let partition = self.partition.clone();
316316+ let partition = self.db.lock().await.partition.clone();
246317 let endpoint = endpoint.to_string();
247318 tokio::task::spawn_blocking(move || {
248319 insert_static::<JetstreamEndpointKey>(&partition, JetstreamEndpointValue(endpoint))
···251322 }
252323253324 pub async fn get_jetstream_cursor(&self) -> anyhow::Result<Option<Cursor>> {
254254- let partition = self.partition.clone();
325325+ let partition = self.db.lock().await.partition.clone();
255326 tokio::task::spawn_blocking(move || {
256327 get_static::<JetstreamCursorKey, JetstreamCursorValue>(&partition)
257328 })
···259330 }
260331261332 pub async fn get_mod_cursor(&self) -> anyhow::Result<Option<Cursor>> {
262262- let partition = self.partition.clone();
333333+ let partition = self.db.lock().await.partition.clone();
263334 tokio::task::spawn_blocking(move || get_static::<ModCursorKey, ModCursorValue>(&partition))
264335 .await?
265336 }
···310381}
311382312383/// Get stats that haven't been rolled up yet
313313-fn get_unrolled_asdf(partition: &PartitionHandle, collection: Nsid) -> anyhow::Result<u64> {
384384+fn get_unrolled_collection_seen(
385385+ partition: &PartitionHandle,
386386+ collection: Nsid,
387387+) -> anyhow::Result<u64> {
314388 let range =
315389 if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(partition)? {
316390 eprintln!("found existing cursor");
···325399326400 let mut scanned = 0;
327401 let mut rolled = 0;
402402+403403+ ////// ITER
404404+405405+ {
406406+ for pair in partition.range(range) {
407407+ let (key_bytes, value_bytes) = pair?;
408408+ let key = db_complete::<ByCursorSeenKey>(&key_bytes)?;
409409+ let val = db_complete::<ByCursorSeenValue>(&value_bytes)?;
410410+411411+ if *key.collection() == collection {
412412+ let SeenCounter(n) = val;
413413+ collection_total += n;
414414+ rolled += 1;
415415+ }
416416+ scanned += 1;
417417+ }
418418+ }
419419+420420+ eprintln!("scanned: {scanned}, rolled: {rolled}");
421421+422422+ Ok(collection_total)
423423+}
424424+425425+fn get_unrolled_top_collections(
426426+ partition: &PartitionHandle,
427427+) -> anyhow::Result<HashMap<String, u64>> {
428428+ let range =
429429+ if let Some(cursor_value) = get_static::<RollupCursorKey, RollupCursorValue>(partition)? {
430430+ eprintln!("found existing cursor");
431431+ let key: ByCursorSeenKey = cursor_value.into();
432432+ key.range_from()?
433433+ } else {
434434+ eprintln!("cursor from start.");
435435+ ByCursorSeenKey::full_range()?
436436+ };
437437+438438+ let mut res = HashMap::new();
439439+ let mut scanned = 0;
440440+328441 for pair in partition.range(range) {
329442 let (key_bytes, value_bytes) = pair?;
330443 let key = db_complete::<ByCursorSeenKey>(&key_bytes)?;
331331- let val = db_complete::<ByCursorSeenValue>(&value_bytes)?;
444444+ let SeenCounter(n) = db_complete(&value_bytes)?;
445445+446446+ *res.entry(key.collection().to_string()).or_default() += n;
332447333333- if *key.collection() == collection {
334334- let SeenCounter(n) = val;
335335- collection_total += n;
336336- rolled += 1;
337337- }
338448 scanned += 1;
339449 }
340450341341- eprintln!("scanned: {scanned}, rolled: {rolled}");
451451+ eprintln!("scanned: {scanned} seen-counts.");
342452343343- Ok(collection_total)
453453+ Ok(res)
344454}
345455346456impl DBWriter {
···352462 if let Some(cursor) = last {
353463 insert_batch_static::<JetstreamCursorKey>(&mut db_batch, &self.partition, cursor)?;
354464 }
355355- Ok(db_batch.commit()?)
465465+ log::info!("write: committing write batch...");
466466+ let r = db_batch.commit();
467467+ log::info!("write: commit result: {r:?}");
468468+ r?;
469469+ Ok(())
356470 }
357471358472 fn write_rw(
···367481368482 let items_modified = match mod_value {
369483 ModQueueItemValue::DeleteAccount(did) => {
484484+ log::trace!("rw: batcher: delete account...");
370485 let (items, finished) = self.delete_account(db_batch, mod_cursor, did)?;
486486+ log::trace!("rw: batcher: back from delete account (finished? {finished})");
371487 if finished {
372488 // only remove the queued rw task if we have actually completed its account removal work
373489 remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?;
490490+ items + 1
491491+ } else {
492492+ items
374493 }
375375- items
376494 }
377495 ModQueueItemValue::DeleteRecord(did, collection, rkey) => {
496496+ log::trace!("rw: batcher: delete record...");
497497+ let items = self.delete_record(db_batch, mod_cursor, did, collection, rkey)?;
498498+ log::trace!("rw: batcher: back from delete record");
378499 remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?;
379379- self.delete_record(db_batch, mod_cursor, did, collection, rkey)?
500500+ items + 1
380501 }
381502 ModQueueItemValue::UpdateRecord(did, collection, rkey, record) => {
503503+ let items =
504504+ self.update_record(db_batch, mod_cursor, did, collection, rkey, record)?;
382505 remove_batch::<ModQueueItemKey>(db_batch, &self.partition, mod_key)?;
383383- self.update_record(db_batch, mod_cursor, did, collection, rkey, record)?
506506+ items + 1
384507 }
385508 };
386509 Ok(items_modified)
···420543 rkey: RecordKey,
421544 ) -> anyhow::Result<usize> {
422545 let key_prefix_bytes =
423423- ByIdKey::record_prefix(did, collection.clone(), rkey).to_db_bytes()?;
546546+ ByIdKey::record_prefix(did.clone(), collection.clone(), rkey.clone()).to_db_bytes()?;
547547+548548+ // put the cursor of the actual deletion event in to prevent prefix iter from touching newer docs
549549+ let key_limit =
550550+ ByIdKey::new(did, collection.clone(), rkey, cursor.clone()).to_db_bytes()?;
424551425552 let mut items_removed = 0;
426426- for pair in self.partition.prefix(&key_prefix_bytes) {
427427- // find all (hopefully 1)
428428- let (key_bytes, _) = pair?;
429429- let key = db_complete::<ByIdKey>(&key_bytes)?;
430430- let found_cursor = key.cursor();
431431- if found_cursor > cursor {
432432- // we are *only* allowed to delete records that came before the record delete event
433433- eprintln!("delete_record: found (and ignoring) newer version(s). key: {key:?}");
434434- break;
435435- }
436553437437- // remove the by_id entry
438438- db_batch.remove(&self.partition, key_bytes);
439439-440440- // remove its record sample
441441- let by_collection_key_bytes =
442442- ByCollectionKey::new(collection.clone(), found_cursor).to_db_bytes()?;
443443- db_batch.remove(&self.partition, by_collection_key_bytes);
554554+ log::trace!("delete_record: iterate over up to current cursor...");
444555445445- items_removed += 1;
446446- }
556556+ ////////// ITER
447557448448- if items_removed > 1 {
449449- eprintln!("odd, removed {items_removed} records for one record removal:");
450450- for (i, pair) in self.partition.prefix(&key_prefix_bytes).enumerate() {
558558+ {
559559+ for (i, pair) in self
560560+ .partition
561561+ .range(key_prefix_bytes..key_limit)
562562+ .enumerate()
563563+ {
564564+ log::trace!("delete_record iter {i}: found");
451565 // find all (hopefully 1)
452566 let (key_bytes, _) = pair?;
453453- let found_cursor = db_complete::<ByIdKey>(&key_bytes)?.cursor();
567567+ let key = db_complete::<ByIdKey>(&key_bytes)?;
568568+ let found_cursor = key.cursor();
454569 if found_cursor > cursor {
455455- break;
570570+ // we are *only* allowed to delete records that came before the record delete event
571571+ // log::trace!("delete_record: found (and ignoring) newer version(s). key: {key:?}");
572572+ panic!("wtf, found newer version than cursor limit we tried to set.");
573573+ // break;
456574 }
457575458458- let key = db_complete::<ByIdKey>(&key_bytes)?;
459459- eprintln!(" {i}: key {key:?}");
576576+ // remove the by_id entry
577577+ db_batch.remove(&self.partition, key_bytes);
578578+579579+ // remove its record sample
580580+ let by_collection_key_bytes =
581581+ ByCollectionKey::new(collection.clone(), found_cursor).to_db_bytes()?;
582582+ db_batch.remove(&self.partition, by_collection_key_bytes);
583583+584584+ items_removed += 1;
460585 }
461586 }
587587+588588+ // if items_removed > 1 {
589589+ // log::trace!("odd, removed {items_removed} records for one record removal:");
590590+ // for (i, pair) in self.partition.prefix(&key_prefix_bytes).enumerate() {
591591+ // // find all (hopefully 1)
592592+ // let (key_bytes, _) = pair?;
593593+ // let found_cursor = db_complete::<ByIdKey>(&key_bytes)?.cursor();
594594+ // if found_cursor > cursor {
595595+ // break;
596596+ // }
597597+598598+ // let key = db_complete::<ByIdKey>(&key_bytes)?;
599599+ // log::trace!(" {i}: key {key:?}");
600600+ // }
601601+ // }
462602 Ok(items_removed)
463603 }
464604···471611 let key_prefix_bytes = ByIdKey::did_prefix(did).to_db_bytes()?;
472612473613 let mut items_added = 0;
474474- for pair in self.partition.prefix(&key_prefix_bytes) {
475475- let (key_bytes, _) = pair?;
476614477477- let (_, collection, _rkey, found_cursor) = db_complete::<ByIdKey>(&key_bytes)?.into();
478478- if found_cursor > cursor {
479479- eprintln!(
480480- "delete account: found (and ignoring) newer records than the delete event??"
481481- );
482482- continue;
483483- }
615615+ ////////// ITER
484616485485- // remove the by_id entry
486486- db_batch.remove(&self.partition, key_bytes);
617617+ {
618618+ for pair in self.partition.prefix(&key_prefix_bytes) {
619619+ let (key_bytes, _) = pair?;
487620488488- // remove its record sample
489489- let by_collection_key_bytes =
490490- ByCollectionKey::new(collection, found_cursor).to_db_bytes()?;
491491- db_batch.remove(&self.partition, by_collection_key_bytes);
621621+ let (_, collection, _rkey, found_cursor) =
622622+ db_complete::<ByIdKey>(&key_bytes)?.into();
623623+ if found_cursor > cursor {
624624+ log::trace!(
625625+ "delete account: found (and ignoring) newer records than the delete event??"
626626+ );
627627+ continue;
628628+ }
492629493493- items_added += 1;
494494- if items_added >= MAX_BATCHED_RW_ITEMS {
495495- return Ok((items_added, false)); // there might be more records but we've done enough for this batch
630630+ // remove the by_id entry
631631+ db_batch.remove(&self.partition, key_bytes);
632632+633633+ // remove its record sample
634634+ let by_collection_key_bytes =
635635+ ByCollectionKey::new(collection, found_cursor).to_db_bytes()?;
636636+ db_batch.remove(&self.partition, by_collection_key_bytes);
637637+638638+ items_added += 1;
639639+ if items_added >= MAX_BATCHED_RW_ITEMS {
640640+ return Ok((items_added, false)); // there might be more records but we've done enough for this batch
641641+ }
496642 }
497643 }
498644