My working unpac space for OCaml projects in development
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add streaming API and framing format support

Implements the Snappy framing format for streaming compression/decompression:
- CRC32-C checksums for data integrity (with masking per spec)
- Stream identifier chunk (0xff with "sNaPpY" magic)
- Compressed (0x00) and uncompressed (0x01) data chunks
- 64KB block size for memory-efficient processing
- Streaming API with feed/finish pattern for incremental processing

The streaming API allows processing gigabyte-scale files without buffering
the entire input in memory. Each 64KB block is compressed independently
and wrapped with a CRC32-C checksum for integrity verification.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+728
+404
src/snappy.ml
(* ··· unchanged context: tail of a definition that begins above this excerpt ··· *)
  match get_uncompressed_length src ~pos:0 ~len:(String.length s) with
  | None -> false
  | Some len -> len >= 0 && len <= 0xFFFFFFFF

(* ============================================================
   CRC32-C Implementation
   ============================================================ *)

(* Lookup table for the reflected CRC32-C polynomial 0x82F63B78
   (bit-reversed 0x1EDC6F41). Entry [i] is the result of clocking the
   single byte [i] through eight shift/xor steps. Built once when the
   module is initialised. *)
let crc32c_table =
  let step crc =
    let shifted = Int32.shift_right_logical crc 1 in
    if Int32.logand crc 1l <> 0l then Int32.logxor shifted 0x82F63B78l
    else shifted
  in
  Array.init 256 (fun i ->
    let crc = ref (Int32.of_int i) in
    for _ = 0 to 7 do
      crc := step !crc
    done;
    !crc)

(* [crc32c_update crc buf ~pos ~len] folds [len] bytes of [buf], starting at
   [pos], into the running (un-finalised) CRC state [crc] and returns the new
   state.

   Precondition: [pos] and [len] must describe a valid range of [buf].
   The loop uses unchecked reads, so callers are responsible for validating
   the range first (as [crc32c] does). *)
let crc32c_update crc buf ~pos ~len =
  let crc = ref crc in
  for i = pos to pos + len - 1 do
    let b = Char.code (Bytes.unsafe_get buf i) in
    let idx = Int32.to_int (Int32.logand (Int32.logxor !crc (Int32.of_int b)) 0xFFl) in
    crc := Int32.logxor (Int32.shift_right_logical !crc 8) (Array.unsafe_get crc32c_table idx)
  done;
  !crc

(* [crc32c buf ~pos ~len] is the CRC32-C checksum of [len] bytes of [buf]
   starting at [pos] (initial state 0xFFFFFFFF, final xor 0xFFFFFFFF).

   @raise Invalid_argument if [pos] and [len] do not designate a valid
   range of [buf]. This function is part of the public interface, so the
   range is checked here before the unchecked inner loop runs. *)
let crc32c buf ~pos ~len =
  if pos < 0 || len < 0 || pos > Bytes.length buf - len then
    invalid_arg "Snappy.crc32c: pos/len out of bounds";
  let crc = crc32c_update 0xFFFFFFFFl buf ~pos ~len in
  Int32.logxor crc 0xFFFFFFFFl

(* Constant added to the rotated checksum by the framing-format mask. *)
let checksum_mask_delta = 0xa282ead8l

(* Mask checksum as per Snappy framing format:
   rotate right by 15 bits, then add the mask constant (mod 2^32). *)
let mask_checksum crc =
  let rotated = Int32.logor
    (Int32.shift_right_logical crc 15)
    (Int32.shift_left crc 17) in
  Int32.add rotated checksum_mask_delta

(* Inverse of [mask_checksum]: subtract the constant, then rotate left by 15. *)
let unmask_checksum masked =
  let unrotated = Int32.sub masked checksum_mask_delta in
  Int32.logor
    (Int32.shift_left unrotated 15)
    (Int32.shift_right_logical unrotated 17)

(* ============================================================
   Streaming API
   ============================================================ *)

(* Block size for streaming - 64KB as per Snappy framing format *)
let stream_block_size = 65536

(* Streaming compression state *)
type compress_stream = {
  cs_buffer : bytes;                        (* pending uncompressed input, one block *)
  mutable cs_buffer_pos : int;              (* number of valid bytes in [cs_buffer] *)
  cs_output : bytes -> int -> int -> unit;  (* callback: buf, pos, len *)
  mutable cs_started : bool;                (* whether stream identifier has been written *)
}

(* Streaming decompression state *)
type decompress_stream = {
  ds_buffer : bytes;                        (* scratch space for stream/chunk headers *)
  mutable ds_buffer_pos : int;              (* header bytes collected so far *)
  ds_output : bytes -> int -> int -> unit;  (* callback: buf, pos, len *)
  mutable ds_state : decompress_state;
  mutable ds_chunk_type : int;              (* type byte of the chunk being read *)
  mutable ds_chunk_len : int;               (* payload length of the chunk being read *)
  mutable ds_chunk_pos : int;               (* payload bytes collected so far *)
  mutable ds_chunk_buf : bytes;             (* payload buffer, grown on demand and reused *)
}

and decompress_state =
  | DS_Header       (* Waiting for stream identifier *)
  | DS_ChunkHeader  (* Reading chunk type + length *)
  | DS_ChunkData    (* Reading chunk data *)

(* Stream identifier for framing format:
   chunk type 0xff, 3-byte little-endian length 6, then the magic "sNaPpY". *)
let stream_identifier = "\xff\x06\x00\x00sNaPpY"

(* Chunk types *)
let chunk_compressed = 0x00
let chunk_uncompressed = 0x01
let chunk_padding = 0xfe
let chunk_stream_id = 0xff

(* Create a new compression stream that reports framed output through
   [output buf pos len]. Nothing is emitted until data is flushed or the
   stream is finished. *)
let create_compress_stream ~output =
  {
    cs_buffer = Bytes.create stream_block_size;
    cs_buffer_pos = 0;
    cs_output = output;
    cs_started = false;
  }

(* Write the stream identifier, at most once per stream.
   The callback receives a fresh copy: handing out
   [Bytes.unsafe_of_string stream_identifier] would let a callback that
   mutates its buffer corrupt the shared string constant. *)
let write_stream_identifier cs =
  if not cs.cs_started then begin
    cs.cs_output (Bytes.of_string stream_identifier) 0 (String.length stream_identifier);
    cs.cs_started <- true
  end

(* Emit one data chunk: type (1) + length (3, little-endian) + masked crc (4),
   followed by [payload_len] bytes of [payload]. The length field counts the
   checksum plus the payload. *)
let write_chunk cs ~chunk_type ~masked_crc payload payload_len =
  let chunk_len = 4 + payload_len in
  let header = Bytes.create 8 in
  set_u8 header 0 chunk_type;
  set_u8 header 1 (chunk_len land 0xFF);
  set_u8 header 2 ((chunk_len lsr 8) land 0xFF);
  set_u8 header 3 ((chunk_len lsr 16) land 0xFF);
  (* NOTE(review): relies on set_u32_le keeping only the low 32 bits of a
     possibly negative int — confirm against its definition. *)
  set_u32_le header 4 (Int32.to_int masked_crc);
  cs.cs_output header 0 8;
  cs.cs_output payload 0 payload_len

(* Flush the buffered block, if any, as a single chunk.
   The checksum always covers the uncompressed bytes. The block is written
   compressed only when that is strictly smaller than the raw data;
   otherwise it is stored as an uncompressed chunk. *)
let flush_block cs =
  if cs.cs_buffer_pos > 0 then begin
    write_stream_identifier cs;

    let src = cs.cs_buffer in
    let src_len = cs.cs_buffer_pos in
    let masked_crc = mask_checksum (crc32c src ~pos:0 ~len:src_len) in

    let compressed = Bytes.create (max_compressed_length src_len) in
    let compressed_len =
      compress_into ~src ~src_pos:0 ~src_len ~dst:compressed ~dst_pos:0
    in

    if compressed_len < src_len then
      write_chunk cs ~chunk_type:chunk_compressed ~masked_crc compressed compressed_len
    else
      write_chunk cs ~chunk_type:chunk_uncompressed ~masked_crc src src_len;

    cs.cs_buffer_pos <- 0
  end

(* Feed [len] bytes of [data] starting at [pos] to the compression stream.
   Input is accumulated in the block buffer; every time the buffer reaches
   [stream_block_size] it is flushed as one chunk. *)
let compress_stream_feed cs data ~pos ~len =
  let remaining = ref len in
  let src_pos = ref pos in

  while !remaining > 0 do
    let space = stream_block_size - cs.cs_buffer_pos in
    let to_copy = min space !remaining in
    Bytes.blit data !src_pos cs.cs_buffer cs.cs_buffer_pos to_copy;
    cs.cs_buffer_pos <- cs.cs_buffer_pos + to_copy;
    src_pos := !src_pos + to_copy;
    remaining := !remaining - to_copy;

    if cs.cs_buffer_pos >= stream_block_size then
      flush_block cs
  done
(* Finish compression stream: flush whatever is still buffered. *)
let compress_stream_finish cs =
  (* Always write stream identifier even for empty input *)
  write_stream_identifier cs;
  flush_block cs

(* Compress a whole string through a fresh compression stream, sending the
   framed output to [output]. The string is only read, never written, so the
   unsafe bytes view is sound here. *)
let compress_stream_string ~output s =
  let cs = create_compress_stream ~output in
  compress_stream_feed cs (Bytes.unsafe_of_string s) ~pos:0 ~len:(String.length s);
  compress_stream_finish cs

(* ============================================================
   Streaming Decompression
   ============================================================ *)

(* Framing format errors *)
type framing_error =
  | Missing_stream_identifier
  | Invalid_stream_identifier
  | Invalid_chunk_type of int
  | Checksum_mismatch
  | Chunk_too_large
  | Decompression_failed of error

exception Framing_error of framing_error

let framing_error_to_string = function
  | Missing_stream_identifier -> "Missing stream identifier"
  | Invalid_stream_identifier -> "Invalid stream identifier"
  | Invalid_chunk_type n -> Printf.sprintf "Invalid chunk type: 0x%02x" n
  | Checksum_mismatch -> "Checksum mismatch"
  | Chunk_too_large -> "Chunk too large"
  | Decompression_failed e -> "Decompression failed: " ^ error_to_string e

(* Create a new decompression stream that reports decoded data through
   [output buf pos len]. The stream starts out expecting the 10-byte stream
   identifier. *)
let create_decompress_stream ~output =
  {
    ds_buffer = Bytes.create 16;  (* For reading headers *)
    ds_buffer_pos = 0;
    ds_output = output;
    ds_state = DS_Header;
    ds_chunk_type = 0;
    ds_chunk_len = 0;
    ds_chunk_pos = 0;
    ds_chunk_buf = Bytes.create 0;
  }

(* Process a fully buffered chunk: the first [ds_chunk_len] bytes of
   [ds_chunk_buf], of type [ds_chunk_type]. On success the stream returns to
   the [DS_ChunkHeader] state.

   @raise Framing_error on a malformed identifier, a reserved unskippable
   chunk type, a checksum mismatch, a block that decodes to more than
   [stream_block_size] bytes, or a failure of the inner decompressor. *)
let process_chunk ds =
  match ds.ds_chunk_type with
  | t when t = chunk_stream_id ->
    (* A repeated stream identifier must carry exactly the magic bytes. *)
    if ds.ds_chunk_len <> 6 then
      raise (Framing_error Invalid_stream_identifier);
    if Bytes.sub_string ds.ds_chunk_buf 0 6 <> "sNaPpY" then
      raise (Framing_error Invalid_stream_identifier);
    ds.ds_state <- DS_ChunkHeader

  | t when t = chunk_compressed ->
    if ds.ds_chunk_len < 4 then
      raise (Framing_error (Decompression_failed Truncated_input));

    (* The first four payload bytes hold the masked CRC of the
       *uncompressed* data. *)
    let masked_crc = Int32.of_int (get_u32_le ds.ds_chunk_buf 0) in
    let expected_crc = unmask_checksum masked_crc in

    let compressed_len = ds.ds_chunk_len - 4 in
    begin
      try
        let decompressed = decompress_to_bytes ds.ds_chunk_buf ~pos:4 ~len:compressed_len in
        let decompressed_len = Bytes.length decompressed in

        (* The framing format caps the uncompressed size of a chunk at
           one block; anything larger is not a valid stream. *)
        if decompressed_len > stream_block_size then
          raise (Framing_error Chunk_too_large);

        let actual_crc = crc32c decompressed ~pos:0 ~len:decompressed_len in
        if actual_crc <> expected_crc then
          raise (Framing_error Checksum_mismatch);

        ds.ds_output decompressed 0 decompressed_len
      with Snappy_error e ->
        raise (Framing_error (Decompression_failed e))
    end;
    ds.ds_state <- DS_ChunkHeader

  | t when t = chunk_uncompressed ->
    if ds.ds_chunk_len < 4 then
      raise (Framing_error (Decompression_failed Truncated_input));

    let masked_crc = Int32.of_int (get_u32_le ds.ds_chunk_buf 0) in
    let expected_crc = unmask_checksum masked_crc in

    let data_len = ds.ds_chunk_len - 4 in

    let actual_crc = crc32c ds.ds_chunk_buf ~pos:4 ~len:data_len in
    if actual_crc <> expected_crc then
      raise (Framing_error Checksum_mismatch);

    (* The callback sees the internal chunk buffer, which is reused for
       the next chunk. *)
    ds.ds_output ds.ds_chunk_buf 4 data_len;
    ds.ds_state <- DS_ChunkHeader

  | t when t = chunk_padding ->
    (* Ignore padding *)
    ds.ds_state <- DS_ChunkHeader

  | t when t >= 0x02 && t <= 0x7f ->
    (* Reserved unskippable chunk *)
    raise (Framing_error (Invalid_chunk_type t))

  | t when t >= 0x80 && t <= 0xfd ->
    (* Reserved skippable chunk - ignore *)
    ds.ds_state <- DS_ChunkHeader

  | t ->
    raise (Framing_error (Invalid_chunk_type t))

(* Feed [len] bytes of framed data from [data], starting at [pos], to the
   decompression stream. Input may be split at arbitrary positions: partial
   headers and partial chunks are buffered across calls.

   @raise Framing_error on invalid framed data. *)
let decompress_stream_feed ds data ~pos ~len =
  let remaining = ref len in
  let src_pos = ref pos in

  (* Move up to [want] input bytes into [dst] at [dst_pos]; returns how many
     bytes were actually moved. *)
  let take dst dst_pos want =
    let n = min want !remaining in
    Bytes.blit data !src_pos dst dst_pos n;
    src_pos := !src_pos + n;
    remaining := !remaining - n;
    n
  in

  while !remaining > 0 do
    match ds.ds_state with
    | DS_Header ->
      (* The stream must open with the complete 10-byte identifier. *)
      let id_len = String.length stream_identifier in
      let n = take ds.ds_buffer ds.ds_buffer_pos (id_len - ds.ds_buffer_pos) in
      ds.ds_buffer_pos <- ds.ds_buffer_pos + n;

      if ds.ds_buffer_pos >= id_len then begin
        if Bytes.sub_string ds.ds_buffer 0 id_len <> stream_identifier then
          raise (Framing_error Invalid_stream_identifier);
        ds.ds_buffer_pos <- 0;
        ds.ds_state <- DS_ChunkHeader
      end

    | DS_ChunkHeader ->
      (* Need 4 bytes for chunk header: type + 24-bit little-endian length *)
      let n = take ds.ds_buffer ds.ds_buffer_pos (4 - ds.ds_buffer_pos) in
      ds.ds_buffer_pos <- ds.ds_buffer_pos + n;

      if ds.ds_buffer_pos >= 4 then begin
        ds.ds_chunk_type <- get_u8 ds.ds_buffer 0;
        ds.ds_chunk_len <-
          get_u8 ds.ds_buffer 1
          lor (get_u8 ds.ds_buffer 2 lsl 8)
          lor (get_u8 ds.ds_buffer 3 lsl 16);

        (* A 24-bit length can never exceed 16777215, so comparing against
           that bound rejects nothing. Enforce the limit that matters
           instead: an uncompressed chunk holds a 4-byte checksum plus at
           most one block of data. Compressed chunks are checked after
           decoding, in [process_chunk]. *)
        if ds.ds_chunk_type = chunk_uncompressed
           && ds.ds_chunk_len > 4 + stream_block_size then
          raise (Framing_error Chunk_too_large);

        (* For stream identifier, chunk_len should be 6 *)
        if ds.ds_chunk_type = chunk_stream_id && ds.ds_chunk_len <> 6 then
          raise (Framing_error Invalid_stream_identifier);

        if ds.ds_chunk_len = 0 then begin
          (* Empty chunk: nothing to buffer, dispatch immediately *)
          ds.ds_buffer_pos <- 0;
          process_chunk ds
        end else begin
          (* Grow the payload buffer only when the current one is too small *)
          if Bytes.length ds.ds_chunk_buf < ds.ds_chunk_len then
            ds.ds_chunk_buf <- Bytes.create (max ds.ds_chunk_len (stream_block_size + 1024));
          ds.ds_chunk_pos <- 0;
          ds.ds_buffer_pos <- 0;
          ds.ds_state <- DS_ChunkData
        end
      end

    | DS_ChunkData ->
      let n = take ds.ds_chunk_buf ds.ds_chunk_pos (ds.ds_chunk_len - ds.ds_chunk_pos) in
      ds.ds_chunk_pos <- ds.ds_chunk_pos + n;

      if ds.ds_chunk_pos >= ds.ds_chunk_len then
        process_chunk ds
  done

(* Check if decompression stream is in a valid final state, i.e. exactly at
   a chunk boundary. Being in [DS_ChunkHeader] is not sufficient on its own:
   if some but not all of the next 4-byte chunk header has been buffered,
   the input was cut off mid-header and the stream is truncated. *)
let decompress_stream_is_complete ds =
  match ds.ds_state with
  | DS_ChunkHeader -> ds.ds_buffer_pos = 0
  | DS_Header | DS_ChunkData -> false

(* ============================================================
   Framing Format: Complete String API
   ============================================================ *)

(* Compress with framing format; returns the complete framed stream. *)
let compress_framed s =
  let buf = Buffer.create (String.length s + 100) in
  let output bytes pos len =
    Buffer.add_subbytes buf bytes pos len
  in
  compress_stream_string ~output s;
  Buffer.contents buf

(* Decompress framed data.
   Returns [Ok data] when the whole input decodes and ends on a chunk
   boundary, [Error msg] otherwise (including truncated input). *)
let decompress_framed s =
  let buf = Buffer.create (String.length s) in
  let output bytes pos len =
    Buffer.add_subbytes buf bytes pos len
  in
  let ds = create_decompress_stream ~output in
  try
    decompress_stream_feed ds (Bytes.unsafe_of_string s) ~pos:0 ~len:(String.length s);
    if decompress_stream_is_complete ds then
      Ok (Buffer.contents buf)
    else
      Error "Incomplete framed data"
  with
  | Framing_error e -> Error (framing_error_to_string e)
  | Snappy_error e -> Error (error_to_string e)

(* Check if data looks like framed snappy data, i.e. starts with the
   stream identifier. *)
let is_framed_format s =
  let id_len = String.length stream_identifier in
  String.length s >= id_len &&
  String.sub s 0 id_len = stream_identifier
+142
src/snappy.mli
···
    at [pos] with [len] bytes available.
    Returns [(value, bytes_consumed)].
    @raise Snappy_error if the varint is truncated or invalid. *)

(** {1 CRC32-C Checksum}

    CRC32-C is used by the Snappy framing format for data integrity. *)

val crc32c : bytes -> pos:int -> len:int -> int32
(** [crc32c buf ~pos ~len] computes the CRC32-C checksum of [len] bytes
    from [buf] starting at [pos]. The checksum of an empty range is [0l]. *)

val mask_checksum : int32 -> int32
(** [mask_checksum crc] applies the Snappy framing format mask to a CRC32-C
    checksum: the value is rotated right by 15 bits and a fixed constant is
    added. The masking prevents issues when checksumming data that
    contains its own checksum. *)

val unmask_checksum : int32 -> int32
(** [unmask_checksum masked] reverses the masking applied by {!mask_checksum}. *)

(** {1 Streaming API}

    The streaming API allows processing large data without holding the
    entire input or output in memory. Data is processed in 64KB blocks
    using the Snappy framing format.

    {2 Streaming Compression}

    {[
      (* Create a stream that writes to a channel *)
      let cs = Snappy.create_compress_stream ~output:(fun buf pos len ->
        output oc buf pos len
      ) in

      (* Feed data in chunks *)
      Snappy.compress_stream_feed cs data ~pos:0 ~len:(Bytes.length data);

      (* Finish the stream *)
      Snappy.compress_stream_finish cs
    ]}

    {2 Streaming Decompression}

    {[
      (* Collect the decoded data in a buffer *)
      let result = Buffer.create 65536 in
      let ds = Snappy.create_decompress_stream ~output:(fun buf pos len ->
        Buffer.add_subbytes result buf pos len
      ) in
      Snappy.decompress_stream_feed ds compressed
        ~pos:0 ~len:(Bytes.length compressed);
      if not (Snappy.decompress_stream_is_complete ds) then
        failwith "incomplete stream"
    ]} *)

(** Streaming compression state. *)
type compress_stream

(** Streaming decompression state. *)
type decompress_stream

val stream_block_size : int
(** Block size for streaming (65536 bytes). Each block is compressed
    independently and wrapped in a framing format chunk. *)

val create_compress_stream : output:(bytes -> int -> int -> unit) -> compress_stream
(** [create_compress_stream ~output] creates a new compression stream.
    The [output] callback is called with compressed chunks as they are
    produced. The callback receives the buffer, position, and length.

    The buffer handed to [output] may be one of the stream's internal
    buffers, which is reused for later blocks: copy the bytes you need
    before returning, and do not modify the buffer. *)

val compress_stream_feed : compress_stream -> bytes -> pos:int -> len:int -> unit
(** [compress_stream_feed cs data ~pos ~len] feeds [len] bytes of data
    from [data] starting at [pos] to the compression stream. The stream's
    output callback may be called zero or more times: input is buffered
    until a full block of {!stream_block_size} bytes is available. *)

val compress_stream_finish : compress_stream -> unit
(** [compress_stream_finish cs] flushes any remaining data and finishes
    the compression stream. The output callback will be called for any
    remaining data. The stream identifier is written even if no data was
    ever fed, so an empty input still yields a valid framed stream. *)

val create_decompress_stream : output:(bytes -> int -> int -> unit) -> decompress_stream
(** [create_decompress_stream ~output] creates a new decompression stream.
    The [output] callback is called with decompressed data as it becomes
    available, as [output buf pos len].

    The buffer handed to [output] may be an internal buffer that is reused
    for the next chunk: copy the bytes you need before returning. *)

val decompress_stream_feed : decompress_stream -> bytes -> pos:int -> len:int -> unit
(** [decompress_stream_feed ds data ~pos ~len] feeds [len] bytes of
    compressed framed data to the decompression stream. The input may be
    split at arbitrary byte positions across calls; partial headers and
    chunks are buffered internally.
    @raise Framing_error on invalid framed data. *)

val decompress_stream_is_complete : decompress_stream -> bool
(** [decompress_stream_is_complete ds] returns [true] if the stream is
    in a valid final state (at a chunk boundary). Call it after the last
    {!decompress_stream_feed} to detect truncated input. *)

(** {1 Snappy Framing Format}

    The framing format wraps Snappy-compressed blocks with headers and
    CRC32-C checksums for streaming and integrity checking.

    {2 Format Structure}

    A framed stream consists of:
    - Stream identifier chunk (0xff): "sNaPpY" magic bytes
    - Data chunks (0x00 for compressed, 0x01 for uncompressed)
    - Optional padding chunks (0xfe)

    Each data chunk contains:
    - 1-byte chunk type
    - 3-byte little-endian chunk length (counts the checksum and the data)
    - 4-byte masked CRC32-C of uncompressed data
    - Compressed or uncompressed data (max 65536 bytes uncompressed)

    {2 File Extension}

    The recommended file extension for framed Snappy data is [.sz]. *)

(** Framing format error types *)
type framing_error =
  | Missing_stream_identifier  (** No stream identifier at start.
                                   NOTE(review): the implementation in this
                                   change never raises this case; a bad
                                   leading identifier is reported as
                                   [Invalid_stream_identifier]. *)
  | Invalid_stream_identifier  (** Stream identifier is malformed *)
  | Invalid_chunk_type of int  (** Unknown or reserved chunk type *)
  | Checksum_mismatch          (** CRC32-C verification failed *)
  | Chunk_too_large            (** Chunk exceeds maximum size *)
  | Decompression_failed of error  (** Inner Snappy decompression error *)

exception Framing_error of framing_error
(** Exception raised when framing format parsing fails. *)

val framing_error_to_string : framing_error -> string
(** Convert a framing error to a human-readable string. *)

val stream_identifier : string
(** The 10-byte stream identifier that starts every framed stream:
    [\xff\x06\x00\x00sNaPpY]. *)

val compress_framed : string -> string
(** [compress_framed s] compresses string [s] using the Snappy framing
    format. The result includes the stream identifier and CRC32-C
    checksums for each block. *)

val decompress_framed : string -> (string, string) result
(** [decompress_framed s] decompresses framed Snappy data.
    Returns [Ok original] on success or [Error msg] on failure, where [msg]
    is a human-readable description; input that stops before a chunk
    boundary yields [Error "Incomplete framed data"].
    Verifies CRC32-C checksums for each block. *)

val is_framed_format : string -> bool
(** [is_framed_format s] returns [true] if [s] starts with the Snappy
    framing format stream identifier. Only the first 10 bytes are
    inspected; the rest of [s] is not validated. *)
+182
test/test_snappy.ml
(* ··· unchanged context: tail of a test that begins above this excerpt ··· *)
  ) [59; 60; 61; 255; 256; 257; 65535; 65536; 65537]

(* ============================================================
   CRC32-C tests
   ============================================================ *)

(* The checksum of zero bytes is 0. *)
let test_crc32c_empty () =
  let empty = Bytes.create 0 in
  check int32 "crc32c empty" 0l (Snappy.crc32c empty ~pos:0 ~len:0)

(* Standard check value: "123456789" should give 0xE3069283. *)
let test_crc32c_known_values () =
  let digits = Bytes.of_string "123456789" in
  check int32 "crc32c 123456789" 0xE3069283l
    (Snappy.crc32c digits ~pos:0 ~len:9)

(* Unmasking a masked checksum gives back the original value. *)
let test_crc32c_mask_unmask () =
  let original = 0x12345678l in
  let roundtripped = Snappy.unmask_checksum (Snappy.mask_checksum original) in
  check int32 "mask/unmask roundtrip" original roundtripped

(* ============================================================
   Framing format tests
   ============================================================ *)

(* Decode [framed] and require it to equal [expected]; a decode error fails
   the test with the decoder's message. *)
let expect_framed_decodes_to label expected framed =
  match Snappy.decompress_framed framed with
  | Ok decoded -> check string_testable label expected decoded
  | Error msg -> fail msg

(* Require [input] to be rejected by the framed decoder. *)
let expect_framed_rejected failure_message input =
  match Snappy.decompress_framed input with
  | Ok _ -> fail failure_message
  | Error _ -> ()

let test_framed_empty () =
  let framed = Snappy.compress_framed "" in
  check bool "starts with stream id" true (Snappy.is_framed_format framed);
  expect_framed_decodes_to "framed empty roundtrip" "" framed

let test_framed_small () =
  let text = "Hello, World!" in
  let framed = Snappy.compress_framed text in
  check bool "is framed format" true (Snappy.is_framed_format framed);
  expect_framed_decodes_to "framed small roundtrip" text framed

(* Data larger than one block (64KB). *)
let test_framed_large () =
  let text = make_repeated 200000 "ABCDEFGHIJ" in
  let framed = Snappy.compress_framed text in
  check bool "is framed format" true (Snappy.is_framed_format framed);
  expect_framed_decodes_to "framed large roundtrip" text framed

let test_framed_random () =
  let text = make_random 100000 12345 in
  expect_framed_decodes_to "framed random roundtrip" text
    (Snappy.compress_framed text)

(* Invalid stream identifier *)
let test_framed_invalid_stream_id () =
  expect_framed_rejected "Should have failed on invalid stream id"
    "\xff\x06\x00\x00NOTSNAPPY"

(* Truncated stream identifier *)
let test_framed_truncated () =
  expect_framed_rejected "Should have failed on truncated data"
    "\xff\x06\x00"

(* ============================================================
   Streaming API tests
   ============================================================ *)

(* A buffer of the given initial capacity together with an output callback
   that appends to it. *)
let collecting_sink capacity =
  let sink = Buffer.create capacity in
  (sink, fun bytes pos len -> Buffer.add_subbytes sink bytes pos len)

let test_streaming_compress_small () =
  let text = "Hello, streaming world!" in
  let sink, output = collecting_sink 100 in
  let cs = Snappy.create_compress_stream ~output in
  Snappy.compress_stream_feed cs (Bytes.unsafe_of_string text)
    ~pos:0 ~len:(String.length text);
  Snappy.compress_stream_finish cs;
  let framed = Buffer.contents sink in

  check bool "streaming produces framed format" true (Snappy.is_framed_format framed);
  expect_framed_decodes_to "streaming roundtrip" text framed

(* Data spanning multiple 64KB blocks, fed in pieces of various sizes. *)
let test_streaming_compress_multi_block () =
  let text = make_repeated 200000 "STREAMING" in
  let total = String.length text in
  let sink, output = collecting_sink total in
  let cs = Snappy.create_compress_stream ~output in

  (* Thread the read offset through the list of piece sizes; a piece is
     clipped to what is left and skipped once the input is exhausted. *)
  let feed_piece offset piece_size =
    let to_feed = min piece_size (total - offset) in
    if to_feed > 0 then begin
      Snappy.compress_stream_feed cs (Bytes.unsafe_of_string text)
        ~pos:offset ~len:to_feed;
      offset + to_feed
    end else
      offset
  in
  let _final_offset =
    List.fold_left feed_piece 0 [1000; 5000; 20000; 50000; 10000; 100000; 14000]
  in

  Snappy.compress_stream_finish cs;
  expect_framed_decodes_to "multi-block streaming roundtrip" text
    (Buffer.contents sink)

(* Compress in one go, then decode through the streaming API in 1KB pieces. *)
let test_streaming_decompress () =
  let text = make_repeated 150000 "DECOMPRESS" in
  let framed = Snappy.compress_framed text in
  let framed_len = String.length framed in

  let sink, output = collecting_sink (String.length text) in
  let ds = Snappy.create_decompress_stream ~output in

  let piece_size = 1024 in
  let rec feed_from offset =
    if offset < framed_len then begin
      let to_feed = min piece_size (framed_len - offset) in
      Snappy.decompress_stream_feed ds (Bytes.unsafe_of_string framed)
        ~pos:offset ~len:to_feed;
      feed_from (offset + to_feed)
    end
  in
  feed_from 0;

  check bool "decompress stream complete" true (Snappy.decompress_stream_is_complete ds);
  check string_testable "streaming decompress roundtrip" text (Buffer.contents sink)

(* Decode with the input delivered one byte per call. *)
let test_streaming_byte_by_byte () =
  let text = "Byte by byte test" in
  let framed = Snappy.compress_framed text in

  let sink, output = collecting_sink (String.length text) in
  let ds = Snappy.create_decompress_stream ~output in

  String.iteri
    (fun i _ ->
      Snappy.decompress_stream_feed ds (Bytes.unsafe_of_string framed) ~pos:i ~len:1)
    framed;

  check bool "byte-by-byte complete" true (Snappy.decompress_stream_is_complete ds);
  check string_testable "byte-by-byte roundtrip" text (Buffer.contents sink)

(* ============================================================
   Test runner
   ============================================================ *)

(* ··· unchanged context: the runner definition continues from above this excerpt ··· *)
    "all bytes", `Quick, test_all_bytes;
    "binary data", `Quick, test_binary_data;
    "max literal lengths", `Quick, test_max_literal_lengths;
  ];

  "crc32c", [
    "empty", `Quick, test_crc32c_empty;
    "known values", `Quick, test_crc32c_known_values;
    "mask/unmask roundtrip", `Quick, test_crc32c_mask_unmask;
  ];

  "framing", [
    "framed empty", `Quick, test_framed_empty;
    "framed small", `Quick, test_framed_small;
    "framed large", `Quick, test_framed_large;
    "framed random", `Quick, test_framed_random;
    "invalid stream id", `Quick, test_framed_invalid_stream_id;
    "truncated", `Quick, test_framed_truncated;
  ];

  "streaming", [
    "compress small", `Quick, test_streaming_compress_small;
    "compress multi-block", `Quick, test_streaming_compress_multi_block;
    "decompress", `Quick, test_streaming_decompress;
    "byte by byte", `Quick, test_streaming_byte_by_byte;
  ];
]