···66 | `LexArray of value Array.t
77 | `LexMap of value StringMap.t ]
8899-type repo_record = value StringMap.t
1010-119let rec to_ipld (v : value) : Dag_cbor.value =
1210 match v with
1311 | `BlobRef r -> (
···7876 let cid = Cid.create Dcbor encoded in
7977 (cid, encoded)
80787979+let of_yojson (v : Yojson.Safe.t) : value = of_ipld (Dag_cbor.of_yojson v)
8080+8181+let to_yojson (v : value) : Yojson.Safe.t = Dag_cbor.to_yojson (to_ipld v)
8282+8383+type repo_record =
8484+ (value StringMap.t
8585+ [@of_yojson
8686+ fun v ->
8787+ match of_yojson v with
8888+ | `LexMap m ->
8989+ Ok m
9090+ | _ ->
9191+ Error "decoded non-map value"]
9292+ [@to_yojson fun v -> to_yojson (`LexMap v)] )
9393+[@@deriving yojson]
9494+8195let of_cbor encoded : repo_record =
8296 let decoded = Dag_cbor.decode encoded in
8397 match of_ipld decoded with
···8599 m
86100 | _ ->
87101 raise (Failure "Decoded non-record value")
8888-8989-let of_yojson (v : Yojson.Safe.t) : value = of_ipld (Dag_cbor.of_yojson v)
9090-9191-let to_yojson (v : value) : Yojson.Safe.t = Dag_cbor.to_yojson (to_ipld v)
+99
pegasus/lib/mst_store.ml
···11+open Util.Rapper
22+open Util.Syntax
33+module Block_map = Mist.Storage.Block_map
44+55+type t = Caqti_lwt.connection
66+77+type block = {cid: Cid.t; data: Blob.t}
88+99+module Queries = struct
1010+ let create_table =
1111+ [%rapper
1212+ execute
1313+ {sql| CREATE TABLE IF NOT EXISTS mst (
1414+ cid TEXT NOT NULL PRIMARY KEY,
1515+ data BLOB NOT NULL
1616+ );
1717+ |sql}]
1818+ ()
1919+2020+ let get_block cid =
2121+ [%rapper
2222+ get_opt
2323+ {sql| SELECT @CID{cid}, @Blob{data} FROM mst WHERE cid = %CID{cid} |sql}
2424+ record_out]
2525+ ~cid
2626+2727+ let get_blocks cids =
2828+ [%rapper
2929+ get_many
3030+ {sql| SELECT @CID{cid}, @Blob{data} FROM mst WHERE cid IN (%list{%CID{cids}}) |sql}
3131+ record_out]
3232+ ~cids
3333+3434+ let has_block cid =
3535+ [%rapper
3636+ get_opt {sql| SELECT @CID{cid} FROM mst WHERE cid = %CID{cid} |sql}]
3737+ ~cid
3838+3939+ let put_block cid block =
4040+ [%rapper
4141+ get_opt
4242+ {sql| INSERT INTO mst (cid, data) VALUES (%CID{cid}, %Blob{block}) ON CONFLICT DO NOTHING RETURNING @CID{cid} |sql}]
4343+ ~cid ~block
4444+4545+ let delete_block cid =
4646+ [%rapper execute {sql| DELETE FROM mst WHERE cid = %CID{cid} |sql}] ~cid
4747+4848+ let delete_blocks cids =
4949+ [%rapper
5050+ get_many
5151+ {sql| DELETE FROM mst WHERE cid IN (%list{%CID{cids}}) RETURNING @CID{cid} |sql}]
5252+ ~cids
5353+end
5454+5555+let init connection = Queries.create_table connection
5656+5757+let get_bytes t cid =
5858+ Queries.get_block cid t
5959+ >$! function
6060+ | Some {data; _} ->
6161+ Lwt.return_some data
6262+ | None ->
6363+ Lwt.return_none
6464+6565+let get_blocks t cids =
6666+ let$! blocks = Queries.get_blocks cids t in
6767+ Lwt.return
6868+ (List.fold_left
6969+ (fun (acc : Block_map.with_missing) cid ->
7070+ match List.find_opt (fun b -> b.cid = cid) blocks with
7171+ | Some {data; _} ->
7272+ {acc with blocks= Block_map.set cid data acc.blocks}
7373+ | None ->
7474+ {acc with missing= cid :: acc.missing} )
7575+ {blocks= Block_map.empty; missing= []}
7676+ cids )
7777+7878+let has t cid =
7979+ Queries.has_block cid t
8080+ >$! function Some _ -> Lwt.return true | None -> Lwt.return false
8181+8282+let put_block t cid block =
8383+ Queries.put_block cid block t
8484+ |> Lwt.map Util.caqti_result_exn
8585+ |> Lwt.map (Result.map (function Some _ -> true | None -> false))
8686+8787+let put_many t bm =
8888+ Util.multi_query t
8989+ (List.map
9090+ (fun (cid, block) -> fun () -> Queries.put_block cid block t)
9191+ (Block_map.entries bm) )
9292+ >$! Lwt.return_ok
9393+9494+let delete_block t cid =
9595+ let$! () = Queries.delete_block cid t in
9696+ Lwt.return_ok true
9797+9898+let delete_many t cids =
9999+ Queries.delete_blocks cids t >$! List.length |> Lwt.return_ok
+74
pegasus/lib/record_store.ml
···11+open Lwt.Infix
22+open Util.Rapper
33+open Util.Syntax
44+module Lex = Mist.Lex
55+module Tid = Mist.Tid
66+77+type t = Caqti_lwt.connection
88+99+type record = {path: string; cid: Cid.t; value: Lex.repo_record; since: Tid.t}
1010+1111+module Queries = struct
1212+ let create_table =
1313+ [%rapper
1414+ execute
1515+ {sql| CREATE TABLE IF NOT EXISTS records (
1616+ path TEXT NOT NULL PRIMARY KEY,
1717+ cid TEXT NOT NULL,
1818+ data BLOB NOT NULL,
1919+ since TEXT NOT NULL
2020+ );
2121+ |sql}]
2222+ ()
2323+2424+ let get_record_by_path =
2525+ [%rapper
2626+ get_opt
2727+ {sql| SELECT @CID{cid}, @Blob{data}, @string{since} FROM records WHERE path = %string{path}
2828+ |sql}]
2929+3030+ let get_record_by_cid =
3131+ [%rapper
3232+ get_opt
3333+ {sql| SELECT @string{path}, @Blob{data}, @string{since} FROM records WHERE cid = %CID{cid}
3434+ |sql}]
3535+3636+ let list_records =
3737+ [%rapper
3838+ get_many
3939+ {sql| SELECT @string{path}, @CID{cid}, @Blob{data}, @string{since} FROM records WHERE path LIKE %string{collection}/%
4040+ |sql}]
4141+4242+ let write_record =
4343+ [%rapper
4444+ execute
4545+ {sql| INSERT INTO records (path, cid, data, since)
4646+ VALUES (%string{path}, %CID{cid}, %Blob{data}, %string{since})
4747+ |sql}]
4848+end
4949+5050+let init connection = Queries.create_table connection
5151+5252+let get_record_by_path t path : record option Lwt.t =
5353+ Queries.get_record_by_path ~path t
5454+ >$! Option.map (fun (cid, data, since) ->
5555+ {path; cid; value= Lex.of_cbor data; since} )
5656+ >>= Lwt.return
5757+5858+let get_record_by_cid t cid : record option Lwt.t =
5959+ Queries.get_record_by_cid ~cid t
6060+ >$! Option.map (fun (path, data, since) ->
6161+ {path; cid; value= Lex.of_cbor data; since} )
6262+ >>= Lwt.return
6363+6464+let list_records t collection : record list Lwt.t =
6565+ Queries.list_records ~collection t
6666+ >$! List.map (fun (path, cid, data, since) ->
6767+ {path; cid; value= Lex.of_cbor data; since} )
6868+ >>= Lwt.return
6969+7070+let write_record t record path : unit Lwt.t =
7171+ let cid, data = Lex.to_cbor_block record in
7272+ let since = Tid.now () in
7373+ let$! () = Queries.write_record ~path ~cid ~data ~since t in
7474+ Lwt.return_unit
-145
pegasus/lib/sqlite_blockstore.ml
···11-open Util.Rapper
22-open Util.Syntax
33-module Block_map = Mist.Storage.Block_map
44-55-type t = {connection: Caqti_lwt.connection}
66-77-type block = {cid: Cid.t; data: Blob.t}
88-99-module Queries = struct
1010- let create_table =
1111- [%rapper
1212- execute
1313- {sql| CREATE TABLE IF NOT EXISTS mst (
1414- cid TEXT NOT NULL PRIMARY KEY,
1515- data BLOB NOT NULL
1616- );
1717- |sql}]
1818- ()
1919-2020- let get_block cid =
2121- [%rapper
2222- get_opt
2323- {sql| SELECT @Cid{cid}, @Blob{data} FROM mst WHERE cid = %Cid{cid} |sql}
2424- record_out]
2525- ~cid
2626-2727- let get_blocks cids =
2828- [%rapper
2929- get_many
3030- {sql| SELECT @Cid{cid}, @Blob{data} FROM mst WHERE cid IN (%list{%Cid{cids}}) |sql}
3131- record_out]
3232- ~cids
3333-3434- let has_block cid =
3535- [%rapper
3636- get_opt {sql| SELECT @Cid{cid} FROM mst WHERE cid = %Cid{cid} |sql}]
3737- ~cid
3838-3939- let put_block cid block =
4040- [%rapper
4141- get_opt
4242- {sql| INSERT INTO mst (cid, data) VALUES (%Cid{cid}, %Blob{block}) ON CONFLICT DO NOTHING RETURNING @Cid{cid} |sql}]
4343- ~cid ~block
4444-4545- let delete_block cid =
4646- [%rapper execute {sql| DELETE FROM mst WHERE cid = %Cid{cid} |sql}] ~cid
4747-4848- let delete_blocks cids =
4949- [%rapper
5050- get_many
5151- {sql| DELETE FROM mst WHERE cid IN (%list{%Cid{cids}}) RETURNING @Cid{cid} |sql}]
5252- ~cids
5353-end
5454-5555-let caqti_result_exn = function
5656- | Ok x ->
5757- Ok x
5858- | Error caqti_err ->
5959- Error (Caqti_error.Exn caqti_err)
6060-6161-let multi_query connection
6262- (queries : (unit -> ('a, Caqti_error.t) Lwt_result.t) list) =
6363- let module C = (val connection : Caqti_lwt.CONNECTION) in
6464- let$! () = C.start () in
6565- let is_ignorable_error e =
6666- match (e : Caqti_error.t) with
6767- | `Request_failed qe | `Response_failed qe -> (
6868- match Caqti_error.cause (`Request_failed qe) with
6969- | `Not_null_violation | `Unique_violation ->
7070- true
7171- | _ ->
7272- false )
7373- | _ ->
7474- false
7575- in
7676- let rec aux acc queries =
7777- match acc with
7878- | Error e ->
7979- Lwt.return_error e
8080- | Ok count -> (
8181- match queries with
8282- | [] ->
8383- Lwt.return (Ok count)
8484- | query :: rest -> (
8585- let%lwt result = query () in
8686- match result with
8787- | Ok _ ->
8888- aux (Ok (count + 1)) rest
8989- | Error e ->
9090- if is_ignorable_error e then aux (Ok count) rest
9191- else Lwt.return_error e ) )
9292- in
9393- aux (Ok 0) queries
9494-9595-let connect db_uri =
9696- let%lwt connection = Util.connect_sqlite db_uri in
9797- let$! () = Queries.create_table connection in
9898- Lwt.return {connection}
9999-100100-let get_bytes t cid =
101101- let$! b_opt = Queries.get_block cid t.connection in
102102- match b_opt with
103103- | Some {data; _} ->
104104- Lwt.return_some data
105105- | None ->
106106- Lwt.return_none
107107-108108-let get_blocks t cids =
109109- let$! blocks = Queries.get_blocks cids t.connection in
110110- Lwt.return
111111- (List.fold_left
112112- (fun (acc : Block_map.with_missing) cid ->
113113- match List.find_opt (fun b -> b.cid = cid) blocks with
114114- | Some {data; _} ->
115115- {acc with blocks= Block_map.set cid data acc.blocks}
116116- | None ->
117117- {acc with missing= cid :: acc.missing} )
118118- {blocks= Block_map.empty; missing= []}
119119- cids )
120120-121121-let has t cid =
122122- let$! b_opt = Queries.has_block cid t.connection in
123123- match b_opt with Some _ -> Lwt.return true | None -> Lwt.return false
124124-125125-let put_block t cid block =
126126- Queries.put_block cid block t.connection
127127- |> Lwt.map caqti_result_exn
128128- |> Lwt.map (Result.map (function Some _ -> true | None -> false))
129129-130130-let put_many t bm =
131131- let$! inserted =
132132- multi_query t.connection
133133- (List.map
134134- (fun (cid, block) -> fun () -> Queries.put_block cid block t.connection)
135135- (Block_map.entries bm) )
136136- in
137137- Lwt.return_ok inserted
138138-139139-let delete_block t cid =
140140- let$! () = Queries.delete_block cid t.connection in
141141- Lwt.return_ok true
142142-143143-let delete_many t cids =
144144- let$! deleted = Queries.delete_blocks cids t.connection in
145145- Lwt.return_ok (List.length deleted)
-5
pegasus/lib/sqlite_blockstore.mli
···11-type t = {connection: Caqti_lwt.connection}
22-33-include Mist.Storage.Writable_blockstore with type t := t
44-55-val connect : string -> t Lwt.t
+54-1
pegasus/lib/util.ml
···11module Syntax = struct
22+ (* unwraps an Lwt result, raising an exception if there's an error *)
23 let ( let$! ) m f =
34 match%lwt m with Ok x -> f x | Error e -> raise (Caqti_error.Exn e)
55+66+ (* unwraps an Lwt result, raising an exception if there's an error *)
77+ let ( >$! ) m f =
88+ match%lwt m with
99+ | Ok x ->
1010+ Lwt.return (f x)
1111+ | Error e ->
1212+ raise (Caqti_error.Exn e)
413end
514615module Rapper = struct
77- module Cid : Rapper.CUSTOM with type t = Cid.t = struct
1616+ module CID : Rapper.CUSTOM with type t = Cid.t = struct
817 type t = Cid.t
9181019 let t =
···2837 end
2938end
30394040+(* turns a caqti error into an exception *)
4141+let caqti_result_exn = function
4242+ | Ok x ->
4343+ Ok x
4444+ | Error caqti_err ->
4545+ Error (Caqti_error.Exn caqti_err)
4646+4747+(* runs a bunch of queries and catches duplicate insertion, returning how many succeeded *)
4848+let multi_query connection
4949+ (queries : (unit -> ('a, Caqti_error.t) Lwt_result.t) list) =
5050+ let open Syntax in
5151+ let module C = (val connection : Caqti_lwt.CONNECTION) in
5252+ let$! () = C.start () in
5353+ let is_ignorable_error e =
5454+ match (e : Caqti_error.t) with
5555+ | `Request_failed qe | `Response_failed qe -> (
5656+ match Caqti_error.cause (`Request_failed qe) with
5757+ | `Not_null_violation | `Unique_violation ->
5858+ true
5959+ | _ ->
6060+ false )
6161+ | _ ->
6262+ false
6363+ in
6464+ let rec aux acc queries =
6565+ match acc with
6666+ | Error e ->
6767+ Lwt.return_error e
6868+ | Ok count -> (
6969+ match queries with
7070+ | [] ->
7171+ Lwt.return (Ok count)
7272+ | query :: rest -> (
7373+ let%lwt result = query () in
7474+ match result with
7575+ | Ok _ ->
7676+ aux (Ok (count + 1)) rest
7777+ | Error e ->
7878+ if is_ignorable_error e then aux (Ok count) rest
7979+ else Lwt.return_error e ) )
8080+ in
8181+ aux (Ok 0) queries
8282+8383+(* opens an sqlite connection *)
3184let connect_sqlite db_uri =
3285 let open Syntax in
3386 match%lwt Caqti_lwt.connect (Uri.of_string db_uri) with