···1515 max_connection_lifetime : float;
1616 max_connection_uses : int option;
1717 health_check :
1818- ([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) option;
1818+ ([ Eio.Resource.close_ty | Eio.Flow.two_way_ty ] Eio.Resource.t -> bool)
1919+ option;
1920 connect_timeout : float option;
2021 connect_retry_count : int;
2122 connect_retry_delay : float;
···114115(** {1 Protocol Handler Configuration}
115116116117 Protocol handlers define protocol-specific behavior for connection pools.
117117- This enables different pooling strategies for different protocols
118118- (e.g., exclusive for HTTP/1.x, shared for HTTP/2). *)
118118+ This enables different pooling strategies for different protocols (e.g.,
119119+ exclusive for HTTP/1.x, shared for HTTP/2). *)
119120120121(** Access mode for connections.
121122 - [Exclusive] - Each connection is used by one request at a time (HTTP/1.x)
122123 - [Shared] - Multiple requests can share a connection (HTTP/2) *)
123124type access_mode =
124124- | Exclusive
125125- (** Exclusive access - one request per connection at a time *)
125125+ | Exclusive (** Exclusive access - one request per connection at a time *)
126126 | Shared of int
127127 (** Shared access - up to n concurrent requests per connection *)
128128129129+type connection_flow =
130130+ [ Eio.Resource.close_ty | Eio.Flow.two_way_ty ] Eio.Resource.t
129131(** Connection type alias for protocol config *)
130130-type connection_flow = [Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t
131132132132-(** Protocol configuration for typed connection pools.
133133- @param 'state The protocol-specific state type (e.g., H2_client.t for HTTP/2) *)
134133type 'state protocol_config = {
135134 init_state :
136135 sw:Eio.Switch.t ->
137136 flow:connection_flow ->
138137 tls_epoch:Tls.Core.epoch_data option ->
139138 'state;
140140- (** Initialize protocol state when a new connection is created.
141141- The [sw] parameter is a connection-lifetime switch that can be used
142142- to spawn long-running fibers (e.g., HTTP/2 frame reader).
143143- For HTTP/2, this performs the handshake and returns the H2_client.t. *)
144144-139139+ (** Initialize protocol state when a new connection is created. The [sw]
140140+ parameter is a connection-lifetime switch that can be used to spawn
141141+ long-running fibers (e.g., HTTP/2 frame reader). For HTTP/2, this
142142+ performs the handshake and returns the H2_client.t. *)
145143 on_acquire : 'state -> unit;
146146- (** Called when a connection is acquired from the pool.
147147- For HTTP/2, this can start the background reader fiber if not already running. *)
148148-144144+ (** Called when a connection is acquired from the pool. For HTTP/2, this
145145+ can start the background reader fiber if not already running. *)
149146 on_release : 'state -> unit;
150150- (** Called when a connection is released back to the pool.
151151- For HTTP/2, this is typically a no-op since the reader keeps running. *)
152152-147147+ (** Called when a connection is released back to the pool. For HTTP/2,
148148+ this is typically a no-op since the reader keeps running. *)
153149 is_healthy : 'state -> bool;
154154- (** Protocol-specific health check. Return false if connection should be closed.
155155- For HTTP/2, checks if GOAWAY has been received. *)
156156-150150+ (** Protocol-specific health check. Return false if connection should be
151151+ closed. For HTTP/2, checks if GOAWAY has been received. *)
157152 on_close : 'state -> unit;
158158- (** Cleanup callback when connection is destroyed.
159159- For HTTP/2, can send GOAWAY frame. *)
160160-153153+ (** Cleanup callback when connection is destroyed. For HTTP/2, can send
154154+ GOAWAY frame. *)
161155 access_mode : 'state -> access_mode;
162162- (** Get the access mode for this connection.
163163- For HTTP/2, returns [Shared n] with max_concurrent from peer settings. *)
156156+ (** Get the access mode for this connection. For HTTP/2, returns
157157+ [Shared n] with max_concurrent from peer settings. *)
164158}
159159+(** Protocol configuration for typed connection pools.
160160+ @param 'state
161161+ The protocol-specific state type (e.g., H2_client.t for HTTP/2) *)
+29-29
lib/config.mli
···2525 ?max_idle_time:float ->
2626 ?max_connection_lifetime:float ->
2727 ?max_connection_uses:int ->
2828- ?health_check:([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) ->
2828+ ?health_check:
2929+ ([ Eio.Resource.close_ty | Eio.Flow.two_way_ty ] Eio.Resource.t -> bool) ->
2930 ?connect_timeout:float ->
3031 ?connect_retry_count:int ->
3132 ?connect_retry_delay:float ->
···8081(** Get maximum connection uses, if any. *)
81828283val health_check :
8383- t -> ([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) option
8484+ t ->
8585+ ([ Eio.Resource.close_ty | Eio.Flow.two_way_ty ] Eio.Resource.t -> bool)
8686+ option
8487(** Get custom health check function, if any. *)
85888689val connect_timeout : t -> float option
···108111109112(** {1 Protocol Handler Configuration}
110113111111- Protocol handlers define protocol-specific behavior for typed connection pools.
112112- This enables different pooling strategies for different protocols
114114+ Protocol handlers define protocol-specific behavior for typed connection
115115+ pools. This enables different pooling strategies for different protocols
113116 (e.g., exclusive for HTTP/1.x, shared for HTTP/2). *)
114117115118(** Access mode for connections.
116119 - [Exclusive] - Each connection is used by one request at a time (HTTP/1.x)
117117- - [Shared n] - Up to n concurrent requests can share a connection (HTTP/2) *)
120120+ - [Shared n] - Up to n concurrent requests can share a connection (HTTP/2)
121121+*)
118122type access_mode =
119119- | Exclusive
120120- (** Exclusive access - one request per connection at a time *)
123123+ | Exclusive (** Exclusive access - one request per connection at a time *)
121124 | Shared of int
122125 (** Shared access - up to n concurrent requests per connection *)
123126127127+type connection_flow =
128128+ [ Eio.Resource.close_ty | Eio.Flow.two_way_ty ] Eio.Resource.t
124129(** Connection flow type for protocol handlers. *)
125125-type connection_flow = [Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t
126130127127-(** Protocol configuration for typed connection pools.
128128- @param 'state The protocol-specific state type (e.g., H2_client.t for HTTP/2) *)
129131type 'state protocol_config = {
130132 init_state :
131133 sw:Eio.Switch.t ->
132134 flow:connection_flow ->
133135 tls_epoch:Tls.Core.epoch_data option ->
134136 'state;
135135- (** Initialize protocol state when a new connection is created.
136136- The [sw] parameter is a connection-lifetime switch that can be used
137137- to spawn long-running fibers (e.g., HTTP/2 frame reader).
138138- For HTTP/2, this performs the handshake and returns the H2_client.t. *)
139139-137137+ (** Initialize protocol state when a new connection is created. The [sw]
138138+ parameter is a connection-lifetime switch that can be used to spawn
139139+ long-running fibers (e.g., HTTP/2 frame reader). For HTTP/2, this
140140+ performs the handshake and returns the H2_client.t. *)
140141 on_acquire : 'state -> unit;
141141- (** Called when a connection is acquired from the pool.
142142- For HTTP/2, this can start the background reader fiber if not already running. *)
143143-142142+ (** Called when a connection is acquired from the pool. For HTTP/2, this
143143+ can start the background reader fiber if not already running. *)
144144 on_release : 'state -> unit;
145145- (** Called when a connection is released back to the pool.
146146- For HTTP/2, this is typically a no-op since the reader keeps running. *)
147147-145145+ (** Called when a connection is released back to the pool. For HTTP/2,
146146+ this is typically a no-op since the reader keeps running. *)
148147 is_healthy : 'state -> bool;
149149- (** Protocol-specific health check. Return false if connection should be closed.
150150- For HTTP/2, checks if GOAWAY has been received. *)
151151-148148+ (** Protocol-specific health check. Return false if connection should be
149149+ closed. For HTTP/2, checks if GOAWAY has been received. *)
152150 on_close : 'state -> unit;
153153- (** Cleanup callback when connection is destroyed.
154154- For HTTP/2, can send GOAWAY frame. *)
155155-151151+ (** Cleanup callback when connection is destroyed. For HTTP/2, can send
152152+ GOAWAY frame. *)
156153 access_mode : 'state -> access_mode;
157157- (** Get the access mode for this connection.
158158- For HTTP/2, returns [Shared n] with max_concurrent from peer settings. *)
154154+ (** Get the access mode for this connection. For HTTP/2, returns
155155+ [Shared n] with max_concurrent from peer settings. *)
159156}
157157+(** Protocol configuration for typed connection pools.
158158+ @param 'state
159159+ The protocol-specific state type (e.g., H2_client.t for HTTP/2) *)
···2222let suppress_tls_tracing () =
2323 if not !tls_tracing_suppressed then begin
2424 tls_tracing_suppressed := true;
2525- match List.find_opt (fun s -> Logs.Src.name s = "tls.tracing") (Logs.Src.list ()) with
2626- | Some tls_src ->
2727- (match Logs.Src.level tls_src with
2828- | Some Logs.Debug -> Logs.Src.set_level tls_src (Some Logs.Warning)
2929- | _ -> ())
2525+ match
2626+ List.find_opt
2727+ (fun s -> Logs.Src.name s = "tls.tracing")
2828+ (Logs.Src.list ())
2929+ with
3030+ | Some tls_src -> (
3131+ match Logs.Src.level tls_src with
3232+ | Some Logs.Debug -> Logs.Src.set_level tls_src (Some Logs.Warning)
3333+ | _ -> ())
3034 | None -> ()
3135 end
3236···61656266let () =
6367 Eio.Exn.register_pp (fun f -> function
6464- | E e ->
6565- Fmt.string f "Conpool ";
6666- pp_error f e;
6767- true
6868- | _ -> false)
6868+ | E e ->
6969+ Fmt.string f "Conpool ";
7070+ pp_error f e;
7171+ true
7272+ | _ -> false)
69737074(** {1 Connection Types} *)
71757272-type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty]
7676+type connection_ty = [ Eio.Resource.close_ty | Eio.Flow.two_way_ty ]
7377type connection = connection_ty Eio.Resource.t
74787579(** {1 Internal Types} *)
76807777-(** Internal connection wrapper with protocol state and tracking. *)
7881type 'state pooled_connection = {
7982 pc_flow : connection;
8083 pc_tls_flow : Tls_eio.t option;
···8992 pc_user_available : Eio.Condition.t;
9093 mutable pc_closed : bool;
9194 pc_connection_cancel : exn -> unit;
9292- (** Cancels the connection-lifetime switch, stopping any protocol fibers. *)
9595+ (** Cancels the connection-lifetime switch, stopping any protocol fibers.
9696+ *)
9397}
9898+(** Internal connection wrapper with protocol state and tracking. *)
94999595-(** Statistics for an endpoint. *)
96100type endp_stats = {
97101 mutable active : int;
9898- mutable idle : int;
9999- (** Number of idle connections (active_users = 0). *)
102102+ mutable idle : int; (** Number of idle connections (active_users = 0). *)
100103 mutable total_created : int;
101104 mutable total_reused : int;
102105 mutable total_closed : int;
103103- mutable errors : int;
104104- (** Number of connection errors encountered. *)
106106+ mutable errors : int; (** Number of connection errors encountered. *)
105107}
108108+(** Statistics for an endpoint. *)
106109107107-(** Endpoint pool storing connections. *)
108110type 'state endpoint_pool = {
109111 connections : 'state pooled_connection list ref;
110112 ep_mutex : Eio.Mutex.t;
111113 stats : endp_stats;
112114 stats_mutex : Eio.Mutex.t;
113115}
116116+(** Endpoint pool storing connections. *)
114117115115-(** Internal pool representation. *)
116118type ('state, 'clock, 'net) internal = {
117119 sw : Eio.Switch.t;
118120 net : 'net;
···123125 endpoints : (Endpoint.t, 'state endpoint_pool) Hashtbl.t;
124126 endpoints_mutex : Eio.Mutex.t;
125127}
128128+(** Internal pool representation. *)
126129127130(** {1 Public Types} *)
128131129132type 'state t =
130130- Pool : ('state, 'clock Eio.Time.clock, 'net Eio.Net.t) internal -> 'state t
133133+ | Pool : ('state, 'clock Eio.Time.clock, 'net Eio.Net.t) internal -> 'state t
131134132135type 'state connection_info = {
133136 flow : connection;
···137140138141(** {1 Default Protocol Handler}
139142140140- For simple exclusive-access protocols (HTTP/1.x, Redis, etc.),
141141- use unit state with no special initialization. *)
143143+ For simple exclusive-access protocols (HTTP/1.x, Redis, etc.), use unit
144144+ state with no special initialization. *)
142145143143-let default_protocol : unit Config.protocol_config = {
144144- Config.init_state = (fun ~sw:_ ~flow:_ ~tls_epoch:_ -> ());
145145- on_acquire = (fun () -> ());
146146- on_release = (fun () -> ());
147147- is_healthy = (fun () -> true);
148148- on_close = (fun () -> ());
149149- access_mode = (fun () -> Config.Exclusive);
150150-}
146146+let default_protocol : unit Config.protocol_config =
147147+ {
148148+ Config.init_state = (fun ~sw:_ ~flow:_ ~tls_epoch:_ -> ());
149149+ on_acquire = (fun () -> ());
150150+ on_release = (fun () -> ());
151151+ is_healthy = (fun () -> true);
152152+ on_close = (fun () -> ());
153153+ access_mode = (fun () -> Config.Exclusive);
154154+ }
151155152156(** {1 Helper Functions} *)
153157154158let get_time pool = Eio.Time.now pool.clock
155159156156-let create_endp_stats () = {
157157- active = 0;
158158- idle = 0;
159159- total_created = 0;
160160- total_reused = 0;
161161- total_closed = 0;
162162- errors = 0;
163163-}
160160+let create_endp_stats () =
161161+ {
162162+ active = 0;
163163+ idle = 0;
164164+ total_created = 0;
165165+ total_reused = 0;
166166+ total_closed = 0;
167167+ errors = 0;
168168+ }
164169165170let snapshot_stats (stats : endp_stats) : Stats.t =
166171 Stats.make ~active:stats.active ~idle:stats.idle
···182187 match addrs with
183188 | addr :: _ -> addr
184189 | [] ->
185185- raise (err (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
190190+ raise
191191+ (err (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
186192 with Eio.Io _ as ex ->
187193 let bt = Printexc.get_raw_backtrace () in
188194 Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint
···206212 (* Optional TLS handshake *)
207213 let flow, tls_flow =
208214 match pool.tls with
209209- | None ->
210210- ((socket :> connection), None)
211211- | Some tls_config ->
215215+ | None -> ((socket :> connection), None)
216216+ | Some tls_config -> (
212217 try
213218 Log.debug (fun m ->
214219 m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
···222227 ((tls :> connection), Some tls)
223228 with Eio.Io _ as ex ->
224229 let bt = Printexc.get_raw_backtrace () in
225225- Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp endpoint
230230+ Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp
231231+ endpoint)
226232 in
227233228234 (* Get TLS epoch if available *)
···243249 let ready_promise, ready_resolver = Eio.Promise.create () in
244250245251 Eio.Fiber.fork ~sw:pool.sw (fun () ->
246246- Eio.Switch.run (fun conn_sw ->
247247- conn_sw_ref := Some conn_sw;
248248- conn_cancel_ref := (fun exn -> Eio.Switch.fail conn_sw exn);
249249- (* Signal that the switch is ready *)
250250- Eio.Promise.resolve ready_resolver ();
251251- (* Block until the switch is cancelled *)
252252- let wait_forever, _never_resolved = Eio.Promise.create () in
253253- Eio.Promise.await wait_forever
254254- )
255255- );
252252+ Eio.Switch.run (fun conn_sw ->
253253+ conn_sw_ref := Some conn_sw;
254254+ (conn_cancel_ref := fun exn -> Eio.Switch.fail conn_sw exn);
255255+ (* Signal that the switch is ready *)
256256+ Eio.Promise.resolve ready_resolver ();
257257+ (* Block until the switch is cancelled *)
258258+ let wait_forever, _never_resolved = Eio.Promise.create () in
259259+ Eio.Promise.await wait_forever));
256260257261 (* Wait for the switch to be created *)
258262 Eio.Promise.await ready_promise;
···260264 let conn_cancel = !conn_cancel_ref in
261265262266 (* Initialize protocol-specific state with connection switch *)
263263- Log.debug (fun m -> m "Initializing protocol state for %a" Endpoint.pp endpoint);
267267+ Log.debug (fun m ->
268268+ m "Initializing protocol state for %a" Endpoint.pp endpoint);
264269 let state = pool.protocol.init_state ~sw:conn_sw ~flow ~tls_epoch in
265270266271 let now = get_time pool in
···289294 | Unhealthy_error of string
290295 (** Connection failed due to an error (protocol failure, etc.) *)
291296 | Unhealthy_lifecycle of string
292292- (** Connection should close due to normal lifecycle (timeout, max uses, etc.) *)
297297+ (** Connection should close due to normal lifecycle (timeout, max uses,
298298+ etc.) *)
293299294300let check_health pool conn =
295295- if conn.pc_closed then
296296- Unhealthy_lifecycle "already closed"
301301+ if conn.pc_closed then Unhealthy_lifecycle "already closed"
297302 else
298303 (* Check protocol-specific health *)
299304 let protocol_healthy = pool.protocol.is_healthy conn.pc_state in
300305 if not protocol_healthy then begin
301306 Log.debug (fun m -> m "Connection unhealthy: protocol check failed");
302307 Unhealthy_error "protocol check failed"
303303- end else
308308+ end
309309+ else
304310 let now = get_time pool in
305311 (* Check connection age *)
306312 let age = now -. conn.pc_created_at in
307313 let max_lifetime = Config.max_connection_lifetime pool.config in
308314 if age > max_lifetime then begin
309309- Log.debug (fun m -> m "Connection unhealthy: exceeded max lifetime (%.1fs > %.1fs)"
310310- age max_lifetime);
315315+ Log.debug (fun m ->
316316+ m "Connection unhealthy: exceeded max lifetime (%.1fs > %.1fs)" age
317317+ max_lifetime);
311318 Unhealthy_lifecycle "exceeded max lifetime"
312312- end else
319319+ end
320320+ else
313321 (* Check idle time - only for idle connections *)
314322 let idle_time = now -. conn.pc_last_used in
315323 let max_idle = Config.max_idle_time pool.config in
316324 if conn.pc_active_users = 0 && idle_time > max_idle then begin
317317- Log.debug (fun m -> m "Connection unhealthy: exceeded max idle time (%.1fs > %.1fs)"
318318- idle_time max_idle);
325325+ Log.debug (fun m ->
326326+ m "Connection unhealthy: exceeded max idle time (%.1fs > %.1fs)"
327327+ idle_time max_idle);
319328 Unhealthy_lifecycle "exceeded max idle time"
320320- end else
329329+ end
330330+ else
321331 (* Check use count *)
322332 match Config.max_connection_uses pool.config with
323333 | Some max_uses when conn.pc_use_count >= max_uses ->
324324- Log.debug (fun m -> m "Connection unhealthy: exceeded max uses (%d >= %d)"
325325- conn.pc_use_count max_uses);
334334+ Log.debug (fun m ->
335335+ m "Connection unhealthy: exceeded max uses (%d >= %d)"
336336+ conn.pc_use_count max_uses);
326337 Unhealthy_lifecycle "exceeded max uses"
327327- | _ ->
328328- Healthy
338338+ | _ -> Healthy
329339330340let is_healthy pool conn =
331341 match check_health pool conn with
···341351 m "Closing connection to %a" Endpoint.pp conn.pc_endpoint);
342352343353 (* Cancel connection-lifetime switch first - this stops any protocol fibers *)
344344- (try conn.pc_connection_cancel (Failure "Connection closed")
345345- with _ -> ());
354354+ (try conn.pc_connection_cancel (Failure "Connection closed") with _ -> ());
346355347356 (* Call protocol cleanup *)
348357 pool.protocol.on_close conn.pc_state;
···367376 | None ->
368377 Log.info (fun m ->
369378 m "Creating endpoint pool for %a" Endpoint.pp endpoint);
370370- let ep_pool = {
371371- connections = ref [];
372372- ep_mutex = Eio.Mutex.create ();
373373- stats = create_endp_stats ();
374374- stats_mutex = Eio.Mutex.create ();
375375- } in
379379+ let ep_pool =
380380+ {
381381+ connections = ref [];
382382+ ep_mutex = Eio.Mutex.create ();
383383+ stats = create_endp_stats ();
384384+ stats_mutex = Eio.Mutex.create ();
385385+ }
386386+ in
376387 Hashtbl.add pool.endpoints endpoint ep_pool;
377388 ep_pool)
378389···380391381392let rec acquire_connection pool ep_pool endpoint =
382393 Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () ->
383383- (* Find an existing healthy connection with available capacity *)
384384- let rec find_available = function
385385- | [] -> None
386386- | conn :: rest ->
387387- if not (is_healthy pool conn) then begin
388388- conn.pc_closed <- true;
389389- find_available rest
390390- end else begin
391391- match pool.protocol.access_mode conn.pc_state with
392392- | Config.Exclusive ->
393393- if conn.pc_active_users = 0 then
394394- Some conn
395395- else
396396- find_available rest
397397- | Config.Shared max_concurrent ->
398398- if conn.pc_active_users < max_concurrent then
399399- Some conn
400400- else
401401- find_available rest
402402- end
403403- in
394394+ (* Find an existing healthy connection with available capacity *)
395395+ let rec find_available = function
396396+ | [] -> None
397397+ | conn :: rest ->
398398+ if not (is_healthy pool conn) then begin
399399+ conn.pc_closed <- true;
400400+ find_available rest
401401+ end
402402+ else begin
403403+ match pool.protocol.access_mode conn.pc_state with
404404+ | Config.Exclusive ->
405405+ if conn.pc_active_users = 0 then Some conn
406406+ else find_available rest
407407+ | Config.Shared max_concurrent ->
408408+ if conn.pc_active_users < max_concurrent then Some conn
409409+ else find_available rest
410410+ end
411411+ in
404412405405- (* Clean up closed connections *)
406406- ep_pool.connections := List.filter (fun c -> not c.pc_closed) !(ep_pool.connections);
413413+ (* Clean up closed connections *)
414414+ ep_pool.connections :=
415415+ List.filter (fun c -> not c.pc_closed) !(ep_pool.connections);
407416408408- match find_available !(ep_pool.connections) with
409409- | Some conn ->
410410- (* Reuse existing connection *)
411411- let was_idle = conn.pc_active_users = 0 in
412412- conn.pc_active_users <- conn.pc_active_users + 1;
413413- conn.pc_last_used <- get_time pool;
414414- conn.pc_use_count <- conn.pc_use_count + 1;
417417+ match find_available !(ep_pool.connections) with
418418+ | Some conn ->
419419+ (* Reuse existing connection *)
420420+ let was_idle = conn.pc_active_users = 0 in
421421+ conn.pc_active_users <- conn.pc_active_users + 1;
422422+ conn.pc_last_used <- get_time pool;
423423+ conn.pc_use_count <- conn.pc_use_count + 1;
415424416416- Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
417417- ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1;
418418- ep_pool.stats.active <- ep_pool.stats.active + 1;
419419- (* Decrement idle count when connection becomes active *)
420420- if was_idle then
421421- ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
425425+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
426426+ ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1;
427427+ ep_pool.stats.active <- ep_pool.stats.active + 1;
428428+ (* Decrement idle count when connection becomes active *)
429429+ if was_idle then
430430+ ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
422431423423- Log.debug (fun m ->
424424- m "Reusing connection to %a (users=%d)"
425425- Endpoint.pp endpoint conn.pc_active_users);
432432+ Log.debug (fun m ->
433433+ m "Reusing connection to %a (users=%d)" Endpoint.pp endpoint
434434+ conn.pc_active_users);
426435427427- (* Notify protocol handler of acquisition *)
428428- pool.protocol.on_acquire conn.pc_state;
429429- conn
436436+ (* Notify protocol handler of acquisition *)
437437+ pool.protocol.on_acquire conn.pc_state;
438438+ conn
439439+ | None ->
440440+ (* Need to create a new connection *)
441441+ let max_conns = Config.max_connections_per_endpoint pool.config in
442442+ let current_conns = List.length !(ep_pool.connections) in
430443431431- | None ->
432432- (* Need to create a new connection *)
433433- let max_conns = Config.max_connections_per_endpoint pool.config in
434434- let current_conns = List.length !(ep_pool.connections) in
435435-436436- if current_conns >= max_conns then begin
437437- (* Wait for a connection to become available *)
438438- Log.debug (fun m ->
439439- m "At connection limit for %a (%d), waiting..."
440440- Endpoint.pp endpoint max_conns);
444444+ if current_conns >= max_conns then begin
445445+ (* Wait for a connection to become available *)
446446+ Log.debug (fun m ->
447447+ m "At connection limit for %a (%d), waiting..." Endpoint.pp
448448+ endpoint max_conns);
441449442442- (* Find a connection to wait on (prefer shared mode) *)
443443- let wait_conn = List.find_opt (fun c ->
444444- match pool.protocol.access_mode c.pc_state with
445445- | Config.Shared _ -> true
446446- | Config.Exclusive -> false
447447- ) !(ep_pool.connections) in
450450+ (* Find a connection to wait on (prefer shared mode) *)
451451+ let wait_conn =
452452+ List.find_opt
453453+ (fun c ->
454454+ match pool.protocol.access_mode c.pc_state with
455455+ | Config.Shared _ -> true
456456+ | Config.Exclusive -> false)
457457+ !(ep_pool.connections)
458458+ in
448459449449- match wait_conn with
450450- | Some conn ->
451451- (* Wait for user slot *)
452452- while conn.pc_active_users >=
453453- (match pool.protocol.access_mode conn.pc_state with
454454- | Config.Shared n -> n
455455- | Config.Exclusive -> 1)
456456- && not conn.pc_closed do
457457- Eio.Condition.await_no_mutex conn.pc_user_available
458458- done;
459459- if conn.pc_closed then
460460- acquire_connection pool ep_pool endpoint
461461- else begin
462462- conn.pc_active_users <- conn.pc_active_users + 1;
463463- conn.pc_last_used <- get_time pool;
464464- conn.pc_use_count <- conn.pc_use_count + 1;
460460+ match wait_conn with
461461+ | Some conn ->
462462+ (* Wait for user slot *)
463463+ while
464464+ (conn.pc_active_users
465465+ >=
466466+ match pool.protocol.access_mode conn.pc_state with
467467+ | Config.Shared n -> n
468468+ | Config.Exclusive -> 1)
469469+ && not conn.pc_closed
470470+ do
471471+ Eio.Condition.await_no_mutex conn.pc_user_available
472472+ done;
473473+ if conn.pc_closed then acquire_connection pool ep_pool endpoint
474474+ else begin
475475+ conn.pc_active_users <- conn.pc_active_users + 1;
476476+ conn.pc_last_used <- get_time pool;
477477+ conn.pc_use_count <- conn.pc_use_count + 1;
465478466466- Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
467467- ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1;
468468- ep_pool.stats.active <- ep_pool.stats.active + 1);
479479+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
480480+ ep_pool.stats.total_reused <-
481481+ ep_pool.stats.total_reused + 1;
482482+ ep_pool.stats.active <- ep_pool.stats.active + 1);
469483470470- (* Notify protocol handler of acquisition *)
471471- pool.protocol.on_acquire conn.pc_state;
472472- conn
473473- end
474474- | None ->
475475- (* All connections are exclusive and in use - wait for any *)
476476- let any_conn = List.hd !(ep_pool.connections) in
477477- while any_conn.pc_active_users > 0 && not any_conn.pc_closed do
478478- Eio.Condition.await_no_mutex any_conn.pc_user_available
479479- done;
480480- if any_conn.pc_closed then
481481- acquire_connection pool ep_pool endpoint
482482- else begin
483483- (* Connection was idle (active_users = 0), now becoming active *)
484484- Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
485485- ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1;
486486- ep_pool.stats.active <- ep_pool.stats.active + 1;
487487- ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
488488- any_conn.pc_active_users <- 1;
489489- any_conn.pc_last_used <- get_time pool;
490490- any_conn.pc_use_count <- any_conn.pc_use_count + 1;
491491- (* Notify protocol handler of acquisition *)
492492- pool.protocol.on_acquire any_conn.pc_state;
493493- any_conn
494494- end
495495- end else begin
496496- (* Create new connection *)
497497- let conn = create_connection pool endpoint in
498498- conn.pc_active_users <- 1;
499499- ep_pool.connections := conn :: !(ep_pool.connections);
484484+ (* Notify protocol handler of acquisition *)
485485+ pool.protocol.on_acquire conn.pc_state;
486486+ conn
487487+ end
488488+ | None ->
489489+ (* All connections are exclusive and in use - wait for any *)
490490+ let any_conn = List.hd !(ep_pool.connections) in
491491+ while any_conn.pc_active_users > 0 && not any_conn.pc_closed do
492492+ Eio.Condition.await_no_mutex any_conn.pc_user_available
493493+ done;
494494+ if any_conn.pc_closed then
495495+ acquire_connection pool ep_pool endpoint
496496+ else begin
497497+ (* Connection was idle (active_users = 0), now becoming active *)
498498+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
499499+ ep_pool.stats.total_reused <-
500500+ ep_pool.stats.total_reused + 1;
501501+ ep_pool.stats.active <- ep_pool.stats.active + 1;
502502+ ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
503503+ any_conn.pc_active_users <- 1;
504504+ any_conn.pc_last_used <- get_time pool;
505505+ any_conn.pc_use_count <- any_conn.pc_use_count + 1;
506506+ (* Notify protocol handler of acquisition *)
507507+ pool.protocol.on_acquire any_conn.pc_state;
508508+ any_conn
509509+ end
510510+ end
511511+ else begin
512512+ (* Create new connection *)
513513+ let conn = create_connection pool endpoint in
514514+ conn.pc_active_users <- 1;
515515+ ep_pool.connections := conn :: !(ep_pool.connections);
500516501501- Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
502502- ep_pool.stats.total_created <- ep_pool.stats.total_created + 1;
503503- ep_pool.stats.active <- ep_pool.stats.active + 1);
517517+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
518518+ ep_pool.stats.total_created <- ep_pool.stats.total_created + 1;
519519+ ep_pool.stats.active <- ep_pool.stats.active + 1);
504520505505- Log.info (fun m ->
506506- m "Created new connection to %a (total=%d)"
507507- Endpoint.pp endpoint (List.length !(ep_pool.connections)));
521521+ Log.info (fun m ->
522522+ m "Created new connection to %a (total=%d)" Endpoint.pp endpoint
523523+ (List.length !(ep_pool.connections)));
508524509509- (* Notify protocol handler of acquisition *)
510510- pool.protocol.on_acquire conn.pc_state;
511511- conn
512512- end)
525525+ (* Notify protocol handler of acquisition *)
526526+ pool.protocol.on_acquire conn.pc_state;
527527+ conn
528528+ end)
513529514530(** {1 Connection Release} *)
515531···518534 pool.protocol.on_release conn.pc_state;
519535520536 Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () ->
521521- let was_active = conn.pc_active_users > 0 in
522522- conn.pc_active_users <- max 0 (conn.pc_active_users - 1);
523523- let now_idle = conn.pc_active_users = 0 in
537537+ let was_active = conn.pc_active_users > 0 in
538538+ conn.pc_active_users <- max 0 (conn.pc_active_users - 1);
539539+ let now_idle = conn.pc_active_users = 0 in
524540525525- Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
526526- ep_pool.stats.active <- max 0 (ep_pool.stats.active - 1);
527527- (* Track idle count: increment when connection becomes idle *)
528528- if was_active && now_idle then
529529- ep_pool.stats.idle <- ep_pool.stats.idle + 1);
541541+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
542542+ ep_pool.stats.active <- max 0 (ep_pool.stats.active - 1);
543543+ (* Track idle count: increment when connection becomes idle *)
544544+ if was_active && now_idle then
545545+ ep_pool.stats.idle <- ep_pool.stats.idle + 1);
530546531531- (* Signal waiting fibers *)
532532- Eio.Condition.broadcast conn.pc_user_available;
547547+ (* Signal waiting fibers *)
548548+ Eio.Condition.broadcast conn.pc_user_available;
533549534534- Log.debug (fun m ->
535535- m "Released connection to %a (users=%d)"
536536- Endpoint.pp conn.pc_endpoint conn.pc_active_users);
550550+ Log.debug (fun m ->
551551+ m "Released connection to %a (users=%d)" Endpoint.pp conn.pc_endpoint
552552+ conn.pc_active_users);
537553538538- (* Check if connection should be closed *)
539539- match check_health pool conn with
540540- | Healthy -> ()
541541- | Unhealthy_error reason ->
542542- conn.pc_closed <- true;
554554+ (* Check if connection should be closed *)
555555+ match check_health pool conn with
556556+ | Healthy -> ()
557557+ | Unhealthy_error reason ->
558558+ conn.pc_closed <- true;
543559544544- Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
545545- ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1;
546546- ep_pool.stats.errors <- ep_pool.stats.errors + 1;
547547- if now_idle then
548548- ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
560560+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
561561+ ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1;
562562+ ep_pool.stats.errors <- ep_pool.stats.errors + 1;
563563+ if now_idle then
564564+ ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
549565550550- Log.warn (fun m -> m "Closing connection due to error: %s" reason);
551551- close_connection pool conn;
552552- ep_pool.connections := List.filter (fun c -> c != conn) !(ep_pool.connections)
566566+ Log.warn (fun m -> m "Closing connection due to error: %s" reason);
567567+ close_connection pool conn;
568568+ ep_pool.connections :=
569569+ List.filter (fun c -> c != conn) !(ep_pool.connections)
570570+ | Unhealthy_lifecycle reason ->
571571+ conn.pc_closed <- true;
553572554554- | Unhealthy_lifecycle reason ->
555555- conn.pc_closed <- true;
573573+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
574574+ ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1;
575575+ if now_idle then
576576+ ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
556577557557- Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
558558- ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1;
559559- if now_idle then
560560- ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
561561-562562- Log.debug (fun m -> m "Closing connection due to lifecycle: %s" reason);
563563- close_connection pool conn;
564564- ep_pool.connections := List.filter (fun c -> c != conn) !(ep_pool.connections))
578578+ Log.debug (fun m ->
579579+ m "Closing connection due to lifecycle: %s" reason);
580580+ close_connection pool conn;
581581+ ep_pool.connections :=
582582+ List.filter (fun c -> c != conn) !(ep_pool.connections))
565583566584(** {1 Public API} *)
567585568568-let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock)
569569- ?tls ?(config = Config.default) ?protocol () =
570570- let protocol = match protocol with
586586+let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock) ?tls
587587+ ?(config = Config.default) ?protocol () =
588588+ let protocol =
589589+ match protocol with
571590 | Some p -> p
572572- | None -> Obj.magic default_protocol (* Safe: unit is compatible with any 'state *)
591591+ | None ->
592592+ Obj.magic
593593+ default_protocol (* Safe: unit is compatible with any 'state *)
573594 in
574595575596 Log.info (fun m ->
576597 m "Creating connection pool (max_per_endpoint=%d)"
577598 (Config.max_connections_per_endpoint config));
578599579579- let pool = {
580580- sw;
581581- net;
582582- clock;
583583- config;
584584- tls;
585585- protocol;
586586- endpoints = Hashtbl.create 16;
587587- endpoints_mutex = Eio.Mutex.create ();
588588- } in
600600+ let pool =
601601+ {
602602+ sw;
603603+ net;
604604+ clock;
605605+ config;
606606+ tls;
607607+ protocol;
608608+ endpoints = Hashtbl.create 16;
609609+ endpoints_mutex = Eio.Mutex.create ();
610610+ }
611611+ in
589612590613 (* Auto-cleanup on switch release *)
591614 Eio.Switch.on_release sw (fun () ->
592615 Eio.Cancel.protect (fun () ->
593616 Log.info (fun m -> m "Closing connection pool");
594594- Hashtbl.iter (fun _endpoint ep_pool ->
595595- List.iter (fun conn ->
596596- close_connection pool conn
597597- ) !(ep_pool.connections)
598598- ) pool.endpoints;
617617+ Hashtbl.iter
618618+ (fun _endpoint ep_pool ->
619619+ List.iter
620620+ (fun conn -> close_connection pool conn)
621621+ !(ep_pool.connections))
622622+ pool.endpoints;
599623 Hashtbl.clear pool.endpoints));
600624601625 Pool pool
···607631 let conn = acquire_connection pool ep_pool endpoint in
608632609633 (* Release connection when switch ends *)
610610- Eio.Switch.on_release sw (fun () ->
611611- release_connection pool ep_pool conn);
634634+ Eio.Switch.on_release sw (fun () -> release_connection pool ep_pool conn);
612635613636 (* Get TLS epoch if available *)
614637 let tls_epoch =
···620643 | None -> None
621644 in
622645623623- {
624624- flow = conn.pc_flow;
625625- tls_epoch;
626626- state = conn.pc_state;
627627- }
646646+ { flow = conn.pc_flow; tls_epoch; state = conn.pc_state }
628647629648let with_connection pool endpoint f =
630649 Eio.Switch.run (fun sw -> f (connection ~sw pool endpoint))
···632651let stats (Pool pool) endpoint =
633652 match Hashtbl.find_opt pool.endpoints endpoint with
634653 | Some ep_pool ->
635635- Eio.Mutex.use_ro ep_pool.stats_mutex (fun () -> snapshot_stats ep_pool.stats)
654654+ Eio.Mutex.use_ro ep_pool.stats_mutex (fun () ->
655655+ snapshot_stats ep_pool.stats)
636656 | None ->
637657 Stats.make ~active:0 ~idle:0 ~total_created:0 ~total_reused:0
638658 ~total_closed:0 ~errors:0
···654674 | Some ep_pool ->
655675 Eio.Cancel.protect (fun () ->
656676 Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () ->
657657- List.iter (fun conn ->
658658- close_connection pool conn
659659- ) !(ep_pool.connections);
660660- ep_pool.connections := []);
677677+ List.iter
678678+ (fun conn -> close_connection pool conn)
679679+ !(ep_pool.connections);
680680+ ep_pool.connections := []);
661681 Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
662682 Hashtbl.remove pool.endpoints endpoint))
663683 | None ->
+26-23
lib/conpool.mli
···66(** Conpool - Protocol-aware TCP/IP connection pooling library for Eio
7788 Conpool provides efficient connection pooling with support for both
99- exclusive (HTTP/1.x) and shared (HTTP/2) access modes. All connections
1010- carry protocol-specific state managed through callbacks.
99+ exclusive (HTTP/1.x) and shared (HTTP/2) access modes. All connections carry
1010+ protocol-specific state managed through callbacks.
11111212 {2 Quick Start}
1313···1515 {[
1616 let pool = Conpool.create ~sw ~net ~clock ~tls () in
1717 Eio.Switch.run (fun conn_sw ->
1818- let conn = Conpool.connection ~sw:conn_sw pool endpoint in
1919- (* Use conn.flow for I/O *)
2020- Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn.flow)
1818+ let conn = Conpool.connection ~sw:conn_sw pool endpoint in
1919+ (* Use conn.flow for I/O *)
2020+ Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn.flow)
2121 ]}
22222323 For multiplexed protocols (HTTP/2):
···75757676(** {1 Connection Types} *)
77777878-type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty]
7878+type connection_ty = [ Eio.Resource.close_ty | Eio.Flow.two_way_ty ]
7979(** Type tags for a pooled connection. *)
80808181type connection = connection_ty Eio.Resource.t
···8484(** {1 Connection Pool}
85858686 All pools are typed - they carry protocol-specific state with each
8787- connection. For simple exclusive-access protocols, use the default
8888- [unit] state which requires no protocol handler. *)
8787+ connection. For simple exclusive-access protocols, use the default [unit]
8888+ state which requires no protocol handler. *)
89899090type 'state t
9191(** Connection pool with protocol-specific state ['state].
92929393- - For HTTP/1.x: use [unit t] with exclusive access (one request per connection)
9494- - For HTTP/2: use [h2_state t] with shared access (multiple streams per connection) *)
9393+ - For HTTP/1.x: use [unit t] with exclusive access (one request per
9494+ connection)
9595+ - For HTTP/2: use [h2_state t] with shared access (multiple streams per
9696+ connection) *)
95979696-(** Connection with protocol-specific state. *)
9798type 'state connection_info = {
9898- flow : connection;
9999- (** The underlying connection flow for I/O. *)
9999+ flow : connection; (** The underlying connection flow for I/O. *)
100100 tls_epoch : Tls.Core.epoch_data option;
101101 (** TLS epoch data if connection uses TLS. *)
102102- state : 'state;
103103- (** Protocol-specific state (e.g., H2_client.t for HTTP/2). *)
102102+ state : 'state; (** Protocol-specific state (e.g., H2_client.t for HTTP/2). *)
104103}
104104+(** Connection with protocol-specific state. *)
105105106106(** {2 Pool Creation} *)
107107···121121 @param clock Clock for timeouts
122122 @param tls Optional TLS client configuration
123123 @param config Pool configuration (uses {!Config.default} if not provided)
124124- @param protocol Protocol handler for state management. If not provided,
125125- creates a [unit t] pool with exclusive access mode (one user per connection).
124124+ @param protocol
125125+ Protocol handler for state management. If not provided, creates a [unit t]
126126+ pool with exclusive access mode (one user per connection).
126127127128 Examples:
128129···138139139140(** {2 Connection Acquisition} *)
140141141141-val connection : sw:Eio.Switch.t -> 'state t -> Endpoint.t -> 'state connection_info
142142+val connection :
143143+ sw:Eio.Switch.t -> 'state t -> Endpoint.t -> 'state connection_info
142144(** [connection ~sw pool endpoint] acquires a connection from the pool.
143145144146 The connection is automatically released when [sw] finishes:
···152154 Example:
153155 {[
154156 Eio.Switch.run (fun sw ->
155155- let conn = Conpool.connection ~sw pool endpoint in
156156- (* For HTTP/1.x: conn.state is () *)
157157- (* For HTTP/2: conn.state is H2_client.t *)
158158- Eio.Flow.copy_string data conn.flow)
157157+ let conn = Conpool.connection ~sw pool endpoint in
158158+ (* For HTTP/1.x: conn.state is () *)
159159+ (* For HTTP/2: conn.state is H2_client.t *)
160160+ Eio.Flow.copy_string data conn.flow)
159161 ]} *)
160162161161-val with_connection : 'state t -> Endpoint.t -> ('state connection_info -> 'a) -> 'a
163163+val with_connection :
164164+ 'state t -> Endpoint.t -> ('state connection_info -> 'a) -> 'a
162165(** [with_connection pool endpoint fn] is a convenience wrapper.
163166164167 Equivalent to: