···335335336336 # Write mutations to storage
337337 # TODO: do we need to write empty batches?
338338- append_batch(state, batch)
338338+ state = append_batch(state, batch)
339339340340 # Update versions
341341 state = %{state |
···441441 XKS.commit(xks)
442442 end
443443444444- defp append_batch(%State{} = _state, %LogBatch{} = _batch) do
445445- # TODO
444444+ defp append_batch(%State{} = state, %LogBatch{} = batch) do
445445+ # TODO: mutations should come from CommitBuffer grouped this way
446446+ groups = group_mutations(batch.tagged_mutations)
447447+ state = do_append_groups(groups, state)
448448+ state
449449+ end
450450+451451+ defp do_append_groups([], %State{} = state), do: state
452452+453453+ defp do_append_groups([group | groups_rest], %State{} = state) do
454454+ %{xks: xks, current_extent: current_extent} = state
455455+ %{extent: extent, size: size} = current_extent
456456+ extent_size = xks.opts.extent_block_count * xks.opts.block_size
457457+458458+ group_data = :erlang.term_to_binary(group)
459459+ group_size = byte_size(group_data)
460460+461461+ case size + group_size > extent_size do
462462+ true ->
463463+ # Rotate extent and start this iteration over
464464+ # TODO: this is not tested
465465+ state = rotate_extent(state)
466466+ do_append_groups([group | groups_rest], state)
467467+468468+ false ->
469469+ position = size
470470+ write_group(xks, extent, position, group_data)
471471+472472+ state = put_in(state.current_extent.size, size + group_size)
473473+ do_append_groups(groups_rest, state)
474474+ end
475475+ end
476476+477477+ defp write_group(%XKS{} = xks, extent, position, group_data) do
478478+ # TODO: this is a hack to write to blocks, need a raw read/write API
479479+ block_index = extent + div(position, xks.opts.block_size)
480480+ block_pos = rem(position, xks.opts.block_size)
481481+482482+ existing_block_data = XKS.Blocks.read_raw(xks.block_store, block_index)
483483+484484+ <<
485485+ prefix::binary-size(block_pos),
486486+ _::binary-size(byte_size(group_data)),
487487+ suffix::binary,
488488+ >> = existing_block_data
489489+490490+ new_block_data = <<prefix::binary, group_data::binary, suffix::binary>>
491491+ XKS.Blocks.write(xks.block_store, block_index, new_block_data)
492492+ end
493493+494494+ defp group_mutations(tagged_mutations) do
495495+ Enum.reduce(tagged_mutations, %{}, fn {tags, mut}, acc ->
496496+ Enum.reduce(tags, acc, fn t, acc ->
497497+ Map.update(acc, t, [mut], &[mut | &1])
498498+ end)
499499+ end)
500500+ |> Enum.sort_by(&elem(&1, 0))
501501+ |> Enum.map(fn {k, v} -> {k, Enum.reverse(v)} end)
446502 end
447503448504 defp rotate_extent(%State{} = state) do
···454510455511 new_extent = %{
456512 i: current_extent.i + 1,
457457- extent: FreeList.reserve_extent(xks),
513513+ extent: FreeList.reserve_extent(xks.free_list),
458514 size: 0,
459515 }
460516
+8
lib/xks/blocks.ex
···7272 end
7373 end
74747575+ def read_raw({:memory, memory_store}, index) do
7676+ [{:block_size, block_size}] = :ets.lookup(memory_store, :block_size)
7777+ case MemoryStore.fetch(memory_store, index) do
7878+ {:ok, block_data} -> block_data
7979+ :error -> <<0::integer-unit(8)-size(block_size)>>
8080+ end
8181+ end
8282+7583 defp do_read({:memory, memory_store}, index) do
7684 # TODO: return zeros for missing blocks in memory store?
7785 case MemoryStore.fetch(memory_store, index) do