objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

xrpc listMissingBlobs

futurGH f6b0e90f 7d2dba82

+93 -27
+3
bin/main.ml
··· 54 54 ; ( get 55 55 , "/xrpc/com.atproto.server.checkAccountStatus" 56 56 , Api.Server.CheckAccountStatus.handler ) 57 + ; ( get 58 + , "/xrpc/com.atproto.repo.listMissingBlobs" 59 + , Api.Repo.ListMissingBlobs.handler ) 57 60 ; ( post 58 61 , "/xrpc/com.atproto.identity.updateHandle" 59 62 , Api.Identity.UpdateHandle.handler )
+35
pegasus/lib/api/repo/listMissingBlobs.ml
(* Query-string parameters for com.atproto.repo.listMissingBlobs.
   Both fields are optional and default to [None] when absent. *)
type query =
  {limit: int option [@default None]; cursor: string option [@default None]}
[@@deriving yojson {strict= false}]

(* Response body: one page of missing blobs plus the cursor for the next
   page. [cursor] is [None] when the page is empty. *)
type response =
  {cursor: string option [@default None]; blobs: response_blob list}
[@@deriving yojson {strict= false}]

(* A blob CID together with the AT URI of the record that references it.
   [record_uri] is serialised under the JSON key "recordUri". *)
and response_blob = {cid: string; record_uri: string [@key "recordUri"]}
[@@deriving yojson {strict= false}]

(* XRPC handler: lists blobs for the authenticated account, as returned by
   [User_store.list_missing_blobs], one page at a time. *)
let handler =
  Xrpc.handler (fun ctx ->
      let (params : query) = Xrpc.parse_query ctx.req query_of_yojson in
      (* Accept a limit in 1..1000; a missing or out-of-range value falls
         back to 500. *)
      let page_size =
        match params.limit with
        | Some n when n > 0 && n <= 1000 ->
            n
        | Some _ | None ->
            500
      in
      let did = Auth.get_authed_did_exn ctx.auth in
      let%lwt {db; _} = Repository.load ~ensure_active:true did in
      let%lwt missing =
        User_store.list_missing_blobs ~limit:page_size ?cursor:params.cursor db
      in
      (* Each row is a (record path, blob CID) pair; build the record's AT URI
         from the account DID and the path. *)
      let blobs =
        List.map
          (fun (path, cid) ->
            {cid= Cid.to_string cid; record_uri= "at://" ^ did ^ "/" ^ path} )
          missing
      in
      (* The next cursor is the CID of the last entry on this page, if any. *)
      let last_cid =
        List.fold_left
          (fun _ (entry : response_blob) -> Some entry.cid)
          None blobs
      in
      {cursor= last_cid; blobs}
      |> response_to_yojson |> Yojson.Safe.to_string |> Dream.json )
+55 -27
pegasus/lib/user_store.ml
··· 59 59 60 60 type record = {path: string; cid: Cid.t; value: Lex.repo_record; since: Tid.t} 61 61 62 - type blob = {id: int; cid: Cid.t; mimetype: string} 62 + type blob = {cid: Cid.t; mimetype: string} 63 63 64 - type blob_with_contents = {id: int; cid: Cid.t; mimetype: string; data: Blob.t} 64 + type blob_with_contents = {cid: Cid.t; mimetype: string; data: Blob.t} 65 65 end 66 66 67 67 open Types ··· 221 221 [%rapper 222 222 execute 223 223 {sql| CREATE TABLE IF NOT EXISTS blobs ( 224 - id INTEGER PRIMARY KEY, 225 - cid TEXT NOT NULL UNIQUE, 224 + cid TEXT PRIMARY KEY, 226 225 mimetype TEXT NOT NULL 227 226 ) 228 227 |sql}] 229 228 () conn 230 229 in 230 + let$! () = 231 + [%rapper 232 + execute 233 + (* no `references` on blob_cid because blobs may be uploaded later *) 234 + {sql| CREATE TABLE IF NOT EXISTS blobs_records ( 235 + record_path TEXT NOT NULL REFERENCES records(path), 236 + blob_cid TEXT NOT NULL, 237 + PRIMARY KEY (record_path, blob_cid) 238 + ) 239 + |sql}] 240 + () conn 241 + in 231 242 [%rapper 232 243 execute 233 - {sql| CREATE TABLE IF NOT EXISTS blobs_records ( 234 - blob_id INTEGER NOT NULL REFERENCES blobs(id) ON DELETE CASCADE, 235 - record_path TEXT NOT NULL REFERENCES records(path) ON DELETE CASCADE, 236 - PRIMARY KEY (blob_id, record_path) 237 - ) 238 - |sql}] 244 + {sql| CREATE INDEX IF NOT EXISTS blobs_records_blob_cid_idx ON blobs_records (blob_cid) |sql}] 239 245 () conn 240 246 241 247 let get_blob = 242 248 [%rapper 243 249 get_opt 244 - {sql| SELECT @int{id}, @CID{cid}, @string{mimetype} FROM blobs WHERE cid = %CID{cid} |sql} 250 + {sql| SELECT @CID{cid}, @string{mimetype} FROM blobs WHERE cid = %CID{cid} |sql} 245 251 record_out] 246 252 247 253 let list_blobs = ··· 260 266 SELECT MIN(records.since) 261 267 FROM blobs_records 262 268 JOIN records ON records.path = blobs_records.record_path 263 - WHERE blobs_records.blob_id = blobs.id 269 + WHERE blobs_records.blob_cid = blobs.cid 264 270 ) > %string{since} 265 271 ORDER BY 
cid 266 272 LIMIT %int{limit} 267 273 |sql}] 268 274 275 + let list_missing_blobs = 276 + [%rapper 277 + get_many 278 + {sql| 279 + SELECT @string{record_path}, @CID{blob_cid} 280 + FROM blobs_records 281 + WHERE NOT EXISTS ( 282 + SELECT 1 FROM blobs WHERE cid = blobs_records.blob_cid 283 + ) 284 + AND cid > %string{cursor} 285 + ORDER BY cid 286 + LIMIT %int{limit} 287 + |sql}] 288 + 269 289 let count_blobs = 270 290 [%rapper get_one {sql| SELECT @int{COUNT(*)} FROM blobs |sql}] 271 291 ··· 273 293 [%rapper 274 294 get_one 275 295 {sql| SELECT @int{COUNT(*)} FROM blobs WHERE cid IN ( 276 - SELECT blob_id FROM blobs_records 296 + SELECT blob_cid FROM blobs_records 277 297 ) 278 298 |sql}] 279 299 ··· 283 303 {sql| INSERT INTO blobs (cid, mimetype) 284 304 VALUES (%CID{cid}, %string{mimetype}) 285 305 ON CONFLICT (cid) DO UPDATE SET mimetype = excluded.mimetype 286 - RETURNING @int{id} 306 + RETURNING @CID{cid} 287 307 |sql}] 288 308 ~cid ~mimetype 289 309 ··· 294 314 [%rapper 295 315 get_many 296 316 {sql| DELETE FROM blobs 297 - WHERE id IN ( 298 - SELECT blob_id FROM blobs_records WHERE record_path = %string{path} 317 + WHERE cid IN ( 318 + SELECT blob_cid FROM blobs_records WHERE record_path = %string{path} 299 319 ) 300 320 AND NOT EXISTS ( 301 321 SELECT 1 FROM blobs_records 302 - WHERE blob_id = blobs.id AND record_path != %string{path} 322 + WHERE blob_cid = blobs.cid AND record_path != %string{path} 303 323 ) 304 - RETURNING @string{cid} 324 + RETURNING @CID{cid} 305 325 |sql}] 306 326 ~path 307 327 ··· 314 334 let put_blob_ref cid path = 315 335 [%rapper 316 336 execute 317 - {sql| INSERT INTO blobs_records (blob_id, record_path) VALUES ( 318 - (SELECT id FROM blobs WHERE cid = %CID{cid} LIMIT 1), 337 + {sql| INSERT INTO blobs_records (blob_cid, record_path) VALUES ( 338 + %CID{cid}, 319 339 %string{path} 320 340 ) 321 341 ON CONFLICT DO NOTHING ··· 327 347 execute 328 348 {sql| DELETE FROM blobs_records 329 349 WHERE record_path LIKE %string{path} 330 - 
AND blob_id IN ( 331 - SELECT id FROM blobs WHERE cid IN (%list{%CID{cids}}) 350 + AND blob_cid IN ( 351 + SELECT cid FROM blobs WHERE cid IN (%list{%CID{cids}}) 332 352 ) 333 353 |sql}] 334 354 ~path ~cids ··· 467 487 let$! deleted_blobs = 468 488 Queries.delete_orphaned_blobs_by_record_path path conn 469 489 in 470 - let () = List.iter (delete_blob_file ~did:t.did) deleted_blobs in 490 + let () = 491 + List.iter 492 + (fun cid -> delete_blob_file ~did:t.did (Cid.to_string cid)) 493 + deleted_blobs 494 + in 471 495 del ) ) 472 496 473 497 (* blobs *) ··· 477 501 | None -> 478 502 Lwt.return_none 479 503 | Some blob -> 480 - let {id; cid; mimetype} : blob = blob in 504 + let {cid; mimetype} : blob = blob in 481 505 let file = 482 506 Filename.concat 483 507 (Util.Constants.user_blobs_location t.did) ··· 486 510 let data = 487 511 In_channel.with_open_bin file In_channel.input_all |> Bytes.of_string 488 512 in 489 - Lwt.return_some {id; cid; mimetype; data} 513 + Lwt.return_some {cid; mimetype; data} 490 514 491 515 let list_blobs ?since t ~limit ~cursor : Cid.t list Lwt.t = 492 516 Util.use_pool t.db ··· 496 520 Queries.list_blobs_since ~limit ~cursor ~since 497 521 | None -> 498 522 Queries.list_blobs ~limit ~cursor 523 + 524 + let list_missing_blobs ?(limit = 500) ?(cursor = "") t : 525 + (string * Cid.t) list Lwt.t = 526 + Util.use_pool t.db @@ Queries.list_missing_blobs ~limit ~cursor 499 527 500 528 let count_blobs t : int Lwt.t = Util.use_pool t.db @@ Queries.count_blobs () 501 529 502 530 let count_referenced_blobs t : int Lwt.t = 503 531 Util.use_pool t.db @@ Queries.count_referenced_blobs () 504 532 505 - let put_blob t cid mimetype data : int Lwt.t = 533 + let put_blob t cid mimetype data : Cid.t Lwt.t = 506 534 let file = 507 535 Filename.concat 508 536 (Util.Constants.user_blobs_location t.did) ··· 516 544 delete_blob_file ~did:t.did (Cid.to_string cid) ; 517 545 Util.use_pool t.db @@ Queries.delete_blob cid 518 546 519 - let 
delete_orphaned_blobs_by_record_path t path : string list Lwt.t = 547 + let delete_orphaned_blobs_by_record_path t path : Cid.t list Lwt.t = 520 548 Util.use_pool t.db @@ Queries.delete_orphaned_blobs_by_record_path path 521 549 522 550 let list_blob_refs t path : Cid.t list Lwt.t =