···10101111[dependencies]
1212async-trait = "0.1.83"
1313-atrium-api = { version = "0.25.2", default-features = false, features = [
1313+atrium-api = { git = "https://github.com/uniphil/atrium", branch = "fix/nsid-allow-nonleading-name-digits", default-features = false, features = [
1414 "namespace-appbsky",
1515] }
1616tokio = { version = "1.44.2", features = ["full", "sync", "time"] }
+2-2
jetstream/src/lib.rs
···439439440440 if let Some(last) = last_cursor {
441441 if event_cursor <= *last {
442442- log::warn!("event cursor {event_cursor:?} was older than the last one: {last:?}. dropping event.");
442442+ log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
443443 continue;
444444 }
445445 }
···475475476476 if let Some(last) = last_cursor {
477477 if event_cursor <= *last {
478478- log::warn!("event cursor {event_cursor:?} was older than the last one: {last:?}. dropping event.");
478478+ log::warn!("event cursor {event_cursor:?} was not newer than the last one: {last:?}. dropping event.");
479479 continue;
480480 }
481481 }
+32-12
ufos/src/consumer.rs
···88use std::mem;
99use std::time::Duration;
1010use tokio::sync::mpsc::{channel, Receiver, Sender};
1111+use tokio::time::{timeout, Interval};
11121213use crate::error::{BatchInsertError, FirehoseEventError};
1314use crate::{DeleteAccount, EventBatch, UFOsCommit};
···3435 batch_sender: Sender<LimitedBatch>,
3536 current_batch: CurrentBatch,
3637 sketch_secret: SketchSecretPrefix,
3838+ rate_limit: Interval,
3739}
38403941pub async fn consume(
···6466 .await?;
6567 let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
6668 let mut batcher = Batcher::new(jetstream_receiver, batch_sender, sketch_secret);
6767- tokio::task::spawn(async move { batcher.run().await });
6969+ tokio::task::spawn(async move {
7070+ let r = batcher.run().await;
7171+ log::info!("batcher ended: {r:?}");
7272+ });
6873 Ok(batch_reciever)
6974}
7075···7479 batch_sender: Sender<LimitedBatch>,
7580 sketch_secret: SketchSecretPrefix,
7681 ) -> Self {
8282+ let mut rate_limit = tokio::time::interval(std::time::Duration::from_millis(5));
8383+ rate_limit.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
7784 Self {
7885 jetstream_receiver,
7986 batch_sender,
8087 current_batch: Default::default(),
8188 sketch_secret,
8989+ rate_limit,
8290 }
8391 }
84928593 pub async fn run(&mut self) -> anyhow::Result<()> {
9494+ // TODO: report errors *from here* probably, since this gets shipped off into a spawned task that might just vanish
8695 loop {
8787- if let Some(event) = self.jetstream_receiver.recv().await {
8888- self.handle_event(event).await?
8989- } else {
9090- anyhow::bail!("channel closed");
9696+ match timeout(Duration::from_millis(9_000), self.jetstream_receiver.recv()).await {
9797+ Err(_elapsed) => self.no_events_step().await?,
9898+ Ok(Some(event)) => self.handle_event(event).await?,
9999+ Ok(None) => anyhow::bail!("channel closed"),
91100 }
92101 }
93102 }
94103104104+ async fn no_events_step(&mut self) -> anyhow::Result<()> {
105105+ let empty = self.current_batch.batch.is_empty();
106106+ log::info!("no events received, stepping batcher (empty? {empty})");
107107+ if !empty {
108108+ self.send_current_batch_now(true, "no events step").await?;
109109+ }
110110+ Ok(())
111111+ }
112112+95113 async fn handle_event(&mut self, event: JetstreamEvent) -> anyhow::Result<()> {
96114 if let Some(earliest) = &self.current_batch.initial_cursor {
97115 if event.cursor.duration_since(earliest)? > Duration::from_secs_f64(MAX_BATCH_SPAN_SECS)
98116 {
9999- self.send_current_batch_now(false).await?;
117117+ self.send_current_batch_now(false, "time since event")
118118+ .await?;
100119 }
101120 } else {
102121 self.current_batch.initial_cursor = Some(event.cursor);
···126145 if event.cursor.duration_since(earliest)?.as_secs_f64() > MIN_BATCH_SPAN_SECS
127146 && self.batch_sender.capacity() == BATCH_QUEUE_SIZE
128147 {
129129- self.send_current_batch_now(true).await?;
148148+ self.send_current_batch_now(true, "available queue").await?;
130149 }
131150 }
132151 Ok(())
···141160 );
142161143162 if let Err(BatchInsertError::BatchFull(commit)) = optimistic_res {
144144- self.send_current_batch_now(false).await?;
163163+ self.send_current_batch_now(false, "handle commit").await?;
145164 self.current_batch.batch.insert_commit_by_nsid(
146165 &collection,
147166 commit,
···157176158177 async fn handle_delete_account(&mut self, did: Did, cursor: Cursor) -> anyhow::Result<()> {
159178 if self.current_batch.batch.account_removes.len() >= MAX_ACCOUNT_REMOVES {
160160- self.send_current_batch_now(false).await?;
179179+ self.send_current_batch_now(false, "delete account").await?;
161180 }
162181 self.current_batch
163182 .batch
···168187169188 // holds up all consumer progress until it can send to the channel
170189 // use this when the current batch is too full to add more to it
171171- async fn send_current_batch_now(&mut self, small: bool) -> anyhow::Result<()> {
190190+ async fn send_current_batch_now(&mut self, small: bool, referrer: &str) -> anyhow::Result<()> {
172191 let beginning = match self.current_batch.initial_cursor.map(|c| c.elapsed()) {
173192 None => "unknown".to_string(),
174193 Some(Ok(t)) => format!("{:?}", t),
175194 Some(Err(e)) => format!("+{:?}", e.duration()),
176195 };
177177- log::info!(
178178- "sending batch now from {beginning}, {}, queue capacity: {}",
196196+ log::trace!(
197197+ "sending batch now from {beginning}, {}, queue capacity: {}, referrer: {referrer}",
179198 if small { "small" } else { "full" },
180199 self.batch_sender.capacity(),
181200 );
182201 let current = mem::take(&mut self.current_batch);
202202+ self.rate_limit.tick().await;
183203 self.batch_sender
184204 .send_timeout(current.batch, Duration::from_secs_f64(SEND_TIMEOUT_S))
185205 .await?;
+24-8
ufos/src/file_consumer.rs
···1212async fn read_jsonl(f: File, sender: Sender<JetstreamEvent>) -> Result<()> {
1313 let mut lines = BufReader::new(f).lines();
1414 while let Some(line) = lines.next_line().await? {
1515- let event: JetstreamEvent =
1616- serde_json::from_str(&line).map_err(JetstreamEventError::ReceivedMalformedJSON)?;
1717- if sender.send(event).await.is_err() {
1818- log::warn!("All receivers for the jsonl fixture have been dropped, bye.");
1919- return Err(JetstreamEventError::ReceiverClosedError.into());
1515+ match serde_json::from_str::<JetstreamEvent>(&line) {
1616+ Ok(event) => match sender.send(event).await {
1717+ Ok(_) => {}
1818+ Err(e) => {
1919+ log::warn!("All receivers for the jsonl fixture have been dropped, bye: {e:?}");
2020+ return Err(JetstreamEventError::ReceiverClosedError.into());
2121+ }
2222+ },
2323+ Err(parse_err) => {
2424+ log::warn!("failed to parse event: {parse_err:?} from event:\n{line}");
2525+ continue;
2626+ }
2027 }
2128 }
2222- Ok(())
2929+ log::info!("reached end of jsonl file, looping on noop to keep server alive.");
3030+ loop {
3131+ tokio::time::sleep(std::time::Duration::from_secs_f64(10.)).await;
3232+ }
2333}
24342535pub async fn consume(
···3040 let (jsonl_sender, jsonl_receiver) = channel::<JetstreamEvent>(16);
3141 let (batch_sender, batch_reciever) = channel::<LimitedBatch>(BATCH_QUEUE_SIZE);
3242 let mut batcher = Batcher::new(jsonl_receiver, batch_sender, sketch_secret);
3333- tokio::task::spawn(async move { read_jsonl(f, jsonl_sender).await });
3434- tokio::task::spawn(async move { batcher.run().await });
4343+ tokio::task::spawn(async move {
4444+ let r = read_jsonl(f, jsonl_sender).await;
4545+ log::info!("read_jsonl finished: {r:?}");
4646+ });
4747+ tokio::task::spawn(async move {
4848+ let r = batcher.run().await;
4949+ log::info!("batcher finished: {r:?}");
5050+ });
3551 Ok(batch_reciever)
3652}