objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Undo accidental partial revert of d765faf in c11a3df

futurGH 7b776612 9b59aea3

+197 -250
+197 -250
pegasus/lib/repository.ml
··· 8 8 module String_map = Lex.String_map 9 9 module Tid = Mist.Tid 10 10 11 - let write_locks : (string, Lwt_mutex.t) Hashtbl.t = Hashtbl.create 100 12 - 13 - let write_lock_mutex = Lwt_mutex.create () 14 - 15 - let with_write_lock did f = 16 - let%lwt lock = 17 - Lwt_mutex.with_lock write_lock_mutex (fun () -> 18 - match Hashtbl.find_opt write_locks did with 19 - | Some l -> 20 - Lwt.return l 21 - | None -> 22 - let l = Lwt_mutex.create () in 23 - Hashtbl.add write_locks did l ; 24 - Lwt.return l ) 25 - in 26 - Lwt_mutex.with_lock lock f 27 - 28 11 module Write_op = struct 29 12 let create = "com.atproto.repo.applyWrites#create" 30 13 ··· 261 244 262 245 let apply_writes (t : t) (writes : repo_write list) (swap_commit : Cid.t option) 263 246 : write_result Lwt.t = 264 - with_write_lock t.did (fun () -> 265 - Dream.debug (fun l -> l "lock acquired") ; 266 - let open Sequencer.Types in 267 - let module Inductive = Mist.Mst.Inductive (Mst) in 268 - let%lwt prev_commit = 269 - match%lwt User_store.get_commit t.db with 270 - | Some (_, commit) -> 271 - Lwt.return commit 272 - | None -> 273 - failwith ("failed to retrieve commit for " ^ t.did) 274 - in 275 - Dream.debug (fun l -> l "commit retrieved") ; 276 - if swap_commit <> None && swap_commit <> Option.map fst t.commit then 277 - Errors.invalid_request ~name:"InvalidSwap" 278 - (Format.sprintf "swapCommit cid %s did not match last commit cid %s" 279 - (Cid.to_string (Option.get swap_commit)) 280 - ( match t.commit with 281 - | Some (c, _) -> 282 - Cid.to_string c 283 - | None -> 284 - "null" ) ) ; 285 - let%lwt block_map = Lwt.map ref (get_map t) in 286 - let cached_store = Cached_store.create t.db in 287 - let mst : Cached_mst.t ref = 288 - ref (Cached_mst.create cached_store prev_commit.data) 289 - in 290 - (* ops to emit, built in loop because prev_data (previous cid) is otherwise inaccessible *) 291 - let commit_ops : commit_evt_op list ref = ref [] in 292 - let added_leaves = ref Block_map.empty in 293 - let%lwt results = 294 - Lwt_list.map_s 295 - (fun (w : repo_write) -> 296 - match w with 297 - | Create {collection; rkey; value; _} -> 298 - let rkey = Option.value rkey ~default:(Tid.now ()) in 299 - let path = Format.sprintf "%s/%s" collection rkey in 300 - let uri = Format.sprintf "at://%s/%s" t.did path in 301 - let%lwt () = 302 - match String_map.find_opt path !block_map with 247 + let open Sequencer.Types in 248 + let%lwt prev_commit = 249 + match%lwt User_store.get_commit t.db with 250 + | Some (_, commit) -> 251 + Lwt.return commit 252 + | None -> 253 + failwith ("failed to retrieve commit for " ^ t.did) 254 + in 255 + if swap_commit <> None && swap_commit <> Option.map fst t.commit then 256 + Errors.invalid_request ~name:"InvalidSwap" 257 + (Format.sprintf "swapCommit cid %s did not match last commit cid %s" 258 + (Cid.to_string (Option.get swap_commit)) 259 + (match t.commit with Some (c, _) -> Cid.to_string c | None -> "null") ) ; 260 + let cached_store = Cached_store.create t.db in 261 + let mst : Cached_mst.t ref = 262 + ref (Cached_mst.create cached_store prev_commit.data) 263 + in 264 + t.block_map <- None ; 265 + (* ops to emit, built in loop because prev_data (previous cid) is otherwise inaccessible *) 266 + let commit_ops : commit_evt_op list ref = ref [] in 267 + let added_leaves = ref Block_map.empty in 268 + let%lwt results = 269 + Lwt_list.map_s 270 + (fun (w : repo_write) -> 271 + match w with 272 + | Create {collection; rkey; value; _} -> 273 + let rkey = Option.value rkey ~default:(Tid.now ()) in 274 + let path = Format.sprintf "%s/%s" collection rkey in 275 + let uri = Format.sprintf "at://%s/%s" t.did path in 276 + let%lwt () = 277 + match%lwt User_store.get_record_cid t.db path with 278 + | Some cid -> 279 + Errors.invalid_request ~name:"InvalidSwap" 280 + (Format.sprintf 281 + "attempted to write record %s that already exists with \ 282 + cid %s" 283 + path (Cid.to_string cid) ) 284 + | None -> 285 + Lwt.return () 286 + in 287 + let record_with_type : Lex.repo_record = 288 + if String_map.mem "$type" value then value 289 + else String_map.add "$type" (`String collection) value 290 + in 291 + let%lwt cid, block = 292 + User_store.put_record t.db (`LexMap record_with_type) path 293 + in 294 + added_leaves := Block_map.set cid block !added_leaves ; 295 + commit_ops := 296 + !commit_ops @ [{action= `Create; path; cid= Some cid; prev= None}] ; 297 + let%lwt new_mst = Cached_mst.add !mst path cid in 298 + mst := new_mst ; 299 + let refs = 300 + Util.find_blob_refs value 301 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 302 + in 303 + let%lwt () = 304 + match%lwt User_store.put_blob_refs t.db path refs with 305 + | Ok () -> 306 + Lwt.return () 307 + | Error err -> 308 + raise err 309 + in 310 + Lwt.return 311 + (Create 312 + {type'= "com.atproto.repo.applyWrites#createResult"; uri; cid} 313 + ) 314 + | Update {collection; rkey; value; swap_record; _} -> 315 + let path = Format.sprintf "%s/%s" collection rkey in 316 + let uri = Format.sprintf "at://%s/%s" t.did path in 317 + let%lwt old_cid = User_store.get_record_cid t.db path in 318 + ( if 319 + (swap_record <> None && swap_record <> old_cid) 320 + || (swap_record = None && old_cid = None) 321 + then 322 + let cid_str = 323 + match old_cid with 303 324 | Some cid -> 304 - Errors.invalid_request ~name:"InvalidSwap" 305 - (Format.sprintf 306 - "attempted to write record %s that already exists \ 307 - with cid %s" 308 - path (Cid.to_string cid) ) 325 + Cid.to_string cid 309 326 | None -> 310 - Lwt.return () 311 - in 312 - let record_with_type : Lex.repo_record = 313 - if String_map.mem "$type" value then value 314 - else String_map.add "$type" (`String collection) value 315 - in 316 - let%lwt cid, block = 317 - User_store.put_record t.db (`LexMap record_with_type) path 327 + "null" 318 328 in 319 - block_map := String_map.add path cid !block_map ; 320 - added_leaves := Block_map.set cid block !added_leaves ; 321 - commit_ops := 322 - !commit_ops 323 - @ [{action= `Create; path; cid= Some cid; prev= None}] ; 324 - let%lwt new_mst = Cached_mst.add !mst path cid in 325 - mst := new_mst ; 326 - let refs = 327 - Util.find_blob_refs value 328 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 329 - in 330 - let%lwt () = 331 - match%lwt User_store.put_blob_refs t.db path refs with 332 - | Ok () -> 333 - Lwt.return () 334 - | Error err -> 335 - raise err 336 - in 337 - Lwt.return 338 - (Create 339 - { type'= "com.atproto.repo.applyWrites#createResult" 340 - ; uri 341 - ; cid } ) 342 - | Update {collection; rkey; value; swap_record; _} -> 343 - let path = Format.sprintf "%s/%s" collection rkey in 344 - let uri = Format.sprintf "at://%s/%s" t.did path in 345 - let old_cid = String_map.find_opt path !block_map in 346 - ( if 347 - (swap_record <> None && swap_record <> old_cid) 348 - || (swap_record = None && old_cid = None) 349 - then 350 - let cid_str = 351 - match old_cid with 352 - | Some cid -> 353 - Cid.to_string cid 354 - | None -> 355 - "null" 329 + Errors.invalid_request ~name:"InvalidSwap" 330 + (Format.sprintf "attempted to update record %s with cid %s" 331 + path cid_str ) ) ; 332 + let%lwt () = 333 + match old_cid with 334 + | Some _ -> ( 335 + match%lwt User_store.get_record t.db path with 336 + | Some record -> 337 + let refs = 338 + Util.find_blob_refs record.value 339 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 356 340 in 357 - Errors.invalid_request ~name:"InvalidSwap" 358 - (Format.sprintf 359 - "attempted to update record %s with cid %s" path 360 - cid_str ) ) ; 361 - let%lwt () = 362 - match old_cid with 363 - | Some _ -> ( 364 - match%lwt User_store.get_record t.db path with 365 - | Some record -> 366 - let refs = 367 - Util.find_blob_refs record.value 368 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 369 - in 370 - if not (List.is_empty refs) then 371 - let%lwt _ = 372 - User_store.delete_orphaned_blobs_by_record_path t.db 373 - path 374 - in 375 - Lwt.return_unit 376 - else Lwt.return_unit 377 - | None -> 378 - Lwt.return_unit ) 341 + if not (List.is_empty refs) then 342 + let%lwt _ = 343 + User_store.delete_orphaned_blobs_by_record_path t.db 344 + path 345 + in 346 + Lwt.return_unit 347 + else Lwt.return_unit 348 + | None -> 349 + Lwt.return_unit ) 350 + | None -> 351 + Lwt.return_unit 352 + in 353 + let record_with_type : Lex.repo_record = 354 + if String_map.mem "$type" value then value 355 + else String_map.add "$type" (`String collection) value 356 + in 357 + let%lwt new_cid, new_block = 358 + User_store.put_record t.db (`LexMap record_with_type) path 359 + in 360 + added_leaves := Block_map.set new_cid new_block !added_leaves ; 361 + commit_ops := 362 + !commit_ops 363 + @ [{action= `Update; path; cid= Some new_cid; prev= old_cid}] ; 364 + let%lwt new_mst = Cached_mst.add !mst path new_cid in 365 + mst := new_mst ; 366 + let refs = 367 + Util.find_blob_refs value 368 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 369 + in 370 + let%lwt () = 371 + match%lwt User_store.put_blob_refs t.db path refs with 372 + | Ok () -> 373 + Lwt.return () 374 + | Error err -> 375 + raise err 376 + in 377 + Lwt.return 378 + (Update 379 + { type'= "com.atproto.repo.applyWrites#updateResult" 380 + ; uri 381 + ; cid= new_cid } ) 382 + | Delete {collection; rkey; swap_record; _} -> 383 + let path = Format.sprintf "%s/%s" collection rkey in 384 + let%lwt cid = User_store.get_record_cid t.db path in 385 + ( if cid = None || (swap_record <> None && swap_record <> cid) then 386 + let cid_str = 387 + match cid with 388 + | Some cid -> 389 + Cid.to_string cid 379 390 | None -> 380 - Lwt.return_unit 381 - in 382 - let record_with_type : Lex.repo_record = 383 - if String_map.mem "$type" value then value 384 - else String_map.add "$type" (`String collection) value 385 - in 386 - let%lwt new_cid, new_block = 387 - User_store.put_record t.db (`LexMap record_with_type) path 388 - in 389 - added_leaves := Block_map.set new_cid new_block !added_leaves ; 390 - block_map := String_map.add path new_cid !block_map ; 391 - commit_ops := 392 - !commit_ops 393 - @ [{action= `Update; path; cid= Some new_cid; prev= old_cid}] ; 394 - let%lwt new_mst = Cached_mst.add !mst path new_cid in 395 - mst := new_mst ; 396 - let refs = 397 - Util.find_blob_refs value 398 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 399 - in 400 - let%lwt () = 401 - match%lwt User_store.put_blob_refs t.db path refs with 402 - | Ok () -> 403 - Lwt.return () 404 - | Error err -> 405 - raise err 391 + "null" 406 392 in 407 - Lwt.return 408 - (Update 409 - { type'= "com.atproto.repo.applyWrites#updateResult" 410 - ; uri 411 - ; cid= new_cid } ) 412 - | Delete {collection; rkey; swap_record; _} -> 413 - let path = Format.sprintf "%s/%s" collection rkey in 414 - let cid = String_map.find_opt path !block_map in 415 - ( if cid = None || (swap_record <> None && swap_record <> cid) 416 - then 417 - let cid_str = 418 - match cid with 419 - | Some cid -> 420 - Cid.to_string cid 421 - | None -> 422 - "null" 393 + Errors.invalid_request ~name:"InvalidSwap" 394 + (Format.sprintf "attempted to delete record %s with cid %s" 395 + path cid_str ) ) ; 396 + let%lwt () = 397 + match%lwt User_store.get_record t.db path with 398 + | Some record -> 399 + let refs = 400 + Util.find_blob_refs record.value 401 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 402 + in 403 + if not (List.is_empty refs) then 404 + let%lwt _ = 405 + User_store.delete_orphaned_blobs_by_record_path t.db path 423 406 in 424 - Errors.invalid_request ~name:"InvalidSwap" 425 - (Format.sprintf 426 - "attempted to delete record %s with cid %s" path 427 - cid_str ) ) ; 428 - let%lwt () = 429 - match%lwt User_store.get_record t.db path with 430 - | Some record -> 431 - let refs = 432 - Util.find_blob_refs record.value 433 - |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 434 - in 435 - if not (List.is_empty refs) then 436 - let%lwt _ = 437 - User_store.delete_orphaned_blobs_by_record_path t.db 438 - path 439 - in 440 - Lwt.return_unit 441 - else Lwt.return_unit 442 - | None -> 443 - Lwt.return_unit 444 - in 445 - let%lwt () = User_store.delete_record t.db path in 446 - block_map := String_map.remove path !block_map ; 447 - commit_ops := 448 - !commit_ops @ [{action= `Delete; path; cid= None; prev= cid}] ; 449 - let%lwt new_mst = Cached_mst.delete !mst path in 450 - mst := new_mst ; 451 - Lwt.return 452 - (Delete {type'= "com.atproto.repo.applyWrites#deleteResult"}) ) 453 - writes 454 - in 455 - Dream.debug (fun l -> l "writes processed") ; 456 - let new_mst = !mst in 457 - let%lwt new_commit = 458 - put_commit t new_mst.root ~previous:(Some prev_commit) 459 - in 460 - Dream.debug (fun l -> l "commit inserted") ; 461 - let new_commit_cid, new_commit_signed = new_commit in 462 - let commit_block = 463 - new_commit_signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 464 - in 465 - let diff : Inductive.diff list = 466 - List.fold_left 467 - (fun (acc : Inductive.diff list) 468 - ({action; path; cid; prev} : commit_evt_op) -> 469 - match action with 470 - | `Create -> 471 - acc @ [Add {key= path; cid= Option.get cid}] 472 - | `Update -> 473 - acc @ [Update {key= path; cid= Option.get cid; prev}] 474 - | `Delete -> 475 - acc @ [Delete {key= path; prev= Option.get prev}] ) 476 - [] !commit_ops 477 - in 478 - let%lwt proof_blocks = 479 - match%lwt 480 - Inductive.generate_proof !block_map diff ~new_root:new_mst.root 481 - ~prev_root:prev_commit.data 482 - with 483 - | Ok blocks -> 484 - Lwt.return (Block_map.merge blocks !added_leaves) 485 - | Error err -> 486 - raise err 487 - in 488 - Dream.debug (fun l -> l "proof generated") ; 489 - let block_stream = 490 - proof_blocks |> Block_map.entries |> Lwt_seq.of_list 491 - |> Lwt_seq.cons (new_commit_cid, commit_block) 492 - in 493 - let%lwt blocks = 494 - Car.blocks_to_stream new_commit_cid block_stream |> Car.collect_stream 495 - in 496 - let%lwt ds = Data_store.connect () in 497 - let%lwt _ = 498 - Sequencer.sequence_commit ds ~did:t.did ~commit:new_commit_cid 499 - ~rev:new_commit_signed.rev ~blocks ~ops:!commit_ops 500 - ~since:prev_commit.rev ~prev_data:prev_commit.data () 501 - in 502 - Dream.debug (fun l -> l "commit sequenced") ; 503 - Lwt.return {commit= new_commit; results} ) 407 + Lwt.return_unit 408 + else Lwt.return_unit 409 + | None -> 410 + Lwt.return_unit 411 + in 412 + let%lwt () = User_store.delete_record t.db path in 413 + commit_ops := 414 + !commit_ops @ [{action= `Delete; path; cid= None; prev= cid}] ; 415 + let%lwt new_mst = Cached_mst.delete !mst path in 416 + mst := new_mst ; 417 + Lwt.return 418 + (Delete {type'= "com.atproto.repo.applyWrites#deleteResult"}) ) 419 + writes 420 + in 421 + let new_mst = !mst in 422 + let%lwt new_commit = put_commit t new_mst.root ~previous:(Some prev_commit) in 423 + let new_commit_cid, new_commit_signed = new_commit in 424 + let commit_block = 425 + new_commit_signed |> signed_commit_to_yojson |> Dag_cbor.encode_yojson 426 + in 427 + let%lwt proof_blocks = 428 + Lwt_list.fold_left_s 429 + (fun acc ({path; _} : commit_evt_op) -> 430 + let%lwt key_proof = 431 + Cached_mst.proof_for_key new_mst new_mst.root path 432 + in 433 + Lwt.return (Block_map.merge acc key_proof) ) 434 + Block_map.empty !commit_ops 435 + in 436 + let proof_blocks = Block_map.merge proof_blocks !added_leaves in 437 + let block_stream = 438 + proof_blocks |> Block_map.entries |> Lwt_seq.of_list 439 + |> Lwt_seq.cons (new_commit_cid, commit_block) 440 + in 441 + let%lwt blocks = 442 + Car.blocks_to_stream new_commit_cid block_stream |> Car.collect_stream 443 + in 444 + let%lwt ds = Data_store.connect () in 445 + let%lwt _ = 446 + Sequencer.sequence_commit ds ~did:t.did ~commit:new_commit_cid 447 + ~rev:new_commit_signed.rev ~blocks ~ops:!commit_ops ~since:prev_commit.rev 448 + ~prev_data:prev_commit.data () 449 + in 450 + Lwt.return {commit= new_commit; results} 504 451 505 452 let load ?create ?(ensure_active = false) did : t Lwt.t = 506 453 let%lwt ds_conn = Data_store.connect () in