Persistent store with Git semantics: lazy reads, delayed writes, content-addressing

irmin: add Bloom-slice sync protocol (Journault & Gazagnaire 2014)

Efficient sync of persistent DAGs: cost proportional to the difference,
not the total history. The client partitions its history into fixed-size
slices, each encoded as a Bloom filter. The server walks backward from
its heads, stopping at Bloom matches. Stateless server, self-correcting
(false positives cause redundant transfer, never data loss).
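
The server-side walk described above can be sketched in isolation. This is a minimal illustration, not the code added in lib/sync.ml: the name missing_blocks and the ~client_has predicate are hypothetical, and ~client_has stands in for the Bloom membership test (here it may be exact, whereas a real filter is probabilistic).

```ocaml
(* Breadth-first walk from the server heads toward older history.
   [parents h] lists the predecessors of [h]; [client_has h] is a stand-in
   for "the client's Bloom slice matches [h]". Both are assumptions made
   for this sketch. *)
let missing_blocks ~heads ~parents ~get_block ~client_has =
  let visited = Hashtbl.create 16 in
  let queue = Queue.create () in
  let out = ref [] in
  List.iter (fun h -> Queue.push h queue) heads;
  while not (Queue.is_empty queue) do
    let h = Queue.pop queue in
    if not (Hashtbl.mem visited h) then begin
      Hashtbl.replace visited h ();
      (* A match ends this branch of the walk: nothing older is explored. *)
      if not (client_has h) then
        match get_block h with
        | None -> ()
        | Some data ->
            out := (h, data) :: !out;
            List.iter (fun p -> Queue.push p queue) (parents h)
    end
  done;
  (* Blocks are returned in the order they were discovered. *)
  List.rev !out
```

Because the walk stops at the first match on each branch, the work done depends on how far the server is ahead of the client rather than on the full length of the history.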

Three layers in Sync:
- Anti-entropy gossip: exchange branch head vectors (Demers et al. 1987)
- Bloom-slice: transfer missing blocks via Bloom-compressed rounds
- Bloom filter: probabilistic set membership (~1% FP, FNV-1a probes)
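
For readers unfamiliar with the bottom layer, here is a small Bloom filter sketch using FNV-1a. The repository's Bloom module is not shown in this diff, so everything below is an assumption: the names (fnv1a, create, add, mem, of_strings), the sizing of 10 bits per entry with 7 probes (a textbook choice that gives a false-positive rate of roughly 1%), and the double-hashing scheme used to derive the probes. It also assumes 64-bit OCaml integers.

```ocaml
(* 32-bit FNV-1a over the bytes of [s]. [seed] defaults to the standard
   offset basis; passing another seed yields a second, related hash. *)
let fnv1a ?(seed = 0x811c9dc5) s =
  let h = ref seed in
  String.iter
    (fun c -> h := ((!h lxor Char.code c) * 0x01000193) land 0xFFFFFFFF)
    s;
  !h

type bloom = { bits : Bytes.t; m : int; k : int }

(* Hypothetical sizing: 10 bits per expected entry, 7 probes. *)
let create ~expected =
  let m = max 8 (10 * expected) in
  { bits = Bytes.make ((m + 7) / 8) '\000'; m; k = 7 }

(* Double hashing: probe i is (h1 + i * h2) mod m, with h2 forced odd. *)
let probes b s =
  let h1 = fnv1a s in
  let h2 = fnv1a ~seed:h1 s lor 1 in
  List.init b.k (fun i -> (h1 + (i * h2)) mod b.m)

let add b s =
  List.iter
    (fun i ->
      let byte = Char.code (Bytes.get b.bits (i / 8)) in
      Bytes.set b.bits (i / 8) (Char.chr (byte lor (1 lsl (i mod 8)))))
    (probes b s)

(* True for every added string; may also be true for strings never added. *)
let mem b s =
  List.for_all
    (fun i -> Char.code (Bytes.get b.bits (i / 8)) land (1 lsl (i mod 8)) <> 0)
    (probes b s)

let of_strings strings =
  let b = create ~expected:(List.length strings) in
  List.iter (add b) strings;
  b
```

The one-sided error is the property the sync layers rely on: mem never returns false for a string that was added.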

Also keeps merkle_diff for direct-access scenarios (local heap sync).
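
A merkle descent of this kind might look like the sketch below. The body of merkle_diff is not part of this diff, so this is only a guess at its shape based on how the tests call it (~local_has, ~remote_get returning parents and data, then a root hash); the name merkle_diff_sketch is hypothetical.

```ocaml
(* Descend from [root], collecting every block the local side lacks.
   [remote_get h] returns [Some (parents, data)] when the remote has [h].
   Descent stops at any hash that [local_has] reports as present. *)
let merkle_diff_sketch ~local_has ~remote_get root =
  let seen = Hashtbl.create 16 in
  let rec go acc h =
    if Hashtbl.mem seen h || local_has h then acc
    else (
      Hashtbl.replace seen h ();
      match remote_get h with
      | None -> acc
      | Some (parents, data) -> List.fold_left go ((h, data) :: acc) parents)
  in
  List.rev (go [] root)
```

Unlike the Bloom-slice rounds, this needs direct access to both stores, which is why it suits local heap sync.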

11 sync tests: gossip (3), merkle descent (2), bloom-slice (4),
bloom (2). 48 total tests in the irmin suite.
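
The three gossip cases counted above (same heads, pull, diverged) can be illustrated with a stand-alone sketch of a head-vector comparison. This is not the Irmin.Sync.exchange implementation, which this diff does not show: the action type is redeclared locally, the Push constructor and the handling of branches missing locally are assumptions, and only the argument labels mirror the calls in the tests.

```ocaml
(* Hypothetical action type; only Pull and Diverged appear in the tests. *)
type 'h action =
  | Pull of string * 'h
  | Push of string * 'h
  | Diverged of string * 'h * 'h

(* Compare two branch head vectors, one remote branch at a time.
   [is_ancestor a b] should hold when [a] is in the history of [b]. *)
let exchange ~equal ~is_ancestor ~local ~remote =
  List.filter_map
    (fun (branch, r) ->
      match List.assoc_opt branch local with
      | None -> Some (Pull (branch, r)) (* assumed: unknown branch is pulled *)
      | Some l when equal l r -> None
      | Some l when is_ancestor l r -> Some (Pull (branch, r))
      | Some l when is_ancestor r l -> Some (Push (branch, l))
      | Some l -> Some (Diverged (branch, l, r)))
    remote
```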

+456
+124
lib/sync.ml
···
     let bits = Bytes.of_string (String.sub s 4 (String.length s - 4)) in
     Ok { num_entries = n; bits }
 end
+
+(** {1 Bloom-slice sync (Journault & Gazagnaire 2014)}
+
+    Efficient sync of persistent DAGs. The cost is proportional to the size of
+    the difference, not the total history. The client partitions its history
+    into fixed-size slices, each encoded as a Bloom filter. The server walks
+    backward from its heads, stopping at Bloom matches. Stateless server.
+
+    Protocol:
+    {v
+    Client                                Server
+      |                                     |
+      |-- heads, slice₁ (Bloom) ----------->|
+      |                                     |  walk back from heads,
+      |<-- blocks₁, frontier₁ --------------|  stop at Bloom matches
+      |                                     |
+      |-- slice₂ (Bloom) ------------------>|
+      |<-- blocks₂, frontier₂ --------------|
+      ...
+      |<-- blocks_n, frontier=∅ ------------|  done
+    v}
+
+    False positives cause redundant transfer, never data loss. *)
+
+module Slice_sync = struct
+  let slice_size = 256
+
+  (** Client state between rounds. *)
+  type 'hash state = {
+    get_ancestors : 'hash list -> int -> 'hash list;
+    to_string : 'hash -> string;
+  }
+
+  type 'hash client_msg =
+    | Init of { heads : 'hash list; bloom : Bloom.t }
+    | Next of { frontier : 'hash list; bloom : Bloom.t }
+    | Done
+
+  type 'hash server_msg = {
+    blocks : ('hash * string) list;
+    frontier : 'hash list;
+  }
+
+  let make_bloom ~to_string hashes =
+    Bloom.of_strings (List.map to_string hashes)
+
+  (** Start a sync session. *)
+  let start ~heads ~get_ancestors ~to_string =
+    let ancestors = get_ancestors heads slice_size in
+    let bloom = make_bloom ~to_string ancestors in
+    let state = { get_ancestors; to_string } in
+    (state, Init { heads; bloom })
+
+  (** Process a server response. Returns [None] when sync is complete. *)
+  let receive state ~apply_block response =
+    List.iter (fun (h, data) -> apply_block h data) response.blocks;
+    if response.frontier = [] then None
+    else
+      let ancestors =
+        state.get_ancestors response.frontier slice_size
+      in
+      let bloom = make_bloom ~to_string:state.to_string ancestors in
+      Some (Next { frontier = response.frontier; bloom })
+
+  (** Handle a client message on the server side. Stateless. *)
+  let handle ~heads ~get_block ~get_children ~to_string msg =
+    match msg with
+    | Done -> { blocks = []; frontier = [] }
+    | Init { heads = _; bloom } | Next { frontier = _; bloom } ->
+        let start_from =
+          match msg with
+          | Init _ -> heads
+          | Next { frontier; _ } -> frontier
+          | Done -> []
+        in
+        let visited = Hashtbl.create 256 in
+        let blocks = ref [] in
+        let queue = Queue.create () in
+        List.iter (fun h -> Queue.push h queue) start_from;
+        while not (Queue.is_empty queue) do
+          let h = Queue.pop queue in
+          if not (Hashtbl.mem visited h) then (
+            Hashtbl.replace visited h ();
+            if Bloom.mem bloom (to_string h) then
+              (* Client probably has this: stop exploring this branch *)
+              ()
+            else
+              match get_block h with
+              | None -> ()
+              | Some data ->
+                  blocks := (h, data) :: !blocks;
+                  List.iter (fun c -> Queue.push c queue) (get_children h))
+        done;
+        (* Frontier: nodes we sent whose children include unvisited hashes.
+           These are the boundary; the client needs to send the next Bloom
+           slice covering the ancestors of the frontier. *)
+        let frontier =
+          if !blocks = [] then []
+          else
+            List.filter_map
+              (fun (h, _) ->
+                let children = get_children h in
+                let unexplored =
+                  List.exists (fun c -> not (Hashtbl.mem visited c)) children
+                in
+                if unexplored then Some h else None)
+              !blocks
+        in
+        { blocks = List.rev !blocks; frontier }
+
+  (** Run the full sync loop. *)
+  let pull ~heads ~get_ancestors ~to_string ~apply_block ~send ~receive:recv =
+    let state, first = start ~heads ~get_ancestors ~to_string in
+    send first;
+    let rec loop state =
+      let response = recv () in
+      match receive state ~apply_block response with
+      | None -> ()
+      | Some msg ->
+          send msg;
+          loop state
+    in
+    loop state
+end
+56
lib/sync.mli
···
   val encode : t -> string
   val decode : string -> (t, [ `Msg of string ]) result
 end
+
+(** {1 Bloom-slice sync (Journault & Gazagnaire 2014)}
+
+    Efficient sync of persistent DAGs. Cost is proportional to the difference,
+    not the total history. The client partitions its history into fixed-size
+    slices, each encoded as a Bloom filter. The server walks backward from its
+    heads, stopping at Bloom matches. The server is stateless between rounds.
+    False positives cause redundant transfer, never data loss. *)
+
+module Slice_sync : sig
+  type 'hash client_msg =
+    | Init of { heads : 'hash list; bloom : Bloom.t }
+    | Next of { frontier : 'hash list; bloom : Bloom.t }
+    | Done
+
+  type 'hash server_msg = {
+    blocks : ('hash * string) list;
+    frontier : 'hash list;
+  }
+
+  type 'hash state
+  (** Client state between rounds. *)
+
+  val start :
+    heads:'hash list ->
+    get_ancestors:('hash list -> int -> 'hash list) ->
+    to_string:('hash -> string) ->
+    'hash state * 'hash client_msg
+  (** Begin a sync session. Returns the first message to send. *)
+
+  val receive :
+    'hash state ->
+    apply_block:('hash -> string -> unit) ->
+    'hash server_msg ->
+    'hash client_msg option
+  (** Process a server response. [None] when sync is complete. *)
+
+  val handle :
+    heads:'hash list ->
+    get_block:('hash -> string option) ->
+    get_children:('hash -> 'hash list) ->
+    to_string:('hash -> string) ->
+    'hash client_msg ->
+    'hash server_msg
+  (** Handle a client message on the server side. Stateless. *)
+
+  val pull :
+    heads:'hash list ->
+    get_ancestors:('hash list -> int -> 'hash list) ->
+    to_string:('hash -> string) ->
+    apply_block:('hash -> string -> unit) ->
+    send:('hash client_msg -> unit) ->
+    receive:(unit -> 'hash server_msg) ->
+    unit
+  (** Run the full client-side sync loop until complete. *)
+end
+1
test/dune
···
   test_irmin
   test_schema
   test_irmin_tar
+  test_sync
   test_worktree)
  (libraries
   irmin
+1
test/test.ml
···
       Test_irmin.suite;
       Test_schema.suite;
       Test_irmin_tar.suite;
+      Test_sync.suite;
       Test_worktree.suite;
     ]
+271
test/test_sync.ml
···
+(** Sync tests: gossip exchange, merkle descent, Bloom-slice protocol. *)
+
+(* ===== Test DAG ===== *)
+
+type dag = {
+  blocks : (string, string) Hashtbl.t;
+  children : (string, string list) Hashtbl.t;
+  mutable heads : (string * string) list;
+}
+
+let dag () =
+  {
+    blocks = Hashtbl.create 32;
+    children = Hashtbl.create 32;
+    heads = [];
+  }
+
+let add d ~hash ~data ~parents =
+  Hashtbl.replace d.blocks hash data;
+  Hashtbl.replace d.children hash parents;
+  d
+
+let set_head d branch hash = d.heads <- (branch, hash) :: d.heads
+let has d h = Hashtbl.mem d.blocks h
+let get d h = Hashtbl.find_opt d.blocks h
+let kids d h = match Hashtbl.find_opt d.children h with Some l -> l | None -> []
+
+let get_ancestors d roots n =
+  let visited = Hashtbl.create 16 in
+  let result = ref [] in
+  let queue = Queue.create () in
+  List.iter (fun r -> Queue.push r queue) roots;
+  while not (Queue.is_empty queue) && List.length !result < n do
+    let h = Queue.pop queue in
+    if not (Hashtbl.mem visited h) then (
+      Hashtbl.replace visited h ();
+      if has d h then (
+        result := h :: !result;
+        List.iter (fun p -> Queue.push p queue) (kids d h)))
+  done;
+  !result
+
+let apply d h data =
+  Hashtbl.replace d.blocks h data;
+  if not (Hashtbl.mem d.children h) then Hashtbl.replace d.children h []
+
+(* ===== Gossip tests ===== *)
+
+let gossip_same () =
+  let actions =
+    Irmin.Sync.exchange ~equal:String.equal
+      ~local:[ ("main", "abc") ]
+      ~remote:[ ("main", "abc") ]
+      ~is_ancestor:(fun _ _ -> false)
+  in
+  Alcotest.(check int) "no actions" 0 (List.length actions)
+
+let gossip_pull () =
+  let actions =
+    Irmin.Sync.exchange ~equal:String.equal
+      ~local:[ ("main", "a") ]
+      ~remote:[ ("main", "b") ]
+      ~is_ancestor:(fun a b -> a = "a" && b = "b")
+  in
+  match actions with
+  | [ Irmin.Sync.Pull ("main", "b") ] -> ()
+  | _ -> Alcotest.fail "expected Pull"
+
+let gossip_diverged () =
+  let actions =
+    Irmin.Sync.exchange ~equal:String.equal
+      ~local:[ ("main", "x") ]
+      ~remote:[ ("main", "y") ]
+      ~is_ancestor:(fun _ _ -> false)
+  in
+  match actions with
+  | [ Irmin.Sync.Diverged ("main", "x", "y") ] -> ()
+  | _ -> Alcotest.fail "expected Diverged"
+
+(* ===== Merkle descent tests ===== *)
+
+let descent_chain () =
+  let server = dag () in
+  let server =
+    add server ~hash:"c" ~data:"data-c" ~parents:[ "b" ]
+    |> add ~hash:"b" ~data:"data-b" ~parents:[ "a" ]
+    |> add ~hash:"a" ~data:"data-a" ~parents:[]
+  in
+  let diff =
+    Irmin.Sync.merkle_diff
+      ~local_has:(fun _ -> false)
+      ~remote_get:(fun h ->
+        match get server h with
+        | Some data -> Some (kids server h, data)
+        | None -> None)
+      "c"
+  in
+  Alcotest.(check int) "3 blocks" 3 (List.length diff);
+  let hashes = List.map fst diff in
+  Alcotest.(check bool) "has a" true (List.mem "a" hashes);
+  Alcotest.(check bool) "has b" true (List.mem "b" hashes);
+  Alcotest.(check bool) "has c" true (List.mem "c" hashes)
+
+let descent_skips_existing () =
+  let server = dag () in
+  let server =
+    add server ~hash:"c" ~data:"data-c" ~parents:[ "b" ]
+    |> add ~hash:"b" ~data:"data-b" ~parents:[ "a" ]
+    |> add ~hash:"a" ~data:"data-a" ~parents:[]
+  in
+  let diff =
+    Irmin.Sync.merkle_diff
+      ~local_has:(fun h -> h = "a" || h = "b")
+      ~remote_get:(fun h ->
+        match get server h with
+        | Some data -> Some (kids server h, data)
+        | None -> None)
+      "c"
+  in
+  Alcotest.(check int) "only c" 1 (List.length diff);
+  Alcotest.(check string) "hash" "c" (fst (List.hd diff))
+
+(* ===== Bloom-slice protocol tests ===== *)
+
+let slice_empty_client () =
+  let server = dag () in
+  let server =
+    add server ~hash:"c" ~data:"data-c" ~parents:[ "b" ]
+    |> add ~hash:"b" ~data:"data-b" ~parents:[ "a" ]
+    |> add ~hash:"a" ~data:"data-a" ~parents:[]
+  in
+  set_head server "main" "c";
+  let client = dag () in
+  let round = ref 0 in
+  let responses = Queue.create () in
+  let send msg =
+    incr round;
+    let resp =
+      Irmin.Sync.Slice_sync.handle ~heads:[ "c" ] ~get_block:(get server)
+        ~get_children:(kids server) ~to_string:Fun.id msg
+    in
+    Queue.push resp responses
+  in
+  Irmin.Sync.Slice_sync.pull ~heads:[] ~to_string:Fun.id
+    ~get_ancestors:(get_ancestors client) ~apply_block:(apply client) ~send
+    ~receive:(fun () -> Queue.pop responses);
+  Alcotest.(check bool) "has a" true (has client "a");
+  Alcotest.(check bool) "has b" true (has client "b");
+  Alcotest.(check bool) "has c" true (has client "c");
+  Alcotest.(check string) "data-a" "data-a" (Option.get (get client "a"));
+  Alcotest.(check bool) "converges" true (!round <= 3)
+
+let slice_partial_overlap () =
+  let server = dag () in
+  let server =
+    add server ~hash:"d" ~data:"data-d" ~parents:[ "c" ]
+    |> add ~hash:"c" ~data:"data-c" ~parents:[ "b" ]
+    |> add ~hash:"b" ~data:"data-b" ~parents:[ "a" ]
+    |> add ~hash:"a" ~data:"data-a" ~parents:[]
+  in
+  set_head server "main" "d";
+  let client = dag () in
+  let client =
+    add client ~hash:"b" ~data:"data-b" ~parents:[ "a" ]
+    |> add ~hash:"a" ~data:"data-a" ~parents:[]
+  in
+  set_head client "main" "b";
+  let responses = Queue.create () in
+  let send msg =
+    let resp =
+      Irmin.Sync.Slice_sync.handle ~heads:[ "d" ] ~get_block:(get server)
+        ~get_children:(kids server) ~to_string:Fun.id msg
+    in
+    Queue.push resp responses
+  in
+  Irmin.Sync.Slice_sync.pull ~heads:[ "b" ] ~to_string:Fun.id
+    ~get_ancestors:(get_ancestors client) ~apply_block:(apply client) ~send
+    ~receive:(fun () -> Queue.pop responses);
+  Alcotest.(check bool) "has c" true (has client "c");
+  Alcotest.(check bool) "has d" true (has client "d");
+  Alcotest.(check string) "data-d" "data-d" (Option.get (get client "d"))
+
+let slice_already_synced () =
+  let d = dag () in
+  let d =
+    add d ~hash:"a" ~data:"data-a" ~parents:[]
+    |> add ~hash:"b" ~data:"data-b" ~parents:[ "a" ]
+  in
+  set_head d "main" "b";
+  let rounds = ref 0 in
+  let responses = Queue.create () in
+  let send msg =
+    incr rounds;
+    let resp =
+      Irmin.Sync.Slice_sync.handle ~heads:[ "b" ] ~get_block:(get d)
+        ~get_children:(kids d) ~to_string:Fun.id msg
+    in
+    Queue.push resp responses
+  in
+  Irmin.Sync.Slice_sync.pull ~heads:[ "b" ] ~to_string:Fun.id
+    ~get_ancestors:(get_ancestors d) ~apply_block:(apply d) ~send
+    ~receive:(fun () -> Queue.pop responses);
+  (* Should complete in 1 round: the Bloom catches everything *)
+  Alcotest.(check bool) "at most 2 rounds" true (!rounds <= 2)
+
+let slice_diamond () =
+  let server = dag () in
+  let server =
+    add server ~hash:"root" ~data:"root" ~parents:[]
+    |> add ~hash:"left" ~data:"left" ~parents:[ "root" ]
+    |> add ~hash:"right" ~data:"right" ~parents:[ "root" ]
+    |> add ~hash:"merge" ~data:"merge" ~parents:[ "left"; "right" ]
+  in
+  set_head server "main" "merge";
+  let client = dag () in
+  let client = add client ~hash:"root" ~data:"root" ~parents:[] in
+  set_head client "main" "root";
+  let responses = Queue.create () in
+  let send msg =
+    let resp =
+      Irmin.Sync.Slice_sync.handle ~heads:[ "merge" ]
+        ~get_block:(get server) ~get_children:(kids server) ~to_string:Fun.id
+        msg
+    in
+    Queue.push resp responses
+  in
+  Irmin.Sync.Slice_sync.pull ~heads:[ "root" ] ~to_string:Fun.id
+    ~get_ancestors:(get_ancestors client) ~apply_block:(apply client) ~send
+    ~receive:(fun () -> Queue.pop responses);
+  Alcotest.(check bool) "has merge" true (has client "merge");
+  Alcotest.(check bool) "has left" true (has client "left");
+  Alcotest.(check bool) "has right" true (has client "right")
+
+(* ===== Bloom tests ===== *)
+
+let bloom_no_false_negatives () =
+  let hashes = List.init 100 (fun i -> Fmt.str "hash-%03d-pad" i) in
+  let b = Irmin.Sync.Bloom.of_strings hashes in
+  List.iter
+    (fun h -> Alcotest.(check bool) h true (Irmin.Sync.Bloom.mem b h))
+    hashes
+
+let bloom_roundtrip () =
+  let hashes = List.init 50 (fun i -> Fmt.str "rt-%04d-pad" i) in
+  let b = Irmin.Sync.Bloom.of_strings hashes in
+  match Irmin.Sync.Bloom.decode (Irmin.Sync.Bloom.encode b) with
+  | Ok decoded ->
+      List.iter
+        (fun h ->
+          Alcotest.(check bool) h true (Irmin.Sync.Bloom.mem decoded h))
+        hashes
+  | Error (`Msg m) -> Alcotest.fail m
+
+let suite =
+  ( "sync",
+    [
+      Alcotest.test_case "gossip: same heads" `Quick gossip_same;
+      Alcotest.test_case "gossip: pull" `Quick gossip_pull;
+      Alcotest.test_case "gossip: diverged" `Quick gossip_diverged;
+      Alcotest.test_case "descent: chain" `Quick descent_chain;
+      Alcotest.test_case "descent: skips existing" `Quick descent_skips_existing;
+      Alcotest.test_case "bloom-slice: empty client" `Quick slice_empty_client;
+      Alcotest.test_case "bloom-slice: partial overlap" `Quick
+        slice_partial_overlap;
+      Alcotest.test_case "bloom-slice: already synced" `Quick
+        slice_already_synced;
+      Alcotest.test_case "bloom-slice: diamond DAG" `Quick slice_diamond;
+      Alcotest.test_case "bloom: no false negatives" `Quick
+        bloom_no_false_negatives;
+      Alcotest.test_case "bloom: roundtrip" `Quick bloom_roundtrip;
+    ] )
+3
test/test_sync.mli
···
+(** Sync tests. *)
+
+val suite : string * unit Alcotest.test_case list