···180180 let rowid, consumed = Varint.decode buf (off + 4) in
181181 ({ left_child; rowid }, 4 + consumed)
182182183183- let parse_index_leaf buf off ~usable_size =
183183+ (* Parse index leaf cell - returns local payload and overflow pointer *)
184184+ let parse_index_leaf_raw buf off ~usable_size =
184185 let payload_size, consumed = Varint.decode buf off in
185186 let payload_size = Int64.to_int payload_size in
186187 let max_local = max_local ~usable_size ~is_table:false in
···195196 let overflow = get_u32_be buf (off + consumed + local) in
196197 (local, Some overflow)
197198 in
198198- let payload = String.sub buf (off + consumed) local_size in
199199+ let local_payload = String.sub buf (off + consumed) local_size in
199200 let total = consumed + local_size + if overflow_page = None then 0 else 4 in
200200- ({ payload; overflow_page }, total)
201201+ (payload_size, local_payload, overflow_page, total)
201202202202- let parse_index_interior buf off ~usable_size =
203203+ let parse_index_leaf buf off ~usable_size =
204204+ let payload_size, local_payload, overflow_page, total =
205205+ parse_index_leaf_raw buf off ~usable_size
206206+ in
207207+ (* For now, return just the local payload - caller must handle overflow *)
208208+ ignore payload_size;
209209+ ({ payload = local_payload; overflow_page }, total)
210210+211211+ (* Parse index interior cell - returns local payload and overflow pointer *)
212212+ let parse_index_interior_raw buf off ~usable_size =
203213 let left_child = get_u32_be buf off in
204214 let payload_size, consumed = Varint.decode buf (off + 4) in
205215 let payload_size = Int64.to_int payload_size in
···215225 let overflow = get_u32_be buf (off + 4 + consumed + local) in
216226 (local, Some overflow)
217227 in
218218- let payload = String.sub buf (off + 4 + consumed) local_size in
228228+ let local_payload = String.sub buf (off + 4 + consumed) local_size in
219229 let total =
220230 4 + consumed + local_size + if overflow_page = None then 0 else 4
221231 in
222222- ({ left_child; payload; overflow_page }, total)
232232+ (left_child, payload_size, local_payload, overflow_page, total)
233233+234234+ let parse_index_interior buf off ~usable_size =
235235+ let left_child, _payload_size, local_payload, overflow_page, total =
236236+ parse_index_interior_raw buf off ~usable_size
237237+ in
238238+ ({ left_child; payload = local_payload; overflow_page }, total)
223239end
224240225241(* Record format *)
···946962module Index = struct
947963 type t = { pager : Pager.t; mutable root_page : int }
948964949949- let create pager =
950950- let root = Pager.allocate pager in
951951- let page_size = Pager.page_size pager in
965965+ (* Initialize a page as empty leaf or interior *)
966966+ let init_page ~page_size ~page_type =
952967 let buf = Bytes.create page_size in
953953- Bytes.set_uint8 buf 0 (byte_of_page_type Leaf_index);
968968+ Bytes.set_uint8 buf 0 (byte_of_page_type page_type);
954969 set_u16_be buf 1 0;
970970+ (* first freeblock *)
955971 set_u16_be buf 3 0;
972972+ (* cell count *)
956973 set_u16_be buf 5 page_size;
974974+ (* cell content start *)
957975 Bytes.set_uint8 buf 7 0;
976976+ (* fragmented bytes *)
977977+ if is_interior page_type then set_u32_be buf 8 0;
978978+ (* right child *)
979979+ buf
980980+981981+ let create pager =
982982+ let root = Pager.allocate pager in
983983+ let page_size = Pager.page_size pager in
984984+ let buf = init_page ~page_size ~page_type:Leaf_index in
958985 Pager.write pager root (Bytes.unsafe_to_string buf);
959986 { pager; root_page = root }
960987···962989 let root_page t = t.root_page
963990 let usable_size t = Pager.page_size t.pager
964991992992+ (* Overflow page handling *)
993993+ let read_overflow_chain pager first_page ~remaining_size =
994994+ let usable = Pager.page_size pager in
995995+ let overflow_content_size = usable - 4 in
996996+ let buf = Buffer.create remaining_size in
997997+ let rec read page_num remaining =
998998+ if remaining <= 0 || page_num = 0 then ()
999999+ else begin
10001000+ let page = Pager.read pager page_num in
10011001+ let next_page = get_u32_be page 0 in
10021002+ let to_read = min remaining overflow_content_size in
10031003+ Buffer.add_substring buf page 4 to_read;
10041004+ read next_page (remaining - to_read)
10051005+ end
10061006+ in
10071007+ read first_page remaining_size;
10081008+ Buffer.contents buf
10091009+10101010+ let write_overflow_chain pager payload ~offset =
10111011+ let usable = Pager.page_size pager in
10121012+ let overflow_content_size = usable - 4 in
10131013+ let remaining = String.length payload - offset in
10141014+ if remaining <= 0 then 0
10151015+ else begin
10161016+ let first_page = ref 0 in
10171017+ let prev_page_buf = ref None in
10181018+ let prev_page_num = ref 0 in
10191019+ let pos = ref offset in
10201020+ while !pos < String.length payload do
10211021+ let page_num = Pager.allocate pager in
10221022+ if !first_page = 0 then first_page := page_num;
10231023+ (* Link previous page to this one *)
10241024+ (match !prev_page_buf with
10251025+ | Some buf ->
10261026+ set_u32_be buf 0 page_num;
10271027+ Pager.write pager !prev_page_num (Bytes.unsafe_to_string buf)
10281028+ | None -> ());
10291029+ (* Write this page *)
10301030+ let page_buf = Bytes.create usable in
10311031+ set_u32_be page_buf 0 0;
10321032+ (* Next page = 0 for now *)
10331033+ let to_write =
10341034+ min (String.length payload - !pos) overflow_content_size
10351035+ in
10361036+ Bytes.blit_string payload !pos page_buf 4 to_write;
10371037+ prev_page_buf := Some page_buf;
10381038+ prev_page_num := page_num;
10391039+ pos := !pos + to_write
10401040+ done;
10411041+ (* Write final page *)
10421042+ (match !prev_page_buf with
10431043+ | Some buf ->
10441044+ Pager.write pager !prev_page_num (Bytes.unsafe_to_string buf)
10451045+ | None -> ());
10461046+ !first_page
10471047+ end
10481048+10491049+ (* Read full payload including overflow pages for leaf cells *)
10501050+ let read_full_payload t page off ~usable_size =
10511051+ let payload_size, local_payload, overflow_page, _consumed =
10521052+ Cell.parse_index_leaf_raw page off ~usable_size
10531053+ in
10541054+ match overflow_page with
10551055+ | None -> local_payload
10561056+ | Some first_overflow ->
10571057+ let remaining = payload_size - String.length local_payload in
10581058+ let overflow_data =
10591059+ read_overflow_chain t.pager first_overflow ~remaining_size:remaining
10601060+ in
10611061+ local_payload ^ overflow_data
10621062+10631063+ (* Read full payload including overflow pages for interior cells *)
10641064+ let read_full_interior_payload t page off ~usable_size =
10651065+ let _left_child, payload_size, local_payload, overflow_page, _consumed =
10661066+ Cell.parse_index_interior_raw page off ~usable_size
10671067+ in
10681068+ match overflow_page with
10691069+ | None -> local_payload
10701070+ | Some first_overflow ->
10711071+ let remaining = payload_size - String.length local_payload in
10721072+ let overflow_data =
10731073+ read_overflow_chain t.pager first_overflow ~remaining_size:remaining
10741074+ in
10751075+ local_payload ^ overflow_data
10761076+9651077 let cell_pointers page header =
9661078 let ptrs = Array.make header.cell_count 0 in
9671079 let ptr_start = page_header_size header.page_type in
···9701082 done;
9711083 ptrs
972108410851085+ (* Calculate free space in a page *)
10861086+ let free_space header ~page_type =
10871087+ let header_size = page_header_size page_type in
10881088+ let ptr_area_end = header_size + (header.cell_count * 2) in
10891089+ header.cell_content_start - ptr_area_end - header.fragmented_bytes
10901090+10911091+ (* Encode an index leaf cell - handles overflow for large payloads *)
10921092+ let encode_index_leaf_cell_with_overflow t ~payload =
10931093+ let payload_size = String.length payload in
10941094+ let usable_size = usable_size t in
10951095+ let max_local = Cell.max_local ~usable_size ~is_table:false in
10961096+ let min_local = Cell.min_local ~usable_size in
10971097+ let payload_size_varint = Varint.encode (Int64.of_int payload_size) in
10981098+ let varint_len = String.length payload_size_varint in
10991099+ if payload_size <= max_local then begin
11001100+ (* Fits entirely - no overflow needed *)
11011101+ let cell = Bytes.create (varint_len + payload_size) in
11021102+ Bytes.blit_string payload_size_varint 0 cell 0 varint_len;
11031103+ Bytes.blit_string payload 0 cell varint_len payload_size;
11041104+ Bytes.unsafe_to_string cell
11051105+ end
11061106+ else begin
11071107+ (* Need overflow pages *)
11081108+ let k = min_local + ((payload_size - min_local) mod (usable_size - 4)) in
11091109+ let local_size = if k <= max_local then k else min_local in
11101110+ (* Write overflow pages for data beyond local_size *)
11111111+ let overflow_page =
11121112+ write_overflow_chain t.pager payload ~offset:local_size
11131113+ in
11141114+ (* Create cell: varint(size) + local_payload + overflow_ptr *)
11151115+ let cell = Bytes.create (varint_len + local_size + 4) in
11161116+ Bytes.blit_string payload_size_varint 0 cell 0 varint_len;
11171117+ Bytes.blit_string payload 0 cell varint_len local_size;
11181118+ set_u32_be cell (varint_len + local_size) overflow_page;
11191119+ Bytes.unsafe_to_string cell
11201120+ end
11211121+11221122+ (* Encode an index interior cell *)
11231123+ let encode_index_interior_cell ~left_child ~payload =
11241124+ let payload_size_varint =
11251125+ Varint.encode (Int64.of_int (String.length payload))
11261126+ in
11271127+ let cell =
11281128+ Bytes.create
11291129+ (4 + String.length payload_size_varint + String.length payload)
11301130+ in
11311131+ set_u32_be cell 0 left_child;
11321132+ Bytes.blit_string payload_size_varint 0 cell 4
11331133+ (String.length payload_size_varint);
11341134+ Bytes.blit_string payload 0 cell
11351135+ (4 + String.length payload_size_varint)
11361136+ (String.length payload);
11371137+ Bytes.unsafe_to_string cell
11381138+11391139+ (* Write a cell into a page buffer, returns new cell_content_start *)
11401140+ let write_cell buf ~cell_content_start ~cell =
11411141+ let cell_len = String.length cell in
11421142+ let new_start = cell_content_start - cell_len in
11431143+ Bytes.blit_string cell 0 buf new_start cell_len;
11441144+ new_start
11451145+11461146+ (* Insert a cell pointer at index, shifting others *)
11471147+ let insert_cell_pointer buf ~header_offset ~page_type ~cell_count ~index ~ptr
11481148+ =
11491149+ let ptr_start = header_offset + page_header_size page_type in
11501150+ (* Shift existing pointers right *)
11511151+ for i = cell_count - 1 downto index do
11521152+ let old_ptr =
11531153+ get_u16_be (Bytes.unsafe_to_string buf) (ptr_start + (i * 2))
11541154+ in
11551155+ set_u16_be buf (ptr_start + ((i + 1) * 2)) old_ptr
11561156+ done;
11571157+ set_u16_be buf (ptr_start + (index * 2)) ptr
11581158+11591159+ (* Compare for btree navigation: key is strictly less than payload.
11601160+ Special case: if key is a prefix of payload, they are equal for navigation purposes
11611161+ (i.e., entries starting with key might be at or after payload's position). *)
11621162+ let key_less_than key payload =
11631163+ let key_len = String.length key in
11641164+ let payload_len = String.length payload in
11651165+ let cmp_len = min key_len payload_len in
11661166+ let cmp =
11671167+ String.compare (String.sub key 0 cmp_len) (String.sub payload 0 cmp_len)
11681168+ in
11691169+ if cmp <> 0 then cmp < 0
11701170+ else
11711171+ (* Prefixes match - if key is shorter or equal length, it's NOT strictly less *)
11721172+ false
11731173+9731174 let rec mem_in_page t page_num key =
9741175 let page = Pager.read t.pager page_num in
9751176 let header = parse_page_header page 0 in
···9771178 let usable = usable_size t in
9781179 match header.page_type with
9791180 | Leaf_index ->
980980- let rec search i =
981981- if i >= header.cell_count then false
11811181+ let rec search lo hi =
11821182+ if lo > hi then false
9821183 else
11841184+ let mid = (lo + hi) / 2 in
9831185 let cell, _ =
984984- Cell.parse_index_leaf page ptrs.(i) ~usable_size:usable
11861186+ Cell.parse_index_leaf page ptrs.(mid) ~usable_size:usable
9851187 in
986986- if cell.payload = key then true
987987- else if cell.payload > key then false
988988- else search (i + 1)
11881188+ let cmp = String.compare key cell.payload in
11891189+ if cmp = 0 then true
11901190+ else if cmp < 0 then search lo (mid - 1)
11911191+ else search (mid + 1) hi
9891192 in
990990- search 0
11931193+ search 0 (header.cell_count - 1)
9911194 | Interior_index ->
9921195 let rec find_child i =
9931196 if i >= header.cell_count then Option.get header.right_child
···9951198 let cell, _ =
9961199 Cell.parse_index_interior page ptrs.(i) ~usable_size:usable
9971200 in
998998- if key <= cell.payload then cell.left_child else find_child (i + 1)
12011201+ if key_less_than key cell.payload then cell.left_child
12021202+ else find_child (i + 1)
9991203 in
10001204 mem_in_page t (find_child 0) key
10011205 | _ -> failwith "Invalid page type in index B-tree"
1002120610031207 let mem t key = mem_in_page t t.root_page key
10041004- let insert _t _key = failwith "Index insert not yet implemented"
10051005- let delete _t _key = failwith "Index delete not yet implemented"
12081208+12091209+ (* Find exact key, returns payload *)
12101210+ let rec find_in_page t page_num key =
12111211+ let page = Pager.read t.pager page_num in
12121212+ let header = parse_page_header page 0 in
12131213+ let ptrs = cell_pointers page header in
12141214+ let usable = usable_size t in
12151215+ match header.page_type with
12161216+ | Leaf_index ->
12171217+ let rec search lo hi =
12181218+ if lo > hi then None
12191219+ else
12201220+ let mid = (lo + hi) / 2 in
12211221+ (* Read full payload including overflow *)
12221222+ let full_payload =
12231223+ read_full_payload t page ptrs.(mid) ~usable_size:usable
12241224+ in
12251225+ let cmp = String.compare key full_payload in
12261226+ if cmp = 0 then Some full_payload
12271227+ else if cmp < 0 then search lo (mid - 1)
12281228+ else search (mid + 1) hi
12291229+ in
12301230+ search 0 (header.cell_count - 1)
12311231+ | Interior_index ->
12321232+ let rec find_child_rec i =
12331233+ if i >= header.cell_count then Option.get header.right_child
12341234+ else
12351235+ (* Read full payload for interior comparison *)
12361236+ let full_payload =
12371237+ read_full_interior_payload t page ptrs.(i) ~usable_size:usable
12381238+ in
12391239+ if key_less_than key full_payload then
12401240+ let cell, _ =
12411241+ Cell.parse_index_interior page ptrs.(i) ~usable_size:usable
12421242+ in
12431243+ cell.left_child
12441244+ else find_child_rec (i + 1)
12451245+ in
12461246+ find_in_page t (find_child_rec 0) key
12471247+ | _ -> failwith "Invalid page type in index B-tree"
12481248+12491249+ let find t key = find_in_page t t.root_page key
12501250+12511251+ (* Find a key in leaf page, returns (payload, index) if found *)
12521252+ let find_in_leaf t page header key =
12531253+ let ptrs = cell_pointers page header in
12541254+ let usable = usable_size t in
12551255+ let rec search lo hi =
12561256+ if lo > hi then None
12571257+ else
12581258+ let mid = (lo + hi) / 2 in
12591259+ let full_payload =
12601260+ read_full_payload t page ptrs.(mid) ~usable_size:usable
12611261+ in
12621262+ let cmp = String.compare key full_payload in
12631263+ if cmp = 0 then Some (full_payload, mid)
12641264+ else if cmp < 0 then search lo (mid - 1)
12651265+ else search (mid + 1) hi
12661266+ in
12671267+ search 0 (header.cell_count - 1)
12681268+12691269+ (* Find insertion index for key in leaf page *)
12701270+ let find_insert_idx t page header key =
12711271+ let ptrs = cell_pointers page header in
12721272+ let usable = usable_size t in
12731273+ let rec find i =
12741274+ if i >= header.cell_count then i
12751275+ else
12761276+ let full_payload =
12771277+ read_full_payload t page ptrs.(i) ~usable_size:usable
12781278+ in
12791279+ if key < full_payload then i else find (i + 1)
12801280+ in
12811281+ find 0
12821282+12831283+ (* Find child page for key in interior page *)
12841284+ let find_child t page header key =
12851285+ let ptrs = cell_pointers page header in
12861286+ let usable = usable_size t in
12871287+ let rec loop i =
12881288+ if i >= header.cell_count then Option.get header.right_child
12891289+ else
12901290+ let full_payload =
12911291+ read_full_interior_payload t page ptrs.(i) ~usable_size:usable
12921292+ in
12931293+ if key_less_than key full_payload then
12941294+ let cell, _ =
12951295+ Cell.parse_index_interior page ptrs.(i) ~usable_size:usable
12961296+ in
12971297+ cell.left_child
12981298+ else loop (i + 1)
12991299+ in
13001300+ loop 0
13011301+13021302+ (* Find child index for key in interior page *)
13031303+ let find_child_idx t page header key =
13041304+ let ptrs = cell_pointers page header in
13051305+ let usable = usable_size t in
13061306+ let rec loop i =
13071307+ if i >= header.cell_count then i (* right child *)
13081308+ else
13091309+ let full_payload =
13101310+ read_full_interior_payload t page ptrs.(i) ~usable_size:usable
13111311+ in
13121312+ if key_less_than key full_payload then i else loop (i + 1)
13131313+ in
13141314+ loop 0
13151315+13161316+ (* Split result: new page number and separator key *)
13171317+ type split_result = { new_page : int; separator_key : string }
13181318+13191319+ (* Split a leaf page, returns info about the new page *)
13201320+ let split_leaf t page_num =
13211321+ let page = Pager.read t.pager page_num in
13221322+ let header = parse_page_header page 0 in
13231323+ let ptrs = cell_pointers page header in
13241324+ let usable = usable_size t in
13251325+ let page_size = Pager.page_size t.pager in
13261326+13271327+ (* Find split point (middle) *)
13281328+ let split_idx = header.cell_count / 2 in
13291329+13301330+ (* Get separator key (first key that goes to right page) - read full payload *)
13311331+ let separator_key =
13321332+ read_full_payload t page ptrs.(split_idx) ~usable_size:usable
13331333+ in
13341334+13351335+ (* Create new right page *)
13361336+ let new_page_num = Pager.allocate t.pager in
13371337+ let new_buf = init_page ~page_size ~page_type:Leaf_index in
13381338+13391339+ (* Copy cells [split_idx..cell_count-1] to new page *)
13401340+ let new_cell_content_start = ref page_size in
13411341+ for i = split_idx to header.cell_count - 1 do
13421342+ let cell_off = ptrs.(i) in
13431343+ (* Read full payload including from overflow pages *)
13441344+ let full_payload =
13451345+ read_full_payload t page cell_off ~usable_size:usable
13461346+ in
13471347+ (* Re-encode with overflow support *)
13481348+ let cell_data =
13491349+ encode_index_leaf_cell_with_overflow t ~payload:full_payload
13501350+ in
13511351+ new_cell_content_start :=
13521352+ write_cell new_buf ~cell_content_start:!new_cell_content_start
13531353+ ~cell:cell_data;
13541354+ let new_idx = i - split_idx in
13551355+ let ptr_off = page_header_size Leaf_index + (new_idx * 2) in
13561356+ set_u16_be new_buf ptr_off !new_cell_content_start
13571357+ done;
13581358+ let new_cell_count = header.cell_count - split_idx in
13591359+ set_u16_be new_buf 3 new_cell_count;
13601360+ set_u16_be new_buf 5 !new_cell_content_start;
13611361+ Pager.write t.pager new_page_num (Bytes.unsafe_to_string new_buf);
13621362+13631363+ (* Update original page to only have cells [0..split_idx-1] *)
13641364+ let old_buf = Bytes.of_string page in
13651365+ set_u16_be old_buf 3 split_idx;
13661366+ (* Recalculate cell content start for remaining cells *)
13671367+ if split_idx > 0 then begin
13681368+ let min_ptr = ref page_size in
13691369+ for i = 0 to split_idx - 1 do
13701370+ if ptrs.(i) < !min_ptr then min_ptr := ptrs.(i)
13711371+ done;
13721372+ set_u16_be old_buf 5 !min_ptr
13731373+ end
13741374+ else set_u16_be old_buf 5 page_size;
13751375+ Pager.write t.pager page_num (Bytes.unsafe_to_string old_buf);
13761376+13771377+ { new_page = new_page_num; separator_key }
13781378+13791379+ (* Split an interior page *)
13801380+ let split_interior t page_num =
13811381+ let page = Pager.read t.pager page_num in
13821382+ let header = parse_page_header page 0 in
13831383+ let ptrs = cell_pointers page header in
13841384+ let usable = usable_size t in
13851385+ let page_size = Pager.page_size t.pager in
13861386+13871387+ (* Split point - middle cell becomes separator, doesn't go to either page *)
13881388+ let split_idx = header.cell_count / 2 in
13891389+ let sep_cell, _ =
13901390+ Cell.parse_index_interior page ptrs.(split_idx) ~usable_size:usable
13911391+ in
13921392+ let separator_key = sep_cell.payload in
13931393+13941394+ (* Create new right page *)
13951395+ let new_page_num = Pager.allocate t.pager in
13961396+ let new_buf = init_page ~page_size ~page_type:Interior_index in
13971397+13981398+ (* Cells [split_idx+1..cell_count-1] go to new page *)
13991399+ let new_cell_content_start = ref page_size in
14001400+ for i = split_idx + 1 to header.cell_count - 1 do
14011401+ let cell_off = ptrs.(i) in
14021402+ let cell, _ =
14031403+ Cell.parse_index_interior page cell_off ~usable_size:usable
14041404+ in
14051405+ let cell_data =
14061406+ encode_index_interior_cell ~left_child:cell.left_child
14071407+ ~payload:cell.payload
14081408+ in
14091409+ new_cell_content_start :=
14101410+ write_cell new_buf ~cell_content_start:!new_cell_content_start
14111411+ ~cell:cell_data;
14121412+ let new_idx = i - split_idx - 1 in
14131413+ let ptr_off = page_header_size Interior_index + (new_idx * 2) in
14141414+ set_u16_be new_buf ptr_off !new_cell_content_start
14151415+ done;
14161416+ let new_cell_count = header.cell_count - split_idx - 1 in
14171417+ set_u16_be new_buf 3 new_cell_count;
14181418+ set_u16_be new_buf 5 !new_cell_content_start;
14191419+ (* Right child of new page is same as original *)
14201420+ set_u32_be new_buf 8 (Option.get header.right_child);
14211421+ Pager.write t.pager new_page_num (Bytes.unsafe_to_string new_buf);
14221422+14231423+ (* Update original page: keep cells [0..split_idx-1], right child = sep_cell.left_child *)
14241424+ let old_buf = Bytes.of_string page in
14251425+ set_u16_be old_buf 3 split_idx;
14261426+ set_u32_be old_buf 8 sep_cell.left_child;
14271427+ if split_idx > 0 then begin
14281428+ let min_ptr = ref page_size in
14291429+ for i = 0 to split_idx - 1 do
14301430+ if ptrs.(i) < !min_ptr then min_ptr := ptrs.(i)
14311431+ done;
14321432+ set_u16_be old_buf 5 !min_ptr
14331433+ end
14341434+ else set_u16_be old_buf 5 page_size;
14351435+ Pager.write t.pager page_num (Bytes.unsafe_to_string old_buf);
14361436+14371437+ { new_page = new_page_num; separator_key }
14381438+14391439+ (* Insert a separator into an interior page, potentially splitting *)
14401440+ let rec insert_into_interior t page_num ~left_child ~separator_key
14411441+ ~right_child =
14421442+ let page = Pager.read t.pager page_num in
14431443+ let header = parse_page_header page 0 in
14441444+ let cell = encode_index_interior_cell ~left_child ~payload:separator_key in
14451445+ let cell_len = String.length cell in
14461446+ let space_needed = cell_len + 2 in
14471447+ (* cell + pointer *)
14481448+14491449+ if free_space header ~page_type:Interior_index >= space_needed then begin
14501450+ (* Fits - insert directly *)
14511451+ let buf = Bytes.of_string page in
14521452+ let ptrs = cell_pointers page header in
14531453+ let usable = usable_size t in
14541454+14551455+ (* Find insert position *)
14561456+ let insert_idx =
14571457+ let rec find i =
14581458+ if i >= header.cell_count then i
14591459+ else
14601460+ let c, _ =
14611461+ Cell.parse_index_interior page ptrs.(i) ~usable_size:usable
14621462+ in
14631463+ if separator_key < c.payload then i else find (i + 1)
14641464+ in
14651465+ find 0
14661466+ in
14671467+14681468+ (* Write cell *)
14691469+ let cell_start =
14701470+ write_cell buf ~cell_content_start:header.cell_content_start ~cell
14711471+ in
14721472+ set_u16_be buf 5 cell_start;
14731473+14741474+ (* Insert pointer *)
14751475+ insert_cell_pointer buf ~header_offset:0 ~page_type:Interior_index
14761476+ ~cell_count:header.cell_count ~index:insert_idx ~ptr:cell_start;
14771477+14781478+ (* Update cell count *)
14791479+ set_u16_be buf 3 (header.cell_count + 1);
14801480+14811481+ (* Update child pointers *)
14821482+ if insert_idx < header.cell_count then begin
14831483+ let ptr_start = page_header_size Interior_index in
14841484+ let displaced_ptr =
14851485+ get_u16_be
14861486+ (Bytes.unsafe_to_string buf)
14871487+ (ptr_start + ((insert_idx + 1) * 2))
14881488+ in
14891489+ set_u32_be buf displaced_ptr right_child
14901490+ end
14911491+ else set_u32_be buf 8 right_child;
14921492+14931493+ Pager.write t.pager page_num (Bytes.unsafe_to_string buf);
14941494+ None
14951495+ end
14961496+ else begin
14971497+ (* Need to split interior page *)
14981498+ let split = split_interior t page_num in
14991499+15001500+ (* Determine which page gets the new separator *)
15011501+ if separator_key < split.separator_key then begin
15021502+ ignore
15031503+ (insert_into_interior t page_num ~left_child ~separator_key
15041504+ ~right_child)
15051505+ end
15061506+ else begin
15071507+ ignore
15081508+ (insert_into_interior t split.new_page ~left_child ~separator_key
15091509+ ~right_child)
15101510+ end;
15111511+15121512+ Some { split with separator_key = split.separator_key }
15131513+ end
15141514+15151515+ (* Insert into a leaf page, potentially splitting *)
15161516+ let rec insert_into_leaf t page_num ~key ~parent_stack =
15171517+ let page = Pager.read t.pager page_num in
15181518+ let header = parse_page_header page 0 in
15191519+ let cell = encode_index_leaf_cell_with_overflow t ~payload:key in
15201520+ let cell_len = String.length cell in
15211521+ let space_needed = cell_len + 2 in
15221522+ (* cell + pointer *)
15231523+15241524+ if free_space header ~page_type:Leaf_index >= space_needed then begin
15251525+ (* Fits - insert directly *)
15261526+ let buf = Bytes.of_string page in
15271527+ let insert_idx = find_insert_idx t page header key in
15281528+15291529+ (* Write cell *)
15301530+ let cell_start =
15311531+ write_cell buf ~cell_content_start:header.cell_content_start ~cell
15321532+ in
15331533+ set_u16_be buf 5 cell_start;
15341534+15351535+ (* Insert pointer *)
15361536+ insert_cell_pointer buf ~header_offset:0 ~page_type:Leaf_index
15371537+ ~cell_count:header.cell_count ~index:insert_idx ~ptr:cell_start;
15381538+15391539+ (* Update cell count *)
15401540+ set_u16_be buf 3 (header.cell_count + 1);
15411541+15421542+ Pager.write t.pager page_num (Bytes.unsafe_to_string buf)
15431543+ end
15441544+ else begin
15451545+ (* Need to split *)
15461546+ let split = split_leaf t page_num in
15471547+15481548+ (* Determine target page and insert *)
15491549+ let target_page =
15501550+ if key < split.separator_key then page_num else split.new_page
15511551+ in
15521552+ let target = Pager.read t.pager target_page in
15531553+ let target_header = parse_page_header target 0 in
15541554+ let target_buf = Bytes.of_string target in
15551555+ let insert_idx = find_insert_idx t target target_header key in
15561556+ let cell_start =
15571557+ write_cell target_buf
15581558+ ~cell_content_start:target_header.cell_content_start ~cell
15591559+ in
15601560+ set_u16_be target_buf 5 cell_start;
15611561+ insert_cell_pointer target_buf ~header_offset:0 ~page_type:Leaf_index
15621562+ ~cell_count:target_header.cell_count ~index:insert_idx ~ptr:cell_start;
15631563+ set_u16_be target_buf 3 (target_header.cell_count + 1);
15641564+ Pager.write t.pager target_page (Bytes.unsafe_to_string target_buf);
15651565+15661566+ (* Propagate split up *)
15671567+ propagate_split t ~parent_stack ~left_page:page_num
15681568+ ~separator_key:split.separator_key ~right_page:split.new_page
15691569+ end
15701570+15711571+ and propagate_split t ~parent_stack ~left_page ~separator_key ~right_page =
15721572+ match parent_stack with
15731573+ | [] ->
15741574+ (* Splitting root - create new root *)
15751575+ let page_size = Pager.page_size t.pager in
15761576+ let new_root = Pager.allocate t.pager in
15771577+ let buf = init_page ~page_size ~page_type:Interior_index in
15781578+15791579+ (* Single cell pointing to left page *)
15801580+ let cell =
15811581+ encode_index_interior_cell ~left_child:left_page
15821582+ ~payload:separator_key
15831583+ in
15841584+ let cell_start = write_cell buf ~cell_content_start:page_size ~cell in
15851585+ set_u16_be buf 5 cell_start;
15861586+ set_u16_be buf (page_header_size Interior_index) cell_start;
15871587+ set_u16_be buf 3 1;
15881588+ set_u32_be buf 8 right_page;
15891589+15901590+ Pager.write t.pager new_root (Bytes.unsafe_to_string buf);
15911591+ t.root_page <- new_root
15921592+ | parent_page :: rest -> (
15931593+ match
15941594+ insert_into_interior t parent_page ~left_child:left_page
15951595+ ~separator_key ~right_child:right_page
15961596+ with
15971597+ | None -> () (* Fit in parent *)
15981598+ | Some split ->
15991599+ (* Parent also split, propagate up *)
16001600+ propagate_split t ~parent_stack:rest ~left_page:parent_page
16011601+ ~separator_key:split.separator_key ~right_page:split.new_page)
16021602+16031603+ (* Main insert - traverses tree and handles splits *)
16041604+ let insert t key =
16051605+ let rec traverse page_num parent_stack =
16061606+ let page = Pager.read t.pager page_num in
16071607+ let header = parse_page_header page 0 in
16081608+ match header.page_type with
16091609+ | Leaf_index -> (
16101610+ (* Check if key already exists *)
16111611+ match find_in_leaf t page header key with
16121612+ | Some _ -> () (* Key exists, do nothing (set semantics) *)
16131613+ | None -> insert_into_leaf t page_num ~key ~parent_stack)
16141614+ | Interior_index ->
16151615+ let child_idx = find_child_idx t page header key in
16161616+ let child_page =
16171617+ if child_idx >= header.cell_count then Option.get header.right_child
16181618+ else
16191619+ let ptrs = cell_pointers page header in
16201620+ let usable = usable_size t in
16211621+ let cell, _ =
16221622+ Cell.parse_index_interior page ptrs.(child_idx)
16231623+ ~usable_size:usable
16241624+ in
16251625+ cell.left_child
16261626+ in
16271627+ traverse child_page (page_num :: parent_stack)
16281628+ | _ -> failwith "Invalid page type in index B-tree"
16291629+ in
16301630+ traverse t.root_page []
16311631+16321632+ (* Delete a cell from a leaf page at given index *)
16331633+ let delete_from_leaf t page_num ~index =
16341634+ let page = Pager.read t.pager page_num in
16351635+ let header = parse_page_header page 0 in
16361636+ if header.cell_count = 0 then ()
16371637+ else begin
16381638+ let buf = Bytes.of_string page in
16391639+ let ptr_start = page_header_size Leaf_index in
16401640+16411641+ (* Shift pointers left to remove the entry at index *)
16421642+ for i = index to header.cell_count - 2 do
16431643+ let next_ptr = get_u16_be page (ptr_start + ((i + 1) * 2)) in
16441644+ set_u16_be buf (ptr_start + (i * 2)) next_ptr
16451645+ done;
16461646+16471647+ (* Decrease cell count *)
16481648+ set_u16_be buf 3 (header.cell_count - 1);
16491649+16501650+ (* Note: We don't reclaim space - it becomes fragmented.
16511651+ A proper implementation would compact or track free space. *)
16521652+ Pager.write t.pager page_num (Bytes.unsafe_to_string buf)
16531653+ end
16541654+16551655+ (* Delete implementation - simplified, doesn't rebalance *)
16561656+ let delete t key =
16571657+ let rec traverse page_num =
16581658+ let page = Pager.read t.pager page_num in
16591659+ let header = parse_page_header page 0 in
16601660+ match header.page_type with
16611661+ | Leaf_index -> (
16621662+ match find_in_leaf t page header key with
16631663+ | Some (_, idx) -> delete_from_leaf t page_num ~index:idx
16641664+ | None -> () (* Key not found, nothing to do *))
16651665+ | Interior_index ->
16661666+ let child = find_child t page header key in
16671667+ traverse child
16681668+ | _ -> failwith "Invalid page type in index B-tree"
16691669+ in
16701670+ traverse t.root_page
16711671+16721672+ (* Find by prefix - returns first entry starting with prefix *)
16731673+ let rec find_by_prefix_in_page t page_num prefix =
16741674+ let page = Pager.read t.pager page_num in
16751675+ let header = parse_page_header page 0 in
16761676+ let ptrs = cell_pointers page header in
16771677+ let usable = usable_size t in
16781678+ let prefix_len = String.length prefix in
16791679+ let starts_with payload =
16801680+ String.length payload >= prefix_len
16811681+ && String.sub payload 0 prefix_len = prefix
16821682+ in
16831683+ match header.page_type with
16841684+ | Leaf_index ->
16851685+ (* Linear search for first entry with prefix *)
16861686+ let rec find_first i =
16871687+ if i >= header.cell_count then None
16881688+ else
16891689+ let full_payload =
16901690+ read_full_payload t page ptrs.(i) ~usable_size:usable
16911691+ in
16921692+ if starts_with full_payload then Some full_payload
16931693+ else if full_payload > prefix then None
16941694+ else find_first (i + 1)
16951695+ in
16961696+ find_first 0
16971697+ | Interior_index ->
16981698+ let child = find_child t page header prefix in
16991699+ find_by_prefix_in_page t child prefix
17001700+ | _ -> failwith "Invalid page type in index B-tree"
17011701+17021702+ let find_by_prefix t prefix = find_by_prefix_in_page t t.root_page prefix
17031703+17041704+ (* Delete by prefix - deletes first entry starting with prefix *)
17051705+ let rec delete_by_prefix_in_page t page_num prefix =
17061706+ let page = Pager.read t.pager page_num in
17071707+ let header = parse_page_header page 0 in
17081708+ let ptrs = cell_pointers page header in
17091709+ let usable = usable_size t in
17101710+ let prefix_len = String.length prefix in
17111711+ let starts_with payload =
17121712+ String.length payload >= prefix_len
17131713+ && String.sub payload 0 prefix_len = prefix
17141714+ in
17151715+ match header.page_type with
17161716+ | Leaf_index -> (
17171717+ (* Find first entry with prefix *)
17181718+ let rec find_idx i =
17191719+ if i >= header.cell_count then None
17201720+ else
17211721+ let full_payload =
17221722+ read_full_payload t page ptrs.(i) ~usable_size:usable
17231723+ in
17241724+ if starts_with full_payload then Some i
17251725+ else if full_payload > prefix then None
17261726+ else find_idx (i + 1)
17271727+ in
17281728+ match find_idx 0 with
17291729+ | Some idx -> delete_from_leaf t page_num ~index:idx
17301730+ | None -> ())
17311731+ | Interior_index ->
17321732+ let child = find_child t page header prefix in
17331733+ delete_by_prefix_in_page t child prefix
17341734+ | _ -> failwith "Invalid page type in index B-tree"
17351735+17361736+ let delete_by_prefix t prefix = delete_by_prefix_in_page t t.root_page prefix
1006173710071738 let iter t f =
10081739 let rec iter_page page_num =
···10131744 match header.page_type with
10141745 | Leaf_index ->
10151746 for i = 0 to header.cell_count - 1 do
10161016- let cell, _ =
10171017- Cell.parse_index_leaf page ptrs.(i) ~usable_size:usable
17471747+ let full_payload =
17481748+ read_full_payload t page ptrs.(i) ~usable_size:usable
10181749 in
10191019- f cell.payload
17501750+ f full_payload
10201751 done
10211752 | Interior_index ->
10221753 for i = 0 to header.cell_count - 1 do
+17-3
lib/btree.mli
···196196197197module Index : sig
198198 type t
199199+ (** An index B-tree for string keys. *)
199200200201 val create : Pager.t -> t
201202 (** [create pager] creates a new empty index B-tree. *)
···209210 val mem : t -> string -> bool
210211 (** [mem t key] returns true if [key] exists in the index. *)
211212213213+ val find : t -> string -> string option
214214+ (** [find t key] returns the payload for [key] if it exists. *)
215215+212216 val insert : t -> string -> unit
213213- (** [insert t key] inserts a key. *)
217217+ (** [insert t key] inserts a key. If the key already exists, this is a no-op
218218+ (set semantics). *)
214219215220 val delete : t -> string -> unit
216216- (** [delete t key] deletes a key. *)
221221+ (** [delete t key] removes a key. If the key doesn't exist, this is a no-op.
222222+ Note: This is a simplified implementation that doesn't rebalance the tree.
223223+ *)
224224+225225+ val find_by_prefix : t -> string -> string option
226226+ (** [find_by_prefix t prefix] finds the first entry starting with [prefix]. *)
227227+228228+ val delete_by_prefix : t -> string -> unit
229229+ (** [delete_by_prefix t prefix] deletes the first entry starting with
230230+ [prefix]. *)
217231218232 val iter : t -> (string -> unit) -> unit
219219- (** [iter t f] calls [f key] for each key in order. *)
233233+ (** [iter t f] calls [f key] for each key in sorted order. *)
220234end