···11+# v1.0.0 (dev)
22+33+- Initial release of Conpool
+15
LICENSE.md
···11+ISC License
22+33+Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>
44+55+Permission to use, copy, modify, and distribute this software for any
66+purpose with or without fee is hereby granted, provided that the above
77+copyright notice and this permission notice appear in all copies.
88+99+THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
1010+WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
1111+MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
1212+ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
1313+WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
1414+ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
1515+OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+110
README.md
···11+# Conpool - Protocol-agnostic Connection Pooling for Eio
22+33+Conpool is a connection pooling library built on Eio that manages TCP connection lifecycles, validates connection health, and provides per-endpoint resource limiting for any TCP-based protocol.
44+55+## Key Features
66+77+- **Protocol-agnostic**: Works with HTTP, Redis, PostgreSQL, or any TCP-based protocol
88+- **Health validation**: Automatically validates connections before reuse
99+- **Per-endpoint limits**: Independent connection limits and pooling for each endpoint
1010+- **TLS support**: Optional TLS configuration for secure connections
1111+- **Statistics & monitoring**: Track connection usage, hits/misses, and health status
1212+- **Built on Eio**: Leverages Eio's structured concurrency and resource management
1313+1414+## Usage
1515+1616+Basic example establishing a connection pool:
1717+1818+```ocaml
1919+open Eio.Std
2020+2121+let run env =
2222+ Switch.run (fun sw ->
2323+ (* Create a connection pool *)
2424+ let pool = Conpool.create
2525+ ~sw
2626+ ~net:(Eio.Stdenv.net env)
2727+ ~clock:(Eio.Stdenv.clock env)
2828+ ()
2929+ in
3030+3131+ (* Define an endpoint *)
3232+ let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:80 in
3333+3434+ (* Use a connection from the pool *)
3535+ Conpool.with_connection pool endpoint (fun conn ->
3636+ Eio.Flow.copy_string "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n" conn;
3737+ let buf = Eio.Buf_read.of_flow conn ~max_size:4096 in
3838+ Eio.Buf_read.take_all buf
3939+ )
4040+ )
4141+```
4242+4343+With TLS configuration:
4444+4545+```ocaml
4646+let run env =
4747+ Switch.run (fun sw ->
4848+ (* Create TLS configuration - SNI servername is automatically set to the endpoint's hostname *)
4949+ let tls_config = Tls.Config.client ~authenticator:(Ca_certs.authenticator ()) () in
5050+5151+ (* Create pool with TLS *)
5252+ let pool = Conpool.create
5353+ ~sw
5454+ ~net:(Eio.Stdenv.net env)
5555+ ~clock:(Eio.Stdenv.clock env)
5656+ ~tls:tls_config
5757+ ()
5858+ in
5959+6060+ let endpoint = Conpool.Endpoint.make ~host:"example.com" ~port:443 in
6161+ Conpool.with_connection pool endpoint (fun conn ->
6262+ (* Use TLS-encrypted connection *)
6363+ ...
6464+ )
6565+ )
6666+```
6767+6868+Custom pool configuration:
6969+7070+```ocaml
7171+let config = Conpool.Config.make
7272+ ~max_connections_per_endpoint:20
7373+ ~max_idle_per_endpoint:5
7474+ ~connection_timeout:10.0
7575+ ~validation_interval:300.0
7676+ ()
7777+in
7878+7979+let pool = Conpool.create ~sw ~net ~clock ~config ()
8080+```
8181+8282+Monitor pool statistics:
8383+8484+```ocaml
8585+let stats = Conpool.stats pool endpoint in
8686+Printf.printf "Active: %d, Idle: %d, Hits: %d, Misses: %d\n"
8787+ (Conpool.Stats.active_connections stats)
8888+ (Conpool.Stats.idle_connections stats)
8989+ (Conpool.Stats.cache_hits stats)
9090+ (Conpool.Stats.cache_misses stats)
9191+```
9292+9393+## Installation
9494+9595+```
9696+opam install conpool
9797+```
9898+9999+## Documentation
100100+101101+API documentation is available at https://tangled.org/@anil.recoil.org/ocaml-conpool or via:
102102+103103+```
104104+opam install conpool
105105+odig doc conpool
106106+```
107107+108108+## License
109109+110110+ISC
+35
conpool.opam
···11+# This file is generated by dune, edit dune-project instead
22+opam-version: "2.0"
33+synopsis: "Protocol-agnostic TCP/IP connection pooling library for Eio"
44+description:
55+ "Conpool is a connection pooling library built on Eio.Pool that manages TCP connection lifecycles, validates connection health, and provides per-endpoint resource limiting for any TCP-based protocol (HTTP, Redis, PostgreSQL, etc.)"
66+maintainer: ["Anil Madhavapeddy <anil@recoil.org>"]
77+authors: ["Anil Madhavapeddy <anil@recoil.org>"]
88+license: "ISC"
99+homepage: "https://tangled.org/@anil.recoil.org/ocaml-conpool"
1010+bug-reports: "https://tangled.org/@anil.recoil.org/ocaml-conpool/issues"
1111+depends: [
1212+ "ocaml" {>= "5.1.0"}
1313+ "dune" {>= "3.20" & >= "3.0"}
1414+ "eio"
1515+ "tls-eio" {>= "1.0"}
1616+ "logs"
1717+ "fmt"
1818+ "cmdliner"
1919+ "odoc" {with-doc}
2020+]
2121+build: [
2222+ ["dune" "subst"] {dev}
2323+ [
2424+ "dune"
2525+ "build"
2626+ "-p"
2727+ name
2828+ "-j"
2929+ jobs
3030+ "@install"
3131+ "@runtest" {with-test}
3232+ "@doc" {with-doc}
3333+ ]
3434+]
3535+x-maintenance-intent: ["(latest)"]
···11+(*---------------------------------------------------------------------------
22+ Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
33+ SPDX-License-Identifier: ISC
44+ ---------------------------------------------------------------------------*)
55+66+(** Cmdliner terms for connection pool configuration *)
77+88+open Cmdliner
99+1010+let max_connections_per_endpoint =
1111+ let doc = "Maximum concurrent connections per endpoint." in
1212+ Arg.(
1313+ value & opt int 10
1414+ & info [ "max-connections-per-endpoint" ] ~doc ~docv:"NUM")
1515+1616+let max_idle_time =
1717+ let doc = "Maximum time a connection can sit idle in seconds." in
1818+ Arg.(value & opt float 60.0 & info [ "max-idle-time" ] ~doc ~docv:"SECONDS")
1919+2020+let max_connection_lifetime =
2121+ let doc = "Maximum connection age in seconds." in
2222+ Arg.(
2323+ value & opt float 300.0
2424+ & info [ "max-connection-lifetime" ] ~doc ~docv:"SECONDS")
2525+2626+let max_connection_uses =
2727+ let doc = "Maximum times a connection can be reused (omit for unlimited)." in
2828+ Arg.(
2929+ value
3030+ & opt (some int) None
3131+ & info [ "max-connection-uses" ] ~doc ~docv:"NUM")
3232+3333+let connect_timeout =
3434+ let doc = "Connection timeout in seconds." in
3535+ Arg.(value & opt float 10.0 & info [ "connect-timeout" ] ~doc ~docv:"SECONDS")
3636+3737+let connect_retry_count =
3838+ let doc = "Number of connection retry attempts." in
3939+ Arg.(value & opt int 3 & info [ "connect-retry-count" ] ~doc ~docv:"NUM")
4040+4141+let connect_retry_delay =
4242+ let doc = "Initial retry delay in seconds (with exponential backoff)." in
4343+ Arg.(
4444+ value & opt float 0.1 & info [ "connect-retry-delay" ] ~doc ~docv:"SECONDS")
4545+4646+let config =
4747+ let make max_conn max_idle max_lifetime max_uses timeout retry_count
4848+ retry_delay =
4949+ Config.make ~max_connections_per_endpoint:max_conn ~max_idle_time:max_idle
5050+ ~max_connection_lifetime:max_lifetime ?max_connection_uses:max_uses
5151+ ~connect_timeout:timeout ~connect_retry_count:retry_count
5252+ ~connect_retry_delay:retry_delay ()
5353+ in
5454+ Term.(
5555+ const make $ max_connections_per_endpoint $ max_idle_time
5656+ $ max_connection_lifetime $ max_connection_uses $ connect_timeout
5757+ $ connect_retry_count $ connect_retry_delay)
+43
lib/cmd.mli
···11+(*---------------------------------------------------------------------------
22+ Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
33+ SPDX-License-Identifier: ISC
44+ ---------------------------------------------------------------------------*)
55+66+(** Cmdliner terms for connection pool configuration *)
77+88+(** {1 Configuration Terms} *)
99+1010+val max_connections_per_endpoint : int Cmdliner.Term.t
1111+(** Cmdliner term for maximum connections per endpoint. Default: 10 Flag:
1212+ [--max-connections-per-endpoint] *)
1313+1414+val max_idle_time : float Cmdliner.Term.t
1515+(** Cmdliner term for maximum idle time in seconds. Default: 60.0 Flag:
1616+ [--max-idle-time] *)
1717+1818+val max_connection_lifetime : float Cmdliner.Term.t
1919+(** Cmdliner term for maximum connection lifetime in seconds. Default: 300.0
2020+ Flag: [--max-connection-lifetime] *)
2121+2222+val max_connection_uses : int option Cmdliner.Term.t
2323+(** Cmdliner term for maximum connection uses. Default: None (unlimited) Flag:
2424+ [--max-connection-uses] *)
2525+2626+val connect_timeout : float Cmdliner.Term.t
2727+(** Cmdliner term for connection timeout in seconds. Default: 10.0 Flag:
2828+ [--connect-timeout] *)
2929+3030+val connect_retry_count : int Cmdliner.Term.t
3131+(** Cmdliner term for number of connection retry attempts. Default: 3 Flag:
3232+ [--connect-retry-count] *)
3333+3434+val connect_retry_delay : float Cmdliner.Term.t
3535+(** Cmdliner term for initial retry delay in seconds. Default: 0.1 Flag:
3636+ [--connect-retry-delay] *)
3737+3838+(** {1 Combined Terms} *)
3939+4040+val config : Config.t Cmdliner.Term.t
4141+(** Cmdliner term that combines all configuration options into a {!Config.t}.
4242+ This term can be used in your application's main command to accept all
4343+ connection pool configuration options from the command line. *)
+164
lib/config.ml
···11+(*---------------------------------------------------------------------------
22+ Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
33+ SPDX-License-Identifier: ISC
44+ ---------------------------------------------------------------------------*)
55+66+(** Configuration for connection pools *)
77+88+let src = Logs.Src.create "conpool.config" ~doc:"Connection pool configuration"
99+1010+module Log = (val Logs.src_log src : Logs.LOG)
1111+1212+type t = {
1313+ max_connections_per_endpoint : int;
1414+ max_idle_time : float;
1515+ max_connection_lifetime : float;
1616+ max_connection_uses : int option;
1717+ health_check :
1818+ ([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) option;
1919+ connect_timeout : float option;
2020+ connect_retry_count : int;
2121+ connect_retry_delay : float;
2222+ on_connection_created : (Endpoint.t -> unit) option;
2323+ on_connection_closed : (Endpoint.t -> unit) option;
2424+ on_connection_reused : (Endpoint.t -> unit) option;
2525+}
2626+2727+let make ?(max_connections_per_endpoint = 10) ?(max_idle_time = 60.0)
2828+ ?(max_connection_lifetime = 300.0) ?max_connection_uses ?health_check
2929+ ?(connect_timeout = 10.0) ?(connect_retry_count = 3)
3030+ ?(connect_retry_delay = 0.1) ?on_connection_created ?on_connection_closed
3131+ ?on_connection_reused () =
3232+ (* Validate parameters *)
3333+ if max_connections_per_endpoint <= 0 then
3434+ invalid_arg
3535+ (Printf.sprintf "max_connections_per_endpoint must be positive, got %d"
3636+ max_connections_per_endpoint);
3737+3838+ if max_idle_time <= 0.0 then
3939+ invalid_arg
4040+ (Printf.sprintf "max_idle_time must be positive, got %.2f" max_idle_time);
4141+4242+ if max_connection_lifetime <= 0.0 then
4343+ invalid_arg
4444+ (Printf.sprintf "max_connection_lifetime must be positive, got %.2f"
4545+ max_connection_lifetime);
4646+4747+ (match max_connection_uses with
4848+ | Some n when n <= 0 ->
4949+ invalid_arg
5050+ (Printf.sprintf "max_connection_uses must be positive, got %d" n)
5151+ | _ -> ());
5252+5353+ if connect_timeout <= 0.0 then
5454+ invalid_arg
5555+ (Printf.sprintf "connect_timeout must be positive, got %.2f"
5656+ connect_timeout);
5757+5858+ if connect_retry_count < 0 then
5959+ invalid_arg
6060+ (Printf.sprintf "connect_retry_count must be non-negative, got %d"
6161+ connect_retry_count);
6262+6363+ if connect_retry_delay <= 0.0 then
6464+ invalid_arg
6565+ (Printf.sprintf "connect_retry_delay must be positive, got %.2f"
6666+ connect_retry_delay);
6767+6868+ {
6969+ max_connections_per_endpoint;
7070+ max_idle_time;
7171+ max_connection_lifetime;
7272+ max_connection_uses;
7373+ health_check;
7474+ connect_timeout = Some connect_timeout;
7575+ connect_retry_count;
7676+ connect_retry_delay;
7777+ on_connection_created;
7878+ on_connection_closed;
7979+ on_connection_reused;
8080+ }
8181+8282+let default = make ()
8383+let max_connections_per_endpoint t = t.max_connections_per_endpoint
8484+let max_idle_time t = t.max_idle_time
8585+let max_connection_lifetime t = t.max_connection_lifetime
8686+let max_connection_uses t = t.max_connection_uses
8787+let health_check t = t.health_check
8888+let connect_timeout t = t.connect_timeout
8989+let connect_retry_count t = t.connect_retry_count
9090+let connect_retry_delay t = t.connect_retry_delay
9191+let on_connection_created t = t.on_connection_created
9292+let on_connection_closed t = t.on_connection_closed
9393+let on_connection_reused t = t.on_connection_reused
9494+9595+let pp ppf t =
9696+ Fmt.pf ppf
9797+ "@[<v>Config:@,\
9898+ - max_connections_per_endpoint: %d@,\
9999+ - max_idle_time: %.1fs@,\
100100+ - max_connection_lifetime: %.1fs@,\
101101+ - max_connection_uses: %s@,\
102102+ - connect_timeout: %s@,\
103103+ - connect_retry_count: %d@,\
104104+ - connect_retry_delay: %.2fs@]"
105105+ t.max_connections_per_endpoint t.max_idle_time t.max_connection_lifetime
106106+ (match t.max_connection_uses with
107107+ | Some n -> string_of_int n
108108+ | None -> "unlimited")
109109+ (match t.connect_timeout with
110110+ | Some f -> Fmt.str "%.1fs" f
111111+ | None -> "none")
112112+ t.connect_retry_count t.connect_retry_delay
113113+114114+(** {1 Protocol Handler Configuration}
115115+116116+ Protocol handlers define protocol-specific behavior for connection pools.
117117+ This enables different pooling strategies for different protocols
118118+ (e.g., exclusive for HTTP/1.x, shared for HTTP/2). *)
119119+120120+(** Access mode for connections.
121121+ - [Exclusive] - Each connection is used by one request at a time (HTTP/1.x)
122122+ - [Shared] - Multiple requests can share a connection (HTTP/2) *)
123123+type access_mode =
124124+ | Exclusive
125125+ (** Exclusive access - one request per connection at a time *)
126126+ | Shared of int
127127+ (** Shared access - up to n concurrent requests per connection *)
128128+129129+(** Connection type alias for protocol config *)
130130+type connection_flow = [Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t
131131+132132+(** Protocol configuration for typed connection pools.
133133+ @param 'state The protocol-specific state type (e.g., H2_client.t for HTTP/2) *)
134134+type 'state protocol_config = {
135135+ init_state :
136136+ sw:Eio.Switch.t ->
137137+ flow:connection_flow ->
138138+ tls_epoch:Tls.Core.epoch_data option ->
139139+ 'state;
140140+ (** Initialize protocol state when a new connection is created.
141141+ The [sw] parameter is a connection-lifetime switch that can be used
142142+ to spawn long-running fibers (e.g., HTTP/2 frame reader).
143143+ For HTTP/2, this performs the handshake and returns the H2_client.t. *)
144144+145145+ on_acquire : 'state -> unit;
146146+ (** Called when a connection is acquired from the pool.
147147+ For HTTP/2, this can start the background reader fiber if not already running. *)
148148+149149+ on_release : 'state -> unit;
150150+ (** Called when a connection is released back to the pool.
151151+ For HTTP/2, this is typically a no-op since the reader keeps running. *)
152152+153153+ is_healthy : 'state -> bool;
154154+ (** Protocol-specific health check. Return false if connection should be closed.
155155+ For HTTP/2, checks if GOAWAY has been received. *)
156156+157157+ on_close : 'state -> unit;
158158+ (** Cleanup callback when connection is destroyed.
159159+ For HTTP/2, can send GOAWAY frame. *)
160160+161161+ access_mode : 'state -> access_mode;
162162+ (** Get the access mode for this connection.
163163+ For HTTP/2, returns [Shared n] with max_concurrent from peer settings. *)
164164+}
+159
lib/config.mli
···11+(*---------------------------------------------------------------------------
22+ Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
33+ SPDX-License-Identifier: ISC
44+ ---------------------------------------------------------------------------*)
55+66+(** Configuration for connection pools *)
77+88+(** {1 Logging} *)
99+1010+val src : Logs.Src.t
1111+(** Logs source for configuration operations. Configure logging with:
1212+ {[
1313+ Logs.Src.set_level Conpool.Config.src (Some Logs.Debug)
1414+ ]} *)
1515+1616+(** {1 Type} *)
1717+1818+type t
1919+(** Pool configuration *)
2020+2121+(** {1 Construction} *)
2222+2323+val make :
2424+ ?max_connections_per_endpoint:int ->
2525+ ?max_idle_time:float ->
2626+ ?max_connection_lifetime:float ->
2727+ ?max_connection_uses:int ->
2828+ ?health_check:([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) ->
2929+ ?connect_timeout:float ->
3030+ ?connect_retry_count:int ->
3131+ ?connect_retry_delay:float ->
3232+ ?on_connection_created:(Endpoint.t -> unit) ->
3333+ ?on_connection_closed:(Endpoint.t -> unit) ->
3434+ ?on_connection_reused:(Endpoint.t -> unit) ->
3535+ unit ->
3636+ t
3737+(** Create pool configuration with optional parameters.
3838+3939+ @param max_connections_per_endpoint
4040+ Maximum concurrent connections per endpoint (default: 10)
4141+ @param max_idle_time
4242+ Maximum time a connection can sit idle in seconds (default: 60.0)
4343+ @param max_connection_lifetime
4444+ Maximum connection age in seconds (default: 300.0)
4545+ @param max_connection_uses
4646+ Maximum times a connection can be reused (default: unlimited)
4747+ @param health_check Custom health check function (default: none)
4848+ @param connect_timeout Connection timeout in seconds (default: 10.0)
4949+ @param connect_retry_count Number of connection retry attempts (default: 3)
5050+ @param connect_retry_delay
5151+ Initial retry delay in seconds, with exponential backoff (default: 0.1)
5252+ @param on_connection_created Hook called when a connection is created
5353+ @param on_connection_closed Hook called when a connection is closed
5454+ @param on_connection_reused Hook called when a connection is reused *)
5555+5656+val default : t
5757+(** Sensible defaults for most use cases:
5858+ - max_connections_per_endpoint: 10
5959+ - max_idle_time: 60.0s
6060+ - max_connection_lifetime: 300.0s
6161+ - max_connection_uses: unlimited
6262+ - health_check: none
6363+ - connect_timeout: 10.0s
6464+ - connect_retry_count: 3
6565+ - connect_retry_delay: 0.1s
6666+ - hooks: none *)
6767+6868+(** {1 Accessors} *)
6969+7070+val max_connections_per_endpoint : t -> int
7171+(** Get maximum connections per endpoint. *)
7272+7373+val max_idle_time : t -> float
7474+(** Get maximum idle time in seconds. *)
7575+7676+val max_connection_lifetime : t -> float
7777+(** Get maximum connection lifetime in seconds. *)
7878+7979+val max_connection_uses : t -> int option
8080+(** Get maximum connection uses, if any. *)
8181+8282+val health_check :
8383+ t -> ([Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t -> bool) option
8484+(** Get custom health check function, if any. *)
8585+8686+val connect_timeout : t -> float option
8787+(** Get connection timeout in seconds, if any. *)
8888+8989+val connect_retry_count : t -> int
9090+(** Get number of connection retry attempts. *)
9191+9292+val connect_retry_delay : t -> float
9393+(** Get initial retry delay in seconds. *)
9494+9595+val on_connection_created : t -> (Endpoint.t -> unit) option
9696+(** Get connection created hook, if any. *)
9797+9898+val on_connection_closed : t -> (Endpoint.t -> unit) option
9999+(** Get connection closed hook, if any. *)
100100+101101+val on_connection_reused : t -> (Endpoint.t -> unit) option
102102+(** Get connection reused hook, if any. *)
103103+104104+(** {1 Pretty-printing} *)
105105+106106+val pp : t Fmt.t
107107+(** Pretty-printer for configuration. *)
108108+109109+(** {1 Protocol Handler Configuration}
110110+111111+ Protocol handlers define protocol-specific behavior for typed connection pools.
112112+ This enables different pooling strategies for different protocols
113113+ (e.g., exclusive for HTTP/1.x, shared for HTTP/2). *)
114114+115115+(** Access mode for connections.
116116+ - [Exclusive] - Each connection is used by one request at a time (HTTP/1.x)
117117+ - [Shared n] - Up to n concurrent requests can share a connection (HTTP/2) *)
118118+type access_mode =
119119+ | Exclusive
120120+ (** Exclusive access - one request per connection at a time *)
121121+ | Shared of int
122122+ (** Shared access - up to n concurrent requests per connection *)
123123+124124+(** Connection flow type for protocol handlers. *)
125125+type connection_flow = [Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t
126126+127127+(** Protocol configuration for typed connection pools.
128128+ @param 'state The protocol-specific state type (e.g., H2_client.t for HTTP/2) *)
129129+type 'state protocol_config = {
130130+ init_state :
131131+ sw:Eio.Switch.t ->
132132+ flow:connection_flow ->
133133+ tls_epoch:Tls.Core.epoch_data option ->
134134+ 'state;
135135+ (** Initialize protocol state when a new connection is created.
136136+ The [sw] parameter is a connection-lifetime switch that can be used
137137+ to spawn long-running fibers (e.g., HTTP/2 frame reader).
138138+ For HTTP/2, this performs the handshake and returns the H2_client.t. *)
139139+140140+ on_acquire : 'state -> unit;
141141+ (** Called when a connection is acquired from the pool.
142142+ For HTTP/2, this can start the background reader fiber if not already running. *)
143143+144144+ on_release : 'state -> unit;
145145+ (** Called when a connection is released back to the pool.
146146+ For HTTP/2, this is typically a no-op since the reader keeps running. *)
147147+148148+ is_healthy : 'state -> bool;
149149+ (** Protocol-specific health check. Return false if connection should be closed.
150150+ For HTTP/2, checks if GOAWAY has been received. *)
151151+152152+ on_close : 'state -> unit;
153153+ (** Cleanup callback when connection is destroyed.
154154+ For HTTP/2, can send GOAWAY frame. *)
155155+156156+ access_mode : 'state -> access_mode;
157157+ (** Get the access mode for this connection.
158158+ For HTTP/2, returns [Shared n] with max_concurrent from peer settings. *)
159159+}
+39
lib/connection.ml
···11+(*---------------------------------------------------------------------------
22+ Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
33+ SPDX-License-Identifier: ISC
44+ ---------------------------------------------------------------------------*)
55+66+(** Internal connection representation - not exposed in public API *)
77+88+let src =
99+ Logs.Src.create "conpool.connection"
1010+ ~doc:"Connection pool internal connection management"
1111+1212+module Log = (val Logs.src_log src : Logs.LOG)
1313+1414+type t = {
1515+ flow : [Eio.Resource.close_ty | Eio.Flow.two_way_ty] Eio.Resource.t;
1616+ tls_flow : Tls_eio.t option;
1717+ created_at : float;
1818+ mutable last_used : float;
1919+ mutable use_count : int;
2020+ endpoint : Endpoint.t;
2121+ mutex : Eio.Mutex.t;
2222+}
2323+2424+let flow t = t.flow
2525+let tls_flow t = t.tls_flow
2626+let endpoint t = t.endpoint
2727+let created_at t = t.created_at
2828+let last_used t = t.last_used
2929+let use_count t = t.use_count
3030+3131+let update_usage t ~now =
3232+ Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
3333+ t.last_used <- now;
3434+ t.use_count <- t.use_count + 1)
3535+3636+let pp ppf t =
3737+ let uses = t.use_count in
3838+ Fmt.pf ppf "Connection(endpoint=%a, created_at=%.2f, uses=%d)" Endpoint.pp
3939+ t.endpoint t.created_at uses
+665
lib/conpool.ml
···11+(*---------------------------------------------------------------------------
22+ Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
33+ SPDX-License-Identifier: ISC
44+ ---------------------------------------------------------------------------*)
55+66+(** Conpool - Protocol-aware TCP/IP connection pooling library for Eio *)
77+88+let src = Logs.Src.create "conpool" ~doc:"Connection pooling library"
99+1010+module Log = (val Logs.src_log src : Logs.LOG)
1111+1212+(* Re-export submodules *)
1313+module Endpoint = Endpoint
1414+module Config = Config
1515+module Stats = Stats
1616+module Cmd = Cmd
1717+1818+(* Track whether TLS tracing has been suppressed *)
1919+let tls_tracing_suppressed = ref false
2020+2121+(* Suppress TLS tracing debug output (hexdumps) unless explicitly enabled *)
2222+let suppress_tls_tracing () =
2323+ if not !tls_tracing_suppressed then begin
2424+ tls_tracing_suppressed := true;
2525+ match List.find_opt (fun s -> Logs.Src.name s = "tls.tracing") (Logs.Src.list ()) with
2626+ | Some tls_src ->
2727+ (match Logs.Src.level tls_src with
2828+ | Some Logs.Debug -> Logs.Src.set_level tls_src (Some Logs.Warning)
2929+ | _ -> ())
3030+ | None -> ()
3131+ end
3232+3333+(** {1 Error Types} *)
3434+3535+type error =
3636+ | Dns_resolution_failed of { hostname : string }
3737+ | Connection_failed of {
3838+ endpoint : Endpoint.t;
3939+ attempts : int;
4040+ last_error : string;
4141+ }
4242+ | Connection_timeout of { endpoint : Endpoint.t; timeout : float }
4343+ | Invalid_config of string
4444+ | Invalid_endpoint of string
4545+4646+let pp_error ppf = function
4747+ | Dns_resolution_failed { hostname } ->
4848+ Fmt.pf ppf "DNS resolution failed for hostname: %s" hostname
4949+ | Connection_failed { endpoint; attempts; last_error } ->
5050+ Fmt.pf ppf "Failed to connect to %a after %d attempts: %s" Endpoint.pp
5151+ endpoint attempts last_error
5252+ | Connection_timeout { endpoint; timeout } ->
5353+ Fmt.pf ppf "Connection timeout to %a after %.2fs" Endpoint.pp endpoint
5454+ timeout
5555+ | Invalid_config msg -> Fmt.pf ppf "Invalid configuration: %s" msg
5656+ | Invalid_endpoint msg -> Fmt.pf ppf "Invalid endpoint: %s" msg
5757+5858+type Eio.Exn.err += E of error
5959+6060+let err e = Eio.Exn.create (E e)
6161+6262+let () =
6363+ Eio.Exn.register_pp (fun f -> function
6464+ | E e ->
6565+ Fmt.string f "Conpool ";
6666+ pp_error f e;
6767+ true
6868+ | _ -> false)
6969+7070+(** {1 Connection Types} *)
7171+7272+type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty]
7373+type connection = connection_ty Eio.Resource.t
7474+7575+(** {1 Internal Types} *)
7676+7777+(** Internal connection wrapper with protocol state and tracking. *)
7878+type 'state pooled_connection = {
7979+ pc_flow : connection;
8080+ pc_tls_flow : Tls_eio.t option;
8181+ pc_state : 'state;
8282+ pc_created_at : float;
8383+ mutable pc_last_used : float;
8484+ (** Last time this connection was used (for idle timeout). *)
8585+ mutable pc_use_count : int;
8686+ (** Number of times this connection has been used. *)
8787+ pc_endpoint : Endpoint.t;
8888+ mutable pc_active_users : int;
8989+ pc_user_available : Eio.Condition.t;
9090+ mutable pc_closed : bool;
9191+ pc_connection_cancel : exn -> unit;
9292+ (** Cancels the connection-lifetime switch, stopping any protocol fibers. *)
9393+}
9494+9595+(** Statistics for an endpoint. *)
9696+type endp_stats = {
9797+ mutable active : int;
9898+ mutable idle : int;
9999+ (** Number of idle connections (active_users = 0). *)
100100+ mutable total_created : int;
101101+ mutable total_reused : int;
102102+ mutable total_closed : int;
103103+ mutable errors : int;
104104+ (** Number of connection errors encountered. *)
105105+}
106106+107107+(** Endpoint pool storing connections. *)
108108+type 'state endpoint_pool = {
109109+ connections : 'state pooled_connection list ref;
110110+ ep_mutex : Eio.Mutex.t;
111111+ stats : endp_stats;
112112+ stats_mutex : Eio.Mutex.t;
113113+}
114114+115115+(** Internal pool representation. *)
116116+type ('state, 'clock, 'net) internal = {
117117+ sw : Eio.Switch.t;
118118+ net : 'net;
119119+ clock : 'clock;
120120+ config : Config.t;
121121+ tls : Tls.Config.client option;
122122+ protocol : 'state Config.protocol_config;
123123+ endpoints : (Endpoint.t, 'state endpoint_pool) Hashtbl.t;
124124+ endpoints_mutex : Eio.Mutex.t;
125125+}
126126+127127+(** {1 Public Types} *)
128128+129129+type 'state t =
130130+ Pool : ('state, 'clock Eio.Time.clock, 'net Eio.Net.t) internal -> 'state t
131131+132132+type 'state connection_info = {
133133+ flow : connection;
134134+ tls_epoch : Tls.Core.epoch_data option;
135135+ state : 'state;
136136+}
137137+138138+(** {1 Default Protocol Handler}
139139+140140+ For simple exclusive-access protocols (HTTP/1.x, Redis, etc.),
141141+ use unit state with no special initialization. *)
142142+143143+let default_protocol : unit Config.protocol_config = {
144144+ Config.init_state = (fun ~sw:_ ~flow:_ ~tls_epoch:_ -> ());
145145+ on_acquire = (fun () -> ());
146146+ on_release = (fun () -> ());
147147+ is_healthy = (fun () -> true);
148148+ on_close = (fun () -> ());
149149+ access_mode = (fun () -> Config.Exclusive);
150150+}
151151+152152+(** {1 Helper Functions} *)
153153+154154+let get_time pool = Eio.Time.now pool.clock
155155+156156+let create_endp_stats () = {
157157+ active = 0;
158158+ idle = 0;
159159+ total_created = 0;
160160+ total_reused = 0;
161161+ total_closed = 0;
162162+ errors = 0;
163163+}
164164+165165+let snapshot_stats (stats : endp_stats) : Stats.t =
166166+ Stats.make ~active:stats.active ~idle:stats.idle
167167+ ~total_created:stats.total_created ~total_reused:stats.total_reused
168168+ ~total_closed:stats.total_closed ~errors:stats.errors
169169+170170+(** {1 Connection Creation} *)
171171+172172+let create_connection pool endpoint =
173173+ Log.debug (fun m -> m "Creating connection to %a" Endpoint.pp endpoint);
174174+175175+ (* DNS resolution *)
176176+ let addr =
177177+ try
178178+ let addrs =
179179+ Eio.Net.getaddrinfo_stream pool.net (Endpoint.host endpoint)
180180+ ~service:(string_of_int (Endpoint.port endpoint))
181181+ in
182182+ match addrs with
183183+ | addr :: _ -> addr
184184+ | [] ->
185185+ raise (err (Dns_resolution_failed { hostname = Endpoint.host endpoint }))
186186+ with Eio.Io _ as ex ->
187187+ let bt = Printexc.get_raw_backtrace () in
188188+ Eio.Exn.reraise_with_context ex bt "resolving %a" Endpoint.pp endpoint
189189+ in
190190+191191+ (* TCP connection with optional timeout *)
192192+ let socket =
193193+ try
194194+ match Config.connect_timeout pool.config with
195195+ | Some timeout ->
196196+ Eio.Time.with_timeout_exn pool.clock timeout (fun () ->
197197+ Eio.Net.connect ~sw:pool.sw pool.net addr)
198198+ | None -> Eio.Net.connect ~sw:pool.sw pool.net addr
199199+ with Eio.Io _ as ex ->
200200+ let bt = Printexc.get_raw_backtrace () in
201201+ Eio.Exn.reraise_with_context ex bt "connecting to %a" Endpoint.pp endpoint
202202+ in
203203+204204+ Log.debug (fun m -> m "TCP connection established to %a" Endpoint.pp endpoint);
205205+206206+ (* Optional TLS handshake *)
207207+ let flow, tls_flow =
208208+ match pool.tls with
209209+ | None ->
210210+ ((socket :> connection), None)
211211+ | Some tls_config ->
212212+ try
213213+ Log.debug (fun m ->
214214+ m "Initiating TLS handshake with %a" Endpoint.pp endpoint);
215215+ let host =
216216+ Domain_name.(host_exn (of_string_exn (Endpoint.host endpoint)))
217217+ in
218218+ let tls = Tls_eio.client_of_flow ~host tls_config socket in
219219+ suppress_tls_tracing ();
220220+ Log.info (fun m ->
221221+ m "TLS connection established to %a" Endpoint.pp endpoint);
222222+ ((tls :> connection), Some tls)
223223+ with Eio.Io _ as ex ->
224224+ let bt = Printexc.get_raw_backtrace () in
225225+ Eio.Exn.reraise_with_context ex bt "TLS handshake with %a" Endpoint.pp endpoint
226226+ in
227227+228228+ (* Get TLS epoch if available *)
229229+ let tls_epoch =
230230+ match tls_flow with
231231+ | Some tls_flow -> (
232232+ match Tls_eio.epoch tls_flow with
233233+ | Ok epoch -> Some epoch
234234+ | Error () -> None)
235235+ | None -> None
236236+ in
237237+238238+ (* Create connection-lifetime sub-switch via a fiber.
239239+ This switch lives for the connection's lifetime and can be used
240240+ by the protocol handler to spawn long-running fibers (e.g., HTTP/2 reader). *)
241241+ let conn_sw_ref = ref None in
242242+ let conn_cancel_ref = ref (fun (_ : exn) -> ()) in
243243+ let ready_promise, ready_resolver = Eio.Promise.create () in
244244+245245+ Eio.Fiber.fork ~sw:pool.sw (fun () ->
246246+ Eio.Switch.run (fun conn_sw ->
247247+ conn_sw_ref := Some conn_sw;
248248+ conn_cancel_ref := (fun exn -> Eio.Switch.fail conn_sw exn);
249249+ (* Signal that the switch is ready *)
250250+ Eio.Promise.resolve ready_resolver ();
251251+ (* Block until the switch is cancelled *)
252252+ let wait_forever, _never_resolved = Eio.Promise.create () in
253253+ Eio.Promise.await wait_forever
254254+ )
255255+ );
256256+257257+ (* Wait for the switch to be created *)
258258+ Eio.Promise.await ready_promise;
259259+ let conn_sw = Option.get !conn_sw_ref in
260260+ let conn_cancel = !conn_cancel_ref in
261261+262262+ (* Initialize protocol-specific state with connection switch *)
263263+ Log.debug (fun m -> m "Initializing protocol state for %a" Endpoint.pp endpoint);
264264+ let state = pool.protocol.init_state ~sw:conn_sw ~flow ~tls_epoch in
265265+266266+ let now = get_time pool in
267267+268268+ Log.info (fun m -> m "Created connection to %a" Endpoint.pp endpoint);
269269+270270+ {
271271+ pc_flow = flow;
272272+ pc_tls_flow = tls_flow;
273273+ pc_state = state;
274274+ pc_created_at = now;
275275+ pc_last_used = now;
276276+ pc_use_count = 0;
277277+ pc_endpoint = endpoint;
278278+ pc_active_users = 0;
279279+ pc_user_available = Eio.Condition.create ();
280280+ pc_closed = false;
281281+ pc_connection_cancel = conn_cancel;
282282+ }
283283+284284+(** {1 Connection Health Checking} *)
285285+286286+(** Health check result distinguishing errors from normal lifecycle. *)
287287+type health_status =
288288+ | Healthy
289289+ | Unhealthy_error of string
290290+ (** Connection failed due to an error (protocol failure, etc.) *)
291291+ | Unhealthy_lifecycle of string
292292+ (** Connection should close due to normal lifecycle (timeout, max uses, etc.) *)
293293+294294+let check_health pool conn =
295295+ if conn.pc_closed then
296296+ Unhealthy_lifecycle "already closed"
297297+ else
298298+ (* Check protocol-specific health *)
299299+ let protocol_healthy = pool.protocol.is_healthy conn.pc_state in
300300+ if not protocol_healthy then begin
301301+ Log.debug (fun m -> m "Connection unhealthy: protocol check failed");
302302+ Unhealthy_error "protocol check failed"
303303+ end else
304304+ let now = get_time pool in
305305+ (* Check connection age *)
306306+ let age = now -. conn.pc_created_at in
307307+ let max_lifetime = Config.max_connection_lifetime pool.config in
308308+ if age > max_lifetime then begin
309309+ Log.debug (fun m -> m "Connection unhealthy: exceeded max lifetime (%.1fs > %.1fs)"
310310+ age max_lifetime);
311311+ Unhealthy_lifecycle "exceeded max lifetime"
312312+ end else
313313+ (* Check idle time - only for idle connections *)
314314+ let idle_time = now -. conn.pc_last_used in
315315+ let max_idle = Config.max_idle_time pool.config in
316316+ if conn.pc_active_users = 0 && idle_time > max_idle then begin
317317+ Log.debug (fun m -> m "Connection unhealthy: exceeded max idle time (%.1fs > %.1fs)"
318318+ idle_time max_idle);
319319+ Unhealthy_lifecycle "exceeded max idle time"
320320+ end else
321321+ (* Check use count *)
322322+ match Config.max_connection_uses pool.config with
323323+ | Some max_uses when conn.pc_use_count >= max_uses ->
324324+ Log.debug (fun m -> m "Connection unhealthy: exceeded max uses (%d >= %d)"
325325+ conn.pc_use_count max_uses);
326326+ Unhealthy_lifecycle "exceeded max uses"
327327+ | _ ->
328328+ Healthy
329329+330330+let is_healthy pool conn =
331331+ match check_health pool conn with
332332+ | Healthy -> true
333333+ | Unhealthy_error _ | Unhealthy_lifecycle _ -> false
334334+335335+(** {1 Connection Cleanup} *)
336336+337337+let close_connection pool conn =
338338+ if not conn.pc_closed then begin
339339+ conn.pc_closed <- true;
340340+ Log.debug (fun m ->
341341+ m "Closing connection to %a" Endpoint.pp conn.pc_endpoint);
342342+343343+ (* Cancel connection-lifetime switch first - this stops any protocol fibers *)
344344+ (try conn.pc_connection_cancel (Failure "Connection closed")
345345+ with _ -> ());
346346+347347+ (* Call protocol cleanup *)
348348+ pool.protocol.on_close conn.pc_state;
349349+350350+ (* Close the underlying flow *)
351351+ Eio.Cancel.protect (fun () ->
352352+ try Eio.Flow.close conn.pc_flow with _ -> ())
353353+ end
354354+355355+(** {1 Endpoint Pool Management} *)
356356+357357+let get_or_create_endpoint_pool pool endpoint =
358358+ match
359359+ Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
360360+ Hashtbl.find_opt pool.endpoints endpoint)
361361+ with
362362+ | Some ep_pool -> ep_pool
363363+ | None ->
364364+ Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
365365+ match Hashtbl.find_opt pool.endpoints endpoint with
366366+ | Some ep_pool -> ep_pool
367367+ | None ->
368368+ Log.info (fun m ->
369369+ m "Creating endpoint pool for %a" Endpoint.pp endpoint);
370370+ let ep_pool = {
371371+ connections = ref [];
372372+ ep_mutex = Eio.Mutex.create ();
373373+ stats = create_endp_stats ();
374374+ stats_mutex = Eio.Mutex.create ();
375375+ } in
376376+ Hashtbl.add pool.endpoints endpoint ep_pool;
377377+ ep_pool)
378378+379379+(** {1 Connection Acquisition} *)
380380+381381+let rec acquire_connection pool ep_pool endpoint =
382382+ Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () ->
383383+ (* Find an existing healthy connection with available capacity *)
384384+ let rec find_available = function
385385+ | [] -> None
386386+ | conn :: rest ->
387387+ if not (is_healthy pool conn) then begin
388388+ conn.pc_closed <- true;
389389+ find_available rest
390390+ end else begin
391391+ match pool.protocol.access_mode conn.pc_state with
392392+ | Config.Exclusive ->
393393+ if conn.pc_active_users = 0 then
394394+ Some conn
395395+ else
396396+ find_available rest
397397+ | Config.Shared max_concurrent ->
398398+ if conn.pc_active_users < max_concurrent then
399399+ Some conn
400400+ else
401401+ find_available rest
402402+ end
403403+ in
404404+405405+ (* Clean up closed connections *)
406406+ ep_pool.connections := List.filter (fun c -> not c.pc_closed) !(ep_pool.connections);
407407+408408+ match find_available !(ep_pool.connections) with
409409+ | Some conn ->
410410+ (* Reuse existing connection *)
411411+ let was_idle = conn.pc_active_users = 0 in
412412+ conn.pc_active_users <- conn.pc_active_users + 1;
413413+ conn.pc_last_used <- get_time pool;
414414+ conn.pc_use_count <- conn.pc_use_count + 1;
415415+416416+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
417417+ ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1;
418418+ ep_pool.stats.active <- ep_pool.stats.active + 1;
419419+ (* Decrement idle count when connection becomes active *)
420420+ if was_idle then
421421+ ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
422422+423423+ Log.debug (fun m ->
424424+ m "Reusing connection to %a (users=%d)"
425425+ Endpoint.pp endpoint conn.pc_active_users);
426426+427427+ (* Notify protocol handler of acquisition *)
428428+ pool.protocol.on_acquire conn.pc_state;
429429+ conn
430430+431431+ | None ->
432432+ (* Need to create a new connection *)
433433+ let max_conns = Config.max_connections_per_endpoint pool.config in
434434+ let current_conns = List.length !(ep_pool.connections) in
435435+436436+ if current_conns >= max_conns then begin
437437+ (* Wait for a connection to become available *)
438438+ Log.debug (fun m ->
439439+ m "At connection limit for %a (%d), waiting..."
440440+ Endpoint.pp endpoint max_conns);
441441+442442+ (* Find a connection to wait on (prefer shared mode) *)
443443+ let wait_conn = List.find_opt (fun c ->
444444+ match pool.protocol.access_mode c.pc_state with
445445+ | Config.Shared _ -> true
446446+ | Config.Exclusive -> false
447447+ ) !(ep_pool.connections) in
448448+449449+ match wait_conn with
450450+ | Some conn ->
451451+ (* Wait for user slot *)
452452+ while conn.pc_active_users >=
453453+ (match pool.protocol.access_mode conn.pc_state with
454454+ | Config.Shared n -> n
455455+ | Config.Exclusive -> 1)
456456+ && not conn.pc_closed do
457457+ Eio.Condition.await_no_mutex conn.pc_user_available
458458+ done;
459459+ if conn.pc_closed then
460460+ acquire_connection pool ep_pool endpoint
461461+ else begin
462462+ conn.pc_active_users <- conn.pc_active_users + 1;
463463+ conn.pc_last_used <- get_time pool;
464464+ conn.pc_use_count <- conn.pc_use_count + 1;
465465+466466+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
467467+ ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1;
468468+ ep_pool.stats.active <- ep_pool.stats.active + 1);
469469+470470+ (* Notify protocol handler of acquisition *)
471471+ pool.protocol.on_acquire conn.pc_state;
472472+ conn
473473+ end
474474+ | None ->
475475+ (* All connections are exclusive and in use - wait for any *)
476476+ let any_conn = List.hd !(ep_pool.connections) in
477477+ while any_conn.pc_active_users > 0 && not any_conn.pc_closed do
478478+ Eio.Condition.await_no_mutex any_conn.pc_user_available
479479+ done;
480480+ if any_conn.pc_closed then
481481+ acquire_connection pool ep_pool endpoint
482482+ else begin
483483+ (* Connection was idle (active_users = 0), now becoming active *)
484484+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
485485+ ep_pool.stats.total_reused <- ep_pool.stats.total_reused + 1;
486486+ ep_pool.stats.active <- ep_pool.stats.active + 1;
487487+ ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
488488+ any_conn.pc_active_users <- 1;
489489+ any_conn.pc_last_used <- get_time pool;
490490+ any_conn.pc_use_count <- any_conn.pc_use_count + 1;
491491+ (* Notify protocol handler of acquisition *)
492492+ pool.protocol.on_acquire any_conn.pc_state;
493493+ any_conn
494494+ end
495495+ end else begin
496496+ (* Create new connection *)
497497+ let conn = create_connection pool endpoint in
498498+ conn.pc_active_users <- 1;
499499+ ep_pool.connections := conn :: !(ep_pool.connections);
500500+501501+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
502502+ ep_pool.stats.total_created <- ep_pool.stats.total_created + 1;
503503+ ep_pool.stats.active <- ep_pool.stats.active + 1);
504504+505505+ Log.info (fun m ->
506506+ m "Created new connection to %a (total=%d)"
507507+ Endpoint.pp endpoint (List.length !(ep_pool.connections)));
508508+509509+ (* Notify protocol handler of acquisition *)
510510+ pool.protocol.on_acquire conn.pc_state;
511511+ conn
512512+ end)
513513+514514+(** {1 Connection Release} *)
515515+516516+let release_connection pool ep_pool conn =
517517+ (* Notify protocol handler of release *)
518518+ pool.protocol.on_release conn.pc_state;
519519+520520+ Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () ->
521521+ let was_active = conn.pc_active_users > 0 in
522522+ conn.pc_active_users <- max 0 (conn.pc_active_users - 1);
523523+ let now_idle = conn.pc_active_users = 0 in
524524+525525+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
526526+ ep_pool.stats.active <- max 0 (ep_pool.stats.active - 1);
527527+ (* Track idle count: increment when connection becomes idle *)
528528+ if was_active && now_idle then
529529+ ep_pool.stats.idle <- ep_pool.stats.idle + 1);
530530+531531+ (* Signal waiting fibers *)
532532+ Eio.Condition.broadcast conn.pc_user_available;
533533+534534+ Log.debug (fun m ->
535535+ m "Released connection to %a (users=%d)"
536536+ Endpoint.pp conn.pc_endpoint conn.pc_active_users);
537537+538538+ (* Check if connection should be closed *)
539539+ match check_health pool conn with
540540+ | Healthy -> ()
541541+ | Unhealthy_error reason ->
542542+ conn.pc_closed <- true;
543543+544544+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
545545+ ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1;
546546+ ep_pool.stats.errors <- ep_pool.stats.errors + 1;
547547+ if now_idle then
548548+ ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
549549+550550+ Log.warn (fun m -> m "Closing connection due to error: %s" reason);
551551+ close_connection pool conn;
552552+ ep_pool.connections := List.filter (fun c -> c != conn) !(ep_pool.connections)
553553+554554+ | Unhealthy_lifecycle reason ->
555555+ conn.pc_closed <- true;
556556+557557+ Eio.Mutex.use_rw ~protect:true ep_pool.stats_mutex (fun () ->
558558+ ep_pool.stats.total_closed <- ep_pool.stats.total_closed + 1;
559559+ if now_idle then
560560+ ep_pool.stats.idle <- max 0 (ep_pool.stats.idle - 1));
561561+562562+ Log.debug (fun m -> m "Closing connection due to lifecycle: %s" reason);
563563+ close_connection pool conn;
564564+ ep_pool.connections := List.filter (fun c -> c != conn) !(ep_pool.connections))
565565+566566+(** {1 Public API} *)
567567+568568+let create ~sw ~(net : 'net Eio.Net.t) ~(clock : 'clock Eio.Time.clock)
569569+ ?tls ?(config = Config.default) ?protocol () =
570570+ let protocol = match protocol with
571571+ | Some p -> p
572572+ | None -> Obj.magic default_protocol (* Safe: unit is compatible with any 'state *)
573573+ in
574574+575575+ Log.info (fun m ->
576576+ m "Creating connection pool (max_per_endpoint=%d)"
577577+ (Config.max_connections_per_endpoint config));
578578+579579+ let pool = {
580580+ sw;
581581+ net;
582582+ clock;
583583+ config;
584584+ tls;
585585+ protocol;
586586+ endpoints = Hashtbl.create 16;
587587+ endpoints_mutex = Eio.Mutex.create ();
588588+ } in
589589+590590+ (* Auto-cleanup on switch release *)
591591+ Eio.Switch.on_release sw (fun () ->
592592+ Eio.Cancel.protect (fun () ->
593593+ Log.info (fun m -> m "Closing connection pool");
594594+ Hashtbl.iter (fun _endpoint ep_pool ->
595595+ List.iter (fun conn ->
596596+ close_connection pool conn
597597+ ) !(ep_pool.connections)
598598+ ) pool.endpoints;
599599+ Hashtbl.clear pool.endpoints));
600600+601601+ Pool pool
602602+603603+let connection ~sw (Pool pool) endpoint =
604604+ Log.debug (fun m -> m "Acquiring connection to %a" Endpoint.pp endpoint);
605605+606606+ let ep_pool = get_or_create_endpoint_pool pool endpoint in
607607+ let conn = acquire_connection pool ep_pool endpoint in
608608+609609+ (* Release connection when switch ends *)
610610+ Eio.Switch.on_release sw (fun () ->
611611+ release_connection pool ep_pool conn);
612612+613613+ (* Get TLS epoch if available *)
614614+ let tls_epoch =
615615+ match conn.pc_tls_flow with
616616+ | Some tls_flow -> (
617617+ match Tls_eio.epoch tls_flow with
618618+ | Ok epoch -> Some epoch
619619+ | Error () -> None)
620620+ | None -> None
621621+ in
622622+623623+ {
624624+ flow = conn.pc_flow;
625625+ tls_epoch;
626626+ state = conn.pc_state;
627627+ }
628628+629629+let with_connection pool endpoint f =
630630+ Eio.Switch.run (fun sw -> f (connection ~sw pool endpoint))
631631+632632+let stats (Pool pool) endpoint =
633633+ match Hashtbl.find_opt pool.endpoints endpoint with
634634+ | Some ep_pool ->
635635+ Eio.Mutex.use_ro ep_pool.stats_mutex (fun () -> snapshot_stats ep_pool.stats)
636636+ | None ->
637637+ Stats.make ~active:0 ~idle:0 ~total_created:0 ~total_reused:0
638638+ ~total_closed:0 ~errors:0
639639+640640+let all_stats (Pool pool) =
641641+ Eio.Mutex.use_ro pool.endpoints_mutex (fun () ->
642642+ Hashtbl.fold
643643+ (fun endpoint ep_pool acc ->
644644+ let stats =
645645+ Eio.Mutex.use_ro ep_pool.stats_mutex (fun () ->
646646+ snapshot_stats ep_pool.stats)
647647+ in
648648+ (endpoint, stats) :: acc)
649649+ pool.endpoints [])
650650+651651+let clear_endpoint (Pool pool) endpoint =
652652+ Log.info (fun m -> m "Clearing endpoint %a from pool" Endpoint.pp endpoint);
653653+ match Hashtbl.find_opt pool.endpoints endpoint with
654654+ | Some ep_pool ->
655655+ Eio.Cancel.protect (fun () ->
656656+ Eio.Mutex.use_rw ~protect:true ep_pool.ep_mutex (fun () ->
657657+ List.iter (fun conn ->
658658+ close_connection pool conn
659659+ ) !(ep_pool.connections);
660660+ ep_pool.connections := []);
661661+ Eio.Mutex.use_rw ~protect:true pool.endpoints_mutex (fun () ->
662662+ Hashtbl.remove pool.endpoints endpoint))
663663+ | None ->
664664+ Log.debug (fun m ->
665665+ m "No endpoint pool found for %a" Endpoint.pp endpoint)
+178
lib/conpool.mli
···11+(*---------------------------------------------------------------------------
22+ Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
33+ SPDX-License-Identifier: ISC
44+ ---------------------------------------------------------------------------*)
55+66+(** Conpool - Protocol-aware TCP/IP connection pooling library for Eio
77+88+ Conpool provides efficient connection pooling with support for both
99+ exclusive (HTTP/1.x) and shared (HTTP/2) access modes. All connections
1010+ carry protocol-specific state managed through callbacks.
1111+1212+ {2 Quick Start}
1313+1414+ For simple exclusive-access protocols (HTTP/1.x, Redis, etc.):
1515+ {[
1616+ let pool = Conpool.create ~sw ~net ~clock ~tls () in
1717+ Eio.Switch.run (fun conn_sw ->
1818+ let conn = Conpool.connection ~sw:conn_sw pool endpoint in
1919+ (* Use conn.flow for I/O *)
2020+ Eio.Flow.copy_string "GET / HTTP/1.1\r\n\r\n" conn.flow)
2121+ ]}
2222+2323+ For multiplexed protocols (HTTP/2):
2424+ {[
2525+ let pool = Conpool.create ~sw ~net ~clock ~tls ~protocol:h2_handler () in
2626+ Eio.Switch.run (fun conn_sw ->
2727+ let conn = Conpool.connection ~sw:conn_sw pool endpoint in
2828+ (* conn.state has H2_client.t, multiple streams share the connection *)
2929+ H2_client.request conn.flow conn.state ...)
3030+ ]} *)
3131+3232+(** {1 Logging} *)
3333+3434+val src : Logs.Src.t
3535+(** Logs source for the connection pool. Configure logging with:
3636+ {[
3737+ Logs.Src.set_level Conpool.src (Some Logs.Debug);
3838+ Logs.set_reporter (Logs_fmt.reporter ())
3939+ ]} *)
4040+4141+(** {1 Core Types} *)
4242+4343+module Endpoint = Endpoint
4444+(** Network endpoint representation *)
4545+4646+module Config = Config
4747+(** Configuration for connection pools *)
4848+4949+module Stats = Stats
5050+(** Statistics for connection pool endpoints *)
5151+5252+module Cmd = Cmd
5353+(** Cmdliner terms for connection pool configuration *)
5454+5555+(** {1 Errors} *)
5656+5757+type error =
5858+ | Dns_resolution_failed of { hostname : string }
5959+ | Connection_failed of {
6060+ endpoint : Endpoint.t;
6161+ attempts : int;
6262+ last_error : string;
6363+ }
6464+ | Connection_timeout of { endpoint : Endpoint.t; timeout : float }
6565+ | Invalid_config of string
6666+ | Invalid_endpoint of string
6767+6868+type Eio.Exn.err += E of error
6969+7070+val err : error -> exn
7171+(** [err e] creates an Eio exception from a connection pool error. *)
7272+7373+val pp_error : error Fmt.t
7474+(** Pretty-printer for error values. *)
7575+7676+(** {1 Connection Types} *)
7777+7878+type connection_ty = [Eio.Resource.close_ty | Eio.Flow.two_way_ty]
7979+(** Type tags for a pooled connection. *)
8080+8181+type connection = connection_ty Eio.Resource.t
8282+(** A connection resource from the pool. *)
8383+8484+(** {1 Connection Pool}
8585+8686+ All pools are typed - they carry protocol-specific state with each
8787+ connection. For simple exclusive-access protocols, use the default
8888+ [unit] state which requires no protocol handler. *)
8989+9090+type 'state t
9191+(** Connection pool with protocol-specific state ['state].
9292+9393+ - For HTTP/1.x: use [unit t] with exclusive access (one request per connection)
9494+ - For HTTP/2: use [h2_state t] with shared access (multiple streams per connection) *)
9595+9696+(** Connection with protocol-specific state. *)
9797+type 'state connection_info = {
9898+ flow : connection;
9999+ (** The underlying connection flow for I/O. *)
100100+ tls_epoch : Tls.Core.epoch_data option;
101101+ (** TLS epoch data if connection uses TLS. *)
102102+ state : 'state;
103103+ (** Protocol-specific state (e.g., H2_client.t for HTTP/2). *)
104104+}
105105+106106+(** {2 Pool Creation} *)
107107+108108+val create :
109109+ sw:Eio.Switch.t ->
110110+ net:'net Eio.Net.t ->
111111+ clock:'clock Eio.Time.clock ->
112112+ ?tls:Tls.Config.client ->
113113+ ?config:Config.t ->
114114+ ?protocol:'state Config.protocol_config ->
115115+ unit ->
116116+ 'state t
117117+(** Create a connection pool.
118118+119119+ @param sw Switch for resource management
120120+ @param net Network interface for creating connections
121121+ @param clock Clock for timeouts
122122+ @param tls Optional TLS client configuration
123123+ @param config Pool configuration (uses {!Config.default} if not provided)
124124+ @param protocol Protocol handler for state management. If not provided,
125125+ creates a [unit t] pool with exclusive access mode (one user per connection).
126126+127127+ Examples:
128128+129129+ Simple pool for HTTP/1.x (exclusive access, no state):
130130+ {[
131131+ let pool = Conpool.create ~sw ~net ~clock ~tls ()
132132+ ]}
133133+134134+ HTTP/2 pool (shared access with H2 state):
135135+ {[
136136+ let pool = Conpool.create ~sw ~net ~clock ~tls ~protocol:h2_handler ()
137137+ ]} *)
138138+139139+(** {2 Connection Acquisition} *)
140140+141141+val connection : sw:Eio.Switch.t -> 'state t -> Endpoint.t -> 'state connection_info
142142+(** [connection ~sw pool endpoint] acquires a connection from the pool.
143143+144144+ The connection is automatically released when [sw] finishes:
145145+ - Exclusive mode: connection returns to idle pool
146146+ - Shared mode: user count is decremented
147147+148148+ Behavior depends on access mode:
149149+ - Exclusive: blocks until a connection is available
150150+ - Shared: may share an existing connection if under max_concurrent limit
151151+152152+ Example:
153153+ {[
154154+ Eio.Switch.run (fun sw ->
155155+ let conn = Conpool.connection ~sw pool endpoint in
156156+ (* For HTTP/1.x: conn.state is () *)
157157+ (* For HTTP/2: conn.state is H2_client.t *)
158158+ Eio.Flow.copy_string data conn.flow)
159159+ ]} *)
160160+161161+val with_connection : 'state t -> Endpoint.t -> ('state connection_info -> 'a) -> 'a
162162+(** [with_connection pool endpoint fn] is a convenience wrapper.
163163+164164+ Equivalent to:
165165+ {[
166166+ Eio.Switch.run (fun sw -> fn (connection ~sw pool endpoint))
167167+ ]} *)
168168+169169+(** {1 Statistics & Management} *)
170170+171171+val stats : 'state t -> Endpoint.t -> Stats.t
172172+(** Get statistics for specific endpoint. *)
173173+174174+val all_stats : 'state t -> (Endpoint.t * Stats.t) list
175175+(** Get statistics for all endpoints in pool. *)
176176+177177+val clear_endpoint : 'state t -> Endpoint.t -> unit
178178+(** Clear all connections for an endpoint. *)
···11+(*---------------------------------------------------------------------------
22+ Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
33+ SPDX-License-Identifier: ISC
44+ ---------------------------------------------------------------------------*)
55+66+(** Network endpoint representation *)
77+88+let src =
99+ Logs.Src.create "conpool.endpoint" ~doc:"Connection pool endpoint operations"
1010+1111+module Log = (val Logs.src_log src : Logs.LOG)
1212+1313+type t = { host : string; port : int }
1414+1515+let make ~host ~port =
1616+ (* Validate port range *)
1717+ if port < 1 || port > 65535 then
1818+ invalid_arg
1919+ (Printf.sprintf "Invalid port number: %d (must be 1-65535)" port);
2020+2121+ (* Validate hostname is not empty *)
2222+ if String.trim host = "" then invalid_arg "Hostname cannot be empty";
2323+2424+ { host; port }
2525+2626+let host t = t.host
2727+let port t = t.port
2828+let equal t1 t2 = String.equal t1.host t2.host && t1.port = t2.port
2929+let hash t = Hashtbl.hash (t.host, t.port)
3030+let pp = Fmt.of_to_string (fun t -> Printf.sprintf "%s:%d" t.host t.port)
+45
lib/endpoint.mli
···11+(*---------------------------------------------------------------------------
22+ Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
33+ SPDX-License-Identifier: ISC
44+ ---------------------------------------------------------------------------*)
55+66+(** Network endpoint representation *)
77+88+(** {1 Logging} *)
99+1010+val src : Logs.Src.t
1111+(** Logs source for endpoint operations. Configure logging with:
1212+ {[
1313+ Logs.Src.set_level Conpool.Endpoint.src (Some Logs.Debug)
1414+ ]} *)
1515+1616+(** {1 Type} *)
1717+1818+type t
1919+(** Network endpoint identified by host and port *)
2020+2121+(** {1 Construction} *)
2222+2323+val make : host:string -> port:int -> t
2424+(** Create an endpoint from a hostname and port. *)
2525+2626+(** {1 Accessors} *)
2727+2828+val host : t -> string
2929+(** Get the hostname from an endpoint. *)
3030+3131+val port : t -> int
3232+(** Get the port number from an endpoint. *)
3333+3434+(** {1 Comparison and Hashing} *)
3535+3636+val equal : t -> t -> bool
3737+(** Compare two endpoints for equality. *)
3838+3939+val hash : t -> int
4040+(** Hash an endpoint for use in hash tables. *)
4141+4242+(** {1 Pretty-printing} *)
4343+4444+val pp : t Fmt.t
4545+(** Pretty-printer for endpoints. Formats as "host:port". *)
+36
lib/stats.ml
···11+(*---------------------------------------------------------------------------
22+ Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
33+ SPDX-License-Identifier: ISC
44+ ---------------------------------------------------------------------------*)
55+66+(** Statistics for connection pool endpoints *)
77+88+type t = {
99+ active : int;
1010+ idle : int;
1111+ total_created : int;
1212+ total_reused : int;
1313+ total_closed : int;
1414+ errors : int;
1515+}
1616+1717+let make ~active ~idle ~total_created ~total_reused ~total_closed ~errors =
1818+ { active; idle; total_created; total_reused; total_closed; errors }
1919+2020+let active t = t.active
2121+let idle t = t.idle
2222+let total_created t = t.total_created
2323+let total_reused t = t.total_reused
2424+let total_closed t = t.total_closed
2525+let errors t = t.errors
2626+2727+let pp ppf t =
2828+ Fmt.pf ppf
2929+ "@[<v>Stats:@,\
3030+ - Active: %d@,\
3131+ - Idle: %d@,\
3232+ - Created: %d@,\
3333+ - Reused: %d@,\
3434+ - Closed: %d@,\
3535+ - Errors: %d@]"
3636+ t.active t.idle t.total_created t.total_reused t.total_closed t.errors
+48
lib/stats.mli
···11+(*---------------------------------------------------------------------------
22+ Copyright (c) 2025 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
33+ SPDX-License-Identifier: ISC
44+ ---------------------------------------------------------------------------*)
55+66+(** Statistics for connection pool endpoints *)
77+88+(** {1 Type} *)
99+1010+type t
1111+(** Statistics snapshot for a specific endpoint *)
1212+1313+(** {1 Construction} *)
1414+1515+val make :
1616+ active:int ->
1717+ idle:int ->
1818+ total_created:int ->
1919+ total_reused:int ->
2020+ total_closed:int ->
2121+ errors:int ->
2222+ t
2323+(** Create a statistics snapshot. *)
2424+2525+(** {1 Accessors} *)
2626+2727+val active : t -> int
2828+(** Number of connections currently in use. *)
2929+3030+val idle : t -> int
3131+(** Number of connections in pool waiting to be reused. *)
3232+3333+val total_created : t -> int
3434+(** Total connections created over the endpoint's lifetime. *)
3535+3636+val total_reused : t -> int
3737+(** Total number of times connections were reused from the pool. *)
3838+3939+val total_closed : t -> int
4040+(** Total connections that have been closed. *)
4141+4242+val errors : t -> int
4343+(** Total connection errors encountered. *)
4444+4545+(** {1 Pretty-printing} *)
4646+4747+val pp : t Fmt.t
4848+(** Pretty-printer for statistics. *)