···145145 Bytes.unsafe_to_string cell
146146 end
147147148148-(* Encode an index interior cell *)
149149-let encode_index_interior_cell ~left_child ~payload =
150150- let payload_size_varint =
151151- Varint.encode (Int64.of_int (String.length payload))
152152- in
153153- let cell =
154154- Bytes.create (4 + String.length payload_size_varint + String.length payload)
155155- in
156156- Page.set_u32_be cell 0 left_child;
157157- Bytes.blit_string payload_size_varint 0 cell 4
158158- (String.length payload_size_varint);
159159- Bytes.blit_string payload 0 cell
160160- (4 + String.length payload_size_varint)
161161- (String.length payload);
162162- Bytes.unsafe_to_string cell
148148+(* Encode an index interior cell - handles overflow for large payloads *)
149149+let encode_interior_cell_overflow t ~left_child ~payload =
150150+ let payload_size = String.length payload in
151151+ let usable_size = usable_size t in
152152+ let max_local = Cell.max_local ~usable_size ~is_table:false in
153153+ let min_local = Cell.min_local ~usable_size in
154154+ let payload_size_varint = Varint.encode (Int64.of_int payload_size) in
155155+ let varint_len = String.length payload_size_varint in
156156+ if payload_size <= max_local then begin
157157+ let cell = Bytes.create (4 + varint_len + payload_size) in
158158+ Page.set_u32_be cell 0 left_child;
159159+ Bytes.blit_string payload_size_varint 0 cell 4 varint_len;
160160+ Bytes.blit_string payload 0 cell (4 + varint_len) payload_size;
161161+ Bytes.unsafe_to_string cell
162162+ end
163163+ else begin
164164+ let k = min_local + ((payload_size - min_local) mod (usable_size - 4)) in
165165+ let local_size = if k <= max_local then k else min_local in
166166+ let overflow_page =
167167+ write_overflow_chain t.pager payload ~offset:local_size
168168+ in
169169+ let cell = Bytes.create (4 + varint_len + local_size + 4) in
170170+ Page.set_u32_be cell 0 left_child;
171171+ Bytes.blit_string payload_size_varint 0 cell 4 varint_len;
172172+ Bytes.blit_string payload 0 cell (4 + varint_len) local_size;
173173+ Page.set_u32_be cell (4 + varint_len + local_size) overflow_page;
174174+ Bytes.unsafe_to_string cell
175175+ end
163176164177(* Write a cell into a page buffer, returns new cell_content_start *)
165178let write_cell buf ~cell_content_start ~cell =
···206219 if lo > hi then false
207220 else
208221 let mid = (lo + hi) / 2 in
209209- let cell, _ =
210210- Cell.parse_index_leaf page ptrs.(mid) ~usable_size:usable
222222+ let full_payload =
223223+ read_full_payload t page ptrs.(mid) ~usable_size:usable
211224 in
212212- let cmp = String.compare key cell.Cell.payload in
225225+ let cmp = String.compare key full_payload in
213226 if cmp = 0 then true
214227 else if cmp < 0 then search lo (mid - 1)
215228 else search (mid + 1) hi
···219232 let rec find_child i =
220233 if i >= header.Page.cell_count then Option.get header.Page.right_child
221234 else
235235+ let full_payload =
236236+ read_full_interior_payload t page ptrs.(i) ~usable_size:usable
237237+ in
222238 let cell, _ =
223239 Cell.parse_index_interior page ptrs.(i) ~usable_size:usable
224240 in
225225- if key_less_than key cell.Cell.payload then cell.Cell.left_child
241241+ if key_less_than key full_payload then cell.Cell.left_child
226242 else find_child (i + 1)
227243 in
228244 mem_in_page t (find_child 0) key
···337353 in
338354 loop 0
339355340340-(* Recompute cell_content_start after truncating a page to [count] cells *)
341341-let recompute_cell_content_start buf ~ptrs ~count ~page_size =
342342- if count > 0 then begin
343343- let min_ptr = ref page_size in
344344- for i = 0 to count - 1 do
345345- if ptrs.(i) < !min_ptr then min_ptr := ptrs.(i)
346346- done;
347347- Page.set_u16_be buf 5 !min_ptr
348348- end
349349- else Page.set_u16_be buf 5 page_size
350350-351356(* Find insertion position for a key among interior cells *)
352352-let insert_pos page ptrs ~cell_count ~usable_size ~key =
357357+let insert_pos t page ptrs ~cell_count ~usable_size ~key =
353358 let rec find i =
354359 if i >= cell_count then i
355360 else
356356- let c, _ = Cell.parse_index_interior page ptrs.(i) ~usable_size in
357357- if key < c.Cell.payload then i else find (i + 1)
361361+ let full_payload =
362362+ read_full_interior_payload t page ptrs.(i) ~usable_size
363363+ in
364364+ if key < full_payload then i else find (i + 1)
358365 in
359366 find 0
360367···425432 Page.set_u16_be new_buf 5 !new_cell_content_start;
426433 Pager.write t.pager new_page_num (Bytes.unsafe_to_string new_buf);
427434428428- (* Update original page to only have cells [0..split_idx-1] *)
429429- let old_buf = Bytes.of_string page in
435435+ (* Compact original page: repack remaining cells [0..split_idx-1] *)
436436+ let old_buf = Page.init ~page_size ~kind:Page.Leaf_index in
437437+ let old_cell_start = ref page_size in
438438+ for i = 0 to split_idx - 1 do
439439+ let _payload_size, _local, _overflow, cell_size =
440440+ Cell.parse_index_leaf_raw page ptrs.(i) ~usable_size:usable
441441+ in
442442+ let raw_cell = String.sub page ptrs.(i) cell_size in
443443+ old_cell_start :=
444444+ write_cell old_buf ~cell_content_start:!old_cell_start ~cell:raw_cell;
445445+ let ptr_off = Page.header_size Page.Leaf_index + (i * 2) in
446446+ Page.set_u16_be old_buf ptr_off !old_cell_start
447447+ done;
430448 Page.set_u16_be old_buf 3 split_idx;
431431- recompute_cell_content_start old_buf ~ptrs ~count:split_idx ~page_size;
449449+ Page.set_u16_be old_buf 5 !old_cell_start;
432450 Pager.write t.pager page_num (Bytes.unsafe_to_string old_buf);
433451434452 { new_page = new_page_num; separator_key }
···446464 let sep_cell, _ =
447465 Cell.parse_index_interior page ptrs.(split_idx) ~usable_size:usable
448466 in
449449- let separator_key = sep_cell.Cell.payload in
467467+ let separator_key =
468468+ read_full_interior_payload t page ptrs.(split_idx) ~usable_size:usable
469469+ in
450470451471 (* Create new right page *)
452472 let new_page_num = Pager.allocate t.pager in
453473 let new_buf = Page.init ~page_size ~kind:Page.Interior_index in
454474455455- (* Cells [split_idx+1..cell_count-1] go to new page *)
475475+ (* Cells [split_idx+1..cell_count-1] go to new page (raw copy preserves overflow) *)
456476 let new_cell_content_start = ref page_size in
457477 for i = split_idx + 1 to header.Page.cell_count - 1 do
458478 let cell_off = ptrs.(i) in
459459- let cell, _ = Cell.parse_index_interior page cell_off ~usable_size:usable in
460460- let cell_data =
461461- encode_index_interior_cell ~left_child:cell.Cell.left_child
462462- ~payload:cell.Cell.payload
479479+ let _left, _payload_size, _local, _overflow, cell_size =
480480+ Cell.parse_index_interior_raw page cell_off ~usable_size:usable
463481 in
482482+ let raw_cell = String.sub page cell_off cell_size in
464483 new_cell_content_start :=
465484 write_cell new_buf ~cell_content_start:!new_cell_content_start
466466- ~cell:cell_data;
485485+ ~cell:raw_cell;
467486 let new_idx = i - split_idx - 1 in
468487 let ptr_off = Page.header_size Page.Interior_index + (new_idx * 2) in
469488 Page.set_u16_be new_buf ptr_off !new_cell_content_start
···475494 Page.set_u32_be new_buf 8 (Option.get header.Page.right_child);
476495 Pager.write t.pager new_page_num (Bytes.unsafe_to_string new_buf);
477496478478- (* Update original page: keep cells [0..split_idx-1], right child = sep_cell.left_child *)
479479- let old_buf = Bytes.of_string page in
497497+ (* Compact original page: repack remaining cells [0..split_idx-1] *)
498498+ let old_buf = Page.init ~page_size ~kind:Page.Interior_index in
499499+ let old_cell_start = ref page_size in
500500+ for i = 0 to split_idx - 1 do
501501+ let _left, _payload_size, _local, _overflow, cell_size =
502502+ Cell.parse_index_interior_raw page ptrs.(i) ~usable_size:usable
503503+ in
504504+ let raw_cell = String.sub page ptrs.(i) cell_size in
505505+ old_cell_start :=
506506+ write_cell old_buf ~cell_content_start:!old_cell_start ~cell:raw_cell;
507507+ let ptr_off = Page.header_size Page.Interior_index + (i * 2) in
508508+ Page.set_u16_be old_buf ptr_off !old_cell_start
509509+ done;
480510 Page.set_u16_be old_buf 3 split_idx;
511511+ Page.set_u16_be old_buf 5 !old_cell_start;
481512 Page.set_u32_be old_buf 8 sep_cell.Cell.left_child;
482482- recompute_cell_content_start old_buf ~ptrs ~count:split_idx ~page_size;
483513 Pager.write t.pager page_num (Bytes.unsafe_to_string old_buf);
484514485515 { new_page = new_page_num; separator_key }
···489519 =
490520 let page = Pager.read t.pager page_num in
491521 let header = Page.parse_header page 0 in
492492- let cell = encode_index_interior_cell ~left_child ~payload:separator_key in
522522+ let cell =
523523+ encode_interior_cell_overflow t ~left_child ~payload:separator_key
524524+ in
493525 let cell_len = String.length cell in
494526 let space_needed = cell_len + 2 in
495527 (* cell + pointer *)
···500532 let ptrs = cell_pointers page header in
501533 let usable = usable_size t in
502534 let insert_idx =
503503- insert_pos page ptrs ~cell_count:header.Page.cell_count
535535+ insert_pos t page ptrs ~cell_count:header.Page.cell_count
504536 ~usable_size:usable ~key:separator_key
505537 in
506538 ignore
···579611580612 (* Single cell pointing to left page *)
581613 let cell =
582582- encode_index_interior_cell ~left_child:left_page ~payload:separator_key
614614+ encode_interior_cell_overflow t ~left_child:left_page
615615+ ~payload:separator_key
583616 in
584617 let cell_start = write_cell buf ~cell_content_start:page_size ~cell in
585618 Page.set_u16_be buf 5 cell_start;
···646679 done;
647680648681 (* Decrease cell count *)
649649- Page.set_u16_be buf 3 (header.Page.cell_count - 1);
682682+ let new_count = header.Page.cell_count - 1 in
683683+ Page.set_u16_be buf 3 new_count;
650684651651- (* Note: We don't reclaim space - it becomes fragmented.
652652- A proper implementation would compact or track free space. *)
685685+ (* Reset cell_content_start when page becomes empty *)
686686+ if new_count = 0 then Page.set_u16_be buf 5 (Pager.page_size t.pager);
687687+653688 Pager.write t.pager page_num (Bytes.unsafe_to_string buf)
654689 end
655690
+26-17
lib/table.ml
···132132133133let find t rowid = search_page t t.root_page rowid
134134135135-(* Recompute cell_content_start after truncating a page to [count] cells *)
136136-let recompute_cell_content_start buf ~ptrs ~count ~page_size =
137137- if count > 0 then begin
138138- let min_ptr = ref page_size in
139139- for i = 0 to count - 1 do
140140- if ptrs.(i) < !min_ptr then min_ptr := ptrs.(i)
141141- done;
142142- Page.set_u16_be buf 5 !min_ptr
143143- end
144144- else Page.set_u16_be buf 5 page_size
145145-146135(* Find insertion position for a rowid among interior cells *)
147136let insert_pos page ptrs ~cell_count ~rowid =
148137 let rec find i =
···224213 Page.set_u16_be new_buf 5 !new_cell_content_start;
225214 Pager.write t.pager new_page_num (Bytes.unsafe_to_string new_buf);
226215227227- (* Update original page to only have cells [0..split_idx-1] *)
228228- let old_buf = Bytes.of_string page in
216216+ (* Compact original page: repack remaining cells [0..split_idx-1] *)
217217+ let old_buf = Page.init ~page_size ~kind:Page.Leaf_table in
218218+ let old_cell_start = ref page_size in
219219+ for i = 0 to split_idx - 1 do
220220+ let _cell, cell_size =
221221+ Cell.parse_table_leaf page ptrs.(i) ~usable_size:usable
222222+ in
223223+ let raw_cell = String.sub page ptrs.(i) cell_size in
224224+ old_cell_start :=
225225+ write_cell old_buf ~cell_content_start:!old_cell_start ~cell:raw_cell;
226226+ let ptr_off = Page.header_size Page.Leaf_table + (i * 2) in
227227+ Page.set_u16_be old_buf ptr_off !old_cell_start
228228+ done;
229229 Page.set_u16_be old_buf 3 split_idx;
230230- recompute_cell_content_start old_buf ~ptrs ~count:split_idx ~page_size;
230230+ Page.set_u16_be old_buf 5 !old_cell_start;
231231 Pager.write t.pager page_num (Bytes.unsafe_to_string old_buf);
232232233233 { new_page = new_page_num; separator_rowid }
···272272 Page.set_u32_be new_buf 8 (Option.get header.Page.right_child);
273273 Pager.write t.pager new_page_num (Bytes.unsafe_to_string new_buf);
274274275275- (* Update original page: keep cells [0..split_idx-1], right child = sep_cell.left_child *)
276276- let old_buf = Bytes.of_string page in
275275+ (* Compact original page: repack remaining cells [0..split_idx-1] *)
276276+ let old_buf = Page.init ~page_size ~kind:Page.Interior_table in
277277+ let old_cell_start = ref page_size in
278278+ for i = 0 to split_idx - 1 do
279279+ let _cell, cell_size = Cell.parse_table_interior page ptrs.(i) in
280280+ let raw_cell = String.sub page ptrs.(i) cell_size in
281281+ old_cell_start :=
282282+ write_cell old_buf ~cell_content_start:!old_cell_start ~cell:raw_cell;
283283+ let ptr_off = Page.header_size Page.Interior_table + (i * 2) in
284284+ Page.set_u16_be old_buf ptr_off !old_cell_start
285285+ done;
277286 Page.set_u16_be old_buf 3 split_idx;
287287+ Page.set_u16_be old_buf 5 !old_cell_start;
278288 Page.set_u32_be old_buf 8 sep_cell.Cell.left_child;
279279- recompute_cell_content_start old_buf ~ptrs ~count:split_idx ~page_size;
280289 Pager.write t.pager page_num (Bytes.unsafe_to_string old_buf);
281290282291 { new_page = new_page_num; separator_rowid }
+140-1
test/test_index.ml
···11-let suite = ("index", [])
11+let with_mem_pager ?(page_size = 4096) f =
22+ let pager = Btree.Pager.mem ~page_size () in
33+ f pager
44+55+let test_insert_find () =
66+ with_mem_pager @@ fun pager ->
77+ let idx = Btree.Index.v pager in
88+ Btree.Index.insert idx "hello";
99+ Btree.Index.insert idx "world";
1010+ Alcotest.(check bool) "mem hello" true (Btree.Index.mem idx "hello");
1111+ Alcotest.(check bool) "mem world" true (Btree.Index.mem idx "world");
1212+ Alcotest.(check bool) "no missing" false (Btree.Index.mem idx "missing")
1313+1414+let test_many_short_keys () =
1515+ with_mem_pager ~page_size:512 @@ fun pager ->
1616+ let idx = Btree.Index.v pager in
1717+ let n = 200 in
1818+ for i = 0 to n - 1 do
1919+ Btree.Index.insert idx (Fmt.str "key_%04d" i)
2020+ done;
2121+ for i = 0 to n - 1 do
2222+ let key = Fmt.str "key_%04d" i in
2323+ Alcotest.(check bool) (Fmt.str "mem %s" key) true (Btree.Index.mem idx key)
2424+ done;
2525+ let count = ref 0 in
2626+ Btree.Index.iter idx (fun _ -> incr count);
2727+ Alcotest.(check int) "count" n !count
2828+2929+(* This test reproduces the page overflow bug: long keys on a 4096-byte page
3030+ cause splits where dead cells in the original page waste space, leaving
3131+ insufficient room for the new cell. *)
3232+let test_long_keys_page_overflow () =
3333+ with_mem_pager @@ fun pager ->
3434+ let idx = Btree.Index.v pager in
3535+ let n = 100 in
3636+ for i = 0 to n - 1 do
3737+ (* Keys similar to ATProto MST entries: ~80 bytes each *)
3838+ let key = Fmt.str "blocks\x00bafyreig%064d\x00value_data_%d" i i in
3939+ Btree.Index.insert idx key
4040+ done;
4141+ for i = 0 to n - 1 do
4242+ let key = Fmt.str "blocks\x00bafyreig%064d\x00value_data_%d" i i in
4343+ Alcotest.(check bool) (Fmt.str "mem %d" i) true (Btree.Index.mem idx key)
4444+ done;
4545+ let count = ref 0 in
4646+ Btree.Index.iter idx (fun _ -> incr count);
4747+ Alcotest.(check int) "count" n !count
4848+4949+let test_reverse_order () =
5050+ with_mem_pager ~page_size:512 @@ fun pager ->
5151+ let idx = Btree.Index.v pager in
5252+ let n = 150 in
5353+ for i = n - 1 downto 0 do
5454+ Btree.Index.insert idx (Fmt.str "r%04d" i)
5555+ done;
5656+ for i = 0 to n - 1 do
5757+ let key = Fmt.str "r%04d" i in
5858+ Alcotest.(check bool) key true (Btree.Index.mem idx key)
5959+ done
6060+6161+let test_random_order () =
6262+ with_mem_pager ~page_size:512 @@ fun pager ->
6363+ let idx = Btree.Index.v pager in
6464+ let n = 200 in
6565+ let keys = Array.init n (fun i -> Fmt.str "k%04d" i) in
6666+ (* LCG shuffle *)
6767+ let state = ref 42 in
6868+ for i = n - 1 downto 1 do
6969+ state := ((!state * 1103515245) + 12345) land 0x7fffffff;
7070+ let j = !state mod (i + 1) in
7171+ let tmp = keys.(i) in
7272+ keys.(i) <- keys.(j);
7373+ keys.(j) <- tmp
7474+ done;
7575+ Array.iter (fun k -> Btree.Index.insert idx k) keys;
7676+ for i = 0 to n - 1 do
7777+ let key = Fmt.str "k%04d" i in
7878+ Alcotest.(check bool) key true (Btree.Index.mem idx key)
7979+ done
8080+8181+let test_iter_sorted () =
8282+ with_mem_pager ~page_size:512 @@ fun pager ->
8383+ let idx = Btree.Index.v pager in
8484+ let n = 100 in
8585+ for i = n - 1 downto 0 do
8686+ Btree.Index.insert idx (Fmt.str "%04d" i)
8787+ done;
8888+ let prev = ref "" in
8989+ Btree.Index.iter idx (fun key ->
9090+ if key <= !prev then Alcotest.failf "out of order: %S after %S" key !prev;
9191+ prev := key)
9292+9393+let test_delete () =
9494+ with_mem_pager @@ fun pager ->
9595+ let idx = Btree.Index.v pager in
9696+ for i = 0 to 19 do
9797+ Btree.Index.insert idx (Fmt.str "key_%02d" i)
9898+ done;
9999+ Btree.Index.delete idx "key_10";
100100+ Alcotest.(check bool) "deleted" false (Btree.Index.mem idx "key_10");
101101+ Alcotest.(check bool) "kept" true (Btree.Index.mem idx "key_09")
102102+103103+let test_prefix_search () =
104104+ with_mem_pager @@ fun pager ->
105105+ let idx = Btree.Index.v pager in
106106+ Btree.Index.insert idx "app.bsky.feed.post/abc";
107107+ Btree.Index.insert idx "app.bsky.feed.post/def";
108108+ Btree.Index.insert idx "app.bsky.actor.profile/self";
109109+ let result = Btree.Index.by_prefix idx "app.bsky.feed.post/" in
110110+ Alcotest.(check bool) "found prefix" true (Option.is_some result)
111111+112112+let test_overflow_cells () =
113113+ (* Keys large enough to require overflow pages *)
114114+ with_mem_pager @@ fun pager ->
115115+ let idx = Btree.Index.v pager in
116116+ let n = 20 in
117117+ for i = 0 to n - 1 do
118118+ let key = Fmt.str "%04d%s" i (String.make 2000 'x') in
119119+ Btree.Index.insert idx key
120120+ done;
121121+ for i = 0 to n - 1 do
122122+ let key = Fmt.str "%04d%s" i (String.make 2000 'x') in
123123+ Alcotest.(check bool)
124124+ (Fmt.str "overflow %d" i) true (Btree.Index.mem idx key)
125125+ done
126126+127127+let suite =
128128+ ( "index",
129129+ [
130130+ Alcotest.test_case "insert/find" `Quick test_insert_find;
131131+ Alcotest.test_case "many short keys" `Quick test_many_short_keys;
132132+ Alcotest.test_case "long keys (page overflow)" `Quick
133133+ test_long_keys_page_overflow;
134134+ Alcotest.test_case "reverse order" `Quick test_reverse_order;
135135+ Alcotest.test_case "random order" `Quick test_random_order;
136136+ Alcotest.test_case "iter sorted" `Quick test_iter_sorted;
137137+ Alcotest.test_case "delete" `Quick test_delete;
138138+ Alcotest.test_case "prefix search" `Quick test_prefix_search;
139139+ Alcotest.test_case "overflow cells" `Quick test_overflow_cells;
140140+ ] )