objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement applyWrites

futurGH e4ca1585 5a0ebd20

+348 -17
+3 -1
ipld/lib/dag_cbor.ml
··· 37 37 `Map 38 38 (StringMap.of_list 39 39 (List.map (fun (k, v) -> (k, of_yojson v)) assoc_list) ) 40 - | `List lst -> 40 + | `List lst | `Tuple lst -> 41 41 `Array (Array.of_list (List.map of_yojson lst)) 42 42 | `Bool b -> 43 43 `Boolean b ··· 49 49 `Float f 50 50 | `String s -> 51 51 `String s 52 + | `Variant (_, value) -> ( 53 + match value with Some v -> of_yojson v | None -> `Null ) 52 54 | `Null -> 53 55 `Null 54 56
+1
pegasus/lib/dune
··· 5 5 caqti-lwt 6 6 caqti-driver-sqlite3 7 7 ipld 8 + kleidos 8 9 lwt 9 10 lwt.unix 10 11 mist
+331 -16
pegasus/lib/repository.ml
··· 1 - open Mist 1 + open Util.Exceptions 2 + module Lex = Mist.Lex 3 + module Mst = Mist.Mst.Make (User_store) 4 + module StringMap = Lex.StringMap 5 + module Tid = Mist.Tid 6 + 7 + let cid_link_of_yojson = function 8 + | `Assoc link -> 9 + link |> List.assoc "$link" |> Cid.of_yojson 10 + |> Result.map (fun cid -> Some cid) 11 + | `Null -> 12 + Ok None 13 + | _ -> 14 + Error "commit prev not a valid cid" 15 + 16 + let cid_link_to_yojson = function 17 + | Some cid -> 18 + Cid.to_yojson cid 19 + | None -> 20 + `Null 2 21 3 22 type commit = 4 23 { did: string 5 24 ; version: int (* always 3 *) 6 - ; data: Cid.t 25 + ; data: Cid.t [@of_yojson Cid.of_yojson] [@to_yojson Cid.to_yojson] 7 26 ; rev: Tid.t 8 - ; prev: Cid.t option } 27 + ; prev: Cid.t option 28 + [@of_yojson cid_link_of_yojson] [@to_yojson cid_link_to_yojson] } 29 + [@@deriving yojson] 9 30 10 31 type signed_commit = 11 32 { did: string ··· 13 34 ; data: Cid.t [@of_yojson Cid.of_yojson] [@to_yojson Cid.to_yojson] 14 35 ; rev: Tid.t 15 36 ; prev: Cid.t option 16 - [@of_yojson 17 - function 18 - | `Assoc link -> 19 - link |> List.assoc "$link" |> Cid.of_yojson 20 - |> Result.map (fun cid -> Some cid) 21 - | `Null -> 22 - Ok None 23 - | _ -> 24 - Error "commit prev not a valid cid"] 25 - [@to_yojson function Some cid -> Cid.to_yojson cid | None -> `Null] 37 + [@of_yojson cid_link_of_yojson] [@to_yojson cid_link_to_yojson] 26 38 ; signature: bytes 27 39 [@key "sig"] 28 40 [@of_yojson ··· 37 49 38 50 type signing_key = P256 of bytes | K256 of bytes 39 51 40 - module Make (Store : Storage.Writable_blockstore) = struct 41 - type t = {store: Store.t; key: signing_key} 42 - end 52 + type repo_write = 53 + | Create of 54 + { type': string [@key "$type"] 55 + ; collection: string 56 + ; rkey: string option 57 + ; value: Lex.repo_record } 58 + | Update of 59 + { type': string [@key "$type"] 60 + ; collection: string 61 + ; rkey: string 62 + ; value: Lex.repo_record 63 + ; swap_record: Cid.t option [@key "swapRecord"] } 64 + | Delete of 65 + { type': string [@key "$type"] 66 + ; collection: string 67 + ; rkey: string 68 + ; swap_record: Cid.t option [@key "swapRecord"] } 69 + [@@deriving yojson] 70 + 71 + let repo_write_of_yojson (json : Yojson.Safe.t) = 72 + let open Yojson.Safe.Util in 73 + let type' = member "$type" json |> to_string in 74 + let collection = member "collection" json |> to_string in 75 + let rkey = match member "rkey" json with `String s -> Some s | _ -> None in 76 + let swap_record = 77 + match member "swapRecord" json with 78 + | `String s -> 79 + s |> Cid.of_string |> Result.get_ok |> Option.some 80 + | _ -> 81 + None 82 + in 83 + match type' with 84 + | "com.atproto.repo.applyWrites#create" -> 85 + let value = 86 + member "value" json |> Lex.repo_record_of_yojson |> Result.get_ok 87 + in 88 + Create {type'; collection; rkey; value} 89 + | "com.atproto.repo.applyWrites#update" -> 90 + let value = 91 + member "value" json |> Lex.repo_record_of_yojson |> Result.get_ok 92 + in 93 + Update {type'; collection; rkey= Option.get rkey; value; swap_record} 94 + | "com.atproto.repo.applyWrites#delete" -> 95 + Delete {type'; collection; rkey= Option.get rkey; swap_record} 96 + | _ -> 97 + raise (Invalid_argument "invalid applyWrites write $type") 98 + 99 + let repo_write_to_yojson = function 100 + | Create {type'; collection; rkey; value} -> 101 + `Assoc 102 + [ ("$type", `String type') 103 + ; ("collection", `String collection) 104 + ; ("rkey", match rkey with Some rkey -> `String rkey | None -> `Null) 105 + ; ("value", Lex.repo_record_to_yojson value) ] 106 + | Update {type'; collection; rkey; value; swap_record} -> 107 + `Assoc 108 + [ ("$type", `String type') 109 + ; ("collection", `String collection) 110 + ; ("rkey", `String rkey) 111 + ; ("value", Lex.repo_record_to_yojson value) 112 + ; ( "swapRecord" 113 + , match swap_record with 114 + | Some cid -> 115 + `String (Cid.to_string cid) 116 + | None -> 117 + `Null ) ] 118 + | Delete {type'; collection; rkey; swap_record} -> 119 + `Assoc 120 + [ ("$type", `String type') 121 + ; ("collection", `String collection) 122 + ; ("rkey", `String rkey) 123 + ; ( "swapRecord" 124 + , match swap_record with 125 + | Some cid -> 126 + `String (Cid.to_string cid) 127 + | None -> 128 + `Null ) ] 129 + 130 + type apply_writes_result = 131 + | Create of {type': string [@key "$type"]; uri: string; cid: Cid.t} 132 + | Update of {type': string [@key "$type"]; uri: string; cid: Cid.t} 133 + | Delete of {type': string [@key "$type"]} 134 + [@@deriving yojson] 135 + 136 + let apply_writes_result_of_yojson (json : Yojson.Safe.t) = 137 + let open Yojson.Safe.Util in 138 + let type' = member "$type" json |> to_string in 139 + match type' with 140 + | "com.atproto.repo.applyWrites#createResult" -> 141 + let uri = member "uri" json |> to_string in 142 + let cid = 143 + member "cid" json |> to_string |> Cid.of_string |> Result.get_ok 144 + in 145 + Create {type'; uri; cid} 146 + | "com.atproto.repo.applyWrites#updateResult" -> 147 + let uri = member "uri" json |> to_string in 148 + let cid = 149 + member "cid" json |> to_string |> Cid.of_string |> Result.get_ok 150 + in 151 + Update {type'; uri; cid} 152 + | "com.atproto.repo.applyWrites#deleteResult" -> 153 + Delete {type'} 154 + | _ -> 155 + failwith "invalid applyWrites result $type" 156 + 157 + let apply_writes_result_to_yojson = function 158 + | Create {type'; uri; cid} -> 159 + `Assoc 160 + [ ("$type", `String type') 161 + ; ("uri", `String uri) 162 + ; ("cid", `String (Cid.to_string cid)) ] 163 + | Update {type'; uri; cid} -> 164 + `Assoc 165 + [ ("$type", `String type') 166 + ; ("uri", `String uri) 167 + ; ("cid", `String (Cid.to_string cid)) ] 168 + | Delete {type'} -> 169 + `Assoc [("$type", `String type')] 170 + 171 + type write_result = 172 + {commit: Cid.t * signed_commit; results: apply_writes_result list} 173 + 174 + type t = 175 + { key: signing_key 176 + ; db: Caqti_lwt.connection 177 + ; mutable block_map: Cid.t Mist.Mst.StringMap.t option } 178 + 179 + let get_map t root = 180 + match t.block_map with 181 + | Some map -> 182 + Lwt.return map 183 + | None -> 184 + let%lwt map = Mst.build_map {blockstore= t.db; root} in 185 + t.block_map <- Some map ; 186 + Lwt.return map 187 + 188 + let sign_commit t commit : signed_commit = 189 + let sign_fn, privkey = 190 + match t.key with 191 + | K256 k -> 192 + (Kleidos.K256.sign, k) 193 + | P256 k -> 194 + (Kleidos.P256.sign, k) 195 + in 196 + let msg = commit |> commit_to_yojson |> Dag_cbor.encode_yojson in 197 + let signature = sign_fn ~privkey ~msg in 198 + { did= commit.did 199 + ; version= commit.version 200 + ; data= commit.data 201 + ; rev= commit.rev 202 + ; prev= commit.prev 203 + ; signature } 204 + 205 + let get_record_cid t 206 + 207 + let apply_writes (t : t) (commit_cid : Cid.t) 208 + ({did; data= mst_root; rev= commit_rev; _} : commit) 209 + (writes : repo_write list) (swap_commit : Cid.t option) : write_result Lwt.t 210 + = 211 + if swap_commit <> None && swap_commit <> Some commit_cid then 212 + raise 213 + (XrpcError 214 + ( "InvalidSwap" 215 + , Format.sprintf "swapRecord cid %s did not match commit cid %s" 216 + (Cid.to_string (Option.get swap_commit)) 217 + (Cid.to_string commit_cid) ) ) ; 218 + let%lwt block_map = Lwt.map ref (get_map t mst_root) in 219 + let%lwt results = 220 + List.map 221 + (fun (w : repo_write) -> 222 + match w with 223 + | Create {collection; rkey; value; _} -> 224 + let rkey = Option.value rkey ~default:(Tid.now ()) in 225 + let path = Format.sprintf "%s/%s" collection rkey in 226 + let uri = Format.sprintf "at://%s/%s" did path in 227 + let%lwt () = 228 + match StringMap.find_opt path !block_map with 229 + | Some cid -> 230 + raise 231 + (XrpcError 232 + ( "InvalidSwap" 233 + , Format.sprintf 234 + "attempted to write record %s that already exists \ 235 + with cid %s" 236 + path (Cid.to_string cid) ) ) 237 + | None -> 238 + Lwt.return () 239 + in 240 + let%lwt cid = User_store.put_record t.db (`LexMap value) path in 241 + block_map := StringMap.add path cid !block_map ; 242 + let refs = 243 + Util.find_blob_refs value 244 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 245 + in 246 + let%lwt () = 247 + match%lwt User_store.put_blob_refs t.db path refs with 248 + | Ok () -> 249 + Lwt.return () 250 + | Error err -> 251 + raise err 252 + in 253 + Lwt.return 254 + (Create 255 + {type'= "com.atproto.repo.applyWrites#createResult"; uri; cid} 256 + ) 257 + | Update {collection; rkey; value; swap_record; _} -> 258 + let path = Format.sprintf "%s/%s" collection rkey in 259 + let uri = Format.sprintf "at://%s/%s" did path in 260 + let old_cid = StringMap.find_opt path !block_map in 261 + ( if 262 + (swap_record <> None && swap_record <> old_cid) 263 + || (swap_record = None && old_cid = None) 264 + then 265 + let cid_str = 266 + match old_cid with 267 + | Some cid -> 268 + Cid.to_string cid 269 + | None -> 270 + "null" 271 + in 272 + raise 273 + (XrpcError 274 + ( "InvalidSwap" 275 + , Format.sprintf 276 + "attempted to update record %s with cid %s" path 277 + cid_str ) ) ) ; 278 + let%lwt () = 279 + match old_cid with 280 + | Some _ -> ( 281 + match%lwt User_store.get_record_by_path t.db path with 282 + | Some record -> 283 + let refs = 284 + Util.find_blob_refs record.value 285 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 286 + in 287 + let%lwt () = User_store.clear_blob_refs t.db path refs in 288 + Lwt.return_unit 289 + | None -> 290 + Lwt.return_unit ) 291 + | None -> 292 + Lwt.return_unit 293 + in 294 + let%lwt new_cid = User_store.put_record t.db (`LexMap value) path in 295 + block_map := StringMap.add path new_cid !block_map ; 296 + Lwt.return 297 + (Update 298 + { type'= "com.atproto.repo.applyWrites#updateResult" 299 + ; uri 300 + ; cid= new_cid } ) 301 + | Delete {collection; rkey; swap_record; _} -> 302 + let path = Format.sprintf "%s/%s" collection rkey in 303 + let cid = StringMap.find_opt path !block_map in 304 + ( if cid = None || (swap_record <> None && swap_record <> cid) then 305 + let cid_str = 306 + match cid with 307 + | Some cid -> 308 + Cid.to_string cid 309 + | None -> 310 + "null" 311 + in 312 + raise 313 + (XrpcError 314 + ( "InvalidSwap" 315 + , Format.sprintf 316 + "attempted to delete record %s with cid %s" path 317 + cid_str ) ) ) ; 318 + let%lwt () = 319 + match%lwt User_store.get_record_by_path t.db path with 320 + | Some record -> 321 + let refs = 322 + Util.find_blob_refs record.value 323 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 324 + in 325 + let%lwt () = User_store.clear_blob_refs t.db path refs in 326 + Lwt.return_unit 327 + | None -> 328 + Lwt.return_unit 329 + in 330 + block_map := StringMap.remove path !block_map ; 331 + Lwt.return 332 + (Delete {type'= "com.atproto.repo.applyWrites#deleteResult"}) ) 333 + writes 334 + |> Lwt.all 335 + in 336 + let%lwt () = User_store.clear_mst t.db in 337 + let%lwt {root; _} = Mst.of_assoc t.db (StringMap.bindings !block_map) in 338 + let tid_now = Tid.now () in 339 + let rev = 340 + if tid_now > commit_rev then tid_now 341 + else 342 + try 343 + let ts, clockid = Tid.to_timestamp_us commit_rev in 344 + Tid.of_timestamp_us ~clockid (Int64.succ ts) 345 + with _ -> 346 + failwith 347 + (Format.sprintf 348 + "unable to produce commit rev greater than current %s; now is %s" 349 + commit_rev tid_now ) 350 + in 351 + let commit = {version= 3; did; prev= None; rev; data= root} in 352 + let signed = sign_commit t commit in 353 + let commit_cid = 354 + signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 355 + |> Cid.create Dcbor 356 + in 357 + Lwt.return {commit= (commit_cid, signed); results}
+13
pegasus/lib/user_store.ml
··· 171 171 ) 172 172 |sql}] 173 173 ~cid ~path 174 + 175 + let clear_blob_refs path cids = 176 + [%rapper 177 + execute 178 + {sql| DELETE FROM blobs_records WHERE record_path LIKE %string{path} AND blob_id IN ( 179 + SELECT id FROM blobs WHERE cid IN (%list{%CID{cids}}) 180 + ) 181 + |sql}] 182 + ~path ~cids 174 183 end 175 184 176 185 let init conn : unit Lwt.t = ··· 277 286 (List.map 278 287 (fun cid -> fun () -> Queries.put_blob_ref cid path conn) 279 288 cids ) 289 + 290 + let clear_blob_refs conn path cids : unit Lwt.t = 291 + let$! () = Queries.clear_blob_refs path cids conn in 292 + Lwt.return_unit