···303303 ~path ~cids
304304end
305305306306-type t = {did: string; db: (module Rapper_helper.CONNECTION)}
306306+type t = {did: string; db: Util.caqti_pool}
307307308308let connect ?create ?write did : t Lwt.t =
309309 let%lwt db =
···312312 Lwt.return {did; db}
313313314314let init t : unit Lwt.t =
315315- let$! () = Queries.create_blocks_tables t.db in
316316- let$! () = Queries.create_records_table t.db in
317317- let$! () = Queries.create_blobs_tables t.db in
315315+ let%lwt () = Util.use_pool t.db Queries.create_blocks_tables in
316316+ let%lwt () = Util.use_pool t.db Queries.create_records_table in
317317+ let%lwt () = Util.use_pool t.db Queries.create_blobs_tables in
318318 Lwt.return_unit
319319320320(* mst blocks; implements Writable_blockstore *)
321321322322let get_bytes t cid : Blob.t option Lwt.t =
323323- Queries.get_block cid t.db
324324- >$! function Some {data; _} -> Some data | None -> None
323323+ Util.use_pool t.db @@ Queries.get_block cid
324324+ >|= function Some {data; _} -> Some data | None -> None
325325326326let get_blocks t cids : Block_map.with_missing Lwt.t =
327327- let$! blocks = Queries.get_blocks cids t.db in
327327+ let%lwt blocks = Util.use_pool t.db @@ Queries.get_blocks cids in
328328 Lwt.return
329329 (List.fold_left
330330 (fun (acc : Block_map.with_missing) cid ->
···337337 cids )
338338339339let has t cid : bool Lwt.t =
340340- Queries.has_block cid t.db >$! function Some _ -> true | None -> false
340340+ Util.use_pool t.db @@ Queries.has_block cid
341341+ >|= function Some _ -> true | None -> false
341342342343let put_block t cid block : (bool, exn) Lwt_result.t =
343343- Queries.put_block cid block t.db
344344- |> Lwt.map Util.caqti_result_exn
345345- |> Lwt.map (Result.map (function Some _ -> true | None -> false))
344344+ Lwt_result.catch
345345+ @@ fun () ->
346346+ match%lwt Util.use_pool t.db @@ Queries.put_block cid block with
347347+ | Some _ ->
348348+ Lwt.return true
349349+ | None ->
350350+ Lwt.return false
346351347352let put_many t bm : (int, exn) Lwt_result.t =
348353 Util.multi_query t.db
349354 (List.map
350350- (fun (cid, block) -> fun () -> Queries.put_block cid block t.db)
355355+ (fun (cid, block) -> Queries.put_block cid block)
351356 (Block_map.entries bm) )
352357353358let delete_block t cid : (bool, exn) Lwt_result.t =
354354- let$! () = Queries.delete_block cid t.db in
355355- Lwt.return_ok true
359359+ Lwt_result.catch
360360+ @@ fun () -> Util.use_pool t.db @@ Queries.delete_block cid >|= fun _ -> true
356361357362let delete_many t cids : (int, exn) Lwt_result.t =
358358- Queries.delete_blocks cids t.db >$! List.length >>= Lwt.return_ok
363363+ Lwt_result.catch
364364+ @@ fun () -> Util.use_pool t.db @@ Queries.delete_blocks cids >|= List.length
359365360366let clear_mst t : unit Lwt.t =
361361- let$! () = Queries.clear_mst t.db in
367367+ let%lwt () = Util.use_pool t.db Queries.clear_mst in
362368 Lwt.return_unit
363369364370(* repo commit *)
365371366372let get_commit t : (Cid.t * signed_commit) option Lwt.t =
367367- Queries.get_commit t.db
368368- >$! Option.map (fun (cid, data) ->
369369- ( cid
370370- , data |> Dag_cbor.decode_to_yojson |> signed_commit_of_yojson
371371- |> Result.get_ok ) )
373373+ let%lwt commit = Util.use_pool t.db Queries.get_commit in
374374+ Lwt.return
375375+ @@ Option.map
376376+ (fun (cid, data) ->
377377+ ( cid
378378+ , data |> Dag_cbor.decode_to_yojson |> signed_commit_of_yojson
379379+ |> Result.get_ok ) )
380380+ commit
372381373382let put_commit t commit : (Cid.t, exn) Lwt_result.t =
374383 let data = commit |> signed_commit_to_yojson |> Dag_cbor.encode_yojson in
375384 let cid = Cid.create Dcbor data in
376376- let$! () = Queries.put_commit cid data t.db in
377377- Lwt.return_ok cid
385385+ ( Lwt_result.catch
386386+ @@ fun () -> Util.use_pool t.db @@ Queries.put_commit cid data )
387387+ |> Lwt_result.map (fun () -> cid)
378388379389(* records *)
380390381391let get_record_by_path t path : record option Lwt.t =
382382- Queries.get_record_by_path ~path t.db
383383- >$! Option.map (fun (cid, data, since) ->
392392+ Util.use_pool t.db @@ Queries.get_record_by_path ~path
393393+ >|= Option.map (fun (cid, data, since) ->
384394 {path; cid; value= Lex.of_cbor data; since} )
385385- >>= Lwt.return
386395387396let get_record_by_cid t cid : record option Lwt.t =
388388- Queries.get_record_by_cid ~cid t.db
389389- >$! Option.map (fun (path, data, since) ->
397397+ Util.use_pool t.db @@ Queries.get_record_by_cid ~cid
398398+ >|= Option.map (fun (path, data, since) ->
390399 {path; cid; value= Lex.of_cbor data; since} )
391391- >>= Lwt.return
392400393401let list_records t ?(limit = 100) ?(cursor = "") ?(reverse = false) collection :
394402 record list Lwt.t =
395403 let fn =
396404 if reverse then Queries.list_records_reverse else Queries.list_records
397405 in
398398- fn ~collection ~limit ~cursor t.db
399399- >$! List.map (fun (path, cid, data, since) ->
406406+ Util.use_pool t.db @@ fn ~collection ~limit ~cursor
407407+ >|= List.map (fun (path, cid, data, since) ->
400408 {path; cid; value= Lex.of_cbor data; since} )
401401- >>= Lwt.return
402409403410let put_record t record path : (Cid.t * bytes) Lwt.t =
404411 let cid, data = Lex.to_cbor_block record in
405412 let since = Tid.now () in
406406- let$! () = Queries.put_record ~path ~cid ~data ~since t.db in
413413+ let%lwt () =
414414+ Util.use_pool t.db @@ Queries.put_record ~path ~cid ~data ~since
415415+ in
407416 Lwt.return (cid, data)
408417409418(* blobs *)
410419411420let get_blob t cid : blob_with_contents option Lwt.t =
412412- match%lwt unwrap @@ Queries.get_blob t.db ~cid with
421421+ match%lwt Util.use_pool t.db @@ Queries.get_blob ~cid with
413422 | None ->
414423 Lwt.return_none
415424 | Some blob ->
···425434 Lwt.return_some {id; cid; mimetype; data}
426435427436let list_blobs ?since t ~limit ~cursor : Cid.t list Lwt.t =
428428- unwrap
437437+ Util.use_pool t.db
429438 @@
430439 match since with
431440 | Some since ->
432432- Queries.list_blobs_since t.db ~limit ~cursor ~since
441441+ Queries.list_blobs_since ~limit ~cursor ~since
433442 | None ->
434434- Queries.list_blobs t.db ~limit ~cursor
443443+ Queries.list_blobs ~limit ~cursor
435444436445let put_blob t cid mimetype data : int Lwt.t =
437446 let file =
···440449 (Cid.to_string cid)
441450 in
442451 let _ = Out_channel.with_open_bin file Out_channel.output_bytes data in
443443- unwrap @@ Queries.put_blob cid mimetype t.db
452452+ Util.use_pool t.db @@ Queries.put_blob cid mimetype
444453445454let list_blob_refs t path : Cid.t list Lwt.t =
446446- unwrap @@ Queries.list_blob_refs path t.db
455455+ Util.use_pool t.db @@ Queries.list_blob_refs path
447456448457let put_blob_ref t path cid : unit Lwt.t =
449449- unwrap @@ Queries.put_blob_ref path cid t.db
458458+ Util.use_pool t.db @@ Queries.put_blob_ref path cid
450459451460let put_blob_refs t path cids : (unit, exn) Lwt_result.t =
452461 Lwt_result.map (fun _ -> ())
453462 @@ Util.multi_query t.db
454454- (List.map
455455- (fun cid -> fun () -> Queries.put_blob_ref cid path t.db)
456456- cids )
463463+ (List.map (fun cid -> Queries.put_blob_ref cid path) cids)
457464458465let clear_blob_refs t path cids : unit Lwt.t =
459459- unwrap @@ Queries.clear_blob_refs path cids t.db
466466+ Util.use_pool t.db @@ Queries.clear_blob_refs path cids
+62-43
pegasus/lib/util.ml
···164164 Error "invalid field value"
165165end
166166167167+type caqti_pool = (Caqti_lwt.connection, Caqti_error.t) Caqti_lwt_unix.Pool.t
168168+167169(* turns a caqti error into an exception *)
168170let caqti_result_exn = function
169171 | Ok x ->
···172174 Error (Caqti_error.Exn caqti_err)
173175174176let _init_connection conn =
175175- let open Syntax in
176176- let$! () =
177177+ match%lwt
177178 [%rapper
178179 execute
179180 {sql|
···183184 |sql}
184185 syntax_off]
185186 () conn
186186- in
187187- Lwt.return conn
187187+ with
188188+ | Ok conn ->
189189+ Lwt.return conn
190190+ | Error e ->
191191+ raise (Caqti_error.Exn e)
188192189189-(* opens an sqlite connection *)
190190-let connect_sqlite ?(create = false) ?(write = true) db_uri =
193193+(* creates an sqlite pool *)
194194+let connect_sqlite ?(create = false) ?(write = true) db_uri : caqti_pool Lwt.t =
191195 let uri =
192196 Uri.add_query_params' db_uri
193197 [("create", string_of_bool create); ("write", string_of_bool write)]
194198 in
195195- match%lwt Caqti_lwt_unix.connect uri with
196196- | Ok c ->
197197- _init_connection c
199199+ match
200200+ Caqti_lwt_unix.connect_pool
201201+ ~post_connect:(fun conn -> Lwt_result.ok @@ _init_connection conn)
202202+ uri
203203+ with
204204+ | Ok pool ->
205205+ Lwt.return pool
198206 | Error e ->
199207 raise (Caqti_error.Exn e)
200208···209217 | Error e ->
210218 raise (Caqti_error.Exn e)
211219220220+let use_pool pool (f : Caqti_lwt.connection -> ('a, Caqti_error.t) Lwt_result.t)
221221+ : 'a Lwt.t =
222222+ match%lwt Caqti_lwt_unix.Pool.use f pool with
223223+ | Ok res ->
224224+ Lwt.return res
225225+ | Error e ->
226226+ raise (Caqti_error.Exn e)
227227+212228(* runs a bunch of queries and catches duplicate insertion, returning how many succeeded *)
213213-let multi_query connection
214214- (queries : (unit -> ('a, Caqti_error.t) Lwt_result.t) list) :
215215- (int, exn) Lwt_result.t =
229229+let multi_query pool
230230+ (queries : (Caqti_lwt.connection -> ('a, Caqti_error.t) Lwt_result.t) list)
231231+ : (int, exn) Lwt_result.t =
216232 let open Syntax in
217217- let module C = (val connection : Caqti_lwt.CONNECTION) in
218218- let$! () = C.start () in
219219- let is_ignorable_error e =
220220- match (e : Caqti_error.t) with
221221- | `Request_failed qe | `Response_failed qe -> (
222222- match Caqti_error.cause (`Request_failed qe) with
223223- | `Not_null_violation | `Unique_violation ->
224224- true
225225- | _ ->
226226- false )
227227- | _ ->
228228- false
229229- in
230230- let rec aux acc queries =
231231- match acc with
232232- | Error e ->
233233- Lwt.return_error e
234234- | Ok count -> (
235235- match queries with
236236- | [] ->
237237- Lwt.return (Ok count)
238238- | query :: rest -> (
239239- let%lwt result = query () in
240240- match result with
241241- | Ok _ ->
242242- aux (Ok (count + 1)) rest
243243- | Error e ->
244244- if is_ignorable_error e then aux (Ok count) rest
245245- else Lwt.return_error (Caqti_error.Exn e) ) )
246246- in
247247- aux (Ok 0) queries
233233+ Lwt_result.catch (fun () ->
234234+ use_pool pool (fun connection ->
235235+ let module C = (val connection : Caqti_lwt.CONNECTION) in
236236+ let$! () = C.start () in
237237+ let is_ignorable_error e =
238238+ match (e : Caqti_error.t) with
239239+ | `Request_failed qe | `Response_failed qe -> (
240240+ match Caqti_error.cause (`Request_failed qe) with
241241+ | `Not_null_violation | `Unique_violation ->
242242+ true
243243+ | _ ->
244244+ false )
245245+ | _ ->
246246+ false
247247+ in
248248+ let rec aux acc queries =
249249+ match acc with
250250+ | Error e ->
251251+ Lwt.return_error e
252252+ | Ok count -> (
253253+ match queries with
254254+ | [] ->
255255+ Lwt.return (Ok count)
256256+ | query :: rest -> (
257257+ let%lwt result = query connection in
258258+ match result with
259259+ | Ok _ ->
260260+ aux (Ok (count + 1)) rest
261261+ | Error e ->
262262+ if is_ignorable_error e then aux (Ok count) rest
263263+ else Lwt.return_error e ) )
264264+ in
265265+ let%lwt result = aux (Ok 0) queries in
266266+ Lwt.return result ) )
248267249268(* unix timestamp *)
250269let now_ms () : int = int_of_float (Unix.gettimeofday () *. 1000.)
+7-6
pegasus/test/test_sequencer.ml
···23232424let with_db (f : Data_store.t -> unit Lwt.t) : unit Lwt.t =
2525 let tmp = Filename.temp_file "pegasus_sequencer_test" ".db" in
2626- Util.with_connection
2727- (Uri.of_string ("sqlite3://" ^ tmp))
2828- (fun conn ->
2929- let%lwt () = Data_store.init conn in
3030- let%lwt () = f conn in
3131- Lwt.return_ok () )
2626+ let%lwt pool =
2727+ Util.connect_sqlite ~create:true ~write:true
2828+ (Uri.of_string ("sqlite3://" ^ tmp))
2929+ in
3030+ let%lwt () = Data_store.init pool in
3131+ let%lwt () = f pool in
3232+ Lwt.return ()
32333334let mk_cid () =
3435 let block =