this repo has no description
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge writes into leaf pages

garrison 2ba19171 ee677908

+204 -1
+204 -1
lib/btree/writer.ex
# lib/btree/writer.ex -- code added by "Merge writes into leaf pages",
# reformatted from the collapsed diff view.
#
# Hunk 1 (around line 156): the flush dispatch now calls flush_leaf/4
# (previously do_flush_leaf/4):
#
#     new_pages =
#       case type_byte do
#         0x00 -> flush_inner(btree, page_data, page_sk, page_ek)
#         0x01 -> flush_leaf(btree, page_data, page_sk, page_ek)
#       end
#
#     # Free old page
#
# Hunk 2 (from line 389): tail of the pre-existing encode_slots/2, unchanged:
#
#         offset::integer-16,
#       >>
#       encode_slots(offsets_rest, slots_acc)
#     end
#
# The definitions below live inside the writer module (hence the indent).

  # Rebuilds one leaf page by merging the pairs already stored in `page_data`
  # with the buffered writes that sort below `page_ek`.
  #
  # Returns a list of `{end_key, page_data}` tuples: a single entry when the
  # merged pairs fit in one page, several when the page had to be split.
  defp flush_leaf(%BTree{} = btree, page_data, _page_sk, page_ek) do
    %{write_buffer: write_buffer, opts: %{page_size: opt_page_size}} = btree

    # The page ends with a fixed trailer. See constant: c_page_trailer_bytes()
    <<
      _::binary-size(opt_page_size - c_page_trailer_bytes()),
      pair_count::integer-16,
      type_byte::integer-8
    >> = page_data

    assert type_byte == 0x01

    leaf_it = new_leaf_iterator(page_data, pair_count)
    buffer_it = new_buffer_iterator(write_buffer, page_ek)

    {pairs_data, offsets, merged_count} = do_merge_pairs(leaf_it, buffer_it, "", [], 0)
    maybe_split_leaf(opt_page_size, page_ek, pairs_data, offsets, merged_count)
  end

  # Two-way merge of the leaf iterator and the buffer iterator.
  #
  # Returns `{pairs_data, offsets, pair_count}` where `pairs_data` is the
  # concatenation of the encoded pairs, `offsets` lists the byte offset of each
  # pair inside `pairs_data` (in order), and `pair_count` is their number.
  # When both sides carry the same key the buffered value wins.
  defp do_merge_pairs(leaf_it, buffer_it, page_acc, offsets_acc, count_acc) do
    {_, _, _, leaf_key, leaf_value} = leaf_it
    {_, _, buffer_key, buffer_value} = buffer_it

    # The offset of the pair about to be appended is the current data length.
    offsets_next = [byte_size(page_acc) | offsets_acc]
    count_next = count_acc + 1

    cond do
      leaf_key == :empty and buffer_key == :empty ->
        {page_acc, Enum.reverse(offsets_acc), count_acc}

      # Only the leaf has data left, or its key sorts first: take the leaf pair.
      buffer_key == :empty or (leaf_key != :empty and leaf_key < buffer_key) ->
        page_acc = append_pair(page_acc, leaf_key, leaf_value)
        do_merge_pairs(leaf_next(leaf_it), buffer_it, page_acc, offsets_next, count_next)

      # Only the buffer has data left, or its key sorts first: take the buffer pair.
      leaf_key == :empty or buffer_key < leaf_key ->
        page_acc = append_pair(page_acc, buffer_key, buffer_value)
        do_merge_pairs(leaf_it, buffer_next(buffer_it), page_acc, offsets_next, count_next)

      true ->
        assert buffer_key == leaf_key

        # Same key on both sides: keep the buffer pair but advance both iterators
        page_acc = append_pair(page_acc, buffer_key, buffer_value)

        do_merge_pairs(
          leaf_next(leaf_it),
          buffer_next(buffer_it),
          page_acc,
          offsets_next,
          count_next
        )
    end
  end

  # Appends one encoded pair to `page_acc`: 16-bit key size, 16-bit value size,
  # then the key bytes and the value bytes.
  defp append_pair(page_acc, key, value) do
    key_size = byte_size(key)
    value_size = byte_size(value)

    # See constant: c_page_pair_overhead_bytes()
    <<
      page_acc::binary,
      key_size::integer-16,
      value_size::integer-16,
      key::binary,
      value::binary
    >>
  end

  # Encodes the merged pairs as one leaf page when they fit in `opt_page_size`
  # bytes; otherwise splits them near the byte midpoint and recurses on each
  # half. Returns a list of `{end_key, page_data}` in key order. The first half
  # of a split is keyed by its own last key, the second half keeps `end_key`.
  defp maybe_split_leaf(opt_page_size, end_key, pairs_data, offsets, pair_count) do
    variable_size = byte_size(pairs_data) + pair_count * c_page_slot_entry_bytes()
    total_size = variable_size + c_page_trailer_bytes()

    if total_size <= opt_page_size do
      pad_bytes = opt_page_size - total_size
      assert pad_bytes >= 0

      slots_data = encode_offset_slots(offsets, "")
      type_byte = 0x01

      # Layout: pairs, zero padding, slot array, then the trailer.
      # See constant: c_page_trailer_bytes()
      page_data = <<
        pairs_data::binary,
        0::integer-unit(8)-size(pad_bytes),
        slots_data::binary,
        pair_count::integer-16,
        type_byte::integer-8
      >>

      [{end_key, page_data}]
    else
      target_midpoint_bytes = div(variable_size, 2)
      midpoint_i = find_midpoint(offsets, target_midpoint_bytes, 0)

      {offsets1, offsets2} = Enum.split(offsets, midpoint_i)
      # find_midpoint/3 only returns indexes in 1..length-1, so neither half
      # can be empty; the assertions are kept as cheap invariants.
      assert offsets1 != []
      assert offsets2 != []

      # Rebase the second half's offsets so they start at 0 again.
      body2_offset = hd(offsets2)
      offsets2 = Enum.map(offsets2, &(&1 - body2_offset))

      <<pairs_data1::binary-size(body2_offset), pairs_data2::binary>> = pairs_data

      # The end key of the first page is the key of its last pair.
      ek_offset = List.last(offsets1)

      <<
        _::binary-size(ek_offset),
        ek_size1::integer-16,
        _value_size::integer-16,
        end_key1::binary-size(ek_size1),
        _::binary
      >> = pairs_data

      pair_count1 = midpoint_i
      pair_count2 = pair_count - pair_count1

      pages1 = maybe_split_leaf(opt_page_size, end_key1, pairs_data1, offsets1, pair_count1)
      pages2 = maybe_split_leaf(opt_page_size, end_key, pairs_data2, offsets2, pair_count2)
      pages1 ++ pages2
    end
  end

  # Returns the index of the first pair whose preceding content (pair bytes
  # plus one slot entry per preceding pair) exceeds `target`. That index is the
  # number of pairs that go into the first half of a split.
  defp find_midpoint([offset | offsets_rest], target, i) do
    # `offset` already contains the size headers of every preceding pair, so
    # only the slot entries need to be added on top of it.
    size = offset + i * c_page_slot_entry_bytes()

    if size > target do
      assert i > 0
      i
    else
      find_midpoint(offsets_rest, target, i + 1)
    end
  end

  defp find_midpoint([], _target, i) do
    # No pair boundary lies past the midpoint (the last pair dominates the
    # data), so split right before the last pair. With fewer than two pairs
    # there is nothing to split and the page cannot be made to fit.
    assert i > 1
    i - 1
  end

  # Creates an iterator over the pairs of a leaf page, positioned on the first
  # pair (or exhausted when the page holds no pairs). The iterator is the tuple
  # `{page_data, pair_count, i, key, value}`.
  defp new_leaf_iterator(page_data, pair_count) do
    # leaf_next/1 advances before it reads, so start one position before
    # slot 0; otherwise the first pair of the page would be skipped.
    leaf_next({page_data, pair_count, -1, :empty, :empty})
  end

  # Advances the leaf iterator to the next slot and decodes the pair it points
  # at. Key and value are `:empty` once every slot has been visited.
  defp leaf_next({page_data, pair_count, i, _key, _value}) do
    i = i + 1

    if i < pair_count do
      # The slot array sits directly in front of the page trailer.
      slots_start =
        byte_size(page_data) - pair_count * c_page_slot_entry_bytes() - c_page_trailer_bytes()

      <<
        _prefix::binary-size(slots_start + i * c_page_slot_entry_bytes()),
        pair_offset::integer-16,
        _rest::binary
      >> = page_data

      <<
        _prefix::binary-size(pair_offset),
        key_size::integer-16,
        value_size::integer-16,
        key::binary-size(key_size),
        value::binary-size(value_size),
        _rest::binary
      >> = page_data

      {page_data, pair_count, i, key, value}
    else
      {page_data, pair_count, i, :empty, :empty}
    end
  end

  # Creates an iterator over the buffered writes whose key is below `end_key`,
  # positioned on the first such write. The iterator is the tuple
  # `{write_buffer, end_key, key, value}`.
  defp new_buffer_iterator(write_buffer, end_key) do
    buffer_next({write_buffer, end_key, nil, nil})
  end

  # Takes the first entry of the `write_buffer` ETS table if its key is below
  # `end_key`, deleting it from the table as it is consumed. Key and value are
  # `:empty` when the table is empty or its first key is not below `end_key`.
  #
  # NOTE(review): maybe_split_leaf/5 uses the last key of a page as that page's
  # end key, which suggests end keys are inclusive; with `key < end_key` a
  # buffered write for exactly `end_key` is not merged here. Confirm against
  # the inner-page routing before changing the comparison.
  defp buffer_next({write_buffer, end_key, _key, _value}) do
    case :ets.first_lookup(write_buffer) do
      {_key, [{key, value}]} when key < end_key ->
        :ets.delete(write_buffer, key)
        {write_buffer, end_key, key, value}

      _ ->
        {write_buffer, end_key, :empty, :empty}
    end
  end

  # Encodes the pair offsets as consecutive 16-bit integers appended to
  # `slots_acc`.
  defp encode_offset_slots([], slots_acc), do: slots_acc

  defp encode_offset_slots([offset | offsets_rest], slots_acc) do
    # Each slot is a 16-bit field, so larger offsets cannot be represented.
    assert offset < 2 ** 16

    # See constant: c_page_slot_entry_bytes()
    slots_acc = <<slots_acc::binary, offset::integer-16>>
    encode_offset_slots(offsets_rest, slots_acc)
  end

# (the enclosing module's closing `end` follows in the full file)