objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Emit #commit firehose event in apply_writes

futurGH b9cb6df1 d489b123

+213 -110
+147 -98
mist/lib/mst.ml
··· 41 41 42 42 type node_or_entry = Node of node | Entry of entry 43 43 44 + type diff_add = {key: string; cid: Cid.t} 45 + 46 + type diff_update = {key: string; prev: Cid.t; cid: Cid.t} 47 + 48 + type diff_delete = {key: string; cid: Cid.t} 49 + 50 + type data_diff = 51 + { adds: diff_add list 52 + ; updates: diff_update list 53 + ; deletes: diff_delete list 54 + ; new_mst_blocks: (Cid.t * bytes) list 55 + ; new_leaf_cids: Cid.Set.t 56 + ; removed_cids: Cid.Set.t } 57 + 44 58 let ( let*? ) lazy_opt_lwt f = 45 59 let%lwt result = Lazy.force lazy_opt_lwt in 46 60 f result ··· 49 63 let%lwt result = Lazy.force lazy_opt_lwt in 50 64 f result 51 65 52 - module Make (Store : Writable_blockstore) = struct 66 + module type Intf = sig 67 + type bs 68 + 69 + type t = {blockstore: bs; root: Cid.t} 70 + 71 + val create : bs -> Cid.t -> t 72 + 73 + val retrieve_node_raw : t -> Cid.t -> node_raw option Lwt.t 74 + 75 + val retrieve_node : t -> Cid.t -> node option Lwt.t 76 + 77 + val retrieve_node_lazy : t -> Cid.t -> node option Lwt.t lazy_t 78 + 79 + val get_node_height : t -> node_raw -> int Lwt.t 80 + 81 + val traverse : t -> (string -> Cid.t -> unit) -> unit Lwt.t 82 + 83 + val build_map : t -> Cid.t StringMap.t Lwt.t 84 + 85 + val path_to_entry : t -> Cid.t -> string -> (Cid.t * bytes) list Lwt.t 86 + 87 + val to_blocks_stream : t -> (Cid.t * bytes) Lwt_seq.t 88 + 89 + val serialize : t -> node -> (Cid.t * bytes, exn) Lwt_result.t 90 + 91 + val get_covering_proof : t -> string -> Storage.Block_map.t Lwt.t 92 + 93 + val leaf_count : t -> int Lwt.t 94 + 95 + val layer : t -> int Lwt.t 96 + 97 + val all_nodes : t -> (Cid.t * bytes) list Lwt.t 98 + 99 + val create_empty : bs -> (t, exn) Lwt_result.t 100 + 101 + val get_cid : t -> string -> Cid.t option Lwt.t 102 + 103 + val of_assoc : bs -> (string * Cid.t) list -> t Lwt.t 104 + 105 + val add : t -> string -> Cid.t -> t Lwt.t 106 + 107 + val delete : t -> string -> t Lwt.t 108 + 109 + val collect_nodes_and_leaves : 110 + t -> ((Cid.t * bytes) list * Cid.Set.t * Cid.Set.t) Lwt.t 111 + 112 + val leaves_of_node : node -> (string * Cid.t) list Lwt.t 113 + 114 + val leaves_of_root : t -> (string * Cid.t) list Lwt.t 115 + 116 + val null_diff : t -> data_diff Lwt.t 117 + 118 + val equal : t -> t -> bool Lwt.t 119 + end 120 + 121 + module Make (Store : Writable_blockstore) : Intf with type bs = Store.t = struct 53 122 type bs = Store.t 54 123 55 124 type t = {blockstore: bs; root: Cid.t} ··· 139 208 let%lwt layer = get_node_height t node_raw in 140 209 let entries = 141 210 List.fold_left 142 - (fun entries entry -> 211 + (fun (entries : entry list) entry -> 143 212 let prefix = 144 213 match entries with 145 214 | [] -> ··· 156 225 | None -> 157 226 lazy Lwt.return_none 158 227 in 159 - {layer; key= path; value= entry.v; right} :: entries ) 228 + ({layer; key= path; value= entry.v; right} : entry) :: entries ) 160 229 [] node_raw.e 161 230 in 162 231 Lwt.return {layer; left; entries} ··· 187 256 let*? left = node.left in 188 257 match left with Some l -> traverse l | None -> Lwt.return_unit 189 258 in 190 - List.iter (fun entry -> fn entry.key entry.value) node.entries ; 259 + List.iter (fun (entry : entry) -> fn entry.key entry.value) node.entries ; 191 260 Lwt.return_unit 192 261 in 193 262 match%lwt retrieve_node t t.root with ··· 248 317 let entries = (Option.get root).entries in 249 318 let entries_len = List.length entries in 250 319 let entry_index = 251 - match List.find_index (fun e -> e.key >= key) entries with 320 + match List.find_index (fun (e : entry) -> e.key >= key) entries with 252 321 | Some index -> 253 322 index 254 323 | None -> ··· 381 450 (* produces a cid and cbor-encoded bytes for a given tree *) 382 451 let serialize t node : (Cid.t * bytes, exn) Lwt_result.t = 383 452 let sorted_entries = 384 - List.sort (fun a b -> String.compare a.key b.key) node.entries 453 + List.sort (fun (a : entry) b -> String.compare a.key b.key) node.entries 385 454 in 386 455 let rec aux node : (Cid.t * bytes) Lwt.t = 387 456 let%lwt left = ··· 579 648 (fun acc proof -> Block_map.merge acc proof) 580 649 Block_map.empty proofs ) 581 650 582 - (*** diffs ***) 583 - type diff_add = {key: string; cid: Cid.t} 584 - 585 - type diff_update = {key: string; prev: Cid.t; cid: Cid.t} 586 - 587 - type diff_delete = {key: string; cid: Cid.t} 588 - 589 - type data_diff = 590 - { adds: diff_add list 591 - ; updates: diff_update list 592 - ; deletes: diff_delete list 593 - ; new_mst_blocks: (Cid.t * bytes) list 594 - ; new_leaf_cids: Cid.Set.t 595 - ; removed_cids: Cid.Set.t } 596 - 597 651 (* collects all node blocks (cid, bytes) and all leaf cids reachable from root 598 652 only traverses nodes; doesn't fetch leaf blocks 599 653 returns (nodes, visited, leaves) *) ··· 927 981 ; new_leaf_cids= curr_leaf_set 928 982 ; removed_cids= Cid.Set.empty } 929 983 930 - (* produces a diff between two msts *) 931 - let mst_diff t_curr t_prev_opt : data_diff Lwt.t = 932 - match t_prev_opt with 933 - | None -> 934 - null_diff t_curr 935 - | Some t_prev -> 936 - let%lwt curr_nodes, curr_node_set, curr_leaf_set = 937 - collect_nodes_and_leaves t_curr 938 - in 939 - let%lwt _, prev_node_set, prev_leaf_set = 940 - collect_nodes_and_leaves t_prev 941 - in 942 - (* just convenient to have these functions *) 943 - let in_prev_nodes cid = Cid.Set.mem cid prev_node_set in 944 - let in_curr_nodes cid = Cid.Set.mem cid curr_node_set in 945 - let in_prev_leaves cid = Cid.Set.mem cid prev_leaf_set in 946 - let in_curr_leaves cid = Cid.Set.mem cid curr_leaf_set in 947 - (* new mst blocks are curr nodes that are not in prev *) 948 - let new_mst_blocks = 949 - List.filter (fun (cid, _) -> not (in_prev_nodes cid)) curr_nodes 950 - in 951 - (* removed cids are prev nodes not in curr plus prev leaves not in curr *) 952 - let removed_node_cids = 953 - Cid.Set.fold 954 - (fun cid acc -> 955 - if not (in_curr_nodes cid) then Cid.Set.add cid acc else acc ) 956 - prev_node_set Cid.Set.empty 957 - in 958 - let removed_leaf_cids = 959 - Cid.Set.fold 960 - (fun cid acc -> 961 - if not (in_curr_leaves cid) then Cid.Set.add cid acc else acc ) 962 - prev_leaf_set Cid.Set.empty 963 - in 964 - let removed_cids = Cid.Set.union removed_node_cids removed_leaf_cids in 965 - (* new leaf cids are curr leaves not in prev *) 966 - let new_leaf_cids = 967 - Cid.Set.fold 968 - (fun cid acc -> 969 - if not (in_prev_leaves cid) then Cid.Set.add cid acc else acc ) 970 - curr_leaf_set Cid.Set.empty 971 - in 972 - (* compute adds/updates/deletes by merging sorted leaves *) 973 - let%lwt curr_leaves = leaves_of_root t_curr in 974 - let%lwt prev_leaves = leaves_of_root t_prev in 975 - let rec merge (pl : (string * Cid.t) list) (cl : (string * Cid.t) list) 976 - (adds : diff_add list) (updates : diff_update list) 977 - (deletes : diff_delete list) = 978 - match (pl, cl) with 979 - | [], [] -> 980 - (* we prepend for speed, then reverse at the end *) 981 - (List.rev adds, List.rev updates, List.rev deletes) 982 - | [], (k, c) :: cr -> 983 - (* more curr than prev, goes in adds *) 984 - merge [] cr ({key= k; cid= c} :: adds) updates deletes 985 - | (k, c) :: pr, [] -> 986 - (* more prev than curr, goes in deletes *) 987 - merge pr [] adds updates ({key= k; cid= c} :: deletes) 988 - | (k1, c1) :: pr, (k2, c2) :: cr -> 989 - if k1 = k2 then (* if key & value are the same, keep going *) 990 - if Cid.equal c1 c2 then merge pr cr adds updates deletes 991 - else (* same key, different value; update *) 992 - merge pr cr adds 993 - ({key= k1; prev= c1; cid= c2} :: updates) 994 - deletes 995 - else if k1 < k2 then 996 - merge pr ((k2, c2) :: cr) adds updates 997 - ({key= k1; cid= c1} :: deletes) 998 - else 999 - merge ((k1, c1) :: pr) cr 1000 - ({key= k2; cid= c2} :: adds) 1001 - updates deletes 1002 - in 1003 - let adds, updates, deletes = merge prev_leaves curr_leaves [] [] [] in 1004 - Lwt.return 1005 - {adds; updates; deletes; new_mst_blocks; new_leaf_cids; removed_cids} 1006 - 1007 984 (* checks that two msts are identical by recursively comparing their entries *) 1008 985 let equal (t1 : t) (t2 : t) : bool Lwt.t = 1009 986 let rec nodes_equal (n1 : node) (n2 : node) : bool Lwt.t = ··· 1074 1051 | _ -> 1075 1052 Lwt.return false 1076 1053 end 1054 + 1055 + module Differ (Prev : Intf) (Curr : Intf) = struct 1056 + let diff ~(t_curr : Curr.t) ~(t_prev : Prev.t) : data_diff Lwt.t = 1057 + let%lwt curr_nodes, curr_node_set, curr_leaf_set = 1058 + Curr.collect_nodes_and_leaves t_curr 1059 + in 1060 + let%lwt _, prev_node_set, prev_leaf_set = 1061 + Prev.collect_nodes_and_leaves t_prev 1062 + in 1063 + (* just convenient to have these functions *) 1064 + let in_prev_nodes cid = Cid.Set.mem cid prev_node_set in 1065 + let in_curr_nodes cid = Cid.Set.mem cid curr_node_set in 1066 + let in_prev_leaves cid = Cid.Set.mem cid prev_leaf_set in 1067 + let in_curr_leaves cid = Cid.Set.mem cid curr_leaf_set in 1068 + (* new mst blocks are curr nodes that are not in prev *) 1069 + let new_mst_blocks = 1070 + List.filter (fun (cid, _) -> not (in_prev_nodes cid)) curr_nodes 1071 + in 1072 + (* removed cids are prev nodes not in curr plus prev leaves not in curr *) 1073 + let removed_node_cids = 1074 + Cid.Set.fold 1075 + (fun cid acc -> 1076 + if not (in_curr_nodes cid) then Cid.Set.add cid acc else acc ) 1077 + prev_node_set Cid.Set.empty 1078 + in 1079 + let removed_leaf_cids = 1080 + Cid.Set.fold 1081 + (fun cid acc -> 1082 + if not (in_curr_leaves cid) then Cid.Set.add cid acc else acc ) 1083 + prev_leaf_set Cid.Set.empty 1084 + in 1085 + let removed_cids = Cid.Set.union removed_node_cids removed_leaf_cids in 1086 + (* new leaf cids are curr leaves not in prev *) 1087 + let new_leaf_cids = 1088 + Cid.Set.fold 1089 + (fun cid acc -> 1090 + if not (in_prev_leaves cid) then Cid.Set.add cid acc else acc ) 1091 + curr_leaf_set Cid.Set.empty 1092 + in 1093 + (* compute adds/updates/deletes by merging sorted leaves *) 1094 + let%lwt curr_leaves = Curr.leaves_of_root t_curr in 1095 + let%lwt prev_leaves = Prev.leaves_of_root t_prev in 1096 + let rec merge (pl : (string * Cid.t) list) (cl : (string * Cid.t) list) 1097 + (adds : diff_add list) (updates : diff_update list) 1098 + (deletes : diff_delete list) = 1099 + match (pl, cl) with 1100 + | [], [] -> 1101 + (* we prepend for speed, then reverse at the end *) 1102 + (List.rev adds, List.rev updates, List.rev deletes) 1103 + | [], (k, c) :: cr -> 1104 + (* more curr than prev, goes in adds *) 1105 + merge [] cr ({key= k; cid= c} :: adds) updates deletes 1106 + | (k, c) :: pr, [] -> 1107 + (* more prev than curr, goes in deletes *) 1108 + merge pr [] adds updates ({key= k; cid= c} :: deletes) 1109 + | (k1, c1) :: pr, (k2, c2) :: cr -> 1110 + if k1 = k2 then (* if key & value are the same, keep going *) 1111 + if Cid.equal c1 c2 then merge pr cr adds updates deletes 1112 + else (* same key, different value; update *) 1113 + merge pr cr adds ({key= k1; prev= c1; cid= c2} :: updates) deletes 1114 + else if k1 < k2 then 1115 + merge pr ((k2, c2) :: cr) adds updates 1116 + ({key= k1; cid= c1} :: deletes) 1117 + else 1118 + merge ((k1, c1) :: pr) cr 1119 + ({key= k2; cid= c2} :: adds) 1120 + updates deletes 1121 + in 1122 + let adds, updates, deletes = merge prev_leaves curr_leaves [] [] [] in 1123 + Lwt.return 1124 + {adds; updates; deletes; new_mst_blocks; new_leaf_cids; removed_cids} 1125 + end
+6 -5
mist/test/test_mst.ml
··· 2 2 open Lwt.Infix 3 3 open Lwt_result.Syntax 4 4 module MemMst = Mst.Make (Storage.Memory_blockstore) 5 + module MemDiff = Mst.Differ (MemMst) (MemMst) 5 6 module StringMap = Dag_cbor.StringMap 6 7 7 8 let cid_of_string_exn s = ··· 595 596 let%lwt to_diff = 596 597 Lwt_list.fold_left_s (fun t (k, v) -> MemMst.add t k v) to_diff to_add 597 598 in 598 - let%lwt diff = MemMst.mst_diff to_diff (Some mst) in 599 + let%lwt diff = MemDiff.diff ~t_curr:to_diff ~t_prev:mst in 599 600 (* lengths *) 600 601 Alcotest.(check int) "adds length" 100 (List.length diff.adds) ; 601 602 Alcotest.(check int) "updates length" 100 (List.length diff.updates) ; ··· 603 604 (* contents: convert to maps to compare *) 604 605 let adds_map = 605 606 List.fold_left 606 - (fun m (a : MemMst.diff_add) -> StringMap.add a.key a.cid m) 607 + (fun m (a : Mst.diff_add) -> StringMap.add a.key a.cid m) 607 608 StringMap.empty diff.adds 608 609 in 609 610 let updates_map = 610 611 List.fold_left 611 - (fun m (u : MemMst.diff_update) -> StringMap.add u.key (u.prev, u.cid) m) 612 + (fun m (u : Mst.diff_update) -> StringMap.add u.key (u.prev, u.cid) m) 612 613 StringMap.empty diff.updates 613 614 in 614 615 let deletes_map = 615 616 List.fold_left 616 - (fun m (d : MemMst.diff_delete) -> StringMap.add d.key d.cid m) 617 + (fun m (d : Mst.diff_delete) -> StringMap.add d.key d.cid m) 617 618 StringMap.empty diff.deletes 618 619 in 619 620 (* compare adds *) ··· 750 751 Lwt.return_ok () 751 752 752 753 let test_trivial_root () = 753 - let store = Storage.Memory_blockstore.create () in 754 + let store : MemMst.bs = Storage.Memory_blockstore.create () in 754 755 let cid1 = 755 756 cid_of_string_exn 756 757 "bafyreie5cvv4h45feadgeuwhbcutmh6t2ceseocckahdoe6uat64zmz454"
+58 -5
pegasus/lib/repository.ml
··· 1 1 open User_store.Types 2 + module BlockMap = User_store.Block_map 2 3 module Lex = Mist.Lex 3 4 module Mst = Mist.Mst.Make (User_store) 5 + module MemMst = Mist.Mst.Make (Mist.Storage.Memory_blockstore) 4 6 module StringMap = Lex.StringMap 5 7 module Tid = Mist.Tid 6 8 ··· 256 258 (Cid.to_string (Option.get swap_commit)) 257 259 (match t.commit with Some c -> Cid.to_string c | None -> "null") ) ; 258 260 let%lwt block_map = Lwt.map ref (get_map t) in 261 + (* need to cache this so in the end, we only emit new blocks *) 262 + let prev_blocks = 263 + StringMap.bindings !block_map 264 + |> List.fold_left (fun acc (k, v) -> Cid.Map.add v k acc) Cid.Map.empty 265 + in 266 + (* record cbor keyed by cid, needed to calculate covering proofs at the end 267 + we can't do it along the way because covering proofs require the full new mst *) 268 + let added_records = ref Cid.Map.empty in 269 + (* ops to emit, built in loop because prev_data (previous cid) is otherwise inaccessible *) 270 + let commit_ops : Sequencer.Types.commit_evt_op list ref = ref [] in 259 271 let%lwt results = 260 272 List.map 261 273 (fun (w : repo_write) -> ··· 279 291 if StringMap.mem "$type" value then value 280 292 else StringMap.add "$type" (`String collection) value 281 293 in 282 - let%lwt cid = 294 + let%lwt cid, cbor = 283 295 User_store.put_record t.db (`LexMap record_with_type) path 284 296 in 285 297 block_map := StringMap.add path cid !block_map ; 298 + added_records := Cid.Map.add cid (path, cbor) !added_records ; 299 + commit_ops := 300 + !commit_ops @ [{action= `Create; path; cid= Some cid; prev= None}] ; 286 301 let refs = 287 302 Util.find_blob_refs value 288 303 |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) ··· 336 351 if StringMap.mem "$type" value then value 337 352 else StringMap.add "$type" (`String collection) value 338 353 in 339 - let%lwt new_cid = 354 + let%lwt new_cid, cbor = 340 355 User_store.put_record t.db (`LexMap record_with_type) path 341 356 in 342 357 block_map := StringMap.add path new_cid !block_map ; 358 + added_records := Cid.Map.add new_cid (path, cbor) !added_records ; 359 + commit_ops := 360 + !commit_ops 361 + @ [{action= `Update; path; cid= Some new_cid; prev= old_cid}] ; 343 362 Lwt.return 344 363 (Update 345 364 { type'= "com.atproto.repo.applyWrites#updateResult" ··· 372 391 Lwt.return_unit 373 392 in 374 393 block_map := StringMap.remove path !block_map ; 394 + commit_ops := 395 + !commit_ops @ [{action= `Delete; path; cid= None; prev= cid}] ; 375 396 Lwt.return 376 397 (Delete {type'= "com.atproto.repo.applyWrites#deleteResult"}) ) 377 398 writes 378 399 |> Lwt.all 379 400 in 380 401 let%lwt () = User_store.clear_mst t.db in 381 - let%lwt {root; _} = Mst.of_assoc t.db (StringMap.bindings !block_map) in 382 - let%lwt commit = put_commit t root ~previous:(Some commit) in 383 - Lwt.return {commit; results} 402 + let%lwt new_mst = Mst.of_assoc t.db (StringMap.bindings !block_map) in 403 + let%lwt new_commit = put_commit t new_mst.root ~previous:(Some commit) in 404 + let commit_cid, commit_signed = new_commit in 405 + let commit_block = 406 + commit_signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 407 + in 408 + let relevant_blocks = ref BlockMap.empty in 409 + let%lwt _ = 410 + (* for each block that wasn't in the previous mst, 411 + commit event `blocks` contains the block itself & its covering proofs *) 412 + List.map 413 + (fun (cid, (path, cbor)) -> 414 + if Cid.Map.mem cid prev_blocks then Lwt.return_unit 415 + else 416 + let%lwt proofs = Mst.get_covering_proof new_mst path in 417 + relevant_blocks := 418 + BlockMap.merge (BlockMap.set cid cbor !relevant_blocks) proofs ; 419 + Lwt.return_unit ) 420 + (Cid.Map.bindings !added_records) 421 + |> Lwt.all 422 + in 423 + let block_stream = 424 + BlockMap.entries !relevant_blocks 425 + |> Lwt_seq.of_list 426 + |> Lwt_seq.cons (commit_cid, commit_block) 427 + in 428 + let%lwt blocks = 429 + Car.blocks_to_stream commit_cid block_stream |> Car.collect_stream 430 + in 431 + let%lwt _ = 432 + Sequencer.sequence_commit t.db ~did:t.did ~commit:commit_cid 433 + ~rev:commit_signed.rev ~blocks ~ops:!commit_ops ~since:commit.rev 434 + ~prev_data:commit.data () 435 + in 436 + Lwt.return {commit= new_commit; results} 384 437 385 438 let load did : t Lwt.t = 386 439 let%lwt data_store_conn =
+2 -2
pegasus/lib/user_store.ml
··· 355 355 {path; cid; value= Lex.of_cbor data; since} ) 356 356 >>= Lwt.return 357 357 358 - let put_record conn record path : Cid.t Lwt.t = 358 + let put_record conn record path : (Cid.t * bytes) Lwt.t = 359 359 let cid, data = Lex.to_cbor_block record in 360 360 let since = Tid.now () in 361 361 let$! () = Queries.put_record ~path ~cid ~data ~since conn in 362 - Lwt.return cid 362 + Lwt.return (cid, data) 363 363 364 364 (* blobs *) 365 365