objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Don't handle deleting orphaned blobs with trigger

futurGH 576e86b6 ce68283a

+74 -38
+1
pegasus/lib/dune
··· 7 7 caqti-driver-sqlite3 8 8 cohttp 9 9 cohttp-lwt-unix 10 + core_unix 10 11 dns-client.unix 11 12 dream 12 13 frontend
+19 -6
pegasus/lib/repository.ml
··· 328 328 |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 329 329 in 330 330 if not (List.is_empty refs) then 331 - let%lwt () = User_store.clear_blob_refs t.db path refs in 331 + let%lwt _ = 332 + User_store.delete_orphaned_blobs_by_record_path t.db 333 + path 334 + in 332 335 Lwt.return_unit 333 336 else Lwt.return_unit 334 337 | None -> ··· 348 351 commit_ops := 349 352 !commit_ops 350 353 @ [{action= `Update; path; cid= Some new_cid; prev= old_cid}] ; 354 + let refs = 355 + Util.find_blob_refs value 356 + |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 357 + in 358 + let%lwt () = 359 + match%lwt User_store.put_blob_refs t.db path refs with 360 + | Ok () -> 361 + Lwt.return () 362 + | Error err -> 363 + raise err 364 + in 351 365 Lwt.return 352 366 (Update 353 367 { type'= "com.atproto.repo.applyWrites#updateResult" ··· 375 389 |> List.map (fun (r : Mist.Blob_ref.t) -> r.ref) 376 390 in 377 391 if not (List.is_empty refs) then 378 - let%lwt () = User_store.clear_blob_refs t.db path refs in 392 + let%lwt _ = 393 + User_store.delete_orphaned_blobs_by_record_path t.db path 394 + in 379 395 Lwt.return_unit 380 396 else Lwt.return_unit 381 397 | None -> ··· 617 633 failwith ("missing record block: " ^ Cid.to_string cid) ) 618 634 leaves 619 635 in 620 - Lwt.return_ok () ) 621 - (* use_pool expects Caqti_error.t as exn type, so handle errors via try/with *) 622 - |> Lwt_result.get_exn 623 - |> Lwt_result.ok ) 636 + Lwt.return_ok () ) ) 624 637 in 625 638 (* clear cached block_map so it's rebuilt on next access *) 626 639 t.block_map <- None ;
+48 -26
pegasus/lib/user_store.ml
··· 5 5 module Lex = Mist.Lex 6 6 module Tid = Mist.Tid 7 7 8 + let delete_blob_file ~did cid = 9 + let file = Filename.concat (Util.Constants.user_blobs_location did) cid in 10 + if Sys.file_exists file then Sys.remove file 11 + 8 12 module Types = struct 9 13 open struct 10 14 let cid_link_of_yojson = function ··· 167 171 get_opt 168 172 {sql| SELECT @CID{cid}, @Blob{data}, @string{since} FROM records WHERE path = %string{path} |sql}] 169 173 174 + let get_records_by_cids cids = 175 + [%rapper 176 + get_many 177 + {sql| SELECT @CID{cid}, @Blob{data} FROM records WHERE cid IN (%list{%CID{cids}}) |sql} 178 + record_out] 179 + ~cids 180 + 170 181 let list_records = 171 182 [%rapper 172 183 get_many ··· 193 204 ON CONFLICT (path) DO UPDATE SET cid = excluded.cid, data = excluded.data, since = excluded.since 194 205 |sql}] 195 206 207 + let delete_record path = 208 + [%rapper execute {sql| DELETE FROM records WHERE path = %string{path} |sql}] 209 + ~path 210 + 196 211 (* blob storage *) 197 212 let create_blobs_tables conn = 198 213 let$! () = ··· 206 221 |sql}] 207 222 () conn 208 223 in 209 - let$! () = 210 - [%rapper 211 - execute 212 - {sql| CREATE TABLE IF NOT EXISTS blobs_records ( 224 + [%rapper 225 + execute 226 + {sql| CREATE TABLE IF NOT EXISTS blobs_records ( 213 227 blob_id INTEGER NOT NULL REFERENCES blobs(id) ON DELETE CASCADE, 214 228 record_path TEXT NOT NULL REFERENCES records(path) ON DELETE CASCADE, 215 229 PRIMARY KEY (blob_id, record_path) 216 230 ) 217 231 |sql}] 218 - () conn 219 - in 220 - [%rapper 221 - execute 222 - {sql| CREATE TRIGGER IF NOT EXISTS cleanup_orphaned_blobs 223 - AFTER DELETE ON blobs_records 224 - BEGIN 225 - DELETE FROM blobs 226 - WHERE id NOT IN ( 227 - SELECT DISTINCT blob_id FROM blobs_records 228 - ); 229 - END 230 - |sql} 231 - syntax_off] 232 232 () conn 233 233 234 234 let get_blob = ··· 269 269 |sql}] 270 270 ~cid ~mimetype 271 271 272 - let get_records_by_cids cids = 272 + let delete_blob cid = 273 + [%rapper execute {sql| DELETE FROM blobs WHERE cid = %CID{cid} |sql}] ~cid 274 + 275 + let delete_orphaned_blobs_by_record_path path = 273 276 [%rapper 274 277 get_many 275 - {sql| SELECT @CID{cid}, @Blob{data} FROM records WHERE cid IN (%list{%CID{cids}}) |sql} 276 - record_out] 277 - ~cids 278 - 279 - let delete_record path = 280 - [%rapper execute {sql| DELETE FROM records WHERE path = %string{path} |sql}] 278 + {sql| DELETE FROM blobs 279 + WHERE id IN ( 280 + SELECT blob_id FROM blobs_records WHERE record_path = %string{path} 281 + ) 282 + AND NOT EXISTS ( 283 + SELECT 1 FROM blobs_records 284 + WHERE blob_id = blobs.id AND record_path != %string{path} 285 + ) 286 + RETURNING @string{cid} 287 + |sql}] 281 288 ~path 282 289 283 290 let list_blob_refs path = ··· 429 436 Util.use_pool t.db @@ Queries.put_record ~path ~cid ~data ~since 430 437 431 438 let delete_record t path : unit Lwt.t = 432 - Util.use_pool t.db @@ Queries.delete_record path 439 + Util.use_pool t.db (fun conn -> 440 + Util.transact conn (fun () -> 441 + let del = Queries.delete_record path conn in 442 + let$! () = del in 443 + let$! deleted_blobs = 444 + Queries.delete_orphaned_blobs_by_record_path path conn 445 + in 446 + let () = List.iter (delete_blob_file ~did:t.did) deleted_blobs in 447 + del ) ) 433 448 434 449 (* blobs *) 435 450 ··· 467 482 Core_unix.mkdir_p (Filename.dirname file) ~perm:0o755 ; 468 483 Out_channel.with_open_bin file (fun oc -> Out_channel.output_bytes oc data) ; 469 484 Util.use_pool t.db @@ Queries.put_blob cid mimetype 485 + 486 + let delete_blob t cid : unit Lwt.t = 487 + delete_blob_file ~did:t.did (Cid.to_string cid) ; 488 + Util.use_pool t.db @@ Queries.delete_blob cid 489 + 490 + let delete_orphaned_blobs_by_record_path t path : string list Lwt.t = 491 + Util.use_pool t.db @@ Queries.delete_orphaned_blobs_by_record_path path 470 492 471 493 let list_blob_refs t path : Cid.t list Lwt.t = 472 494 Util.use_pool t.db @@ Queries.list_blob_refs path
+6 -6
pegasus/lib/util.ml
··· 236 236 | Error e -> 237 237 raise (Caqti_error.Exn e) 238 238 239 - let transact conn fn : (unit, exn) Lwt_result.t = 239 + let transact conn fn : (unit, 'e) Lwt_result.t = 240 240 let module C = (val conn : Caqti_lwt.CONNECTION) in 241 241 match%lwt C.start () with 242 242 | Ok () -> ( 243 243 match%lwt fn () with 244 - | Ok () -> ( 244 + | Ok _ -> ( 245 245 match%lwt C.commit () with 246 246 | Ok () -> 247 247 Lwt.return_ok () 248 248 | Error e -> ( 249 249 match%lwt C.rollback () with 250 250 | Ok () -> 251 - Lwt.return_error (Caqti_error.Exn e) 251 + Lwt.return_error e 252 252 | Error e -> 253 - Lwt.return_error (Caqti_error.Exn e) ) ) 253 + Lwt.return_error e ) ) 254 254 | Error e -> ( 255 255 match%lwt C.rollback () with 256 256 | Ok () -> 257 257 Lwt.return_error e 258 258 | Error e -> 259 - Lwt.return_error (Caqti_error.Exn e) ) ) 259 + Lwt.return_error e ) ) 260 260 | Error e -> 261 - Lwt.return_error (Caqti_error.Exn e) 261 + Lwt.return_error e 262 262 263 263 (* runs a bunch of queries and catches duplicate insertion, returning how many succeeded *) 264 264 let multi_query pool