TCP/TLS connection pooling for Eio
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(lint): resolve E331 redundant function prefixes

- conpool: create_connection_switch → connection_switch,
create_new_connection → new_connection
- crow: find_finding → finding

+125 -178
+125 -178
lib/conpool.ml
··· 174 174 175 175 (** {1 Connection Creation} *) 176 176 177 - let connection pool endpoint = 178 - Log.debug (fun m -> m "Creating connection to %a" Endpoint.pp endpoint); 177 + let resolve_endpoint pool endpoint = 178 + try 179 + let addrs = 180 + Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) 181 + ~service:(string_of_int (Endpoint.port endpoint)) 182 + in 183 + match addrs with 184 + | addr :: _ -> addr 185 + | [] -> 186 + raise 187 + (err (Dns_resolution_failed { hostname = Endpoint.host endpoint })) 188 + with Eio.Io _ as ex -> 189 + let bt = Printexc.get_raw_backtrace () in 190 + Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint 179 191 180 - (* DNS resolution *) 181 - let addr = 182 - try 183 - let addrs = 184 - Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint) 185 - ~service:(string_of_int (Endpoint.port endpoint)) 186 - in 187 - match addrs with 188 - | addr :: _ -> addr 189 - | [] -> 190 - raise 191 - (err (Dns_resolution_failed { hostname = Endpoint.host endpoint })) 192 - with Eio.Io _ as ex -> 193 - let bt = Printexc.get_raw_backtrace () in 194 - Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint 195 - in 192 + let tcp_connect pool addr endpoint = 193 + try 194 + match Config.connect_timeout pool.config with 195 + | Some timeout -> 196 + Eio.Time.with_timeout_exn pool.clock timeout (fun () -> 197 + Eio.Net.connect ~sw:pool.sw pool.net addr) 198 + | None -> Eio.Net.connect ~sw:pool.sw pool.net addr 199 + with Eio.Io _ as ex -> 200 + let bt = Printexc.get_raw_backtrace () in 201 + Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp endpoint 196 202 197 - (* TCP connection with optional timeout *) 198 - let socket = 199 - try 200 - match Config.connect_timeout pool.config with 201 - | Some timeout -> 202 - Eio.Time.with_timeout_exn pool.clock timeout (fun () -> 203 - Eio.Net.connect ~sw:pool.sw pool.net addr) 204 - | None -> Eio.Net.connect ~sw:pool.sw pool.net addr 205 - with Eio.Io _ as ex -> 206 - let bt = Printexc.get_raw_backtrace () in 207 - Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp endpoint 208 - in 203 + let tls_handshake pool socket endpoint = 204 + match pool.tls with 205 + | None -> ((socket :> connection), None) 206 + | Some tls_config -> ( 207 + try 208 + Log.debug (fun m -> 209 + m "Initiating TLS handshake with %a" Endpoint.pp endpoint); 210 + let host = 211 + Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint))) 212 + in 213 + let tls = Tls_eio.client_of_flow ~host tls_config socket in 214 + suppress_tls_tracing (); 215 + Log.info (fun m -> 216 + m "TLS connection established to %a" Endpoint.pp endpoint); 217 + ((tls :> connection), Some tls) 218 + with Eio.Io _ as ex -> 219 + let bt = Printexc.get_raw_backtrace () in 220 + Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp 221 + endpoint) 209 222 210 - Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint); 223 + let tls_epoch_of_flow = function 224 + | Some tls_flow -> ( 225 + match Tls_eio.epoch tls_flow with 226 + | Ok epoch -> Some epoch 227 + | Error () -> None) 228 + | None -> None 211 229 212 - (* Optional TLS handshake *) 213 - let flow, tls_flow = 214 - match pool.tls with 215 - | None -> ((socket :> connection), None) 216 - | Some tls_config -> ( 217 - try 218 - Log.debug (fun m -> 219 - m "Initiating TLS handshake with %a" Endpoint.pp endpoint); 220 - let host = 221 - Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint))) 222 - in 223 - let tls = Tls_eio.client_of_flow ~host tls_config socket in 224 - suppress_tls_tracing (); 225 - Log.info (fun m -> 226 - m "TLS connection established to %a" Endpoint.pp endpoint); 227 - ((tls :> connection), Some tls) 228 - with Eio.Io _ as ex -> 229 - let bt = Printexc.get_raw_backtrace () in 230 - Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp 231 - endpoint) 232 - in 233 - 234 - (* Get TLS epoch if available *) 235 - let tls_epoch = 236 - match tls_flow with 237 - | Some tls_flow -> ( 238 - match Tls_eio.epoch tls_flow with 239 - | Ok epoch -> Some epoch 240 - | Error () -> None) 241 - | None -> None 242 - in 243 - 244 - (* Create connection-lifetime sub-switch via a fiber. 245 - This switch lives for the connection's lifetime and can be used 246 - by the protocol handler to spawn long-running fibers (e.g., HTTP/2 reader). *) 230 + let connection_switch pool = 247 231 let conn_sw_ref = ref None in 248 232 let conn_cancel_ref = ref (fun (_ : exn) -> ()) in 249 233 let ready_promise, ready_resolver = Eio.Promise.create () in 250 - 251 - (* Use fork_daemon so connection fibers don't prevent parent switch from completing. 252 - When the parent switch completes, all connection daemon fibers are cancelled, 253 - which triggers cleanup of their inner switches and connection resources. *) 254 234 Eio.Fiber.fork_daemon ~sw:pool.sw (fun () -> 255 235 (try 256 236 Eio.Switch.run (fun conn_sw -> 257 237 conn_sw_ref := Some conn_sw; 258 238 (conn_cancel_ref := fun exn -> Eio.Switch.fail conn_sw exn); 259 - (* Signal that the switch is ready *) 260 239 Eio.Promise.resolve ready_resolver (); 261 - (* Block until the switch is cancelled *) 262 240 Eio.Fiber.await_cancel ()) 263 241 with 264 242 | Eio.Cancel.Cancelled _ -> () ··· 266 244 Log.warn (fun m -> 267 245 m "Connection fiber caught exception: %s" (Printexc.to_string exn))); 268 246 `Stop_daemon); 269 - 270 - (* Wait for the switch to be created *) 271 247 Eio.Promise.await ready_promise; 272 - let conn_sw = Option.get !conn_sw_ref in 273 - let conn_cancel = !conn_cancel_ref in 248 + (Option.get !conn_sw_ref, !conn_cancel_ref) 274 249 275 - (* Initialize protocol-specific state with connection switch *) 250 + let connection pool endpoint = 251 + Log.debug (fun m -> m "Creating connection to %a" Endpoint.pp endpoint); 252 + let addr = resolve_endpoint pool endpoint in 253 + let socket = tcp_connect pool addr endpoint in 254 + Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint); 255 + let flow, tls_flow = tls_handshake pool socket endpoint in 256 + let tls_epoch = tls_epoch_of_flow tls_flow in 257 + let conn_sw, conn_cancel = connection_switch pool in 276 258 Log.debug (fun m -> 277 259 m "Initializing protocol state for %a" Endpoint.pp endpoint); 278 260 let state = pool.protocol.init_state ~sw:conn_sw ~flow ~tls_epoch in 279 - 280 261 let now = time pool in 281 - 282 262 Log.info (fun m -> m "Created connection to %a" Endpoint.pp endpoint); 283 - 284 263 { 285 264 pc_flow = flow; 286 265 pc_tls_flow = tls_flow; ··· 404 383 405 384 (** {1 Connection Acquisition} *) 406 385 386 + let activate_reused pool ep_pool conn ~was_idle = 387 + conn.pc_active_users <- conn.pc_active_users + 1; 388 + conn.pc_last_used <- time pool; 389 + conn.pc_use_count <- conn.pc_use_count + 1; 390 + Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 391 + ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1; 392 + ep_pool.stats.active <- ep_pool.stats.active + 1; 393 + if was_idle then ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 394 + pool.protocol.on_acquire conn.pc_state; 395 + conn 396 + 397 + let wait_for_shared_slot pool ep_pool conn endpoint = 398 + while 399 + (conn.pc_active_users 400 + >= 401 + match pool.protocol.access_mode conn.pc_state with 402 + | Config.Shared n -> n 403 + | Config.Exclusive -> 1) 404 + && not conn.pc_closed 405 + do 406 + Eio.Condition.await_no_mutex conn.pc_user_available 407 + done; 408 + if conn.pc_closed then None 409 + else Some (activate_reused pool ep_pool conn ~was_idle:false) 410 + 411 + let wait_for_exclusive_slot pool ep_pool conn endpoint = 412 + while conn.pc_active_users > 0 && not conn.pc_closed do 413 + Eio.Condition.await_no_mutex conn.pc_user_available 414 + done; 415 + if conn.pc_closed then None 416 + else Some (activate_reused pool ep_pool conn ~was_idle:true) 417 + 418 + let new_connection pool ep_pool endpoint = 419 + let conn = connection pool endpoint in 420 + conn.pc_active_users <- 1; 421 + ep_pool.connections := conn :: !(ep_pool.connections); 422 + Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 423 + ep_pool.stats.total_created <- ep_pool.stats.total_created + 1; 424 + ep_pool.stats.active <- ep_pool.stats.active + 1); 425 + Log.info (fun m -> 426 + m "Created new connection to %a (total=%d)" Endpoint.pp endpoint 427 + (List.length !(ep_pool.connections))); 428 + pool.protocol.on_acquire conn.pc_state; 429 + conn 430 + 407 431 let rec acquire_connection pool ep_pool endpoint = 408 432 Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () -> 409 - (* Find an existing healthy connection with available capacity *) 410 433 let rec find_available = function 411 434 | [] -> None 412 435 | conn :: rest -> ··· 425 448 end 426 449 in 427 450 428 - (* Clean up closed connections *) 429 451 ep_pool.connections := 430 452 List.filter (fun c -> not c.pc_closed) !(ep_pool.connections); 431 453 432 454 match find_available !(ep_pool.connections) with 433 455 | Some conn -> 434 - (* Reuse existing connection *) 435 456 let was_idle = conn.pc_active_users = 0 in 436 - conn.pc_active_users <- conn.pc_active_users + 1; 437 - conn.pc_last_used <- time pool; 438 - conn.pc_use_count <- conn.pc_use_count + 1; 439 - 440 - Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 441 - ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1; 442 - ep_pool.stats.active <- ep_pool.stats.active + 1; 443 - (* Decrement idle count when connection becomes active *) 444 - if was_idle then 445 - ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 446 - 447 457 Log.debug (fun m -> 448 458 m "Reusing connection to %a (users=%d)" Endpoint.pp endpoint 449 - conn.pc_active_users); 450 - 451 - (* Notify protocol handler of acquisition *) 452 - pool.protocol.on_acquire conn.pc_state; 453 - conn 459 + (conn.pc_active_users + 1)); 460 + activate_reused pool ep_pool conn ~was_idle 454 461 | None -> 455 - (* Need to create a new connection *) 456 462 let max_conns = Config.max_connections_per_endpoint pool.config in 457 463 let current_conns = List.length !(ep_pool.connections) in 458 - 459 464 if current_conns >= max_conns then begin 460 - (* Wait for a connection to become available *) 461 465 Log.debug (fun m -> 462 466 m "At connection limit for %a (%d), waiting..." Endpoint.pp 463 467 endpoint max_conns); 464 - 465 - (* Find a connection to wait on (prefer shared mode) *) 466 468 let wait_conn = 467 469 List.find_opt 468 470 (fun c -> ··· 471 473 | Config.Exclusive -> false) 472 474 !(ep_pool.connections) 473 475 in 474 - 475 476 match wait_conn with 476 - | Some conn -> 477 - (* Wait for user slot *) 478 - while 479 - (conn.pc_active_users 480 - >= 481 - match pool.protocol.access_mode conn.pc_state with 482 - | Config.Shared n -> n 483 - | Config.Exclusive -> 1) 484 - && not conn.pc_closed 485 - do 486 - Eio.Condition.await_no_mutex conn.pc_user_available 487 - done; 488 - if conn.pc_closed then acquire_connection pool ep_pool endpoint 489 - else begin 490 - conn.pc_active_users <- conn.pc_active_users + 1; 491 - conn.pc_last_used <- time pool; 492 - conn.pc_use_count <- conn.pc_use_count + 1; 493 - 494 - Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 495 - ep_pool.stats.total_reused <- 496 - ep_pool.stats.total_reused + 1; 497 - ep_pool.stats.active <- ep_pool.stats.active + 1); 498 - 499 - (* Notify protocol handler of acquisition *) 500 - pool.protocol.on_acquire conn.pc_state; 501 - conn 502 - end 503 - | None -> 504 - (* All connections are exclusive and in use - wait for any *) 477 + | Some conn -> ( 478 + match wait_for_shared_slot pool ep_pool conn endpoint with 479 + | Some conn -> conn 480 + | None -> acquire_connection pool ep_pool endpoint) 481 + | None -> ( 505 482 let any_conn = List.hd !(ep_pool.connections) in 506 - while any_conn.pc_active_users > 0 && not any_conn.pc_closed do 507 - Eio.Condition.await_no_mutex any_conn.pc_user_available 508 - done; 509 - if any_conn.pc_closed then 510 - acquire_connection pool ep_pool endpoint 511 - else begin 512 - (* Connection was idle (active_users = 0), now becoming active *) 513 - Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 514 - ep_pool.stats.total_reused <- 515 - ep_pool.stats.total_reused + 1; 516 - ep_pool.stats.active <- ep_pool.stats.active + 1; 517 - ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1)); 518 - any_conn.pc_active_users <- 1; 519 - any_conn.pc_last_used <- time pool; 520 - any_conn.pc_use_count <- any_conn.pc_use_count + 1; 521 - (* Notify protocol handler of acquisition *) 522 - pool.protocol.on_acquire any_conn.pc_state; 523 - any_conn 524 - end 483 + match 484 + wait_for_exclusive_slot pool ep_pool any_conn endpoint 485 + with 486 + | Some conn -> conn 487 + | None -> acquire_connection pool ep_pool endpoint) 525 488 end 526 - else begin 527 - (* Create new connection *) 528 - let conn = connection pool endpoint in 529 - conn.pc_active_users <- 1; 530 - ep_pool.connections := conn :: !(ep_pool.connections); 531 - 532 - Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () -> 533 - ep_pool.stats.total_created <- ep_pool.stats.total_created + 1; 534 - ep_pool.stats.active <- ep_pool.stats.active + 1); 535 - 536 - Log.info (fun m -> 537 - m "Created new connection to %a (total=%d)" Endpoint.pp endpoint 538 - (List.length !(ep_pool.connections))); 539 - 540 - (* Notify protocol handler of acquisition *) 541 - pool.protocol.on_acquire conn.pc_state; 542 - conn 543 - end) 489 + else new_connection pool ep_pool endpoint) 544 490 545 491 (** {1 Connection Release} *) 546 492 ··· 598 544 599 545 (** {1 Public API} *) 600 546 547 + let close_all_endpoints pool = 548 + Hashtbl.iter 549 + (fun _endpoint ep_pool -> 550 + List.iter (fun conn -> close_connection pool conn) !(ep_pool.connections)) 551 + pool.endpoints; 552 + Hashtbl.clear pool.endpoints 553 + 601 554 let v ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls 602 555 ?(config = Config.default) ~protocol () = 603 556 Log.info (fun m -> ··· 621 574 Eio.Switch.on_release sw (fun () -> 622 575 Eio.Cancel.protect (fun () -> 623 576 Log.info (fun m -> m "Closing connection pool"); 624 - Hashtbl.iter 625 - (fun _endpoint ep_pool -> 626 - List.iter 627 - (fun conn -> close_connection pool conn) 628 - !(ep_pool.connections)) 629 - pool.endpoints; 630 - Hashtbl.clear pool.endpoints)); 577 + close_all_endpoints pool)); 631 578 632 579 Pool pool 633 580