An Elixir implementation of AT Protocol-flavoured Merkle Search Trees (MST)
1defmodule MST.CAR do
2 @moduledoc """
3 Bridges `MST.Tree` with the DASL CAR file format.
4
5 Provides functions to load an MST from a CAR binary or stream, and to export
6 an MST back to CAR format. The CAR header's first root CID is treated as the
7 MST root; any additional roots are ignored.
8
9 MST node blocks (DAG-CBOR codec, `:drisl`) are decoded into `MST.Node`
10 structs and stored in an `MST.Store.Memory`. Non-MST blocks (e.g. record
11 data with the `:raw` codec) are ignored during import — the store only holds
12 MST structural nodes.
13
14 ## Example
15
16 {:ok, tree} = MST.CAR.from_binary(File.read!("repo.car"))
17 {:ok, pairs} = MST.Tree.to_list(tree)
18
19 """
20
21 alias DASL.{CAR, CID}
22 alias MST.{Node, Store, Tree}
23
24 @type car_error() ::
25 {:error, :header, atom()}
26 | {:error, :block, atom()}
27 | {:error, atom()}
28
29 # ---------------------------------------------------------------------------
30 # Import
31 # ---------------------------------------------------------------------------
32
33 @doc """
34 Loads an MST from an already-decoded `DASL.CAR` struct.
35
36 Populates an `MST.Store.Memory` from the struct's blocks map and returns an
37 `MST.Tree` rooted at the CAR's first root CID. Use this when you already hold
38 a `%DASL.CAR{}` in memory and want to avoid a redundant encode/decode cycle.
39
40 ## Examples
41
42 iex> store = MST.Store.Memory.new()
43 iex> tree = MST.Tree.new(store)
44 iex> val = DASL.CID.compute("data")
45 iex> {:ok, tree} = MST.Tree.put(tree, "col/key", val)
46 iex> {:ok, binary} = MST.CAR.to_binary(tree)
47 iex> {:ok, car} = DASL.CAR.decode(binary)
48 iex> {:ok, tree2} = MST.CAR.from_car(car)
49 iex> MST.Tree.get(tree2, "col/key")
50 {:ok, val}
51
52 """
53 @spec from_car(CAR.t()) :: {:ok, Tree.t()} | car_error()
54 def from_car(%CAR{roots: roots, blocks: blocks}), do: build_tree(roots, blocks)
55
56 @doc """
57 Loads an MST from a CAR-encoded binary.
58
59 Decodes all blocks, populates an `MST.Store.Memory` with MST nodes (DAG-CBOR
60 codec), and returns an `MST.Tree` rooted at the CAR's first root CID.
61
62 Accepts the same options as `DASL.CAR.decode/2` (`verify: boolean`).
63
64 ## Examples
65
66 iex> store = MST.Store.Memory.new()
67 iex> tree = MST.Tree.new(store)
68 iex> val = DASL.CID.compute("data")
69 iex> {:ok, tree} = MST.Tree.put(tree, "col/key", val)
70 iex> {:ok, binary} = MST.CAR.to_binary(tree)
71 iex> {:ok, tree2} = MST.CAR.from_binary(binary)
72 iex> MST.Tree.get(tree2, "col/key")
73 {:ok, val}
74
75 """
76 @spec from_binary(binary(), keyword()) :: {:ok, Tree.t()} | car_error()
77 def from_binary(binary, opts \\ []) when is_binary(binary) do
78 try do
79 with {:ok, car} <- CAR.decode(binary, opts) do
80 build_tree(car.roots, car.blocks)
81 end
82 rescue
83 e in ArgumentError -> {:error, :header, {:invalid_binary, e.message}}
84 end
85 end
86
87 @doc """
88 Loads an MST from a CAR stream (an `Enumerable` of binary chunks).
89
90 Streams blocks through `DASL.CAR.stream_decode/2`, populating an
91 `MST.Store.Memory` incrementally. Useful for large files where you want to
92 avoid loading the full binary into memory at once. Converts stream raises
93 to error tuples.
94
95 ## Options
96
97 - `:verify` — verify CID digests of incoming blocks (default: `true`)
98
99 ## Examples
100
101 iex> store = MST.Store.Memory.new()
102 iex> tree = MST.Tree.new(store)
103 iex> val = DASL.CID.compute("data")
104 iex> {:ok, tree} = MST.Tree.put(tree, "col/key", val)
105 iex> {:ok, binary} = MST.CAR.to_binary(tree)
106 iex> chunk_stream = [binary]
107 iex> {:ok, tree2} = MST.CAR.from_stream(chunk_stream)
108 iex> MST.Tree.get(tree2, "col/key")
109 {:ok, val}
110
111 """
112 @spec from_stream(Enumerable.t(), keyword()) :: {:ok, Tree.t()} | car_error()
113 def from_stream(stream, opts \\ []) do
114 try do
115 {roots, blocks} =
116 stream
117 |> CAR.stream_decode(opts)
118 |> Enum.reduce({nil, %{}}, fn
119 {:header, _version, roots}, {_roots, blocks} ->
120 {roots, blocks}
121
122 {:block, cid, data}, {roots, blocks} ->
123 {roots, Map.put(blocks, cid, data)}
124 end)
125
126 build_tree(roots || [], blocks)
127 rescue
128 e in RuntimeError -> {:error, {:stream_decode, e.message}}
129 end
130 end
131
132 # ---------------------------------------------------------------------------
133 # Export
134 # ---------------------------------------------------------------------------
135
136 @doc """
137 Serialises an `MST.Tree` to a CAR-encoded binary.
138
139 Collects all reachable MST node blocks and wraps them in a CARv1 file with
140 the tree root as the sole header root.
141
142 ## Examples
143
144 iex> store = MST.Store.Memory.new()
145 iex> tree = MST.Tree.new(store)
146 iex> val = DASL.CID.compute("data")
147 iex> {:ok, tree} = MST.Tree.put(tree, "col/key", val)
148 iex> {:ok, binary} = MST.CAR.to_binary(tree)
149 iex> is_binary(binary)
150 true
151
152 """
153 @spec to_binary(Tree.t()) :: {:ok, binary()} | car_error()
154 def to_binary(tree), do: to_binary(tree, [])
155
156 @doc false
157 @spec to_binary(Tree.t(), keyword()) :: {:ok, binary()} | car_error()
158 def to_binary(%Tree{root: nil}, _opts) do
159 # Empty tree — emit a CAR with an empty node as root
160 empty_node = Node.empty()
161
162 with {:ok, bytes} <- Node.encode(empty_node) do
163 cid = CID.compute(bytes, :drisl)
164
165 car = %CAR{
166 version: 1,
167 roots: [cid],
168 blocks: %{cid => bytes}
169 }
170
171 CAR.encode(car)
172 else
173 {:error, :encode, reason} -> {:error, reason}
174 end
175 end
176
177 def to_binary(%Tree{root: root} = tree, opts) do
178 with {:ok, blocks} <- Tree.collect_blocks(tree) do
179 car = %CAR{
180 version: 1,
181 roots: [root],
182 blocks: blocks
183 }
184
185 CAR.encode(car, opts)
186 end
187 end
188
189 @doc """
190 Returns a stream of `DASL.CAR` stream items for the tree in pre-order
191 (root first, then depth-first left-to-right).
192
193 Emits `{:header, 1, [root_cid]}` followed by `{:block, cid, bytes}` for
194 each reachable MST node.
195
196 This stream can be piped into a custom CAR writer. It does **not** produce
197 a fully-encoded CAR binary — use `to_binary/2` for that.
198
199 """
200 @spec to_stream(Tree.t()) :: Enumerable.t()
201 def to_stream(%Tree{root: nil}) do
202 empty_node = Node.empty()
203 {:ok, bytes} = Node.encode(empty_node)
204 cid = CID.compute(bytes, :drisl)
205
206 [
207 {:header, 1, [cid]},
208 {:block, cid, bytes}
209 ]
210 end
211
212 def to_stream(%Tree{root: root, store: store}) do
213 header = [{:header, 1, [root]}]
214 blocks = preorder_stream(store, root)
215 Stream.concat(header, blocks)
216 end
217
218 # ---------------------------------------------------------------------------
219 # Private — tree construction from decoded blocks
220 # ---------------------------------------------------------------------------
221
222 @spec build_tree([CID.t()], %{CID.t() => binary()}) :: {:ok, Tree.t()} | car_error()
223 defp build_tree([], _blocks), do: {:ok, Tree.new(Store.Memory.new())}
224
225 defp build_tree([root | _], blocks) do
226 # Decode all DAG-CBOR blocks into MST nodes; ignore raw-codec blocks.
227 result =
228 Enum.reduce_while(blocks, {:ok, Store.Memory.new()}, fn {cid, data}, {:ok, store} ->
229 case decode_block(cid, data) do
230 {:ok, node} ->
231 {:cont, {:ok, Store.put(store, cid, node)}}
232
233 :skip ->
234 {:cont, {:ok, store}}
235
236 {:error, _} = err ->
237 {:halt, err}
238 end
239 end)
240
241 case result do
242 {:ok, store} -> {:ok, Tree.from_root(root, store)}
243 err -> err
244 end
245 end
246
247 @spec decode_block(CID.t(), binary()) :: {:ok, Node.t()} | :skip | {:error, atom()}
248 defp decode_block(%CID{codec: :raw}, _data), do: :skip
249
250 defp decode_block(%CID{codec: :drisl}, data) do
251 case Node.decode(data) do
252 {:ok, node} -> {:ok, node}
253 {:error, :decode, reason} -> {:error, reason}
254 end
255 end
256
257 # ---------------------------------------------------------------------------
258 # Private — pre-order DFS stream
259 # ---------------------------------------------------------------------------
260
261 @spec preorder_stream(Store.t(), CID.t()) :: Enumerable.t()
262 defp preorder_stream(store, root) do
263 Stream.resource(
264 fn -> [root] end,
265 fn
266 [] ->
267 {:halt, []}
268
269 [cid | rest] ->
270 case Store.get(store, cid) do
271 {:error, :not_found} ->
272 raise "MST.CAR.to_stream/1: node not found: #{CID.encode(cid)}"
273
274 {:ok, node} ->
275 {:ok, bytes} = Node.encode(node)
276 children = subtree_cids(node)
277 {[{:block, cid, bytes}], children ++ rest}
278 end
279 end,
280 fn _ -> :ok end
281 )
282 end
283
284 @spec subtree_cids(Node.t()) :: [CID.t()]
285 defp subtree_cids(node) do
286 left = if node.left, do: [node.left], else: []
287 rights = Enum.flat_map(node.entries, fn e -> if e.right, do: [e.right], else: [] end)
288 left ++ rights
289 end
290end