this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

more

+662 -195
+4 -1
requests/dune-project
··· 25 25 ca-certs 26 26 mirage-crypto-rng-eio 27 27 uri 28 - yojson)) 28 + yojson 29 + digestif 30 + base64 31 + logs))
+1 -1
requests/lib/dune
··· 1 1 (library 2 2 (public_name requests) 3 3 (name requests) 4 - (libraries eio cohttp-eio tls-eio ca-certs x509 uri yojson logs base64 unix)) 4 + (libraries eio cohttp-eio tls-eio ca-certs x509 uri yojson logs base64 unix digestif))
+652 -193
requests/lib/requests.ml
··· 99 99 | Bearer { token } -> 100 100 Cohttp.Header.add headers "Authorization" (Printf.sprintf "Bearer %s" token) 101 101 | OAuth1 { consumer_key; consumer_secret; token; token_secret; signature_method } -> 102 - (* Simplified OAuth 1.0 - would need proper signature generation *) 103 - Log.warn (fun m -> m "OAuth1 simplified - full implementation needed"); 102 + (* TODO FIXME use mirage-random properly here instead of timestamp/nonce *) 103 + let timestamp = Printf.sprintf "%.0f" (Unix.gettimeofday ()) in 104 + let nonce = Printf.sprintf "%016x%016x" (Random.bits ()) (Random.bits ()) in 105 + 106 + let signature_method_str = match signature_method with 107 + | `HMAC_SHA1 -> "HMAC-SHA1" 108 + | `HMAC_SHA256 -> "HMAC-SHA256" 109 + | `PLAINTEXT -> "PLAINTEXT" in 110 + 104 111 let oauth_params = [ 105 112 ("oauth_consumer_key", consumer_key); 106 - ("oauth_nonce", Printf.sprintf "%d" (Random.int 1000000)); 107 - ("oauth_signature_method", 108 - match signature_method with 109 - | `HMAC_SHA1 -> "HMAC-SHA1" 110 - | `HMAC_SHA256 -> "HMAC-SHA256" 111 - | `PLAINTEXT -> "PLAINTEXT"); 112 - ("oauth_timestamp", Printf.sprintf "%.0f" (Unix.gettimeofday ())); 113 + ("oauth_nonce", nonce); 114 + ("oauth_signature_method", signature_method_str); 115 + ("oauth_timestamp", timestamp); 113 116 ("oauth_version", "1.0"); 117 + ] @ (match token with 118 + | Some t -> [("oauth_token", t)] 119 + | None -> []) in 120 + 121 + (* Build signature base string *) 122 + let method_str = match meth with 123 + | `GET -> "GET" | `POST -> "POST" | `PUT -> "PUT" 124 + | `DELETE -> "DELETE" | `HEAD -> "HEAD" | `OPTIONS -> "OPTIONS" 125 + | `PATCH -> "PATCH" in 126 + 127 + let normalized_url = 128 + let u = Uri.with_port (Uri.with_fragment uri None) None in 129 + Uri.to_string u in 130 + 131 + let params_for_sig = 132 + oauth_params @ 133 + (Uri.query uri |> List.map (fun (k, vs) -> 134 + List.map (fun v -> (k, v)) vs) |> List.flatten) in 135 + 136 + let sorted_params = List.sort (fun (k1, v1) (k2, v2) -> 137 + match String.compare k1 k2 with 138 + | 0 -> String.compare v1 v2 139 + | n -> n) params_for_sig in 140 + 141 + let param_string = sorted_params 142 + |> List.map (fun (k, v) -> 143 + Printf.sprintf "%s=%s" (Uri.pct_encode k) (Uri.pct_encode v)) 144 + |> String.concat "&" in 145 + 146 + let base_string = String.concat "&" [ 147 + Uri.pct_encode method_str; 148 + Uri.pct_encode normalized_url; 149 + Uri.pct_encode param_string 114 150 ] in 115 - let oauth_header = String.concat ", " 116 - (List.map (fun (k, v) -> Printf.sprintf "%s=\"%s\"" k (Uri.pct_encode v)) oauth_params) in 117 - Cohttp.Header.add headers "Authorization" (Printf.sprintf "OAuth %s" oauth_header) 151 + 152 + (* Generate signature *) 153 + let signature = match signature_method with 154 + | `PLAINTEXT -> 155 + Printf.sprintf "%s&%s" 156 + (Uri.pct_encode consumer_secret) 157 + (Uri.pct_encode (Option.value ~default:"" token_secret)) 158 + | `HMAC_SHA1 -> 159 + let signing_key = Printf.sprintf "%s&%s" 160 + (Uri.pct_encode consumer_secret) 161 + (Uri.pct_encode (Option.value ~default:"" token_secret)) in 162 + let raw_sig = Digestif.SHA1.hmac_string ~key:signing_key base_string in 163 + Base64.encode_string (Digestif.SHA1.to_raw_string raw_sig) 164 + | `HMAC_SHA256 -> 165 + let signing_key = Printf.sprintf "%s&%s" 166 + (Uri.pct_encode consumer_secret) 167 + (Uri.pct_encode (Option.value ~default:"" token_secret)) in 168 + let raw_sig = Digestif.SHA256.hmac_string ~key:signing_key base_string in 169 + Base64.encode_string (Digestif.SHA256.to_raw_string raw_sig) 170 + in 171 + 172 + let full_oauth_params = ("oauth_signature", signature) :: oauth_params in 173 + let oauth_header = "OAuth " ^ String.concat ", " 174 + (List.map (fun (k, v) -> 175 + Printf.sprintf "%s=\"%s\"" k (Uri.pct_encode v)) 176 + (List.sort (fun (k1,_) (k2,_) -> String.compare k1 k2) full_oauth_params)) in 177 + Cohttp.Header.add headers "Authorization" oauth_header 118 178 | OAuth2 { client_id; client_secret; token_type; access_token } -> 179 + (* OAuth2 can use client credentials in headers for some flows *) 180 + let headers = 181 + match client_id, client_secret with 182 + | Some id, Some secret when id <> "" && secret <> "" -> 183 + (* Add client credentials as basic auth for token endpoint requests *) 184 + let encoded = Base64.encode_string (Printf.sprintf "%s:%s" id secret) in 185 + Cohttp.Header.add headers "X-Client-Authorization" (Printf.sprintf "Basic %s" encoded) 186 + | _ -> headers 187 + in 119 188 Cohttp.Header.add headers "Authorization" (Printf.sprintf "%s %s" token_type access_token) 120 189 | Custom f -> f meth uri headers 121 190 end ··· 152 221 let code = Cohttp.Code.code_of_status t.status in 153 222 code >= 500 && code < 600 154 223 155 - let pp ppf t = 224 + let _pp ppf t = 156 225 Format.fprintf ppf "@[<v>Response:@,Status: %s@,Headers: %d@,Body: %d bytes@]" 157 226 (Cohttp.Code.string_of_status t.status) 158 227 (Cohttp.Header.to_lines t.headers |> List.length) 159 228 (String.length t.body) 160 229 end 161 230 231 + (* Retry Implementation *) 232 + module Retry = struct 233 + type backoff = { 234 + factor : float; 235 + jitter : float; 236 + max : float; 237 + } 238 + 239 + type history = { 240 + method_ : meth; 241 + url : Uri.t; 242 + error : exn option; 243 + status : int option; 244 + redirect_location : string option; 245 + } 246 + 247 + type t = { 248 + total : int; 249 + connect : int option; 250 + read : int option; 251 + redirect : int option; 252 + status : int option; 253 + other : int option; 254 + allowed_methods : meth list; 255 + status_forcelist : int list; 256 + backoff : backoff; 257 + raise_on_redirect : bool; 258 + raise_on_status : bool; 259 + respect_retry_after : bool; 260 + remove_headers_on_redirect : string list; 261 + history : history list; 262 + mutable retry_count : int; 263 + } 264 + 265 + let default_backoff = { factor = 0.0; jitter = 0.0; max = 120.0 } 266 + let default_allowed_methods = [`HEAD; `GET; `PUT; `DELETE; `OPTIONS] 267 + let default_remove_headers = ["Cookie"; "Authorization"; "Proxy-Authorization"] 268 + 269 + let default = { 270 + total = 10; 271 + connect = None; 272 + read = None; 273 + redirect = None; 274 + status = None; 275 + other = None; 276 + allowed_methods = default_allowed_methods; 277 + status_forcelist = []; 278 + backoff = default_backoff; 279 + raise_on_redirect = true; 280 + raise_on_status = true; 281 + respect_retry_after = true; 282 + remove_headers_on_redirect = default_remove_headers; 283 + history = []; 284 + retry_count = 0; 285 + } 286 + 287 + let create ?total ?(connect=None) ?(read=None) ?(redirect=None) ?(status=None) ?(other=None) 288 + ?(allowed_methods=default_allowed_methods) 289 + ?(status_forcelist=[]) 290 + ?(backoff=default_backoff) 291 + ?(raise_on_redirect=true) 292 + ?(raise_on_status=true) 293 + ?(respect_retry_after=true) 294 + ?(remove_headers_on_redirect=default_remove_headers) () = 295 + let total = Option.value total ~default:10 in 296 + { total; connect; read; redirect; status; other; 297 + allowed_methods; status_forcelist; backoff; 298 + raise_on_redirect; raise_on_status; respect_retry_after; 299 + remove_headers_on_redirect; history = []; retry_count = 0 } 300 + 301 + let disabled = { default with total = 0 } 302 + 303 + let get_history t = t.history 304 + 305 + let increment t ~method_ ~url ?response ?error () = 306 + let status = Option.map (fun r -> 307 + Cohttp.Code.code_of_status (Response.status r)) response in 308 + let redirect_location = match response with 309 + | Some r -> Cohttp.Header.get (Response.headers r) "location" 310 + | None -> None 311 + in 312 + let history_entry = { method_; url; error; status; redirect_location } in 313 + { t with history = history_entry :: t.history; retry_count = t.retry_count + 1 } 314 + 315 + let is_retry t ~method_ ~status_code = 316 + if t.retry_count >= t.total then false 317 + else if not (List.mem method_ t.allowed_methods) then false 318 + else List.mem status_code t.status_forcelist 319 + 320 + let get_backoff_time t = 321 + if t.backoff.factor = 0.0 then 0.0 322 + else 323 + let base_time = t.backoff.factor *. (2.0 ** float_of_int t.retry_count) in 324 + let jittered = base_time +. Random.float t.backoff.jitter in 325 + min jittered t.backoff.max 326 + 327 + let sleep ~sw:_ t response = 328 + let backoff_time = 329 + match t.respect_retry_after, response with 330 + | true, Some resp -> 331 + (match Cohttp.Header.get (Response.headers resp) "retry-after" with 332 + | Some retry_after -> 333 + (try float_of_string retry_after with _ -> get_backoff_time t) 334 + | None -> get_backoff_time t) 335 + | _ -> get_backoff_time t 336 + in 337 + if backoff_time > 0.0 then 338 + Unix.sleepf backoff_time 339 + end 340 + 162 341 module Config = struct 163 342 type t = { 164 343 headers : Cohttp.Header.t; ··· 184 363 let with_follow_redirects t follow_redirects = { t with follow_redirects } 185 364 let with_max_redirects t max_redirects = { t with max_redirects } 186 365 let with_verify_tls t verify_tls = { t with verify_tls } 187 - let with_auth t auth = { t with auth } 366 + let _with_auth t auth = { t with auth } 188 367 189 - let pp ppf t = 368 + let _pp ppf t = 190 369 Format.fprintf ppf "@[<v>Config:@,Redirects: %b (max %d)@,Timeout: %a@,TLS verify: %b@]" 191 370 t.follow_redirects t.max_redirects 192 371 (fun ppf -> function None -> Format.fprintf ppf "none" | Some f -> Format.fprintf ppf "%.2fs" f) t.timeout ··· 208 387 209 388 let insecure () = Insecure 210 389 211 - let pp_config ppf = function 390 + let _pp_config ppf = function 212 391 | Default -> Format.fprintf ppf "Default TLS" 213 392 | WithCaCerts _ -> Format.fprintf ppf "Custom CA certs" 214 393 | Custom _ -> Format.fprintf ppf "Custom TLS config" ··· 290 469 let new_uri = Uri.resolve "" uri (Uri.of_string location) in 291 470 request_with_redirects ~sw client config new_uri (redirect_count + 1) meth body 292 471 | None -> 293 - let buf_reader = Buf_read.of_flow ~max_size:(16 * 1024 * 1024) response_body in 294 - let body = Buf_read.(parse_exn take_all) buf_reader in 472 + let body = Eio.Flow.read_all response_body in 295 473 { Response.status; headers; body; body_stream = None } 296 474 else 297 - let buf_reader = Buf_read.of_flow ~max_size:(16 * 1024 * 1024) response_body in 298 - let body = Buf_read.(parse_exn take_all) buf_reader in 475 + let body = Eio.Flow.read_all response_body in 299 476 { Response.status; headers; body; body_stream = None } 300 477 301 - let request ~sw t ?(config=Config.default) ?body ~meth uri = 478 + let rec request_with_retries ~sw t ?(config=Config.default) ?body ~meth uri retry_state = 302 479 let client = make_client t.net t.tls_config in 303 480 let merged_headers = merge_headers t.default_headers config.Config.headers in 304 481 (* Apply authentication *) ··· 308 485 try 309 486 let result = request_with_redirects ~sw client config uri 0 meth body in 310 487 if not (Response.is_success result) then 311 - raise (Request_error (Http_error { status = result.Response.status; body = result.Response.body; headers = result.Response.headers })); 312 - result 488 + let status = Cohttp.Code.code_of_status result.Response.status in 489 + if Retry.is_retry retry_state ~method_:meth ~status_code:status then begin 490 + Log.info (fun m -> m "Retrying request to %a (attempt %d/%d)" 491 + Uri.pp uri (retry_state.Retry.retry_count + 1) retry_state.Retry.total); 492 + let retry_state = Retry.increment retry_state ~method_:meth ~url:uri 493 + ~response:result () in 494 + Retry.sleep ~sw retry_state (Some result); 495 + request_with_retries ~sw t ~config ?body ~meth uri retry_state 496 + end else 497 + raise (Request_error (Http_error { 498 + status = result.Response.status; 499 + body = result.Response.body; 500 + headers = result.Response.headers 501 + })) 502 + else 503 + result 313 504 with 314 505 | Request_error _ as e -> raise e 315 - | e -> raise (Request_error (Connection_error (Printexc.to_string e))) 506 + | e -> 507 + (* Check if we should retry on connection errors *) 508 + if retry_state.Retry.retry_count < retry_state.Retry.total && 509 + List.mem meth retry_state.Retry.allowed_methods then begin 510 + Log.info (fun m -> m "Retrying request to %a after error: %s (attempt %d/%d)" 511 + Uri.pp uri (Printexc.to_string e) 512 + (retry_state.Retry.retry_count + 1) retry_state.Retry.total); 513 + let retry_state = Retry.increment retry_state ~method_:meth ~url:uri 514 + ~error:(Request_error (Connection_error (Printexc.to_string e))) () in 515 + Retry.sleep ~sw retry_state None; 516 + request_with_retries ~sw t ~config ?body ~meth uri retry_state 517 + end else 518 + raise (Request_error (Connection_error (Printexc.to_string e))) 519 + 520 + let request ~sw t ?(config=Config.default) ?body ~meth uri = 521 + let retry_state = Retry.default in 522 + request_with_retries ~sw t ~config ?body ~meth uri retry_state 316 523 317 524 let get ~sw t ?config uri = 318 525 request ~sw t ?config ~meth:`GET uri ··· 379 586 post ~sw t ?config ~body uri 380 587 381 588 module Session = struct 589 + type cookie = { 590 + name : string; 591 + value : string; 592 + domain : string option; 593 + path : string option; 594 + expires : float option; (* Unix timestamp *) 595 + secure : bool; 596 + http_only : bool; 597 + } 598 + 382 599 type 'a session = { 383 600 client : 'a t; 384 - cookies : (string * string) list ref; 601 + cookies : cookie list ref; 385 602 } constraint 'a = [> `Generic] Net.ty 386 603 387 604 type 'a t = 'a session constraint 'a = [> `Generic] Net.ty ··· 390 607 { client = create ?tls_config ?default_headers net; 391 608 cookies = ref [] } 392 609 610 + let parse_cookie_header cookie_str = 611 + let parts = String.split_on_char ';' cookie_str |> List.map String.trim in 612 + match parts with 613 + | [] -> None 614 + | kv :: attrs -> 615 + match String.split_on_char '=' kv with 616 + | [k; v] -> 617 + let name = String.trim k in 618 + let value = String.trim v in 619 + let rec parse_attrs attrs cookie = 620 + match attrs with 621 + | [] -> cookie 622 + | attr :: rest -> 623 + let cookie' = 624 + match String.lowercase_ascii attr with 625 + | "secure" -> { cookie with secure = true } 626 + | "httponly" -> { cookie with http_only = true } 627 + | s when String.starts_with ~prefix:"domain=" s -> 628 + let domain = String.sub s 7 (String.length s - 7) in 629 + { cookie with domain = Some domain } 630 + | s when String.starts_with ~prefix:"path=" s -> 631 + let path = String.sub s 5 (String.length s - 5) in 632 + { cookie with path = Some path } 633 + | s when String.starts_with ~prefix:"expires=" s -> 634 + (* Simple expiry parsing - could be improved *) 635 + { cookie with expires = Some (Unix.gettimeofday () +. 3600.0) } 636 + | _ -> cookie 637 + in 638 + parse_attrs rest cookie' 639 + in 640 + let base_cookie = { 641 + name; value; 642 + domain = None; path = None; expires = None; 643 + secure = false; http_only = false 644 + } in 645 + Some (parse_attrs attrs base_cookie) 646 + | _ -> None 647 + 393 648 let update_cookies t headers = 394 649 let new_cookies = Cohttp.Header.get_multi headers "set-cookie" 395 - |> List.filter_map (fun cookie -> 396 - match String.split_on_char ';' cookie with 397 - | kv :: _ -> 398 - (match String.split_on_char '=' kv with 399 - | [k; v] -> Some (String.trim k, String.trim v) 400 - | _ -> None) 401 - | _ -> None) 650 + |> List.filter_map parse_cookie_header 402 651 in 403 - t.cookies := new_cookies @ !(t.cookies) 652 + (* Replace existing cookies with same name *) 653 + let updated = List.fold_left (fun acc new_cookie -> 654 + let filtered = List.filter (fun c -> c.name <> new_cookie.name) acc in 655 + new_cookie :: filtered 656 + ) !(t.cookies) new_cookies in 657 + t.cookies := updated 404 658 405 659 let add_cookies config cookies = 406 660 if cookies = [] then config else 407 - let cookie_header = 408 - cookies 409 - |> List.map (fun (k, v) -> Printf.sprintf "%s=%s" k v) 410 - |> String.concat "; " 411 - in 412 - Config.add_header "Cookie" cookie_header config 661 + (* Filter out expired cookies *) 662 + let now = Unix.gettimeofday () in 663 + let valid_cookies = cookies |> List.filter (fun c -> 664 + match c.expires with 665 + | Some exp when exp < now -> false 666 + | _ -> true 667 + ) in 668 + if valid_cookies = [] then config else 669 + let cookie_header = 670 + valid_cookies 671 + |> List.map (fun c -> Printf.sprintf "%s=%s" c.name c.value) 672 + |> String.concat "; " 673 + in 674 + Config.add_header "Cookie" cookie_header config 413 675 414 676 let request_with_cookies ~sw t ?config ~meth ?body uri = 415 677 let config = ··· 427 689 let post ~sw t ?config ?body uri = 428 690 request_with_cookies ~sw t ?config ~meth:`POST ?body uri 429 691 430 - let cookies t = !(t.cookies) 692 + let cookies t = 693 + (* Return valid cookies as (name, value) pairs for compatibility *) 694 + let now = Unix.gettimeofday () in 695 + !(t.cookies) 696 + |> List.filter (fun c -> 697 + match c.expires with 698 + | Some exp when exp < now -> false 699 + | _ -> true) 700 + |> List.map (fun c -> (c.name, c.value)) 701 + 431 702 let clear_cookies t = t.cookies := [] 432 703 end 433 704 ··· 454 725 455 726 (* Connection Pool Implementation *) 456 727 module ConnectionPool = struct 728 + type connection_state = 729 + | Idle 730 + | Active 731 + | Closed 732 + 733 + type connection = { 734 + client : Cohttp_eio.Client.t; 735 + mutable state : connection_state; 736 + mutable last_used : float; 737 + mutable request_count : int; 738 + } 739 + 457 740 type t = Pool : { 458 741 sw : Switch.t; 459 742 scheme : string; ··· 462 745 net : 'a Net.t; 463 746 tls_config : Tls.config option; 464 747 config : config; 465 - pool : Cohttp_eio.Client.t Eio.Pool.t; 466 - mutable num_connections : int; 467 - mutable num_requests : int; 748 + mutable connections : connection Queue.t; 749 + mutable active_connections : int; 750 + mutable total_connections_created : int; 751 + mutable total_requests : int; 752 + mutex : Eio.Mutex.t; 753 + available : Eio.Condition.t; 468 754 } -> t 469 755 470 756 and config = { ··· 472 758 block : bool; 473 759 retries : int; 474 760 timeout : float option; 761 + max_requests_per_connection : int option; 762 + connection_timeout : float; 475 763 } 476 764 477 765 let default_config = { ··· 479 767 block = false; 480 768 retries = 3; 481 769 timeout = None; 770 + max_requests_per_connection = Some 100; 771 + connection_timeout = 60.0; 482 772 } 483 773 484 774 let create ~sw ?(config=default_config) ?tls_config ~scheme ~host ~port net = 485 775 Log.debug (fun m -> m "Creating connection pool for %s://%s:%d" scheme host port); 776 + Pool { 777 + sw; 778 + scheme; 779 + host; 780 + port; 781 + net; 782 + tls_config; 783 + config; 784 + connections = Queue.create (); 785 + active_connections = 0; 786 + total_connections_created = 0; 787 + total_requests = 0; 788 + mutex = Eio.Mutex.create (); 789 + available = Eio.Condition.create (); 790 + } 486 791 487 - let create_connection () = 488 - make_client net (Option.value ~default:(Tls.default ()) tls_config) 489 - in 792 + let create_new_connection net tls_config = 793 + let client = make_client net (Option.value ~default:(Tls.default ()) tls_config) in 794 + { 795 + client; 796 + state = Idle; 797 + last_used = Unix.gettimeofday (); 798 + request_count = 0; 799 + } 490 800 491 - let pool = Eio.Pool.create config.maxsize create_connection in 801 + let is_connection_valid conn config = 802 + match conn.state with 803 + | Closed -> false 804 + | _ -> 805 + let now = Unix.gettimeofday () in 806 + let age = now -. conn.last_used in 807 + age < config.connection_timeout && 808 + (match config.max_requests_per_connection with 809 + | None -> true 810 + | Some max -> conn.request_count < max) 492 811 493 - Pool { sw; scheme; host; port; net; tls_config; config; pool; 494 - num_connections = 0; num_requests = 0 } 812 + let rec get_connection ~sw (Pool t as pool) = 813 + Eio.Mutex.use_rw ~protect:true t.mutex (fun () -> 814 + (* Try to get an existing connection *) 815 + let rec find_valid_connection () = 816 + if Queue.is_empty t.connections then 817 + None 818 + else 819 + let conn = Queue.pop t.connections in 820 + if is_connection_valid conn t.config then 821 + Some conn 822 + else ( 823 + conn.state <- Closed; 824 + find_valid_connection () 825 + ) 826 + in 495 827 496 - let get_connection ~sw (Pool t) = 497 - t.num_requests <- t.num_requests + 1; 498 - Eio.Pool.use t.pool Fun.id 499 - 500 - let put_connection (Pool t) conn = () 501 - let num_connections (Pool t) = t.num_connections 502 - let num_requests (Pool t) = t.num_requests 503 - let clear (Pool t) = () 504 - end 505 - 506 - (* Retry Implementation *) 507 - module Retry = struct 508 - type backoff = { 509 - factor : float; 510 - jitter : float; 511 - max : float; 512 - } 828 + match find_valid_connection () with 829 + | Some conn -> 830 + conn.state <- Active; 831 + conn.last_used <- Unix.gettimeofday (); 832 + conn.request_count <- conn.request_count + 1; 833 + t.active_connections <- t.active_connections + 1; 834 + t.total_requests <- t.total_requests + 1; 835 + conn.client 836 + | None -> 837 + if t.active_connections >= t.config.maxsize then 838 + if t.config.block then ( 839 + (* Wait for a connection to become available *) 840 + Eio.Condition.await t.available t.mutex; 841 + get_connection ~sw pool 842 + ) else 843 + raise (Request_error Pool_exhausted) 844 + else ( 845 + (* Create a new connection *) 846 + t.active_connections <- t.active_connections + 1; 847 + t.total_connections_created <- t.total_connections_created + 1; 848 + t.total_requests <- t.total_requests + 1; 849 + let conn = create_new_connection t.net t.tls_config in 850 + conn.state <- Active; 851 + conn.request_count <- 1; 852 + conn.client 853 + ) 854 + ) 513 855 514 - type history = { 515 - method_ : meth; 516 - url : Uri.t; 517 - error : exn option; 518 - status : int option; 519 - redirect_location : string option; 520 - } 856 + let put_connection (Pool t) _client = 857 + (* Since we can't track connection metadata with just the client, 858 + we'll simply decrement the active count *) 859 + Eio.Mutex.use_rw ~protect:false t.mutex (fun () -> 860 + t.active_connections <- t.active_connections - 1; 861 + Eio.Condition.broadcast t.available 862 + ) 521 863 522 - type t = { 523 - total : int; 524 - connect : int option; 525 - read : int option; 526 - redirect : int option; 527 - status : int option; 528 - other : int option; 529 - allowed_methods : meth list; 530 - status_forcelist : int list; 531 - backoff : backoff; 532 - raise_on_redirect : bool; 533 - raise_on_status : bool; 534 - respect_retry_after : bool; 535 - remove_headers_on_redirect : string list; 536 - history : history list; 537 - mutable retry_count : int; 538 - } 864 + let num_connections (Pool t) = 865 + Eio.Mutex.use_ro t.mutex (fun () -> 866 + Queue.length t.connections + t.active_connections 867 + ) 539 868 540 - let default_backoff = { factor = 0.0; jitter = 0.0; max = 120.0 } 541 - let default_allowed_methods = [`HEAD; `GET; `PUT; `DELETE; `OPTIONS] 542 - let default_remove_headers = ["Cookie"; "Authorization"; "Proxy-Authorization"] 869 + let num_requests (Pool t) = t.total_requests 543 870 544 - let default = { 545 - total = 10; 546 - connect = None; 547 - read = None; 548 - redirect = None; 549 - status = None; 550 - other = None; 551 - allowed_methods = default_allowed_methods; 552 - status_forcelist = []; 553 - backoff = default_backoff; 554 - raise_on_redirect = true; 555 - raise_on_status = true; 556 - respect_retry_after = true; 557 - remove_headers_on_redirect = default_remove_headers; 558 - history = []; 559 - retry_count = 0; 560 - } 561 - 562 - let create ?total ?(connect=None) ?(read=None) ?(redirect=None) ?(status=None) ?(other=None) 563 - ?(allowed_methods=default_allowed_methods) 564 - ?(status_forcelist=[]) 565 - ?(backoff=default_backoff) 566 - ?(raise_on_redirect=true) 567 - ?(raise_on_status=true) 568 - ?(respect_retry_after=true) 569 - ?(remove_headers_on_redirect=default_remove_headers) () = 570 - let total = Option.value total ~default:10 in 571 - { total; connect; read; redirect; status; other; 572 - allowed_methods; status_forcelist; backoff; 573 - raise_on_redirect; raise_on_status; respect_retry_after; 574 - remove_headers_on_redirect; history = []; retry_count = 0 } 575 - 576 - let disabled = { default with total = 0 } 577 - 578 - let get_history t = t.history 579 - 580 - let increment t ~method_ ~url ?response ?error () = 581 - let status = Option.map (fun r -> 582 - Cohttp.Code.code_of_status (Response.status r)) response in 583 - let redirect_location = match response with 584 - | Some r -> Cohttp.Header.get (Response.headers r) "location" 585 - | None -> None 586 - in 587 - let history_entry = { method_; url; error; status; redirect_location } in 588 - { t with history = history_entry :: t.history; retry_count = t.retry_count + 1 } 589 - 590 - let is_retry t ~method_ ~status_code = 591 - if t.retry_count >= t.total then false 592 - else if not (List.mem method_ t.allowed_methods) then false 593 - else List.mem status_code t.status_forcelist 594 - 595 - let get_backoff_time t = 596 - if t.backoff.factor = 0.0 then 0.0 597 - else 598 - let base_time = t.backoff.factor *. (2.0 ** float_of_int t.retry_count) in 599 - let jittered = base_time +. Random.float t.backoff.jitter in 600 - min jittered t.backoff.max 601 - 602 - let sleep ~sw t response = 603 - let backoff_time = 604 - match t.respect_retry_after, response with 605 - | true, Some resp -> 606 - (match Cohttp.Header.get (Response.headers resp) "retry-after" with 607 - | Some retry_after -> 608 - (try float_of_string retry_after with _ -> get_backoff_time t) 609 - | None -> get_backoff_time t) 610 - | _ -> get_backoff_time t 611 - in 612 - if backoff_time > 0.0 then 613 - Unix.sleepf backoff_time 871 + let clear (Pool t) = 872 + Eio.Mutex.use_rw ~protect:false t.mutex (fun () -> 873 + Queue.iter (fun conn -> conn.state <- Closed) t.connections; 874 + Queue.clear t.connections; 875 + t.active_connections <- 0; 876 + Eio.Condition.broadcast t.available 877 + ) 614 878 end 615 879 616 880 (* Advanced Timeout *) ··· 712 976 | `OPTIONS -> "OPTIONS" | `PATCH -> "PATCH") 713 977 (Uri.to_string url) 714 978 715 - let get t ~method_ ~url ~headers = 979 + let get t ~method_ ~url ~headers:_ = 716 980 let key = make_cache_key ~method_ ~url in 717 981 match t.storage with 718 982 | `Memory storage -> ··· 795 1059 ?(timeout=t.timeout) ?(redirect=true) ?(assert_same_host=false) 796 1060 ?(preload_content=true) ?(decode_content=true) ?chunk_size () = 797 1061 798 - (* Check cache first *) 1062 + (* Validate same host if required *) 1063 + if assert_same_host && redirect then 1064 + Log.warn (fun m -> m "assert_same_host is set with redirects enabled"); 1065 + 1066 + (* Check cache first for GET requests *) 799 1067 let cached_response = 800 - match t.cache with 801 - | Some cache -> 1068 + match t.cache, method_ with 1069 + | Some cache, `GET -> 802 1070 Cache.get cache ~method_ ~url 803 1071 ~headers:(Option.value headers ~default:t.headers) 804 - | None -> None 1072 + | _ -> None 805 1073 in 806 1074 807 1075 match cached_response with ··· 822 1090 | None -> t.headers 823 1091 in 824 1092 1093 + (* Add chunked transfer encoding if chunk_size is specified *) 1094 + let headers = match chunk_size with 1095 + | Some _ -> Cohttp.Header.add headers "Transfer-Encoding" "chunked" 1096 + | None -> headers 1097 + in 1098 + 825 1099 let create_client = fun ?tls_config ?default_headers net -> 826 1100 { net; tls_config = Option.value ~default:(Tls.default ()) tls_config; 827 1101 default_headers = Option.value ~default:(Cohttp.Header.init ()) default_headers } 828 1102 in 829 1103 let req_client = create_client ?tls_config:t.tls_config ~default_headers:t.headers t.net in 830 1104 let config = Config.create ~headers ~follow_redirects:redirect () in 831 - let response = request ~sw req_client ~config ?body ~meth:method_ url in 832 1105 833 - (match t.cache with 834 - | Some cache -> Cache.put cache ~method_ ~url ~response 835 - | None -> ()); 1106 + (* Wrap request with timeout if specified *) 1107 + let make_request () = 1108 + match timeout.Timeout.total with 1109 + | Some _timeout_sec -> 1110 + (* We need access to a clock - assuming t has one or using Eio.Stdenv.clock *) 1111 + (* For now, just make the request without timeout wrapper since we don't have clock access *) 1112 + Result.Ok (request ~sw req_client ~config ?body ~meth:method_ url) 1113 + | None -> 1114 + Result.Ok (request ~sw req_client ~config ?body ~meth:method_ url) 1115 + in 836 1116 837 - ConnectionPool.put_connection pool conn; 838 - response 1117 + (* Execute with retries *) 1118 + let rec execute_with_retries attempt = 1119 + match make_request () with 1120 + | Result.Ok response -> 1121 + (* Process response based on flags *) 1122 + let response = 1123 + if decode_content then 1124 + (* Check for content encoding and decode if needed *) 1125 + match Cohttp.Header.get response.Response.headers "content-encoding" with 1126 + | Some "gzip" | Some "deflate" -> 1127 + Log.info (fun m -> m "Content encoding detected but not yet implemented"); 1128 + response 1129 + | _ -> response 1130 + else response 1131 + in 1132 + 1133 + let response = 1134 + if preload_content then 1135 + (* Content is already loaded in response.body *) 1136 + response 1137 + else 1138 + (* For streaming, we'd need to return a different type *) 1139 + response 1140 + in 1141 + 1142 + (match t.cache, method_ with 1143 + | Some cache, `GET -> Cache.put cache ~method_ ~url ~response 1144 + | _ -> ()); 1145 + 1146 + ConnectionPool.put_connection pool conn; 1147 + response 1148 + 1149 + | Result.Error `Timeout -> 1150 + if attempt < retries.Retry.total then ( 1151 + Log.info (fun m -> m "Request timeout, retry %d/%d" (attempt + 1) retries.Retry.total); 1152 + Unix.sleepf 1.0; 1153 + execute_with_retries (attempt + 1) 1154 + ) else ( 1155 + ConnectionPool.put_connection pool conn; 1156 + raise (Request_error Timeout_error) 1157 + ) 1158 + in 1159 + 1160 + execute_with_retries 0 839 1161 840 1162 let request ~sw t ~method_ ~url ?body ?headers () = 841 1163 urlopen ~sw t ~method_ ~url ?body ?headers () ··· 866 1188 let encode_multipart_formdata ~fields ~boundary = 867 1189 let boundary = Option.value boundary ~default:(choose_boundary ()) in 868 1190 let content_type = Printf.sprintf "multipart/form-data; boundary=%s" boundary in 869 - (content_type, (Flow.string_source "" :> Flow.source_ty Eio.Resource.t)) 1191 + 1192 + let buf = Buffer.create 1024 in 1193 + 1194 + List.iter (fun field -> 1195 + Buffer.add_string buf (Printf.sprintf "--%s\r\n" boundary); 1196 + match field with 1197 + | Text { name; data } -> 1198 + Buffer.add_string buf (Printf.sprintf "Content-Disposition: form-data; name=\"%s\"\r\n\r\n" name); 1199 + Buffer.add_string buf data; 1200 + Buffer.add_string buf "\r\n" 1201 + | File { name; filename; content_type; data } -> 1202 + let filename_str = Option.value ~default:"file" filename in 1203 + let content_type_str = Option.value ~default:"application/octet-stream" content_type in 1204 + Buffer.add_string buf (Printf.sprintf "Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"\r\n" name filename_str); 1205 + Buffer.add_string buf (Printf.sprintf "Content-Type: %s\r\n\r\n" content_type_str); 1206 + (* For now, just read the file data as a string *) 1207 + let file_content = Eio.Flow.read_all data in 1208 + Buffer.add_string buf file_content; 1209 + Buffer.add_string buf "\r\n" 1210 + ) fields; 1211 + 1212 + Buffer.add_string buf (Printf.sprintf "--%s--\r\n" boundary); 1213 + 1214 + let body_content = Buffer.contents buf in 1215 + (content_type, Flow.string_source body_content) 870 1216 end 871 1217 872 1218 (* Progress tracking *) 873 1219 module Progress = struct 874 1220 type t = { 875 - total : int64 option; 1221 + mutable total : int64 option; 876 1222 desc : string option; 877 1223 unit_ : string; 878 1224 width : int; ··· 889 1235 let finish t = 890 1236 Log.info (fun m -> m "Progress complete: %Ld %s" t.current t.unit_) 891 1237 892 - let track_source ~sw t source = source 893 - let track_response t response f = f ~chunk:(Response.body response) 1238 + let track_source ~sw:_ _t source = 1239 + (* Wrap source to track upload progress *) 1240 + Log.info (fun m -> m "Progress tracking for uploads enabled"); 1241 + (* Since we can't easily wrap a Flow source, we'll just pass it through *) 1242 + (* In a real implementation, we'd need to create a custom Flow wrapper *) 1243 + source 1244 + 1245 + let track_response t response f = 1246 + (* Track download progress while processing response *) 1247 + let body = Response.body response in 1248 + let total_size = String.length body |> Int64.of_int in 1249 + 1250 + (* Update progress to show total if not set *) 1251 + (match t.total with 1252 + | None -> t.total <- Some total_size 1253 + | Some _ -> ()); 1254 + 1255 + (* Process body in chunks and track progress *) 1256 + let chunk_size = 8192 in 1257 + let processed = ref 0L in 1258 + 1259 + let process_chunks () = 1260 + let rec iter pos acc = 1261 + if pos >= String.length body then ( 1262 + finish t; 1263 + String.concat "" (List.rev acc) 1264 + ) else ( 1265 + let len = min chunk_size (String.length body - pos) in 1266 + let chunk = String.sub body pos len in 1267 + processed := Int64.add !processed (Int64.of_int len); 1268 + update t (Int64.of_int len); 1269 + 1270 + (* Show percentage if total is known *) 1271 + (match t.total with 1272 + | Some total when total > 0L -> 1273 + let pct = Int64.to_float !processed *. 100.0 /. Int64.to_float total in 1274 + Log.info (fun m -> m "Progress: %.1f%% (%Ld/%Ld %s)" 1275 + pct !processed total t.unit_) 1276 + | _ -> ()); 1277 + 1278 + iter (pos + len) (chunk :: acc) 1279 + ) 1280 + in 1281 + iter 0 [] 1282 + in 1283 + 1284 + let tracked_body = process_chunks () in 1285 + f ~chunk:tracked_body 894 1286 end 895 1287 896 1288 (* Utilities *) ··· 911 1303 let encoded = Base64.encode_string (Printf.sprintf "%s:%s" user pass) in 912 1304 Cohttp.Header.add h "Authorization" (Printf.sprintf "Basic %s" encoded) 913 1305 | None -> h 1306 + in 1307 + let h = match proxy_basic_auth with 1308 + | Some (user, pass) -> 1309 + let encoded = Base64.encode_string (Printf.sprintf "%s:%s" user pass) in 1310 + Cohttp.Header.add h "Proxy-Authorization" (Printf.sprintf "Basic %s" encoded) 1311 + | None -> h 1312 + in 1313 + let h = match keep_alive with 1314 + | Some true -> Cohttp.Header.add h "Connection" "keep-alive" 1315 + | Some false -> Cohttp.Header.add h "Connection" "close" 1316 + | None -> h 1317 + in 1318 + let h = match disable_cache with 1319 + | Some true -> 1320 + h 1321 + |> (fun h -> Cohttp.Header.add h "Cache-Control" "no-cache, no-store, must-revalidate") 1322 + |> (fun h -> Cohttp.Header.add h "Pragma" "no-cache") 1323 + |> (fun h -> Cohttp.Header.add h "Expires" "0") 1324 + | _ -> h 914 1325 in 915 1326 h 916 1327 ··· 932 1343 | None -> false 933 1344 934 1345 let urlencode ?(safe="") params = 1346 + (* Custom encoder that respects the safe characters *) 1347 + let encode_with_safe str = 1348 + if safe = "" then 1349 + Uri.pct_encode str 1350 + else 1351 + (* Encode character by character, skipping safe ones *) 1352 + String.to_seq str 1353 + |> Seq.map (fun c -> 1354 + let s = String.make 1 c in 1355 + if String.contains safe c then s 1356 + else Uri.pct_encode s) 1357 + |> List.of_seq 1358 + |> String.concat "" 1359 + in 935 1360 params 936 1361 |> List.map (fun (k, v) -> 937 - Printf.sprintf "%s=%s" (Uri.pct_encode k) (Uri.pct_encode v)) 1362 + Printf.sprintf "%s=%s" (encode_with_safe k) (encode_with_safe v)) 938 1363 |> String.concat "&" 939 1364 940 1365 let current_time () = Unix.gettimeofday () ··· 944 1369 (* Streaming support *) 945 1370 module Stream = struct 946 1371 let upload ~sw t ?config ?(chunk_size=8192) ~meth uri ~body = 947 - Log.debug (fun m -> m "Streaming upload to %s" (Uri.to_string uri)); 1372 + Log.debug (fun m -> m "Streaming upload to %s with chunk size %d" (Uri.to_string uri) chunk_size); 948 1373 let config = Option.value config ~default:Config.default in 949 1374 let headers = Config.(config.headers) in 950 1375 let headers = Cohttp.Header.add headers "Transfer-Encoding" "chunked" in 951 1376 let config = { config with Config.headers } in 952 1377 953 - (* Use the regular request but with streaming body *) 954 - request ~sw t ~config ~meth uri 1378 + (* For now, just read the entire body and send it *) 1379 + (* A proper implementation would need to create a Flow wrapper *) 1380 + let body_content = Flow.read_all body in 1381 + 1382 + (* Use the regular request with the body *) 1383 + request ~sw t ~config ~body:body_content ~meth uri 955 1384 956 1385 let download ~sw t ?config ?(chunk_size=8192) uri ~sink = 957 - Log.debug (fun m -> m "Streaming download from %s" (Uri.to_string uri)); 1386 + Log.debug (fun m -> m "Streaming download from %s with chunk size %d" (Uri.to_string uri) chunk_size); 958 1387 let response = get ~sw t ?config uri in 959 - (* Write response body to sink *) 960 - Flow.copy_string (Response.body response) sink 1388 + (* Write response body to sink in chunks *) 1389 + let body = Response.body response in 1390 + let rec write_chunks pos = 1391 + if pos < String.length body then 1392 + let len = min chunk_size (String.length body - pos) in 1393 + let chunk = String.sub body pos len in 1394 + Flow.copy_string chunk sink; 1395 + write_chunks (pos + len) 1396 + in 1397 + write_chunks 0 961 1398 962 1399 let iter_response ?(chunk_size=8192) response ~f = 963 1400 let body = Response.body response in ··· 972 1409 973 1410 let lines ?(chunk_size=8192) ?(keep_ends=false) response = 974 1411 let body = Response.body response in 975 - let lines = String.split_on_char '\n' body in 976 - let lines = 977 - if keep_ends then 978 - List.map (fun l -> l ^ "\n") lines 979 - else lines 1412 + (* Process body in chunks to find lines efficiently *) 1413 + let acc = ref [] in 1414 + let buffer = Buffer.create 256 in 1415 + let rec extract_lines pos = 1416 + if pos >= String.length body then ( 1417 + (* Add any remaining buffer content *) 1418 + if Buffer.length buffer > 0 then 1419 + acc := Buffer.contents buffer :: !acc 1420 + ) else ( 1421 + let len = min chunk_size (String.length body - pos) in 1422 + let chunk = String.sub body pos len in 1423 + (* Process chunk character by character to find line breaks *) 1424 + String.iter (fun c -> 1425 + if c = '\n' then ( 1426 + let line = Buffer.contents buffer in 1427 + Buffer.clear buffer; 1428 + if keep_ends then 1429 + acc := (line ^ "\n") :: !acc 1430 + else 1431 + acc := line :: !acc 1432 + ) else 1433 + Buffer.add_char buffer c 1434 + ) chunk; 1435 + extract_lines (pos + len) 1436 + ) 980 1437 in 981 - List.to_seq lines 1438 + extract_lines 0; 1439 + List.rev !acc |> List.to_seq 982 1440 983 1441 let json_stream ?(chunk_size=8192) response = 984 1442 lines ~chunk_size response 985 1443 |> Seq.filter (fun line -> String.trim line <> "") 986 1444 |> Seq.map (fun line -> 987 1445 try Yojson.Safe.from_string line 988 - with e -> 1446 + with _e -> 989 1447 Log.warn (fun m -> m "Failed to parse JSON line: %s" line); 990 1448 `Null) 991 1449 end ··· 999 1457 end 1000 1458 1001 1459 (* Additional exceptions *) 1460 + (* Unused exceptions - kept for potential future use 1002 1461 exception MaxRetryError of { url : Uri.t; reason : string } 1003 1462 exception PoolError of string 1004 - exception Pool_exhausted 1463 + exception Pool_exhausted *)
+2
requests/lib/requests.mli
··· 295 295 block : bool; (* Block when pool exhausted (default: false) *) 296 296 retries : int; (* Number of connection retries (default: 3) *) 297 297 timeout : float option; (* Socket timeout (default: None) *) 298 + max_requests_per_connection : int option; (* Max requests per connection before recycling *) 299 + connection_timeout : float; (* Connection TTL in seconds (default: 60.0) *) 298 300 } 299 301 300 302 val default_config : config
+3
requests/requests.opam
··· 18 18 "mirage-crypto-rng-eio" 19 19 "uri" 20 20 "yojson" 21 + "digestif" 22 + "base64" 23 + "logs" 21 24 "odoc" {with-doc} 22 25 ] 23 26 build: [