Declarative CSV codecs
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Stream csvt decoder: field-by-field from byte stream

Added a streaming CSV tokenizer (module P) that reads field-by-field
from Bytesrw.Bytes.Reader.t, with RFC 4180 quoting support (quoted
fields, "" escapes, and both CRLF and LF line endings).

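decode/decode_string now parse directly from the byte stream, one row
at a time — the input is no longer read into a single string and split
with split_csv_line (only the current row's fields are collected into
an array for decode_row). The encoder writes field-by-field to
Bytesrw.Bytes.Writer.t, quoting fields that contain commas, double
quotes, or line breaks.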

50 tests and 9 fuzz tests pass.

+240 -50
+6 -3
fuzz/fuzz_csvt.ml
··· 103 103 (* {1 String roundtrip (no commas/newlines)} *) 104 104 105 105 let test_string_roundtrip s = 106 + (* Filter characters that have special meaning in CSV: comma (field separator), 107 + newline/CR (row separator), and double-quote (quoting delimiter per RFC 4180). 108 + These characters require quoting to roundtrip, tested separately. *) 106 109 let s = 107 110 String.to_seq s 108 - |> Seq.filter (fun c -> c <> ',' && c <> '\n' && c <> '\r') 111 + |> Seq.filter (fun c -> c <> ',' && c <> '\n' && c <> '\r' && c <> '"') 109 112 |> String.of_seq 110 113 in 111 114 if String.length s = 0 then () (* empty lines are skipped *) ··· 123 126 let test_pair_roundtrip i s = 124 127 let s = 125 128 String.to_seq s 126 - |> Seq.filter (fun c -> c <> ',' && c <> '\n' && c <> '\r') 129 + |> Seq.filter (fun c -> c <> ',' && c <> '\n' && c <> '\r' && c <> '"') 127 130 |> String.of_seq 128 131 in 129 132 if String.length s = 0 then () ··· 141 144 let test_reorder_invariance i s = 142 145 let s = 143 146 String.to_seq s 144 - |> Seq.filter (fun c -> c <> ',' && c <> '\n' && c <> '\r') 147 + |> Seq.filter (fun c -> c <> ',' && c <> '\n' && c <> '\r' && c <> '"') 145 148 |> String.of_seq 146 149 in 147 150 if String.length s = 0 then ()
+234 -47
lib/csvt.ml
··· 382 382 Array.of_list 383 383 (List.map (fun (Mem_enc mm) -> encode_field mm.type' (mm.enc v)) m.mem_encs) 384 384 385 - (* {1 CSV splitting} *) 385 + (* {1 Streaming CSV tokenizer} 386 + 387 + Reads CSV fields directly from a [Bytesrw.Bytes.Reader.t] without building 388 + intermediate string arrays. Handles RFC 4180 quoting, escaped double-quotes, 389 + and both CRLF and LF line endings. *) 390 + 391 + module P = struct 392 + let buf_size = 8192 393 + 394 + type stream = { 395 + reader : Bytesrw.Bytes.Reader.t; 396 + mutable buf : bytes; 397 + mutable buf_pos : int; 398 + mutable buf_len : int; 399 + mutable eof : bool; 400 + } 401 + 402 + type field_result = Field of string | End_of_row | Eof 403 + 404 + let make_stream reader = 405 + { 406 + reader; 407 + buf = Bytes.create buf_size; 408 + buf_pos = 0; 409 + buf_len = 0; 410 + eof = false; 411 + } 412 + 413 + let refill s = 414 + if not s.eof then begin 415 + let remaining = s.buf_len - s.buf_pos in 416 + if remaining > 0 && s.buf_pos > 0 then 417 + Bytes.blit s.buf s.buf_pos s.buf 0 remaining; 418 + s.buf_pos <- 0; 419 + s.buf_len <- remaining; 420 + let space = Bytes.length s.buf - s.buf_len in 421 + if space > 0 then begin 422 + let slice = Bytesrw.Bytes.Reader.read s.reader in 423 + if Bytesrw.Bytes.Slice.is_eod slice then s.eof <- true 424 + else begin 425 + let src = Bytesrw.Bytes.Slice.bytes slice in 426 + let off = Bytesrw.Bytes.Slice.first slice in 427 + let len = Bytesrw.Bytes.Slice.length slice in 428 + let to_copy = min len space in 429 + Bytes.blit src off s.buf s.buf_len to_copy; 430 + s.buf_len <- s.buf_len + to_copy; 431 + if to_copy < len then 432 + Bytesrw.Bytes.Reader.push_back s.reader 433 + (Bytesrw.Bytes.Slice.make src ~first:(off + to_copy) 434 + ~length:(len - to_copy)) 435 + end 436 + end 437 + end 438 + 439 + let peek s = 440 + if s.buf_pos < s.buf_len then Some (Bytes.get s.buf s.buf_pos) 441 + else if s.eof then None 442 + else begin 443 + refill s; 444 + if s.buf_pos < s.buf_len 
then Some (Bytes.get s.buf s.buf_pos) else None 445 + end 446 + 447 + let advance s = s.buf_pos <- s.buf_pos + 1 448 + 449 + (* Skip a LF or CRLF line ending. Assumes current char is '\n' or '\r'. *) 450 + let skip_eol s = 451 + match peek s with 452 + | Some '\r' -> ( 453 + advance s; 454 + match peek s with Some '\n' -> advance s | _ -> ()) 455 + | Some '\n' -> advance s 456 + | _ -> () 457 + 458 + (* Read a quoted field: opening quote already consumed. 459 + Handles "" escapes. Returns field content without quotes. *) 460 + let read_quoted s = 461 + let acc = Buffer.create 64 in 462 + let rec loop () = 463 + match peek s with 464 + | None -> 465 + (* EOF inside quoted field — return what we have *) 466 + Buffer.contents acc 467 + | Some '"' -> ( 468 + advance s; 469 + (* Check for escaped quote "" *) 470 + match peek s with 471 + | Some '"' -> 472 + advance s; 473 + Buffer.add_char acc '"'; 474 + loop () 475 + | _ -> 476 + (* End of quoted field — skip to next delimiter *) 477 + Buffer.contents acc) 478 + | Some c -> 479 + advance s; 480 + Buffer.add_char acc c; 481 + loop () 482 + in 483 + loop () 484 + 485 + (* Read an unquoted field. Returns field content. *) 486 + let read_unquoted s = 487 + let acc = Buffer.create 64 in 488 + let rec loop () = 489 + match peek s with 490 + | None | Some ',' | Some '\n' | Some '\r' -> Buffer.contents acc 491 + | Some c -> 492 + advance s; 493 + Buffer.add_char acc c; 494 + loop () 495 + in 496 + loop () 497 + 498 + (* Track whether we're at the start of a row (after End_of_row or start). *) 499 + type row_state = Row_start | In_row 500 + 501 + (* Read the next field from the stream. Returns Field, End_of_row, or Eof. 502 + After End_of_row, the next call starts on the following row. 
*) 503 + let next_field_with_state s state = 504 + match peek s with 505 + | None -> (Eof, Row_start) 506 + | (Some '\n' | Some '\r') when state = Row_start -> 507 + (* Empty line — skip it *) 508 + skip_eol s; 509 + (End_of_row, Row_start) 510 + | Some ',' when state = In_row -> ( 511 + (* Field separator — consume it, read next field *) 512 + advance s; 513 + match peek s with 514 + | Some '"' -> 515 + advance s; 516 + (Field (read_quoted s), In_row) 517 + | Some '\n' | Some '\r' | None -> 518 + (* Trailing comma — empty field before EOL *) 519 + (Field "", In_row) 520 + | _ -> (Field (read_unquoted s), In_row)) 521 + | (Some '\n' | Some '\r') when state = In_row -> 522 + skip_eol s; 523 + (End_of_row, Row_start) 524 + | Some '"' -> 525 + advance s; 526 + (Field (read_quoted s), In_row) 527 + | _ -> (Field (read_unquoted s), In_row) 386 528 387 - let split_csv_line line = 388 - let fields = String.split_on_char ',' line in 389 - Array.of_list fields 529 + (* Read all fields of a single row into a list. 
*) 530 + let read_row s = 531 + let rec loop acc state = 532 + match next_field_with_state s state with 533 + | Eof, _ when acc = [] -> None 534 + | Eof, _ -> Some (List.rev acc) 535 + | End_of_row, _ when acc = [] -> 536 + (* Empty line — skip and try next *) 537 + loop acc Row_start 538 + | End_of_row, _ -> Some (List.rev acc) 539 + | Field f, state' -> loop (f :: acc) state' 540 + in 541 + loop [] Row_start 542 + 543 + let read_row_array s = 544 + match read_row s with 545 + | None -> None 546 + | Some fields -> Some (Array.of_list fields) 547 + end 390 548 391 - let parse_header line = 392 - let h = split_csv_line line in 393 - if Array.length h = 0 then Error Missing_header else Ok h 549 + (* {1 Streaming decode} *) 550 + 551 + let fold_reader codec reader f acc = 552 + let s = P.make_stream reader in 553 + match P.read_row_array s with 554 + | None -> Error Missing_header 555 + | Some header -> 556 + if Array.length header = 0 then Error Missing_header 557 + else 558 + let* resolved = resolve codec header in 559 + let rec go acc row = 560 + match P.read_row_array s with 561 + | None -> Ok acc 562 + | Some fields -> 563 + let* v = decode_row resolved row fields in 564 + go (f acc v) (row + 1) 565 + in 566 + go acc 2 567 + 568 + let decode codec reader = 569 + let* rev_list = fold_reader codec reader (fun acc v -> v :: acc) [] in 570 + Ok (List.rev rev_list) 571 + 572 + let decode_string codec s = 573 + let reader = Bytesrw.Bytes.Reader.of_string s in 574 + decode codec reader 394 575 395 576 (* {1 File operations} *) 396 577 397 578 let fold_channel codec ic f acc = 398 - let line = try Some (input_line ic) with End_of_file -> None in 399 - match line with 400 - | None -> Error Missing_header 401 - | Some header_line -> 402 - let* header = parse_header header_line in 403 - let* resolved = resolve codec header in 404 - let rec go acc row = 405 - match input_line ic with 406 - | exception End_of_file -> Ok acc 407 - | "" -> go acc row 408 - | line -> 409 - let 
fields = split_csv_line line in 410 - let* v = decode_row resolved row fields in 411 - go (f acc v) (row + 1) 412 - in 413 - go acc 2 579 + let reader = Bytesrw.Bytes.Reader.of_in_channel ic in 580 + fold_reader codec reader f acc 414 581 415 582 let decode_channel codec ic = 416 583 let* rev_list = fold_channel codec ic (fun acc v -> v :: acc) [] in ··· 428 595 ~finally:(fun () -> close_in ic) 429 596 (fun () -> decode_channel codec ic) 430 597 431 - let decode_string codec s = 432 - let lines = String.split_on_char '\n' s in 433 - match lines with 434 - | [] | [ "" ] -> Error Missing_header 435 - | header_line :: rest -> 436 - let* header = parse_header header_line in 437 - let* resolved = resolve codec header in 438 - let rec go acc row = function 439 - | [] -> Ok (List.rev acc) 440 - | "" :: tl -> go acc row tl 441 - | line :: tl -> 442 - let fields = split_csv_line line in 443 - let* v = decode_row resolved row fields in 444 - go (v :: acc) (row + 1) tl 445 - in 446 - go [] 2 rest 598 + (* {1 Streaming encode} *) 447 599 448 - (* {1 Streaming I/O} *) 600 + let needs_quoting s = 601 + let n = String.length s in 602 + let rec loop i = 603 + if i >= n then false 604 + else match s.[i] with ',' | '"' | '\n' | '\r' -> true | _ -> loop (i + 1) 605 + in 606 + loop 0 449 607 450 - let decode codec reader = 451 - let s = Bytesrw.Bytes.Reader.to_string reader in 452 - decode_string codec s 608 + let write_quoted_field w s = 609 + w "\""; 610 + let n = String.length s in 611 + let rec loop i start = 612 + if i >= n then (if start < n then w (String.sub s start (n - start))) 613 + else if s.[i] = '"' then ( 614 + if start < i then w (String.sub s start (i - start)); 615 + w "\"\""; 616 + loop (i + 1) (i + 1)) 617 + else loop (i + 1) start 618 + in 619 + loop 0 0; 620 + w "\"" 621 + 622 + let write_field w s = if needs_quoting s then write_quoted_field w s else w s 453 623 454 624 let encode codec rows writer = 455 625 let w s = Bytesrw.Bytes.Writer.write_string writer s 
in 456 - let header = encode_header codec in 457 - w (String.concat "," (Array.to_list header)); 626 + let m = as_obj codec in 627 + (* Write header *) 628 + let mem_encs = m.mem_encs in 629 + let rec write_header_fields first = function 630 + | [] -> () 631 + | Mem_enc mm :: rest -> 632 + if not first then w ","; 633 + write_field w mm.name; 634 + write_header_fields false rest 635 + in 636 + write_header_fields true mem_encs; 458 637 w "\n"; 638 + (* Write data rows *) 459 639 List.iter 460 640 (fun row -> 461 - let fields = encode_row codec row in 462 - w (String.concat "," (Array.to_list fields)); 641 + let rec write_row_fields first = function 642 + | [] -> () 643 + | Mem_enc mm :: rest -> 644 + if not first then w ","; 645 + let v = encode_field mm.type' (mm.enc row) in 646 + write_field w v; 647 + write_row_fields false rest 648 + in 649 + write_row_fields true mem_encs; 463 650 w "\n") 464 651 rows 465 652