this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Lightly refactor wal (names and comments)

garrison 36cbb32e 36ed0995

+36 -23
+35 -23
lib/xks/wal.ex
··· 53 53 case :ets.next_lookup(wal, prev_key) do 54 54 {{:entry_buffer, i} = key, [{_key, {partition, version, bin}}]} when version <= max_persist_version -> 55 55 :ets.delete(wal, key) 56 - write_wal_entry(xks, i, partition, version, bin) 56 + :ok = write_wal_entry(xks, i, partition, version, bin) 57 + 57 58 do_flush_entry_buffer(xks, wal, max_persist_version, key) 59 + 58 60 _ -> 59 61 :ok 60 62 end ··· 82 84 # otherwise it would not appear in the slots and would never be reclaimed 83 85 assert pos > 0 84 86 85 - # The current extent may already be blow wal_gc_version but had not been released 87 + # The current extent may already be below wal_gc_version but had not been released 86 88 # because it was still current 87 89 # In that case it should be released immediately 88 90 # TODO: this is a rare case that requires the extent to be full before a GC, probe and test ··· 118 120 checksum::binary-16, 119 121 >> 120 122 121 - :ets.insert(wal, {{:buffer, i}, {version, slot}}) 123 + :ets.insert(wal, {{:slot_buffer, i}, {version, slot}}) 122 124 :ok 123 125 end 124 126 ··· 205 207 # Load WAL slot blocks into a list so we have them in the proper order 206 208 # (the linked list starts from the tail, but the WAL must be loaded the other way) 207 209 {tail_index, tail_checksum} = wal_tail_address 208 - wal_blocks = do_load_wal_blocks(block_store, wal_block_count, tail_index, tail_checksum, [], 0) 210 + wal_slot_blocks = do_load_wal_slot_blocks(block_store, wal_block_count, tail_index, tail_checksum, [], 0) 209 211 210 212 # Replay the WAL 211 - wal_lv = do_load_wal(wal_blocks, xks, partition_mlbs, 0) 213 + wal_lv = do_replay_wal_entries(wal_slot_blocks, xks, partition_mlbs, 0) 212 214 assert wal_lv >= 0 213 215 214 216 # Compute largest persisted version of any table ··· 231 233 recovered_version 232 234 end 233 235 234 - defp do_load_wal_blocks(_block_store, wal_block_count, _block_index, _block_checksum, blocks_acc, count_acc) when count_acc >= wal_block_count do 236 + defp do_load_wal_slot_blocks(_block_store, wal_block_count, _block_index, _block_checksum, blocks_acc, count_acc) when count_acc == wal_block_count do 235 237 blocks_acc 236 238 end 237 239 238 - defp do_load_wal_blocks(block_store, wal_block_count, block_index, block_checksum, blocks_acc, count_acc) do 240 + defp do_load_wal_slot_blocks(block_store, wal_block_count, block_index, block_checksum, blocks_acc, count_acc) do 241 + # The head of a new log will point to the null address but we should never reach it here due to `wal_block_count` 239 242 assert block_index != 0 243 + assert count_acc < wal_block_count 244 + 240 245 block_data = Blocks.read(block_store, block_index, block_checksum) 241 246 242 247 << ··· 247 252 248 253 blocks_acc = [{block_index, block_data} | blocks_acc] 249 254 count_acc = count_acc + 1 250 - do_load_wal_blocks(block_store, wal_block_count, prev_index, prev_checksum, blocks_acc, count_acc) 255 + do_load_wal_slot_blocks(block_store, wal_block_count, prev_index, prev_checksum, blocks_acc, count_acc) 251 256 end 252 257 253 - defp do_load_wal([], _xks, _partition_mlbs, lv_acc) do 258 + defp do_replay_wal_entries([], _xks, _partition_mlbs, lv_acc) do 254 259 lv_acc 255 260 end 256 261 257 - defp do_load_wal([{block_index, block_data} | blocks_rest], xks, partition_mlbs, lv_acc) do 262 + defp do_replay_wal_entries([{block_index, block_data} | blocks_rest], xks, partition_mlbs, lv_acc) do 258 263 %{ 259 264 block_store: block_store, 260 265 wal: wal, ··· 276 281 _padding_and_entries_size::binary, 277 282 >> = block_data 278 283 279 - block_lv = do_load_entries(entries_data, xks, block_store, wal, wal_gc_version, partition_mlbs, 0) 284 + block_lv = do_replay_entries_from_block(entries_data, xks, block_store, wal, wal_gc_version, partition_mlbs, 0) 280 285 :ets.insert(wal, {{:wal_block, block_index}, block_lv}) 281 286 282 287 # Version in the log should always be monotonically increasing 283 288 assert block_lv >= lv_acc 284 289 lv_acc = block_lv 285 290 286 - do_load_wal(blocks_rest, xks, partition_mlbs, lv_acc) 291 + do_replay_wal_entries(blocks_rest, xks, partition_mlbs, lv_acc) 287 292 end 288 293 289 - defp do_load_entries("", _xks, _block_store, _wal, _wal_gc_version, _partition_mlbs, lv_acc) do 294 + defp do_replay_entries_from_block("", _xks, _block_store, _wal, _wal_gc_version, _partition_mlbs, lv_acc) do 290 295 lv_acc 291 296 end 292 297 293 - defp do_load_entries(block_data, xks, block_store, wal, wal_gc_version, partition_mlbs, lv_acc) do 298 + defp do_replay_entries_from_block(block_data, xks, block_store, wal, wal_gc_version, partition_mlbs, lv_acc) do 294 299 # See constant: c_wal_slot_bytes() 295 300 << 296 301 partition::signed-integer-64, ··· 300 305 bin_size::integer-64, 301 306 checksum::binary-16, 302 307 303 - rest::binary, 308 + block_rest::binary, 304 309 >> = block_data 305 310 306 311 # Slots with (version < wal_gc_version) have already been garbage-collected ··· 335 340 assert version >= lv_acc 336 341 lv_acc = version 337 342 338 - do_load_entries(rest, xks, block_store, wal, wal_gc_version, partition_mlbs, lv_acc) 343 + do_replay_entries_from_block(block_rest, xks, block_store, wal, wal_gc_version, partition_mlbs, lv_acc) 339 344 end 340 345 341 346 @spec flush(XKS.t) :: XKS.t 342 347 def flush(%XKS{} = xks) do 348 + # Ensure all entries <= `max_persist_version` are written 349 + # Note: flushing an `:entry_buffer` entry will write a `:slot_buffer` entry, 350 + # so this must be done before the skip optimization below 343 351 :ok = flush_entry_buffer(xks) 344 352 345 - case :ets.next(xks.wal, {:buffer, -1}) do 346 - {:buffer, _i} -> do_flush(xks) 353 + # Skip the flush if there have been no writes since last time 354 + # (this avoids copying the tail for no reason) 355 + case :ets.next(xks.wal, {:slot_buffer, -1}) do 356 + {:slot_buffer, _i} -> do_flush(xks) 347 357 _ -> xks 348 358 end 349 359 end ··· 359 369 }, 360 370 } = xks 361 371 372 + # Copy-on-write the tail block if it has space, otherwise create a new 373 + # block pointing to the previous tail 362 374 block_acc = 363 375 case xks.wal_tail_address do 364 376 c_null_address() -> ··· 396 408 end 397 409 end 398 410 399 - prev_key = {:buffer, -1} 411 + prev_key = {:slot_buffer, -1} 400 412 prev_version = 0 401 413 wal_tail_address = do_flush_slots(block_store, free_list, wal, opt_block_size, prev_key, prev_version, block_acc) 402 414 ··· 411 423 412 424 defp do_flush_slots(block_store, free_list, wal, opt_block_size, prev_key, prev_version, block_acc) do 413 425 case :ets.next_lookup(wal, prev_key) do 414 - {{:buffer, _i} = key, [{_key, entry}]} -> 426 + {{:slot_buffer, _i} = key, [{_key, entry}]} -> 415 427 :ets.delete(wal, key) 416 428 {version, slot} = entry 417 429 ··· 419 431 case (byte_size(block_acc) + byte_size(slot)) > opt_block_size do 420 432 true -> 421 433 # Pad and rotate block 422 - {block_index, block_checksum} = write_wal_block(block_store, wal, free_list, opt_block_size, block_acc, prev_version) 434 + {block_index, block_checksum} = write_slots_block(block_store, wal, free_list, opt_block_size, block_acc, prev_version) 423 435 424 436 # Create a new block acc pointing to the previous (written) block 425 437 # See constant: c_address_bytes() ··· 438 450 do_flush_slots(block_store, free_list, wal, opt_block_size, prev_key, version, block_acc) 439 451 440 452 _ -> 441 - _address = write_wal_block(block_store, wal, free_list, opt_block_size, block_acc, prev_version) 453 + _address = write_slots_block(block_store, wal, free_list, opt_block_size, block_acc, prev_version) 442 454 end 443 455 end 444 456 445 - defp write_wal_block(block_store, wal, free_list, opt_block_size, block_acc, largest_version) do 457 + defp write_slots_block(block_store, wal, free_list, opt_block_size, block_acc, largest_version) do 446 458 entries_size = byte_size(block_acc) - c_address_bytes() 447 459 pad_bytes = opt_block_size - c_address_bytes() - entries_size - c_wal_entries_size_bytes() 448 460 assert pad_bytes >= 0
+1
test/xks_test.exs
··· 222 222 223 223 defmodule ModelDiskFuzzTest do 224 224 use ExUnit.Case, async: true 225 + @moduletag :xks_disk 225 226 @moduletag :model_disk_fuzz 226 227 @moduletag :disable 227 228 @moduletag :tmp_dir