···105105 let mut buf = Vec::<BufferedMessage>::with_capacity(BUF_SIZE);
106106 let mut failed = Vec::<BufferedMessage>::new();
107107108108+ let _g = handle.enter();
109109+108110 loop {
109111 let mut batch = self.state.db.inner.batch();
110112 let mut deleted = HashSet::new();
···187189 // wait until we receive some messages
188190 // this does mean we will have an up to 1 second delay, before we send events to consumers
189191 // but thats reasonable imo, could also be configured of course
190190- let _ = handle.block_on(tokio::time::timeout(
191191- Duration::from_secs(1),
192192- self.rx.recv_many(&mut buf, BUF_SIZE),
193193- ));
192192+ let _ = handle.block_on(async {
193193+ tokio::time::timeout(
194194+ Duration::from_secs(1),
195195+ self.rx.recv_many(&mut buf, BUF_SIZE),
196196+ )
197197+ .await
198198+ });
194199 if buf.is_empty() {
195200 if self.rx.is_closed() {
196201 error!("ingestor crashed? shutting down buffer processor");