objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Service proxying

futurGH 06fa04e8 4448e2b9

+464 -21
+3 -1
bin/main.ml
··· 40 40 let%lwt db = Util.connect_sqlite Util.Constants.pegasus_db_location in 41 41 let%lwt () = Data_store.init db in 42 42 Dream.serve ~interface:"0.0.0.0" ~port:8008 43 - @@ Dream.logger @@ Dream.router 43 + @@ Dream.logger 44 + @@ Xrpc.service_proxy_middleware db 45 + @@ Dream.router 44 46 @@ List.map 45 47 (fun (fn, path, handler) -> 46 48 fn path (fun req -> handler ({req; db} : Xrpc.init)) )
+15
pegasus/lib/auth.ml
··· 56 56 in 57 57 (access, refresh) 58 58 59 + let generate_service_jwt ~did ~service_did ~lxm ~signing_key = 60 + let now_s = int_of_float (Unix.gettimeofday ()) in 61 + let exp = now_s + (60 * 5) in 62 + match 63 + Jwto.encode Jwto.HS256 signing_key 64 + [ ("iss", did) 65 + ; ("aud", service_did) 66 + ; ("lxm", lxm) 67 + ; ("exp", Int.to_string exp) ] 68 + with 69 + | Ok token -> 70 + token 71 + | Error err -> 72 + failwith err 73 + 59 74 let verify_bearer_jwt t token expected_scope = 60 75 match Jwto.decode_and_verify Env.jwt_secret token with 61 76 | Error err ->
+238 -20
pegasus/lib/id_resolver.ml
··· 12 12 let uri = 13 13 Uri.of_string ("https://" ^ handle ^ "/.well-known/atproto-did") 14 14 in 15 - let%lwt _, body = Client.get uri in 16 - let%lwt did = Body.to_string body in 17 - Lwt.return_ok did 15 + let%lwt {status; _}, body = Client.get uri in 16 + match status with 17 + | `OK -> 18 + let%lwt did = Body.to_string body in 19 + Lwt.return_ok did 20 + | _ -> 21 + Lwt.return_error "failed to resolve" 18 22 with exn -> Lwt.return_error (Printexc.to_string exn) 19 23 20 24 let resolve_dns handle = ··· 40 44 Lwt.return_error e 41 45 with exn -> Lwt.return_error (Printexc.to_string exn) 42 46 43 - let resolve handle = 44 - (* run well-known and dns in parallel, error if they return different values, if only one returns just return that value *) 45 - match%lwt Lwt.all [resolve_well_known handle; resolve_dns handle] with 46 - | [Ok did1; Ok did2] when did1 = did2 -> 47 - Lwt.return_ok did1 48 - | [Ok _; Ok _] -> 49 - Lwt.return_error "conflicting dids" 50 - | [Ok did1; _] -> 51 - Lwt.return_ok did1 52 - | [_; Ok did2] -> 53 - Lwt.return_ok did2 54 - | [Error e1; Error e2] -> 55 - Lwt.return_error 56 - (Printf.sprintf 57 - "well-known resolution error: %s\ndns resolution error: %s" e1 e2 ) 58 - | _ -> 59 - Lwt.return_error "unexpected error" 47 + let cache = Ttl_cache.String_cache.create (3 * 60 * 60 * 1000) () 48 + 49 + let resolve ?(skip_cache = false) handle = 50 + match Ttl_cache.String_cache.get cache handle with 51 + | Some from_cache when skip_cache = false -> 52 + Lwt.return_ok from_cache 53 + | _ -> ( 54 + (* run well-known and dns in parallel, error if they return different values, if only one returns just return that value *) 55 + let%lwt result = 56 + match%lwt Lwt.all [resolve_well_known handle; resolve_dns handle] with 57 + | [Ok did1; Ok did2] when did1 = did2 -> 58 + Lwt.return_ok did1 59 + | [Ok _; Ok _] -> 60 + Lwt.return_error "conflicting dids" 61 + | [Ok did1; _] -> 62 + Lwt.return_ok did1 63 + | [_; Ok did2] -> 64 + Lwt.return_ok did2 65 + | [Error e1; Error e2] -> 66 + Lwt.return_error 67 + (Printf.sprintf 68 + "well-known resolution error: %s\ndns resolution error: %s" 69 + e1 e2 ) 70 + | _ -> 71 + Lwt.return_error "unexpected error" 72 + in 73 + match result with 74 + | Ok did -> 75 + Ttl_cache.String_cache.set cache handle did ; 76 + Lwt.return_ok did 77 + | Error e -> 78 + Lwt.return_error e ) 79 + end 80 + 81 + module Did = struct 82 + open struct 83 + type string_or_strings = [`String of string | `Strings of string list] 84 + 85 + let string_or_strings_to_yojson = function 86 + | `String c -> 87 + `String c 88 + | `Strings cs -> 89 + `List (List.map (fun c -> `String c) cs) 90 + 91 + let string_or_strings_of_yojson = function 92 + | `String c -> 93 + Ok (`Strings [c]) 94 + | `List cs -> 95 + Ok (`Strings (Yojson.Safe.Util.filter_string cs)) 96 + | _ -> 97 + Error "invalid field value" 98 + 99 + type string_or_string_map = 100 + [`String of string | `StringMap of (string * string) list] 101 + 102 + let string_or_string_map_to_yojson = function 103 + | `String c -> 104 + `String c 105 + | `StringMap m -> 106 + `Assoc (List.map (fun (k, v) -> (k, `String v)) m) 107 + 108 + let string_or_string_map_of_yojson = function 109 + | `String c -> 110 + Ok (`StringMap [(c, "")]) 111 + | `Assoc m -> 112 + Ok 113 + (`StringMap 114 + (List.map (fun (k, v) -> (k, Yojson.Safe.Util.to_string v)) m) ) 115 + | _ -> 116 + Error "invalid field value" 117 + 118 + type string_or_string_map_or_either_list = 119 + [ `String of string 120 + | `StringMap of (string * string) list 121 + | `List of string_or_string_map list ] 122 + 123 + let string_or_string_map_or_either_list_to_yojson = function 124 + | `String c -> 125 + `String c 126 + | `StringMap m -> 127 + `Assoc (List.map (fun (k, v) -> (k, `String v)) m) 128 + | `List l -> 129 + `List (List.map string_or_string_map_to_yojson l) 130 + 131 + let string_or_string_map_or_either_list_of_yojson = function 132 + | `String c -> 133 + Ok (`StringMap [(c, "")]) 134 + | `Assoc m -> 135 + Ok 136 + (`StringMap 137 + (List.map (fun (k, v) -> (k, Yojson.Safe.Util.to_string v)) m) ) 138 + | `List l -> 139 + Ok 140 + (`List 141 + ( List.map string_or_string_map_of_yojson l 142 + |> List.filter_map (function Ok x -> Some x | Error _ -> None) ) 143 + ) 144 + | _ -> 145 + Error "invalid field value" 146 + end 147 + 148 + module Document = struct 149 + type service = 150 + { id: string 151 + ; type': string_or_strings [@key "type"] 152 + ; service_endpoint: string_or_string_map_or_either_list 153 + [@key "serviceEndpoint"] } 154 + [@@deriving yojson {strict= false}] 155 + 156 + type verification_method = 157 + { id: string 158 + ; type': string [@key "type"] 159 + ; controller: string 160 + ; public_key_multibase: string option [@key "publicKeyMultibase"] } 161 + [@@deriving yojson {strict= false}] 162 + 163 + type string_or_verification_method = 164 + [`String of string | `VerificationMethod of verification_method] 165 + 166 + let string_or_verification_method_to_yojson = function 167 + | `String s -> 168 + `String s 169 + | `VerificationMethod vm -> 170 + verification_method_to_yojson vm 171 + 172 + let string_or_verification_method_of_yojson = function 173 + | `String s -> 174 + Ok (`String s) 175 + | `Assoc m -> 176 + verification_method_of_yojson (`Assoc m) 177 + |> Result.map (fun x -> `VerificationMethod x) 178 + | _ -> 179 + Error "invalid field value" 180 + 181 + type t = 182 + { context: string list [@key "@context"] 183 + ; id: string 184 + ; controller: string_or_strings option 185 + ; also_known_as: string list option [@key "alsoKnownAs"] 186 + ; verification_method: verification_method list option 187 + [@key "verificationMethod"] 188 + ; authentication: string_or_verification_method list option 189 + ; service: service list option } 190 + [@@deriving yojson {strict= false}] 191 + 192 + let get_service_endpoint s = 193 + match s.service_endpoint with 194 + | `String e -> 195 + e 196 + | `List l -> ( 197 + match List.hd l with `String e -> e | `StringMap m -> List.hd m |> snd ) 198 + | `StringMap m -> 199 + List.hd m |> snd 200 + 201 + let get_service t fragment = 202 + match t.service with 203 + | None -> 204 + None 205 + | Some services -> 206 + List.find_map 207 + (fun (s : service) -> 208 + if s.id = fragment then Some (get_service_endpoint s) else None ) 209 + services 210 + end 211 + 212 + type document = Document.t 213 + 214 + let cache = Ttl_cache.String_cache.create (12 * 60 * 60 * 1000) () 215 + 216 + let resolve_plc did = 217 + if not (String.starts_with ~prefix:"did:plc:" did) then 218 + Lwt.return_error "invalid did method" 219 + else 220 + try%lwt 221 + let uri = 222 + Uri.make ~scheme:"https" ~host:"plc.directory" 223 + ~path:(Uri.pct_encode did) () 224 + in 225 + let%lwt {status; _}, body = 226 + Client.get uri 227 + ~headers:(Cohttp.Header.of_list [("Accept", "application/json")]) 228 + in 229 + match status with 230 + | `OK -> 231 + let%lwt body = Body.to_string body in 232 + body |> Yojson.Safe.from_string |> Document.of_yojson |> Lwt.return 233 + | _ -> 234 + Lwt.return_error "failed to resolve" 235 + with e -> Lwt.return_error (Printexc.to_string e) 236 + 237 + let resolve_web did = 238 + if not (String.starts_with ~prefix:"did:web:" did) then 239 + Lwt.return_error "invalid did method" 240 + else 241 + try%lwt 242 + let uri = 243 + Uri.make ~scheme:"https" ~host:(Str.string_after did 8) 244 + ~path:"/.well-known/did.json" () 245 + in 246 + let%lwt {status; _}, body = 247 + Client.get uri 248 + ~headers:(Cohttp.Header.of_list [("Accept", "application/json")]) 249 + in 250 + match status with 251 + | `OK -> 252 + let%lwt body = Body.to_string body in 253 + body |> Yojson.Safe.from_string |> Document.of_yojson |> Lwt.return 254 + | _ -> 255 + Lwt.return_error "failed to resolve" 256 + with e -> Lwt.return_error (Printexc.to_string e) 257 + 258 + let resolve ?(skip_cache = false) did = 259 + match Ttl_cache.String_cache.get cache did with 260 + | Some from_cache when skip_cache = false -> 261 + Lwt.return_ok from_cache 262 + | _ -> ( 263 + let%lwt result = 264 + match did with 265 + | did when String.starts_with ~prefix:"did:plc:" did -> 266 + resolve_plc did 267 + | did when String.starts_with ~prefix:"did:web:" did -> 268 + resolve_web did 269 + | _ -> 270 + Lwt.return_error "invalid did method" 271 + in 272 + match result with 273 + | Ok doc -> 274 + Ttl_cache.String_cache.set cache did doc ; 275 + Lwt.return_ok doc 276 + | Error err -> 277 + Lwt.return_error err ) 60 278 end
+121
pegasus/lib/ttl_cache.ml
··· 1 + module Make (K : Hashtbl.HashedType) = struct 2 + module H = Hashtbl.Make (K) 3 + 4 + type time_ms = int 5 + 6 + type 'a entry = {value: 'a; mutable expires_at: time_ms} 7 + 8 + type 'a t = 9 + {table: 'a entry H.t; mutable capacity: int option; default_ttl: time_ms} 10 + 11 + let default_initial_capacity = 16 12 + 13 + let[@inline] _now_ms () : time_ms = Util.now_ms () 14 + 15 + let create ?capacity ?(initial_capacity = default_initial_capacity) 16 + default_ttl () : 'a t = 17 + {table= H.create initial_capacity; capacity; default_ttl} 18 + 19 + let clear (t : 'a t) : unit = H.clear t.table 20 + 21 + let remove (t : 'a t) (k : K.t) : unit = H.remove t.table k 22 + 23 + let[@inline] _is_expired ~now (e : _ entry) = e.expires_at <= now 24 + 25 + let cleanup (t : 'a t) : unit = 26 + let now = _now_ms () in 27 + (* collect first to avoid mutating while iterating *) 28 + let to_remove = ref [] in 29 + H.iter 30 + (fun k e -> if _is_expired ~now e then to_remove := k :: !to_remove) 31 + t.table ; 32 + List.iter (H.remove t.table) !to_remove 33 + 34 + let _find_entry_opt (t : 'a t) (k : K.t) : 'a entry option = 35 + try Some (H.find t.table k) with Not_found -> None 36 + 37 + let get (t : 'a t) (k : K.t) : 'a option = 38 + let now = _now_ms () in 39 + match _find_entry_opt t k with 40 + | None -> 41 + None 42 + | Some e -> 43 + if _is_expired ~now e then ( 44 + (* lazy eviction *) 45 + H.remove t.table k ; 46 + None ) 47 + else Some e.value 48 + 49 + let mem (t : 'a t) (k : K.t) : bool = 50 + match get t k with None -> false | Some _ -> true 51 + 52 + let length (t : 'a t) : int = cleanup t ; H.length t.table 53 + 54 + let _evict_earliest (t : 'a t) : unit = 55 + let earliest_key = ref None in 56 + let earliest_exp = ref max_int in 57 + H.iter 58 + (fun k e -> 59 + if e.expires_at < !earliest_exp then ( 60 + earliest_exp := e.expires_at ; 61 + earliest_key := Some k ) ) 62 + t.table ; 63 + match !earliest_key with None -> () | Some k -> H.remove t.table k 64 + 65 + let _enforce_capacity_after_insert (t : 'a t) : unit = 66 + match t.capacity with 67 + | None -> 68 + () 69 + | Some cap -> 70 + cleanup t ; 71 + while H.length t.table > cap do 72 + _evict_earliest t 73 + done 74 + 75 + let set ?ttl_ms (t : 'a t) (k : K.t) (v : 'a) : unit = 76 + let now = _now_ms () in 77 + let ttl_ms = Option.value ttl_ms ~default:t.default_ttl in 78 + if ttl_ms <= 0 then H.remove t.table k 79 + else 80 + let expires_at = now + ttl_ms in 81 + let entry = {value= v; expires_at} in 82 + H.replace t.table k entry ; 83 + _enforce_capacity_after_insert t 84 + 85 + let replace = set 86 + 87 + let ttl_remaining_ms (t : 'a t) (k : K.t) : int option = 88 + let now = _now_ms () in 89 + match _find_entry_opt t k with 90 + | None -> 91 + None 92 + | Some e -> 93 + if _is_expired ~now e then (H.remove t.table k ; None) 94 + else Some (e.expires_at - now) 95 + 96 + let to_list (t : 'a t) : (K.t * 'a) list = 97 + cleanup t ; 98 + let acc = ref [] in 99 + H.iter (fun k e -> acc := (k, e.value) :: !acc) t.table ; 100 + !acc 101 + 102 + let iter (t : 'a t) ~(f : K.t -> 'a -> unit) : unit = 103 + cleanup t ; 104 + H.iter (fun k e -> f k e.value) t.table 105 + 106 + let fold (t : 'a t) ~(init : 'acc) ~(f : 'acc -> K.t -> 'a -> 'acc) : 'acc = 107 + cleanup t ; 108 + H.fold (fun k e acc -> f acc k e.value) t.table init 109 + 110 + let set_capacity (t : 'a t) (cap : int option) : unit = 111 + t.capacity <- cap ; 112 + _enforce_capacity_after_insert t 113 + end 114 + 115 + module String_cache = Make (struct 116 + type t = string 117 + 118 + let equal = String.equal 119 + 120 + let hash = Hashtbl.hash 121 + end)
+87
pegasus/lib/xrpc.ml
··· 1 + open Cohttp_lwt 2 + open Cohttp_lwt_unix 3 + 1 4 type init = Auth.Verifiers.ctx 2 5 3 6 type context = {req: Dream.request; db: Data_store.t; auth: Auth.credentials} ··· 20 23 let%lwt body = Dream.body req in 21 24 body |> Yojson.Safe.from_string |> of_yojson |> Result.get_ok |> Lwt.return 22 25 with _ -> Errors.invalid_request "Invalid request body" 26 + 27 + let service_proxy (ctx : context) (proxy_header : string) = 28 + let did = Auth.get_authed_did_exn ctx.auth in 29 + let nsid_regex = 30 + Str.regexp 31 + {|^[a-zA-Z](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)+\.[a-zA-Z][a-zA-Z0-9]{0,62}?$|} 32 + in 33 + let nsid = 34 + (Dream.path [@warning "-3"]) ctx.req 35 + |> List.rev |> List.hd |> String.lowercase_ascii 36 + in 37 + if not (Str.string_match nsid_regex nsid 0) then 38 + Errors.invalid_request "invalid nsid" ; 39 + let service_did, service_type = 40 + match String.split_on_char '#' proxy_header with 41 + | [did; typ] -> 42 + (did, typ) 43 + | _ -> 44 + Errors.invalid_request "invalid proxy header" 45 + in 46 + let fragment = "#" ^ service_type in 47 + match%lwt Id_resolver.Did.resolve service_did with 48 + | Ok did_doc -> ( 49 + let host = 50 + match Id_resolver.Did.Document.get_service did_doc fragment with 51 + | Some service -> 52 + service 53 + | None -> 54 + Errors.invalid_request "failed to resolve destination service" 55 + in 56 + let%lwt signing_key = 57 + match%lwt Data_store.get_actor_by_identifier did ctx.db with 58 + | Some {signing_key; _} -> 59 + Lwt.return signing_key 60 + | None -> 61 + Errors.internal_error ~msg:"user not found" () 62 + in 63 + let jwt = 64 + Auth.generate_service_jwt ~did ~service_did ~lxm:nsid ~signing_key 65 + in 66 + let uri = 67 + host ^ "/" ^ String.concat "/" @@ (Dream.path [@warning "-3"]) ctx.req 68 + |> Uri.of_string 69 + in 70 + let headers = Http.Header.of_list [("Authorization", "Bearer " ^ jwt)] in 71 + match Dream.method_ ctx.req with 72 + | `GET -> ( 73 + let%lwt res, body = Client.get uri ~headers in 74 + match res.status with 75 + | `OK -> 76 + let%lwt body = Body.to_string body in 77 + Lwt.return @@ Dream.response ~status:`OK body 78 + | e -> 79 + Dream.error (fun log -> 80 + log "error when proxying to %s: %s" (Uri.to_string uri) 81 + (Http.Status.to_string e) ) ; 82 + Errors.internal_error ~msg:"failed to proxy request" () ) 83 + | `POST -> ( 84 + let%lwt req_body = Dream.body ctx.req in 85 + let%lwt res, body = 86 + Client.post uri ~headers ~body:(Body.of_string req_body) 87 + in 88 + match res.status with 89 + | `OK -> 90 + let%lwt body = Body.to_string body in 91 + Lwt.return @@ Dream.response ~status:`OK body 92 + | e -> 93 + Dream.error (fun log -> 94 + log "error when proxying to %s: %s" (Uri.to_string uri) 95 + (Http.Status.to_string e) ) ; 96 + Errors.internal_error ~msg:"failed to proxy request" () ) 97 + | _ -> 98 + Errors.invalid_request "unsupported method" ) 99 + | Error _ -> 100 + Errors.internal_error ~msg:"failed to resolve destination service" () 101 + 102 + let service_proxy_middleware db inner_handler req = 103 + match Dream.header req "atproto-proxy" with 104 + | Some header -> 105 + handler ~auth:Auth.Verifiers.access 106 + (fun ctx -> service_proxy ctx header) 107 + {req; db} 108 + | None -> 109 + inner_handler req