···189189 let index = Arc::new(StatusIndex::new());
190190 tokio::select! {
191191 // this finally starts hydrant, so it will start crawling and backfilling etc.
192192- r = hydrant.run() => r,
192192+ r = hydrant.run()? => r,
193193 _ = run_ticker(index.clone()) => Ok(()),
194194 _ = handle_stream(index.clone(), hydrant.repos.clone(), stream) => Ok(()),
195195 }
+8-8
src/control.rs
···162162 /// resolves with `Ok(())` if a fatal component exits cleanly, or `Err(e)` if it
163163 /// fails. intended for use in `tokio::select!` alongside [`serve`](Self::serve).
164164 ///
165165- /// panics if called more than once on the same `Hydrant` instance.
166166- pub fn run(&self) -> impl Future<Output = Result<()>> {
165165+ /// returns an error if called more than once on the same `Hydrant` instance.
166166+ pub fn run(&self) -> Result<impl Future<Output = Result<()>>> {
167167 let state = self.state.clone();
168168 let config = self.config.clone();
169169- let started = self.started.clone();
170169171171- async move {
172172- if started.swap(true, Ordering::SeqCst) {
173173- panic!("Hydrant::run() called more than once");
174174- }
170170+ if self.started.swap(true, Ordering::SeqCst) {
171171+ miette::bail!("Hydrant::run() called more than once");
172172+ }
175173174174+ let fut = async move {
176175 // internal buffered channel between ingestors / backfill and the firehose worker
177176 let (buffer_tx, buffer_rx) = mpsc::unbounded_channel();
178177···499498 Err(_) => return Ok(()),
500499 }
501500 }
502502- }
501501+ };
502502+ Ok(fut)
503503 }
504504505505 /// subscribe to the ordered event stream.
+2-2
src/main.rs
···23232424 if app.enable_debug {
2525 tokio::select! {
2626- r = hydrant.run() => r,
2626+ r = hydrant.run()? => r,
2727 r = hydrant.serve(app.api_port) => r,
2828 r = hydrant.serve_debug(app.debug_port) => r,
2929 }
3030 } else {
3131 tokio::select! {
3232- r = hydrant.run() => r,
3232+ r = hydrant.run()? => r,
3333 r = hydrant.serve(app.api_port) => r,
3434 }
3535 }