···11open Util.Syntax
22+include Caqti_type.Std
33+include Caqti_request.Infix
44+include Lwt_result.Syntax
25module Block_map = Mist.Storage.Block_map
33-44-let caqti_result_exn = function
55- | Ok x ->
66- Ok x
77- | Error caqti_err ->
88- Error (Caqti_error.Exn caqti_err)
96107module Cid : Rapper.CUSTOM with type t = Cid.t = struct
118 type t = Cid.t
···2926 in
3027 Caqti_type.(custom ~encode ~decode string)
3128end
2929+3030+type t = {connection: Caqti_lwt.connection}
32313332type block = {cid: Cid.t; data: Blob.t}
3433···6564 let put_block cid block =
6665 [%rapper
6766 get_opt
6868- {sql| INSERT INTO blocks (cid, data) VALUES (%Cid{cid}, %Blob{block}) ON CONFLICT DO NOTHING RETURNING @Cid{cid} |sql}]
6767+ {sql| INSERT INTO mst (cid, data) VALUES (%Cid{cid}, %Blob{block}) ON CONFLICT DO NOTHING RETURNING @Cid{cid} |sql}]
6968 ~cid ~block
70697170 let delete_block cid =
7272- [%rapper execute {sql| DELETE FROM blocks WHERE cid = %Cid{cid} |sql}] ~cid
7171+ [%rapper execute {sql| DELETE FROM mst WHERE cid = %Cid{cid} |sql}] ~cid
73727473 let delete_blocks cids =
7574 [%rapper
7675 get_many
7777- {sql| DELETE FROM blocks WHERE cid IN (%list{%Cid{cids}}) RETURNING @Cid{cid} |sql}]
7676+ {sql| DELETE FROM mst WHERE cid IN (%list{%Cid{cids}}) RETURNING @Cid{cid} |sql}]
7877 ~cids
7978end
80798181-module S (C : Caqti_lwt.CONNECTION) : Mist.Storage.Writable_blockstore = struct
8282- include Caqti_type.Std
8383- include Caqti_request.Infix
8484- include Lwt_result.Syntax
8585-8686- type t = {connection: Caqti_lwt.connection}
8080+let caqti_result_exn = function
8181+ | Ok x ->
8282+ Ok x
8383+ | Error caqti_err ->
8484+ Error (Caqti_error.Exn caqti_err)
87858888- let multi_query (queries : (unit -> ('a, Caqti_error.t) Lwt_result.t) list) =
8989- let$! () = C.start () in
9090- let is_ignorable_error e =
9191- match (e : Caqti_error.t) with
9292- | `Request_failed qe | `Response_failed qe -> (
9393- match Caqti_error.cause (`Request_failed qe) with
9494- | `Not_null_violation | `Unique_violation ->
9595- true
9696- | _ ->
9797- false )
8686+let multi_query connection
8787+ (queries : (unit -> ('a, Caqti_error.t) Lwt_result.t) list) =
8888+ let module C = (val connection : Caqti_lwt.CONNECTION) in
8989+ let$! () = C.start () in
9090+ let is_ignorable_error e =
9191+ match (e : Caqti_error.t) with
9292+ | `Request_failed qe | `Response_failed qe -> (
9393+ match Caqti_error.cause (`Request_failed qe) with
9494+ | `Not_null_violation | `Unique_violation ->
9595+ true
9896 | _ ->
9999- false
100100- in
101101- let%lwt results =
102102- Lwt_list.map_s
103103- (fun query ->
104104- match%lwt query () with
9797+ false )
9898+ | _ ->
9999+ false
100100+ in
101101+ let rec aux acc queries =
102102+ match acc with
103103+ | Error e ->
104104+ Lwt.return_error e
105105+ | Ok count -> (
106106+ match queries with
107107+ | [] ->
108108+ Lwt.return (Ok count)
109109+ | query :: rest -> (
110110+ let%lwt result = query () in
111111+ match result with
105112 | Ok _ ->
106106- Lwt.return true
113113+ aux (Ok (count + 1)) rest
107114 | Error e ->
108108- if is_ignorable_error e then Lwt.return false
109109- else failwith (Caqti_error.show e) )
110110- queries
111111- in
112112- let inserted =
113113- List.fold_left
114114- (fun acc success -> if success then acc + 1 else acc)
115115- 0 results
116116- in
117117- Lwt.return_ok inserted
115115+ if is_ignorable_error e then aux (Ok count) rest
116116+ else Lwt.return_error e ) )
117117+ in
118118+ aux (Ok 0) queries
118119119119- let init t =
120120- let$! () =
121121- [%rapper execute {sql| PRAGMA journal_mode=WAL; |sql} syntax_off]
122122- () t.connection
123123- in
124124- let$! () =
125125- [%rapper execute {sql| PRAGMA synchronous=NORMAL; |sql} syntax_off]
126126- () t.connection
127127- in
128128- let$! () = Queries.create_table t.connection in
129129- Lwt.return_unit
120120+let get_bytes t cid =
121121+ let$! b_opt = Queries.get_block cid t.connection in
122122+ match b_opt with
123123+ | Some {data; _} ->
124124+ Lwt.return_some data
125125+ | None ->
126126+ Lwt.return_none
130127131131- let get_bytes t cid =
132132- let$! b_opt = Queries.get_block cid t.connection in
133133- match b_opt with
134134- | Some {data; _} ->
135135- Lwt.return_some data
136136- | None ->
137137- Lwt.return_none
128128+let get_blocks t cids =
129129+ let$! blocks = Queries.get_blocks cids t.connection in
130130+ Lwt.return
131131+ (List.fold_left
132132+ (fun (acc : Block_map.with_missing) cid ->
133133+ match List.find_opt (fun b -> b.cid = cid) blocks with
134134+ | Some {data; _} ->
135135+ {acc with blocks= Block_map.set cid data acc.blocks}
136136+ | None ->
137137+ {acc with missing= cid :: acc.missing} )
138138+ {blocks= Block_map.empty; missing= []}
139139+ cids )
138140139139- let get_blocks t cids =
140140- let$! blocks = Queries.get_blocks cids t.connection in
141141- Lwt.return
142142- (List.fold_left
143143- (fun (acc : Block_map.with_missing) cid ->
144144- match List.find_opt (fun b -> b.cid = cid) blocks with
145145- | Some {data; _} ->
146146- {acc with blocks= Block_map.set cid data acc.blocks}
147147- | None ->
148148- {acc with missing= cid :: acc.missing} )
149149- {blocks= Block_map.empty; missing= []}
150150- cids )
141141+let has t cid =
142142+ let$! b_opt = Queries.has_block cid t.connection in
143143+ match b_opt with Some _ -> Lwt.return true | None -> Lwt.return false
151144152152- let has t cid =
153153- let$! b_opt = Queries.has_block cid t.connection in
154154- match b_opt with Some _ -> Lwt.return true | None -> Lwt.return false
155155-156156- let put_block t cid block =
157157- Queries.put_block cid block t.connection
158158- |> Lwt.map caqti_result_exn
159159- |> Lwt.map (Result.map (function Some _ -> true | None -> false))
145145+let put_block t cid block =
146146+ Queries.put_block cid block t.connection
147147+ |> Lwt.map caqti_result_exn
148148+ |> Lwt.map (Result.map (function Some _ -> true | None -> false))
160149161161- let put_many t bm =
162162- let$! inserted =
163163- multi_query
164164- (List.map
165165- (fun (cid, block) ->
166166- fun () -> Queries.put_block cid block t.connection )
167167- (Block_map.entries bm) )
168168- in
169169- Lwt.return_ok inserted
150150+let put_many t bm =
151151+ let$! inserted =
152152+ multi_query t.connection
153153+ (List.map
154154+ (fun (cid, block) -> fun () -> Queries.put_block cid block t.connection)
155155+ (Block_map.entries bm) )
156156+ in
157157+ Lwt.return_ok inserted
170158171171- let delete_block t cid =
172172- let$! () = Queries.delete_block cid t.connection in
173173- Lwt.return_ok true
159159+let delete_block t cid =
160160+ let$! () = Queries.delete_block cid t.connection in
161161+ Lwt.return_ok true
174162175175- let delete_many t cids =
176176- let$! deleted = Queries.delete_blocks cids t.connection in
177177- Lwt.return_ok (List.length deleted)
178178-end
163163+let delete_many t cids =
164164+ let$! deleted = Queries.delete_blocks cids t.connection in
165165+ Lwt.return_ok (List.length deleted)
179166180167let connect db_uri =
181181- match%lwt Caqti_lwt.connect (Uri.of_string db_uri) with
182182- | Ok c ->
183183- let module C = (val c : Caqti_lwt.CONNECTION) in
184184- let module Store = S (C) in
185185- Lwt.return (module Store : Mist.Storage.Writable_blockstore)
186186- | Error e ->
187187- failwith (Caqti_error.show e)
168168+ let%lwt connection = Util.connect_sqlite db_uri in
169169+ let$! () = Queries.create_table connection in
170170+ Lwt.return {connection}
+5
pegasus/lib/sqlite_blockstore.mli
···11+type t = {connection: Caqti_lwt.connection}
22+33+include Mist.Storage.Writable_blockstore with type t := t
44+55+val connect : string -> t Lwt.t
+15-1
pegasus/lib/util.ml
···11module Syntax = struct
22 let ( let$! ) m f =
33- match%lwt m with Ok x -> f x | Error e -> failwith (Caqti_error.show e)
33+ match%lwt m with Ok x -> f x | Error e -> raise (Caqti_error.Exn e)
44end
55+66+let connect_sqlite db_uri =
77+ let open Syntax in
88+ match%lwt Caqti_lwt.connect (Uri.of_string db_uri) with
99+ | Ok c ->
1010+ let$! () =
1111+ [%rapper execute {sql| PRAGMA journal_mode=WAL; |sql} syntax_off] () c
1212+ in
1313+ let$! () =
1414+ [%rapper execute {sql| PRAGMA synchronous=NORMAL; |sql} syntax_off] () c
1515+ in
1616+ Lwt.return c
1717+ | Error e ->
1818+ raise (Caqti_error.Exn e)