TCP/TLS connection pooling for Eio
1(*---------------------------------------------------------------------------
2 Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
3 SPDX-License-Identifier: ISC
4 ---------------------------------------------------------------------------*)
5
6(** Conpool - Protocol-aware TCP/IP connection pooling library for Eio *)
7
(* Log source under which every message of this module is reported. *)
let src = Logs.Src.create ~doc:"Connection pooling library" "conpool"

(* Logging functions bound to [src]. *)
module Log = (val Logs.src_log src : Logs.LOG)

(* Sibling modules re-exported through this module. *)
module Endpoint = Endpoint
module Config = Config
module Stats = Stats
module Cmd = Cmd
17
(* Set once [suppress_tls_tracing] has run, so the lookup happens only once. *)
let tls_tracing_suppressed = ref false

(* Lower the "tls.tracing" log source from [Debug] to [Warning] so that its
   hexdumps are not printed. Only the first call does any work; a source that
   is missing or set to any other level is left untouched. *)
let suppress_tls_tracing () =
  if !tls_tracing_suppressed then ()
  else begin
    tls_tracing_suppressed := true;
    let is_tls_tracing s = Logs.Src.name s = "tls.tracing" in
    let demote_if_debug s =
      match Logs.Src.level s with
      | Some Logs.Debug -> Logs.Src.set_level s (Some Logs.Warning)
      | _ -> ()
    in
    Logs.Src.list () |> List.find_opt is_tls_tracing
    |> Option.iter demote_if_debug
  end
36
37(** {1 Error Types} *)
38
(** Failures reported by the pool. Values are wrapped into [Eio.Io] through
    the [E] extension constructor and the [err] helper defined further down
    in this file. *)
type error =
  | Dns_resolution_failed of { hostname : string }
      (** Name resolution produced no address for [hostname]. *)
  | Connection_failed of {
      endpoint : Endpoint.t;
      attempts : int;
      last_error : string;
    }
      (** Connecting to [endpoint] failed; [attempts] is the number of tries
          and [last_error] the text of the last failure. *)
  | Connection_timeout of { endpoint : Endpoint.t; timeout : float }
      (** Connecting to [endpoint] did not finish within [timeout] (printed
          in seconds by [pp_error]). *)
  | Invalid_config of string  (** The configuration was rejected. *)
  | Invalid_endpoint of string  (** The endpoint was rejected. *)
49
(** [pp_error ppf e] prints a one-line, human-readable description of [e]. *)
let pp_error ppf e =
  match e with
  | Invalid_config msg -> Fmt.pf ppf "Invalid configuration: %s" msg
  | Invalid_endpoint msg -> Fmt.pf ppf "Invalid endpoint: %s" msg
  | Dns_resolution_failed { hostname } ->
      Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname
  | Connection_timeout { endpoint; timeout } ->
      Fmt.pf ppf "Connection timeout to %a after %.2fs" Endpoint.pp endpoint
        timeout
  | Connection_failed { endpoint; attempts; last_error } ->
      Fmt.pf ppf "Failed to connect to %a after %d attempts: %s" Endpoint.pp
        endpoint attempts last_error
61
(** Extension of Eio's error type carrying a pool [error]. *)
type Eio.Exn.err += E of error

(** [err e] builds the [Eio.Io] exception for [e]; the caller raises it. *)
let err e = Eio.Exn.create (E e)

(* Teach Eio's exception printer about [E]: such errors are printed as
   "Conpool " followed by [pp_error]; other errors are left to other
   printers by returning [false]. *)
let () =
  let pp_conpool ppf = function
    | E e ->
        Fmt.pf ppf "Conpool %a" pp_error e;
        true
    | _ -> false
  in
  Eio.Exn.register_pp pp_conpool
73
74(** {1 Connection Types} *)
75
(** Capabilities of a pooled flow: two-way byte stream plus close. *)
type connection_ty = [ Eio.Resource.close_ty | Eio.Flow.two_way_ty ]

(** A resource offering the [connection_ty] capabilities. *)
type connection = connection_ty Eio.Resource.t
78
79(** {1 Internal Types} *)
80
(** Internal wrapper around one pooled flow, with its protocol state and the
    bookkeeping the pool needs to share, age and close it. *)
type 'state pooled_connection = {
  pc_flow : connection;
      (** Flow handed to users: the TLS flow when TLS is configured, the raw
          socket otherwise. *)
  pc_tls_flow : Tls_eio.t option;
      (** The TLS flow, when there is one; used to read the TLS epoch. *)
  pc_state : 'state;
      (** Protocol state returned by the protocol's [init_state]. *)
  pc_created_at : float;
      (** Clock reading at creation (compared with the max lifetime). *)
  mutable pc_last_used : float;
      (** Clock reading of the last acquisition (compared with the max idle
          time). *)
  mutable pc_use_count : int;
      (** Number of times the connection was handed out for reuse. *)
  pc_endpoint : Endpoint.t;  (** Endpoint this connection is bound to. *)
  mutable pc_active_users : int;
      (** Number of users currently holding the connection. *)
  pc_user_available : Eio.Condition.t;
      (** Broadcast whenever a user releases the connection. *)
  mutable pc_closed : bool;
      (** Set once the connection has been closed or retired. *)
  pc_connection_cancel : exn -> unit;
      (** Fails the connection-lifetime switch, stopping any protocol fibers
          attached to it. *)
}
99
(** Mutable counters kept per endpoint. *)
type endp_stats = {
  mutable active : int;
      (** Outstanding acquisitions: +1 on acquire, -1 on release. *)
  mutable idle : int;
      (** Connections currently without any user ([pc_active_users = 0]). *)
  mutable total_created : int;  (** Connections opened so far. *)
  mutable total_reused : int;
      (** Acquisitions served by an existing connection. *)
  mutable total_closed : int;  (** Connections retired so far. *)
  mutable errors : int;
      (** Connections retired because of an error rather than lifecycle. *)
}
109
(** Connections and counters of a single endpoint. *)
type 'state endpoint_pool = {
  connections : 'state pooled_connection list ref;
      (** Connections currently tracked for the endpoint. *)
  ep_mutex : Eio.Mutex.t;
      (** Held while acquiring, releasing or clearing connections. *)
  stats : endp_stats;  (** Counters for the endpoint. *)
  stats_mutex : Eio.Mutex.t;  (** Held while reading or updating [stats]. *)
}
117
(** Internal pool representation, parameterised by the protocol state and by
    the concrete clock and network types. *)
type ('state, 'clock, 'net) internal = {
  sw : Eio.Switch.t;
      (** Pool switch: sockets and per-connection fibers are attached to it. *)
  net : 'net;  (** Network used to resolve names and to connect. *)
  clock : 'clock;  (** Clock used for timestamps and connect timeouts. *)
  config : Config.t;  (** Limits and timeouts. *)
  tls : Tls.Config.client option;
      (** When present, every connection is upgraded to TLS. *)
  protocol : 'state Config.protocol_config;
      (** Protocol callbacks (state creation, health, access mode, ...). *)
  endpoints : (Endpoint.t, 'state endpoint_pool) Hashtbl.t;
      (** Per-endpoint pools. *)
  endpoints_mutex : Eio.Mutex.t;  (** Held while updating [endpoints]. *)
}
129
130(** {1 Public Types} *)
131
(** A connection pool. The constructor hides the concrete clock and network
    types, leaving only the protocol state type visible. *)
type 'state t =
  | Pool : ('state, 'clock Eio.Time.clock, 'net Eio.Net.t) internal -> 'state t

(** What a user receives when acquiring a connection. *)
type 'state connection_info = {
  flow : connection;  (** Flow to read from and write to. *)
  tls_epoch : Tls.Core.epoch_data option;
      (** TLS session data, when the flow is a TLS flow and the epoch could
          be read. *)
  state : 'state;  (** Protocol state attached to the connection. *)
}
140
141(** {1 Default Protocol Handler}
142
143 For simple exclusive-access protocols (HTTP/1.x, Redis, etc.), use unit
144 state with no special initialization. *)
145
(** Protocol handler with [unit] state: no initialisation, no acquire,
    release or close actions, always healthy, and exclusive access. *)
let default_protocol : unit Config.protocol_config =
  let noop () = () in
  {
    Config.init_state = (fun ~sw:_ ~flow:_ ~tls_epoch:_ -> ());
    on_acquire = noop;
    on_release = noop;
    is_healthy = (fun () -> true);
    on_close = noop;
    access_mode = (fun () -> Config.Exclusive);
  }
155
156(** {1 Helper Functions} *)
157
(* Current reading of the pool's clock. *)
let time { clock; _ } = Eio.Time.now clock
159
(* Fresh counters for a new endpoint pool, all starting at zero. A new record
   is allocated on every call because the fields are mutable. *)
let endp_stats () : endp_stats =
  {
    active = 0;
    idle = 0;
    total_created = 0;
    total_reused = 0;
    total_closed = 0;
    errors = 0;
  }
169
(* Copy the mutable counters into a [Stats.t] value. *)
let snapshot_stats (stats : endp_stats) : Stats.t =
  let { active; idle; total_created; total_reused; total_closed; errors } =
    stats
  in
  Stats.v ~active ~idle ~total_created ~total_reused ~total_closed ~errors
174
175(** {1 Connection Creation} *)
176
(* Resolve [endpoint] to a stream address and return the first result.
   Raises [Dns_resolution_failed] (as [Eio.Io]) when the lookup returns no
   address. Any [Eio.Io] error, including that one, is re-raised with a
   "resolving <endpoint>" context. *)
let resolve_endpoint pool endpoint =
  let lookup () =
    let hostname = Endpoint.host endpoint in
    let service = string_of_int (Endpoint.port endpoint) in
    match Eio.Net.getaddrinfo_stream pool.net hostname ~service with
    | first :: _ -> first
    | [] ->
        raise
          (err (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
  in
  match lookup () with
  | addr -> addr
  | exception (Eio.Io _ as ex) ->
      let bt = Printexc.get_raw_backtrace () in
      Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint
191
(* Open a socket to [addr], attached to the pool's switch. When the
   configuration has a connect timeout the attempt is bounded by it.
   [Eio.Io] errors are re-raised with a "connecting to <endpoint>" context. *)
let tcp_connect pool addr endpoint =
  let dial () = Eio.Net.connect ~sw:pool.sw pool.net addr in
  let attempt () =
    match Config.connect_timeout pool.config with
    | None -> dial ()
    | Some timeout -> Eio.Time.with_timeout_exn pool.clock timeout dial
  in
  match attempt () with
  | socket -> socket
  | exception (Eio.Io _ as ex) ->
      let bt = Printexc.get_raw_backtrace () in
      Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp
        endpoint
202
(* Upgrade [socket] to TLS when the pool has a TLS client configuration.
   Returns the flow to use together with the TLS flow, if any. Without TLS
   the socket itself is returned. The endpoint host is used as the peer name.
   [Eio.Io] errors are re-raised with a "TLS handshake with <endpoint>"
   context. *)
let tls_handshake pool socket endpoint =
  match pool.tls with
  | None -> ((socket :> connection), None)
  | Some client_config -> (
      let peer_name () =
        Endpoint.host endpoint |> Domain_name.of_string_exn
        |> Domain_name.host_exn
      in
      try
        Log.debug (fun m ->
            m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
        let host = peer_name () in
        let secured = Tls_eio.client_of_flow ~host client_config socket in
        suppress_tls_tracing ();
        Log.info (fun m ->
            m "TLS connection established to %a" Endpoint.pp endpoint);
        ((secured :> connection), Some secured)
      with Eio.Io _ as ex ->
        let bt = Printexc.get_raw_backtrace () in
        Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp
          endpoint)
222
(* TLS epoch of an optional TLS flow: [None] when there is no TLS flow or
   when the epoch cannot be read. *)
let tls_epoch_of_flow tls_flow =
  Option.bind tls_flow (fun flow -> Result.to_option (Tls_eio.epoch flow))
229
(* Create a switch whose lifetime is that of one connection.

   A daemon fiber forked on the pool's switch runs [Eio.Switch.run] and then
   waits to be cancelled, which keeps the inner switch open. The function
   returns once that switch exists, giving back the switch and a function
   that fails it with a given exception. Cancellation of the daemon is
   silent; any other exception that ends it is logged as a warning. *)
let connection_switch pool =
  let ready, set_ready = Eio.Promise.create () in
  let run_switch () =
    Eio.Switch.run (fun conn_sw ->
        let cancel exn = Eio.Switch.fail conn_sw exn in
        Eio.Promise.resolve set_ready (conn_sw, cancel);
        Eio.Fiber.await_cancel ())
  in
  Eio.Fiber.fork_daemon ~sw:pool.sw (fun () ->
      (match run_switch () with
      | () -> ()
      | exception Eio.Cancel.Cancelled _ -> ()
      | exception exn ->
          Log.warn (fun m ->
              m "Connection fiber caught exception: %s"
                (Printexc.to_string exn)));
      `Stop_daemon);
  Eio.Promise.await ready
249
(* Establish a brand-new pooled connection to [endpoint]: resolve, connect,
   optionally upgrade to TLS, create the connection-lifetime switch and
   initialise the protocol state. The result has no active user yet.

   The socket is attached to the pool's switch, so without explicit cleanup
   a failed TLS handshake or a failing [init_state] would keep the socket
   (and the per-connection switch) alive until the whole pool is shut down.
   Both failure paths therefore release what was already set up and then
   re-raise the original exception with its backtrace. *)
let connection pool endpoint =
  Log.debug (fun m -> m "Creating connection to %a" Endpoint.pp endpoint);
  let addr = resolve_endpoint pool endpoint in
  let socket = tcp_connect pool addr endpoint in
  Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint);
  (* Best-effort cleanup step: a failure here must not mask the error that
     triggered the cleanup, so it is only logged. *)
  let discard what cleanup =
    Eio.Cancel.protect (fun () ->
        try cleanup ()
        with exn ->
          Log.debug (fun m ->
              m "Ignoring %s error during failed setup: %s" what
                (Printexc.to_string exn)))
  in
  let flow, tls_flow =
    try tls_handshake pool socket endpoint
    with exn ->
      let bt = Printexc.get_raw_backtrace () in
      discard "close" (fun () -> Eio.Flow.close socket);
      Printexc.raise_with_backtrace exn bt
  in
  let tls_epoch = tls_epoch_of_flow tls_flow in
  let conn_sw, conn_cancel = connection_switch pool in
  Log.debug (fun m ->
      m "Initializing protocol state for %a" Endpoint.pp endpoint);
  let state =
    try pool.protocol.init_state ~sw:conn_sw ~flow ~tls_epoch
    with exn ->
      let bt = Printexc.get_raw_backtrace () in
      (* Same order as [close_connection]: stop protocol fibers, then close
         the flow. *)
      discard "cancel" (fun () -> conn_cancel (Failure "Connection closed"));
      discard "close" (fun () -> Eio.Flow.close flow);
      Printexc.raise_with_backtrace exn bt
  in
  let now = time pool in
  Log.info (fun m -> m "Created connection to %a" Endpoint.pp endpoint);
  {
    pc_flow = flow;
    pc_tls_flow = tls_flow;
    pc_state = state;
    pc_created_at = now;
    pc_last_used = now;
    pc_use_count = 0;
    pc_endpoint = endpoint;
    pc_active_users = 0;
    pc_user_available = Eio.Condition.create ();
    pc_closed = false;
    pc_connection_cancel = conn_cancel;
  }
276
277(** {1 Connection Health Checking} *)
278
(** Outcome of a health check. Unhealthy results carry a short reason and
    separate real failures from ordinary end-of-life conditions. *)
type health_status =
  | Healthy  (** The connection may keep being used. *)
  | Unhealthy_error of string
      (** The connection failed (for instance the protocol check failed). *)
  | Unhealthy_lifecycle of string
      (** The connection reached a normal end of life (already closed, too
          old, idle for too long, used too many times). *)
287
(* Classify [conn]. The checks run in this order and the first hit wins:
   already closed, protocol health callback, maximum lifetime, maximum idle
   time (only when nobody is using the connection), maximum number of uses. *)
let check_health pool conn =
  if conn.pc_closed then Unhealthy_lifecycle "already closed"
  else if not (pool.protocol.is_healthy conn.pc_state) then begin
    Log.debug (fun m -> m "Connection unhealthy: protocol check failed");
    Unhealthy_error "protocol check failed"
  end
  else
    let now = time pool in
    let age = now -. conn.pc_created_at in
    let max_lifetime = Config.max_connection_lifetime pool.config in
    if age > max_lifetime then begin
      Log.debug (fun m ->
          m "Connection unhealthy: exceeded max lifetime (%.1fs > %.1fs)" age
            max_lifetime);
      Unhealthy_lifecycle "exceeded max lifetime"
    end
    else
      let idle_time = now -. conn.pc_last_used in
      let max_idle = Config.max_idle_time pool.config in
      let unused = conn.pc_active_users = 0 in
      if unused && idle_time > max_idle then begin
        Log.debug (fun m ->
            m "Connection unhealthy: exceeded max idle time (%.1fs > %.1fs)"
              idle_time max_idle);
        Unhealthy_lifecycle "exceeded max idle time"
      end
      else
        match Config.max_connection_uses pool.config with
        | Some max_uses when conn.pc_use_count >= max_uses ->
            Log.debug (fun m ->
                m "Connection unhealthy: exceeded max uses (%d >= %d)"
                  conn.pc_use_count max_uses);
            Unhealthy_lifecycle "exceeded max uses"
        | Some _ | None -> Healthy
327
(* [true] exactly when [check_health] reports [Healthy]. *)
let is_healthy pool conn =
  check_health pool conn |> function
  | Healthy -> true
  | Unhealthy_error _ | Unhealthy_lifecycle _ -> false
332
333(** {1 Connection Cleanup} *)
334
(* Close [conn] unless it is already marked closed. Steps, in order: mark it
   closed, fail the connection-lifetime switch (stops protocol fibers), run
   the protocol's [on_close], close the flow. Errors from the cancel step and
   [Eio.Io] errors from closing the flow are logged and ignored. *)
let close_connection pool conn =
  if conn.pc_closed then ()
  else begin
    conn.pc_closed <- true;
    Log.debug (fun m ->
        m "Closing connection to %a" Endpoint.pp conn.pc_endpoint);
    (match conn.pc_connection_cancel (Failure "Connection closed") with
    | () -> ()
    | exception exn ->
        Log.debug (fun m ->
            m "Ignoring cancel error: %s" (Printexc.to_string exn)));
    pool.protocol.on_close conn.pc_state;
    (* Protected so that a cancelled caller still closes the flow. *)
    Eio.Cancel.protect (fun () ->
        match Eio.Flow.close conn.pc_flow with
        | () -> ()
        | exception (Eio.Io _ as exn) ->
            Log.debug (fun m ->
                m "Ignoring close error: %s" (Printexc.to_string exn)))
  end
357
358(** {1 Endpoint Pool Management} *)
359
(* Return the pool of [endpoint], creating and registering an empty one when
   none exists. The table is first consulted under a read-only lock; on a
   miss it is consulted again under the read-write lock before inserting, so
   a pool registered in between is reused instead of replaced. *)
let ensure_endpoint_pool pool endpoint =
  let lookup () = Hashtbl.find_opt pool.endpoints endpoint in
  let create () =
    Log.info (fun m -> m "Creating endpoint pool for %a" Endpoint.pp endpoint);
    {
      connections = ref [];
      ep_mutex = Eio.Mutex.create ();
      stats = endp_stats ();
      stats_mutex = Eio.Mutex.create ();
    }
  in
  match Eio.Mutex.use_ro pool.endpoints_mutex lookup with
  | Some existing -> existing
  | None ->
      Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
          match lookup () with
          | Some existing -> existing
          | None ->
              let fresh = create () in
              Hashtbl.add pool.endpoints endpoint fresh;
              fresh)
383
384(** {1 Connection Acquisition} *)
385
(* Hand an existing connection to one more user: bump its user and use
   counters, refresh its last-used time, update the endpoint counters (the
   idle gauge only drops when [was_idle] is true), then run the protocol's
   [on_acquire]. Returns the connection. *)
let activate_reused pool ep_pool conn ~was_idle =
  conn.pc_active_users <- succ conn.pc_active_users;
  conn.pc_last_used <- time pool;
  conn.pc_use_count <- succ conn.pc_use_count;
  let counters = ep_pool.stats in
  Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
      counters.total_reused <- counters.total_reused + 1;
      counters.active <- counters.active + 1;
      if was_idle then counters.idle <- max 0 (counters.idle - 1));
  pool.protocol.on_acquire conn.pc_state;
  conn
396
(* Wait until [conn] has a free user slot (or is closed), then claim it.
   Returns [None] when the connection was closed in the meantime.

   The caller must hold [ep_pool.ep_mutex]. The wait uses
   [Eio.Condition.await] with that mutex, which unlocks it while the fiber
   sleeps and locks it again before returning. Waiting with the mutex still
   locked would deadlock, because [release_connection] needs the same mutex
   before it can free a slot and broadcast.

   A woken waiter may find the connection completely unused, so the idle
   gauge is adjusted according to the actual user count. *)
let wait_for_shared_slot pool ep_pool conn =
  let capacity () =
    match pool.protocol.access_mode conn.pc_state with
    | Config.Shared n -> n
    | Config.Exclusive -> 1
  in
  while (not conn.pc_closed) && conn.pc_active_users >= capacity () do
    Eio.Condition.await conn.pc_user_available ep_pool.ep_mutex
  done;
  if conn.pc_closed then None
  else
    let was_idle = conn.pc_active_users = 0 in
    Some (activate_reused pool ep_pool conn ~was_idle)
410
(* Wait until [conn] has no user left (or is closed), then claim it.
   Returns [None] when the connection was closed in the meantime.

   The caller must hold [ep_pool.ep_mutex]. The wait uses
   [Eio.Condition.await] with that mutex, which unlocks it while the fiber
   sleeps and locks it again before returning. Waiting with the mutex still
   locked would deadlock, because [release_connection] needs the same mutex
   before it can decrement the user count and broadcast. *)
let wait_for_exclusive_slot pool ep_pool conn =
  while conn.pc_active_users > 0 && not conn.pc_closed do
    Eio.Condition.await conn.pc_user_available ep_pool.ep_mutex
  done;
  if conn.pc_closed then None
  else
    (* The loop only exits with zero users here, so the connection is idle. *)
    Some (activate_reused pool ep_pool conn ~was_idle:true)
417
(* Open a connection to [endpoint], register it in [ep_pool] with a single
   active user, update the counters, run the protocol's [on_acquire] and
   return the connection. *)
let new_connection pool ep_pool endpoint =
  let fresh = connection pool endpoint in
  fresh.pc_active_users <- 1;
  ep_pool.connections := fresh :: !(ep_pool.connections);
  let counters = ep_pool.stats in
  Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
      counters.total_created <- counters.total_created + 1;
      counters.active <- counters.active + 1);
  Log.info (fun m ->
      m "Created new connection to %a (total=%d)" Endpoint.pp endpoint
        (List.length !(ep_pool.connections)));
  pool.protocol.on_acquire fresh.pc_state;
  fresh
430
(* Pick a connection of [ep_pool] to wait on and wait for a slot on it. The
   first connection in shared mode is preferred; otherwise the first
   connection of the list is awaited exclusively. Returns [None] when the
   awaited connection was closed, in which case the caller should retry.

   Raises [Invalid_config] (as [Eio.Io]) when the pool tracks no connection
   at all: there is then nothing that could ever be released, which happens
   when the per-endpoint connection limit is not positive. The previous
   [List.hd] raised an unexplained [Failure "hd"] in that situation. *)
let wait_for_available_slot pool ep_pool =
  let is_shared c =
    match pool.protocol.access_mode c.pc_state with
    | Config.Shared _ -> true
    | Config.Exclusive -> false
  in
  let conns = !(ep_pool.connections) in
  match List.find_opt is_shared conns with
  | Some conn -> wait_for_shared_slot pool ep_pool conn
  | None -> (
      match conns with
      | conn :: _ -> wait_for_exclusive_slot pool ep_pool conn
      | [] ->
          raise
            (err
               (Invalid_config
                  "no connection to wait for: max_connections_per_endpoint \
                   must be at least 1")))
445
(* Obtain a connection to [endpoint] from [ep_pool]: reuse a healthy
   connection with spare capacity, otherwise open a new one while below the
   per-endpoint limit, otherwise wait for a slot.

   Each attempt runs under [ep_pool.ep_mutex] and reports its outcome as a
   value that is acted upon only after the mutex has been released:

   - a retry (the awaited connection was closed) must not recurse inside the
     critical section, since [Eio.Mutex] is not reentrant and the nested
     lock would never be granted;
   - a failed connection attempt must not escape the critical section as an
     exception, since [Eio.Mutex.use_rw] poisons the mutex when its function
     raises, which would make the endpoint unusable for good. The exception
     is re-raised afterwards with its original backtrace.

   Idle connections that fail the health check are really closed (flow,
   protocol cleanup, lifetime switch) and counted, instead of merely being
   flagged. Unhealthy connections that still have users are skipped and
   left for [release_connection] to retire. *)
let rec acquire_connection pool ep_pool endpoint =
  let evict_if_stale conn =
    if conn.pc_active_users = 0 && not conn.pc_closed then begin
      match check_health pool conn with
      | Healthy -> ()
      | (Unhealthy_error reason | Unhealthy_lifecycle reason) as status ->
          Log.debug (fun m ->
              m "Evicting idle connection to %a: %s" Endpoint.pp endpoint
                reason);
          close_connection pool conn;
          Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
              ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1;
              (* A tracked connection without users was counted as idle when
                 its last user released it. *)
              ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1);
              match status with
              | Unhealthy_error _ ->
                  ep_pool.stats.errors <- ep_pool.stats.errors + 1
              | Healthy | Unhealthy_lifecycle _ -> ())
    end
  in
  let has_capacity conn =
    match pool.protocol.access_mode conn.pc_state with
    | Config.Exclusive -> conn.pc_active_users = 0
    | Config.Shared max_concurrent -> conn.pc_active_users < max_concurrent
  in
  let outcome =
    Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () ->
        List.iter evict_if_stale !(ep_pool.connections);
        ep_pool.connections :=
          List.filter (fun c -> not c.pc_closed) !(ep_pool.connections);
        let reusable conn = is_healthy pool conn && has_capacity conn in
        match List.find_opt reusable !(ep_pool.connections) with
        | Some conn ->
            let was_idle = conn.pc_active_users = 0 in
            Log.debug (fun m ->
                m "Reusing connection to %a (users=%d)" Endpoint.pp endpoint
                  (conn.pc_active_users + 1));
            `Acquired (activate_reused pool ep_pool conn ~was_idle)
        | None ->
            let max_conns = Config.max_connections_per_endpoint pool.config in
            let current_conns = List.length !(ep_pool.connections) in
            if current_conns >= max_conns then begin
              Log.debug (fun m ->
                  m "At connection limit for %a (%d), waiting..." Endpoint.pp
                    endpoint max_conns);
              match wait_for_available_slot pool ep_pool with
              | Some conn -> `Acquired conn
              | None -> `Retry
            end
            else
              match new_connection pool ep_pool endpoint with
              | conn -> `Acquired conn
              | exception exn -> `Failed (exn, Printexc.get_raw_backtrace ()))
  in
  match outcome with
  | `Acquired conn -> conn
  | `Retry -> acquire_connection pool ep_pool endpoint
  | `Failed (exn, bt) -> Printexc.raise_with_backtrace exn bt
488
489(** {1 Connection Release} *)
490
(* Give [conn] back to [ep_pool]: run the protocol's [on_release], drop one
   user, update the counters, wake waiting fibers, and retire the connection
   when the health check no longer passes.

   Retiring relies on [close_connection] to mark the connection closed.
   Setting [pc_closed] beforehand, as was done previously, turned that call
   into a no-op: the flow was never closed, [on_close] never ran and the
   connection-lifetime switch was never cancelled.

   NOTE(review): an unhealthy connection is closed on release even when
   other users of a shared connection are still active; this matches the
   structure of the original code. *)
let release_connection pool ep_pool conn =
  (* Notify protocol handler of release *)
  pool.protocol.on_release conn.pc_state;

  Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () ->
      let was_active = conn.pc_active_users > 0 in
      conn.pc_active_users <- max 0 (conn.pc_active_users - 1);
      let now_idle = conn.pc_active_users = 0 in

      Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
          ep_pool.stats.active <- max 0 (ep_pool.stats.active - 1);
          (* The idle gauge goes up only when the last user leaves. *)
          if was_active && now_idle then
            ep_pool.stats.idle <- ep_pool.stats.idle + 1);

      (* Signal waiting fibers *)
      Eio.Condition.broadcast conn.pc_user_available;

      Log.debug (fun m ->
          m "Released connection to %a (users=%d)" Endpoint.pp conn.pc_endpoint
            conn.pc_active_users);

      (* Count the closure, close the connection for real and stop tracking
         it. A connection that was already closed elsewhere is only
         untracked, because [close_connection] skips closed connections. *)
      let retire ~is_error =
        Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
            ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1;
            if is_error then ep_pool.stats.errors <- ep_pool.stats.errors + 1;
            if now_idle then
              ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
        close_connection pool conn;
        ep_pool.connections :=
          List.filter (fun c -> c != conn) !(ep_pool.connections)
      in

      match check_health pool conn with
      | Healthy -> ()
      | Unhealthy_error reason ->
          Log.warn (fun m -> m "Closing connection due to error: %s" reason);
          retire ~is_error:true
      | Unhealthy_lifecycle reason ->
          Log.debug (fun m ->
              m "Closing connection due to lifecycle: %s" reason);
          retire ~is_error:false)
542
543(** {1 Public API} *)
544
(* Close every connection of every endpoint, then forget all endpoints. *)
let close_all_endpoints pool =
  let close_pool _endpoint ep_pool =
    List.iter (close_connection pool) !(ep_pool.connections)
  in
  Hashtbl.iter close_pool pool.endpoints;
  Hashtbl.clear pool.endpoints
551
(** [v ~sw ~net ~clock ?tls ?config ~protocol ()] creates a pool.

    Connections are opened through [net] and attached to [sw]; [clock]
    provides timestamps and timeouts. When [tls] is given every connection is
    upgraded to TLS. [config] defaults to [Config.default]. All connections
    are closed when [sw] is released. *)
let v ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls
    ?(config = Config.default) ~protocol () =
  Log.info (fun m ->
      m "Creating connection pool (max_per_endpoint=%d)"
        (Config.max_connections_per_endpoint config));
  let endpoints = Hashtbl.create 16 in
  let endpoints_mutex = Eio.Mutex.create () in
  let pool =
    { sw; net; clock; config; tls; protocol; endpoints; endpoints_mutex }
  in
  (* Cleanup hook; protected so it also runs to completion when the switch
     is being cancelled. *)
  let shutdown () =
    Eio.Cancel.protect (fun () ->
        Log.info (fun m -> m "Closing connection pool");
        close_all_endpoints pool)
  in
  Eio.Switch.on_release sw shutdown;
  Pool pool
578
(** [basic] is [v] with the [default_protocol] handler ([unit] state,
    exclusive access). *)
let basic ~sw ~net ~clock ?tls ?config () =
  let protocol = default_protocol in
  v ~sw ~net ~clock ?tls ?config ~protocol ()
581
(** [connection ~sw pool endpoint] acquires a connection to [endpoint] and
    returns its flow, TLS epoch (if any) and protocol state. The connection
    is given back to the pool when [sw] is released. *)
let connection ~sw (Pool pool) endpoint =
  Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
  let ep_pool = ensure_endpoint_pool pool endpoint in
  let conn = acquire_connection pool ep_pool endpoint in
  (* Tie the lease to the caller's switch. *)
  let give_back () = release_connection pool ep_pool conn in
  Eio.Switch.on_release sw give_back;
  let tls_epoch = tls_epoch_of_flow conn.pc_tls_flow in
  { flow = conn.pc_flow; tls_epoch; state = conn.pc_state }
602
(** [with_connection pool endpoint f] runs [f] on a connection acquired under
    a fresh switch; the connection is released when that switch finishes. *)
let with_connection pool endpoint f =
  Eio.Switch.run @@ fun sw ->
  let info = connection ~sw pool endpoint in
  f info
605
(** [stats pool endpoint] is a snapshot of the counters of [endpoint], or
    all-zero statistics when the pool has no entry for it. *)
let stats (Pool pool) endpoint =
  let read ep_pool =
    Eio.Mutex.use_ro ep_pool.stats_mutex (fun () ->
        snapshot_stats ep_pool.stats)
  in
  match Hashtbl.find_opt pool.endpoints endpoint with
  | None ->
      Stats.v ~active:0 ~idle:0 ~total_created:0 ~total_reused:0 ~total_closed:0
        ~errors:0
  | Some ep_pool -> read ep_pool
614
(** [all_stats pool] pairs every known endpoint with a snapshot of its
    counters. The endpoint table is read under its mutex. *)
let all_stats (Pool pool) =
  let collect endpoint ep_pool acc =
    let snapshot =
      Eio.Mutex.use_ro ep_pool.stats_mutex (fun () ->
          snapshot_stats ep_pool.stats)
    in
    (endpoint, snapshot) :: acc
  in
  Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
      Hashtbl.fold collect pool.endpoints [])
625
(** [pp ppf pool] prints the number of endpoints followed by one line of
    statistics per endpoint. *)
let pp ppf (Pool pool) =
  let endpoint_count = Hashtbl.length pool.endpoints in
  let collect endpoint ep_pool acc =
    let snapshot =
      Eio.Mutex.use_ro ep_pool.stats_mutex (fun () ->
          snapshot_stats ep_pool.stats)
    in
    (endpoint, snapshot) :: acc
  in
  let all = Hashtbl.fold collect pool.endpoints [] in
  let pp_entry ppf (ep, snapshot) =
    Fmt.pf ppf " %a: %a" Endpoint.pp ep Stats.pp snapshot
  in
  Fmt.pf ppf "@[<v>Pool:@,- endpoints: %d@,- %a@]" endpoint_count
    (Fmt.list ~sep:Fmt.cut pp_entry)
    all
643
(** [clear_endpoint pool endpoint] closes every connection of [endpoint] and
    removes the endpoint from the pool. Nothing happens, apart from a debug
    message, when the endpoint is unknown. *)
let clear_endpoint (Pool pool) endpoint =
  Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint);
  match Hashtbl.find_opt pool.endpoints endpoint with
  | None ->
      Log.debug (fun m ->
          m "No endpoint pool found for %a" Endpoint.pp endpoint)
  | Some ep_pool ->
      let drain () =
        List.iter (close_connection pool) !(ep_pool.connections);
        ep_pool.connections := []
      in
      let forget () = Hashtbl.remove pool.endpoints endpoint in
      (* Protected so that cancellation cannot interrupt the cleanup between
         the two steps. *)
      Eio.Cancel.protect (fun () ->
          Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex drain;
          Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex forget)