···244244 break;
245245 };
246246247247+ info!("{msg:?}");
248248+247249 if let Message::Close(_) = msg {
248250 info!("Client closed connection");
249251 break;
···254256 }
255257 Ok(())
256258}
259259+260260+#[cfg(test)]
261261+mod test {
262262+ use std::net::SocketAddr;
263263+ use std::time::Duration;
264264+265265+ use super::super::sync_routes;
266266+ use super::*;
267267+ use axum_test::TestServer;
268268+ use tokio_util::sync::CancellationToken;
269269+270270+ #[tokio::test]
271271+ async fn test_websockets_closing() {
272272+ // tracing_subscriber::fmt().init();
273273+ tranquil_config::ensure_test_defaults();
274274+ let state = AppState::new(CancellationToken::new()).await.unwrap();
275275+ let app = sync_routes()
276276+ .with_state(state)
277277+ .into_make_service_with_connect_info::<SocketAddr>();
278278+ let server = TestServer::builder().http_transport().build(app);
279279+280280+ const CONNECTIONS: usize = 100;
281281+ let mut open_sockets = Vec::with_capacity(CONNECTIONS);
282282+283283+ for _ in 0..CONNECTIONS {
284284+ let socket = server
285285+ .get_websocket("/com.atproto.sync.subscribeRepos")
286286+ .await
287287+ .into_websocket()
288288+ .await;
289289+ open_sockets.push(socket);
290290+ }
291291+ assert_eq!(SUBSCRIBER_COUNT.load(Ordering::SeqCst), CONNECTIONS);
292292+293293+ drop(open_sockets);
294294+ // disgusting awful hack to give tokio time to poll the server futures enough times to actually drop all the
295295+ // websockets on the other end as well
296296+ tokio::time::sleep(Duration::from_millis(8)).await;
297297+ assert_eq!(SUBSCRIBER_COUNT.load(Ordering::SeqCst), 0);
298298+ }
299299+}