objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Store commit & mst in same table

futurGH af03287b e4ca1585

+211 -108
+85 -77
pegasus/lib/repository.ml
··· 1 + open User_store.Types 1 2 open Util.Exceptions 2 3 module Lex = Mist.Lex 3 4 module Mst = Mist.Mst.Make (User_store) 4 5 module StringMap = Lex.StringMap 5 6 module Tid = Mist.Tid 6 - 7 - let cid_link_of_yojson = function 8 - | `Assoc link -> 9 - link |> List.assoc "$link" |> Cid.of_yojson 10 - |> Result.map (fun cid -> Some cid) 11 - | `Null -> 12 - Ok None 13 - | _ -> 14 - Error "commit prev not a valid cid" 15 - 16 - let cid_link_to_yojson = function 17 - | Some cid -> 18 - Cid.to_yojson cid 19 - | None -> 20 - `Null 21 - 22 - type commit = 23 - { did: string 24 - ; version: int (* always 3 *) 25 - ; data: Cid.t [@of_yojson Cid.of_yojson] [@to_yojson Cid.to_yojson] 26 - ; rev: Tid.t 27 - ; prev: Cid.t option 28 - [@of_yojson cid_link_of_yojson] [@to_yojson cid_link_to_yojson] } 29 - [@@deriving yojson] 30 - 31 - type signed_commit = 32 - { did: string 33 - ; version: int (* always 3 *) 34 - ; data: Cid.t [@of_yojson Cid.of_yojson] [@to_yojson Cid.to_yojson] 35 - ; rev: Tid.t 36 - ; prev: Cid.t option 37 - [@of_yojson cid_link_of_yojson] [@to_yojson cid_link_to_yojson] 38 - ; signature: bytes 39 - [@key "sig"] 40 - [@of_yojson 41 - fun x -> 42 - match Dag_cbor.of_yojson x with 43 - | `Bytes b -> 44 - Ok b 45 - | _ -> 46 - Error "commit sig not a valid bytes value"] 47 - [@to_yojson fun x -> Dag_cbor.to_yojson (`Bytes x)] } 48 - [@@deriving yojson] 49 7 50 8 type signing_key = P256 of bytes | K256 of bytes 51 9 ··· 173 131 174 132 type t = 175 133 { key: signing_key 134 + ; did: string 176 135 ; db: Caqti_lwt.connection 177 - ; mutable block_map: Cid.t Mist.Mst.StringMap.t option } 136 + ; mutable block_map: Cid.t StringMap.t option 137 + ; mutable root: Cid.t } 178 138 179 - let get_map t root = 139 + let get_map t mst_root : Cid.t StringMap.t Lwt.t = 180 140 match t.block_map with 181 141 | Some map -> 182 142 Lwt.return map 183 143 | None -> 184 - let%lwt map = Mst.build_map {blockstore= t.db; root} in 144 + let%lwt map = Mst.build_map {blockstore= t.db; root= mst_root} in 185 145 t.block_map <- Some map ; 186 146 Lwt.return map 187 147 148 + let get_record_cid t path : Cid.t option Lwt.t = 149 + let%lwt map = get_map t t.root in 150 + Lwt.return @@ StringMap.find_opt path map 151 + 152 + let get_record t path : record option Lwt.t = 153 + User_store.get_record_by_path t.db path 154 + 155 + let list_collections t : string list Lwt.t = 156 + let module Set = Set.Make (String) in 157 + let%lwt map = get_map t t.root in 158 + StringMap.bindings map 159 + |> List.fold_left 160 + (fun (acc : Set.t) (path, _) -> 161 + let collection = String.split_on_char '/' path |> List.hd in 162 + Set.add collection acc ) 163 + Set.empty 164 + |> Set.to_list |> Lwt.return 165 + 166 + let list_records t collection : (string * Cid.t * record) list Lwt.t = 167 + let%lwt map = get_map t t.root in 168 + StringMap.bindings map 169 + |> List.filter (fun (path, _) -> 170 + String.starts_with ~prefix:(path ^ "/") collection ) 171 + |> Lwt_list.fold_left_s 172 + (fun acc (path, cid) -> 173 + match%lwt User_store.get_record_by_cid t.db cid with 174 + | Some record -> 175 + Lwt.return 176 + ((Format.sprintf "at://%s/%s" t.did path, cid, record) :: acc) 177 + | None -> 178 + Lwt.return acc ) 179 + [] 180 + 188 181 let sign_commit t commit : signed_commit = 189 182 let sign_fn, privkey = 190 183 match t.key with ··· 202 195 ; prev= commit.prev 203 196 ; signature } 204 197 205 - let get_record_cid t 198 + let put_commit t ?(previous : signed_commit option = None) mst_root : 199 + (Cid.t * signed_commit) Lwt.t = 200 + let tid_now = Tid.now () in 201 + let rev = 202 + match previous with 203 + | Some c when c.rev >= tid_now -> ( 204 + try 205 + let ts, clockid = Tid.to_timestamp_us c.rev in 206 + Tid.of_timestamp_us ~clockid (Int64.succ ts) 207 + with _ -> 208 + failwith 209 + (Format.sprintf 210 + "unable to produce commit rev greater than current %s; now is %s" 211 + c.rev tid_now ) ) 212 + | _ -> 213 + tid_now 214 + in 215 + let commit = {version= 3; did= t.did; prev= None; rev; data= mst_root} in 216 + let signed = sign_commit t commit in 217 + let%lwt commit_cid = 218 + User_store.put_commit t.db signed |> Lwt_result.get_exn 219 + in 220 + Lwt.return (commit_cid, signed) 206 221 207 - let apply_writes (t : t) (commit_cid : Cid.t) 208 - ({did; data= mst_root; rev= commit_rev; _} : commit) 209 - (writes : repo_write list) (swap_commit : Cid.t option) : write_result Lwt.t 210 - = 211 - if swap_commit <> None && swap_commit <> Some commit_cid then 222 + let put_initial_commit t : (Cid.t * signed_commit) Lwt.t = 223 + let%lwt commit = User_store.get_commit t.db in 224 + if commit <> None then failwith ("commit already exists for " ^ t.did) ; 225 + let%lwt {root; _} = Mst.create_empty t.db |> Lwt_result.get_exn in 226 + put_commit t root 227 + 228 + let apply_writes (t : t) (writes : repo_write list) (swap_commit : Cid.t option) 229 + : write_result Lwt.t = 230 + let%lwt commit = 231 + match%lwt User_store.get_commit t.db with 232 + | Some (_, commit) -> 233 + Lwt.return commit 234 + | None -> 235 + failwith "failed to find commit for repo" 236 + in 237 + if swap_commit <> None && swap_commit <> Some t.root then 212 238 raise 213 239 (XrpcError 214 240 ( "InvalidSwap" 215 241 , Format.sprintf "swapRecord cid %s did not match commit cid %s" 216 242 (Cid.to_string (Option.get swap_commit)) 217 - (Cid.to_string commit_cid) ) ) ; 218 - let%lwt block_map = Lwt.map ref (get_map t mst_root) in 243 + (Cid.to_string t.root) ) ) ; 244 + let%lwt block_map = Lwt.map ref (get_map t commit.data) in 219 245 let%lwt results = 220 246 List.map 221 247 (fun (w : repo_write) -> ··· 223 249 | Create {collection; rkey; value; _} -> 224 250 let rkey = Option.value rkey ~default:(Tid.now ()) in 225 251 let path = Format.sprintf "%s/%s" collection rkey in 226 - let uri = Format.sprintf "at://%s/%s" did path in 252 + let uri = Format.sprintf "at://%s/%s" t.did path in 227 253 let%lwt () = 228 254 match StringMap.find_opt path !block_map with 229 255 | Some cid -> ··· 256 282 ) 257 283 | Update {collection; rkey; value; swap_record; _} -> 258 284 let path = Format.sprintf "%s/%s" collection rkey in 259 - let uri = Format.sprintf "at://%s/%s" did path in 285 + let uri = Format.sprintf "at://%s/%s" t.did path in 260 286 let old_cid = StringMap.find_opt path !block_map in 261 287 ( if 262 288 (swap_record <> None && swap_record <> old_cid) ··· 333 359 writes 334 360 |> Lwt.all 335 361 in 336 - let%lwt () = User_store.clear_mst t.db in 362 + let%lwt () = User_store.clear_blocks t.db in 337 363 let%lwt {root; _} = Mst.of_assoc t.db (StringMap.bindings !block_map) in 338 - let tid_now = Tid.now () in 339 - let rev = 340 - if tid_now > commit_rev then tid_now 341 - else 342 - try 343 - let ts, clockid = Tid.to_timestamp_us commit_rev in 344 - Tid.of_timestamp_us ~clockid (Int64.succ ts) 345 - with _ -> 346 - failwith 347 - (Format.sprintf 348 - "unable to produce commit rev greater than current %s; now is %s" 349 - commit_rev tid_now ) 350 - in 351 - let commit = {version= 3; did; prev= None; rev; data= root} in 352 - let signed = sign_commit t commit in 353 - let commit_cid = 354 - signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 355 - |> Cid.create Dcbor 356 - in 357 - Lwt.return {commit= (commit_cid, signed); results} 364 + let%lwt commit = put_commit t root ~previous:(Some commit) in 365 + Lwt.return {commit; results}
+126 -31
pegasus/lib/user_store.ml
··· 5 5 module Lex = Mist.Lex 6 6 module Tid = Mist.Tid 7 7 8 - type block = {cid: Cid.t; data: Blob.t} 8 + module Types = struct 9 + open struct 10 + let cid_link_of_yojson = function 11 + | `Assoc link -> 12 + link |> List.assoc "$link" |> Cid.of_yojson 13 + |> Result.map (fun cid -> Some cid) 14 + | `Null -> 15 + Ok None 16 + | _ -> 17 + Error "commit prev not a valid cid" 18 + 19 + let cid_link_to_yojson = function 20 + | Some cid -> 21 + Cid.to_yojson cid 22 + | None -> 23 + `Null 24 + end 25 + 26 + type commit = 27 + { did: string 28 + ; version: int (* always 3 *) 29 + ; data: Cid.t [@of_yojson Cid.of_yojson] [@to_yojson Cid.to_yojson] 30 + ; rev: Tid.t 31 + ; prev: Cid.t option 32 + [@of_yojson cid_link_of_yojson] [@to_yojson cid_link_to_yojson] } 33 + [@@deriving yojson] 34 + 35 + type signed_commit = 36 + { did: string 37 + ; version: int (* always 3 *) 38 + ; data: Cid.t [@of_yojson Cid.of_yojson] [@to_yojson Cid.to_yojson] 39 + ; rev: Tid.t 40 + ; prev: Cid.t option 41 + [@of_yojson cid_link_of_yojson] [@to_yojson cid_link_to_yojson] 42 + ; signature: bytes 43 + [@key "sig"] 44 + [@of_yojson 45 + fun x -> 46 + match Dag_cbor.of_yojson x with 47 + | `Bytes b -> 48 + Ok b 49 + | _ -> 50 + Error "commit sig not a valid bytes value"] 51 + [@to_yojson fun x -> Dag_cbor.to_yojson (`Bytes x)] } 52 + [@@deriving yojson] 9 53 10 - type record = {path: string; cid: Cid.t; value: Lex.repo_record; since: Tid.t} 54 + type block = {cid: Cid.t; data: Blob.t} 11 55 12 - type blob = {id: int; cid: Cid.t; mimetype: string; data: Blob.t} 56 + type record = {path: string; cid: Cid.t; value: Lex.repo_record; since: Tid.t} 13 57 14 - type t = (module Rapper_helper.CONNECTION) 58 + type blob = {id: int; cid: Cid.t; mimetype: string; data: Blob.t} 59 + end 60 + 61 + open Types 15 62 16 63 module Queries = struct 17 - (* mst storage *) 18 - let create_mst_table = 64 + (* block storage *) 65 + let create_blocks_tables conn = 66 + let$! () = 67 + [%rapper 68 + execute 69 + {sql| CREATE TABLE IF NOT EXISTS blocks ( 70 + cid TEXT NOT NULL PRIMARY KEY, 71 + data BLOB NOT NULL 72 + ); 73 + |sql}] 74 + () conn 75 + in 19 76 [%rapper 20 77 execute 21 - {sql| CREATE TABLE IF NOT EXISTS mst ( 22 - cid TEXT NOT NULL PRIMARY KEY, 23 - data BLOB NOT NULL 78 + {sql| CREATE TABLE IF NOT EXISTS named_blocks ( 79 + name TEXT NOT NULL UNIQUE, 80 + cid TEXT NOT NULL UNIQUE, 81 + FOREIGN KEY (cid) REFERENCES blocks (cid) 24 82 ); 25 83 |sql}] 26 - () 84 + () conn 27 85 28 86 let get_block cid = 29 87 [%rapper 30 88 get_opt 31 - {sql| SELECT @CID{cid}, @Blob{data} FROM mst WHERE cid = %CID{cid} |sql} 89 + {sql| SELECT @CID{cid}, @Blob{data} FROM blocks WHERE cid = %CID{cid} |sql} 32 90 record_out] 33 91 ~cid 34 92 35 93 let get_blocks cids = 36 94 [%rapper 37 95 get_many 38 - {sql| SELECT @CID{cid}, @Blob{data} FROM mst WHERE cid IN (%list{%CID{cids}}) |sql} 96 + {sql| SELECT @CID{cid}, @Blob{data} FROM blocks WHERE cid IN (%list{%CID{cids}}) |sql} 39 97 record_out] 40 98 ~cids 41 99 42 100 let has_block cid = 43 101 [%rapper 44 - get_opt {sql| SELECT @CID{cid} FROM mst WHERE cid = %CID{cid} |sql}] 102 + get_opt {sql| SELECT @CID{cid} FROM blocks WHERE cid = %CID{cid} |sql}] 45 103 ~cid 46 104 47 105 let put_block cid block = 48 106 [%rapper 49 107 get_opt 50 - {sql| INSERT INTO mst (cid, data) VALUES (%CID{cid}, %Blob{block}) ON CONFLICT DO NOTHING RETURNING @CID{cid} |sql}] 108 + {sql| INSERT INTO blocks (cid, data) VALUES (%CID{cid}, %Blob{block}) ON CONFLICT DO NOTHING RETURNING @CID{cid} |sql}] 51 109 ~cid ~block 52 110 53 111 let delete_block cid = 54 - [%rapper execute {sql| DELETE FROM mst WHERE cid = %CID{cid} |sql}] ~cid 112 + [%rapper execute {sql| DELETE FROM blocks WHERE cid = %CID{cid} |sql}] ~cid 55 113 56 114 let delete_blocks cids = 57 115 [%rapper 58 116 get_many 59 - {sql| DELETE FROM mst WHERE cid IN (%list{%CID{cids}}) RETURNING @CID{cid} |sql}] 117 + {sql| DELETE FROM blocks WHERE cid IN (%list{%CID{cids}}) RETURNING @CID{cid} |sql}] 60 118 ~cids 61 119 62 - let clear_mst = [%rapper execute {sql| DELETE FROM mst |sql}] () 120 + let clear_blocks = [%rapper execute {sql| DELETE FROM blocks |sql}] () 121 + 122 + let get_commit = 123 + [%rapper 124 + get_opt 125 + {sql| SELECT @CID{cid}, @Blob{data} FROM blocks 126 + LEFT JOIN named_blocks ON blocks.cid = named_blocks.cid 127 + WHERE blocks.name = 'commit' 128 + |sql}] 129 + () 130 + 131 + let put_commit cid block conn = 132 + let$! _ = put_block cid block conn in 133 + [%rapper 134 + execute 135 + {sql| INSERT INTO named_blocks (cid, name) VALUES (%CID{cid}, 'commit') 136 + ON CONFLICT DO UPDATE SET cid = %CID{cid} 137 + |sql}] 138 + ~cid conn 63 139 64 140 (* record storage *) 65 141 let create_records_table = ··· 71 147 data BLOB NOT NULL, 72 148 since TEXT NOT NULL 73 149 ); 150 + CREATE INDEX IF NOT EXISTS records_cid_idx ON records (cid); 74 151 |sql}] 75 152 () 76 153 77 154 let get_record_by_path = 78 155 [%rapper 79 156 get_opt 80 - {sql| SELECT @CID{cid}, @Blob{data}, @string{since} FROM records WHERE path = %string{path} 81 - |sql}] 157 + {sql| SELECT @CID{cid}, @Blob{data}, @string{since} FROM records WHERE path = %string{path} |sql}] 82 158 83 159 let get_record_by_cid = 84 160 [%rapper 85 161 get_opt 86 - {sql| SELECT @string{path}, @Blob{data}, @string{since} FROM records WHERE cid = %CID{cid} 87 - |sql}] 162 + {sql| SELECT @string{path}, @Blob{data}, @string{since} FROM records WHERE cid = %CID{cid} |sql}] 88 163 89 164 let list_records = 90 165 [%rapper 91 166 get_many 92 - {sql| SELECT @string{path}, @CID{cid}, @Blob{data}, @string{since} FROM records WHERE path LIKE %string{collection}/% 93 - |sql}] 167 + {sql| SELECT @string{path}, @CID{cid}, @Blob{data}, @string{since} FROM records WHERE path LIKE %string{collection}/% |sql}] 94 168 95 169 let put_record = 96 170 [%rapper 97 171 execute 98 172 {sql| INSERT INTO records (path, cid, data, since) 99 - VALUES (%string{path}, %CID{cid}, %Blob{data}, %string{since}) 100 - |sql}] 173 + VALUES (%string{path}, %CID{cid}, %Blob{data}, %string{since}) 174 + ON CONFLICT (path) DO UPDATE SET cid = excluded.cid, data = excluded.data, since = excluded.since 175 + |sql}] 101 176 102 177 (* blob storage *) 103 178 let create_blobs_tables conn = ··· 153 228 let put_blob cid mimetype data = 154 229 [%rapper 155 230 get_one 156 - {sql| INSERT INTO blobs (cid, mimetype, data) VALUES (%CID{cid}, %string{mimetype}, %Blob{data}) RETURNING @int{id} |sql}] 231 + {sql| INSERT INTO blobs (cid, mimetype, data) 232 + VALUES (%CID{cid}, %string{mimetype}, %Blob{data}) 233 + ON CONFLICT (cid) DO UPDATE SET mimetype = excluded.mimetype, data = excluded.data 234 + RETURNING @int{id} 235 + |sql}] 157 236 ~cid ~mimetype ~data 158 237 159 238 let list_blob_refs path = ··· 169 248 (SELECT id FROM blobs WHERE cid = %CID{cid} LIMIT 1), 170 249 %string{path} 171 250 ) 251 + ON CONFLICT DO NOTHING 172 252 |sql}] 173 253 ~cid ~path 174 254 ··· 178 258 {sql| DELETE FROM blobs_records WHERE record_path LIKE %string{path} AND blob_id IN ( 179 259 SELECT id FROM blobs WHERE cid IN (%list{%CID{cids}}) 180 260 ) 181 - |sql}] 261 + |sql}] 182 262 ~path ~cids 183 263 end 184 264 265 + type t = (module Rapper_helper.CONNECTION) 266 + 185 267 let init conn : unit Lwt.t = 186 - let$! () = Queries.create_mst_table conn in 268 + let$! () = Queries.create_blocks_tables conn in 187 269 let$! () = Queries.create_records_table conn in 188 270 let$! () = Queries.create_blobs_tables conn in 189 271 Lwt.return_unit 190 272 191 - (* mst *) 273 + (* blocks *) 192 274 193 275 let get_bytes conn cid : Blob.t option Lwt.t = 194 276 Queries.get_block cid conn ··· 228 310 let delete_many conn cids : (int, exn) Lwt_result.t = 229 311 Queries.delete_blocks cids conn >$! List.length >>= Lwt.return_ok 230 312 231 - let clear_mst conn : unit Lwt.t = 232 - let$! () = Queries.clear_mst conn in 313 + let clear_blocks conn : unit Lwt.t = 314 + let$! () = Queries.clear_blocks conn in 233 315 Lwt.return_unit 316 + 317 + let get_commit conn : (Cid.t * signed_commit) option Lwt.t = 318 + Queries.get_commit conn 319 + >$! Option.map (fun (cid, data) -> 320 + ( cid 321 + , data |> Dag_cbor.decode_to_yojson |> signed_commit_of_yojson 322 + |> Result.get_ok ) ) 323 + 324 + let put_commit conn commit : (Cid.t, exn) Lwt_result.t = 325 + let data = commit |> signed_commit_to_yojson |> Dag_cbor.encode_yojson in 326 + let cid = Cid.create Dcbor data in 327 + let$! () = Queries.put_commit cid data conn in 328 + Lwt.return_ok cid 234 329 235 330 (* records *) 236 331