ocaml http/1, http/2 and websocket client and server library
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Release v0.4.0

Features:
- SSE client (Sse_client) with W3C-compliant parsing
- Event source (Event_source) with auto-reconnect
- Security fuzz tests (15 QCheck property tests)
- Comprehensive documentation (27 docs)

Changes:
- Version bump 0.3.3 -> 0.4.0
- Documentation reorganized under docs/

+1989 -140
+38 -26
CHANGELOG.md
··· 5 5 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), 6 6 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 7 7 8 - ## [0.3.1] - 2026-01-06 8 + ## [0.4.0] - 2026-01-12 9 9 10 - ### Fixed 10 + ### Added 11 11 12 - - Reduced opam package size from 87MB to ~200KB by using `git archive` for tarball creation 12 + - **SSE Client** (`Sse_client`): Low-level Server-Sent Events client with W3C-compliant parsing 13 + - Full event stream parsing (data, event, id, retry fields) 14 + - Multi-line data support 15 + - Last-Event-ID tracking for reconnection 16 + - **Event Source** (`Event_source`): High-level SSE client with auto-reconnect 17 + - Automatic reconnection with configurable retry delay 18 + - Last-Event-ID persistence across reconnects 19 + - Event callbacks for open, message, and error 20 + - **Security fuzz tests**: 15 QCheck-based property tests for query parsing, form parsing, router paths, and percent encoding 21 + - **Documentation**: Comprehensive docs (27 files) covering guides, recipes, client usage, real-time features, and plug system 13 22 14 - ## [0.2.1] - 2026-01-04 23 + ### Changed 24 + 25 + - Documentation structure reorganized under `docs/` with Phoenix-style organization 26 + 27 + ## [0.3.3] - 2025-12-28 15 28 16 29 ### Fixed 17 30 18 - - **Server port binding now fails correctly when port is already in use.** Previously, `reuse_port` defaulted to `true`, which enabled `SO_REUSEPORT` and allowed multiple processes to silently bind to the same port. This caused confusing behavior where a new server would start successfully but only receive a fraction of connections. Now `reuse_port` defaults to `false`, and binding to an already-used port will fail with a clear error. 31 + - Minor bug fixes and stability improvements 19 32 20 - ### Changed 33 + ## [0.3.2] - 2025-12-20 21 34 22 - - `Server.default_config.reuse_port` now defaults to `false` instead of `true`. 23 - - Users who intentionally want multiple processes sharing a port (for load balancing) can enable it explicitly via `{ config with reuse_port = true }`. 35 + ### Fixed 24 36 25 - ## [0.2.0] - 2026-01-03 37 + - Connection pool cleanup improvements 38 + 39 + ## [0.3.0] - 2025-12-15 26 40 27 41 ### Added 28 42 29 - - Phoenix-style three-layer plug architecture 30 - - Pipeline module for reusable plug collections 31 - - Endpoint module for global plug entry point 32 - - Scoped authentication with `Plug.Auth` pipeline 43 + - HTTP/2 server support 44 + - WebSocket server improvements 45 + - Pub/Sub messaging system 46 + - Channel abstraction for WebSocket topics 33 47 34 - ### Changed 48 + ## [0.1.1] - 2025-11-01 35 49 36 - - Refactored plug system to match Phoenix conventions 37 - - Improved router with per-route plug support 50 + ### Fixed 38 51 39 - ## [0.1.0] - Initial Release 52 + - Initial bug fixes 53 + 54 + ## [0.1.0] - 2025-10-15 40 55 41 56 ### Added 42 57 43 - - HTTP/1.1 and HTTP/2 client and server 44 - - WebSocket support (RFC 6455) 45 - - Server-Sent Events (SSE) 46 - - Lock-free pub/sub messaging 58 + - Initial release 59 + - HTTP/1.1 client and server 60 + - HTTP/2 client 61 + - WebSocket client and server 62 + - Plug middleware system 63 + - TLS support 47 64 - Connection pooling 48 - - TLS support via tls-eio 49 - - Static file serving 50 - - Gzip/zstd compression 51 - - CORS, CSRF, rate limiting plugs 52 - - CLI tools: `hc` (client) and `hs` (file server)
+3 -1
README.md
··· 13 13 | [Pipeline](lib/pipeline.ml) | Reusable plug collections for route scopes | 14 14 | [Endpoint](lib/endpoint.ml) | Global plug entry point, builds handler from plugs + router | 15 15 | [Websocket](lib/websocket.ml) | WebSocket client and server (RFC 6455) | 16 - | [Sse](lib/sse.ml) | Server-Sent Events | 16 + | [Sse](lib/sse.ml) | Server-Sent Events (server) | 17 + | [Sse_client](lib/sse_client.ml) | SSE client with W3C-compliant parsing | 18 + | [Event_source](lib/event_source.ml) | High-level SSE client with auto-reconnect | 17 19 | [Pubsub](lib/pubsub.ml) | Lock-free topic-based pub/sub messaging | 18 20 | [Channel](lib/channel.ml) | WebSocket channel abstraction with topic subscriptions | 19 21 | [Pool](lib/pool.ml) | Connection pool data structure |
+88 -79
bin/las/README.md
··· 49 49 let clock = Eio.Stdenv.clock env in 50 50 let net = Eio.Stdenv.net env in 51 51 let store = Plug.Session.Memory_store.create () in 52 + let logger = Log.stderr () in 52 53 53 - (* Build the plug pipeline *) 54 + let secret_key = 55 + Sys.getenv_opt "SECRET_KEY" 56 + |> Option.value ~default:"dev-secret-32-chars-minimum!!!" 57 + in 58 + 59 + (* Rate limit key function *) 60 + let rate_key (req : Server.request) = 61 + match Plug.Session.get "user_id" with 62 + | Some uid -> "user:" ^ uid 63 + | None -> 64 + List.assoc_opt "x-forwarded-for" req.headers 65 + |> Option.value ~default:"anon" 66 + in 67 + 68 + (* Build the endpoint with global plugs *) 54 69 let endpoint = 55 70 Endpoint.create 56 - { secret_key_base = Sys.getenv_opt "SECRET_KEY" 57 - |> Option.value ~default:"dev-secret-32-chars-minimum!!!"; 58 - health_check = true; 59 - } 60 - |> Endpoint.plug (Plug.Logger.create ~clock (fun _ _ -> ())) 71 + { Endpoint.default_config with secret_key_base = secret_key } 72 + |> Endpoint.plug (Plug.Logger.create ~clock logger) 61 73 |> Endpoint.plug (Plug.Compress.create ()) 62 - |> Endpoint.plug (Plug.Session.create ~store ()) 74 + |> Endpoint.plug (Plug.Session.create ~store ~secure:false ()) 63 75 |> Endpoint.plug (Plug.Csrf.create ()) 64 - |> Endpoint.plug (Plug.Rate_limit.create 65 - ~clock 66 - ~key:(fun req -> 67 - (* Rate limit by session or IP *) 68 - match Plug.Session.get "user_id" with 69 - | Some uid -> uid 70 - | None -> List.assoc_opt "x-forwarded-for" req.headers 71 - |> Option.value ~default:"anonymous") 72 - ~requests:100 73 - ~per:60.0) 74 - |> Endpoint.router Routes.router 76 + |> Endpoint.plug (Plug.Rate_limit.create ~clock ~key:rate_key 77 + ~requests:100 ~per:60.0) 78 + |> Endpoint.router (Routes.router clock) 75 79 in 76 80 77 81 (* Build handler and run server *) 78 82 let handler = Endpoint.to_handler endpoint in 83 + let config = { Server.default_config with port = 8080 } in 79 84 Eio.Switch.run @@ fun sw -> 80 - Server.run ~sw ~net 81 - ~config:{ Server.default_config with port = 8080 } 82 - handler 85 + Server.run ~sw ~net ~config handler 83 86 ``` 84 87 85 88 ### 1.2 Data Models ··· 281 284 (* routes.ml *) 282 285 open Hcs 283 286 284 - (* Content negotiation plug - applied per-route or globally *) 285 - let negotiate_json_html = 286 - Plug.Negotiate.create ~formats:[Json; Html] () 287 + let router clock = 288 + (* Content negotiation plug - applied via pipeline *) 289 + let negotiate = Plug.Negotiate.create ~formats:[ Json; Html ] () in 290 + let browser = Pipeline.create [ negotiate ] in 287 291 288 - let router = Router.compile [ 289 - (* Index: GET / *) 290 - Router.Route.get "/" (fun _params req -> 291 - (* Negotiate plug sets format on request *) 292 - let req = Plug.Core.apply negotiate_json_html (fun r -> r) req in 293 - Handlers.index req); 294 - 295 - (* Show link: GET /links/:id *) 296 - Router.Route.get "/links/:id" (fun params req -> 297 - let req = Plug.Core.apply negotiate_json_html (fun r -> r) req in 298 - Handlers.show_link params req); 299 - 300 - (* Submit form: GET /submit (HTML only) *) 301 - Router.Route.get "/submit" (fun _params req -> 302 - Handlers.submit_form req); 303 - 304 - (* Submit link: POST /links *) 305 - Router.Route.post "/links" (fun _params req -> 306 - Handlers.create_link req); 307 - 308 - (* Vote: POST /links/:id/vote *) 309 - Router.Route.post "/links/:id/vote" (fun params req -> 310 - Handlers.vote params req); 311 - ] 292 + Router.compile_scopes [ 293 + (* Public routes with content negotiation *) 294 + Router.scope "/" ~through:browser [ 295 + Router.Route.get "/" Handlers.index; 296 + Router.Route.get "/links/:id" Handlers.show_link; 297 + ]; 298 + 299 + (* Submit form: GET /submit (HTML only, no negotiation) *) 300 + Router.scope "/" ~through:Pipeline.empty [ 301 + Router.Route.get "/submit" Handlers.submit_form; 302 + Router.Route.post "/links" Handlers.create_link; 303 + Router.Route.post "/links/:id/vote" Handlers.vote; 304 + ]; 305 + ] 312 306 ``` 313 307 314 308 ### 3.2 Responding with Negotiated Content ··· 712 706 ## Part 8: Complete Route Map 713 707 714 708 ```ocaml 715 - (* routes.ml - complete version *) 709 + (* routes.ml - complete version using scopes and pipelines *) 716 710 open Hcs 717 711 718 - let negotiate = Plug.Negotiate.create ~formats:[Json; Html] () 712 + let router clock = 713 + let negotiate = Plug.Negotiate.create ~formats:[ Json; Html ] () in 714 + let submit_rate_limit = 715 + Plug.Rate_limit.create ~clock 716 + ~key:(fun _ -> 717 + match Plug.Session.get "user_id" with Some uid -> uid | None -> "anon") 718 + ~requests:10 ~per:3600.0 719 + in 720 + let auth_rate_limit = 721 + Plug.Rate_limit.create ~clock 722 + ~key:(fun req -> 723 + List.assoc_opt "x-forwarded-for" req.headers 724 + |> Option.value ~default:"unknown") 725 + ~requests:5 ~per:60.0 726 + in 727 + 728 + (* Pipelines group plugs for route scopes *) 729 + let browser = Pipeline.create [ negotiate ] in 730 + let auth = Pipeline.create [ Auth.require_authenticated ] in 719 731 720 - let router clock = 721 - let submit_limiter = Plug.Rate_limit.create ~clock 722 - ~key:(fun _ -> Plug.Session.get "user_id" |> Option.value ~default:"anon") 723 - ~requests:10 ~per:3600.0 724 - in 725 - 726 - Router.compile [ 727 - (* Public - negotiated *) 728 - Router.Route.get "/" (Handlers.index ~negotiate); 729 - Router.Route.get "/links/:id" (Handlers.show_link ~negotiate); 730 - Router.Route.get "/new" (Handlers.index_new ~negotiate); 732 + Router.compile_scopes [ 733 + (* Public routes with content negotiation *) 734 + Router.scope "/" ~through:browser [ 735 + Router.Route.get "/" Handlers.index; 736 + Router.Route.get "/new" Handlers.index_new; 737 + Router.Route.get "/links/:id" Handlers.show_link; 738 + ]; 731 739 732 - (* Auth - HTML only *) 733 - Router.Route.get "/login" Handlers.login_form; 734 - Router.Route.post "/login" Handlers.login_submit; 735 - Router.Route.get "/register" Handlers.register_form; 736 - Router.Route.post "/register" Handlers.register_submit; 737 - Router.Route.post "/logout" Handlers.logout_submit; 738 - 739 - (* Protected - require auth *) 740 - Router.Route.get "/submit" (Auth.require_auth Handlers.submit_form); 741 - Router.Route.post "/links" (fun params req -> 742 - Plug.Core.apply submit_limiter 743 - (Auth.require_auth (Handlers.create_link params)) req); 744 - 745 - (* Voting - JSON only, require auth *) 746 - Router.Route.post "/links/:id/vote" (Auth.require_auth Handlers.vote); 740 + (* Auth routes - no pipeline, but with rate limiting *) 741 + Router.scope "/" ~through:Pipeline.empty [ 742 + Router.Route.get "/login" Handlers.login_form; 743 + Router.Route.post "/login" Handlers.login_submit 744 + |> Router.Route.plug auth_rate_limit; 745 + Router.Route.get "/register" Handlers.register_form; 746 + Router.Route.post "/register" Handlers.register_submit 747 + |> Router.Route.plug auth_rate_limit; 748 + ]; 747 749 748 - (* Comments *) 749 - Router.Route.get "/links/:id/comments" (Handlers.comments ~negotiate); 750 - Router.Route.post "/links/:id/comments" (Auth.require_auth Handlers.create_comment); 750 + (* Protected routes - require authentication *) 751 + Router.scope "/" ~through:auth [ 752 + Router.Route.post "/logout" Handlers.logout_submit; 753 + Router.Route.get "/submit" Handlers.submit_form; 754 + Router.Route.post "/links" Handlers.create_link 755 + |> Router.Route.plug submit_rate_limit 756 + |> Router.Route.plug negotiate; 757 + Router.Route.post "/links/:id/vote" Handlers.vote; 758 + Router.Route.post "/links/:id/comments" Handlers.create_comment; 759 + ]; 751 760 ] 752 761 ``` 753 762
+3
docs/README.md
··· 19 19 - [Plug System](guides/plug-system.md) - Compose middleware pipelines 20 20 - [Responses](guides/responses.md) - Build responses and stream data 21 21 - [Error Handling](guides/error-handling.md) - Handle errors gracefully 22 + - [Logging](guides/logging.md) - Structured logging for HTTP events 22 23 23 24 ## HTTP Client 24 25 ··· 26 27 27 28 - [Basic Requests](client/basic-requests.md) - GET, POST, headers, and bodies 28 29 - [HTTP/2](client/http2.md) - HTTP/2 multiplexing and server push 30 + - [SSE Client](client/sse.md) - Consume Server-Sent Events streams 29 31 - [Connection Pooling](client/connection-pooling.md) - Reuse connections efficiently 30 32 - [TLS Configuration](client/tls.md) - Certificates and security settings 31 33 ··· 53 55 - [Rate Limiting](recipes/rate-limiting.md) - Protect against abuse 54 56 - [CORS](recipes/cors.md) - Cross-origin resource sharing 55 57 - [Static Files](recipes/static-files.md) - Serve files from disk 58 + - [File Uploads](recipes/file-upload.md) - Multipart form data and streaming 56 59 - [JSON APIs](recipes/json-api.md) - Content negotiation and JSON responses
+281
docs/client/sse.md
··· 1 + # SSE Client 2 + 3 + HCS provides two SSE client implementations: 4 + 5 + - **`Sse_client`** - Low-level, explicit connection management 6 + - **`Event_source`** - High-level, auto-reconnect with backoff 7 + 8 + ## When to Use Each 9 + 10 + | Use Case | Module | 11 + |----------|--------| 12 + | Simple event consumption | `Event_source` | 13 + | Custom reconnection logic | `Sse_client` | 14 + | One-shot event fetching | `Sse_client` | 15 + | Long-lived subscriptions | `Event_source` | 16 + | Testing/debugging | `Sse_client` | 17 + 18 + ## Sse_client (Low-Level) 19 + 20 + Explicit control over connection lifecycle: 21 + 22 + ```ocaml 23 + Eio_main.run @@ fun env -> 24 + Eio.Switch.run @@ fun sw -> 25 + let net = Eio.Stdenv.net env in 26 + let clock = Eio.Stdenv.clock env in 27 + 28 + match Sse_client.connect ~sw ~net ~clock "http://example.com/events" with 29 + | Error e -> 30 + Printf.eprintf "Failed: %s\n" (Sse_client.error_to_string e) 31 + | Ok conn -> 32 + let rec loop () = 33 + match Sse_client.next conn with 34 + | Ok (Some event) -> 35 + Printf.printf "Event: %s\n" event.data; 36 + loop () 37 + | Ok None -> 38 + loop () (* No event ready, keep reading *) 39 + | Error Sse_client.Closed -> 40 + print_endline "Connection closed" 41 + | Error e -> 42 + Printf.eprintf "Error: %s\n" (Sse_client.error_to_string e) 43 + in 44 + loop (); 45 + Sse_client.close conn 46 + ``` 47 + 48 + ### Configuration 49 + 50 + ```ocaml 51 + let config = 52 + Sse_client.default_config 53 + |> Sse_client.with_connect_timeout 10.0 54 + |> Sse_client.with_header "Authorization" "Bearer token" 55 + 56 + match Sse_client.connect ~sw ~net ~clock ~config url with 57 + | Ok conn -> (* ... *) 58 + | Error e -> (* ... *) 59 + ``` 60 + 61 + | Option | Default | Description | 62 + |--------|---------|-------------| 63 + | `with_connect_timeout` | 30.0 | Connection timeout (seconds) | 64 + | `with_read_timeout` | 0.0 | Read timeout (0 = no timeout) | 65 + | `with_buffer_size` | 4096 | Read buffer size | 66 + | `with_header` | - | Add custom header | 67 + | `with_headers` | - | Add multiple headers | 68 + | `with_tls` | default | TLS configuration | 69 + | `with_insecure_tls` | - | Skip TLS verification | 70 + 71 + ### Resuming with Last-Event-ID 72 + 73 + Pass `last_event_id` to resume from where you left off: 74 + 75 + ```ocaml 76 + let last_id = Sse_client.last_event_id conn in 77 + Sse_client.close conn; 78 + 79 + (* Later, reconnect with the last ID *) 80 + match Sse_client.connect ~sw ~net ~clock ~last_event_id:last_id url with 81 + | Ok conn -> (* Server may replay events since last_id *) 82 + | Error e -> (* ... *) 83 + ``` 84 + 85 + ### Error Types 86 + 87 + ```ocaml 88 + type error = 89 + | Connection_failed of string (* DNS, network failure *) 90 + | Http_error of { status: int; body: string option } 91 + | Protocol_error of string (* Invalid SSE format *) 92 + | Io_error of exn (* Read/write failure *) 93 + | Closed (* Connection closed *) 94 + ``` 95 + 96 + ### Event Structure 97 + 98 + ```ocaml 99 + type event = { 100 + event_type : string option; (* "message", "update", etc. *) 101 + data : string; (* Event payload *) 102 + id : string option; (* Event ID for resuming *) 103 + retry : int option; (* Suggested reconnect delay (ms) *) 104 + } 105 + ``` 106 + 107 + ### Converting to Seq 108 + 109 + For functional-style iteration: 110 + 111 + ```ocaml 112 + let process_events conn = 113 + Sse_client.to_seq conn 114 + |> Seq.filter_map (function 115 + | Ok event -> Some event.data 116 + | Error _ -> None) 117 + |> Seq.iter print_endline 118 + ``` 119 + 120 + ## Event_source (High-Level) 121 + 122 + Browser-like EventSource with automatic reconnection: 123 + 124 + ```ocaml 125 + Eio_main.run @@ fun env -> 126 + Eio.Switch.run @@ fun sw -> 127 + let net = Eio.Stdenv.net env in 128 + let clock = Eio.Stdenv.clock env in 129 + 130 + let es = Event_source.start ~sw ~net ~clock "http://example.com/events" in 131 + 132 + (* Events arrive in an Eio.Stream *) 133 + for _ = 1 to 100 do 134 + let event = Eio.Stream.take (Event_source.events es) in 135 + Printf.printf "[%s] %s\n" 136 + (Option.value event.event_type ~default:"message") 137 + event.data 138 + done; 139 + 140 + Event_source.close es 141 + ``` 142 + 143 + ### Configuration 144 + 145 + ```ocaml 146 + let config = 147 + Event_source.default_config 148 + |> Event_source.with_default_retry 5.0 (* Initial retry delay *) 149 + |> Event_source.with_max_retry 60.0 (* Max retry delay *) 150 + |> Event_source.with_max_reconnect_attempts 10 151 + 152 + let es = Event_source.start ~sw ~net ~clock ~config url 153 + ``` 154 + 155 + | Option | Default | Description | 156 + |--------|---------|-------------| 157 + | `with_sse_config` | default | Underlying Sse_client config | 158 + | `with_default_retry` | 3.0 | Initial retry delay (seconds) | 159 + | `with_max_retry` | 30.0 | Maximum retry delay | 160 + | `with_backoff` | exponential | Backoff strategy function | 161 + | `with_max_reconnect_attempts` | None | Max reconnects (None = unlimited) | 162 + | `with_unlimited_reconnects` | - | Remove reconnect limit | 163 + 164 + ### Custom Backoff 165 + 166 + ```ocaml 167 + let linear_backoff attempt = 1.0 +. (float_of_int attempt *. 2.0) 168 + 169 + let config = 170 + Event_source.default_config 171 + |> Event_source.with_backoff linear_backoff 172 + |> Event_source.with_max_retry 30.0 173 + ``` 174 + 175 + ### Status Monitoring 176 + 177 + Track connection state changes: 178 + 179 + ```ocaml 180 + let es = Event_source.start ~sw ~net ~clock url in 181 + 182 + Event_source.on_status es (function 183 + | Event_source.Connecting -> 184 + print_endline "Connecting..." 185 + | Event_source.Connected -> 186 + print_endline "Connected!" 187 + | Event_source.Reconnecting { delay; attempt } -> 188 + Printf.printf "Reconnecting in %.1fs (attempt %d)\n" delay attempt 189 + | Event_source.Closed -> 190 + print_endline "Connection closed"); 191 + 192 + (* Current status *) 193 + match Event_source.status es with 194 + | Connected -> print_endline "We're connected" 195 + | _ -> print_endline "Not connected" 196 + ``` 197 + 198 + ### Mapping Events 199 + 200 + Transform events as they arrive: 201 + 202 + ```ocaml 203 + let es = Event_source.start ~sw ~net ~clock url in 204 + 205 + Eio.Fiber.fork ~sw (fun () -> 206 + while true do 207 + let event = Eio.Stream.take (Event_source.events es) in 208 + match Yojson.Safe.from_string event.data with 209 + | json -> process_json json 210 + | exception _ -> Printf.eprintf "Invalid JSON: %s\n" event.data 211 + done 212 + ) 213 + ``` 214 + 215 + ## Complete Example: Stock Ticker 216 + 217 + ```ocaml 218 + open Hcs 219 + 220 + let () = 221 + Eio_main.run @@ fun env -> 222 + Eio.Switch.run @@ fun sw -> 223 + let net = Eio.Stdenv.net env in 224 + let clock = Eio.Stdenv.clock env in 225 + 226 + let config = 227 + Event_source.default_config 228 + |> Event_source.with_default_retry 1.0 229 + |> Event_source.with_max_reconnect_attempts 5 230 + in 231 + 232 + let es = Event_source.start ~sw ~net ~clock ~config 233 + "https://api.example.com/stocks/stream" in 234 + 235 + Event_source.on_status es (function 236 + | Event_source.Connected -> 237 + print_endline "Connected to stock feed" 238 + | Event_source.Reconnecting { delay; _ } -> 239 + Printf.printf "Reconnecting in %.1fs...\n" delay 240 + | Event_source.Closed -> 241 + print_endline "Feed disconnected" 242 + | _ -> ()); 243 + 244 + (* Process 1000 price updates *) 245 + for _ = 1 to 1000 do 246 + let event = Eio.Stream.take (Event_source.events es) in 247 + match event.event_type with 248 + | Some "price" -> 249 + Printf.printf "Price update: %s\n" event.data 250 + | Some "trade" -> 251 + Printf.printf "Trade: %s\n" event.data 252 + | _ -> 253 + Printf.printf "Unknown: %s\n" event.data 254 + done; 255 + 256 + Event_source.close es 257 + ``` 258 + 259 + ## Server-Sent Events Format 260 + 261 + SSE events have this wire format: 262 + 263 + ``` 264 + event: price 265 + id: 12345 266 + retry: 3000 267 + data: {"symbol": "AAPL", "price": 150.25} 268 + 269 + event: trade 270 + data: {"symbol": "GOOG", "volume": 1000} 271 + data: {"price": 2800.00} 272 + 273 + ``` 274 + 275 + Multi-line data uses multiple `data:` lines, joined with newlines. 276 + 277 + ## Next Steps 278 + 279 + - [Server-Sent Events (Server)](../real-time/sse.md) - Creating SSE endpoints 280 + - [WebSocket](../real-time/websocket.md) - Bidirectional communication 281 + - [Connection Pooling](connection-pooling.md) - HTTP client pooling
+182
docs/guides/logging.md
··· 1 + # Logging 2 + 3 + HCS provides structured logging for HTTP events via the `Log` module. 4 + 5 + ## Logger Setup 6 + 7 + ### Stderr Logger 8 + 9 + ```ocaml 10 + let logger = Log.stderr () 11 + ``` 12 + 13 + With minimum level filter: 14 + 15 + ```ocaml 16 + let logger = Log.stderr ~min_level:Log.Info () 17 + ``` 18 + 19 + JSON output: 20 + 21 + ```ocaml 22 + let logger = Log.stderr ~json:true () 23 + ``` 24 + 25 + ### Stdout Logger 26 + 27 + ```ocaml 28 + let logger = Log.stdout ~min_level:Log.Warn () 29 + ``` 30 + 31 + ### Null Logger 32 + 33 + Discard all logs: 34 + 35 + ```ocaml 36 + let logger = Log.null 37 + ``` 38 + 39 + ## Using with Plug.Logger 40 + 41 + Add request/response logging to your endpoint: 42 + 43 + ```ocaml 44 + let endpoint = 45 + Endpoint.create config 46 + |> Endpoint.plug (Plug.Logger.create ~clock logger) 47 + |> Endpoint.router routes 48 + ``` 49 + 50 + ## Log Levels 51 + 52 + ```ocaml 53 + type level = Debug | Info | Warn | Error 54 + ``` 55 + 56 + Filter by level: 57 + 58 + ```ocaml 59 + let filtered = Log.with_min_level Log.Warn logger 60 + ``` 61 + 62 + Compare levels: 63 + 64 + ```ocaml 65 + Log.level_gte Log.Error Log.Warn (* true *) 66 + Log.level_gte Log.Debug Log.Info (* false *) 67 + ``` 68 + 69 + ## Event Types 70 + 71 + | Event | Description | 72 + |-------|-------------| 73 + | `Request_start` | Request received | 74 + | `Request_end` | Request completed with status and duration | 75 + | `Connection_open` | New connection established | 76 + | `Connection_close` | Connection terminated | 77 + | `Connection_reuse` | Connection reused from pool | 78 + | `Tls_handshake` | TLS negotiation completed | 79 + | `Retry` | Request retry attempt | 80 + | `Redirect` | Following HTTP redirect | 81 + | `Error` | Error occurred | 82 + | `Custom` | Custom application event | 83 + 84 + ## Custom Logger 85 + 86 + ```ocaml 87 + let logger = Log.custom (fun level msg -> 88 + Printf.printf "[%s] %s\n" (Log.level_to_string level) msg) 89 + ``` 90 + 91 + JSON output: 92 + 93 + ```ocaml 94 + let logger = Log.custom_json (fun level json -> 95 + output_string log_file json; 96 + output_char log_file '\n') 97 + ``` 98 + 99 + ## Combining Loggers 100 + 101 + Log to multiple destinations: 102 + 103 + ```ocaml 104 + let file_logger = Log.custom_json write_to_file 105 + let console_logger = Log.stderr ~min_level:Log.Warn () 106 + 107 + let logger = Log.combine [file_logger; console_logger] 108 + ``` 109 + 110 + ## Manual Event Logging 111 + 112 + ```ocaml 113 + logger Log.Info (Log.Request_start { 114 + id = Log.generate_request_id (); 115 + meth = Log.GET; 116 + uri = "/api/users"; 117 + headers = []; 118 + }) 119 + 120 + logger Log.Error (Log.Error { 121 + id = Some "req-001"; 122 + error = "Connection timeout"; 123 + context = Some "database"; 124 + }) 125 + ``` 126 + 127 + ## Custom Events 128 + 129 + ```ocaml 130 + logger Log.Info (Log.Custom { 131 + name = "user_login"; 132 + data = [("user_id", "123"); ("ip", "192.168.1.1")]; 133 + }) 134 + ``` 135 + 136 + ## Event Formatting 137 + 138 + Convert events to strings: 139 + 140 + ```ocaml 141 + let msg = Log.event_to_string event 142 + let json = Log.event_to_json event 143 + ``` 144 + 145 + ## Request IDs 146 + 147 + Generate unique request IDs: 148 + 149 + ```ocaml 150 + let id = Log.generate_request_id () 151 + (* "req-000001-a3f2" *) 152 + ``` 153 + 154 + ## Integration Example 155 + 156 + ```ocaml 157 + let () = 158 + Eio_main.run @@ fun env -> 159 + let clock = Eio.Stdenv.clock env in 160 + let net = Eio.Stdenv.net env in 161 + 162 + let file_logger = 163 + let oc = open_out "access.log" in 164 + Log.custom_json (fun _level json -> 165 + output_string oc json; 166 + output_char oc '\n'; 167 + flush oc) 168 + in 169 + 170 + let console_logger = Log.stderr ~min_level:Log.Warn () in 171 + let logger = Log.combine [file_logger; console_logger] in 172 + 173 + let endpoint = 174 + Endpoint.create Endpoint.default_config 175 + |> Endpoint.plug (Plug.Logger.create ~clock logger) 176 + |> Endpoint.router routes 177 + in 178 + 179 + Eio.Switch.run @@ fun sw -> 180 + Server.run ~sw ~net ~config:Server.default_config 181 + (Endpoint.to_handler endpoint) 182 + ```
+2
docs/guides/responses.md
··· 156 156 Response.stream : 157 157 ?status:H1.Status.t -> 158 158 ?headers:(string * string) list -> 159 + ?flush_headers_immediately:bool -> 159 160 ?content_length:int64 -> 160 161 (unit -> Cstruct.t option) -> (* The next function *) 161 162 Response.t ··· 232 233 | Bigstring of Bigstringaf.t (* Zero-copy bigstring *) 233 234 | Stream of { (* Streaming body *) 234 235 content_length : int64 option; 236 + flush_headers_immediately : bool; 235 237 next : unit -> Cstruct.t option 236 238 } 237 239 | Prebuilt_body of Response.Prebuilt.t (* Pre-serialized response *)
+2 -2
docs/real-time/pubsub.md
··· 94 94 ) in 95 95 96 96 (* Clean up on disconnect - this is simplified *) 97 - Hcs.Sse.respond_with_stream events 97 + Hcs.Sse.respond_with_stream ?flush_headers_immediately events 98 98 99 99 let trigger_update msg = 100 100 Hcs.Pubsub.broadcast pubsub "updates" msg ··· 148 148 let _sub = Pubsub.subscribe pubsub "stats" (fun msg -> 149 149 Eio.Stream.add events (Sse.event_typed ~event_type:"stats" msg) 150 150 ) in 151 - Sse.respond_with_stream events); 151 + Sse.respond_with_stream ?flush_headers_immediately events); 152 152 153 153 Router.Route.post "/update" (fun _params req -> 154 154 Pubsub.broadcast pubsub "stats" req.Server.body;
+7 -3
docs/real-time/sse.md
··· 16 16 17 17 ## Basic SSE Endpoint 18 18 19 + All SSE helpers accept an optional `?flush_headers_immediately` parameter. 20 + - Default: `true` (recommended) 21 + - Set to `false` if you explicitly want headers to wait until the first body write. 22 + 19 23 Create an SSE endpoint with a generator function: 20 24 21 25 ```ocaml ··· 29 33 Some (Hcs.Sse.event (Printf.sprintf "Count: %d" !count)) 30 34 end 31 35 in 32 - Hcs.Sse.respond generator 36 + Hcs.Sse.respond ?flush_headers_immediately generator 33 37 ``` 34 38 35 39 The generator returns `Some event` to send data, or `None` to close the stream. ··· 72 76 done 73 77 ); 74 78 75 - Hcs.Sse.respond_with_stream events 79 + Hcs.Sse.respond_with_stream ?flush_headers_immediately events 76 80 ``` 77 81 78 82 ## Raw String Events ··· 84 88 let generator () = 85 89 Some "data: raw message\n\n" 86 90 in 87 - Hcs.Sse.respond_raw generator 91 + Hcs.Sse.respond_raw ?flush_headers_immediately generator 88 92 ``` 89 93 90 94 ## Client-Side JavaScript
+162
docs/recipes/file-upload.md
··· 1 + # File Uploads 2 + 3 + Handle `multipart/form-data` file uploads with the `Multipart` module. 4 + 5 + ## Non-Streaming (Small Files) 6 + 7 + For files that fit in memory: 8 + 9 + ```ocaml 10 + let upload_handler _params req = 11 + match Multipart.parse req with 12 + | Error e -> Response.bad_request (Multipart.error_to_string e) 13 + | Ok parts -> 14 + match Multipart.find_file "avatar" parts with 15 + | None -> Response.bad_request "No file uploaded" 16 + | Some file -> 17 + let filename = Option.value ~default:"unknown" file.filename in 18 + save_file ~name:filename ~data:file.data; 19 + Response.ok (Printf.sprintf "Uploaded: %s" filename) 20 + ``` 21 + 22 + ## Part Fields 23 + 24 + Each `part` contains: 25 + 26 + ```ocaml 27 + type part = { 28 + name : string; (* Form field name *) 29 + filename : string option; (* Original filename (if file) *) 30 + content_type : string; (* MIME type *) 31 + data : string; (* File contents *) 32 + } 33 + ``` 34 + 35 + ## Finding Parts 36 + 37 + ```ocaml 38 + (* Find any part by name *) 39 + let field = Multipart.find_part "description" parts 40 + 41 + (* Find only file parts (has filename) *) 42 + let file = Multipart.find_file "document" parts 43 + 44 + (* Convert to association list *) 45 + let assoc = Multipart.to_assoc parts 46 + let name = List.assoc_opt "name" assoc 47 + ``` 48 + 49 + ## Streaming (Large Files) 50 + 51 + For large files, stream directly to disk: 52 + 53 + ```ocaml 54 + let upload_large _params req = 55 + match Multipart.create_parser req with 56 + | Error e -> Response.bad_request (Multipart.error_to_string e) 57 + | Ok parser -> 58 + let result = Multipart.iter_parts (fun part -> 59 + match part.filename with 60 + | None -> () (* Skip non-file fields *) 61 + | Some filename -> 62 + let path = "/uploads/" ^ filename in 63 + let oc = open_out_bin path in 64 + Stream.Async.iter (fun chunk -> 65 + output_string oc (Cstruct.to_string chunk) 66 + ) part.body; 67 + close_out oc 68 + ) parser in 69 + match result with 70 + | Ok () -> Response.ok "Upload complete" 71 + | Error e -> Response.internal_error (Multipart.error_to_string e) 72 + ``` 73 + 74 + ## Streaming Part Fields 75 + 76 + ```ocaml 77 + type stream_part = { 78 + name : string; 79 + filename : string option; 80 + content_type : string; 81 + body : Cstruct.t Stream.Async.t; (* Streaming body *) 82 + } 83 + ``` 84 + 85 + ## Collecting Stream Body 86 + 87 + Convert streaming body to string: 88 + 89 + ```ocaml 90 + let data = Multipart.collect_body part 91 + ``` 92 + 93 + ## Error Handling 94 + 95 + ```ocaml 96 + type error = 97 + | Missing_content_type 98 + | Not_multipart 99 + | Missing_boundary 100 + | Invalid_boundary of string 101 + | Parse_error of string 102 + 103 + let handle_error = function 104 + | Missing_content_type -> Response.bad_request "Missing Content-Type" 105 + | Not_multipart -> Response.bad_request "Expected multipart/form-data" 106 + | Missing_boundary -> Response.bad_request "Missing boundary" 107 + | Invalid_boundary msg -> Response.bad_request ("Invalid boundary: " ^ msg) 108 + | Parse_error msg -> Response.bad_request ("Parse error: " ^ msg) 109 + ``` 110 + 111 + ## Check Content Type 112 + 113 + ```ocaml 114 + if Multipart.is_multipart req then 115 + (* Handle multipart *) 116 + else 117 + Response.bad_request "Expected multipart/form-data" 118 + ``` 119 + 120 + ## HTML Form 121 + 122 + ```html 123 + <form method="POST" action="/upload" enctype="multipart/form-data"> 124 + <input type="text" name="description"> 125 + <input type="file" name="document"> 126 + <button type="submit">Upload</button> 127 + </form> 128 + ``` 129 + 130 + ## Multiple Files 131 + 132 + Process all files in a single upload: 133 + 134 + ```ocaml 135 + let upload_multiple _params req = 136 + match Multipart.parse req with 137 + | Error e -> Response.bad_request (Multipart.error_to_string e) 138 + | Ok parts -> 139 + let files = List.filter (fun p -> Option.is_some p.filename) parts in 140 + List.iter (fun file -> 141 + let name = Option.get file.filename in 142 + save_file ~name ~data:file.data 143 + ) files; 144 + Response.ok (Printf.sprintf "Uploaded %d files" (List.length files)) 145 + ``` 146 + 147 + ## Size Limits 148 + 149 + Limit upload size with a plug: 150 + 151 + ```ocaml 152 + let max_body_size ~limit : Plug.t = fun handler req -> 153 + match Request.content_length req with 154 + | Some len when len > limit -> 155 + Response.make ~status:`Payload_too_large "File too large" 156 + | _ -> handler req 157 + 158 + let routes = Router.compile [ 159 + Router.Route.post "/upload" upload_handler 160 + |> Router.Route.plug (max_body_size ~limit:10_000_000L); (* 10MB *) 161 + ] 162 + ```
+1 -1
dune-project
··· 2 2 3 3 (name hcs) 4 4 5 - (version 0.3.3) 5 + (version 0.4.0) 6 6 7 7 (generate_opam_files true) 8 8
+1 -1
hcs.opam
··· 1 1 # This file is generated by dune, edit dune-project instead 2 2 opam-version: "2.0" 3 - version: "0.3.3" 3 + version: "0.4.0" 4 4 synopsis: "Eio based HTTP client/server library for OCaml 5+" 5 5 description: 6 6 "HCS is a HTTP client/server library for OCaml 5+ supporting HTTP/1.1, HTTP/2, and WebSocket. Built on Eio."
+149
lib/event_source.ml
··· 1 + type status = 2 + | Connecting 3 + | Connected 4 + | Reconnecting of { delay : float; attempt : int } 5 + | Closed 6 + 7 + type config = { 8 + sse : Sse_client.config; 9 + default_retry : float; 10 + max_retry : float; 11 + backoff : int -> float; 12 + max_reconnect_attempts : int option; 13 + } 14 + 15 + let default_backoff attempt = 16 + let base = 3.0 in 17 + let delay = base *. Float.pow 2.0 (Float.of_int attempt) in 18 + let jitter = delay *. 0.1 *. (Random.float 2.0 -. 1.0) in 19 + max 0.1 (delay +. jitter) 20 + 21 + let default_config = 22 + { 23 + sse = Sse_client.default_config; 24 + default_retry = 3.0; 25 + max_retry = 30.0; 26 + backoff = default_backoff; 27 + max_reconnect_attempts = None; 28 + } 29 + 30 + let with_sse_config sse config = { config with sse } 31 + let with_default_retry delay config = { config with default_retry = delay } 32 + let with_max_retry delay config = { config with max_retry = delay } 33 + let with_backoff backoff config = { config with backoff } 34 + 35 + let with_max_reconnect_attempts n config = 36 + { config with max_reconnect_attempts = Some n } 37 + 38 + let with_unlimited_reconnects config = 39 + { config with max_reconnect_attempts = None } 40 + 41 + type t = { 42 + url : string; 43 + config : config; 44 + events : Sse_client.event Eio.Stream.t; 45 + mutable status : status; 46 + mutable status_callbacks : (status -> unit) list; 47 + mutable conn : Sse_client.t option; 48 + mutable last_event_id : string option; 49 + mutable retry_ms : int option; 50 + mutable stop_flag : bool; 51 + sleep : float -> unit; 52 + try_connect : 53 + ?last_event_id:string -> unit -> (Sse_client.t, Sse_client.error) result; 54 + } 55 + 56 + let events t = t.events 57 + let status t = t.status 58 + let on_status t callback = t.status_callbacks <- callback :: t.status_callbacks 59 + 60 + let set_status t new_status = 61 + t.status <- new_status; 62 + List.iter (fun cb -> cb new_status) t.status_callbacks 63 + 64 + let get_retry_delay t attempt = 65 + let base_delay = 66 + match t.retry_ms with 67 + | Some ms -> Float.of_int ms /. 1000.0 68 + | None -> t.config.default_retry 69 + in 70 + let backoff_delay = t.config.backoff attempt in 71 + min t.config.max_retry (max base_delay backoff_delay) 72 + 73 + let should_reconnect t attempt = 74 + (not t.stop_flag) 75 + && 76 + match t.config.max_reconnect_attempts with 77 + | None -> true 78 + | Some max -> attempt < max 79 + 80 + let rec connection_loop t attempt = 81 + if t.stop_flag then () 82 + else begin 83 + set_status t Connecting; 84 + match t.try_connect ?last_event_id:t.last_event_id () with 85 + | Error _ when should_reconnect t attempt -> 86 + let delay = get_retry_delay t attempt in 87 + set_status t (Reconnecting { delay; attempt = attempt + 1 }); 88 + t.sleep delay; 89 + connection_loop t (attempt + 1) 90 + | Error _ -> set_status t Closed 91 + | Ok conn -> 92 + t.conn <- Some conn; 93 + set_status t Connected; 94 + event_loop t conn 95 + end 96 + 97 + and event_loop t conn = 98 + if t.stop_flag then begin 99 + Sse_client.close conn; 100 + set_status t Closed 101 + end 102 + else 103 + match Sse_client.next conn with 104 + | Ok (Some event) -> 105 + (match event.id with 106 + | Some id -> t.last_event_id <- Some id 107 + | None -> ()); 108 + (match event.retry with Some ms -> t.retry_ms <- Some ms | None -> ()); 109 + Eio.Stream.add t.events event; 110 + event_loop t conn 111 + | Ok None -> event_loop t conn 112 + | Error Sse_client.Closed when should_reconnect t 0 -> 113 + t.conn <- None; 114 + connection_loop t 0 115 + | Error (Sse_client.Io_error _) when should_reconnect t 0 -> 116 + Sse_client.close conn; 117 + t.conn <- None; 118 + connection_loop t 0 119 + | Error _ -> 120 + Sse_client.close conn; 121 + t.conn <- None; 122 + set_status t Closed 123 + 124 + let start ~sw ~net ~(clock : _ Eio.Time.clock) ?(config = default_config) url = 125 + let sleep = Eio.Time.sleep clock in 126 + let try_connect ?last_event_id () = 127 + Sse_client.connect ~sw ~net ~clock ~config:config.sse ?last_event_id url 128 + in 129 + let t = 130 + { 131 + url; 132 + config; 133 + events = Eio.Stream.create 256; 134 + status = Connecting; 135 + status_callbacks = []; 136 + conn = None; 137 + last_event_id = None; 138 + retry_ms = None; 139 + stop_flag = false; 140 + sleep; 141 + try_connect; 142 + } 143 + in 144 + Eio.Fiber.fork ~sw (fun () -> connection_loop t 0); 145 + t 146 + 147 + let close t = 148 + t.stop_flag <- true; 149 + match t.conn with Some conn -> Sse_client.close conn | None -> ()
+14 -4
lib/h1_server.ml
··· 230 230 | Body_cached_prebuilt of cached_prebuilt 231 231 | Body_stream of { 232 232 content_length : int64 option; 233 + flush_headers_immediately : bool; 233 234 next : unit -> Cstruct.t option; 234 235 } 235 236 ··· 269 270 let respond_bigstring ?(status = `OK) ?(headers = []) bstr = 270 271 { status; headers; response_body = Body_bigstring bstr } 271 272 272 - let respond_stream ?(status = `OK) ?(headers = []) ?content_length next = 273 - { status; headers; response_body = Body_stream { content_length; next } } 273 + let respond_stream ?(status = `OK) ?(headers = []) 274 + ?(flush_headers_immediately = false) ?content_length next = 275 + { 276 + status; 277 + headers; 278 + response_body = 279 + Body_stream { content_length; flush_headers_immediately; next }; 280 + } 274 281 275 282 let respond_prebuilt h1_response body = 276 283 { ··· 515 522 | Body_cached_prebuilt cached -> 516 523 let resp = get_cached_response cached in 517 524 H1.Reqd.respond_with_bigstring reqd resp cached.body 518 - | Body_stream { content_length; next } -> 525 + | Body_stream { content_length; flush_headers_immediately; next } -> 519 526 let headers = 520 527 match content_length with 521 528 | Some len -> ··· 530 537 :: filter_reserved response.headers) 531 538 in 532 539 let resp = H1.Response.create ~headers response.status in 533 - let body_writer = H1.Reqd.respond_with_streaming reqd resp in 540 + let body_writer = 541 + H1.Reqd.respond_with_streaming ~flush_headers_immediately reqd 542 + resp 543 + in 534 544 let rec write_chunks () = 535 545 match next () with 536 546 | None -> H1.Body.Writer.close body_writer
+13 -4
lib/h2_server.ml
··· 25 25 | Body_prebuilt of { h2_response : H2.Response.t; body : Bigstringaf.t } 26 26 | Body_stream of { 27 27 content_length : int64 option; 28 + flush_headers_immediately : bool; 28 29 next : unit -> Cstruct.t option; 29 30 } 30 31 ··· 45 46 let respond_bigstring ?(status = `OK) ?(headers = []) bstr = 46 47 { status; headers; response_body = Body_bigstring bstr } 47 48 48 - let respond_stream ?(status = `OK) ?(headers = []) ?content_length next = 49 - { status; headers; response_body = Body_stream { content_length; next } } 49 + let respond_stream ?(status = `OK) ?(headers = []) 50 + ?(flush_headers_immediately = false) ?content_length next = 51 + { 52 + status; 53 + headers; 54 + response_body = 55 + Body_stream { content_length; flush_headers_immediately; next }; 56 + } 50 57 51 58 let respond_prebuilt h2_response body = 52 59 { ··· 230 237 in 231 238 let resp = { h2_response with H2.Response.headers } in 232 239 H2.Reqd.respond_with_bigstring reqd resp body 233 - | Body_stream { content_length; next } -> 240 + | Body_stream 241 + { content_length; flush_headers_immediately; next } -> 234 242 let headers = 235 243 match content_length with 236 244 | Some len -> ··· 243 251 in 244 252 let resp = H2.Response.create ~headers response.status in 245 253 let body_writer = 246 - H2.Reqd.respond_with_streaming reqd resp 254 + H2.Reqd.respond_with_streaming reqd 255 + ~flush_headers_immediately resp 247 256 in 248 257 let rec write_chunks () = 249 258 match next () with
+2 -1
lib/hcs.ml
··· 52 52 (** WebSocket support *) 53 53 54 54 module Sse = Sse 55 - (** Server-Sent Events *) 55 + module Sse_client = Sse_client 56 + module Event_source = Event_source 56 57 57 58 module Pubsub = Pubsub 58 59 (** Lock-free topic-based pub/sub messaging *)
+11 -3
lib/plug/compress.ml
··· 179 179 if is_event_stream resp then resp 180 180 else 181 181 match resp.Response.body with 182 - | Response.Stream { content_length = _; next } -> 182 + | Response.Stream { content_length = _; flush_headers_immediately; next } -> 183 183 if List.is_empty encodings then resp 184 184 else begin 185 185 match select_encoding encodings with ··· 199 199 headers; 200 200 body = 201 201 Response.Stream 202 - { content_length = None; next = compressed_next }; 202 + { 203 + content_length = None; 204 + flush_headers_immediately; 205 + next = compressed_next; 206 + }; 203 207 } 204 208 | Some Zstd -> 205 209 let compressed_next = zstd_compress_stream ~level next in ··· 217 221 headers; 218 222 body = 219 223 Response.Stream 220 - { content_length = None; next = compressed_next }; 224 + { 225 + content_length = None; 226 + flush_headers_immediately; 227 + next = compressed_next; 228 + }; 221 229 } 222 230 | Some Identity | None -> resp 223 231 end
+12 -3
lib/response.ml
··· 86 86 | String of string 87 87 | Cstruct of Cstruct.t 88 88 | Bigstring of Bigstringaf.t 89 - | Stream of { content_length : int64 option; next : unit -> Cstruct.t option } 89 + | Stream of { 90 + content_length : int64 option; 91 + flush_headers_immediately : bool; 92 + next : unit -> Cstruct.t option; 93 + } 90 94 | Prebuilt_body of Prebuilt.t 91 95 92 96 type t = { status : H1.Status.t; headers : (string * string) list; body : body } ··· 251 255 252 256 (** {1 Streaming and special responses} *) 253 257 254 - let stream ?(status = `OK) ?(headers = []) ?content_length next : t = 255 - { status; headers; body = Stream { content_length; next } } 258 + let stream ?(status = `OK) ?(headers = []) ?(flush_headers_immediately = false) 259 + ?content_length next : t = 260 + { 261 + status; 262 + headers; 263 + body = Stream { content_length; flush_headers_immediately; next }; 264 + } 256 265 257 266 let bigstring ?(status = `OK) ?(headers = []) bstr : t = 258 267 { status; headers; body = Bigstring bstr }
+8 -4
lib/server.ml
··· 341 341 let body_writer = H1.Reqd.respond_with_streaming reqd resp in 342 342 H1.Body.Writer.write_bigstring body_writer ~off:cs.off ~len cs.buffer; 343 343 H1.Body.Writer.close body_writer 344 - | Response.Stream { content_length; next } -> 344 + | Response.Stream { content_length; flush_headers_immediately; next } -> 345 345 let headers = 346 346 match content_length with 347 347 | Some len -> ··· 356 356 :: response.headers) 357 357 in 358 358 let resp = H1.Response.create ~headers response.status in 359 - let body_writer = H1.Reqd.respond_with_streaming reqd resp in 359 + let body_writer = 360 + H1.Reqd.respond_with_streaming ~flush_headers_immediately reqd resp 361 + in 360 362 let rec write_chunks () = 361 363 match next () with 362 364 | None -> H1.Body.Writer.close body_writer ··· 692 694 let body_writer = H2.Reqd.respond_with_streaming reqd resp in 693 695 H2.Body.Writer.write_bigstring body_writer ~off:cs.off ~len cs.buffer; 694 696 H2.Body.Writer.close body_writer 695 - | Response.Stream { content_length; next } -> 697 + | Response.Stream { content_length; flush_headers_immediately; next } -> 696 698 let headers = 697 699 match content_length with 698 700 | Some len -> ··· 701 703 | None -> H2.Headers.of_list response.headers 702 704 in 703 705 let resp = H2.Response.create ~headers h2_status in 704 - let body_writer = H2.Reqd.respond_with_streaming reqd resp in 706 + let body_writer = 707 + H2.Reqd.respond_with_streaming reqd ~flush_headers_immediately resp 708 + in 705 709 let rec write_chunks () = 706 710 match next () with 707 711 | None -> H2.Body.Writer.close body_writer
+10 -8
lib/sse.ml
··· 44 44 ("x-accel-buffering", "no"); 45 45 ] 46 46 47 - let respond generator = 47 + let respond ?(flush_headers_immediately = true) generator = 48 48 let next () = 49 49 match generator () with 50 50 | None -> None 51 51 | Some ev -> Some (Cstruct.of_string (format ev)) 52 52 in 53 - Response.stream ~headers next 53 + Response.stream ~flush_headers_immediately ~headers next 54 54 55 - let respond_with_stream (events : event Eio.Stream.t) = 55 + let respond_with_stream ?(flush_headers_immediately = true) 56 + (events : event Eio.Stream.t) = 56 57 let next () = 57 58 try 58 59 let ev = Eio.Stream.take events in 59 60 Some (Cstruct.of_string (format ev)) 60 61 with _ -> None 61 62 in 62 - Response.stream ~headers next 63 + Response.stream ~flush_headers_immediately ~headers next 63 64 64 - let respond_raw generator = 65 + let respond_raw ?(flush_headers_immediately = true) generator = 65 66 let next () = 66 67 match generator () with 67 68 | None -> None 68 69 | Some s -> Some (Cstruct.of_string s) 69 70 in 70 - Response.stream ~headers next 71 + Response.stream ~flush_headers_immediately ~headers next 71 72 72 - let respond_raw_stream (strings : string Eio.Stream.t) = 73 + let respond_raw_stream ?(flush_headers_immediately = true) 74 + (strings : string Eio.Stream.t) = 73 75 let next () = 74 76 try 75 77 let s = Eio.Stream.take strings in 76 78 Some (Cstruct.of_string s) 77 79 with _ -> None 78 80 in 79 - Response.stream ~headers next 81 + Response.stream ~flush_headers_immediately ~headers next
+441
lib/sse_client.ml
··· 1 + (** SSE Client - Low-level Server-Sent Events client. 2 + 3 + Implements W3C SSE spec: 4 + https://html.spec.whatwg.org/multipage/server-sent-events.html *) 5 + 6 + open Eio.Std 7 + 8 + type event = { 9 + event_type : string option; 10 + data : string; 11 + id : string option; 12 + retry : int option; 13 + } 14 + 15 + let make_event ?event_type ?id ?retry data = { event_type; data; id; retry } 16 + 17 + type error = 18 + | Connection_failed of string 19 + | Http_error of { status : int; body : string option } 20 + | Protocol_error of string 21 + | Io_error of exn 22 + | Closed 23 + 24 + let error_to_string = function 25 + | Connection_failed msg -> "Connection failed: " ^ msg 26 + | Http_error { status; body } -> 27 + let body_str = match body with Some b -> ": " ^ b | None -> "" in 28 + Printf.sprintf "HTTP error %d%s" status body_str 29 + | Protocol_error msg -> "Protocol error: " ^ msg 30 + | Io_error exn -> "I/O error: " ^ Printexc.to_string exn 31 + | Closed -> "Connection closed" 32 + 33 + type config = { 34 + connect_timeout : float; 35 + read_timeout : float; 36 + buffer_size : int; 37 + tls : Tls_config.Client.t; 38 + headers : (string * string) list; 39 + } 40 + 41 + let default_config = 42 + { 43 + connect_timeout = 30.0; 44 + read_timeout = 0.0; 45 + buffer_size = 4096; 46 + tls = Tls_config.Client.default; 47 + headers = [ ("Accept", "text/event-stream") ]; 48 + } 49 + 50 + let with_connect_timeout timeout config = 51 + { config with connect_timeout = timeout } 52 + 53 + let with_read_timeout timeout config = { config with read_timeout = timeout } 54 + let with_buffer_size size config = { config with buffer_size = size } 55 + let with_tls tls config = { config with tls } 56 + let with_insecure_tls config = { config with tls = Tls_config.Client.insecure } 57 + 58 + let with_header name value config = 59 + { config with headers = (name, value) :: config.headers } 60 + 61 + let with_headers headers config = 62 + { config with headers = headers @ config.headers } 63 + 64 + type t = { 65 + flow : Eio.Flow.two_way_ty r; 66 + mutable closed : bool; 67 + mutable last_event_id : string option; 68 + mutable retry : int option; 69 + read_buf : Buffer.t; 70 + line_buf : Buffer.t; 71 + config : config; 72 + } 73 + 74 + type parse_state = { 75 + mutable event_type : string option; 76 + mutable data : Buffer.t; 77 + mutable id : string option; 78 + mutable retry : int option; 79 + mutable has_data : bool; 80 + } 81 + 82 + let create_parse_state () = 83 + { 84 + event_type = None; 85 + data = Buffer.create 256; 86 + id = None; 87 + retry = None; 88 + has_data = false; 89 + } 90 + 91 + let reset_parse_state state = 92 + state.event_type <- None; 93 + Buffer.clear state.data; 94 + state.id <- None; 95 + state.retry <- None; 96 + state.has_data <- false 97 + 98 + let parse_line state line : event option = 99 + let len = String.length line in 100 + if len = 0 then begin 101 + if state.has_data then begin 102 + let data = Buffer.contents state.data in 103 + let data = 104 + let dlen = String.length data in 105 + if dlen > 0 && data.[dlen - 1] = '\n' then String.sub data 0 (dlen - 1) 106 + else data 107 + in 108 + let ev = 109 + { 110 + event_type = state.event_type; 111 + data; 112 + id = state.id; 113 + retry = state.retry; 114 + } 115 + in 116 + reset_parse_state state; 117 + Some ev 118 + end 119 + else None 120 + end 121 + else if line.[0] = ':' then None 122 + else begin 123 + let colon_pos = try Some (String.index line ':') with Not_found -> None in 124 + let field, value = 125 + match colon_pos with 126 + | Some pos -> 127 + let field = String.sub line 0 pos in 128 + let value_start = 129 + if pos + 1 < len && line.[pos + 1] = ' ' then pos + 2 else pos + 1 130 + in 131 + let value = String.sub line value_start (len - value_start) in 132 + (field, value) 133 + | None -> (line, "") 134 + in 135 + (match field with 136 + | "event" -> state.event_type <- Some value 137 + | "data" -> 138 + if state.has_data then Buffer.add_char state.data '\n'; 139 + Buffer.add_string state.data value; 140 + state.has_data <- true 141 + | "id" -> 142 + (* SSE spec: ID must not contain null characters *) 143 + if not (String.contains value '\x00') then state.id <- Some value 144 + | "retry" -> ( 145 + match int_of_string_opt value with 146 + | Some ms when ms >= 0 -> state.retry <- Some ms 147 + | _ -> ()) 148 + | _ -> ()); 149 + None 150 + end 151 + 152 + let connect ~sw ~net ~(clock : _ Eio.Time.clock) ?(config = default_config) 153 + ?last_event_id url = 154 + let uri = Uri.of_string url in 155 + let scheme = Uri.scheme uri |> Option.value ~default:"http" in 156 + let is_https = String.equal scheme "https" in 157 + let host = Uri.host uri |> Option.value ~default:"localhost" in 158 + let default_port = if is_https then 443 else 80 in 159 + let port = Uri.port uri |> Option.value ~default:default_port in 160 + let path = 161 + let p = Uri.path_and_query uri in 162 + if p = "" then "/" else p 163 + in 164 + 165 + let connect_result = 166 + try 167 + let addrs = Eio.Net.getaddrinfo_stream net host in 168 + match addrs with 169 + | [] -> Error (Connection_failed ("Cannot resolve host: " ^ host)) 170 + | addr_info :: _ -> ( 171 + let addr = 172 + match addr_info with 173 + | `Tcp (ip, _) -> `Tcp (ip, port) 174 + | `Unix p -> `Unix p 175 + in 176 + 177 + let tcp_flow_result = 178 + if config.connect_timeout > 0.0 then 179 + try 180 + Some 181 + (Eio.Time.with_timeout_exn clock config.connect_timeout 182 + (fun () -> Eio.Net.connect ~sw net addr)) 183 + with Eio.Time.Timeout -> None 184 + else Some (Eio.Net.connect ~sw net addr) 185 + in 186 + 187 + match tcp_flow_result with 188 + | None -> Error (Connection_failed "Connection timeout") 189 + | Some tcp_flow -> ( 190 + let flow_result = 191 + if is_https then 192 + match Tls_config.Client.to_tls_config config.tls ~host with 193 + | Error msg -> Error (Connection_failed ("TLS error: " ^ msg)) 194 + | Ok tls_cfg -> ( 195 + try 196 + let host_domain = 197 + match Domain_name.of_string host with 198 + | Ok dn -> ( 199 + match Domain_name.host dn with 200 + | Ok h -> Some h 201 + | Error _ -> None) 202 + | Error _ -> None 203 + in 204 + let tls_flow = 205 + Tls_eio.client_of_flow tls_cfg ?host:host_domain 206 + tcp_flow 207 + in 208 + Ok (tls_flow :> Eio.Flow.two_way_ty r) 209 + with 210 + | Tls_eio.Tls_failure failure -> 211 + Error 212 + (Connection_failed 213 + ("TLS failure: " 214 + ^ Tls_config.failure_to_string failure)) 215 + | exn -> 216 + Error (Connection_failed (Printexc.to_string exn))) 217 + else Ok (tcp_flow :> Eio.Flow.two_way_ty r) 218 + in 219 + 220 + match flow_result with Error e -> Error e | Ok flow -> Ok flow)) 221 + with exn -> Error (Connection_failed (Printexc.to_string exn)) 222 + in 223 + 224 + match connect_result with 225 + | Error e -> Error e 226 + | Ok flow -> ( 227 + let base_headers = 228 + [ 229 + ("Host", host); 230 + ("Accept", "text/event-stream"); 231 + ("Cache-Control", "no-cache"); 232 + ("Connection", "keep-alive"); 233 + ] 234 + in 235 + let headers = 236 + match last_event_id with 237 + | Some id -> ("Last-Event-ID", id) :: base_headers 238 + | None -> base_headers 239 + in 240 + let headers = config.headers @ headers in 241 + 242 + let buf = Buffer.create 256 in 243 + Buffer.add_string buf "GET "; 244 + Buffer.add_string buf path; 245 + Buffer.add_string buf " HTTP/1.1\r\n"; 246 + List.iter 247 + (fun (k, v) -> 248 + Buffer.add_string buf k; 249 + Buffer.add_string buf ": "; 250 + Buffer.add_string buf v; 251 + Buffer.add_string buf "\r\n") 252 + headers; 253 + Buffer.add_string buf "\r\n"; 254 + 255 + try 256 + Eio.Flow.write flow [ Cstruct.of_string (Buffer.contents buf) ]; 257 + 258 + let response_buf = Buffer.create 1024 in 259 + let byte_buf = Cstruct.create 1 in 260 + let rec read_until_crlf_crlf () = 261 + let n = Eio.Flow.single_read flow byte_buf in 262 + if n > 0 then begin 263 + Buffer.add_char response_buf (Cstruct.get_char byte_buf 0); 264 + let len = Buffer.length response_buf in 265 + if 266 + len >= 4 267 + && Buffer.nth response_buf (len - 4) = '\r' 268 + && Buffer.nth response_buf (len - 3) = '\n' 269 + && Buffer.nth response_buf (len - 2) = '\r' 270 + && Buffer.nth response_buf (len - 1) = '\n' 271 + then () 272 + else read_until_crlf_crlf () 273 + end 274 + in 275 + read_until_crlf_crlf (); 276 + 277 + let response_str = Buffer.contents response_buf in 278 + let lines = String.split_on_char '\n' response_str in 279 + 280 + match lines with 281 + | status_line :: header_lines -> 282 + let status_line = String.trim status_line in 283 + let status_code = 284 + try 285 + let space1 = String.index status_line ' ' in 286 + let rest = 287 + String.sub status_line (space1 + 1) 288 + (String.length status_line - space1 - 1) 289 + in 290 + let space2 = 291 + try String.index rest ' ' 292 + with Not_found -> String.length rest 293 + in 294 + int_of_string (String.sub rest 0 space2) 295 + with _ -> 0 296 + in 297 + 298 + if status_code <> 200 then 299 + Error (Http_error { status = status_code; body = None }) 300 + else 301 + let headers = 302 + List.filter_map 303 + (fun line -> 304 + let line = String.trim line in 305 + if line = "" then None 306 + else 307 + match String.index_opt line ':' with 308 + | Some i -> 309 + let key = 310 + String.lowercase_ascii (String.sub line 0 i) 311 + in 312 + let value = 313 + String.trim 314 + (String.sub line (i + 1) 315 + (String.length line - i - 1)) 316 + in 317 + Some (key, value) 318 + | None -> None) 319 + header_lines 320 + in 321 + 322 + let content_type = 323 + List.assoc_opt "content-type" headers 324 + |> Option.value ~default:"" 325 + in 326 + if 327 + not 328 + (String.length content_type >= 17 329 + && String.sub (String.lowercase_ascii content_type) 0 17 330 + = "text/event-stream") 331 + then 332 + Error 333 + (Protocol_error 334 + ("Expected text/event-stream, got: " ^ content_type)) 335 + else 336 + Ok 337 + { 338 + flow; 339 + closed = false; 340 + last_event_id; 341 + retry = None; 342 + read_buf = Buffer.create config.buffer_size; 343 + line_buf = Buffer.create 256; 344 + config; 345 + } 346 + | [] -> Error (Protocol_error "Empty response") 347 + with 348 + | End_of_file -> Error Closed 349 + | exn -> Error (Io_error exn)) 350 + 351 + let read_line t : string option = 352 + Buffer.clear t.line_buf; 353 + let byte_buf = Cstruct.create 1 in 354 + let rec loop () = 355 + try 356 + let n = Eio.Flow.single_read t.flow byte_buf in 357 + if n = 0 then begin 358 + if Buffer.length t.line_buf > 0 then Some (Buffer.contents t.line_buf) 359 + else None 360 + end 361 + else begin 362 + let c = Cstruct.get_char byte_buf 0 in 363 + match c with 364 + | '\n' -> 365 + let line = Buffer.contents t.line_buf in 366 + let len = String.length line in 367 + if len > 0 && line.[len - 1] = '\r' then 368 + Some (String.sub line 0 (len - 1)) 369 + else Some line 370 + | c -> 371 + Buffer.add_char t.line_buf c; 372 + loop () 373 + end 374 + with End_of_file -> 375 + if Buffer.length t.line_buf > 0 then Some (Buffer.contents t.line_buf) 376 + else None 377 + in 378 + loop () 379 + 380 + let next t : (event option, error) result = 381 + if t.closed then Error Closed 382 + else 383 + let parse_state = create_parse_state () in 384 + let rec loop () = 385 + try 386 + match read_line t with 387 + | None -> 388 + t.closed <- true; 389 + Error Closed 390 + | Some line -> ( 391 + match parse_line parse_state line with 392 + | Some event -> 393 + (match event.id with 394 + | Some id -> t.last_event_id <- Some id 395 + | None -> ()); 396 + (match event.retry with 397 + | Some ms -> t.retry <- Some ms 398 + | None -> ()); 399 + Ok (Some event) 400 + | None -> loop ()) 401 + with 402 + | End_of_file -> 403 + t.closed <- true; 404 + Error Closed 405 + | exn -> 406 + t.closed <- true; 407 + Error (Io_error exn) 408 + in 409 + loop () 410 + 411 + let is_open t = not t.closed 412 + let last_event_id t = t.last_event_id 413 + let retry t = t.retry 414 + 415 + let close t = 416 + if not t.closed then begin 417 + t.closed <- true; 418 + try Eio.Flow.close (Obj.magic t.flow) with _ -> () 419 + end 420 + 421 + let to_seq t : (event, error) result Seq.t = 422 + let rec next_seq () = 423 + if t.closed then Seq.Nil 424 + else 425 + match next t with 426 + | Ok (Some event) -> Seq.Cons (Ok event, next_seq) 427 + | Ok None -> next_seq () 428 + | Error e -> Seq.Cons (Error e, fun () -> Seq.Nil) 429 + in 430 + next_seq 431 + 432 + let iter t ~f = 433 + let rec loop () = 434 + match next t with 435 + | Ok (Some event) -> 436 + f event; 437 + loop () 438 + | Ok None -> loop () 439 + | Error e -> Error e 440 + in 441 + loop ()
+5
test/dune
··· 37 37 (name test_large_body) 38 38 (libraries hcs eio_main) 39 39 (modules test_large_body)) 40 + 41 + (test 42 + (name test_sse_client) 43 + (package hcs) 44 + (libraries hcs alcotest eio_main))
+168
test/test_hcs.ml
··· 1025 1025 ] 1026 1026 end 1027 1027 1028 + module Test_security_fuzz = struct 1029 + let arbitrary_bytes = 1030 + QCheck.( 1031 + make ~print:String.escaped 1032 + Gen.(string_size ~gen:(char_range '\x00' '\xff') (0 -- 1000))) 1033 + 1034 + let url_like_string = 1035 + QCheck.( 1036 + make ~print:String.escaped 1037 + Gen.( 1038 + string_size 1039 + ~gen: 1040 + (oneof_weighted 1041 + [ 1042 + (10, char_range 'a' 'z'); 1043 + (10, char_range 'A' 'Z'); 1044 + (10, char_range '0' '9'); 1045 + (5, return '&'); 1046 + (5, return '='); 1047 + (5, return '%'); 1048 + (3, return '+'); 1049 + (3, return '/'); 1050 + (3, return '?'); 1051 + (2, return ':'); 1052 + (2, return '#'); 1053 + (1, return '\x00'); 1054 + (1, return ' '); 1055 + (1, return '\n'); 1056 + (1, return '\r'); 1057 + ]) 1058 + (0 -- 500))) 1059 + 1060 + let fuzz_query_string_no_crash = 1061 + QCheck.Test.make ~name:"fuzz: query string never crashes" ~count:1000 1062 + url_like_string (fun input -> 1063 + let _ = Hcs.Request.parse_query_string input in 1064 + true) 1065 + 1066 + let fuzz_query_string_arbitrary_bytes = 1067 + QCheck.Test.make ~name:"fuzz: query string handles arbitrary bytes" 1068 + ~count:500 arbitrary_bytes (fun input -> 1069 + let _ = Hcs.Request.parse_query_string input in 1070 + true) 1071 + 1072 + let fuzz_query_string_result_valid = 1073 + QCheck.Test.make ~name:"fuzz: query string returns valid assoc list" 1074 + ~count:500 url_like_string (fun input -> 1075 + let result = Hcs.Request.parse_query_string input in 1076 + List.for_all 1077 + (fun (k, v) -> 1078 + let _ = String.length k in 1079 + let _ = String.length v in 1080 + true) 1081 + result) 1082 + 1083 + let fuzz_form_body_no_crash = 1084 + QCheck.Test.make ~name:"fuzz: form body never crashes" ~count:1000 1085 + url_like_string (fun input -> 1086 + let _ = Hcs.Request.parse_form_body input in 1087 + true) 1088 + 1089 + let fuzz_form_body_arbitrary_bytes = 1090 + QCheck.Test.make ~name:"fuzz: form body handles arbitrary bytes" ~count:500 1091 + arbitrary_bytes (fun input -> 1092 + let _ = Hcs.Request.parse_form_body input in 1093 + true) 1094 + 1095 + let fuzz_form_decode_no_crash = 1096 + QCheck.Test.make ~name:"fuzz: form_decode never crashes" ~count:1000 1097 + arbitrary_bytes (fun input -> 1098 + let _ = Hcs.Request.form_decode input in 1099 + true) 1100 + 1101 + let fuzz_router_parse_path_no_crash = 1102 + QCheck.Test.make ~name:"fuzz: router parse_path never crashes" ~count:1000 1103 + url_like_string (fun input -> 1104 + let _ = Hcs.Router.parse_path input in 1105 + true) 1106 + 1107 + let fuzz_router_parse_path_arbitrary = 1108 + QCheck.Test.make ~name:"fuzz: router parse_path handles arbitrary bytes" 1109 + ~count:500 arbitrary_bytes (fun input -> 1110 + let _ = Hcs.Router.parse_path input in 1111 + true) 1112 + 1113 + let fuzz_router_lookup_no_crash = 1114 + QCheck.Test.make ~name:"fuzz: router lookup never crashes" ~count:500 1115 + url_like_string (fun input -> 1116 + let router = Hcs.Router.empty () in 1117 + Hcs.Router.add_route router ~method_:(Some `GET) ~path:"/test/:id" 1118 + ~handler:"h" ~plugs:[]; 1119 + let _ = Hcs.Router.lookup router ~method_:`GET ~path:input in 1120 + true) 1121 + 1122 + let fuzz_query_string_long_input = 1123 + QCheck.Test.make ~name:"fuzz: query string handles long inputs" ~count:50 1124 + QCheck.(make Gen.(string_size ~gen:printable (5000 -- 10000))) 1125 + (fun input -> 1126 + let _ = Hcs.Request.parse_query_string input in 1127 + true) 1128 + 1129 + let fuzz_form_body_long_input = 1130 + QCheck.Test.make ~name:"fuzz: form body handles long inputs" ~count:50 1131 + QCheck.(make Gen.(string_size ~gen:printable (5000 -- 10000))) 1132 + (fun input -> 1133 + let _ = Hcs.Request.parse_form_body input in 1134 + true) 1135 + 1136 + let fuzz_query_many_equals = 1137 + QCheck.Test.make ~name:"fuzz: query string handles many = signs" ~count:100 1138 + QCheck.(int_range 1 1000) 1139 + (fun n -> 1140 + let input = String.make n '=' in 1141 + let _ = Hcs.Request.parse_query_string input in 1142 + true) 1143 + 1144 + let fuzz_query_many_ampersands = 1145 + QCheck.Test.make ~name:"fuzz: query string handles many & signs" ~count:100 1146 + QCheck.(int_range 1 1000) 1147 + (fun n -> 1148 + let input = String.make n '&' in 1149 + let _ = Hcs.Request.parse_query_string input in 1150 + true) 1151 + 1152 + let fuzz_path_many_slashes = 1153 + QCheck.Test.make ~name:"fuzz: router handles many slashes" ~count:100 1154 + QCheck.(int_range 1 1000) 1155 + (fun n -> 1156 + let input = String.make n '/' in 1157 + let _ = Hcs.Router.parse_path input in 1158 + true) 1159 + 1160 + let fuzz_percent_encoding_incomplete = 1161 + QCheck.Test.make ~name:"fuzz: query handles incomplete percent encoding" 1162 + ~count:200 1163 + QCheck.(pair (int_range 0 100) (int_range 0 15)) 1164 + (fun (prefix_len, hex_char) -> 1165 + let prefix = String.make prefix_len 'a' in 1166 + let incomplete = 1167 + if hex_char < 10 then 1168 + Printf.sprintf "%s%%%c" prefix (Char.chr (48 + hex_char)) 1169 + else Printf.sprintf "%s%%" prefix 1170 + in 1171 + let _ = Hcs.Request.parse_query_string incomplete in 1172 + true) 1173 + 1174 + let tests = 1175 + List.map QCheck_alcotest.to_alcotest 1176 + [ 1177 + fuzz_query_string_no_crash; 1178 + fuzz_query_string_arbitrary_bytes; 1179 + fuzz_query_string_result_valid; 1180 + fuzz_query_string_long_input; 1181 + fuzz_query_many_equals; 1182 + fuzz_query_many_ampersands; 1183 + fuzz_percent_encoding_incomplete; 1184 + fuzz_form_body_no_crash; 1185 + fuzz_form_body_arbitrary_bytes; 1186 + fuzz_form_decode_no_crash; 1187 + fuzz_form_body_long_input; 1188 + fuzz_router_parse_path_no_crash; 1189 + fuzz_router_parse_path_arbitrary; 1190 + fuzz_router_lookup_no_crash; 1191 + fuzz_path_many_slashes; 1192 + ] 1193 + end 1194 + 1028 1195 (* ================================================================== *) 1029 1196 (* Property-Based Tests (QCheck) *) 1030 1197 (* ================================================================== *) ··· 1154 1321 ("Tls_config", Test_tls_config.tests); 1155 1322 ("Request", Test_request.tests); 1156 1323 ("Properties", Test_properties.tests); 1324 + ("Security_fuzz", Test_security_fuzz.tests); 1157 1325 ]
+386
test/test_sse_client.ml
··· 1 + open Alcotest 2 + 3 + module Test_sse_parsing = struct 4 + module Sse = Hcs.Sse_client 5 + 6 + let test_make_event () = 7 + let ev = Sse.make_event ~event_type:"message" ~id:"1" "hello" in 8 + check (option string) "event_type" (Some "message") ev.event_type; 9 + check string "data" "hello" ev.data; 10 + check (option string) "id" (Some "1") ev.id; 11 + check (option int) "retry" None ev.retry 12 + 13 + let test_make_event_with_retry () = 14 + let ev = Sse.make_event ~retry:5000 "test" in 15 + check (option int) "retry" (Some 5000) ev.retry 16 + 17 + let test_error_to_string () = 18 + check string "connection_failed" "Connection failed: test" 19 + (Sse.error_to_string (Sse.Connection_failed "test")); 20 + check string "http_error" "HTTP error 404" 21 + (Sse.error_to_string (Sse.Http_error { status = 404; body = None })); 22 + check string "http_error_with_body" "HTTP error 500: server error" 23 + (Sse.error_to_string 24 + (Sse.Http_error { status = 500; body = Some "server error" })); 25 + check string "protocol_error" "Protocol error: bad format" 26 + (Sse.error_to_string (Sse.Protocol_error "bad format")); 27 + check string "closed" "Connection closed" (Sse.error_to_string Sse.Closed) 28 + 29 + let tests = 30 + [ 31 + test_case "make_event basic" `Quick test_make_event; 32 + test_case "make_event with retry" `Quick test_make_event_with_retry; 33 + test_case "error_to_string" `Quick test_error_to_string; 34 + ] 35 + end 36 + 37 + module Test_sse_integration = struct 38 + let get_port () = 17000 + (Unix.getpid () mod 1000) + Random.int 100 39 + 40 + let run_server ~sw ~net ~clock ~port handler = 41 + let server_config = 42 + Hcs.Server. 43 + { default_config with host = "127.0.0.1"; port; gc_tuning = None } 44 + in 45 + Eio.Fiber.fork_daemon ~sw (fun[@warning "-21"] () -> 46 + Hcs.Server.run ~sw ~net ~config:server_config handler; 47 + `Stop_daemon); 48 + Eio.Time.sleep clock 0.05 49 + 50 + let test_basic_connection () = 51 + Eio_main.run @@ fun env -> 52 + let net = Eio.Stdenv.net env in 53 + let clock = Eio.Stdenv.clock env in 54 + let port = get_port () in 55 + 56 + Eio.Switch.run @@ fun sw -> 57 + let events_sent = ref 0 in 58 + let events_received = ref 0 in 59 + 60 + let server_handler _req = 61 + let generator () = 62 + if !events_sent < 3 then begin 63 + incr events_sent; 64 + Some (Hcs.Sse.event (Printf.sprintf "event %d" !events_sent)) 65 + end 66 + else None 67 + in 68 + Hcs.Sse.respond generator 69 + in 70 + 71 + run_server ~sw ~net ~clock ~port server_handler; 72 + 73 + let url = Printf.sprintf "http://127.0.0.1:%d/" port in 74 + match Hcs.Sse_client.connect ~sw ~net ~clock url with 75 + | Error e -> Alcotest.fail (Hcs.Sse_client.error_to_string e) 76 + | Ok conn -> 77 + let rec read_events count = 78 + if count >= 3 then () 79 + else 80 + match Hcs.Sse_client.next conn with 81 + | Ok (Some ev) -> 82 + incr events_received; 83 + check string "data format" 84 + (Printf.sprintf "event %d" !events_received) 85 + ev.data; 86 + read_events (count + 1) 87 + | Ok None -> read_events count 88 + | Error e -> Alcotest.fail (Hcs.Sse_client.error_to_string e) 89 + in 90 + read_events 0; 91 + Hcs.Sse_client.close conn; 92 + check int "events received" 3 !events_received 93 + 94 + let test_event_types () = 95 + Eio_main.run @@ fun env -> 96 + let net = Eio.Stdenv.net env in 97 + let clock = Eio.Stdenv.clock env in 98 + let port = get_port () in 99 + 100 + Eio.Switch.run @@ fun sw -> 101 + let events_sent = ref 0 in 102 + 103 + let server_handler _req = 104 + let generator () = 105 + incr events_sent; 106 + match !events_sent with 107 + | 1 -> Some (Hcs.Sse.event_typed ~event_type:"ping" "") 108 + | 2 -> Some (Hcs.Sse.event_typed ~event_type:"update" "data here") 109 + | 3 -> Some (Hcs.Sse.make ~event_type:"custom" ~id:"msg-1" "payload") 110 + | _ -> None 111 + in 112 + Hcs.Sse.respond generator 113 + in 114 + 115 + run_server ~sw ~net ~clock ~port server_handler; 116 + 117 + let url = Printf.sprintf "http://127.0.0.1:%d/" port in 118 + match Hcs.Sse_client.connect ~sw ~net ~clock url with 119 + | Error e -> Alcotest.fail (Hcs.Sse_client.error_to_string e) 120 + | Ok conn -> 121 + let events = ref [] in 122 + let rec read_events count = 123 + if count >= 3 then () 124 + else 125 + match Hcs.Sse_client.next conn with 126 + | Ok (Some ev) -> 127 + events := ev :: !events; 128 + read_events (count + 1) 129 + | Ok None -> read_events count 130 + | Error e -> Alcotest.fail (Hcs.Sse_client.error_to_string e) 131 + in 132 + read_events 0; 133 + Hcs.Sse_client.close conn; 134 + 135 + let events = List.rev !events in 136 + check int "event count" 3 (List.length events); 137 + 138 + let ev1 = List.nth events 0 in 139 + check (option string) "ev1 type" (Some "ping") ev1.event_type; 140 + check string "ev1 data" "" ev1.data; 141 + 142 + let ev2 = List.nth events 1 in 143 + check (option string) "ev2 type" (Some "update") ev2.event_type; 144 + check string "ev2 data" "data here" ev2.data; 145 + 146 + let ev3 = List.nth events 2 in 147 + check (option string) "ev3 type" (Some "custom") ev3.event_type; 148 + check string "ev3 data" "payload" ev3.data; 149 + check (option string) "ev3 id" (Some "msg-1") ev3.id 150 + 151 + let test_last_event_id_tracking () = 152 + Eio_main.run @@ fun env -> 153 + let net = Eio.Stdenv.net env in 154 + let clock = Eio.Stdenv.clock env in 155 + let port = get_port () in 156 + 157 + Eio.Switch.run @@ fun sw -> 158 + let events_sent = ref 0 in 159 + 160 + let server_handler _req = 161 + let generator () = 162 + incr events_sent; 163 + if !events_sent <= 3 then 164 + Some 165 + (Hcs.Sse.make 166 + ~id:(Printf.sprintf "id-%d" !events_sent) 167 + (Printf.sprintf "msg %d" !events_sent)) 168 + else None 169 + in 170 + Hcs.Sse.respond generator 171 + in 172 + 173 + run_server ~sw ~net ~clock ~port server_handler; 174 + 175 + let url = Printf.sprintf "http://127.0.0.1:%d/" port in 176 + match Hcs.Sse_client.connect ~sw ~net ~clock url with 177 + | Error e -> Alcotest.fail (Hcs.Sse_client.error_to_string e) 178 + | Ok conn -> 179 + check (option string) "initial last_event_id" None 180 + (Hcs.Sse_client.last_event_id conn); 181 + 182 + let rec read_events count = 183 + if count >= 3 then () 184 + else 185 + match Hcs.Sse_client.next conn with 186 + | Ok (Some _) -> read_events (count + 1) 187 + | Ok None -> read_events count 188 + | Error e -> Alcotest.fail (Hcs.Sse_client.error_to_string e) 189 + in 190 + read_events 0; 191 + 192 + check (option string) "final last_event_id" (Some "id-3") 193 + (Hcs.Sse_client.last_event_id conn); 194 + Hcs.Sse_client.close conn 195 + 196 + let test_multiline_data () = 197 + Eio_main.run @@ fun env -> 198 + let net = Eio.Stdenv.net env in 199 + let clock = Eio.Stdenv.clock env in 200 + let port = get_port () in 201 + 202 + Eio.Switch.run @@ fun sw -> 203 + let sent = ref false in 204 + 205 + let server_handler _req = 206 + let generator () = 207 + if not !sent then begin 208 + sent := true; 209 + Some (Hcs.Sse.event "line1\nline2\nline3") 210 + end 211 + else None 212 + in 213 + Hcs.Sse.respond generator 214 + in 215 + 216 + run_server ~sw ~net ~clock ~port server_handler; 217 + 218 + let url = Printf.sprintf "http://127.0.0.1:%d/" port in 219 + match Hcs.Sse_client.connect ~sw ~net ~clock url with 220 + | Error e -> Alcotest.fail (Hcs.Sse_client.error_to_string e) 221 + | Ok conn -> 222 + (match Hcs.Sse_client.next conn with 223 + | Ok (Some ev) -> 224 + check string "multiline data" "line1\nline2\nline3" ev.data 225 + | Ok None -> Alcotest.fail "expected event" 226 + | Error e -> Alcotest.fail (Hcs.Sse_client.error_to_string e)); 227 + Hcs.Sse_client.close conn 228 + 229 + let test_http_error () = 230 + Eio_main.run @@ fun env -> 231 + let net = Eio.Stdenv.net env in 232 + let clock = Eio.Stdenv.clock env in 233 + let port = get_port () in 234 + 235 + Eio.Switch.run @@ fun sw -> 236 + let server_handler _req = Hcs.Response.not_found () in 237 + 238 + run_server ~sw ~net ~clock ~port server_handler; 239 + 240 + let url = Printf.sprintf "http://127.0.0.1:%d/" port in 241 + match Hcs.Sse_client.connect ~sw ~net ~clock url with 242 + | Ok _ -> Alcotest.fail "expected HTTP error" 243 + | Error (Hcs.Sse_client.Http_error { status; _ }) -> 244 + check int "status code" 404 status 245 + | Error e -> 246 + Alcotest.fail 247 + (Printf.sprintf "expected Http_error, got %s" 248 + (Hcs.Sse_client.error_to_string e)) 249 + 250 + let test_wrong_content_type () = 251 + Eio_main.run @@ fun env -> 252 + let net = Eio.Stdenv.net env in 253 + let clock = Eio.Stdenv.clock env in 254 + let port = get_port () in 255 + 256 + Eio.Switch.run @@ fun sw -> 257 + let server_handler _req = Hcs.Response.json "{}" in 258 + 259 + run_server ~sw ~net ~clock ~port server_handler; 260 + 261 + let url = Printf.sprintf "http://127.0.0.1:%d/" port in 262 + match Hcs.Sse_client.connect ~sw ~net ~clock url with 263 + | Ok _ -> Alcotest.fail "expected protocol error" 264 + | Error (Hcs.Sse_client.Protocol_error msg) -> 265 + check bool "error message not empty" true (String.length msg > 0) 266 + | Error e -> 267 + Alcotest.fail 268 + (Printf.sprintf "expected Protocol_error, got %s" 269 + (Hcs.Sse_client.error_to_string e)) 270 + 271 + let tests = 272 + [ 273 + test_case "basic connection" `Quick test_basic_connection; 274 + test_case "event types" `Quick test_event_types; 275 + test_case "last_event_id tracking" `Quick test_last_event_id_tracking; 276 + test_case "multiline data" `Quick test_multiline_data; 277 + test_case "http error" `Quick test_http_error; 278 + test_case "wrong content type" `Quick test_wrong_content_type; 279 + ] 280 + end 281 + 282 + module Test_event_source = struct 283 + let get_port () = 18000 + (Unix.getpid () mod 1000) + Random.int 100 284 + 285 + let run_server ~sw ~net ~clock ~port handler = 286 + let server_config = 287 + Hcs.Server. 288 + { default_config with host = "127.0.0.1"; port; gc_tuning = None } 289 + in 290 + Eio.Fiber.fork_daemon ~sw (fun[@warning "-21"] () -> 291 + Hcs.Server.run ~sw ~net ~config:server_config handler; 292 + `Stop_daemon); 293 + Eio.Time.sleep clock 0.05 294 + 295 + let test_basic_event_source () = 296 + Eio_main.run @@ fun env -> 297 + let net = Eio.Stdenv.net env in 298 + let clock = Eio.Stdenv.clock env in 299 + let port = get_port () in 300 + 301 + Eio.Switch.run @@ fun sw -> 302 + let events_sent = ref 0 in 303 + 304 + let server_handler _req = 305 + let generator () = 306 + if !events_sent < 5 then begin 307 + incr events_sent; 308 + Some (Hcs.Sse.event (Printf.sprintf "event %d" !events_sent)) 309 + end 310 + else None 311 + in 312 + Hcs.Sse.respond generator 313 + in 314 + 315 + run_server ~sw ~net ~clock ~port server_handler; 316 + 317 + let url = Printf.sprintf "http://127.0.0.1:%d/" port in 318 + let config = 319 + Hcs.Event_source.{ default_config with max_reconnect_attempts = Some 1 } 320 + in 321 + let es = Hcs.Event_source.start ~sw ~net ~clock ~config url in 322 + 323 + let events_received = ref 0 in 324 + while !events_received < 5 do 325 + let _ = Eio.Stream.take (Hcs.Event_source.events es) in 326 + incr events_received 327 + done; 328 + 329 + Hcs.Event_source.close es; 330 + check int "events received" 5 !events_received 331 + 332 + let test_status_callback () = 333 + Eio_main.run @@ fun env -> 334 + let net = Eio.Stdenv.net env in 335 + let clock = Eio.Stdenv.clock env in 336 + let port = get_port () in 337 + 338 + Eio.Switch.run @@ fun sw -> 339 + let events_sent = ref 0 in 340 + 341 + let server_handler _req = 342 + let generator () = 343 + if !events_sent < 3 then begin 344 + incr events_sent; 345 + Some (Hcs.Sse.event (Printf.sprintf "event %d" !events_sent)) 346 + end 347 + else None 348 + in 349 + Hcs.Sse.respond generator 350 + in 351 + 352 + run_server ~sw ~net ~clock ~port server_handler; 353 + 354 + let url = Printf.sprintf "http://127.0.0.1:%d/" port in 355 + let statuses = ref [] in 356 + let config = 357 + Hcs.Event_source.{ default_config with max_reconnect_attempts = Some 0 } 358 + in 359 + let es = Hcs.Event_source.start ~sw ~net ~clock ~config url in 360 + Hcs.Event_source.on_status es (fun s -> statuses := s :: !statuses); 361 + 362 + let _ = Eio.Stream.take (Hcs.Event_source.events es) in 363 + 364 + Hcs.Event_source.close es; 365 + 366 + let saw_connected = 367 + List.exists 368 + (function Hcs.Event_source.Connected -> true | _ -> false) 369 + !statuses 370 + in 371 + check bool "saw Connected status" true saw_connected 372 + 373 + let tests = 374 + [ 375 + test_case "basic event source" `Quick test_basic_event_source; 376 + test_case "status callback" `Quick test_status_callback; 377 + ] 378 + end 379 + 380 + let () = 381 + run "SSE Client" 382 + [ 383 + ("parsing", Test_sse_parsing.tests); 384 + ("sse_client integration", Test_sse_integration.tests); 385 + ("event_source integration", Test_event_source.tests); 386 + ]