objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Result return type for SQLite blockstore

futurGH d98cc7aa a4278b11

+64 -36
+60 -36
pegasus/lib/sqlite_blockstore.ml
··· 1 + open Util.Syntax 1 2 module Block_map = Mist.Storage.Block_map 3 + 4 + let caqti_result_exn = function 5 + | Ok x -> 6 + Ok x 7 + | Error caqti_err -> 8 + Error (Caqti_error.Exn caqti_err) 2 9 3 10 module Cid : Rapper.CUSTOM with type t = Cid.t = struct 4 11 type t = Cid.t ··· 25 32 26 33 type block = {cid: Cid.t; data: Blob.t} 27 34 28 - let ( let*! ) m f = 29 - match%lwt m with Ok x -> f x | Error e -> failwith (Caqti_error.show e) 30 - 31 35 module Queries = struct 32 36 let create_table = 33 37 [%rapper 34 38 execute 35 - {sql| CREATE TABLE IF NOT EXISTS blocks ( 39 + {sql| CREATE TABLE IF NOT EXISTS mst ( 36 40 cid TEXT NOT NULL PRIMARY KEY, 37 41 data BLOB NOT NULL 38 42 ); ··· 42 46 let get_block cid = 43 47 [%rapper 44 48 get_opt 45 - {sql| SELECT @Cid{cid}, @Blob{data} FROM blocks WHERE cid = %Cid{cid} |sql} 49 + {sql| SELECT @Cid{cid}, @Blob{data} FROM mst WHERE cid = %Cid{cid} |sql} 46 50 record_out] 47 51 ~cid 48 52 49 53 let get_blocks cids = 50 54 [%rapper 51 55 get_many 52 - {sql| SELECT @Cid{cid}, @Blob{data} FROM blocks WHERE cid IN (%list{%Cid{cids}}) |sql} 56 + {sql| SELECT @Cid{cid}, @Blob{data} FROM mst WHERE cid IN (%list{%Cid{cids}}) |sql} 53 57 record_out] 54 58 ~cids 55 59 56 60 let has_block cid = 57 61 [%rapper 58 - get_opt {sql| SELECT @Cid{cid} FROM blocks WHERE cid = %Cid{cid} |sql}] 62 + get_opt {sql| SELECT @Cid{cid} FROM mst WHERE cid = %Cid{cid} |sql}] 59 63 ~cid 60 64 61 65 let put_block cid block = 62 66 [%rapper 63 - execute 64 - {sql| INSERT INTO blocks (cid, data) VALUES (%Cid{cid}, %Blob{block}) |sql}] 67 + get_opt 68 + {sql| INSERT INTO blocks (cid, data) VALUES (%Cid{cid}, %Blob{block}) ON CONFLICT DO NOTHING RETURNING @Cid{cid} |sql}] 65 69 ~cid ~block 66 70 67 71 let delete_block cid = ··· 69 73 70 74 let delete_blocks cids = 71 75 [%rapper 72 - execute {sql| DELETE FROM blocks WHERE cid IN (%list{%Cid{cids}}) |sql}] 76 + get_many 77 + {sql| DELETE FROM blocks WHERE cid IN (%list{%Cid{cids}}) RETURNING @Cid{cid} |sql}] 73 78 ~cids 74 79 end 75 80 ··· 80 85 81 86 type t = {connection: Caqti_lwt.connection} 82 87 83 - let multi_query queries = 84 - let*! () = C.start () in 85 - match%lwt 86 - Lwt_list.fold_left_s 87 - (fun acc query -> Lwt_result.bind (query ()) (fun () -> Lwt.return acc)) 88 - (Ok ()) queries 89 - with 90 - | Ok () -> 91 - C.commit () 92 - | Error e -> 93 - let*! () = C.rollback () in 94 - Lwt.return_error e 88 + let multi_query (queries : (unit -> ('a, Caqti_error.t) Lwt_result.t) list) = 89 + let$! () = C.start () in 90 + let is_ignorable_error e = 91 + match (e : Caqti_error.t) with 92 + | `Request_failed qe | `Response_failed qe -> ( 93 + match Caqti_error.cause (`Request_failed qe) with 94 + | `Not_null_violation | `Unique_violation -> 95 + true 96 + | _ -> 97 + false ) 98 + | _ -> 99 + false 100 + in 101 + let%lwt results = 102 + Lwt_list.map_s 103 + (fun query -> 104 + match%lwt query () with 105 + | Ok _ -> 106 + Lwt.return true 107 + | Error e -> 108 + if is_ignorable_error e then Lwt.return false 109 + else failwith (Caqti_error.show e) ) 110 + queries 111 + in 112 + let inserted = 113 + List.fold_left 114 + (fun acc success -> if success then acc + 1 else acc) 115 + 0 results 116 + in 117 + Lwt.return_ok inserted 95 118 96 119 let init t = 97 - let*! () = 120 + let$! () = 98 121 [%rapper execute {sql| PRAGMA journal_mode=WAL; |sql} syntax_off] 99 122 () t.connection 100 123 in 101 - let*! () = 124 + let$! () = 102 125 [%rapper execute {sql| PRAGMA synchronous=NORMAL; |sql} syntax_off] 103 126 () t.connection 104 127 in 105 - let*! () = Queries.create_table t.connection in 128 + let$! () = Queries.create_table t.connection in 106 129 Lwt.return_unit 107 130 108 131 let get_bytes t cid = 109 - let*! b_opt = Queries.get_block cid t.connection in 132 + let$! b_opt = Queries.get_block cid t.connection in 110 133 match b_opt with 111 134 | Some {data; _} -> 112 135 Lwt.return_some data ··· 114 137 Lwt.return_none 115 138 116 139 let get_blocks t cids = 117 - let*! blocks = Queries.get_blocks cids t.connection in 140 + let$! blocks = Queries.get_blocks cids t.connection in 118 141 Lwt.return 119 142 (List.fold_left 120 143 (fun (acc : Block_map.with_missing) cid -> ··· 127 150 cids ) 128 151 129 152 let has t cid = 130 - let*! b_opt = Queries.has_block cid t.connection in 153 + let$! b_opt = Queries.has_block cid t.connection in 131 154 match b_opt with Some _ -> Lwt.return true | None -> Lwt.return false 132 155 133 156 let put_block t cid block = 134 - let*! () = Queries.put_block cid block t.connection in 135 - Lwt.return_unit 157 + Queries.put_block cid block t.connection 158 + |> Lwt.map caqti_result_exn 159 + |> Lwt.map (Result.map (function Some _ -> true | None -> false)) 136 160 137 161 let put_many t bm = 138 - let*! () = 162 + let$! inserted = 139 163 multi_query 140 164 (List.map 141 165 (fun (cid, block) -> 142 166 fun () -> Queries.put_block cid block t.connection ) 143 167 (Block_map.entries bm) ) 144 168 in 145 - Lwt.return_unit 169 + Lwt.return_ok inserted 146 170 147 171 let delete_block t cid = 148 - let*! () = Queries.delete_block cid t.connection in 149 - Lwt.return_unit 172 + let$! () = Queries.delete_block cid t.connection in 173 + Lwt.return_ok true 150 174 151 175 let delete_many t cids = 152 - let*! () = Queries.delete_blocks cids t.connection in 153 - Lwt.return_unit 176 + let$! deleted = Queries.delete_blocks cids t.connection in 177 + Lwt.return_ok (List.length deleted) 154 178 end 155 179 156 180 let connect db_uri =
+4
pegasus/lib/util.ml
··· 1 + module Syntax = struct 2 + let ( let$! ) m f = 3 + match%lwt m with Ok x -> f x | Error e -> failwith (Caqti_error.show e) 4 + end