objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement SQLite blockstore

futurGH 223eaff7 8835b5d1

+236 -105
+1 -1
mist/lib/storage/block_map.ml
··· 4 4 5 5 type with_missing = {blocks: t; missing: Cid.t list} 6 6 7 - let empty = Cid_map.empty 7 + let empty : t = Cid_map.empty 8 8 9 9 let add value m = 10 10 let cid, bytes = Lex.to_cbor_block value in
+23
mist/lib/storage/blockstore.ml
··· 1 + module type Readable = sig 2 + type t 3 + 4 + val get_bytes : t -> Cid.t -> bytes option Lwt.t 5 + 6 + val has : t -> Cid.t -> bool Lwt.t 7 + 8 + val get_blocks : t -> Cid.t list -> Block_map.with_missing Lwt.t 9 + end 10 + 11 + module type Writable = sig 12 + type t 13 + 14 + include Readable with type t := t 15 + 16 + val put_block : t -> Cid.t -> bytes -> unit Lwt.t 17 + 18 + val put_many : t -> Block_map.t -> unit Lwt.t 19 + 20 + val delete_block : t -> Cid.t -> unit Lwt.t 21 + 22 + val delete_many : t -> Cid.t list -> unit Lwt.t 23 + end
+30
mist/lib/storage/memory_blockstore.ml
··· 1 + module Make () = struct 2 + type t = {mutable blocks: Block_map.t} 3 + 4 + let create ?(blocks = Block_map.empty) () = {blocks} 5 + 6 + let get_bytes s cid = Lwt.return (Block_map.get cid s.blocks) 7 + 8 + let has s cid = Lwt.return (Block_map.has cid s.blocks) 9 + 10 + let get_blocks s cids = Lwt.return (Block_map.get_many cids s.blocks) 11 + 12 + let put_block s cid bytes = 13 + s.blocks <- Block_map.set cid bytes s.blocks ; 14 + Lwt.return_unit 15 + 16 + let put_many s blocks = 17 + s.blocks <- Block_map.merge s.blocks blocks ; 18 + Lwt.return_unit 19 + 20 + let delete_block s cid = 21 + s.blocks <- Block_map.remove cid s.blocks ; 22 + Lwt.return_unit 23 + 24 + let delete_many s cids = 25 + s.blocks <- 26 + List.fold_left 27 + (fun blocks cid -> Block_map.remove cid blocks) 28 + s.blocks cids ; 29 + Lwt.return_unit 30 + end
-45
mist/lib/storage/memory_store.ml
··· 1 - module Make () = struct 2 - type t = 3 - { mutable blocks: Block_map.t 4 - ; mutable root: Cid.t option 5 - ; mutable rev: string option } 6 - 7 - let create ?(blocks = Block_map.empty) () = {blocks; root= None; rev= None} 8 - 9 - let get_root s = 10 - match s.root with 11 - | Some root -> 12 - Lwt.return_some root 13 - | None -> 14 - Lwt.return_none 15 - 16 - let get_bytes s cid = Lwt.return (Block_map.get cid s.blocks) 17 - 18 - let has s cid = Lwt.return (Block_map.has cid s.blocks) 19 - 20 - let get_blocks s cids = Lwt.return (Block_map.get_many cids s.blocks) 21 - 22 - let put_block s ?rev cid bytes = 23 - s.blocks <- Block_map.set cid bytes s.blocks ; 24 - s.rev <- rev ; 25 - Lwt.return_unit 26 - 27 - let put_many s blocks = 28 - s.blocks <- Block_map.merge s.blocks blocks ; 29 - Lwt.return_unit 30 - 31 - let update_root s ?rev cid = 32 - s.root <- Some cid ; 33 - s.rev <- rev ; 34 - Lwt.return_unit 35 - 36 - let apply_commit s (c : Repo_store.commit_data) = 37 - let with_removed = 38 - Cid.Set.fold 39 - (fun cid blocks -> Block_map.remove cid blocks) 40 - c.removed_cids s.blocks 41 - in 42 - s.blocks <- Block_map.merge with_removed c.relevant_blocks ; 43 - s.root <- Some c.cid ; 44 - Lwt.return_unit 45 - end
+4 -13
mist/lib/storage/overlay_store.ml mist/lib/storage/overlay_blockstore.ml
··· 1 - let ( let* ) = Lwt.bind 2 - 3 - module Make (Top : Repo_store.Readable) (Bottom : Repo_store.Readable) : sig 4 - include Repo_store.Readable 1 + module Make (Top : Blockstore.Readable) (Bottom : Blockstore.Readable) : sig 2 + include Blockstore.Readable 5 3 6 4 val create : Top.t -> Bottom.t -> t 7 5 end = struct ··· 9 7 10 8 let create top bottom = {top; bottom} 11 9 12 - let get_root {top; bottom} = 13 - match%lwt Top.get_root top with 14 - | Some _ as res -> 15 - Lwt.return res 16 - | None -> 17 - Bottom.get_root bottom 18 - 19 10 let get_bytes {top; bottom} cid = 20 11 match%lwt Top.get_bytes top cid with 21 12 | Some _ as res -> ··· 31 22 Bottom.has bottom cid 32 23 33 24 let get_blocks {top; bottom} cids = 34 - let* from_top = Top.get_blocks top cids in 35 - let* from_bottom = Bottom.get_blocks bottom from_top.missing in 25 + let%lwt from_top = Top.get_blocks top cids in 26 + let%lwt from_bottom = Bottom.get_blocks bottom from_top.missing in 36 27 let merged_blocks = Block_map.merge from_top.blocks from_bottom.blocks in 37 28 Lwt.return {Block_map.blocks= merged_blocks; missing= from_bottom.missing} 38 29 end
-33
mist/lib/storage/repo_store.ml
··· 1 - type commit_data = 2 - { cid: Cid.t 3 - ; rev: string 4 - ; since: string option 5 - ; prev: Cid.t option 6 - ; relevant_blocks: Block_map.t 7 - ; removed_cids: Cid.Set.t } 8 - 9 - module type Readable = sig 10 - type t 11 - 12 - val get_root : t -> Cid.t option Lwt.t 13 - 14 - val get_bytes : t -> Cid.t -> bytes option Lwt.t 15 - 16 - val has : t -> Cid.t -> bool Lwt.t 17 - 18 - val get_blocks : t -> Cid.t list -> Block_map.with_missing Lwt.t 19 - end 20 - 21 - module type Writable = sig 22 - type t 23 - 24 - include Readable with type t := t 25 - 26 - val put_block : t -> ?rev:string -> Cid.t -> bytes -> unit Lwt.t 27 - 28 - val put_many : t -> Block_map.t -> unit Lwt.t 29 - 30 - val update_root : t -> ?rev:string -> Cid.t -> unit Lwt.t 31 - 32 - val apply_commit : t -> commit_data -> unit Lwt.t 33 - end
+9 -11
mist/lib/storage/storage.ml
··· 1 1 module Block_map = Block_map 2 2 module Blob_store = Blob_store 3 3 4 - module type Readable_blockstore = Repo_store.Readable 4 + module type Readable_blockstore = Blockstore.Readable 5 5 6 - module type Writable_blockstore = Repo_store.Writable 6 + module type Writable_blockstore = Blockstore.Writable 7 7 8 8 module Memory_blockstore = struct 9 - module Impl = Memory_store.Make () 9 + module Impl = Memory_blockstore.Make () 10 10 include Impl 11 11 12 - module Readable : Repo_store.Readable with type t = Impl.t = Impl 12 + module Readable : Blockstore.Readable with type t = Impl.t = Impl 13 13 14 - module Writable : Repo_store.Writable with type t = Impl.t = Impl 14 + module Writable : Blockstore.Writable with type t = Impl.t = Impl 15 15 end 16 16 17 17 module Overlay_blockstore 18 - (Top : Repo_store.Readable) 19 - (Bottom : Repo_store.Readable) = 18 + (Top : Blockstore.Readable) 19 + (Bottom : Blockstore.Readable) = 20 20 struct 21 - module Impl = Overlay_store.Make (Top) (Bottom) 21 + module Impl = Overlay_blockstore.Make (Top) (Bottom) 22 22 include Impl 23 23 24 - module Readable : Repo_store.Readable with type t = Impl.t = Impl 24 + module Readable : Blockstore.Readable with type t = Impl.t = Impl 25 25 end 26 - 27 - type commit_data = Repo_store.commit_data
+6 -2
pegasus/lib/dune
··· 1 1 (library 2 2 (name pegasus) 3 3 (libraries 4 + caqti 5 + caqti-lwt 6 + caqti-driver-sqlite3 4 7 ipld 5 8 lwt 6 9 lwt.unix ··· 9 12 str 10 13 yojson 11 14 lwt_ppx 12 - ppx_deriving_yojson.runtime) 15 + ppx_deriving_yojson.runtime 16 + ppx_rapper_lwt) 13 17 (preprocess 14 - (pps lwt_ppx ppx_deriving_yojson))) 18 + (pps lwt_ppx ppx_deriving_yojson ppx_rapper))) 15 19 16 20 (include_subdirs qualified)
+163
pegasus/lib/sqlite_blockstore.ml
··· 1 + module Block_map = Mist.Storage.Block_map 2 + 3 + module Cid : Rapper.CUSTOM with type t = Cid.t = struct 4 + type t = Cid.t 5 + 6 + let t = 7 + let encode cid = 8 + try Ok (Cid.to_string cid) with e -> Error (Printexc.to_string e) 9 + in 10 + Caqti_type.(custom ~encode ~decode:Cid.of_string string) 11 + end 12 + 13 + module Blob : Rapper.CUSTOM with type t = bytes = struct 14 + type t = bytes 15 + 16 + let t = 17 + let encode blob = 18 + try Ok (Bytes.to_string blob) with e -> Error (Printexc.to_string e) 19 + in 20 + let decode blob = 21 + try Ok (Bytes.of_string blob) with e -> Error (Printexc.to_string e) 22 + in 23 + Caqti_type.(custom ~encode ~decode string) 24 + end 25 + 26 + type block = {cid: Cid.t; data: Blob.t} 27 + 28 + let ( let*! ) m f = 29 + match%lwt m with Ok x -> f x | Error e -> failwith (Caqti_error.show e) 30 + 31 + module Queries = struct 32 + let create_table = 33 + [%rapper 34 + execute 35 + {sql| CREATE TABLE IF NOT EXISTS blocks ( 36 + cid TEXT NOT NULL PRIMARY KEY, 37 + data BLOB NOT NULL 38 + ); 39 + |sql}] 40 + () 41 + 42 + let get_block cid = 43 + [%rapper 44 + get_opt 45 + {sql| SELECT @Cid{cid}, @Blob{data} FROM blocks WHERE cid = %Cid{cid} |sql} 46 + record_out] 47 + ~cid 48 + 49 + let get_blocks cids = 50 + [%rapper 51 + get_many 52 + {sql| SELECT @Cid{cid}, @Blob{data} FROM blocks WHERE cid IN (%list{%Cid{cids}}) |sql} 53 + record_out] 54 + ~cids 55 + 56 + let has_block cid = 57 + [%rapper 58 + get_opt {sql| SELECT @Cid{cid} FROM blocks WHERE cid = %Cid{cid} |sql}] 59 + ~cid 60 + 61 + let put_block cid block = 62 + [%rapper 63 + execute 64 + {sql| INSERT INTO blocks (cid, data) VALUES (%Cid{cid}, %Blob{block}) |sql}] 65 + ~cid ~block 66 + 67 + let delete_block cid = 68 + [%rapper execute {sql| DELETE FROM blocks WHERE cid = %Cid{cid} |sql}] ~cid 69 + 70 + let delete_blocks cids = 71 + [%rapper 72 + execute {sql| DELETE FROM blocks WHERE cid IN (%list{%Cid{cids}}) |sql}] 73 + ~cids 74 + end 75 + 76 + module S (C : Caqti_lwt.CONNECTION) : Mist.Storage.Writable_blockstore = struct 77 + include Caqti_type.Std 78 + include Caqti_request.Infix 79 + include Lwt_result.Syntax 80 + 81 + type t = {connection: Caqti_lwt.connection} 82 + 83 + let multi_query queries = 84 + let*! () = C.start () in 85 + match%lwt 86 + Lwt_list.fold_left_s 87 + (fun acc query -> Lwt_result.bind (query ()) (fun () -> Lwt.return acc)) 88 + (Ok ()) queries 89 + with 90 + | Ok () -> 91 + C.commit () 92 + | Error e -> 93 + let*! () = C.rollback () in 94 + Lwt.return_error e 95 + 96 + let init t = 97 + let*! () = 98 + [%rapper execute {sql| PRAGMA journal_mode=WAL; |sql} syntax_off] 99 + () t.connection 100 + in 101 + let*! () = 102 + [%rapper execute {sql| PRAGMA synchronous=NORMAL; |sql} syntax_off] 103 + () t.connection 104 + in 105 + let*! () = Queries.create_table t.connection in 106 + Lwt.return_unit 107 + 108 + let get_bytes t cid = 109 + let*! b_opt = Queries.get_block cid t.connection in 110 + match b_opt with 111 + | Some {data; _} -> 112 + Lwt.return_some data 113 + | None -> 114 + Lwt.return_none 115 + 116 + let get_blocks t cids = 117 + let*! blocks = Queries.get_blocks cids t.connection in 118 + Lwt.return 119 + (List.fold_left 120 + (fun (acc : Block_map.with_missing) cid -> 121 + match List.find_opt (fun b -> b.cid = cid) blocks with 122 + | Some {data; _} -> 123 + {acc with blocks= Block_map.set cid data acc.blocks} 124 + | None -> 125 + {acc with missing= cid :: acc.missing} ) 126 + {blocks= Block_map.empty; missing= []} 127 + cids ) 128 + 129 + let has t cid = 130 + let*! b_opt = Queries.has_block cid t.connection in 131 + match b_opt with Some _ -> Lwt.return true | None -> Lwt.return false 132 + 133 + let put_block t cid block = 134 + let*! () = Queries.put_block cid block t.connection in 135 + Lwt.return_unit 136 + 137 + let put_many t bm = 138 + let*! () = 139 + multi_query 140 + (List.map 141 + (fun (cid, block) -> 142 + fun () -> Queries.put_block cid block t.connection ) 143 + (Block_map.entries bm) ) 144 + in 145 + Lwt.return_unit 146 + 147 + let delete_block t cid = 148 + let*! () = Queries.delete_block cid t.connection in 149 + Lwt.return_unit 150 + 151 + let delete_many t cids = 152 + let*! () = Queries.delete_blocks cids t.connection in 153 + Lwt.return_unit 154 + end 155 + 156 + let connect db_uri = 157 + match%lwt Caqti_lwt.connect (Uri.of_string db_uri) with 158 + | Ok c -> 159 + let module C = (val c : Caqti_lwt.CONNECTION) in 160 + let module Store = S (C) in 161 + Lwt.return (module Store : Mist.Storage.Writable_blockstore) 162 + | Error e -> 163 + failwith (Caqti_error.show e)