Pure OCaml B-tree implementation for persistent storage
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(btree,gpt): extract pack_cells helper, fix E005/E320

Extract shared pack_cells helper in index.ml and table.ml to reduce
split_leaf/split_interior below 50-line threshold. Shorten
test_first_usable_lba identifier to satisfy E320.

+90 -176
+45 -86
lib/index.ml
··· 392 392 (* Split result: new page number and separator key *) 393 393 type split_result = { new_page : int; separator_key : string } 394 394 395 - (* Split a leaf page, returns info about the new page *) 395 + (* Pack cells [start..stop-1] into a fresh page. [copy_cell i] returns bytes. *) 396 + let pack_cells ~page_size ~kind ~copy_cell ~start ~stop = 397 + let buf = Page.init ~page_size ~kind in 398 + let content_start = ref page_size in 399 + for i = start to stop - 1 do 400 + let cell = copy_cell i in 401 + content_start := write_cell buf ~cell_content_start:!content_start ~cell; 402 + let ptr_off = Page.header_size kind + ((i - start) * 2) in 403 + Page.set_u16_be buf ptr_off !content_start 404 + done; 405 + Page.set_u16_be buf 3 (stop - start); 406 + Page.set_u16_be buf 5 !content_start; 407 + buf 408 + 396 409 let split_leaf t page_num = 397 410 let page = Pager.read t.pager page_num in 398 411 let header = Page.parse_header page 0 in 399 412 let ptrs = cell_pointers page header in 400 413 let usable = usable_size t in 401 414 let page_size = Pager.page_size t.pager in 402 - 403 - (* Find split point (middle) *) 404 415 let split_idx = header.Page.cell_count / 2 in 405 - 406 - (* Get separator key (first key that goes to right page) - read full payload *) 407 416 let separator_key = 408 417 read_full_payload t page ptrs.(split_idx) ~usable_size:usable 409 418 in 410 - 411 - (* Create new right page *) 419 + (* Right page: re-encode cells with overflow support *) 412 420 let new_page_num = Pager.allocate t.pager in 413 - let new_buf = Page.init ~page_size ~kind:Page.Leaf_index in 414 - 415 - (* Copy cells [split_idx..cell_count-1] to new page *) 416 - let new_cell_content_start = ref page_size in 417 - for i = split_idx to header.Page.cell_count - 1 do 418 - let cell_off = ptrs.(i) in 419 - (* Read full payload including from overflow pages *) 420 - let full_payload = read_full_payload t page cell_off ~usable_size:usable in 421 - (* Re-encode with overflow support *) 422 - let cell_data = encode_leaf_cell_overflow t ~payload:full_payload in 423 - new_cell_content_start := 424 - write_cell new_buf ~cell_content_start:!new_cell_content_start 425 - ~cell:cell_data; 426 - let new_idx = i - split_idx in 427 - let ptr_off = Page.header_size Page.Leaf_index + (new_idx * 2) in 428 - Page.set_u16_be new_buf ptr_off !new_cell_content_start 429 - done; 430 - let new_cell_count = header.Page.cell_count - split_idx in 431 - Page.set_u16_be new_buf 3 new_cell_count; 432 - Page.set_u16_be new_buf 5 !new_cell_content_start; 421 + let new_buf = 422 + pack_cells ~page_size ~kind:Page.Leaf_index ~start:split_idx 423 + ~stop:header.Page.cell_count ~copy_cell:(fun i -> 424 + let payload = read_full_payload t page ptrs.(i) ~usable_size:usable in 425 + encode_leaf_cell_overflow t ~payload) 426 + in 433 427 Pager.write t.pager new_page_num (Bytes.unsafe_to_string new_buf); 434 - 435 - (* Compact original page: repack remaining cells [0..split_idx-1] *) 436 - let old_buf = Page.init ~page_size ~kind:Page.Leaf_index in 437 - let old_cell_start = ref page_size in 438 - for i = 0 to split_idx - 1 do 439 - let _payload_size, _local, _overflow, cell_size = 440 - Cell.parse_index_leaf_raw page ptrs.(i) ~usable_size:usable 441 - in 442 - let raw_cell = String.sub page ptrs.(i) cell_size in 443 - old_cell_start := 444 - write_cell old_buf ~cell_content_start:!old_cell_start ~cell:raw_cell; 445 - let ptr_off = Page.header_size Page.Leaf_index + (i * 2) in 446 - Page.set_u16_be old_buf ptr_off !old_cell_start 447 - done; 448 - Page.set_u16_be old_buf 3 split_idx; 449 - Page.set_u16_be old_buf 5 !old_cell_start; 428 + (* Left page: raw-copy remaining cells *) 429 + let old_buf = 430 + pack_cells ~page_size ~kind:Page.Leaf_index ~start:0 ~stop:split_idx 431 + ~copy_cell:(fun i -> 432 + let _payload_size, _local, _overflow, cell_size = 433 + Cell.parse_index_leaf_raw page ptrs.(i) ~usable_size:usable 434 + in 435 + String.sub page ptrs.(i) cell_size) 436 + in 450 437 Pager.write t.pager page_num (Bytes.unsafe_to_string old_buf); 451 - 452 438 { new_page = new_page_num; separator_key } 453 439 454 - (* Split an interior page *) 455 440 let split_interior t page_num = 456 441 let page = Pager.read t.pager page_num in 457 442 let header = Page.parse_header page 0 in 458 443 let ptrs = cell_pointers page header in 459 444 let usable = usable_size t in 460 445 let page_size = Pager.page_size t.pager in 461 - 462 - (* Split point - middle cell becomes separator, doesn't go to either page *) 463 446 let split_idx = header.Page.cell_count / 2 in 464 447 let sep_cell, _ = 465 448 Cell.parse_index_interior page ptrs.(split_idx) ~usable_size:usable ··· 467 450 let separator_key = 468 451 read_full_interior_payload t page ptrs.(split_idx) ~usable_size:usable 469 452 in 470 - 471 - (* Create new right page *) 472 - let new_page_num = Pager.allocate t.pager in 473 - let new_buf = Page.init ~page_size ~kind:Page.Interior_index in 474 - 475 - (* Cells [split_idx+1..cell_count-1] go to new page (raw copy preserves overflow) *) 476 - let new_cell_content_start = ref page_size in 477 - for i = split_idx + 1 to header.Page.cell_count - 1 do 478 - let cell_off = ptrs.(i) in 453 + let raw_interior_cell i = 479 454 let _left, _payload_size, _local, _overflow, cell_size = 480 - Cell.parse_index_interior_raw page cell_off ~usable_size:usable 455 + Cell.parse_index_interior_raw page ptrs.(i) ~usable_size:usable 481 456 in 482 - let raw_cell = String.sub page cell_off cell_size in 483 - new_cell_content_start := 484 - write_cell new_buf ~cell_content_start:!new_cell_content_start 485 - ~cell:raw_cell; 486 - let new_idx = i - split_idx - 1 in 487 - let ptr_off = Page.header_size Page.Interior_index + (new_idx * 2) in 488 - Page.set_u16_be new_buf ptr_off !new_cell_content_start 489 - done; 490 - let new_cell_count = header.Page.cell_count - split_idx - 1 in 491 - Page.set_u16_be new_buf 3 new_cell_count; 492 - Page.set_u16_be new_buf 5 !new_cell_content_start; 493 - (* Right child of new page is same as original *) 457 + String.sub page ptrs.(i) cell_size 458 + in 459 + (* Right page *) 460 + let new_page_num = Pager.allocate t.pager in 461 + let new_buf = 462 + pack_cells ~page_size ~kind:Page.Interior_index ~start:(split_idx + 1) 463 + ~stop:header.Page.cell_count ~copy_cell:raw_interior_cell 464 + in 494 465 Page.set_u32_be new_buf 8 (Option.get header.Page.right_child); 495 466 Pager.write t.pager new_page_num (Bytes.unsafe_to_string new_buf); 496 - 497 - (* Compact original page: repack remaining cells [0..split_idx-1] *) 498 - let old_buf = Page.init ~page_size ~kind:Page.Interior_index in 499 - let old_cell_start = ref page_size in 500 - for i = 0 to split_idx - 1 do 501 - let _left, _payload_size, _local, _overflow, cell_size = 502 - Cell.parse_index_interior_raw page ptrs.(i) ~usable_size:usable 503 - in 504 - let raw_cell = String.sub page ptrs.(i) cell_size in 505 - old_cell_start := 506 - write_cell old_buf ~cell_content_start:!old_cell_start ~cell:raw_cell; 507 - let ptr_off = Page.header_size Page.Interior_index + (i * 2) in 508 - Page.set_u16_be old_buf ptr_off !old_cell_start 509 - done; 510 - Page.set_u16_be old_buf 3 split_idx; 511 - Page.set_u16_be old_buf 5 !old_cell_start; 467 + (* Left page *) 468 + let old_buf = 469 + pack_cells ~page_size ~kind:Page.Interior_index ~start:0 ~stop:split_idx 470 + ~copy_cell:raw_interior_cell 471 + in 512 472 Page.set_u32_be old_buf 8 sep_cell.Cell.left_child; 513 473 Pager.write t.pager page_num (Bytes.unsafe_to_string old_buf); 514 - 515 474 { new_page = new_page_num; separator_key } 516 475 517 476 (* Insert a separator into an interior page, potentially splitting *)
+45 -90
lib/table.ml
··· 169 169 (* Split result: new page number and separator rowid *) 170 170 type split_result = { new_page : int; separator_rowid : int64 } 171 171 172 - (* Split a leaf page, returns info about the new page *) 172 + (* Pack cells [start..stop-1] into a fresh page. [copy_cell i] returns bytes. *) 173 + let pack_cells ~page_size ~kind ~copy_cell ~start ~stop = 174 + let buf = Page.init ~page_size ~kind in 175 + let content_start = ref page_size in 176 + for i = start to stop - 1 do 177 + let cell = copy_cell i in 178 + content_start := write_cell buf ~cell_content_start:!content_start ~cell; 179 + let ptr_off = Page.header_size kind + ((i - start) * 2) in 180 + Page.set_u16_be buf ptr_off !content_start 181 + done; 182 + Page.set_u16_be buf 3 (stop - start); 183 + Page.set_u16_be buf 5 !content_start; 184 + buf 185 + 173 186 let split_leaf t page_num = 174 187 let page = Pager.read t.pager page_num in 175 188 let header = Page.parse_header page 0 in 176 189 let ptrs = Page.cell_pointers page 0 header in 177 190 let usable = usable_size t in 178 191 let page_size = Pager.page_size t.pager in 179 - 180 - (* Find split point (middle) *) 181 192 let split_idx = header.Page.cell_count / 2 in 182 - 183 - (* Get separator rowid (first key that goes to right page) *) 184 193 let sep_cell, _ = 185 194 Cell.parse_table_leaf page ptrs.(split_idx) ~usable_size:usable 186 195 in 187 196 let separator_rowid = sep_cell.Cell.rowid in 188 - 189 - (* Create new right page *) 197 + (* Right page: re-encode cells *) 190 198 let new_page_num = Pager.allocate t.pager in 191 - let new_buf = Page.init ~page_size ~kind:Page.Leaf_table in 192 - 193 - (* Copy cells [split_idx..cell_count-1] to new page *) 194 - let new_cell_content_start = ref page_size in 195 - for i = split_idx to header.Page.cell_count - 1 do 196 - let cell_off = ptrs.(i) in 197 - let cell, cell_len = 198 - Cell.parse_table_leaf page cell_off ~usable_size:usable 199 - in 200 - let cell_data = 201 - encode_table_leaf_cell ~rowid:cell.Cell.rowid ~data:cell.Cell.payload 202 - in 203 - new_cell_content_start := 204 - write_cell new_buf ~cell_content_start:!new_cell_content_start 205 - ~cell:cell_data; 206 - let new_idx = i - split_idx in 207 - let ptr_off = Page.header_size Page.Leaf_table + (new_idx * 2) in 208 - Page.set_u16_be new_buf ptr_off !new_cell_content_start; 209 - ignore cell_len 210 - done; 211 - let new_cell_count = header.Page.cell_count - split_idx in 212 - Page.set_u16_be new_buf 3 new_cell_count; 213 - Page.set_u16_be new_buf 5 !new_cell_content_start; 199 + let new_buf = 200 + pack_cells ~page_size ~kind:Page.Leaf_table ~start:split_idx 201 + ~stop:header.Page.cell_count ~copy_cell:(fun i -> 202 + let cell, _ = Cell.parse_table_leaf page ptrs.(i) ~usable_size:usable in 203 + encode_table_leaf_cell ~rowid:cell.Cell.rowid ~data:cell.Cell.payload) 204 + in 214 205 Pager.write t.pager new_page_num (Bytes.unsafe_to_string new_buf); 215 - 216 - (* Compact original page: repack remaining cells [0..split_idx-1] *) 217 - let old_buf = Page.init ~page_size ~kind:Page.Leaf_table in 218 - let old_cell_start = ref page_size in 219 - for i = 0 to split_idx - 1 do 220 - let _cell, cell_size = 221 - Cell.parse_table_leaf page ptrs.(i) ~usable_size:usable 222 - in 223 - let raw_cell = String.sub page ptrs.(i) cell_size in 224 - old_cell_start := 225 - write_cell old_buf ~cell_content_start:!old_cell_start ~cell:raw_cell; 226 - let ptr_off = Page.header_size Page.Leaf_table + (i * 2) in 227 - Page.set_u16_be old_buf ptr_off !old_cell_start 228 - done; 229 - Page.set_u16_be old_buf 3 split_idx; 230 - Page.set_u16_be old_buf 5 !old_cell_start; 206 + (* Left page: raw-copy remaining cells *) 207 + let old_buf = 208 + pack_cells ~page_size ~kind:Page.Leaf_table ~start:0 ~stop:split_idx 209 + ~copy_cell:(fun i -> 210 + let _cell, cell_size = 211 + Cell.parse_table_leaf page ptrs.(i) ~usable_size:usable 212 + in 213 + String.sub page ptrs.(i) cell_size) 214 + in 231 215 Pager.write t.pager page_num (Bytes.unsafe_to_string old_buf); 232 - 233 216 { new_page = new_page_num; separator_rowid } 234 217 235 - (* Split an interior page *) 236 218 let split_interior t page_num = 237 219 let page = Pager.read t.pager page_num in 238 220 let header = Page.parse_header page 0 in 239 221 let ptrs = Page.cell_pointers page 0 header in 240 222 let page_size = Pager.page_size t.pager in 241 - 242 - (* Split point - middle cell becomes separator, doesn't go to either page *) 243 223 let split_idx = header.Page.cell_count / 2 in 244 224 let sep_cell, _ = Cell.parse_table_interior page ptrs.(split_idx) in 245 225 let separator_rowid = sep_cell.Cell.rowid in 246 - 247 - (* Create new right page *) 226 + (* Right page *) 248 227 let new_page_num = Pager.allocate t.pager in 249 - let new_buf = Page.init ~page_size ~kind:Page.Interior_table in 250 - 251 - (* The right child of split cell becomes the left-most child of new page *) 252 - (* Cells [split_idx+1..cell_count-1] go to new page *) 253 - let new_cell_content_start = ref page_size in 254 - for i = split_idx + 1 to header.Page.cell_count - 1 do 255 - let cell_off = ptrs.(i) in 256 - let cell, _ = Cell.parse_table_interior page cell_off in 257 - let cell_data = 258 - encode_table_interior_cell ~left_child:cell.Cell.left_child 259 - ~rowid:cell.Cell.rowid 260 - in 261 - new_cell_content_start := 262 - write_cell new_buf ~cell_content_start:!new_cell_content_start 263 - ~cell:cell_data; 264 - let new_idx = i - split_idx - 1 in 265 - let ptr_off = Page.header_size Page.Interior_table + (new_idx * 2) in 266 - Page.set_u16_be new_buf ptr_off !new_cell_content_start 267 - done; 268 - let new_cell_count = header.Page.cell_count - split_idx - 1 in 269 - Page.set_u16_be new_buf 3 new_cell_count; 270 - Page.set_u16_be new_buf 5 !new_cell_content_start; 271 - (* Right child of new page is same as original *) 228 + let new_buf = 229 + pack_cells ~page_size ~kind:Page.Interior_table ~start:(split_idx + 1) 230 + ~stop:header.Page.cell_count ~copy_cell:(fun i -> 231 + let cell, _ = Cell.parse_table_interior page ptrs.(i) in 232 + encode_table_interior_cell ~left_child:cell.Cell.left_child 233 + ~rowid:cell.Cell.rowid) 234 + in 272 235 Page.set_u32_be new_buf 8 (Option.get header.Page.right_child); 273 236 Pager.write t.pager new_page_num (Bytes.unsafe_to_string new_buf); 274 - 275 - (* Compact original page: repack remaining cells [0..split_idx-1] *) 276 - let old_buf = Page.init ~page_size ~kind:Page.Interior_table in 277 - let old_cell_start = ref page_size in 278 - for i = 0 to split_idx - 1 do 279 - let _cell, cell_size = Cell.parse_table_interior page ptrs.(i) in 280 - let raw_cell = String.sub page ptrs.(i) cell_size in 281 - old_cell_start := 282 - write_cell old_buf ~cell_content_start:!old_cell_start ~cell:raw_cell; 283 - let ptr_off = Page.header_size Page.Interior_table + (i * 2) in 284 - Page.set_u16_be old_buf ptr_off !old_cell_start 285 - done; 286 - Page.set_u16_be old_buf 3 split_idx; 287 - Page.set_u16_be old_buf 5 !old_cell_start; 237 + (* Left page *) 238 + let old_buf = 239 + pack_cells ~page_size ~kind:Page.Interior_table ~start:0 ~stop:split_idx 240 + ~copy_cell:(fun i -> 241 + let _cell, cell_size = Cell.parse_table_interior page ptrs.(i) in 242 + String.sub page ptrs.(i) cell_size) 243 + in 288 244 Page.set_u32_be old_buf 8 sep_cell.Cell.left_child; 289 245 Pager.write t.pager page_num (Bytes.unsafe_to_string old_buf); 290 - 291 246 { new_page = new_page_num; separator_rowid } 292 247 293 248 (* Insert a separator into an interior page, potentially splitting *)