objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

xrpc importRepo, update getRepo for sync 1.1

futurGH ec36b761 b0056cb4

+326 -154
+110 -126
mist/lib/mst.ml
··· 31 31 [ ("l", match node.l with Some l -> `Link l | None -> `Null) 32 32 ; ("e", `Array (Array.of_list (List.map encode_entry_raw node.e))) ] ) 33 33 34 + (* decodes a node from cbor bytes *) 35 + let decode_block_raw b : node_raw = 36 + match Dag_cbor.decode b with 37 + | `Map node -> 38 + if not (String_map.mem "e" node) then 39 + raise (Invalid_argument "mst node missing 'e'") ; 40 + let l = 41 + if String_map.mem "l" node then 42 + match String_map.find "l" node with `Link l -> Some l | _ -> None 43 + else None 44 + in 45 + let e_array = 46 + match String_map.find "e" node with `Array e -> e | _ -> [||] 47 + in 48 + let e = 49 + Array.to_list 50 + @@ Array.map 51 + (fun (entry : Dag_cbor.value) -> 52 + match entry with 53 + | `Map entry -> 54 + { p= 55 + ( entry |> String_map.find "p" 56 + |> function 57 + | `Integer p -> 58 + Int64.to_int p 59 + | _ -> 60 + raise (Invalid_argument "mst entry missing 'p'") ) 61 + ; k= 62 + ( entry |> String_map.find "k" 63 + |> function 64 + | `Bytes k -> 65 + k 66 + | _ -> 67 + raise (Invalid_argument "mst entry missing 'k'") ) 68 + ; v= 69 + ( entry |> String_map.find "v" 70 + |> function 71 + | `Link v -> 72 + v 73 + | _ -> 74 + raise (Invalid_argument "mst entry missing 'v'") ) 75 + ; t= 76 + ( entry |> String_map.find "t" 77 + |> function `Link t -> Some t | _ -> None ) } 78 + | _ -> 79 + raise (Invalid_argument "non-map mst entry") ) 80 + e_array 81 + in 82 + {l; e} 83 + | _ -> 84 + raise (Invalid_argument "invalid block") 85 + 86 + (* items yielded by ordered stream; either an mst node block or a record cid *) 87 + type ordered_item = Node of Cid.t * bytes | Leaf of Cid.t 88 + 34 89 type node = 35 90 { layer: int 36 91 ; mutable left: node option Lwt.t Lazy.t ··· 84 139 85 140 val to_blocks_stream : t -> (Cid.t * bytes) Lwt_seq.t 86 141 142 + val to_ordered_stream : t -> ordered_item Lwt_seq.t 143 + 87 144 val serialize : t -> node -> (Cid.t * bytes, exn) Lwt_result.t 88 145 89 146 val proof_for_key : t -> Cid.t -> string -> Block_map.t Lwt.t ··· 126 183 127 184 let create blockstore root = {blockstore; root} 128 185 129 - (* decodes a node retrieved from the blockstore *) 130 - let decode_block_raw b : node_raw = 131 - match Dag_cbor.decode b with 132 - | `Map node -> 133 - if not (String_map.mem "e" node) then 134 - raise (Invalid_argument "mst node missing 'e'") ; 135 - let l = 136 - if String_map.mem "l" node then 137 - match String_map.find "l" node with `Link l -> Some l | _ -> None 138 - else None 139 - in 140 - let e_array = 141 - match String_map.find "e" node with `Array e -> e | _ -> [||] 142 - in 143 - let e = 144 - Array.to_list 145 - @@ Array.map 146 - (fun (entry : Dag_cbor.value) -> 147 - match entry with 148 - | `Map entry -> 149 - { p= 150 - ( entry |> String_map.find "p" 151 - |> function 152 - | `Integer p -> 153 - Int64.to_int p 154 - | _ -> 155 - raise (Invalid_argument "mst entry missing 'p'") ) 156 - ; k= 157 - ( entry |> String_map.find "k" 158 - |> function 159 - | `Bytes k -> 160 - k 161 - | _ -> 162 - raise (Invalid_argument "mst entry missing 'k'") ) 163 - ; v= 164 - ( entry |> String_map.find "v" 165 - |> function 166 - | `Link v -> 167 - v 168 - | _ -> 169 - raise (Invalid_argument "mst entry missing 'v'") ) 170 - ; t= 171 - ( entry |> String_map.find "t" 172 - |> function `Link t -> Some t | _ -> None ) } 173 - | _ -> 174 - raise (Invalid_argument "non-map mst entry") ) 175 - e_array 176 - in 177 - {l; e} 178 - | _ -> 179 - raise (Invalid_argument "invalid block") 180 - 181 186 (* retrieves a raw node by cid *) 182 187 let retrieve_node_raw t cid : node_raw option Lwt.t = 183 188 match%lwt Store.get_bytes t.blockstore cid with ··· 281 286 in 282 287 Lwt.return !map 283 288 284 - (* returns all mst entries in order for a car stream *) 289 + (* returns all non-leaf mst node blocks in order for a car stream 290 + leaf cids can be obtained via collect_nodes_and_leaves or leaves_of_root *) 285 291 let to_blocks_stream t : (Cid.t * bytes) Lwt_seq.t = 286 - let module M = struct 287 - type stage = 288 - (* currently walking nodes *) 289 - | Nodes of 290 - { next: Cid.t list (* next cids to fetch *) 291 - ; fetched: (Cid.t * bytes) list (* fetched cids and their bytes *) 292 - ; leaves_seen: Cid.Set.t (* seen leaf cids for dedupe *) 293 - ; leaves_rev: Cid.t list (* reversed encounter order of leaves *) } 294 - (* done walking nodes, streaming accumulated leaves *) 295 - | Leaves of (Cid.t * bytes) list 296 - | Done 297 - end in 298 - let open M in 299 - let init_state = 300 - Nodes 301 - {next= [t.root]; fetched= []; leaves_seen= Cid.Set.empty; leaves_rev= []} 302 - in 303 - let rec step = function 304 - | Done -> 305 - Lwt.return_none 292 + (* (next cids to fetch list, fetched (cid * bytes) list) *) 293 + let init_state = ([t.root], []) in 294 + let rec step (next, fetched) = 295 + match fetched with 306 296 (* node has been fetched, can now be yielded *) 307 - | Nodes ({fetched= (cid, bytes) :: rest; _} as s) -> 308 - Lwt.return_some ((cid, bytes), Nodes {s with fetched= rest}) 297 + | (cid, bytes) :: rest -> 298 + Lwt.return_some ((cid, bytes), (next, rest)) 309 299 (* need to fetch next nodes *) 310 - | Nodes {next; fetched= []; leaves_seen; leaves_rev} -> 311 - if List.is_empty next then ( 312 - (* finished traversing nodes, time to switch to leaves *) 313 - let leaves_list = List.rev leaves_rev in 314 - let%lwt leaves_bm = Store.get_blocks t.blockstore leaves_list in 315 - if leaves_bm.missing <> [] then failwith "missing mst leaf blocks" ; 316 - let leaves_nodes = 317 - List.map 318 - (fun cid -> 319 - let bytes = 320 - Block_map.get cid leaves_bm.blocks |> Option.get 321 - in 322 - (cid, bytes) ) 323 - leaves_list 324 - in 325 - match leaves_nodes with 326 - | [] -> 327 - (* with Done, we don't care about the first pair element *) 328 - Lwt.return_some (Obj.magic (), Done) 329 - | _ -> 330 - (* it's leafin time *) 331 - step (Leaves leaves_nodes) ) 300 + | [] -> 301 + if List.is_empty next then Lwt.return_none 332 302 else 333 303 (* go ahead and fetch the next nodes *) 334 304 let%lwt bm = Store.get_blocks t.blockstore next in 335 305 if bm.missing <> [] then failwith "missing mst nodes" ; 336 - let fetched, next', leaves_seen', leaves_rev' = 306 + let fetched', next' = 337 307 List.fold_left 338 - (fun (acc, nxt, seen, rev) cid -> 308 + (fun (acc, nxt) cid -> 339 309 let bytes = 340 310 (* we should be safe to do this since we just got the cids from the blockmap *) 341 311 Block_map.get cid bm.blocks |> Option.get ··· 354 324 nxt ) 355 325 node.e 356 326 in 357 - let seen', rev' = 358 - (* add each entry in this node to the seen set and record encounter order *) 359 - List.fold_left 360 - (fun (s, r) e -> 361 - if Cid.Set.mem e.v s then (s, r) 362 - else (Cid.Set.add e.v s, e.v :: r) ) 363 - (seen, rev) node.e 364 - in 365 - (* prepending is O(1) per prepend + one O(n) to reverse, vs. O(n) per append = O(n^2) total *) 366 - ((cid, bytes) :: acc, nxt', seen', rev') ) 367 - ([], [], leaves_seen, leaves_rev) 368 - next 327 + (* prepending then reversing is O(2n), appending each time is O(n^2) *) 328 + ((cid, bytes) :: acc, nxt') ) 329 + ([], []) next 369 330 in 370 - step 371 - (Nodes 372 - { next= List.rev next' 373 - ; fetched= List.rev fetched 374 - ; leaves_seen= leaves_seen' 375 - ; leaves_rev= leaves_rev' } ) 376 - (* if we're onto yielding leaves, do that *) 377 - | Leaves ((cid, bytes) :: rest) -> 378 - let next = if rest = [] then Done else Leaves rest in 379 - Lwt.return_some ((cid, bytes), next) 380 - (* once we're out of leaves, we're done *) 381 - | Leaves [] -> 382 - Lwt.return_some (Obj.magic (), Done) 331 + step (List.rev next', List.rev fetched') 383 332 in 384 333 Lwt_seq.unfold_lwt step init_state 334 + 335 + (* depth-first pre-order as per sync 1.1, yields cid references in place of leaf nodes 336 + for each node: node block, left subtree, then for each entry: record, right subtree *) 337 + let to_ordered_stream t : ordered_item Lwt_seq.t = 338 + (* queue items: `Node cid to visit, `Leaf cid to yield *) 339 + let rec step queue = 340 + match queue with 341 + | [] -> 342 + Lwt.return_none 343 + | `Node cid :: rest -> ( 344 + let%lwt bytes_opt = Store.get_bytes t.blockstore cid in 345 + match bytes_opt with 346 + | None -> 347 + step rest 348 + | Some bytes -> 349 + let node = decode_block_raw bytes in 350 + (* queue items: left subtree, then for each entry: record then right subtree *) 351 + let left_queue = 352 + match node.l with Some l -> [`Node l] | None -> [] 353 + in 354 + let entries_queue = 355 + List.concat_map 356 + (fun (e : entry_raw) -> 357 + let right_queue = 358 + match e.t with Some r -> [`Node r] | None -> [] 359 + in 360 + `Leaf e.v :: right_queue ) 361 + node.e 362 + in 363 + let new_queue = left_queue @ entries_queue @ rest in 364 + Lwt.return_some ((Node (cid, bytes) : ordered_item), new_queue) ) 365 + | `Leaf cid :: rest -> 366 + Lwt.return_some ((Leaf cid : ordered_item), rest) 367 + in 368 + Lwt_seq.unfold_lwt step [`Node t.root] 385 369 386 370 (* produces a cid and cbor-encoded bytes for a given tree *) 387 371 let serialize t node : (Cid.t * bytes, exn) Lwt_result.t = ··· 620 604 in 621 605 bfs [t.root] Cid.Set.empty [] Cid.Set.empty 622 606 623 - (* list of all leaves belonging to a node, ordered by key *) 607 + (* list of all leaves belonging to a node and its children, ordered by key *) 624 608 let rec leaves_of_node n : (string * Cid.t) list Lwt.t = 625 609 let%lwt left_leaves = 626 610 n.left >>? function Some l -> leaves_of_node l | None -> Lwt.return [] ··· 642 626 in 643 627 Lwt.return leaves 644 628 645 - (* little helper *) 629 + (* list of all leaves in the mst *) 646 630 let leaves_of_root t : (string * Cid.t) list Lwt.t = 647 631 match%lwt retrieve_node t t.root with 648 632 | None ->
+24
pegasus/lib/api/repo/importRepo.ml
··· 1 + type query = {did: string} [@@deriving yojson {strict= false}] 2 + 3 + let rec stream_to_seq stream () = 4 + let%lwt chunk = Dream.read stream in 5 + match chunk with 6 + | None -> 7 + Lwt.return Lwt_seq.Nil 8 + | Some data -> 9 + Lwt.return (Lwt_seq.Cons (Bytes.of_string data, stream_to_seq stream)) 10 + 11 + let handler = 12 + Xrpc.handler ~auth:Authorization (fun ctx -> 13 + let did = Auth.get_authed_did_exn ctx.auth in 14 + let bytes_stream = Dream.body_stream ctx.req in 15 + let car_stream = stream_to_seq bytes_stream in 16 + let%lwt repo = 17 + Repository.load did ~ds:ctx.db ~ensure_active:true ~write:true 18 + in 19 + let%lwt result = Repository.import_car repo car_stream in 20 + match result with 21 + | Ok _ -> 22 + Dream.empty `OK 23 + | Error e -> 24 + Errors.internal_error ~msg:(Printexc.to_string e) () )
+6 -3
pegasus/lib/api/sync/getRepo.ml
··· 8 8 Dream.stream 9 9 ~headers:[("Content-Type", "application/vnd.ipld.car")] 10 10 (fun res_stream -> 11 - Lwt_seq.iter_s 12 - (fun chunk -> Dream.write res_stream (Bytes.to_string chunk)) 13 - car_stream ) ) 11 + let%lwt () = 12 + Lwt_seq.iter_s 13 + (fun chunk -> Dream.write res_stream (Bytes.to_string chunk)) 14 + car_stream 15 + in 16 + Dream.close res_stream ) )
+139 -25
pegasus/lib/repository.ml
··· 2 2 module Block_map = User_store.Block_map 3 3 module Lex = Mist.Lex 4 4 module Mst = Mist.Mst.Make (User_store) 5 + module Mem_mst = Mist.Mst.Make (Mist.Storage.Memory_blockstore) 5 6 module String_map = Lex.String_map 6 7 module Tid = Mist.Tid 7 8 ··· 471 472 failwith ("failed to retrieve commit for " ^ t.did) 472 473 in 473 474 let mst : Mst.t = {blockstore= t.db; root= commit.data} in 474 - let mst_blocks = Mst.to_blocks_stream mst in 475 475 let commit_block = 476 476 commit |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 477 477 in 478 478 if Cid.create Dcbor commit_block <> root then 479 479 failwith "commit does not match stored cid" ; 480 - Lwt.return 481 - @@ Car.blocks_to_stream root (Lwt_seq.cons (root, commit_block) mst_blocks) 480 + (* the idea is to read ahead to collect record cids to reduce db calls to fetch records *) 481 + let batch_size = 100 in 482 + let ordered_stream = Mst.to_ordered_stream mst in 483 + (* read up to n items from stream, returning (items, remaining_stream) *) 484 + let rec read_ahead n stream acc = 485 + if n = 0 then Lwt.return (List.rev acc, stream) 486 + else 487 + match%lwt stream () with 488 + | Lwt_seq.Nil -> 489 + Lwt.return (List.rev acc, Lwt_seq.empty) 490 + | Lwt_seq.Cons (item, rest) -> 491 + read_ahead (n - 1) rest (item :: acc) 492 + in 493 + let leaf_cids_of items = 494 + List.filter_map (function Mist.Mst.Leaf cid -> Some cid | _ -> None) items 495 + in 496 + (* state: (buffer of items to yield, cache of fetched records, remaining stream) *) 497 + let blocks_stream : (Cid.t * bytes) Lwt_seq.t = 498 + let rec step (buffer, cache, stream) = 499 + match (buffer : Mist.Mst.ordered_item list) with 500 + | [] -> ( 501 + (* buffer empty, try to read more *) 502 + let%lwt items, stream' = read_ahead batch_size stream [] in 503 + match items with 504 + | [] -> 505 + Lwt.return_none 506 + | _ -> 507 + (* batch fetch all leaf records in this chunk *) 508 + let leaf_cids = leaf_cids_of items in 509 + let%lwt records = User_store.get_records_by_cids t.db leaf_cids in 510 + let cache' = 511 + List.fold_left 512 + (fun acc (cid, data) -> Block_map.set cid data acc) 513 + cache records 514 + in 515 + step (items, cache', stream') ) 516 + | Node (cid, data) :: rest -> 517 + Lwt.return_some ((cid, data), (rest, cache, stream)) 518 + | Leaf cid :: rest -> ( 519 + match Block_map.get cid cache with 520 + | Some data -> 521 + Lwt.return_some ((cid, data), (rest, cache, stream)) 522 + | None -> ( 523 + (* not in cache -> fetch individually *) 524 + match%lwt 525 + User_store.get_records_by_cids t.db [cid] 526 + with 527 + | (cid, data) :: _ -> 528 + Lwt.return_some ((cid, data), (rest, cache, stream)) 529 + | [] -> 530 + (* record not found, skip *) 531 + step (rest, cache, stream) ) ) 532 + in 533 + Lwt_seq.unfold_lwt step ([], Block_map.empty, ordered_stream) 534 + in 535 + let all_blocks = Lwt_seq.cons (root, commit_block) blocks_stream in 536 + Lwt.return @@ Car.blocks_to_stream root all_blocks 482 537 483 - let import_car (did : string) (stream : Car.stream) : t Lwt.t = 484 - let%lwt t = load did in 485 - let%lwt roots, blocks = Car.read_car_stream stream in 538 + let import_car t (stream : Car.stream) : (t, exn) Lwt_result.t = 539 + let%lwt roots, blocks_seq = Car.read_car_stream stream in 486 540 let root = 487 541 match roots with [root] -> root | _ -> failwith "invalid number of roots" 488 542 in 489 - let%lwt () = 490 - Lwt_seq.iter_s 491 - (fun (cid, block) -> 492 - if cid = root then ( 493 - let commit = 494 - block |> Dag_cbor.decode_to_yojson |> signed_commit_of_yojson 495 - |> Result.get_ok 496 - in 497 - if commit.did <> did then failwith "did does not match commit did" ; 498 - let%lwt _ = Lwt_result.get_exn @@ User_store.put_commit t.db commit in 499 - Lwt.return_unit ) 500 - else 501 - let%lwt _ = 502 - Lwt_result.get_exn @@ User_store.put_block t.db cid block 503 - in 504 - Lwt.return_unit ) 505 - blocks 506 - in 507 - Lwt.return t 543 + try%lwt 544 + (* collect all blocks into a map *) 545 + let%lwt all_blocks = 546 + Lwt_seq.fold_left_s 547 + (fun acc (cid, block) -> Lwt.return (Block_map.set cid block acc)) 548 + Block_map.empty blocks_seq 549 + in 550 + (* parse commit block *) 551 + let commit_bytes = 552 + match Block_map.get root all_blocks with 553 + | Some b -> 554 + b 555 + | None -> 556 + failwith "commit block not found in CAR" 557 + in 558 + let commit = 559 + match 560 + commit_bytes |> Dag_cbor.decode_to_yojson |> signed_commit_of_yojson 561 + with 562 + | Ok c -> 563 + c 564 + | Error e -> 565 + failwith ("invalid commit: " ^ e) 566 + in 567 + if commit.did <> t.did then failwith "did does not match commit did" ; 568 + (* create in-memory mst to walk *) 569 + let mem_bs = Mist.Storage.Memory_blockstore.create ~blocks:all_blocks () in 570 + let mem_mst : Mem_mst.t = {blockstore= mem_bs; root= commit.data} in 571 + let%lwt leaves = Mem_mst.leaves_of_root mem_mst in 572 + let leaf_cids = 573 + List.fold_left 574 + (fun acc (_, cid) -> Cid.Set.add cid acc) 575 + Cid.Set.empty leaves 576 + in 577 + (* get mst nodes by filtering out leaves and commit from all blocks *) 578 + let mst_node_cids = 579 + Block_map.keys all_blocks 580 + |> List.filter (fun cid -> 581 + (not (Cid.equal cid root)) && not (Cid.Set.mem cid leaf_cids) ) 582 + in 583 + let%lwt _ = 584 + Util.use_pool t.db.db (fun conn -> 585 + Util.transact conn (fun () -> 586 + (* store commit *) 587 + let%lwt _ = User_store.put_commit t.db commit in 588 + (* store mst nodes *) 589 + let%lwt () = 590 + Lwt_list.iter_s 591 + (fun cid -> 592 + match Block_map.get cid all_blocks with 593 + | Some block -> 594 + let%lwt _ = User_store.put_block t.db cid block in 595 + Lwt.return_unit 596 + | None -> 597 + Lwt.return_unit ) 598 + mst_node_cids 599 + in 600 + (* store records *) 601 + let%lwt () = 602 + Lwt_list.iter_s 603 + (fun (path, cid) -> 604 + match Block_map.get cid all_blocks with 605 + | Some data -> 606 + User_store.put_record_raw t.db ~path ~cid ~data 607 + ~since:(Tid.now ()) 608 + | None -> 609 + failwith ("missing record block: " ^ Cid.to_string cid) ) 610 + leaves 611 + in 612 + Lwt.return_ok () ) 613 + (* use_pool expects Caqti_error.t as exn type, so handle errors via try/with *) 614 + |> Lwt_result.get_exn 615 + |> Lwt_result.ok ) 616 + in 617 + (* clear cached block_map so it's rebuilt on next access *) 618 + t.block_map <- None ; 619 + t.commit <- Some (root, commit) ; 620 + Lwt.return_ok t 621 + with exn -> Lwt.return_error exn
+23
pegasus/lib/user_store.ml
··· 269 269 |sql}] 270 270 ~cid ~mimetype 271 271 272 + let get_records_by_cids cids = 273 + [%rapper 274 + get_many 275 + {sql| SELECT @CID{cid}, @Blob{data} FROM records WHERE cid IN (%list{%CID{cids}}) |sql} 276 + record_out] 277 + ~cids 278 + 279 + let delete_record path = 280 + [%rapper execute {sql| DELETE FROM records WHERE path = %string{path} |sql}] 281 + ~path 282 + 272 283 let list_blob_refs path = 273 284 [%rapper 274 285 get_many ··· 391 402 >|= Option.map (fun (cid, data, since) -> 392 403 {path; cid; value= Lex.of_cbor data; since} ) 393 404 405 + let get_records_by_cids t cids : (Cid.t * Blob.t) list Lwt.t = 406 + if List.is_empty cids then Lwt.return [] 407 + else 408 + Util.use_pool t.db @@ Queries.get_records_by_cids cids 409 + >|= List.map (fun ({cid; data} : block) -> (cid, data)) 410 + 394 411 let list_records t ?(limit = 100) ?(cursor = "") ?(reverse = false) collection : 395 412 record list Lwt.t = 396 413 let fn = ··· 407 424 Util.use_pool t.db @@ Queries.put_record ~path ~cid ~data ~since 408 425 in 409 426 Lwt.return (cid, data) 427 + 428 + let put_record_raw t ~path ~cid ~data ~since : unit Lwt.t = 429 + Util.use_pool t.db @@ Queries.put_record ~path ~cid ~data ~since 430 + 431 + let delete_record t path : unit Lwt.t = 432 + Util.use_pool t.db @@ Queries.delete_record path 410 433 411 434 (* blobs *) 412 435
+24
pegasus/lib/util.ml
··· 236 236 | Error e -> 237 237 raise (Caqti_error.Exn e) 238 238 239 + let transact conn fn : (unit, exn) Lwt_result.t = 240 + let module C = (val conn : Caqti_lwt.CONNECTION) in 241 + match%lwt C.start () with 242 + | Ok () -> ( 243 + match%lwt fn () with 244 + | Ok () -> ( 245 + match%lwt C.commit () with 246 + | Ok () -> 247 + Lwt.return_ok () 248 + | Error e -> ( 249 + match%lwt C.rollback () with 250 + | Ok () -> 251 + Lwt.return_error (Caqti_error.Exn e) 252 + | Error e -> 253 + Lwt.return_error (Caqti_error.Exn e) ) ) 254 + | Error e -> ( 255 + match%lwt C.rollback () with 256 + | Ok () -> 257 + Lwt.return_error e 258 + | Error e -> 259 + Lwt.return_error (Caqti_error.Exn e) ) ) 260 + | Error e -> 261 + Lwt.return_error (Caqti_error.Exn e) 262 + 239 263 (* runs a bunch of queries and catches duplicate insertion, returning how many succeeded *) 240 264 let multi_query pool 241 265 (queries : (Caqti_lwt.connection -> ('a, Caqti_error.t) Lwt_result.t) list)