objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

xrpc getRepo, getLatestCommit, getRecord, getBlocks

futurGH bb1827eb 5628b0b4

+109 -20
+7 -1
bin/main.ml
··· 40 40 ; (post, "/xrpc/com.atproto.repo.deleteRecord", Api.Repo.DeleteRecord.handler) 41 41 ; (post, "/xrpc/com.atproto.repo.uploadBlob", Api.Repo.UploadBlob.handler) 42 42 ; (* sync *) 43 - (get, "/xrpc/com.atproto.sync.getRepoStatus", Api.Sync.GetRepoStatus.handler) 43 + (get, "/xrpc/com.atproto.sync.getRepo", Api.Sync.GetRepo.handler) 44 + ; (get, "/xrpc/com.atproto.sync.getRepoStatus", Api.Sync.GetRepoStatus.handler) 45 + ; ( get 46 + , "/xrpc/com.atproto.sync.getLatestCommit" 47 + , Api.Sync.GetLatestCommit.handler ) 44 48 ; (get, "/xrpc/com.atproto.sync.listRepos", Api.Sync.ListRepos.handler) 49 + ; (get, "/xrpc/com.atproto.sync.getRecord", Api.Sync.GetRecord.handler) 50 + ; (get, "/xrpc/com.atproto.sync.getBlocks", Api.Sync.GetBlocks.handler) 45 51 ; (get, "/xrpc/com.atproto.sync.getBlob", Api.Sync.GetBlob.handler) 46 52 ; (get, "/xrpc/com.atproto.sync.listBlobs", Api.Sync.ListBlobs.handler) 47 53 ; (* preferences *)
+1 -1
pegasus/lib/api/sync/getBlob.ml
··· 4 4 Xrpc.handler (fun ctx -> 5 5 let {did; cid} = Xrpc.parse_query ctx.req query_of_yojson in 6 6 let cid = Cid.as_cid cid in 7 - let%lwt db = User_store.connect did in 7 + let%lwt {db; _} = Repository.load did ~write:false ~ds:ctx.db in 8 8 let%lwt blob = 9 9 match%lwt User_store.get_blob db cid with 10 10 | Some blob ->
+33
pegasus/lib/api/sync/getBlocks.ml
··· 1 + type query = {did: string; cids: string list} [@@deriving yojson] 2 + 3 + let handler = 4 + Xrpc.handler (fun ctx -> 5 + let {did; cids} : query = Xrpc.parse_query ctx.req query_of_yojson in 6 + let%lwt {db; commit; _} = Repository.load did ~write:false ~ds:ctx.db in 7 + let commit_cid, commit_signed = Option.get commit in 8 + let commit_block = 9 + commit_signed |> User_store.Types.signed_commit_to_yojson 10 + |> Dag_cbor.encode_yojson 11 + in 12 + let cids = List.map Cid.as_cid cids in 13 + match%lwt User_store.get_blocks db cids with 14 + | {blocks; missing= []} -> 15 + let blocks_stream = 16 + Repository.BlockMap.entries blocks |> Lwt_seq.of_list 17 + in 18 + let car_stream = 19 + Lwt_seq.cons (commit_cid, commit_block) blocks_stream 20 + |> Car.blocks_to_stream commit_cid 21 + in 22 + Dream.stream 23 + ~headers:[("Content-Type", "application/vnd.ipld.car")] 24 + (fun res_stream -> 25 + Lwt_seq.iter_s 26 + (fun chunk -> Dream.write res_stream (Bytes.to_string chunk)) 27 + car_stream ) 28 + | {missing; _} -> 29 + let missing_cids = 30 + List.map Cid.to_string missing |> String.concat ", " 31 + in 32 + Errors.invalid_request ~name:"BlockNotFound" 33 + ("missing the following blocks: " ^ missing_cids) )
+13
pegasus/lib/api/sync/getLatestCommit.ml
··· 1 + type query = {did: string} [@@deriving yojson] 2 + 3 + type response = {cid: string; rev: string} [@@deriving yojson] 4 + 5 + let handler = 6 + Xrpc.handler (fun ctx -> 7 + let {did} : query = Xrpc.parse_query ctx.req query_of_yojson in 8 + match%lwt Repository.load did ~write:false ~ds:ctx.db with 9 + | {commit= Some (cid, {rev; _}); _} -> 10 + let cid = Cid.to_string cid in 11 + Dream.json @@ Yojson.Safe.to_string @@ response_to_yojson {cid; rev} 12 + | _ -> 13 + failwith ("couldn't resolve commit for " ^ did) )
+16
pegasus/lib/api/sync/getRecord.ml
··· 1 + type query = {did: string; collection: string; rkey: string} [@@deriving yojson] 2 + 3 + type response = Mist.Lex.repo_record [@@deriving yojson] 4 + 5 + let handler = 6 + Xrpc.handler (fun ctx -> 7 + let {did; collection; rkey} : query = 8 + Xrpc.parse_query ctx.req query_of_yojson 9 + in 10 + let path = collection ^ "/" ^ rkey in 11 + let%lwt {db; _} = Repository.load did ~write:false ~ds:ctx.db in 12 + match%lwt User_store.get_record_by_path db path with 13 + | Some {value; _} -> 14 + Dream.json @@ Yojson.Safe.to_string @@ response_to_yojson value 15 + | None -> 16 + Errors.invalid_request ~name:"RecordNotFound" "record not found" )
+13
pegasus/lib/api/sync/getRepo.ml
··· 1 + type query = {did: string} [@@deriving yojson] 2 + 3 + let handler = 4 + Xrpc.handler (fun ctx -> 5 + let {did} : query = Xrpc.parse_query ctx.req query_of_yojson in 6 + let%lwt repo = Repository.load did ~write:false in 7 + let%lwt car_stream = Repository.export_car repo in 8 + Dream.stream 9 + ~headers:[("Content-Type", "application/vnd.ipld.car")] 10 + (fun res_stream -> 11 + Lwt_seq.iter_s 12 + (fun chunk -> Dream.write res_stream (Bytes.to_string chunk)) 13 + car_stream ) )
+1 -1
pegasus/lib/api/sync/getRepoStatus.ml
··· 15 15 Errors.invalid_request ~name:"RepoNotFound" 16 16 "couldn't find a repo with that did" 17 17 in 18 - let%lwt user_db = User_store.connect actor.did in 18 + let%lwt {db= user_db; _} = Repository.load did ~write:false ~ds:ctx.db in 19 19 let%lwt _, commit = 20 20 match%lwt User_store.get_commit user_db with 21 21 | Some c ->
+1 -1
pegasus/lib/api/sync/listBlobs.ml
··· 17 17 | _ -> 18 18 1000 19 19 in 20 - let%lwt db = User_store.connect did in 20 + let%lwt {db; _} = Repository.load did ~write:false ~ds:ctx.db in 21 21 let%lwt cids = User_store.list_blobs db ~limit ~cursor ?since in 22 22 let cids = List.map Cid.to_string cids in 23 23 let cursor =
+24 -16
pegasus/lib/repository.ml
··· 141 141 ; did: string 142 142 ; db: User_store.t 143 143 ; mutable block_map: Cid.t StringMap.t option 144 - ; mutable commit: Cid.t option } 144 + ; mutable commit: (Cid.t * signed_commit) option } 145 145 146 146 let get_map t : Cid.t StringMap.t Lwt.t = 147 147 let%lwt root, commit = ··· 151 151 | None -> 152 152 failwith ("failed to retrieve commit for " ^ t.did) 153 153 in 154 - if t.commit <> Some root then t.commit <- Some root ; 154 + t.commit <- Some (root, commit) ; 155 155 match t.block_map with 156 156 | Some map -> 157 157 Lwt.return map ··· 225 225 let%lwt commit_cid = 226 226 User_store.put_commit t.db signed |> Lwt_result.get_exn 227 227 in 228 - t.commit <- Some commit_cid ; 228 + t.commit <- Some (commit_cid, signed) ; 229 229 Lwt.return (commit_cid, signed) 230 230 231 231 let put_initial_commit t : (Cid.t * signed_commit) Lwt.t = ··· 243 243 | None -> 244 244 failwith ("failed to retrieve commit for " ^ t.did) 245 245 in 246 - if swap_commit <> None && swap_commit <> t.commit then 246 + if swap_commit <> None && swap_commit <> Option.map fst t.commit then 247 247 Errors.invalid_request ~name:"InvalidSwap" 248 248 (Format.sprintf "swapCommit cid %s did not match last commit cid %s" 249 249 (Cid.to_string (Option.get swap_commit)) 250 - (match t.commit with Some c -> Cid.to_string c | None -> "null") ) ; 250 + (match t.commit with Some (c, _) -> Cid.to_string c | None -> "null") ) ; 251 251 let%lwt block_map = Lwt.map ref (get_map t) in 252 252 (* need to cache this so in the end, we only emit new blocks *) 253 253 let prev_blocks = ··· 426 426 in 427 427 Lwt.return {commit= new_commit; results} 428 428 429 - let load did : t Lwt.t = 430 - let%lwt data_store_conn = Data_store.connect () in 431 - let%lwt user_db = User_store.connect did in 429 + let load ?write ?(ensure_active = false) ?ds did : t Lwt.t = 430 + let%lwt data_store_conn = 431 + match ds with 432 + | Some ds -> 433 + Lwt.return ds 434 + | None -> 435 + Data_store.connect ?write () 436 + in 437 + let%lwt user_db = 438 + try%lwt User_store.connect did 439 + with _ -> 440 + Errors.invalid_request ~name:"RepoNotFound" 441 + "your princess is in another castle" 442 + in 432 443 let%lwt () = User_store.init user_db in 433 444 let%lwt {signing_key; _} = 434 445 match%lwt Data_store.get_actor_by_identifier did data_store_conn with 435 - | Some actor -> 446 + | Some actor when ensure_active = false || actor.deactivated_at = None -> 436 447 Lwt.return actor 448 + | Some _ -> 449 + Errors.invalid_request ~name:"RepoDeactivated" 450 + ("repository " ^ did ^ " is deactivated") 437 451 | None -> 438 452 failwith ("failed to retrieve actor for " ^ did) 439 453 in 440 454 let key = Kleidos.parse_multikey_str signing_key in 441 - let%lwt commit = 442 - match%lwt User_store.get_commit user_db with 443 - | Some (cid, _) -> 444 - Lwt.return_some cid 445 - | None -> 446 - Lwt.return_none 447 - in 455 + let%lwt commit = User_store.get_commit user_db in 448 456 Lwt.return {key; did; db= user_db; block_map= None; commit} 449 457 450 458 let export_car t : Car.stream Lwt.t =