TCP/TLS connection pooling for Eio
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 658 lines 22 kB view raw
1(*--------------------------------------------------------------------------- 2 Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved. 3 SPDX-License-Identifier: ISC 4 ---------------------------------------------------------------------------*) 5 6(** Conpool - Protocol-aware TCP/IP connection pooling library for Eio *) 7 8let src = Logs.Src.create "conpool" ~doc:"Connection pooling library" 9 10module Log = (val Logs.src_log src : Logs.LOG) 11 12(* Re-export submodules *) 13module Endpoint = Endpoint 14module Config = Config 15module Stats = Stats 16module Cmd = Cmd 17 18(* Track whether TLS tracing has been suppressed *) 19let tls_tracing_suppressed = ref false 20 21(* Suppress TLS tracing debug output (hexdumps) unless explicitly enabled *) 22let suppress_tls_tracing () = 23 if not !tls_tracing_suppressed then begin 24 tls_tracing_suppressed := true; 25 match 26 List.find_opt 27 (fun s -> Logs.Src.name s = "tls.tracing") 28 (Logs.Src.list ()) 29 with 30 | Some tls_src -> ( 31 match Logs.Src.level tls_src with 32 | Some Logs.Debug -> Logs.Src.set_level tls_src (Some Logs.Warning) 33 | _ -> ()) 34 | None -> () 35 end 36 37(** {1 Error Types} *) 38 39type error = 40 | Dns_resolution_failed of { hostname : string } 41 | Connection_failed of { 42 endpoint : Endpoint.t; 43 attempts : int; 44 last_error : string; 45 } 46 | Connection_timeout of { endpoint : Endpoint.t; timeout : float } 47 | Invalid_config of string 48 | Invalid_endpoint of string 49 50let pp_error ppf = function 51 | Dns_resolution_failed { hostname } -> 52 Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname 53 | Connection_failed { endpoint; attempts; last_error } -> 54 Fmt.pf ppf "Failed to connect to %a after %d attempts: %s" Endpoint.pp 55 endpoint attempts last_error 56 | Connection_timeout { endpoint; timeout } -> 57 Fmt.pf ppf "Connection timeout to %a after %.2fs" Endpoint.pp endpoint 58 timeout 59 | Invalid_config msg -> Fmt.pf ppf "Invalid configuration: %s" msg 60 | Invalid_endpoint msg -> Fmt.pf ppf "Invalid endpoint: %s" msg 61 62type Eio.Exn.err += E of error 63 64let err e = Eio.Exn.create (E e) 65 66let () = 67 Eio.Exn.register_pp (fun f -> function 68 | E e -> 69 Fmt.string f "Conpool "; 70 pp_error f e; 71 true 72 | _ -> false) 73 74(** {1 Connection Types} *) 75 76type connection_ty = [ Eio.Resource.close_ty | Eio.Flow.two_way_ty ] 77type connection = connection_ty Eio.Resource.t 78 79(** {1 Internal Types} *) 80 81type 'state pooled_connection = { 82 pc_flow : connection; 83 pc_tls_flow : Tls_eio.t option; 84 pc_state : 'state; 85 pc_created_at : float; 86 mutable pc_last_used : float; 87 (** Last time this connection was used (for idle timeout). *) 88 mutable pc_use_count : int; 89 (** Number of times this connection has been used. *) 90 pc_endpoint : Endpoint.t; 91 mutable pc_active_users : int; 92 pc_user_available : Eio.Condition.t; 93 mutable pc_closed : bool; 94 pc_connection_cancel : exn -> unit; 95 (** Cancels the connection-lifetime switch, stopping any protocol fibers. 96 *) 97} 98(** Internal connection wrapper with protocol state and tracking. *) 99 100type endp_stats = { 101 mutable active : int; 102 mutable idle : int; (** Number of idle connections (active_users = 0). *) 103 mutable total_created : int; 104 mutable total_reused : int; 105 mutable total_closed : int; 106 mutable errors : int; (** Number of connection errors encountered. *) 107} 108(** Statistics for an endpoint. *) 109 110type 'state endpoint_pool = { 111 connections : 'state pooled_connection list ref; 112 ep_mutex : Eio.Mutex.t; 113 stats : endp_stats; 114 stats_mutex : Eio.Mutex.t; 115} 116(** Endpoint pool storing connections. *) 117 118type ('state, 'clock, 'net) internal = { 119 sw : Eio.Switch.t; 120 net : 'net; 121 clock : 'clock; 122 config : Config.t; 123 tls : Tls.Config.client option; 124 protocol : 'state Config.protocol_config; 125 endpoints : (Endpoint.t, 'state endpoint_pool) Hashtbl.t; 126 endpoints_mutex : Eio.Mutex.t; 127} 128(** Internal pool representation. *) 129 130(** {1 Public Types} *) 131 132type 'state t = 133 | Pool : ('state, 'clock Eio.Time.clock, 'net Eio.Net.t) internal -> 'state t 134 135type 'state connection_info = { 136 flow : connection; 137 tls_epoch : Tls.Core.epoch_data option; 138 state : 'state; 139} 140 141(** {1 Default Protocol Handler} 142 143 For simple exclusive-access protocols (HTTP/1.x, Redis, etc.), use unit 144 state with no special initialization. *) 145 146let default_protocol : unit Config.protocol_config = 147 { 148 Config.init_state = (fun ~sw:_ ~flow:_ ~tls_epoch:_ -> ()); 149 on_acquire = (fun () -> ()); 150 on_release = (fun () -> ()); 151 is_healthy = (fun () -> true); 152 on_close = (fun () -> ()); 153 access_mode = (fun () -> Config.Exclusive); 154 } 155 156(** {1 Helper Functions} *) 157 158let time pool = Eio.Time.now pool.clock 159 160let endp_stats () = 161 { 162 active = 0; 163 idle = 0; 164 total_created = 0; 165 total_reused = 0; 166 total_closed = 0; 167 errors = 0; 168 } 169 170let snapshot_stats (stats : endp_stats) : Stats.t = 171 Stats.v ~active:stats.active ~idle:stats.idle 172 ~total_created:stats.total_created ~total_reused:stats.total_reused 173 ~total_closed:stats.total_closed ~errors:stats.errors 174 175(** {1 Connection Creation} *) 176 177let resolve_endpoint pool endpoint = 178 try 179 let addrs = 180 Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) 181 ~service:(string_of_int (Endpoint.port endpoint)) 182 in 183 match addrs with 184 | addr :: _ -> addr 185 | [] -> 186 raise 187 (err (Dns_resolution_failed { hostname = Endpoint.host endpoint })) 188 with Eio.Io _ as ex -> 189 let bt = Printexc.get_raw_backtrace () in 190 Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint 191 192let tcp_connect pool addr endpoint = 193 try 194 match Config.connect_timeout pool.config with 195 | Some timeout -> 196 Eio.Time.with_timeout_exn pool.clock timeout (fun () -> 197 Eio.Net.connect ~sw:pool.sw pool.net addr) 198 | None -> Eio.Net.connect ~sw:pool.sw pool.net addr 199 with Eio.Io _ as ex -> 200 let bt = Printexc.get_raw_backtrace () in 201 Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp endpoint 202 203let tls_handshake pool socket endpoint = 204 match pool.tls with 205 | None -> ((socket :> connection), None) 206 | Some tls_config -> ( 207 try 208 Log.debug (fun m -> 209 m "Initiating TLS handshake with %a" Endpoint.pp endpoint); 210 let host = 211 Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint))) 212 in 213 let tls = Tls_eio.client_of_flow ~host tls_config socket in 214 suppress_tls_tracing (); 215 Log.info (fun m -> 216 m "TLS connection established to %a" Endpoint.pp endpoint); 217 ((tls :> connection), Some tls) 218 with Eio.Io _ as ex -> 219 let bt = Printexc.get_raw_backtrace () in 220 Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp 221 endpoint) 222 223let tls_epoch_of_flow = function 224 | Some tls_flow -> ( 225 match Tls_eio.epoch tls_flow with 226 | Ok epoch -> Some epoch 227 | Error () -> None) 228 | None -> None 229 230let connection_switch pool = 231 let conn_sw_ref = ref None in 232 let conn_cancel_ref = ref (fun (_ : exn) -> ()) in 233 let ready_promise, ready_resolver = Eio.Promise.create () in 234 Eio.Fiber.fork_daemon ~sw:pool.sw (fun () -> 235 (try 236 Eio.Switch.run (fun conn_sw -> 237 conn_sw_ref := Some conn_sw; 238 (conn_cancel_ref := fun exn -> Eio.Switch.fail conn_sw exn); 239 Eio.Promise.resolve ready_resolver (); 240 Eio.Fiber.await_cancel ()) 241 with 242 | Eio.Cancel.Cancelled _ -> () 243 | exn -> 244 Log.warn (fun m -> 245 m "Connection fiber caught exception: %s" (Printexc.to_string exn))); 246 `Stop_daemon); 247 Eio.Promise.await ready_promise; 248 (Option.get !conn_sw_ref, !conn_cancel_ref) 249 250let connection pool endpoint = 251 Log.debug (fun m -> m "Creating connection to %a" Endpoint.pp endpoint); 252 let addr = resolve_endpoint pool endpoint in 253 let socket = tcp_connect pool addr endpoint in 254 Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint); 255 let flow, tls_flow = tls_handshake pool socket endpoint in 256 let tls_epoch = tls_epoch_of_flow tls_flow in 257 let conn_sw, conn_cancel = connection_switch pool in 258 Log.debug (fun m -> 259 m "Initializing protocol state for %a" Endpoint.pp endpoint); 260 let state = pool.protocol.init_state ~sw:conn_sw ~flow ~tls_epoch in 261 let now = time pool in 262 Log.info (fun m -> m "Created connection to %a" Endpoint.pp endpoint); 263 { 264 pc_flow = flow; 265 pc_tls_flow = tls_flow; 266 pc_state = state; 267 pc_created_at = now; 268 pc_last_used = now; 269 pc_use_count = 0; 270 pc_endpoint = endpoint; 271 pc_active_users = 0; 272 pc_user_available = Eio.Condition.create (); 273 pc_closed = false; 274 pc_connection_cancel = conn_cancel; 275 } 276 277(** {1 Connection Health Checking} *) 278 279(** Health check result distinguishing errors from normal lifecycle. *) 280type health_status = 281 | Healthy 282 | Unhealthy_error of string 283 (** Connection failed due to an error (protocol failure, etc.) *) 284 | Unhealthy_lifecycle of string 285 (** Connection should close due to normal lifecycle (timeout, max uses, 286 etc.) *) 287 288let check_health pool conn = 289 if conn.pc_closed then Unhealthy_lifecycle "already closed" 290 else 291 (* Check protocol-specific health *) 292 let protocol_healthy = pool.protocol.is_healthy conn.pc_state in 293 if not protocol_healthy then begin 294 Log.debug (fun m -> m "Connection unhealthy: protocol check failed"); 295 Unhealthy_error "protocol check failed" 296 end 297 else 298 let now = time pool in 299 (* Check connection age *) 300 let age = now -. conn.pc_created_at in 301 let max_lifetime = Config.max_connection_lifetime pool.config in 302 if age > max_lifetime then begin 303 Log.debug (fun m -> 304 m "Connection unhealthy: exceeded max lifetime (%.1fs > %.1fs)" age 305 max_lifetime); 306 Unhealthy_lifecycle "exceeded max lifetime" 307 end 308 else 309 (* Check idle time - only for idle connections *) 310 let idle_time = now -. conn.pc_last_used in 311 let max_idle = Config.max_idle_time pool.config in 312 if conn.pc_active_users = 0 && idle_time > max_idle then begin 313 Log.debug (fun m -> 314 m "Connection unhealthy: exceeded max idle time (%.1fs > %.1fs)" 315 idle_time max_idle); 316 Unhealthy_lifecycle "exceeded max idle time" 317 end 318 else 319 (* Check use count *) 320 match Config.max_connection_uses pool.config with 321 | Some max_uses when conn.pc_use_count >= max_uses -> 322 Log.debug (fun m -> 323 m "Connection unhealthy: exceeded max uses (%d >= %d)" 324 conn.pc_use_count max_uses); 325 Unhealthy_lifecycle "exceeded max uses" 326 | _ -> Healthy 327 328let is_healthy pool conn = 329 match check_health pool conn with 330 | Healthy -> true 331 | Unhealthy_error _ | Unhealthy_lifecycle _ -> false 332 333(** {1 Connection Cleanup} *) 334 335let close_connection pool conn = 336 if not conn.pc_closed then begin 337 conn.pc_closed <- true; 338 Log.debug (fun m -> 339 m "Closing connection to %a" Endpoint.pp conn.pc_endpoint); 340 341 (* Cancel connection-lifetime switch first - this stops any protocol fibers *) 342 (try conn.pc_connection_cancel (Failure "Connection closed") 343 with exn -> 344 Log.debug (fun m -> 345 m "Ignoring cancel error: %s" (Printexc.to_string exn))); 346 347 (* Call protocol cleanup *) 348 pool.protocol.on_close conn.pc_state; 349 350 (* Close the underlying flow *) 351 Eio.Cancel.protect (fun () -> 352 try Eio.Flow.close conn.pc_flow 353 with Eio.Io _ as exn -> 354 Log.debug (fun m -> 355 m "Ignoring close error: %s" (Printexc.to_string exn))) 356 end 357 358(** {1 Endpoint Pool Management} *) 359 360let ensure_endpoint_pool pool endpoint = 361 match 362 Eio.Mutex.use_ro pool.endpoints_mutex (fun () -> 363 Hashtbl.find_opt pool.endpoints endpoint) 364 with 365 | Some ep_pool -> ep_pool 366 | None -> 367 Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () -> 368 match Hashtbl.find_opt pool.endpoints endpoint with 369 | Some ep_pool -> ep_pool 370 | None -> 371 Log.info (fun m -> 372 m "Creating endpoint pool for %a" Endpoint.pp endpoint); 373 let ep_pool = 374 { 375 connections = ref []; 376 ep_mutex = Eio.Mutex.create (); 377 stats = endp_stats (); 378 stats_mutex = Eio.Mutex.create (); 379 } 380 in 381 Hashtbl.add pool.endpoints endpoint ep_pool; 382 ep_pool) 383 384(** {1 Connection Acquisition} *) 385 386let activate_reused pool ep_pool conn ~was_idle = 387 conn.pc_active_users <- conn.pc_active_users + 1; 388 conn.pc_last_used <- time pool; 389 conn.pc_use_count <- conn.pc_use_count + 1; 390 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 391 ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1; 392 ep_pool.stats.active <- ep_pool.stats.active + 1; 393 if was_idle then ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 394 pool.protocol.on_acquire conn.pc_state; 395 conn 396 397let wait_for_shared_slot pool ep_pool conn = 398 while 399 (conn.pc_active_users 400 >= 401 match pool.protocol.access_mode conn.pc_state with 402 | Config.Shared n -> n 403 | Config.Exclusive -> 1) 404 && not conn.pc_closed 405 do 406 Eio.Condition.await_no_mutex conn.pc_user_available 407 done; 408 if conn.pc_closed then None 409 else Some (activate_reused pool ep_pool conn ~was_idle:false) 410 411let wait_for_exclusive_slot pool ep_pool conn = 412 while conn.pc_active_users > 0 && not conn.pc_closed do 413 Eio.Condition.await_no_mutex conn.pc_user_available 414 done; 415 if conn.pc_closed then None 416 else Some (activate_reused pool ep_pool conn ~was_idle:true) 417 418let new_connection pool ep_pool endpoint = 419 let conn = connection pool endpoint in 420 conn.pc_active_users <- 1; 421 ep_pool.connections := conn :: !(ep_pool.connections); 422 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 423 ep_pool.stats.total_created <- ep_pool.stats.total_created + 1; 424 ep_pool.stats.active <- ep_pool.stats.active + 1); 425 Log.info (fun m -> 426 m "Created new connection to %a (total=%d)" Endpoint.pp endpoint 427 (List.length !(ep_pool.connections))); 428 pool.protocol.on_acquire conn.pc_state; 429 conn 430 431let wait_for_available_slot pool ep_pool = 432 let wait_conn = 433 List.find_opt 434 (fun c -> 435 match pool.protocol.access_mode c.pc_state with 436 | Config.Shared _ -> true 437 | Config.Exclusive -> false) 438 !(ep_pool.connections) 439 in 440 match wait_conn with 441 | Some conn -> wait_for_shared_slot pool ep_pool conn 442 | None -> 443 let any_conn = List.hd !(ep_pool.connections) in 444 wait_for_exclusive_slot pool ep_pool any_conn 445 446let rec acquire_connection pool ep_pool endpoint = 447 Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () -> 448 let rec find_available = function 449 | [] -> None 450 | conn :: rest -> 451 if not (is_healthy pool conn) then begin 452 conn.pc_closed <- true; 453 find_available rest 454 end 455 else 456 begin match pool.protocol.access_mode conn.pc_state with 457 | Config.Exclusive -> 458 if conn.pc_active_users = 0 then Some conn 459 else find_available rest 460 | Config.Shared max_concurrent -> 461 if conn.pc_active_users < max_concurrent then Some conn 462 else find_available rest 463 end 464 in 465 466 ep_pool.connections := 467 List.filter (fun c -> not c.pc_closed) !(ep_pool.connections); 468 469 match find_available !(ep_pool.connections) with 470 | Some conn -> 471 let was_idle = conn.pc_active_users = 0 in 472 Log.debug (fun m -> 473 m "Reusing connection to %a (users=%d)" Endpoint.pp endpoint 474 (conn.pc_active_users + 1)); 475 activate_reused pool ep_pool conn ~was_idle 476 | None -> 477 let max_conns = Config.max_connections_per_endpoint pool.config in 478 let current_conns = List.length !(ep_pool.connections) in 479 if current_conns >= max_conns then begin 480 Log.debug (fun m -> 481 m "At connection limit for %a (%d), waiting..." Endpoint.pp 482 endpoint max_conns); 483 match wait_for_available_slot pool ep_pool with 484 | Some conn -> conn 485 | None -> acquire_connection pool ep_pool endpoint 486 end 487 else new_connection pool ep_pool endpoint) 488 489(** {1 Connection Release} *) 490 491let release_connection pool ep_pool conn = 492 (* Notify protocol handler of release *) 493 pool.protocol.on_release conn.pc_state; 494 495 Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () -> 496 let was_active = conn.pc_active_users > 0 in 497 conn.pc_active_users <- max 0 (conn.pc_active_users - 1); 498 let now_idle = conn.pc_active_users = 0 in 499 500 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 501 ep_pool.stats.active <- max 0 (ep_pool.stats.active - 1); 502 (* Track idle count: increment when connection becomes idle *) 503 if was_active && now_idle then 504 ep_pool.stats.idle <- ep_pool.stats.idle + 1); 505 506 (* Signal waiting fibers *) 507 Eio.Condition.broadcast conn.pc_user_available; 508 509 Log.debug (fun m -> 510 m "Released connection to %a (users=%d)" Endpoint.pp conn.pc_endpoint 511 conn.pc_active_users); 512 513 (* Check if connection should be closed *) 514 match check_health pool conn with 515 | Healthy -> () 516 | Unhealthy_error reason -> 517 conn.pc_closed <- true; 518 519 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 520 ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1; 521 ep_pool.stats.errors <- ep_pool.stats.errors + 1; 522 if now_idle then 523 ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 524 525 Log.warn (fun m -> m "Closing connection due to error: %s" reason); 526 close_connection pool conn; 527 ep_pool.connections := 528 List.filter (fun c -> c != conn) !(ep_pool.connections) 529 | Unhealthy_lifecycle reason -> 530 conn.pc_closed <- true; 531 532 Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 533 ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1; 534 if now_idle then 535 ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 536 537 Log.debug (fun m -> 538 m "Closing connection due to lifecycle: %s" reason); 539 close_connection pool conn; 540 ep_pool.connections := 541 List.filter (fun c -> c != conn) !(ep_pool.connections)) 542 543(** {1 Public API} *) 544 545let close_all_endpoints pool = 546 Hashtbl.iter 547 (fun _endpoint ep_pool -> 548 List.iter (fun conn -> close_connection pool conn) !(ep_pool.connections)) 549 pool.endpoints; 550 Hashtbl.clear pool.endpoints 551 552let v ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls 553 ?(config = Config.default) ~protocol () = 554 Log.info (fun m -> 555 m "Creating connection pool (max_per_endpoint=%d)" 556 (Config.max_connections_per_endpoint config)); 557 558 let pool = 559 { 560 sw; 561 net; 562 clock; 563 config; 564 tls; 565 protocol; 566 endpoints = Hashtbl.create 16; 567 endpoints_mutex = Eio.Mutex.create (); 568 } 569 in 570 571 (* Auto-cleanup on switch release *) 572 Eio.Switch.on_release sw (fun () -> 573 Eio.Cancel.protect (fun () -> 574 Log.info (fun m -> m "Closing connection pool"); 575 close_all_endpoints pool)); 576 577 Pool pool 578 579let basic ~sw ~net ~clock ?tls ?config () = 580 v ~sw ~net ~clock ?tls ?config ~protocol:default_protocol () 581 582let connection ~sw (Pool pool) endpoint = 583 Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint); 584 585 let ep_pool = ensure_endpoint_pool pool endpoint in 586 let conn = acquire_connection pool ep_pool endpoint in 587 588 (* Release connection when switch ends *) 589 Eio.Switch.on_release sw (fun () -> release_connection pool ep_pool conn); 590 591 (* Get TLS epoch if available *) 592 let tls_epoch = 593 match conn.pc_tls_flow with 594 | Some tls_flow -> ( 595 match Tls_eio.epoch tls_flow with 596 | Ok epoch -> Some epoch 597 | Error () -> None) 598 | None -> None 599 in 600 601 { flow = conn.pc_flow; tls_epoch; state = conn.pc_state } 602 603let with_connection pool endpoint f = 604 Eio.Switch.run (fun sw -> f (connection ~sw pool endpoint)) 605 606let stats (Pool pool) endpoint = 607 match Hashtbl.find_opt pool.endpoints endpoint with 608 | Some ep_pool -> 609 Eio.Mutex.use_ro ep_pool.stats_mutex (fun () -> 610 snapshot_stats ep_pool.stats) 611 | None -> 612 Stats.v ~active:0 ~idle:0 ~total_created:0 ~total_reused:0 ~total_closed:0 613 ~errors:0 614 615let all_stats (Pool pool) = 616 Eio.Mutex.use_ro pool.endpoints_mutex (fun () -> 617 Hashtbl.fold 618 (fun endpoint ep_pool acc -> 619 let stats = 620 Eio.Mutex.use_ro ep_pool.stats_mutex (fun () -> 621 snapshot_stats ep_pool.stats) 622 in 623 (endpoint, stats) :: acc) 624 pool.endpoints []) 625 626let pp ppf (Pool pool) = 627 let endpoint_count = Hashtbl.length pool.endpoints in 628 let all = 629 Hashtbl.fold 630 (fun endpoint ep_pool acc -> 631 let stats = 632 Eio.Mutex.use_ro ep_pool.stats_mutex (fun () -> 633 snapshot_stats ep_pool.stats) 634 in 635 (endpoint, stats) :: acc) 636 pool.endpoints [] 637 in 638 Fmt.pf ppf "@[<v>Pool:@,- endpoints: %d@,- %a@]" endpoint_count 639 Fmt.( 640 list ~sep:cut (fun ppf (ep, stats) -> 641 Fmt.pf ppf " %a: %a" Endpoint.pp ep Stats.pp stats)) 642 all 643 644let clear_endpoint (Pool pool) endpoint = 645 Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint); 646 match Hashtbl.find_opt pool.endpoints endpoint with 647 | Some ep_pool -> 648 Eio.Cancel.protect (fun () -> 649 Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () -> 650 List.iter 651 (fun conn -> close_connection pool conn) 652 !(ep_pool.connections); 653 ep_pool.connections := []); 654 Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () -> 655 Hashtbl.remove pool.endpoints endpoint)) 656 | None -> 657 Log.debug (fun m -> 658 m "No endpoint pool found for %a" Endpoint.pp endpoint)