An Elixir implementation of AT Protocol-flavoured Merkle Search Trees (MST)
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 290 lines 8.9 kB view raw
1defmodule MST.CAR do 2 @moduledoc """ 3 Bridges `MST.Tree` with the DASL CAR file format. 4 5 Provides functions to load an MST from a CAR binary or stream, and to export 6 an MST back to CAR format. The CAR header's first root CID is treated as the 7 MST root; any additional roots are ignored. 8 9 MST node blocks (DAG-CBOR codec, `:drisl`) are decoded into `MST.Node` 10 structs and stored in an `MST.Store.Memory`. Non-MST blocks (e.g. record 11 data with the `:raw` codec) are ignored during import — the store only holds 12 MST structural nodes. 13 14 ## Example 15 16 {:ok, tree} = MST.CAR.from_binary(File.read!("repo.car")) 17 {:ok, pairs} = MST.Tree.to_list(tree) 18 19 """ 20 21 alias DASL.{CAR, CID} 22 alias MST.{Node, Store, Tree} 23 24 @type car_error() :: 25 {:error, :header, atom()} 26 | {:error, :block, atom()} 27 | {:error, atom()} 28 29 # --------------------------------------------------------------------------- 30 # Import 31 # --------------------------------------------------------------------------- 32 33 @doc """ 34 Loads an MST from an already-decoded `DASL.CAR` struct. 35 36 Populates an `MST.Store.Memory` from the struct's blocks map and returns an 37 `MST.Tree` rooted at the CAR's first root CID. Use this when you already hold 38 a `%DASL.CAR{}` in memory and want to avoid a redundant encode/decode cycle. 39 40 ## Examples 41 42 iex> store = MST.Store.Memory.new() 43 iex> tree = MST.Tree.new(store) 44 iex> val = DASL.CID.compute("data") 45 iex> {:ok, tree} = MST.Tree.put(tree, "col/key", val) 46 iex> {:ok, binary} = MST.CAR.to_binary(tree) 47 iex> {:ok, car} = DASL.CAR.decode(binary) 48 iex> {:ok, tree2} = MST.CAR.from_car(car) 49 iex> MST.Tree.get(tree2, "col/key") 50 {:ok, val} 51 52 """ 53 @spec from_car(CAR.t()) :: {:ok, Tree.t()} | car_error() 54 def from_car(%CAR{roots: roots, blocks: blocks}), do: build_tree(roots, blocks) 55 56 @doc """ 57 Loads an MST from a CAR-encoded binary. 58 59 Decodes all blocks, populates an `MST.Store.Memory` with MST nodes (DAG-CBOR 60 codec), and returns an `MST.Tree` rooted at the CAR's first root CID. 61 62 Accepts the same options as `DASL.CAR.decode/2` (`verify: boolean`). 63 64 ## Examples 65 66 iex> store = MST.Store.Memory.new() 67 iex> tree = MST.Tree.new(store) 68 iex> val = DASL.CID.compute("data") 69 iex> {:ok, tree} = MST.Tree.put(tree, "col/key", val) 70 iex> {:ok, binary} = MST.CAR.to_binary(tree) 71 iex> {:ok, tree2} = MST.CAR.from_binary(binary) 72 iex> MST.Tree.get(tree2, "col/key") 73 {:ok, val} 74 75 """ 76 @spec from_binary(binary(), keyword()) :: {:ok, Tree.t()} | car_error() 77 def from_binary(binary, opts \\ []) when is_binary(binary) do 78 try do 79 with {:ok, car} <- CAR.decode(binary, opts) do 80 build_tree(car.roots, car.blocks) 81 end 82 rescue 83 e in ArgumentError -> {:error, :header, {:invalid_binary, e.message}} 84 end 85 end 86 87 @doc """ 88 Loads an MST from a CAR stream (an `Enumerable` of binary chunks). 89 90 Streams blocks through `DASL.CAR.stream_decode/2`, populating an 91 `MST.Store.Memory` incrementally. Useful for large files where you want to 92 avoid loading the full binary into memory at once. Converts stream raises 93 to error tuples. 94 95 ## Options 96 97 - `:verify` — verify CID digests of incoming blocks (default: `true`) 98 99 ## Examples 100 101 iex> store = MST.Store.Memory.new() 102 iex> tree = MST.Tree.new(store) 103 iex> val = DASL.CID.compute("data") 104 iex> {:ok, tree} = MST.Tree.put(tree, "col/key", val) 105 iex> {:ok, binary} = MST.CAR.to_binary(tree) 106 iex> chunk_stream = [binary] 107 iex> {:ok, tree2} = MST.CAR.from_stream(chunk_stream) 108 iex> MST.Tree.get(tree2, "col/key") 109 {:ok, val} 110 111 """ 112 @spec from_stream(Enumerable.t(), keyword()) :: {:ok, Tree.t()} | car_error() 113 def from_stream(stream, opts \\ []) do 114 try do 115 {roots, blocks} = 116 stream 117 |> CAR.stream_decode(opts) 118 |> Enum.reduce({nil, %{}}, fn 119 {:header, _version, roots}, {_roots, blocks} -> 120 {roots, blocks} 121 122 {:block, cid, data}, {roots, blocks} -> 123 {roots, Map.put(blocks, cid, data)} 124 end) 125 126 build_tree(roots || [], blocks) 127 rescue 128 e in RuntimeError -> {:error, {:stream_decode, e.message}} 129 end 130 end 131 132 # --------------------------------------------------------------------------- 133 # Export 134 # --------------------------------------------------------------------------- 135 136 @doc """ 137 Serialises an `MST.Tree` to a CAR-encoded binary. 138 139 Collects all reachable MST node blocks and wraps them in a CARv1 file with 140 the tree root as the sole header root. 141 142 ## Examples 143 144 iex> store = MST.Store.Memory.new() 145 iex> tree = MST.Tree.new(store) 146 iex> val = DASL.CID.compute("data") 147 iex> {:ok, tree} = MST.Tree.put(tree, "col/key", val) 148 iex> {:ok, binary} = MST.CAR.to_binary(tree) 149 iex> is_binary(binary) 150 true 151 152 """ 153 @spec to_binary(Tree.t()) :: {:ok, binary()} | car_error() 154 def to_binary(tree), do: to_binary(tree, []) 155 156 @doc false 157 @spec to_binary(Tree.t(), keyword()) :: {:ok, binary()} | car_error() 158 def to_binary(%Tree{root: nil}, _opts) do 159 # Empty tree — emit a CAR with an empty node as root 160 empty_node = Node.empty() 161 162 with {:ok, bytes} <- Node.encode(empty_node) do 163 cid = CID.compute(bytes, :drisl) 164 165 car = %CAR{ 166 version: 1, 167 roots: [cid], 168 blocks: %{cid => bytes} 169 } 170 171 CAR.encode(car) 172 else 173 {:error, :encode, reason} -> {:error, reason} 174 end 175 end 176 177 def to_binary(%Tree{root: root} = tree, opts) do 178 with {:ok, blocks} <- Tree.collect_blocks(tree) do 179 car = %CAR{ 180 version: 1, 181 roots: [root], 182 blocks: blocks 183 } 184 185 CAR.encode(car, opts) 186 end 187 end 188 189 @doc """ 190 Returns a stream of `DASL.CAR` stream items for the tree in pre-order 191 (root first, then depth-first left-to-right). 192 193 Emits `{:header, 1, [root_cid]}` followed by `{:block, cid, bytes}` for 194 each reachable MST node. 195 196 This stream can be piped into a custom CAR writer. It does **not** produce 197 a fully-encoded CAR binary — use `to_binary/2` for that. 198 199 """ 200 @spec to_stream(Tree.t()) :: Enumerable.t() 201 def to_stream(%Tree{root: nil}) do 202 empty_node = Node.empty() 203 {:ok, bytes} = Node.encode(empty_node) 204 cid = CID.compute(bytes, :drisl) 205 206 [ 207 {:header, 1, [cid]}, 208 {:block, cid, bytes} 209 ] 210 end 211 212 def to_stream(%Tree{root: root, store: store}) do 213 header = [{:header, 1, [root]}] 214 blocks = preorder_stream(store, root) 215 Stream.concat(header, blocks) 216 end 217 218 # --------------------------------------------------------------------------- 219 # Private — tree construction from decoded blocks 220 # --------------------------------------------------------------------------- 221 222 @spec build_tree([CID.t()], %{CID.t() => binary()}) :: {:ok, Tree.t()} | car_error() 223 defp build_tree([], _blocks), do: {:ok, Tree.new(Store.Memory.new())} 224 225 defp build_tree([root | _], blocks) do 226 # Decode all DAG-CBOR blocks into MST nodes; ignore raw-codec blocks. 227 result = 228 Enum.reduce_while(blocks, {:ok, Store.Memory.new()}, fn {cid, data}, {:ok, store} -> 229 case decode_block(cid, data) do 230 {:ok, node} -> 231 {:cont, {:ok, Store.put(store, cid, node)}} 232 233 :skip -> 234 {:cont, {:ok, store}} 235 236 {:error, _} = err -> 237 {:halt, err} 238 end 239 end) 240 241 case result do 242 {:ok, store} -> {:ok, Tree.from_root(root, store)} 243 err -> err 244 end 245 end 246 247 @spec decode_block(CID.t(), binary()) :: {:ok, Node.t()} | :skip | {:error, atom()} 248 defp decode_block(%CID{codec: :raw}, _data), do: :skip 249 250 defp decode_block(%CID{codec: :drisl}, data) do 251 case Node.decode(data) do 252 {:ok, node} -> {:ok, node} 253 {:error, :decode, reason} -> {:error, reason} 254 end 255 end 256 257 # --------------------------------------------------------------------------- 258 # Private — pre-order DFS stream 259 # --------------------------------------------------------------------------- 260 261 @spec preorder_stream(Store.t(), CID.t()) :: Enumerable.t() 262 defp preorder_stream(store, root) do 263 Stream.resource( 264 fn -> [root] end, 265 fn 266 [] -> 267 {:halt, []} 268 269 [cid | rest] -> 270 case Store.get(store, cid) do 271 {:error, :not_found} -> 272 raise "MST.CAR.to_stream/1: node not found: #{CID.encode(cid)}" 273 274 {:ok, node} -> 275 {:ok, bytes} = Node.encode(node) 276 children = subtree_cids(node) 277 {[{:block, cid, bytes}], children ++ rest} 278 end 279 end, 280 fn _ -> :ok end 281 ) 282 end 283 284 @spec subtree_cids(Node.t()) :: [CID.t()] 285 defp subtree_cids(node) do 286 left = if node.left, do: [node.left], else: [] 287 rights = Enum.flat_map(node.entries, fn e -> if e.right, do: [e.right], else: [] end) 288 left ++ rights 289 end 290end