objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Combine mst/blob/record storage

futurGH 0adbfbc8 00cbe27a

+249 -255
-80
pegasus/lib/blob_store.ml
··· 1 - open Util.Rapper 2 - open Util.Syntax 3 - module Lex = Mist.Lex 4 - module Tid = Mist.Tid 5 - 6 - type t = Caqti_lwt.connection 7 - 8 - type blob = {id: int; cid: Cid.t; mimetype: string; data: Blob.t} 9 - 10 - module Queries = struct 11 - let create_tables t = 12 - let$! () = 13 - [%rapper 14 - execute 15 - {sql| CREATE TABLE IF NOT EXISTS blobs ( 16 - id INTEGER PRIMARY KEY, 17 - cid TEXT NOT NULL, 18 - mimetype TEXT NOT NULL, 19 - data BLOB NOT NULL 20 - ); 21 - |sql}] 22 - () t 23 - in 24 - let$! () = 25 - [%rapper 26 - execute 27 - {sql| CREATE TABLE IF NOT EXISTS blobs_records ( 28 - blob_id INTEGER NOT NULL REFERENCES blobs(id) ON DELETE CASCADE, 29 - record_path TEXT NOT NULL REFERENCES records(path) ON DELETE CASCADE, 30 - PRIMARY KEY (blob_id, record_path) 31 - ); 32 - |sql}] 33 - () t 34 - in 35 - [%rapper 36 - execute 37 - {sql| CREATE TRIGGER IF NOT EXISTS cleanup_orphaned_blobs 38 - AFTER DELETE ON blobs_records 39 - BEGIN 40 - DELETE FROM blobs 41 - WHERE id NOT IN ( 42 - SELECT DISTINCT blob_id FROM blobs_records 43 - ); 44 - END; 45 - |sql} 46 - syntax_off] 47 - () t 48 - 49 - let get_blob = 50 - [%rapper 51 - get_opt 52 - {sql| SELECT @int{id}, @CID{cid}, @string{mimetype}, @Blob{data} FROM blobs WHERE cid = %CID{cid} |sql} 53 - record_out] 54 - 55 - let list_blobs ~limit ~cursor = 56 - [%rapper 57 - get_many 58 - {sql| SELECT @CID{cid} FROM blobs WHERE id > %int{cursor} ORDER BY id LIMIT %int{limit} |sql}] 59 - ~limit ~cursor 60 - 61 - let write_blob cid mimetype data = 62 - [%rapper 63 - get_one 64 - {sql| INSERT INTO blobs (cid, mimetype, data) VALUES (%CID{cid}, %string{mimetype}, %Blob{data}) RETURNING @int{id} |sql}] 65 - ~cid ~mimetype ~data 66 - end 67 - 68 - let init connection = Queries.create_tables connection 69 - 70 - let get_blob t cid : blob option Lwt.t = 71 - let$! blob = Queries.get_blob t ~cid in 72 - Lwt.return blob 73 - 74 - let list_blobs t ~limit ~cursor : Cid.t list Lwt.t = 75 - let$! blobs = Queries.list_blobs t ~limit ~cursor in 76 - Lwt.return blobs 77 - 78 - let write_blob t cid mimetype data : int Lwt.t = 79 - let$! blob_id = Queries.write_blob t cid mimetype data in 80 - Lwt.return blob_id
-99
pegasus/lib/mst_store.ml
··· 1 - open Util.Rapper 2 - open Util.Syntax 3 - module Block_map = Mist.Storage.Block_map 4 - 5 - type t = Caqti_lwt.connection 6 - 7 - type block = {cid: Cid.t; data: Blob.t} 8 - 9 - module Queries = struct 10 - let create_table = 11 - [%rapper 12 - execute 13 - {sql| CREATE TABLE IF NOT EXISTS mst ( 14 - cid TEXT NOT NULL PRIMARY KEY, 15 - data BLOB NOT NULL 16 - ); 17 - |sql}] 18 - () 19 - 20 - let get_block cid = 21 - [%rapper 22 - get_opt 23 - {sql| SELECT @CID{cid}, @Blob{data} FROM mst WHERE cid = %CID{cid} |sql} 24 - record_out] 25 - ~cid 26 - 27 - let get_blocks cids = 28 - [%rapper 29 - get_many 30 - {sql| SELECT @CID{cid}, @Blob{data} FROM mst WHERE cid IN (%list{%CID{cids}}) |sql} 31 - record_out] 32 - ~cids 33 - 34 - let has_block cid = 35 - [%rapper 36 - get_opt {sql| SELECT @CID{cid} FROM mst WHERE cid = %CID{cid} |sql}] 37 - ~cid 38 - 39 - let put_block cid block = 40 - [%rapper 41 - get_opt 42 - {sql| INSERT INTO mst (cid, data) VALUES (%CID{cid}, %Blob{block}) ON CONFLICT DO NOTHING RETURNING @CID{cid} |sql}] 43 - ~cid ~block 44 - 45 - let delete_block cid = 46 - [%rapper execute {sql| DELETE FROM mst WHERE cid = %CID{cid} |sql}] ~cid 47 - 48 - let delete_blocks cids = 49 - [%rapper 50 - get_many 51 - {sql| DELETE FROM mst WHERE cid IN (%list{%CID{cids}}) RETURNING @CID{cid} |sql}] 52 - ~cids 53 - end 54 - 55 - let init connection = Queries.create_table connection 56 - 57 - let get_bytes t cid = 58 - Queries.get_block cid t 59 - >$! function 60 - | Some {data; _} -> 61 - Lwt.return_some data 62 - | None -> 63 - Lwt.return_none 64 - 65 - let get_blocks t cids = 66 - let$! blocks = Queries.get_blocks cids t in 67 - Lwt.return 68 - (List.fold_left 69 - (fun (acc : Block_map.with_missing) cid -> 70 - match List.find_opt (fun b -> b.cid = cid) blocks with 71 - | Some {data; _} -> 72 - {acc with blocks= Block_map.set cid data acc.blocks} 73 - | None -> 74 - {acc with missing= cid :: acc.missing} ) 75 - {blocks= Block_map.empty; missing= []} 76 - cids ) 77 - 78 - let has t cid = 79 - Queries.has_block cid t 80 - >$! function Some _ -> Lwt.return true | None -> Lwt.return false 81 - 82 - let put_block t cid block = 83 - Queries.put_block cid block t 84 - |> Lwt.map Util.caqti_result_exn 85 - |> Lwt.map (Result.map (function Some _ -> true | None -> false)) 86 - 87 - let put_many t bm = 88 - Util.multi_query t 89 - (List.map 90 - (fun (cid, block) -> fun () -> Queries.put_block cid block t) 91 - (Block_map.entries bm) ) 92 - >$! Lwt.return_ok 93 - 94 - let delete_block t cid = 95 - let$! () = Queries.delete_block cid t in 96 - Lwt.return_ok true 97 - 98 - let delete_many t cids = 99 - Queries.delete_blocks cids t >$! List.length |> Lwt.return_ok
-74
pegasus/lib/record_store.ml
··· 1 - open Lwt.Infix 2 - open Util.Rapper 3 - open Util.Syntax 4 - module Lex = Mist.Lex 5 - module Tid = Mist.Tid 6 - 7 - type t = Caqti_lwt.connection 8 - 9 - type record = {path: string; cid: Cid.t; value: Lex.repo_record; since: Tid.t} 10 - 11 - module Queries = struct 12 - let create_table = 13 - [%rapper 14 - execute 15 - {sql| CREATE TABLE IF NOT EXISTS records ( 16 - path TEXT NOT NULL PRIMARY KEY, 17 - cid TEXT NOT NULL, 18 - data BLOB NOT NULL, 19 - since TEXT NOT NULL 20 - ); 21 - |sql}] 22 - () 23 - 24 - let get_record_by_path = 25 - [%rapper 26 - get_opt 27 - {sql| SELECT @CID{cid}, @Blob{data}, @string{since} FROM records WHERE path = %string{path} 28 - |sql}] 29 - 30 - let get_record_by_cid = 31 - [%rapper 32 - get_opt 33 - {sql| SELECT @string{path}, @Blob{data}, @string{since} FROM records WHERE cid = %CID{cid} 34 - |sql}] 35 - 36 - let list_records = 37 - [%rapper 38 - get_many 39 - {sql| SELECT @string{path}, @CID{cid}, @Blob{data}, @string{since} FROM records WHERE path LIKE %string{collection}/% 40 - |sql}] 41 - 42 - let write_record = 43 - [%rapper 44 - execute 45 - {sql| INSERT INTO records (path, cid, data, since) 46 - VALUES (%string{path}, %CID{cid}, %Blob{data}, %string{since}) 47 - |sql}] 48 - end 49 - 50 - let init connection = Queries.create_table connection 51 - 52 - let get_record_by_path t path : record option Lwt.t = 53 - Queries.get_record_by_path ~path t 54 - >$! Option.map (fun (cid, data, since) -> 55 - {path; cid; value= Lex.of_cbor data; since} ) 56 - >>= Lwt.return 57 - 58 - let get_record_by_cid t cid : record option Lwt.t = 59 - Queries.get_record_by_cid ~cid t 60 - >$! Option.map (fun (path, data, since) -> 61 - {path; cid; value= Lex.of_cbor data; since} ) 62 - >>= Lwt.return 63 - 64 - let list_records t collection : record list Lwt.t = 65 - Queries.list_records ~collection t 66 - >$! List.map (fun (path, cid, data, since) -> 67 - {path; cid; value= Lex.of_cbor data; since} ) 68 - >>= Lwt.return 69 - 70 - let write_record t record path : unit Lwt.t = 71 - let cid, data = Lex.to_cbor_block record in 72 - let since = Tid.now () in 73 - let$! () = Queries.write_record ~path ~cid ~data ~since t in 74 - Lwt.return_unit
+246
pegasus/lib/user_store.ml
··· 1 + open Lwt.Infix 2 + open Util.Rapper 3 + open Util.Syntax 4 + module Block_map = Mist.Storage.Block_map 5 + module Lex = Mist.Lex 6 + module Tid = Mist.Tid 7 + 8 + type block = {cid: Cid.t; data: Blob.t} 9 + 10 + type record = {path: string; cid: Cid.t; value: Lex.repo_record; since: Tid.t} 11 + 12 + type blob = {id: int; cid: Cid.t; mimetype: string; data: Blob.t} 13 + 14 + module Queries = struct 15 + (* mst storage *) 16 + let create_mst_table = 17 + [%rapper 18 + execute 19 + {sql| CREATE TABLE IF NOT EXISTS mst ( 20 + cid TEXT NOT NULL PRIMARY KEY, 21 + data BLOB NOT NULL 22 + ); 23 + |sql}] 24 + () 25 + 26 + let get_block cid = 27 + [%rapper 28 + get_opt 29 + {sql| SELECT @CID{cid}, @Blob{data} FROM mst WHERE cid = %CID{cid} |sql} 30 + record_out] 31 + ~cid 32 + 33 + let get_blocks cids = 34 + [%rapper 35 + get_many 36 + {sql| SELECT @CID{cid}, @Blob{data} FROM mst WHERE cid IN (%list{%CID{cids}}) |sql} 37 + record_out] 38 + ~cids 39 + 40 + let has_block cid = 41 + [%rapper 42 + get_opt {sql| SELECT @CID{cid} FROM mst WHERE cid = %CID{cid} |sql}] 43 + ~cid 44 + 45 + let put_block cid block = 46 + [%rapper 47 + get_opt 48 + {sql| INSERT INTO mst (cid, data) VALUES (%CID{cid}, %Blob{block}) ON CONFLICT DO NOTHING RETURNING @CID{cid} |sql}] 49 + ~cid ~block 50 + 51 + let delete_block cid = 52 + [%rapper execute {sql| DELETE FROM mst WHERE cid = %CID{cid} |sql}] ~cid 53 + 54 + let delete_blocks cids = 55 + [%rapper 56 + get_many 57 + {sql| DELETE FROM mst WHERE cid IN (%list{%CID{cids}}) RETURNING @CID{cid} |sql}] 58 + ~cids 59 + 60 + let clear_mst = [%rapper execute {sql| DELETE FROM mst |sql}] () 61 + 62 + (* record storage *) 63 + let create_records_table = 64 + [%rapper 65 + execute 66 + {sql| CREATE TABLE IF NOT EXISTS records ( 67 + path TEXT NOT NULL PRIMARY KEY, 68 + cid TEXT NOT NULL, 69 + data BLOB NOT NULL, 70 + since TEXT NOT NULL 71 + ); 72 + |sql}] 73 + () 74 + 75 + let get_record_by_path = 76 + [%rapper 77 + get_opt 78 + {sql| SELECT @CID{cid}, @Blob{data}, @string{since} FROM records WHERE path = %string{path} 79 + |sql}] 80 + 81 + let get_record_by_cid = 82 + [%rapper 83 + get_opt 84 + {sql| SELECT @string{path}, @Blob{data}, @string{since} FROM records WHERE cid = %CID{cid} 85 + |sql}] 86 + 87 + let list_records = 88 + [%rapper 89 + get_many 90 + {sql| SELECT @string{path}, @CID{cid}, @Blob{data}, @string{since} FROM records WHERE path LIKE %string{collection}/% 91 + |sql}] 92 + 93 + let write_record = 94 + [%rapper 95 + execute 96 + {sql| INSERT INTO records (path, cid, data, since) 97 + VALUES (%string{path}, %CID{cid}, %Blob{data}, %string{since}) 98 + |sql}] 99 + 100 + (* blob storage *) 101 + let create_blobs_tables conn = 102 + let$! () = 103 + [%rapper 104 + execute 105 + {sql| CREATE TABLE IF NOT EXISTS blobs ( 106 + id INTEGER PRIMARY KEY, 107 + cid TEXT NOT NULL, 108 + mimetype TEXT NOT NULL, 109 + data BLOB NOT NULL 110 + ); 111 + |sql}] 112 + () conn 113 + in 114 + let$! () = 115 + [%rapper 116 + execute 117 + {sql| CREATE TABLE IF NOT EXISTS blobs_records ( 118 + blob_id INTEGER NOT NULL REFERENCES blobs(id) ON DELETE CASCADE, 119 + record_path TEXT NOT NULL REFERENCES records(path) ON DELETE CASCADE, 120 + PRIMARY KEY (blob_id, record_path) 121 + ); 122 + |sql}] 123 + () conn 124 + in 125 + [%rapper 126 + execute 127 + {sql| CREATE TRIGGER IF NOT EXISTS cleanup_orphaned_blobs 128 + AFTER DELETE ON blobs_records 129 + BEGIN 130 + DELETE FROM blobs 131 + WHERE id NOT IN ( 132 + SELECT DISTINCT blob_id FROM blobs_records 133 + ); 134 + END; 135 + |sql} 136 + syntax_off] 137 + () conn 138 + 139 + let get_blob = 140 + [%rapper 141 + get_opt 142 + {sql| SELECT @int{id}, @CID{cid}, @string{mimetype}, @Blob{data} FROM blobs WHERE cid = %CID{cid} |sql} 143 + record_out] 144 + 145 + let list_blobs ~limit ~cursor = 146 + [%rapper 147 + get_many 148 + {sql| SELECT @CID{cid} FROM blobs WHERE id > %int{cursor} ORDER BY id LIMIT %int{limit} |sql}] 149 + ~limit ~cursor 150 + 151 + let write_blob cid mimetype data = 152 + [%rapper 153 + get_one 154 + {sql| INSERT INTO blobs (cid, mimetype, data) VALUES (%CID{cid}, %string{mimetype}, %Blob{data}) RETURNING @int{id} |sql}] 155 + ~cid ~mimetype ~data 156 + end 157 + 158 + let init conn : unit Lwt.t = 159 + let$! () = Queries.create_mst_table conn in 160 + let$! () = Queries.create_records_table conn in 161 + let$! () = Queries.create_blobs_tables conn in 162 + Lwt.return_unit 163 + 164 + (* mst *) 165 + 166 + let get_bytes conn cid : Blob.t option Lwt.t = 167 + Queries.get_block cid conn 168 + >$! function Some {data; _} -> Some data | None -> None 169 + 170 + let get_blocks conn cids : Block_map.with_missing Lwt.t = 171 + let$! blocks = Queries.get_blocks cids conn in 172 + Lwt.return 173 + (List.fold_left 174 + (fun (acc : Block_map.with_missing) cid -> 175 + match List.find_opt (fun (b : block) -> b.cid = cid) blocks with 176 + | Some {data; _} -> 177 + {acc with blocks= Block_map.set cid data acc.blocks} 178 + | None -> 179 + {acc with missing= cid :: acc.missing} ) 180 + {blocks= Block_map.empty; missing= []} 181 + cids ) 182 + 183 + let has conn cid : bool Lwt.t = 184 + Queries.has_block cid conn >$! function Some _ -> true | None -> false 185 + 186 + let put_block conn cid block : (bool, exn) Lwt_result.t = 187 + Queries.put_block cid block conn 188 + |> Lwt.map Util.caqti_result_exn 189 + |> Lwt.map (Result.map (function Some _ -> true | None -> false)) 190 + 191 + let put_many conn bm : (int, exn) Lwt_result.t = 192 + Util.multi_query conn 193 + (List.map 194 + (fun (cid, block) -> fun () -> Queries.put_block cid block conn) 195 + (Block_map.entries bm) ) 196 + 197 + let delete_block conn cid : (bool, exn) Lwt_result.t = 198 + let$! () = Queries.delete_block cid conn in 199 + Lwt.return_ok true 200 + 201 + let delete_many conn cids : (int, exn) Lwt_result.t = 202 + Queries.delete_blocks cids conn >$! List.length >>= Lwt.return_ok 203 + 204 + let clear_mst conn : unit Lwt.t = 205 + let$! () = Queries.clear_mst conn in 206 + Lwt.return_unit 207 + 208 + (* records *) 209 + 210 + let get_record_by_path conn path : record option Lwt.t = 211 + Queries.get_record_by_path ~path conn 212 + >$! Option.map (fun (cid, data, since) -> 213 + {path; cid; value= Lex.of_cbor data; since} ) 214 + >>= Lwt.return 215 + 216 + let get_record_by_cid conn cid : record option Lwt.t = 217 + Queries.get_record_by_cid ~cid conn 218 + >$! Option.map (fun (path, data, since) -> 219 + {path; cid; value= Lex.of_cbor data; since} ) 220 + >>= Lwt.return 221 + 222 + let list_records conn collection : record list Lwt.t = 223 + Queries.list_records ~collection conn 224 + >$! List.map (fun (path, cid, data, since) -> 225 + {path; cid; value= Lex.of_cbor data; since} ) 226 + >>= Lwt.return 227 + 228 + let write_record conn record path : unit Lwt.t = 229 + let cid, data = Lex.to_cbor_block record in 230 + let since = Tid.now () in 231 + let$! () = Queries.write_record ~path ~cid ~data ~since conn in 232 + Lwt.return_unit 233 + 234 + (* blobs *) 235 + 236 + let get_blob conn cid : blob option Lwt.t = 237 + let$! blob = Queries.get_blob conn ~cid in 238 + Lwt.return blob 239 + 240 + let list_blobs conn ~limit ~cursor : Cid.t list Lwt.t = 241 + let$! blobs = Queries.list_blobs conn ~limit ~cursor in 242 + Lwt.return blobs 243 + 244 + let write_blob conn cid mimetype data : int Lwt.t = 245 + let$! blob_id = Queries.write_blob conn cid mimetype data in 246 + Lwt.return blob_id
+3 -2
pegasus/lib/util.ml
··· 46 46 47 47 (* runs a bunch of queries and catches duplicate insertion, returning how many succeeded *) 48 48 let multi_query connection 49 - (queries : (unit -> ('a, Caqti_error.t) Lwt_result.t) list) = 49 + (queries : (unit -> ('a, Caqti_error.t) Lwt_result.t) list) : 50 + (int, exn) Lwt_result.t = 50 51 let open Syntax in 51 52 let module C = (val connection : Caqti_lwt.CONNECTION) in 52 53 let$! () = C.start () in ··· 76 77 aux (Ok (count + 1)) rest 77 78 | Error e -> 78 79 if is_ignorable_error e then aux (Ok count) rest 79 - else Lwt.return_error e ) ) 80 + else Lwt.return_error (Caqti_error.Exn e) ) ) 80 81 in 81 82 aux (Ok 0) queries 82 83