···117117- [ ] put delete-account tasks into a separate (persisted?) task queue for the writer so it can work on them incrementally.
118118- [ ] jetstream: connect retry: only reset counter after some *time* has passed.
119119- [x] either count or estimate the total number of links added (distinct from link targets)
120120+- [x] jetstream: don't crash on connection refused (retry * backoff)
120121121122cache
122123- [ ] set api response headers
+35-5
link_aggregator/src/consumer/jetstream.rs
···8585 let host = req.uri().host().expect("jetstream request uri has a host");
8686 let port = req.uri().port().map(|p| p.as_u16()).unwrap_or(443);
8787 let dest = format!("{host}:{port}");
8888- let addr = dest
8989- .to_socket_addrs()?
9090- .next()
9191- .expect("can resolve the jetstream address"); // TODO ugh
9292- let tcp_stream = std::net::TcpStream::connect_timeout(&addr, time::Duration::from_secs(8))?; // TODO: handle
8888+ let addr = match dest.to_socket_addrs().map(|mut d| d.next()) {
8989+ Ok(Some(a)) => a,
9090+ Ok(None) => {
9191+ eprintln!(
9292+ "jetstream: could not resolve an address for {dest:?}. retrying after a bit?"
9393+ );
9494+ thread::sleep(time::Duration::from_secs(15));
9595+ continue;
9696+ }
9797+ Err(e) => {
9898+ eprintln!("jetstream failed to resolve address {dest:?}: {e:?} waiting and then retrying...");
9999+ thread::sleep(time::Duration::from_secs(3));
100100+ continue;
101101+ }
102102+ };
103103+ let tcp_stream = match std::net::TcpStream::connect_timeout(
104104+ &addr,
105105+ time::Duration::from_secs(8),
106106+ ) {
107107+ Ok(s) => s,
108108+ Err(e) => {
109109+ eprintln!(
110110+ "jetstream failed to make tcp connection: {e:?}. (todo: clean up retry logic)"
111111+ );
112112+ connect_retries += 1;
113113+ if connect_retries >= 7 {
114114+ eprintln!("jetstream: no more connect retries, breaking out.");
115115+ break;
116116+ }
117117+ let backoff = time::Duration::from_secs(connect_retries.try_into().unwrap());
118118+ eprintln!("jetstream tcp failed to connect: {e:?}. backing off {backoff:?} before retrying...");
119119+ thread::sleep(backoff);
120120+ continue;
121121+ }
122122+ };
93123 tcp_stream.set_read_timeout(Some(time::Duration::from_secs(4)))?;
94124 tcp_stream.set_write_timeout(Some(time::Duration::from_secs(4)))?;
95125