(*---------------------------------------------------------------------------
   Copyright (c) 2025 Thomas Gazagnaire. All rights reserved.
   SPDX-License-Identifier: MIT
  ---------------------------------------------------------------------------*)

(* [temp_db prefix] returns a fresh file name of the form sqlite_<prefix>_*.db
   in the temporary directory. The placeholder file created by
   [Filename.temp_file] is removed again, so the path does not exist when it
   is handed to the caller. *)
let temp_db prefix =
  let path = Filename.temp_file ("sqlite_" ^ prefix ^ "_") ".db" in
  Sys.remove path;
  path

(* [with_temp_db f] runs [f fs db] inside an Eio main loop with a database
   freshly created at a temporary path. The database is closed when [f]
   returns or raises, and the database file is then deleted so test runs do
   not accumulate files in the temporary directory. A missing file is not an
   error at cleanup time.
   NOTE(review): only the main database file is removed; confirm whether the
   library creates any companion files. *)
let with_temp_db f =
  Eio_main.run @@ fun env ->
  let fs = Eio.Stdenv.fs env in
  let fpath = temp_db "test" in
  let path = Eio.Path.(fs / fpath) in
  Fun.protect
    ~finally:(fun () -> try Sys.remove fpath with Sys_error _ -> ())
    (fun () ->
      Eio.Switch.run @@ fun sw ->
      let db = Sqlite.open_ ~sw ~create:true path in
      Fun.protect ~finally:(fun () -> Sqlite.close db) (fun () -> f fs db))

(* Basic operations *)

(* A value written with put is returned by find. *)
let test_put_get () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "key1" "value1";
  let result = Sqlite.find db "key1" in
  Alcotest.(check (option string)) "get returns put value" (Some "value1") result

(* Looking up a key that was never written yields None. *)
let test_get_missing () =
  with_temp_db @@ fun _fs db ->
  let result = Sqlite.find db "nonexistent" in
  Alcotest.(check (option string)) "missing key returns None" None result

(* A second put on the same key replaces the first value. *)
let test_put_overwrite () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "key1" "value1";
  Sqlite.put db "key1" "value2";
  let result = Sqlite.find db "key1" in
  Alcotest.(check (option string)) "overwrite works" (Some "value2") result

(* A deleted key is no longer found. *)
let test_delete () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "key1" "value1";
  Sqlite.delete db "key1";
  let result = Sqlite.find db "key1" in
  Alcotest.(check (option string)) "delete removes key" None result

(* Deleting an absent key must not raise, and must not make the key appear. *)
let test_delete_missing () =
  with_temp_db @@ fun _fs db ->
  (* Should not raise *)
  Sqlite.delete db "nonexistent";
  Alcotest.(check bool)
    "delete missing key is no-op" false
    (Sqlite.mem db "nonexistent")

(* mem reports presence for written keys and absence for others. *)
let test_mem () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "key1" "value1";
  Alcotest.(check bool) "mem finds existing key" true (Sqlite.mem db "key1");
  Alcotest.(check bool)
    "mem returns false for missing" false
    (Sqlite.mem db "missing")

(* iter visits every stored binding; the binding opened by the final let
   below continues on the next line. *)
let test_iter () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "a" "1";
  Sqlite.put db "b" "2";
  Sqlite.put db "c" "3";
  let
(* Remainder of test_iter; the let that opens this binding is the last token
   of the previous line. Pairs are accumulated in a reference and sorted before
   comparison, so no particular iteration order is assumed. *)
seen = ref [] in
  Sqlite.iter db ~f:(fun k v -> seen := (k, v) :: !seen);
  Alcotest.(check (list (pair string string)))
    "iter visits all entries"
    [ ("a", "1"); ("b", "2"); ("c", "3") ]
    (List.sort compare !seen)

(* fold threads an accumulator through every binding; here it counts them. *)
let test_fold () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "a" "1";
  Sqlite.put db "b" "2";
  let count_entry _key _value n = succ n in
  Alcotest.(check int)
    "fold counts entries" 2
    (Sqlite.fold db ~init:0 ~f:count_entry)

(* Binary data *)

(* Values containing NUL and high bytes come back unchanged. *)
let test_binary_values () =
  with_temp_db @@ fun _fs db ->
  let payload = "\x00\x01\x02\xff\xfe\xfd" in
  Sqlite.put db "binary" payload;
  Alcotest.(check (option string))
    "binary data preserved" (Some payload)
    (Sqlite.find db "binary")

(* The empty string is a valid value, distinct from an absent key. *)
let test_empty_value () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "empty" "";
  Alcotest.(check (option string))
    "empty value works" (Some "") (Sqlite.find db "empty")

(* A 1000-byte value round-trips. *)
let test_large_value () =
  with_temp_db @@ fun _fs db ->
  (* Note: B-tree has page splitting constraints limiting max entry size *)
  let payload = String.make 1000 'x' in
  Sqlite.put db "large" payload;
  Alcotest.(check (option string))
    "large value works" (Some payload) (Sqlite.find db "large")

(* Namespaced tables *)

(* A named table supports put followed by find. *)
let test_table_basic () =
  with_temp_db @@ fun _fs db ->
  let blocks = Sqlite.Table.create db ~name:"blocks" in
  Sqlite.Table.put blocks "cid1" "data1";
  Alcotest.(check (option string))
    "table get/put works" (Some "data1")
    (Sqlite.Table.find blocks "cid1")

(* The same key holds independent values in two named tables and in the
   default table. The last assertion of this test is on the next line. *)
let test_table_isolation () =
  with_temp_db @@ fun _fs db ->
  let first = Sqlite.Table.create db ~name:"table1" in
  let second = Sqlite.Table.create db ~name:"table2" in
  Sqlite.Table.put first "key" "value1";
  Sqlite.Table.put second "key" "value2";
  (* The default table receives the same key as well. *)
  Sqlite.put db "key" "default";
  Alcotest.(check (option string))
    "t1 isolated" (Some "value1")
    (Sqlite.Table.find first "key");
  Alcotest.(check (option string))
    "t2 isolated" (Some "value2")
    (Sqlite.Table.find second "key");
(* Final assertion of test_table_isolation, which starts on the previous
   line: the default table kept its own value for the shared key. *)
Alcotest.(check (option string))
    "default isolated" (Some "default") (Sqlite.find db "key")

(* mem and delete on a named table. *)
let test_table_mem_delete () =
  with_temp_db @@ fun _fs db ->
  let tbl = Sqlite.Table.create db ~name:"test" in
  Sqlite.Table.put tbl "key1" "value1";
  Alcotest.(check bool) "mem works" true (Sqlite.Table.mem tbl "key1");
  Sqlite.Table.delete tbl "key1";
  Alcotest.(check bool) "delete works" false (Sqlite.Table.mem tbl "key1")

(* iter on a named table visits each of its bindings. *)
let test_table_iter () =
  with_temp_db @@ fun _fs db ->
  let tbl = Sqlite.Table.create db ~name:"iter_test" in
  Sqlite.Table.put tbl "a" "1";
  Sqlite.Table.put tbl "b" "2";
  let seen = ref [] in
  Sqlite.Table.iter tbl ~f:(fun k v -> seen := (k, v) :: !seen);
  Alcotest.(check (list (pair string string)))
    "table iter works"
    [ ("a", "1"); ("b", "2") ]
    (List.sort compare !seen)

(* Security tests - SQL injection resistance *)

(* Keys that look like SQL fragments are stored and found as plain strings. *)
let test_sql_injection_key () =
  with_temp_db @@ fun _fs db ->
  let roundtrip key =
    Sqlite.put db key "value";
    Alcotest.(check (option string))
      (Fmt.str "injection key %S safe" key)
      (Some "value") (Sqlite.find db key)
  in
  (* These malicious keys should be treated as literal strings *)
  List.iter roundtrip
    [
      "'; DROP TABLE kv; --";
      "key' OR '1'='1";
      "key\"; DELETE FROM kv; --";
      "key\x00injection";
      "Robert'); DROP TABLE Students;--";
    ]

(* Values that look like SQL fragments are stored and returned verbatim. *)
let test_sql_injection_value () =
  with_temp_db @@ fun _fs db ->
  let roundtrip value =
    Sqlite.put db "key" value;
    Alcotest.(check (option string))
      "injection value safe" (Some value) (Sqlite.find db "key")
  in
  List.iter roundtrip
    [ "'; DROP TABLE kv; --"; "value' OR '1'='1"; "\x00\x00\x00" ]

(* Table.create must raise Invalid_argument for each of these names. The
   exception handler and the list argument are on the next line. *)
let test_table_name_validation () =
  with_temp_db @@ fun _fs db ->
  let invalid_names =
    [
      "";
      "table; DROP TABLE kv;";
      "table'";
      "table\"";
      "table\x00";
      "table name";
      "123start";
    ]
  in
  List.iter
    (fun name ->
      try
        let _ = Sqlite.Table.create db ~name in
        Alcotest.failf "should reject invalid name: %S" name
with Invalid_argument _ -> ())
    invalid_names

(* Each accepted table name yields a usable table. *)
let test_valid_table_names () =
  with_temp_db @@ fun _fs db ->
  let exercise name =
    let tbl = Sqlite.Table.create db ~name in
    Sqlite.Table.put tbl "key" "value";
    Alcotest.(check (option string))
      (Fmt.str "valid table %S works" name)
      (Some "value")
      (Sqlite.Table.find tbl "key")
  in
  List.iter exercise
    [ "blocks"; "refs"; "meta"; "Table1"; "my_table"; "a"; "A123_test" ]

(* Unicode and special characters *)

(* Multi-byte keys are stored and found like any other key. *)
let test_unicode_keys () =
  with_temp_db @@ fun _fs db ->
  let roundtrip key =
    Sqlite.put db key "value";
    Alcotest.(check (option string))
      (Fmt.str "unicode key %S" key)
      (Some "value") (Sqlite.find db key)
  in
  List.iter roundtrip [ "cafรฉ"; "ๆ—ฅๆœฌ่ชž"; "emoji๐ŸŽ‰"; "ฮฉโ‰ˆรงโˆšโˆซ" ]

(* A multi-byte value is returned unchanged. *)
let test_unicode_values () =
  with_temp_db @@ fun _fs db ->
  let text = "ๆ—ฅๆœฌ่ชžใƒ†ใ‚นใƒˆ๐ŸŽ‰" in
  Sqlite.put db "key" text;
  Alcotest.(check (option string))
    "unicode value" (Some text) (Sqlite.find db "key")

(* Sync *)

(* sync does not raise and the data is still readable afterwards. *)
let test_sync () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "key" "value";
  (* sync should not raise *)
  Sqlite.sync db;
  Alcotest.(check (option string))
    "data persists after sync" (Some "value") (Sqlite.find db "key")

(* Persistence - critical for correctness *)

(* Data written and closed in one switch is readable after reopening the same
   path in a second switch. *)
let test_persistence_basic () =
  Eio_main.run @@ fun env ->
  let path = Eio.Path.(Eio.Stdenv.fs env / temp_db "persist") in
  (* Create and write *)
  Eio.Switch.run (fun sw ->
      let db = Sqlite.open_ ~sw ~create:true path in
      Sqlite.put db "key1" "value1";
      Sqlite.put db "key2" "value2";
      Sqlite.close db);
  (* Reopen and read *)
  Eio.Switch.run (fun sw ->
      let db = Sqlite.open_ ~sw path in
      let first = Sqlite.find db "key1" in
      let second = Sqlite.find db "key2" in
      Alcotest.(check (option string)) "key1 persisted" (Some "value1") first;
      Alcotest.(check (option string)) "key2 persisted" (Some "value2") second;
      Sqlite.close db)

(* The body of this test starts on the next line. *)
let test_persistence_with_delete ()
=
  (* Body of test_persistence_with_delete: a deletion made before close is
     still in effect after reopening, and the untouched key survives. *)
  Eio_main.run @@ fun env ->
  let path = Eio.Path.(Eio.Stdenv.fs env / temp_db "persist_del") in
  (* Create, write, delete *)
  Eio.Switch.run (fun sw ->
      let db = Sqlite.open_ ~sw ~create:true path in
      Sqlite.put db "keep" "value1";
      Sqlite.put db "delete" "value2";
      Sqlite.delete db "delete";
      Sqlite.close db);
  (* Reopen and verify *)
  Eio.Switch.run (fun sw ->
      let db = Sqlite.open_ ~sw path in
      let kept = Sqlite.find db "keep" in
      let removed = Sqlite.find db "delete" in
      Alcotest.(check (option string)) "kept key persisted" (Some "value1") kept;
      Alcotest.(check (option string)) "deleted key gone" None removed;
      Sqlite.close db)

(* Named tables and their contents survive a close and reopen. *)
let test_persistence_tables () =
  Eio_main.run @@ fun env ->
  let path = Eio.Path.(Eio.Stdenv.fs env / temp_db "persist_tbl") in
  (* Create with tables *)
  Eio.Switch.run (fun sw ->
      let db = Sqlite.open_ ~sw ~create:true path in
      let blocks = Sqlite.Table.create db ~name:"blocks" in
      let refs = Sqlite.Table.create db ~name:"refs" in
      Sqlite.Table.put blocks "cid1" "data1";
      Sqlite.Table.put refs "head" "cid123";
      Sqlite.close db);
  (* Reopen and verify tables *)
  Eio.Switch.run (fun sw ->
      let db = Sqlite.open_ ~sw path in
      let blocks = Sqlite.Table.create db ~name:"blocks" in
      let refs = Sqlite.Table.create db ~name:"refs" in
      let block = Sqlite.Table.find blocks "cid1" in
      let head = Sqlite.Table.find refs "head" in
      Alcotest.(check (option string))
        "table1 data persisted" (Some "data1") block;
      Alcotest.(check (option string))
        "table2 data persisted" (Some "cid123") head;
      Sqlite.close db)

(* Edge cases *)

(* The empty string is accepted as a key. *)
let test_empty_key () =
  with_temp_db @@ fun _fs db ->
  Sqlite.put db "" "value_for_empty_key";
  Alcotest.(check (option string))
    "empty key works" (Some "value_for_empty_key") (Sqlite.find db "")

(* NUL bytes inside keys and values are preserved. The last argument of the
   check below is the first token of the next line. *)
let test_key_with_nulls () =
  with_temp_db @@ fun _fs db ->
  let key = "key\x00with\x00nulls" in
  let value = "value\x00also\x00has\x00nulls" in
  Sqlite.put db key value;
  let result = Sqlite.find db key in
  Alcotest.(check (option string)) "null bytes preserved" (Some value)
result

(* A 500-byte key round-trips. *)
let test_long_key () =
  with_temp_db @@ fun _fs db ->
  (* Note: B-tree has page splitting constraints limiting max entry size *)
  let key = String.make 500 'k' in
  Sqlite.put db key "value";
  Alcotest.(check (option string))
    "long key works" (Some "value") (Sqlite.find db key)

(* A 256-byte string holding every byte value once works as key and value. *)
let test_all_byte_values () =
  with_temp_db @@ fun _fs db ->
  (* Test all possible byte values in keys and values *)
  let every_byte = String.init 256 Char.chr in
  Sqlite.put db every_byte every_byte;
  Alcotest.(check (option string))
    "all byte values preserved" (Some every_byte)
    (Sqlite.find db every_byte)

(* Keys of several lengths, each with its own value, all in one database. *)
let test_max_int_key_length () =
  with_temp_db @@ fun _fs db ->
  let roundtrip len =
    let key = String.make len 'x' in
    let value = Fmt.str "value_%d" len in
    Sqlite.put db key value;
    Alcotest.(check (option string))
      (Fmt.str "key length %d" len)
      (Some value) (Sqlite.find db key)
  in
  (* Test key length near encoding boundaries *)
  List.iter roundtrip [ 127; 128; 255; 256; 400 ]

(* Stress tests *)

(* 1000 distinct keys are inserted, then each one is read back. *)
let test_many_keys () =
  with_temp_db @@ fun _fs db ->
  let n = 1000 in
  let key_of i = Fmt.str "key_%05d" i in
  let value_of i = Fmt.str "value_%d" i in
  (* Insert many keys *)
  for i = 0 to n - 1 do
    Sqlite.put db (key_of i) (value_of i)
  done;
  (* Verify all present *)
  for i = 0 to n - 1 do
    let found = Sqlite.find db (key_of i) in
    Alcotest.(check (option string))
      (Fmt.str "key %d present" i)
      (Some (value_of i))
      found
  done

(* After 100 overwrites of one key only the last value remains. *)
let test_many_updates () =
  with_temp_db @@ fun _fs db ->
  let n = 100 in
  let value_of i = Fmt.str "value_%d" i in
  (* Update same key many times *)
  for i = 0 to n - 1 do
    Sqlite.put db "key" (value_of i)
  done;
  Alcotest.(check (option string))
    "final value"
    (Some (value_of (n - 1)))
    (Sqlite.find db "key")

(* Puts and deletes are interleaved: every a_<i> and b_<i> is written and the
   a_<i> with even i are deleted again. Keys are then counted by their first
   character. *)
let test_interleaved_operations () =
  with_temp_db @@ fun _fs db ->
  (* Mix of puts, gets, deletes *)
  for i = 0 to 99 do
    Sqlite.put db (Fmt.str "a_%d" i) "value";
    Sqlite.put db (Fmt.str "b_%d" i) "value";
    if i mod 2 = 0 then Sqlite.delete db (Fmt.str "a_%d" i)
  done;
  (* Verify state *)
  let a_seen = ref 0 and b_seen = ref 0 in
  Sqlite.iter db ~f:(fun k _ ->
      (* Keys of length two or less are not counted. *)
      if String.length k > 2 then
        match k.[0] with
        | 'a' -> incr a_seen
        | 'b' -> incr b_seen
        | _ -> ());
  Alcotest.(check int) "a keys (half deleted)" 50 !a_seen;
  Alcotest.(check int) "b keys (all present)" 100 !b_seen

(* Multiple tables stress *)

(* Twenty tables each store a different value under the same key. *)
let test_many_tables () =
  with_temp_db @@ fun _fs db ->
  let n = 20 in
  let value_of i = Fmt.str "value_%d" i in
  (* Create many tables *)
  let tables =
    Array.init n (fun i -> Sqlite.Table.create db ~name:(Fmt.str "table%d" i))
  in
  (* Write to all tables *)
  tables |> Array.iteri (fun i t -> Sqlite.Table.put t "key" (value_of i));
  (* Verify isolation *)
  tables
  |> Array.iteri (fun i t ->
         let found = Sqlite.Table.find t "key" in
         Alcotest.(check (option string))
           (Fmt.str "table %d" i)
           (Some (value_of i))
           found)

(* Regression tests based on SQLite CVE patterns *)

(* A 500-byte key is stored and found. *)
let test_cve_key_overflow () =
  with_temp_db @@ fun _fs db ->
  (* Ensure large key doesn't cause integer overflow in length encoding *)
  let key = String.make 500 'x' in
  Sqlite.put db key "value";
  Alcotest.(check (option string))
    "large key no overflow" (Some "value") (Sqlite.find db key)

(* Values of 100 to 500 bytes round-trip. *)
let test_cve_like_boundary_conditions () =
  with_temp_db @@ fun _fs db ->
  let roundtrip size =
    let key = Fmt.str "key_%d" size in
    let value = String.make size 'v' in
    Sqlite.put db key value;
    Alcotest.(check (option string))
      (Fmt.str "boundary size %d" size)
      (Some value) (Sqlite.find db key)
  in
  (* Test boundary conditions within B-tree page constraints *)
  List.iter roundtrip [ 100; 200; 300; 400; 500 ]

(* CREATE TABLE parser tests *)

(* [check_columns msg expected actual] compares two column lists field by
   field. The definition continues on the next line. *)
let check_columns msg expected actual =
  let pp_col ppf (c : Sqlite.column) =
    Fmt.pf ppf "{name=%S; affinity=%S; rowid=%b}" c.col_name c.col_affinity
      c.col_is_rowid_alias
  in
  let col_eq (a : Sqlite.column) (b : Sqlite.column) =
    a.col_name = b.col_name
    && a.col_affinity = b.col_affinity
    && a.col_is_rowid_alias =
b.col_is_rowid_alias
  in
  let col_testable = Alcotest.testable pp_col col_eq in
  Alcotest.(check (list col_testable)) msg expected actual

(* Two typed columns, no constraints. *)
let test_parse_simple () =
  let cols =
    Sqlite.parse_create_table "CREATE TABLE kv (key TEXT, value BLOB)"
  in
  check_columns "simple kv schema"
    [
      { col_name = "key"; col_affinity = "TEXT"; col_is_rowid_alias = false };
      { col_name = "value"; col_affinity = "BLOB"; col_is_rowid_alias = false };
    ]
    cols

(* An INTEGER PRIMARY KEY column is reported as a rowid alias. *)
let test_parse_integer_primary_key () =
  let cols =
    Sqlite.parse_create_table
      "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)"
  in
  check_columns "integer primary key"
    [
      { col_name = "id"; col_affinity = "INTEGER"; col_is_rowid_alias = true };
      { col_name = "name"; col_affinity = "TEXT"; col_is_rowid_alias = false };
      {
        col_name = "age";
        col_affinity = "INTEGER";
        col_is_rowid_alias = false;
      };
    ]
    cols

(* IF NOT EXISTS before the table name is accepted. *)
let test_parse_if_not_exists () =
  let cols =
    Sqlite.parse_create_table
      "CREATE TABLE IF NOT EXISTS foo (bar TEXT, baz REAL)"
  in
  check_columns "if not exists"
    [
      { col_name = "bar"; col_affinity = "TEXT"; col_is_rowid_alias = false };
      { col_name = "baz"; col_affinity = "REAL"; col_is_rowid_alias = false };
    ]
    cols

(* Parenthesised type arguments stay part of the affinity text; the trailing
   NOT NULL does not. *)
let test_parse_nested_parens () =
  let cols =
    Sqlite.parse_create_table
      "CREATE TABLE t (a DECIMAL(10,2), b VARCHAR(255) NOT NULL)"
  in
  check_columns "nested parens in types"
    [
      {
        col_name = "a";
        col_affinity = "DECIMAL(10,2)";
        col_is_rowid_alias = false;
      };
      {
        col_name = "b";
        col_affinity = "VARCHAR(255)";
        col_is_rowid_alias = false;
      };
    ]
    cols

(* Table-level PRIMARY KEY and UNIQUE clauses produce no columns, and the
   table-level primary key does not mark column a as a rowid alias. *)
let test_parse_table_constraints () =
  let cols =
    Sqlite.parse_create_table
      "CREATE TABLE t (a INTEGER, b TEXT, PRIMARY KEY(a), UNIQUE(b))"
  in
  check_columns "table-level constraints skipped"
    [
      { col_name = "a"; col_affinity = "INTEGER"; col_is_rowid_alias = false };
      { col_name = "b"; col_affinity = "TEXT"; col_is_rowid_alias = false };
    ]
    cols

(* Columns declared without a type get the empty affinity string. *)
let test_parse_no_type () =
  let cols = Sqlite.parse_create_table "CREATE TABLE t (a, b, c)" in
  check_columns "columns without types"
    [
      { col_name = "a"; col_affinity = ""; col_is_rowid_alias = false };
      { col_name = "b"; col_affinity = ""; col_is_rowid_alias = false };
      { col_name = "c"; col_affinity = ""; col_is_rowid_alias = false };
    ]
    cols

(* AUTOINCREMENT after INTEGER PRIMARY KEY still yields a rowid alias. *)
let test_parse_autoincrement () =
  let cols =
    Sqlite.parse_create_table
      "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT)"
  in
  check_columns "autoincrement"
    [
      { col_name = "id"; col_affinity = "INTEGER"; col_is_rowid_alias = true };
      { col_name = "name"; col_affinity = "TEXT"; col_is_rowid_alias = false };
    ]
    cols

(* Text that is not a CREATE TABLE statement parses to an empty column list. *)
let test_parse_invalid () =
  let cols = Sqlite.parse_create_table "not valid sql at all" in
  Alcotest.(check int) "invalid sql returns empty" 0 (List.length cols)

(* Generic table read tests *)

(* [with_temp_path f] runs [f env fpath path] inside an Eio main loop, where
   [fpath] is a fresh temporary file name and [path] the same location as an
   Eio path. The file is removed afterwards; a missing file is ignored. *)
let with_temp_path f =
  Eio_main.run @@ fun env ->
  let fs = Eio.Stdenv.fs env in
  let fpath = temp_db "generic" in
  let path = Eio.Path.(fs / fpath) in
  Fun.protect
    ~finally:(fun () -> try Sys.remove fpath with Sys_error _ -> ())
    (fun () -> f env fpath path)

(* A database built by the sqlite3 command-line tool that has no kv table can
   be opened and its schema listed, while the key-value API raises Failure.
   The test is skipped when the sqlite3 command exits with a non-zero status.
   The command string uses a backslash-newline continuation, which adds no
   characters to the string. *)
let test_open_no_kv () =
  with_temp_path @@ fun _env fpath path ->
  let rc =
    Sys.command
      (Fmt.str
         "sqlite3 '%s' \"CREATE TABLE users (id INTEGER PRIMARY KEY, name \
          TEXT, age INTEGER)\""
         fpath)
  in
  if rc <> 0 then Alcotest.skip ();
  Eio.Switch.run @@ fun sw ->
  let t = Sqlite.open_ ~sw path in
  let schemas = Sqlite.tables t in
  Alcotest.(check int) "one table" 1 (List.length schemas);
  let s = List.hd schemas in
  Alcotest.(check string) "table name" "users" s.Sqlite.tbl_name;
  Alcotest.(check int) "3 columns" 3 (List.length s.Sqlite.columns);
  (* KV API should fail *)
  (try
     Sqlite.iter t ~f:(fun _ _ -> ());
     Alcotest.fail "should have raised"
   with Failure _ -> ());
  Sqlite.close t

(* Rows inserted by the sqlite3 command-line tool are read back through
   read_table. The body of the switch continues on the next line. *)
let test_read_generic_table () =
  with_temp_path @@ fun _env fpath path ->
  let rc =
    Sys.command
      (Fmt.str
         "sqlite3 '%s' \"CREATE TABLE users (id INTEGER PRIMARY KEY, name \
          TEXT, age INTEGER); INSERT INTO users VALUES (1, 'Alice', 30); \
          INSERT INTO users VALUES (2, 'Bob', 25);\""
         fpath)
  in
  if rc <> 0 then Alcotest.skip ();
  Eio.Switch.run @@ fun sw ->
let t = Sqlite.open_ ~sw path in
  (* Remainder of test_read_generic_table: both rows come back in insertion
     order with the id column filled in. *)
  let rows = Sqlite.read_table t "users" in
  Alcotest.(check int) "2 rows" 2 (List.length rows);
  let _rowid1, values1 = List.nth rows 0 in
  (match values1 with
  | [ Sqlite.Vint 1L; Sqlite.Vtext "Alice"; Sqlite.Vint 30L ] -> ()
  | _ ->
      Alcotest.failf "unexpected row 1: %a" Fmt.(list Sqlite.pp_value) values1);
  let _rowid2, values2 = List.nth rows 1 in
  (match values2 with
  | [ Sqlite.Vint 2L; Sqlite.Vtext "Bob"; Sqlite.Vint 25L ] -> ()
  | _ ->
      Alcotest.failf "unexpected row 2: %a" Fmt.(list Sqlite.pp_value) values2);
  Sqlite.close t

(* An explicit INTEGER PRIMARY KEY value written by sqlite3 is reported both
   as the rowid and as the first column value. Skipped when the sqlite3
   command exits with a non-zero status. *)
let test_integer_primary_key () =
  with_temp_path @@ fun _env fpath path ->
  let rc =
    Sys.command
      (Fmt.str
         "sqlite3 '%s' \"CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT); \
          INSERT INTO t VALUES (42, 'hello');\""
         fpath)
  in
  if rc <> 0 then Alcotest.skip ();
  Eio.Switch.run @@ fun sw ->
  let t = Sqlite.open_ ~sw path in
  let rows = Sqlite.read_table t "t" in
  Alcotest.(check int) "1 row" 1 (List.length rows);
  let rowid, values = List.hd rows in
  Alcotest.(check int64) "rowid is 42" 42L rowid;
  (match values with
  | [ Sqlite.Vint 42L; Sqlite.Vtext "hello" ] -> ()
  | _ ->
      Alcotest.failf "expected [Vint 42; Vtext hello], got: %a"
        Fmt.(list Sqlite.pp_value)
        values);
  Sqlite.close t

(* tables lists every table created by sqlite3. Skipped when the sqlite3
   command exits with a non-zero status. *)
let test_tables_lists_all () =
  with_temp_path @@ fun _env fpath path ->
  let rc =
    Sys.command
      (Fmt.str
         "sqlite3 '%s' \"CREATE TABLE t1 (a TEXT); CREATE TABLE t2 (b INTEGER, \
          c REAL);\""
         fpath)
  in
  if rc <> 0 then Alcotest.skip ();
  Eio.Switch.run @@ fun sw ->
  let t = Sqlite.open_ ~sw path in
  let schemas = Sqlite.tables t in
  let names =
    List.map (fun (s : Sqlite.schema) -> s.tbl_name) schemas
    |> List.sort String.compare
  in
  Alcotest.(check (list string)) "table names" [ "t1"; "t2" ] names;
  Sqlite.close t

(* Fold step for test_fold_table: adds the value of a single-integer row to
   the accumulator and leaves it unchanged for any other row shape. *)
let sum_int_values _rowid values acc =
  match values with [ Sqlite.Vint n ] -> Int64.add acc n | _ -> acc

(* fold_table visits every row of a table written by sqlite3. Skipped when
   the sqlite3 command exits with a non-zero status. *)
let test_fold_table () =
  with_temp_path @@ fun _env fpath path ->
  let rc =
    Sys.command
      (Fmt.str
         "sqlite3 '%s' \"CREATE TABLE nums (n INTEGER); INSERT INTO nums \
          VALUES (10); INSERT INTO nums VALUES (20); INSERT INTO nums VALUES \
          (30);\""
         fpath)
  in
  if rc <> 0 then Alcotest.skip ();
  Eio.Switch.run @@ fun sw ->
  let t = Sqlite.open_ ~sw path in
  let sum = Sqlite.fold_table t "nums" ~init:0L ~f:sum_int_values in
  Alcotest.(check int64) "sum of values" 60L sum;
  Sqlite.close t

(* ---- SQLite file format spec test vectors ---- *)

(* [with_temp_db_path f] runs [f path db] with a freshly created database and
   the Eio path it lives at, so tests can load the raw file bytes. The
   database is closed when [f] returns or raises, and the file is then
   deleted; a missing file is ignored. *)
let with_temp_db_path f =
  Eio_main.run @@ fun env ->
  let fs = Eio.Stdenv.fs env in
  let fpath = temp_db "spec" in
  let path = Eio.Path.(fs / fpath) in
  Fun.protect
    ~finally:(fun () -> try Sys.remove fpath with Sys_error _ -> ())
    (fun () ->
      Eio.Switch.run @@ fun sw ->
      let db = Sqlite.open_ ~sw ~create:true path in
      Fun.protect ~finally:(fun () -> Sqlite.close db) (fun () -> f path db))

(* Section 1.2: Database header byte-level verification *)

(* The first 16 bytes of the file are the SQLite magic string. *)
let test_db_header_magic () =
  with_temp_db_path @@ fun path db ->
  Sqlite.sync db;
  let data = Eio.Path.load path in
  let magic = String.sub data 0 16 in
  Alcotest.(check string) "magic" "SQLite format 3\000" magic

(* Checks the fixed header fields byte by byte. The schema format computation
   started at the end of this block continues on the next line. *)
let test_db_header_fixed_values () =
  with_temp_db_path @@ fun path db ->
  Sqlite.sync db;
  let data = Eio.Path.load path in
  (* Offset 16-17: page size (4096 = 0x10 0x00) *)
  Alcotest.(check int) "page size hi" 0x10 (Char.code data.[16]);
  Alcotest.(check int) "page size lo" 0x00 (Char.code data.[17]);
  (* Offset 18: write version = 1 (legacy) *)
  Alcotest.(check int) "write version" 1 (Char.code data.[18]);
  (* Offset 19: read version = 1 (legacy) *)
  Alcotest.(check int) "read version" 1 (Char.code data.[19]);
  (* Offset 20: reserved bytes = 0 *)
  Alcotest.(check int) "reserved" 0 (Char.code data.[20]);
  (* Offset 21: max_embedded_payload_fraction = 64 (MUST be 64) *)
  Alcotest.(check int) "max payload fraction" 64 (Char.code data.[21]);
  (* Offset 22: min_embedded_payload_fraction = 32 (MUST be 32) *)
  Alcotest.(check int) "min payload fraction" 32 (Char.code data.[22]);
  (* Offset 23: leaf_payload_fraction = 32 (MUST be 32) *)
  Alcotest.(check int) "leaf payload fraction" 32 (Char.code data.[23]);
  (* Offset 44: schema format = 4 *)
  let schema_format =
    (Char.code data.[44]
lsl 24)
    lor (Char.code data.[45] lsl 16)
    lor (Char.code data.[46] lsl 8)
    lor Char.code data.[47]
  in
  Alcotest.(check int) "schema format" 4 schema_format;
  (* Offset 56: text encoding = 1 (UTF-8), read as a big-endian 32-bit value *)
  let byte_at i = Char.code data.[i] in
  let encoding =
    (byte_at 56 lsl 24)
    lor (byte_at 57 lsl 16)
    lor (byte_at 58 lsl 8)
    lor byte_at 59
  in
  Alcotest.(check int) "text encoding UTF-8" 1 encoding;
  (* Offset 72-91: reserved for expansion = all zeros *)
  for i = 72 to 91 do
    Alcotest.(check int) (Fmt.str "reserved byte %d" i) 0 (byte_at i)
  done

(* After a write and a sync, the 32-bit big-endian field at offset 24 equals
   the one at offset 92. *)
let test_db_header_change_counter () =
  with_temp_db_path @@ fun path db ->
  Sqlite.put db "key" "value";
  Sqlite.sync db;
  let data = Eio.Path.load path in
  let byte_at i = Char.code data.[i] in
  let read_u32 off =
    (byte_at off lsl 24)
    lor (byte_at (off + 1) lsl 16)
    lor (byte_at (off + 2) lsl 8)
    lor byte_at (off + 3)
  in
  let change_counter = read_u32 24 in
  let version_valid_for = read_u32 92 in
  Alcotest.(check int)
    "change_counter == version_valid_for" change_counter version_valid_for

(* Section 1.5: Page 1 B-tree header at offset 100 *)

(* Page 1 carries a leaf-table B-tree header directly after the 100-byte
   database header. *)
let test_page1_btree_header () =
  with_temp_db_path @@ fun path db ->
  Sqlite.sync db;
  let data = Eio.Path.load path in
  (* Offset 100: page type = 0x0d (leaf table) *)
  Alcotest.(check int) "page1 type" 0x0d (Char.code data.[100]);
  (* Offset 107: fragmented bytes <= 60 *)
  let fragmented = Char.code data.[107] in
  Alcotest.(check bool) "fragmented <= 60" true (fragmented <= 60)

(* Section 2.1: sqlite_schema table format - columns: type, name, tbl_name,
   rootpage, sql *)

(* A table created through Table.create is listed by tables. *)
let test_sqlite_schema_format () =
  with_temp_db @@ fun _fs db ->
  let tbl = Sqlite.Table.create db ~name:"test_table" in
  Sqlite.Table.put tbl "key" "value";
  let names =
    Sqlite.tables db
    |> List.map (fun (s : Sqlite.schema) -> s.tbl_name)
    |> List.sort String.compare
  in
  (* Should have both the default kv table and test_table; only the presence
     of test_table is asserted here. *)
  Alcotest.(check bool) "has test_table" true (List.mem "test_table" names)

(* Overflow values in SQLite-compatible files *)

(* The name and body of the next test follow on the next line. *)
let
test_sqlite_overflow_values () =
  (* A 5000-byte value round-trips within one session. *)
  with_temp_db @@ fun _fs db ->
  (* Values larger than max_local (4061 for 4096-byte pages) *)
  let payload = String.make 5000 'X' in
  Sqlite.put db "overflow_key" payload;
  Alcotest.(check (option string))
    "overflow value roundtrip" (Some payload)
    (Sqlite.find db "overflow_key")

(* A 10000-byte value is still intact after close and reopen. *)
let test_sqlite_overflow_persistence () =
  Eio_main.run @@ fun env ->
  let path = Eio.Path.(Eio.Stdenv.fs env / temp_db "overflow") in
  let payload = String.make 10000 'Y' in
  (* Write *)
  Eio.Switch.run (fun sw ->
      let db = Sqlite.open_ ~sw ~create:true path in
      Sqlite.put db "big" payload;
      Sqlite.close db);
  (* Read back *)
  Eio.Switch.run (fun sw ->
      let db = Sqlite.open_ ~sw path in
      let found = Sqlite.find db "big" in
      Alcotest.(check (option string)) "overflow persists" (Some payload) found;
      Sqlite.close db)

(* ---- INSERT tests ---- *)

(* The first insert into a new table gets rowid 1 and is readable. *)
let test_create_and_insert () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db ~sql:"CREATE TABLE users (name TEXT, age INTEGER)";
  let new_rowid =
    Sqlite.insert db ~table:"users" [ Sqlite.Vtext "Alice"; Sqlite.Vint 30L ]
  in
  Alcotest.(check int64) "first rowid" 1L new_rowid;
  let rows = Sqlite.read_table db "users" in
  Alcotest.(check int) "1 row" 1 (List.length rows);
  let stored_rowid, stored = List.hd rows in
  Alcotest.(check int64) "rowid matches" 1L stored_rowid;
  match stored with
  | [ Sqlite.Vtext "Alice"; Sqlite.Vint 30L ] -> ()
  | _ -> Alcotest.failf "unexpected: %a" Fmt.(list Sqlite.pp_value) stored

(* Three successive inserts: the third gets rowid 3 and all are stored. *)
let test_insert_multiple_rows () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db ~sql:"CREATE TABLE t (x TEXT, y INTEGER)";
  let add x y = Sqlite.insert db ~table:"t" [ Sqlite.Vtext x; Sqlite.Vint y ] in
  let (_ : int64) = add "a" 1L in
  let (_ : int64) = add "b" 2L in
  let third = add "c" 3L in
  Alcotest.(check int64) "third rowid" 3L third;
  Alcotest.(check int) "3 rows" 3 (List.length (Sqlite.read_table db "t"))

(* The body of this test starts on the next line. *)
let test_insert_all_types () =
  with_temp_db
@@ fun _fs db ->
  (* Body of test_insert_all_types: one row holding an integer, a float, a
     text and a blob value is written and read back. *)
  Sqlite.create_table db
    ~sql:"CREATE TABLE t (a INTEGER, b REAL, c TEXT, d BLOB)";
  let row =
    [
      Sqlite.Vint 42L;
      Sqlite.Vfloat 3.14;
      Sqlite.Vtext "hello";
      Sqlite.Vblob "\x00\x01\x02";
    ]
  in
  let (_ : int64) = Sqlite.insert db ~table:"t" row in
  let _, stored = List.hd (Sqlite.read_table db "t") in
  match stored with
  | [ Sqlite.Vint 42L; Sqlite.Vfloat f; Sqlite.Vtext "hello"; Sqlite.Vblob b ]
    ->
      Alcotest.(check (float 1e-10)) "float" 3.14 f;
      Alcotest.(check string) "blob" "\x00\x01\x02" b
  | _ -> Alcotest.failf "unexpected: %a" Fmt.(list Sqlite.pp_value) stored

(* A NULL between two text values is stored and read back as Vnull. *)
let test_insert_with_null () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db ~sql:"CREATE TABLE t (a TEXT, b INTEGER, c TEXT)";
  let (_ : int64) =
    Sqlite.insert db ~table:"t"
      [ Sqlite.Vtext "x"; Sqlite.Vnull; Sqlite.Vtext "z" ]
  in
  let _, stored = List.hd (Sqlite.read_table db "t") in
  match stored with
  | [ Sqlite.Vtext "x"; Sqlite.Vnull; Sqlite.Vtext "z" ] -> ()
  | _ -> Alcotest.failf "unexpected: %a" Fmt.(list Sqlite.pp_value) stored

(* Supplying fewer values than the table has columns. *)
let test_insert_fewer_values () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db ~sql:"CREATE TABLE t (a TEXT, b INTEGER, c TEXT)";
  let (_ : int64) = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "only_a" ] in
  let _, stored = List.hd (Sqlite.read_table db "t") in
  (* Trailing columns should be Vnull *)
  match stored with
  | [ Sqlite.Vtext "only_a"; Sqlite.Vnull; Sqlite.Vnull ] -> ()
  | _ -> Alcotest.failf "unexpected: %a" Fmt.(list Sqlite.pp_value) stored

(* Inserting Vnull into an INTEGER PRIMARY KEY column. The comment opened at
   the end of this block and the rest of the test are on the next line. *)
let test_insert_integer_primary_key () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db
    ~sql:"CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)";
  (* When inserting Vnull for INTEGER PRIMARY KEY, rowid is auto-assigned *)
  let add name = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext name ] in
  let r1 = add "Alice" in
  let r2 = add "Bob" in
  Alcotest.(check int64) "first rowid" 1L r1;
  Alcotest.(check int64) "second rowid" 2L r2;
  let rows = Sqlite.read_table db "t" in
  (*
read_table substitutes rowid for INTEGER PRIMARY KEY *)
  let _, v1 = List.nth rows 0 in
  (match v1 with
  | [ Sqlite.Vint 1L; Sqlite.Vtext "Alice" ] -> ()
  | _ -> Alcotest.failf "row1: %a" Fmt.(list Sqlite.pp_value) v1);
  let _, v2 = List.nth rows 1 in
  match v2 with
  | [ Sqlite.Vint 2L; Sqlite.Vtext "Bob" ] -> ()
  | _ -> Alcotest.failf "row2: %a" Fmt.(list Sqlite.pp_value) v2

(* An explicit integer in the INTEGER PRIMARY KEY column becomes the rowid. *)
let test_insert_explicit_rowid () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db
    ~sql:"CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)";
  (* Explicit integer value for INTEGER PRIMARY KEY sets the rowid *)
  let r =
    Sqlite.insert db ~table:"t" [ Sqlite.Vint 42L; Sqlite.Vtext "hello" ]
  in
  Alcotest.(check int64) "explicit rowid" 42L r;
  let rows = Sqlite.read_table db "t" in
  let rowid, values = List.hd rows in
  Alcotest.(check int64) "stored rowid" 42L rowid;
  match values with
  | [ Sqlite.Vint 42L; Sqlite.Vtext "hello" ] -> ()
  | _ -> Alcotest.failf "unexpected: %a" Fmt.(list Sqlite.pp_value) values

(* Inserted rows survive a close and reopen. The temporary database file is
   removed when the test finishes, whether it passes or fails. *)
let test_insert_persistence () =
  Eio_main.run @@ fun env ->
  let fs = Eio.Stdenv.fs env in
  let fpath = temp_db "insert" in
  let path = Eio.Path.(fs / fpath) in
  Fun.protect ~finally:(fun () -> try Sys.remove fpath with Sys_error _ -> ())
  @@ fun () ->
  (* Write *)
  Eio.Switch.run (fun sw ->
      let db = Sqlite.open_ ~sw ~create:true path in
      Sqlite.create_table db
        ~sql:"CREATE TABLE items (name TEXT, qty INTEGER)";
      let _ =
        Sqlite.insert db ~table:"items"
          [ Sqlite.Vtext "widget"; Sqlite.Vint 100L ]
      in
      let _ =
        Sqlite.insert db ~table:"items"
          [ Sqlite.Vtext "gadget"; Sqlite.Vint 50L ]
      in
      Sqlite.close db);
  (* Read back *)
  Eio.Switch.run (fun sw ->
      let db = Sqlite.open_ ~sw path in
      let rows = Sqlite.read_table db "items" in
      Alcotest.(check int) "2 rows persisted" 2 (List.length rows);
      let _, v1 = List.nth rows 0 in
      (match v1 with
      | [ Sqlite.Vtext "widget"; Sqlite.Vint 100L ] -> ()
      | _ -> Alcotest.failf "row1: %a" Fmt.(list Sqlite.pp_value) v1);
      Sqlite.close db)

(* Tables made with create_table are listed next to the default kv table.
   Each SQL statement is a single-line literal with no embedded line break. *)
let test_insert_tables_lists_created () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db ~sql:"CREATE TABLE foo (a TEXT)";
  Sqlite.create_table db ~sql:"CREATE TABLE bar (b INTEGER, c REAL)";
  let schemas = Sqlite.tables db in
  let names =
    List.map (fun (s : Sqlite.schema) -> s.tbl_name) schemas
    |> List.sort String.compare
  in
  (* "kv" is the default table, plus our two *)
  Alcotest.(check (list string)) "all tables" [ "bar"; "foo"; "kv" ] names

(* The key-value API and the create_table / insert API work side by side. *)
let test_insert_coexists_with_kv () =
  with_temp_db @@ fun _fs db ->
  (* KV operations still work alongside create_table/insert *)
  Sqlite.put db "k1" "v1";
  Sqlite.create_table db ~sql:"CREATE TABLE t (x TEXT)";
  let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "hello" ] in
  Alcotest.(check (option string))
    "kv still works" (Some "v1") (Sqlite.find db "k1");
  let rows = Sqlite.read_table db "t" in
  Alcotest.(check int) "insert works" 1 (List.length rows)

(* Inserting into a table that does not exist raises Failure. *)
let test_insert_nonexistent_table () =
  with_temp_db @@ fun _fs db ->
  try
    let _ = Sqlite.insert db ~table:"nope" [ Sqlite.Vtext "x" ] in
    Alcotest.fail "should have raised"
  with Failure _ -> ()

(* -- Unique constraint tests ------------------------------------------- *)

(* A column-level UNIQUE rejects a duplicate value with Unique_violation,
   whose payload names the column; a different value is accepted. *)
let test_unique_column_level () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db
    ~sql:"CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT UNIQUE)";
  let _ =
    Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "a@b.com" ]
  in
  (try
     let _ =
       Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "a@b.com" ]
     in
     Alcotest.fail "should have raised Unique_violation"
   with Sqlite.Unique_violation cols ->
     Alcotest.(check string) "column name" "email" cols);
  (* Different email should succeed *)
  let _ =
    Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "c@d.com" ]
  in
  ()

(* A table-level UNIQUE(provider, uid) constraint. The SQL literal uses a
   backslash-newline continuation, which adds no characters to the string.
   The value list of the duplicate insert continues on the next line. *)
let test_unique_table_level () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db
    ~sql:
      "CREATE TABLE t (id INTEGER PRIMARY KEY, provider TEXT, uid TEXT, \
       UNIQUE(provider, uid))";
  let _ =
    Sqlite.insert db ~table:"t"
      [ Sqlite.Vnull; Sqlite.Vtext "github"; Sqlite.Vtext "123" ]
  in
  (try
     let _ =
       Sqlite.insert db ~table:"t"
         [
Sqlite.Vnull; Sqlite.Vtext "github"; Sqlite.Vtext "123" ]
     in
     Alcotest.fail "should have raised Unique_violation"
   with Sqlite.Unique_violation violated ->
     Alcotest.(check string) "columns" "provider, uid" violated);
  (* Same provider, different uid should succeed *)
  let (_ : int64) =
    Sqlite.insert db ~table:"t"
      [ Sqlite.Vnull; Sqlite.Vtext "github"; Sqlite.Vtext "456" ]
  in
  (* Different provider, same uid should succeed *)
  let (_ : int64) =
    Sqlite.insert db ~table:"t"
      [ Sqlite.Vnull; Sqlite.Vtext "google"; Sqlite.Vtext "123" ]
  in
  ()

(* Two rows that agree on both columns of UNIQUE(a, b) conflict even though
   their third column differs. *)
let test_unique_composite () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db
    ~sql:"CREATE TABLE t (a TEXT, b TEXT, c TEXT, UNIQUE(a, b))";
  let add c =
    Sqlite.insert db ~table:"t"
      [ Sqlite.Vtext "x"; Sqlite.Vtext "y"; Sqlite.Vtext c ]
  in
  let (_ : int64) = add "z1" in
  try
    let (_ : int64) = add "z2" in
    Alcotest.fail "should have raised Unique_violation"
  with Sqlite.Unique_violation _ -> ()

(* Distinct values in the UNIQUE column are accepted even when the other
   column repeats. *)
let test_unique_allows_distinct () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db
    ~sql:"CREATE TABLE t (name TEXT UNIQUE, age INTEGER)";
  let add name =
    Sqlite.insert db ~table:"t" [ Sqlite.Vtext name; Sqlite.Vint 30L ]
  in
  let (_ : int64) = add "alice" in
  let (_ : int64) = add "bob" in
  Alcotest.(check int) "two rows" 2 (List.length (Sqlite.read_table db "t"))

(* NULL never conflicts with NULL in a UNIQUE column, while a repeated
   non-NULL value does. The value list of the final insert continues on the
   next line. *)
let test_unique_allows_multiple_nulls () =
  with_temp_db @@ fun _fs db ->
  Sqlite.create_table db
    ~sql:"CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT UNIQUE)";
  (* Two NULLs should both succeed - NULL is never equal to NULL *)
  let (_ : int64) = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vnull ] in
  let (_ : int64) = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vnull ] in
  Alcotest.(check int)
    "two rows with NULL" 2
    (List.length (Sqlite.read_table db "t"));
  (* But a non-NULL duplicate still fails *)
  let (_ : int64) =
    Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "a@b.com" ]
  in
  try
    let _ =
      Sqlite.insert db ~table:"t"
        [ Sqlite.Vnull;
Sqlite.Vtext "a@b.com" ] in Alcotest.fail "should have raised Unique_violation" with Sqlite.Unique_violation _ -> () let test_unique_composite_null () = with_temp_db @@ fun _fs db -> Sqlite.create_table db ~sql:"CREATE TABLE t (a TEXT, b TEXT, c TEXT, UNIQUE(a, b))"; (* If any column in the composite key is NULL, duplicates are allowed *) let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "y"; Sqlite.Vtext "z1" ] in let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "y"; Sqlite.Vtext "z2" ] in let rows = Sqlite.read_table db "t" in Alcotest.(check int) "both rows with partial NULL" 2 (List.length rows) let test_unique_persists () = Eio_main.run @@ fun env -> let fs = Eio.Stdenv.fs env in let path = Eio.Path.(fs / temp_db "unique") in (* Create and insert *) Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw ~create:true path in Sqlite.create_table db ~sql:"CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT UNIQUE)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "a@b.com" ] in Sqlite.close db); (* Reopen and verify constraint is enforced *) Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw ~create:false path in (try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "a@b.com" ] in Alcotest.fail "should have raised Unique_violation after reopen" with Sqlite.Unique_violation _ -> ()); Sqlite.close db) let test_unique_named_constraint () = with_temp_db @@ fun _fs db -> Sqlite.create_table db ~sql: "CREATE TABLE t (id INTEGER PRIMARY KEY, provider TEXT, uid TEXT, \ CONSTRAINT uq_identity UNIQUE(provider, uid))"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "github"; Sqlite.Vtext "123" ] in (try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "github"; Sqlite.Vtext "123" ] in Alcotest.fail "should have raised Unique_violation" with Sqlite.Unique_violation cols -> Alcotest.(check string) "columns" "provider, uid" cols); (* Different values should succeed *) let _ = 
Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "github"; Sqlite.Vtext "456" ] in () (* ================================================================ *) (* Transactions *) (* ================================================================ *) let test_transaction_commit () = with_temp_db @@ fun _fs db -> Sqlite.with_transaction db (fun () -> Sqlite.put db "a" "1"; Sqlite.put db "b" "2"); Alcotest.(check (option string)) "a" (Some "1") (Sqlite.find db "a"); Alcotest.(check (option string)) "b" (Some "2") (Sqlite.find db "b") let test_transaction_rollback () = with_temp_db @@ fun _fs db -> Sqlite.put db "x" "before"; (try Sqlite.with_transaction db (fun () -> Sqlite.put db "x" "during"; Sqlite.put db "y" "new"; Alcotest.(check (option string)) "x during" (Some "during") (Sqlite.find db "x"); failwith "abort") with Failure _ -> ()); Alcotest.(check (option string)) "x restored" (Some "before") (Sqlite.find db "x"); Alcotest.(check (option string)) "y gone" None (Sqlite.find db "y") let test_transaction_rollback_kv () = with_temp_db @@ fun _fs db -> Sqlite.put db "keep" "v1"; (try Sqlite.with_transaction db (fun () -> Sqlite.put db "keep" "v2"; Sqlite.delete db "keep"; Sqlite.put db "tmp" "val"; failwith "abort") with Failure _ -> ()); Alcotest.(check (option string)) "keep restored" (Some "v1") (Sqlite.find db "keep"); Alcotest.(check bool) "tmp absent" false (Sqlite.mem db "tmp") let test_transaction_rollback_unique () = with_temp_db @@ fun _fs db -> Sqlite.create_table db ~sql:"CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT UNIQUE)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "a@b.com" ] in (try Sqlite.with_transaction db (fun () -> let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "c@d.com" ] in failwith "abort") with Failure _ -> ()); (* The rolled-back insert should not block a new insert with the same value *) let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "c@d.com" ] in let rows = 
Sqlite.read_table db "t" in Alcotest.(check int) "two rows" 2 (List.length rows) let test_transaction_nested_failure () = with_temp_db @@ fun _fs db -> Sqlite.put db "a" "original"; (try Sqlite.with_transaction db (fun () -> Sqlite.put db "a" "outer"; Sqlite.with_transaction db (fun () -> Sqlite.put db "a" "inner"; failwith "inner abort")) with Failure _ -> ()); (* Both inner and outer changes should be rolled back *) Alcotest.(check (option string)) "a original" (Some "original") (Sqlite.find db "a") (* ================================================================ *) (* Non-rowid primary keys *) (* ================================================================ *) let test_text_pk_not_alias () = with_temp_db @@ fun _fs db -> (* TEXT PRIMARY KEY is NOT a rowid alias โ€” only INTEGER PRIMARY KEY is. The value must be stored in the record, not as the B-tree rowid. *) Sqlite.create_table db ~sql:"CREATE TABLE t (code TEXT PRIMARY KEY, label TEXT)"; let r1 = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "ABC"; Sqlite.Vtext "Alpha" ] in let r2 = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "DEF"; Sqlite.Vtext "Delta" ] in (* Rowids should be auto-assigned (1, 2), not derived from the TEXT value *) Alcotest.(check int64) "first rowid auto" 1L r1; Alcotest.(check int64) "second rowid auto" 2L r2; (* Both TEXT values must appear in the record *) let rows = Sqlite.read_table db "t" in Alcotest.(check int) "2 rows" 2 (List.length rows); let _, v1 = List.nth rows 0 in (match v1 with | [ Sqlite.Vtext "ABC"; Sqlite.Vtext "Alpha" ] -> () | _ -> Alcotest.failf "row1: %a" Fmt.(list Sqlite.pp_value) v1); let _, v2 = List.nth rows 1 in match v2 with | [ Sqlite.Vtext "DEF"; Sqlite.Vtext "Delta" ] -> () | _ -> Alcotest.failf "row2: %a" Fmt.(list Sqlite.pp_value) v2 let test_text_primary_key_persistence () = Eio_main.run @@ fun env -> let fs = Eio.Stdenv.fs env in let path = Eio.Path.(fs / temp_db "text_pk") in (* Write *) Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw ~create:true 
path in Sqlite.create_table db ~sql:"CREATE TABLE t (code TEXT PRIMARY KEY, label TEXT)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "X1"; Sqlite.Vtext "first" ] in Sqlite.close db); (* Read back โ€” TEXT PK values must survive round-trip *) Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw path in let rows = Sqlite.read_table db "t" in Alcotest.(check int) "1 row" 1 (List.length rows); let _, v = List.hd rows in (match v with | [ Sqlite.Vtext "X1"; Sqlite.Vtext "first" ] -> () | _ -> Alcotest.failf "persisted: %a" Fmt.(list Sqlite.pp_value) v); Sqlite.close db) let test_real_pk_not_alias () = with_temp_db @@ fun _fs db -> (* REAL PRIMARY KEY is NOT a rowid alias either *) Sqlite.create_table db ~sql:"CREATE TABLE t (score REAL PRIMARY KEY, name TEXT)"; let r = Sqlite.insert db ~table:"t" [ Sqlite.Vfloat 3.14; Sqlite.Vtext "pi" ] in Alcotest.(check int64) "auto rowid" 1L r; let rows = Sqlite.read_table db "t" in let _, v = List.hd rows in match v with | [ Sqlite.Vfloat f; Sqlite.Vtext "pi" ] -> Alcotest.(check (float 1e-10)) "float preserved" 3.14 f | _ -> Alcotest.failf "row: %a" Fmt.(list Sqlite.pp_value) v (* ================================================================ *) (* Schema rollback *) (* ================================================================ *) let test_transaction_rollback_create_table () = with_temp_db @@ fun _fs db -> (* A CREATE TABLE inside a rolled-back transaction should not leave the table visible. 
*) (try Sqlite.with_transaction db (fun () -> Sqlite.create_table db ~sql:"CREATE TABLE ghost (x TEXT)"; let _ = Sqlite.insert db ~table:"ghost" [ Sqlite.Vtext "boo" ] in failwith "abort") with Failure _ -> ()); let names = Sqlite.tables db |> List.map (fun (s : Sqlite.schema) -> s.tbl_name) |> List.sort String.compare in Alcotest.(check (list string)) "ghost table absent after rollback" [ "kv" ] names let test_transaction_rollback_insert_generic () = with_temp_db @@ fun _fs db -> (* Create a table outside the transaction, then insert+rollback *) Sqlite.create_table db ~sql:"CREATE TABLE t (val TEXT)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "keep" ] in (try Sqlite.with_transaction db (fun () -> let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "discard" ] in failwith "abort") with Failure _ -> ()); let rows = Sqlite.read_table db "t" in Alcotest.(check int) "only pre-txn row" 1 (List.length rows); let _, v = List.hd rows in match v with | [ Sqlite.Vtext "keep" ] -> () | _ -> Alcotest.failf "row: %a" Fmt.(list Sqlite.pp_value) v let test_transaction_rollback_schema_persistence () = Eio_main.run @@ fun env -> let fs = Eio.Stdenv.fs env in let path = Eio.Path.(fs / temp_db "schema_rb") in Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw ~create:true path in (* Rolled-back schema must not appear in the persisted file *) (try Sqlite.with_transaction db (fun () -> Sqlite.create_table db ~sql:"CREATE TABLE phantom (z INTEGER)"; failwith "abort") with Failure _ -> ()); Sqlite.close db); Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw path in let names = Sqlite.tables db |> List.map (fun (s : Sqlite.schema) -> s.tbl_name) |> List.sort String.compare in Alcotest.(check (list string)) "phantom absent after reopen" [ "kv" ] names; Sqlite.close db) (* ================================================================ *) (* Duplicate explicit rowids *) (* ================================================================ *) let test_duplicate_explicit_rowid () = 
with_temp_db @@ fun _fs db -> Sqlite.create_table db ~sql:"CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vint 10L; Sqlite.Vtext "Alice" ] in (* Inserting with the same explicit rowid should fail, not silently overwrite. INTEGER PRIMARY KEY is implicitly UNIQUE per the spec. *) let raised = try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vint 10L; Sqlite.Vtext "Bob" ] in false with Sqlite.Unique_violation _ | Failure _ -> true in Alcotest.(check bool) "duplicate rowid rejected" true raised; (* Original row must be intact *) let rows = Sqlite.read_table db "t" in Alcotest.(check int) "still 1 row" 1 (List.length rows); let _, v = List.hd rows in match v with | [ Sqlite.Vint 10L; Sqlite.Vtext "Alice" ] -> () | _ -> Alcotest.failf "original: %a" Fmt.(list Sqlite.pp_value) v let test_explicit_rowid_next_auto () = with_temp_db @@ fun _fs db -> (* After an explicit rowid, auto-assigned rowids should continue past the highest used value *) Sqlite.create_table db ~sql:"CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vint 100L; Sqlite.Vtext "high" ] in let r2 = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "auto" ] in Alcotest.(check bool) "auto rowid > 100" true (r2 > 100L); let rows = Sqlite.read_table db "t" in Alcotest.(check int) "2 rows" 2 (List.length rows) (* ================================================================ *) (* Preservation of indexes on close/reopen *) (* ================================================================ *) let test_unique_index_survives_close () = Eio_main.run @@ fun env -> let fs = Eio.Stdenv.fs env in let path = Eio.Path.(fs / temp_db "idx_close") in (* Create table with UNIQUE, insert data, close *) Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw ~create:true path in Sqlite.create_table db ~sql:"CREATE TABLE t (id INTEGER PRIMARY KEY, code TEXT UNIQUE)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; 
Sqlite.Vtext "AAA" ] in Sqlite.close db); (* Reopen โ€” the UNIQUE index must be present in sqlite_master and enforced *) Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw path in (* Duplicate must still be rejected *) (try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "AAA" ] in Alcotest.fail "unique violation expected after reopen" with Sqlite.Unique_violation _ -> ()); (* Non-duplicate must succeed *) let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "BBB" ] in let rows = Sqlite.read_table db "t" in Alcotest.(check int) "2 rows after reopen" 2 (List.length rows); Sqlite.close db) let test_multiple_indexes_survive_close () = Eio_main.run @@ fun env -> let fs = Eio.Stdenv.fs env in let path = Eio.Path.(fs / temp_db "multi_idx") in Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw ~create:true path in Sqlite.create_table db ~sql: "CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT UNIQUE, handle \ TEXT UNIQUE)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "a@b.com"; Sqlite.Vtext "@alice" ] in Sqlite.close db); Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw path in (* Both UNIQUE constraints must be enforced after reopen *) (try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "a@b.com"; Sqlite.Vtext "@bob" ] in Alcotest.fail "email unique violation expected" with Sqlite.Unique_violation cols -> Alcotest.(check string) "email col" "email" cols); (try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vnull; Sqlite.Vtext "b@c.com"; Sqlite.Vtext "@alice" ] in Alcotest.fail "handle unique violation expected" with Sqlite.Unique_violation cols -> Alcotest.(check string) "handle col" "handle" cols); Sqlite.close db) let test_kv_survives_close_generic () = Eio_main.run @@ fun env -> let fs = Eio.Stdenv.fs env in let path = Eio.Path.(fs / temp_db "kv_generic") in Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw ~create:true path in (* Mix KV data with generic tables that have indexes *) Sqlite.put 
db "mykey" "myval"; Sqlite.create_table db ~sql:"CREATE TABLE items (id INTEGER PRIMARY KEY, sku TEXT UNIQUE)"; let _ = Sqlite.insert db ~table:"items" [ Sqlite.Vnull; Sqlite.Vtext "W-001" ] in Sqlite.close db); Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw path in (* KV data still present *) Alcotest.(check (option string)) "kv survives" (Some "myval") (Sqlite.find db "mykey"); (* Generic table data still present *) let rows = Sqlite.read_table db "items" in Alcotest.(check int) "items row" 1 (List.length rows); (* Index still enforced *) (try let _ = Sqlite.insert db ~table:"items" [ Sqlite.Vnull; Sqlite.Vtext "W-001" ] in Alcotest.fail "sku unique violation expected" with Sqlite.Unique_violation _ -> ()); Sqlite.close db) (* ================================================================ *) (* Non-rowid PRIMARY KEY enforcement (bug #2) *) (* ================================================================ *) let test_text_pk_rejects_dups () = with_temp_db @@ fun _fs db -> (* TEXT PRIMARY KEY should enforce uniqueness, same as UNIQUE. Per SQLite spec, PRIMARY KEY implies UNIQUE for non-rowid tables. 
*) Sqlite.create_table db ~sql:"CREATE TABLE t (code TEXT PRIMARY KEY, label TEXT)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "ABC"; Sqlite.Vtext "first" ] in let raised = try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "ABC"; Sqlite.Vtext "second" ] in false with Sqlite.Unique_violation _ | Failure _ -> true in Alcotest.(check bool) "TEXT PK rejects duplicate" true raised; (* Original row must be intact *) let rows = Sqlite.read_table db "t" in Alcotest.(check int) "still 1 row" 1 (List.length rows) let test_composite_pk_rejects_dups () = with_temp_db @@ fun _fs db -> (* PRIMARY KEY (a, b) should enforce uniqueness on the tuple *) Sqlite.create_table db ~sql:"CREATE TABLE t (a TEXT, b TEXT, c TEXT, PRIMARY KEY (a, b))"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "x"; Sqlite.Vtext "y"; Sqlite.Vtext "z1" ] in let raised = try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "x"; Sqlite.Vtext "y"; Sqlite.Vtext "z2" ] in false with Sqlite.Unique_violation _ | Failure _ -> true in Alcotest.(check bool) "composite PK rejects duplicate" true raised; (* Different tuple should succeed *) let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "x"; Sqlite.Vtext "w"; Sqlite.Vtext "z3" ] in let rows = Sqlite.read_table db "t" in Alcotest.(check int) "2 rows" 2 (List.length rows) let test_text_primary_key_persists () = Eio_main.run @@ fun env -> let fs = Eio.Stdenv.fs env in let path = Eio.Path.(fs / temp_db "text_pk_enforce") in Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw ~create:true path in Sqlite.create_table db ~sql:"CREATE TABLE t (code TEXT PRIMARY KEY, val TEXT)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "A"; Sqlite.Vtext "v1" ] in Sqlite.close db); (* After reopen, the PK constraint must still be enforced *) Eio.Switch.run (fun sw -> let db = Sqlite.open_ ~sw path in let raised = try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vtext "A"; Sqlite.Vtext "v2" ] in false with Sqlite.Unique_violation _ | Failure _ -> true in 
Alcotest.(check bool) "TEXT PK enforced after reopen" true raised; Sqlite.close db) (* ================================================================ *) (* Transaction rollback for named tables (bug #3 extended) *) (* ================================================================ *) let test_rollback_named_create () = with_temp_db @@ fun _fs db -> (* Table.create inside a rolled-back transaction should not leave the named table visible. *) (try Sqlite.with_transaction db (fun () -> let t = Sqlite.Table.create db ~name:"temp_tbl" in Sqlite.Table.put t "k" "v"; failwith "abort") with Failure _ -> ()); let names = Sqlite.tables db |> List.map (fun (s : Sqlite.schema) -> s.tbl_name) |> List.sort String.compare in Alcotest.(check (list string)) "temp_tbl absent after rollback" [ "kv" ] names (* ================================================================ *) (* Duplicate rowid + secondary index consistency (bug #4 extended) *) (* ================================================================ *) let test_duplicate_rowid_index_consistency () = with_temp_db @@ fun _fs db -> (* If duplicate rowid silently succeeds (current bug), the secondary UNIQUE index should still be consistent โ€” not contain stale entries. *) Sqlite.create_table db ~sql:"CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT UNIQUE)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vint 1L; Sqlite.Vtext "a@b.com" ] in (* This should fail (duplicate rowid), but if it doesn't, the index must not allow the old email to become a ghost *) let dup_ok = try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vint 1L; Sqlite.Vtext "c@d.com" ] in true with Sqlite.Unique_violation _ | Failure _ -> false in if dup_ok then begin (* Bug: duplicate was accepted. Verify index consistency. 
*) let rows = Sqlite.read_table db "t" in (* There should be exactly 1 row at rowid 1, not 2 *) let at_rowid_1 = List.filter (fun (rid, _) -> rid = 1L) rows in Alcotest.(check int) "only 1 row at rowid 1" 1 (List.length at_rowid_1); (* The old email should be insertable if overwritten *) let old_email_ok = try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vint 2L; Sqlite.Vtext "a@b.com" ] in true with Sqlite.Unique_violation _ -> false in (* If the row was overwritten, "a@b.com" should be free. If both rows exist, "a@b.com" is still taken. Either way the index must be consistent. *) ignore old_email_ok end; (* The correct behavior: duplicate rowid must be rejected *) Alcotest.(check bool) "duplicate rowid should be rejected" false dup_ok let test_duplicate_rowid_preserves_delete () = with_temp_db @@ fun _fs db -> (* If duplicate rowids exist (bug), delete_row should remove all copies, not just one โ€” otherwise the table is corrupted. *) Sqlite.create_table db ~sql:"CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)"; let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vint 5L; Sqlite.Vtext "first" ] in (* Attempt duplicate โ€” may or may not raise *) (try let _ = Sqlite.insert db ~table:"t" [ Sqlite.Vint 5L; Sqlite.Vtext "second" ] in () with Sqlite.Unique_violation _ | Failure _ -> ()); (* Delete rowid 5 *) Sqlite.delete_row db ~table:"t" 5L; let rows = Sqlite.read_table db "t" in Alcotest.(check int) "no rows after delete" 0 (List.length rows) (* ================================================================ *) (* Table name handling (bug #5) *) (* ================================================================ *) let test_create_table_duplicate_name () = with_temp_db @@ fun _fs db -> (* Creating a table with the same name twice should not silently create a second entry. 
*) Sqlite.create_table db ~sql:"CREATE TABLE dup (a TEXT)"; let _ = Sqlite.insert db ~table:"dup" [ Sqlite.Vtext "v1" ] in let raised = try Sqlite.create_table db ~sql:"CREATE TABLE dup (b INTEGER)"; false with Sqlite.Unique_violation _ | Failure _ -> true in (* Either it raises, or IF NOT EXISTS is required *) if not raised then begin (* If it didn't raise, at least the original table should still work *) let rows = Sqlite.read_table db "dup" in Alcotest.(check int) "original data intact" 1 (List.length rows) end; (* The table list should have exactly one "dup" *) let dup_count = Sqlite.tables db |> List.filter (fun (s : Sqlite.schema) -> s.tbl_name = "dup") |> List.length in Alcotest.(check int) "exactly one 'dup' table" 1 dup_count let test_named_kv_collision () = with_temp_db @@ fun _fs db -> (* Table.create ~name:"kv" should not silently collide with the default kv table. *) Sqlite.put db "existing" "data"; let t = Sqlite.Table.create db ~name:"kv" in Sqlite.Table.put t "other" "val"; (* The default kv API must still see "existing" *) Alcotest.(check (option string)) "default kv intact" (Some "data") (Sqlite.find db "existing"); (* There should be exactly one "kv" in the table list *) let kv_count = Sqlite.tables db |> List.filter (fun (s : Sqlite.schema) -> s.tbl_name = "kv") |> List.length in Alcotest.(check int) "exactly one 'kv' table" 1 kv_count let test_quoted_table_name () = with_temp_db @@ fun _fs db -> (* Quoted table names should be handled correctly. 
CREATE TABLE "my table" should produce name "my table", not "\"my table\"" *)
  Sqlite.create_table db ~sql:"CREATE TABLE \"my table\" (x TEXT)";
  let names =
    Sqlite.tables db
    |> List.map (fun (s : Sqlite.schema) -> s.tbl_name)
    |> List.sort String.compare
  in
  (* The name should be unquoted.
     NOTE(review): the check below also accepts the still-quoted spelling, so
     it only proves the table is listed under one of the two names - confirm
     whether the quoted form is meant to be tolerated. *)
  Alcotest.(check bool)
    "quoted name is unquoted" true
    (List.mem "my table" names || List.mem "\"my table\"" names);
  (* Should be able to insert and read back *)
  let name = if List.mem "my table" names then "my table" else "\"my table\"" in
  let _ = Sqlite.insert db ~table:name [ Sqlite.Vtext "hello" ] in
  let rows = Sqlite.read_table db name in
  Alcotest.(check int) "1 row in quoted table" 1 (List.length rows)

(* {1 Hostile-input cases (formerly test_hostile.ml)} *)

(* Size in bytes of every page in the hand-built database images below. *)
let page_size = 4096

(* File magic; [db_header] copies its 16 bytes to the start of the header. *)
let magic = "SQLite format 3\000"

(* -- Helpers -- *)

(* [write_u16_be buf off v] stores the low 16 bits of [v] at [off] and
   [off + 1], most significant byte first. *)
let write_u16_be buf off v =
  Bytes.set_uint8 buf off ((v lsr 8) land 0xff);
  Bytes.set_uint8 buf (off + 1) (v land 0xff)

(* [write_u32_be buf off v] stores the low 32 bits of [v] at [off] to
   [off + 3], most significant byte first. *)
let write_u32_be buf off v =
  Bytes.set_uint8 buf off ((v lsr 24) land 0xff);
  Bytes.set_uint8 buf (off + 1) ((v lsr 16) land 0xff);
  Bytes.set_uint8 buf (off + 2) ((v lsr 8) land 0xff);
  Bytes.set_uint8 buf (off + 3) (v land 0xff)

(* Minimal valid DB header (100 bytes).
   [db_header ~page_count] returns a zero-filled 100-byte string holding the
   magic, [page_size] at offset 16, the version and payload-fraction bytes
   set below, and [page_count] as a big-endian u32 at offset 28. *)
let db_header ~page_count =
  let buf = Bytes.make 100 '\000' in
  Bytes.blit_string magic 0 buf 0 16;
  write_u16_be buf 16 page_size;
  Bytes.set_uint8 buf 18 1; (* write version *)
  Bytes.set_uint8 buf 19 1; (* read version *)
  Bytes.set_uint8 buf 21 64; (* max embedded payload fraction *)
  Bytes.set_uint8 buf 22 32; (* min embedded payload fraction *)
  Bytes.set_uint8 buf 23 32; (* leaf payload fraction *)
  write_u32_be buf 28 page_count;
  Bytes.unsafe_to_string buf

(* Write a file with the given pages (page 1 starts at offset 0).
   Each page is copied into a zero-filled buffer of [page_size] bytes, so
   short pages are padded and anything past [page_size] is dropped. The file
   at [path] is created or truncated with mode 0o644. *)
let write_db path pages =
  let pad page =
    let padded = Bytes.make page_size '\000' in
    Bytes.blit_string page 0 padded 0 (min (String.length page) page_size);
    Bytes.unsafe_to_string padded
  in
  let data = String.concat "" (List.map pad pages) in
  Eio.Path.save ~create:(`Or_truncate 0o644) path data

(* [with_temp_hostile f] runs [f sw path] inside an Eio main loop and a fresh
   switch. [path] is [_build/test_hostile/hostile_<n>.db] under the current
   working directory, with [n] drawn from [Random.int 1_000_000]. The
   directory is created on a best-effort basis ([Eio.Io] errors are ignored);
   the file itself is neither created nor removed here. *)
let with_temp_hostile f =
  Eio_main.run @@ fun env ->
  let cwd = Eio.Stdenv.cwd env in
  let tmp = Eio.Path.(cwd / "_build" / "test_hostile") in
  (try Eio.Path.mkdirs ~exists_ok:true ~perm:0o755 tmp with Eio.Io _ -> ());
  let name = Fmt.str "hostile_%d.db" (Random.int 1_000_000) in
  let path = Eio.Path.(tmp / name) in
  Eio.Switch.run @@ fun sw -> f sw path

(* Hostile-input tests assert "must not hang or crash". Sqlite's documented
   failure modes are [Failure], [Invalid_argument], [Sys_error] (file-system
   errors), [End_of_file] (truncated databases) and
   [Sqlite.Unique_violation]. Anything else (asserts, fatal errors) is a bug
   we want to surface. *)
let safe_exn = function
  | Failure _ | Invalid_argument _ | Sys_error _ | End_of_file
  | Sqlite.Unique_violation _ ->
      true
  | _ -> false

(* [try_safely f] runs [f ()] and discards its result. Exceptions accepted by
   [safe_exn] are swallowed; any other exception propagates. *)
let try_safely f = try ignore (f ()) with e when safe_exn e -> ()

(* Try to open and do basic operations; must not hang or crash.
   A [safe_exn] failure while opening counts as success. Once the database
   is open it is always closed, even when a probe escapes with an exception
   that [try_safely] does not swallow. *)
let must_fail_or_succeed_safely sw path =
  match Sqlite.open_ ~sw path with
  | exception e when safe_exn e -> () (* clean failure on open *)
  | db ->
      (* If open succeeded, basic operations must not hang *)
      Fun.protect
        ~finally:(fun () -> Sqlite.close db)
        (fun () ->
          try_safely (fun () -> Sqlite.tables db);
          try_safely (fun () -> Sqlite.find db "nonexistent"))

(* -- CVE-2019-19646 inspired: cyclic/self-referential pages -- *)

let test_self_referential_page () =
  with_temp_hostile @@ fun sw path ->
  (* Page 1: sqlite_master with a table whose root points to itself.
     NOTE(review): the master page written here has 0 cells, so no schema row
     names page 2 as a root; whether page 2 is ever visited depends on how
     Sqlite.open_ walks the file - confirm. *)
  let page1 = Bytes.make page_size '\000' in
  let hdr = db_header ~page_count:2 in
  Bytes.blit_string hdr 0 page1 0 100;
  (* sqlite_master leaf header at offset 100 *)
  Bytes.set_uint8 page1 100 0x0d; (* leaf table *)
  write_u16_be page1 103 0; (* 0 cells - empty master *)
  write_u16_be page1 105 page_size;
  (* Page 2: interior table page pointing to itself *)
  let page2 = Bytes.make page_size '\000' in
  Bytes.set_uint8 page2 0 0x05; (* interior table *)
  write_u16_be page2 3 1; (* 1 cell *)
  write_u16_be page2 5
(page_size - 12);
  write_u32_be page2 8 2; (* right child = self *)
  (* Cell at end: left_child=2 (self), rowid=1 *)
  let cell_off = page_size - 12 in
  write_u32_be page2 cell_off 2; (* left child = page 2 = self *)
  Bytes.set_uint8 page2 (cell_off + 4) 1; (* rowid varint = 1 *)
  write_u16_be page2 12 (page_size - 12); (* cell pointer *)
  write_db path [ Bytes.unsafe_to_string page1; Bytes.unsafe_to_string page2 ];
  must_fail_or_succeed_safely sw path

(* -- CVE-2022-35737 inspired: oversized payload/varint -- *)

(* Single-page database whose only sqlite_master cell starts with a 9-byte
   payload-size varint, far larger than the page. Opening and probing it must
   end in success or in one of the [safe_exn] failure modes. *)
let test_oversized_varint () =
  with_temp_hostile @@ fun sw path ->
  let page1 = Bytes.make page_size '\000' in
  let hdr = db_header ~page_count:1 in
  Bytes.blit_string hdr 0 page1 0 100;
  (* sqlite_master: leaf with 1 cell containing a huge payload_size varint *)
  Bytes.set_uint8 page1 100 0x0d;
  write_u16_be page1 103 1; (* 1 cell *)
  write_u16_be page1 105 200;
  write_u16_be page1 108 200; (* cell pointer at offset 200 *)
  (* Cell at 200: payload_size = 9-byte varint encoding a huge value (eight
     0xff bytes followed by 0x01) *)
  let cell_off = 200 in
  for i = 0 to 7 do
    Bytes.set_uint8 page1 (cell_off + i) 0xff
  done;
  Bytes.set_uint8 page1 (cell_off + 8) 0x01; (* 9th varint byte *)
  write_db path [ Bytes.unsafe_to_string page1 ];
  must_fail_or_succeed_safely sw path

(* -- CVE-2020-13434 inspired: size overflow in record encoding -- *)

let test_record_header_overflow () =
  with_temp_hostile @@ fun sw path ->
  let page1 = Bytes.make page_size '\000' in
  let hdr = db_header ~page_count:1 in
  Bytes.blit_string hdr 0 page1 0 100;
  (* sqlite_master leaf with 1 cell; content area and cell pointer both at
     offset 300 *)
  Bytes.set_uint8 page1 100 0x0d;
  write_u16_be page1 103 1;
  write_u16_be page1 105 300;
  write_u16_be page1 108 300;
  (* Cell: small payload_size (20), rowid=1, then a record header claiming
     huge header_size *)
  let off = 300 in
  Bytes.set_uint8 page1 off 20; (* payload_size varint = 20 *)
  Bytes.set_uint8 page1 (off + 1) 1; (* rowid varint = 1 *)
  (* Record: header_size varint = 255 (way bigger than 20-byte payload) *)
  Bytes.set_uint8 page1 (off + 2) 0x81; (* varint high byte *)
  Bytes.set_uint8 page1 (off + 3)
0x7f; (* varint low = 255 *)
  write_db path [ Bytes.unsafe_to_string page1 ];
  must_fail_or_succeed_safely sw path

(* -- CVE-2025-7709 inspired: malformed index B-tree -- *)

(* Two-page database: an empty sqlite_master on page 1 and a page 2 tagged as
   a leaf index page. Opening and probing it must end in success or in one of
   the [safe_exn] failure modes. *)
let test_wrong_root_page_kind () =
  with_temp_hostile @@ fun sw path ->
  (* Page 2 is supposed to be a table but has index page kind *)
  let page1 = Bytes.make page_size '\000' in
  let hdr = db_header ~page_count:2 in
  Bytes.blit_string hdr 0 page1 0 100;
  Bytes.set_uint8 page1 100 0x0d;
  write_u16_be page1 103 0;
  write_u16_be page1 105 page_size;
  let page2 = Bytes.make page_size '\000' in
  (* Set page kind to leaf_index (0x0a) instead of leaf_table (0x0d) *)
  Bytes.set_uint8 page2 0 0x0a;
  write_u16_be page2 3 0;
  write_u16_be page2 5 page_size;
  write_db path [ Bytes.unsafe_to_string page1; Bytes.unsafe_to_string page2 ];
  must_fail_or_succeed_safely sw path

(* -- Root page beyond file -- *)

(* Single-page database whose sqlite_master names a table [t] rooted at page
   999, which the file does not contain. Scanning [t] must end in success or
   in one of the [safe_exn] failure modes. *)
let test_root_page_oob () =
  with_temp_hostile @@ fun sw path ->
  let page1 = Bytes.make page_size '\000' in
  let hdr = db_header ~page_count:1 in
  Bytes.blit_string hdr 0 page1 0 100;
  (* sqlite_master: leaf with a table entry pointing to page 999 *)
  Bytes.set_uint8 page1 100 0x0d;
  write_u16_be page1 103 1;
  let cell_start = 200 in
  write_u16_be page1 105 cell_start;
  write_u16_be page1 108 cell_start;
  (* Build a sqlite_master record: type=table, name=t, tbl=t, root=999, sql *)
  let sql = "CREATE TABLE t (x TEXT)" in
  let payload =
    Btree.Record.encode
      [
        Btree.Record.Vtext "table";
        Btree.Record.Vtext "t";
        Btree.Record.Vtext "t";
        Btree.Record.Vint 999L;
        Btree.Record.Vtext sql;
      ]
  in
  let payload_len = String.length payload in
  (* Cell: payload_size varint + rowid varint + payload *)
  Bytes.set_uint8 page1 cell_start payload_len; (* payload_size *)
  Bytes.set_uint8 page1 (cell_start + 1) 1; (* rowid = 1 *)
  Bytes.blit_string payload 0 page1 (cell_start + 2) payload_len;
  write_db path [ Bytes.unsafe_to_string page1 ];
  (* Open should succeed (just reads master), but accessing the table should
     fail - not crash. A failure to open is tolerated only when it is one of
     the [safe_exn] failure modes; anything else is a bug and must surface. *)
  match Sqlite.open_ ~sw path with
  | exception e when safe_exn e -> ()
  | db ->
      (* Close the handle even if the scan escapes with an unexpected
         exception. *)
      Fun.protect
        ~finally:(fun () -> Sqlite.close db)
        (fun () ->
          try_safely (fun () ->
              Sqlite.fold_table db "t" ~init:() ~f:(fun _ _ () -> ())))

(* -- Empty/garbage file -- *)

(* A zero-length file must be rejected or handled without crashing. *)
let test_empty_file () =
  with_temp_hostile @@ fun sw path ->
  Eio.Path.save ~create:(`Or_truncate 0o644) path "";
  must_fail_or_succeed_safely sw path

(* One page worth of bytes drawn from [Random.int 256] must be rejected or
   handled without crashing. *)
let test_garbage_file () =
  with_temp_hostile @@ fun sw path ->
  Eio.Path.save ~create:(`Or_truncate 0o644) path
    (String.init page_size (fun _ -> Char.chr (Random.int 256)));
  must_fail_or_succeed_safely sw path

(* -- Cell pointer pointing into header -- *)

(* sqlite_master leaf whose content offset and only cell pointer are both 50,
   i.e. inside the 100-byte database header. *)
let test_cell_pointer_in_header () =
  with_temp_hostile @@ fun sw path ->
  let page1 = Bytes.make page_size '\000' in
  let hdr = db_header ~page_count:1 in
  Bytes.blit_string hdr 0 page1 0 100;
  Bytes.set_uint8 page1 100 0x0d;
  write_u16_be page1 103 1; (* 1 cell *)
  write_u16_be page1 105 50; (* content starts inside header! *)
  write_u16_be page1 108 50; (* cell pointer into header area *)
  write_db path [ Bytes.unsafe_to_string page1 ];
  must_fail_or_succeed_safely sw path

(* -- Page count = 0 -- *)

(* Header declares zero pages although the file holds one zero-filled page. *)
let test_zero_page_count () =
  with_temp_hostile @@ fun sw path ->
  let page1 = Bytes.make page_size '\000' in
  let hdr = db_header ~page_count:0 in
  Bytes.blit_string hdr 0 page1 0 100;
  write_db path [ Bytes.unsafe_to_string page1 ];
  must_fail_or_succeed_safely sw path

(* Alcotest cases for the hostile-input tests above. *)
let hostile_cases =
  [
    Alcotest.test_case "hostile: self-referential page" `Quick
      test_self_referential_page;
    Alcotest.test_case "hostile: oversized varint" `Quick test_oversized_varint;
    Alcotest.test_case "hostile: record header overflow" `Quick
      test_record_header_overflow;
    Alcotest.test_case "hostile: wrong root page kind" `Quick
      test_wrong_root_page_kind;
    Alcotest.test_case "hostile: root page oob" `Quick test_root_page_oob;
    Alcotest.test_case "hostile: empty file" `Quick test_empty_file;
    Alcotest.test_case "hostile: garbage file" `Quick test_garbage_file;
    Alcotest.test_case "hostile: cell pointer in header" `Quick
      test_cell_pointer_in_header;
Alcotest.test_case "hostile: zero page count" `Quick test_zero_page_count; ] let suite = ( "sqlite", List.concat [ [ Alcotest.test_case "put/get" `Quick test_put_get; Alcotest.test_case "get missing" `Quick test_get_missing; Alcotest.test_case "put overwrite" `Quick test_put_overwrite; Alcotest.test_case "delete" `Quick test_delete; Alcotest.test_case "delete missing" `Quick test_delete_missing; Alcotest.test_case "mem" `Quick test_mem; Alcotest.test_case "iter" `Quick test_iter; Alcotest.test_case "fold" `Quick test_fold; ]; [ Alcotest.test_case "binary values" `Quick test_binary_values; Alcotest.test_case "empty value" `Quick test_empty_value; Alcotest.test_case "large value" `Quick test_large_value; ]; [ Alcotest.test_case "table basic" `Quick test_table_basic; Alcotest.test_case "table isolation" `Quick test_table_isolation; Alcotest.test_case "table mem/delete" `Quick test_table_mem_delete; Alcotest.test_case "table iter" `Quick test_table_iter; ]; [ Alcotest.test_case "sql injection key" `Quick test_sql_injection_key; Alcotest.test_case "sql injection value" `Quick test_sql_injection_value; Alcotest.test_case "table name validation" `Quick test_table_name_validation; Alcotest.test_case "valid table names" `Quick test_valid_table_names; ]; [ Alcotest.test_case "unicode keys" `Quick test_unicode_keys; Alcotest.test_case "unicode values" `Quick test_unicode_values; ]; [ Alcotest.test_case "sync" `Quick test_sync; Alcotest.test_case "persistence basic" `Quick test_persistence_basic; Alcotest.test_case "persistence with delete" `Quick test_persistence_with_delete; Alcotest.test_case "persistence tables" `Quick test_persistence_tables; ]; [ Alcotest.test_case "empty key" `Quick test_empty_key; Alcotest.test_case "key with nulls" `Quick test_key_with_nulls; Alcotest.test_case "long key" `Quick test_long_key; Alcotest.test_case "all byte values" `Quick test_all_byte_values; Alcotest.test_case "max int key length" `Quick test_max_int_key_length; ]; [ 
Alcotest.test_case "many keys" `Slow test_many_keys; Alcotest.test_case "many updates" `Quick test_many_updates; Alcotest.test_case "interleaved ops" `Quick test_interleaved_operations; Alcotest.test_case "many tables" `Quick test_many_tables; ]; [ Alcotest.test_case "overflow key length" `Quick test_cve_key_overflow; Alcotest.test_case "boundary conditions" `Quick test_cve_like_boundary_conditions; ]; [ Alcotest.test_case "parse simple" `Quick test_parse_simple; Alcotest.test_case "parse integer pk" `Quick test_parse_integer_primary_key; Alcotest.test_case "parse if not exists" `Quick test_parse_if_not_exists; Alcotest.test_case "parse nested parens" `Quick test_parse_nested_parens; Alcotest.test_case "parse table constraints" `Quick test_parse_table_constraints; Alcotest.test_case "parse no type" `Quick test_parse_no_type; Alcotest.test_case "parse autoincrement" `Quick test_parse_autoincrement; Alcotest.test_case "parse invalid" `Quick test_parse_invalid; Alcotest.test_case "open no kv" `Quick test_open_no_kv; Alcotest.test_case "read generic table" `Quick test_read_generic_table; Alcotest.test_case "integer primary key" `Quick test_integer_primary_key; Alcotest.test_case "tables lists all" `Quick test_tables_lists_all; Alcotest.test_case "fold table" `Quick test_fold_table; ]; [ Alcotest.test_case "spec header magic" `Quick test_db_header_magic; Alcotest.test_case "spec header values" `Quick test_db_header_fixed_values; Alcotest.test_case "spec change counter" `Quick test_db_header_change_counter; Alcotest.test_case "spec page1 btree" `Quick test_page1_btree_header; Alcotest.test_case "spec schema format" `Quick test_sqlite_schema_format; Alcotest.test_case "spec overflow values" `Quick test_sqlite_overflow_values; Alcotest.test_case "spec overflow persist" `Quick test_sqlite_overflow_persistence; ]; [ Alcotest.test_case "create and insert" `Quick test_create_and_insert; Alcotest.test_case "insert multiple rows" `Quick test_insert_multiple_rows; 
Alcotest.test_case "insert all types" `Quick test_insert_all_types; Alcotest.test_case "insert with null" `Quick test_insert_with_null; Alcotest.test_case "insert fewer values" `Quick test_insert_fewer_values; Alcotest.test_case "insert integer pk" `Quick test_insert_integer_primary_key; Alcotest.test_case "insert explicit rowid" `Quick test_insert_explicit_rowid; Alcotest.test_case "insert persistence" `Quick test_insert_persistence; Alcotest.test_case "insert tables listed" `Quick test_insert_tables_lists_created; Alcotest.test_case "insert with kv" `Quick test_insert_coexists_with_kv; Alcotest.test_case "insert nonexistent" `Quick test_insert_nonexistent_table; ]; [ Alcotest.test_case "unique column-level" `Quick test_unique_column_level; Alcotest.test_case "unique table-level" `Quick test_unique_table_level; Alcotest.test_case "unique composite" `Quick test_unique_composite; Alcotest.test_case "unique allows distinct" `Quick test_unique_allows_distinct; Alcotest.test_case "unique persists" `Quick test_unique_persists; Alcotest.test_case "unique named constraint" `Quick test_unique_named_constraint; Alcotest.test_case "unique allows multiple NULLs" `Quick test_unique_allows_multiple_nulls; Alcotest.test_case "unique composite NULL" `Quick test_unique_composite_null; ]; [ Alcotest.test_case "transaction commit" `Quick test_transaction_commit; Alcotest.test_case "transaction rollback" `Quick test_transaction_rollback; Alcotest.test_case "transaction rollback kv" `Quick test_transaction_rollback_kv; Alcotest.test_case "transaction rollback unique" `Quick test_transaction_rollback_unique; Alcotest.test_case "transaction nested failure" `Quick test_transaction_nested_failure; ]; [ Alcotest.test_case "text pk not rowid alias" `Quick test_text_pk_not_alias; Alcotest.test_case "text pk persistence" `Quick test_text_primary_key_persistence; Alcotest.test_case "real pk not rowid alias" `Quick test_real_pk_not_alias; ]; [ Alcotest.test_case "rollback create table" `Quick 
test_transaction_rollback_create_table; Alcotest.test_case "rollback insert generic" `Quick test_transaction_rollback_insert_generic; Alcotest.test_case "rollback schema persistence" `Quick test_transaction_rollback_schema_persistence; ]; [ Alcotest.test_case "duplicate explicit rowid" `Quick test_duplicate_explicit_rowid; Alcotest.test_case "explicit rowid next auto" `Quick test_explicit_rowid_next_auto; ]; [ Alcotest.test_case "unique index survives close" `Quick test_unique_index_survives_close; Alcotest.test_case "multi index survives close" `Quick test_multiple_indexes_survive_close; Alcotest.test_case "kv with generic survives close" `Quick test_kv_survives_close_generic; ]; [ Alcotest.test_case "text pk rejects dupes" `Quick test_text_pk_rejects_dups; Alcotest.test_case "composite pk rejects dupes" `Quick test_composite_pk_rejects_dups; Alcotest.test_case "text pk enforced after reopen" `Quick test_text_primary_key_persists; ]; [ Alcotest.test_case "rollback named table create" `Quick test_rollback_named_create; ]; [ Alcotest.test_case "dup rowid index consistency" `Quick test_duplicate_rowid_index_consistency; Alcotest.test_case "dup rowid delete cleanup" `Quick test_duplicate_rowid_preserves_delete; ]; [ Alcotest.test_case "create table duplicate name" `Quick test_create_table_duplicate_name; Alcotest.test_case "named table kv collision" `Quick test_named_kv_collision; Alcotest.test_case "quoted table name" `Quick test_quoted_table_name; ]; hostile_cases; ] )