objective categorical abstract machine language personal data server
65
fork

Configure Feed

Select the types of activity you want to include in your feed.

Store blobs as files

futurGH 4c1ef90c 5798ecd1

+93 -64
+1 -1
pegasus/lib/env.ml
··· 1 - let database_dir = Option.value ~default:"./db" @@ Sys.getenv_opt "DATABASE_DIR" 1 + let data_dir = Option.value ~default:"./data" @@ Sys.getenv_opt "DATA_DIR" 2 2 3 3 let hostname = Sys.getenv "PDS_HOSTNAME" 4 4
+2 -2
pegasus/lib/repository.ml
··· 141 141 type t = 142 142 { key: signing_key 143 143 ; did: string 144 - ; db: Caqti_lwt.connection 144 + ; db: User_store.t 145 145 ; mutable block_map: Cid.t StringMap.t option 146 146 ; mutable commit: Cid.t option } 147 147 ··· 429 429 Car.blocks_to_stream commit_cid block_stream |> Car.collect_stream 430 430 in 431 431 let%lwt _ = 432 - Sequencer.sequence_commit t.db ~did:t.did ~commit:commit_cid 432 + Sequencer.sequence_commit t.db.db ~did:t.did ~commit:commit_cid 433 433 ~rev:commit_signed.rev ~blocks ~ops:!commit_ops ~since:commit.rev 434 434 ~prev_data:commit.data () 435 435 in
+81 -59
pegasus/lib/user_store.ml
··· 55 55 56 56 type record = {path: string; cid: Cid.t; value: Lex.repo_record; since: Tid.t} 57 57 58 - type blob = {id: int; cid: Cid.t; mimetype: string; data: Blob.t} 58 + type blob = {id: int; cid: Cid.t; mimetype: string} 59 + 60 + type blob_with_contents = {id: int; cid: Cid.t; mimetype: string; data: Blob.t} 59 61 end 60 62 61 63 open Types ··· 198 200 {sql| CREATE TABLE IF NOT EXISTS blobs ( 199 201 id INTEGER PRIMARY KEY, 200 202 cid TEXT NOT NULL, 201 - mimetype TEXT NOT NULL, 202 - data BLOB NOT NULL 203 + mimetype TEXT NOT NULL 203 204 ); 204 205 |sql}] 205 206 () conn ··· 232 233 let get_blob = 233 234 [%rapper 234 235 get_opt 235 - {sql| SELECT @int{id}, @CID{cid}, @string{mimetype}, @Blob{data} FROM blobs WHERE cid = %CID{cid} |sql} 236 + {sql| SELECT @int{id}, @CID{cid}, @string{mimetype} FROM blobs WHERE cid = %CID{cid} |sql} 236 237 record_out] 237 238 238 239 let list_blobs ~limit ~cursor = ··· 241 242 {sql| SELECT @CID{cid} FROM blobs WHERE id > %int{cursor} ORDER BY id LIMIT %int{limit} |sql}] 242 243 ~limit ~cursor 243 244 244 - let put_blob cid mimetype data = 245 + let put_blob cid mimetype = 245 246 [%rapper 246 247 get_one 247 - {sql| INSERT INTO blobs (cid, mimetype, data) 248 - VALUES (%CID{cid}, %string{mimetype}, %Blob{data}) 249 - ON CONFLICT (cid) DO UPDATE SET mimetype = excluded.mimetype, data = excluded.data 248 + {sql| INSERT INTO blobs (cid, mimetype) 249 + VALUES (%CID{cid}, %string{mimetype}) 250 + ON CONFLICT (cid) DO UPDATE SET mimetype = excluded.mimetype 250 251 RETURNING @int{id} 251 252 |sql}] 252 - ~cid ~mimetype ~data 253 + ~cid ~mimetype 253 254 254 255 let list_blob_refs path = 255 256 [%rapper ··· 280 281 ~path ~cids 281 282 end 282 283 283 - type t = (module Rapper_helper.CONNECTION) 284 + type t = {did: string; db: (module Rapper_helper.CONNECTION)} 284 285 285 286 let connect did : t Lwt.t = 286 - Util.connect_sqlite (Util.Constants.user_db_location did) 287 + let%lwt db = Util.connect_sqlite (Util.Constants.user_db_location did) in 288 + Lwt.return {did; db} 287 289 288 - let init conn : unit Lwt.t = 289 - let$! () = Queries.create_blocks_tables conn in 290 - let$! () = Queries.create_records_table conn in 291 - let$! () = Queries.create_blobs_tables conn in 290 + let init t : unit Lwt.t = 291 + let$! () = Queries.create_blocks_tables t.db in 292 + let$! () = Queries.create_records_table t.db in 293 + let$! () = Queries.create_blobs_tables t.db in 292 294 Lwt.return_unit 293 295 294 296 (* mst blocks; implements Writable_blockstore *) 295 297 296 - let get_bytes conn cid : Blob.t option Lwt.t = 297 - Queries.get_block cid conn 298 + let get_bytes t cid : Blob.t option Lwt.t = 299 + Queries.get_block cid t.db 298 300 >$! function Some {data; _} -> Some data | None -> None 299 301 300 - let get_blocks conn cids : Block_map.with_missing Lwt.t = 301 - let$! blocks = Queries.get_blocks cids conn in 302 + let get_blocks t cids : Block_map.with_missing Lwt.t = 303 + let$! blocks = Queries.get_blocks cids t.db in 302 304 Lwt.return 303 305 (List.fold_left 304 306 (fun (acc : Block_map.with_missing) cid -> ··· 310 312 {blocks= Block_map.empty; missing= []} 311 313 cids ) 312 314 313 - let has conn cid : bool Lwt.t = 314 - Queries.has_block cid conn >$! function Some _ -> true | None -> false 315 + let has t cid : bool Lwt.t = 316 + Queries.has_block cid t.db >$! function Some _ -> true | None -> false 315 317 316 - let put_block conn cid block : (bool, exn) Lwt_result.t = 317 - Queries.put_block cid block conn 318 + let put_block t cid block : (bool, exn) Lwt_result.t = 319 + Queries.put_block cid block t.db 318 320 |> Lwt.map Util.caqti_result_exn 319 321 |> Lwt.map (Result.map (function Some _ -> true | None -> false)) 320 322 321 - let put_many conn bm : (int, exn) Lwt_result.t = 322 - Util.multi_query conn 323 + let put_many t bm : (int, exn) Lwt_result.t = 324 + Util.multi_query t.db 323 325 (List.map 324 - (fun (cid, block) -> fun () -> Queries.put_block cid block conn) 326 + (fun (cid, block) -> fun () -> Queries.put_block cid block t.db) 325 327 (Block_map.entries bm) ) 326 328 327 - let delete_block conn cid : (bool, exn) Lwt_result.t = 328 - let$! () = Queries.delete_block cid conn in 329 + let delete_block t cid : (bool, exn) Lwt_result.t = 330 + let$! () = Queries.delete_block cid t.db in 329 331 Lwt.return_ok true 330 332 331 - let delete_many conn cids : (int, exn) Lwt_result.t = 332 - Queries.delete_blocks cids conn >$! List.length >>= Lwt.return_ok 333 + let delete_many t cids : (int, exn) Lwt_result.t = 334 + Queries.delete_blocks cids t.db >$! List.length >>= Lwt.return_ok 333 335 334 - let clear_mst conn : unit Lwt.t = 335 - let$! () = Queries.clear_mst conn in 336 + let clear_mst t : unit Lwt.t = 337 + let$! () = Queries.clear_mst t.db in 336 338 Lwt.return_unit 337 339 338 340 (* repo commit *) 339 341 340 - let get_commit conn : (Cid.t * signed_commit) option Lwt.t = 341 - Queries.get_commit conn 342 + let get_commit t : (Cid.t * signed_commit) option Lwt.t = 343 + Queries.get_commit t.db 342 344 >$! Option.map (fun (cid, data) -> 343 345 ( cid 344 346 , data |> Dag_cbor.decode_to_yojson |> signed_commit_of_yojson 345 347 |> Result.get_ok ) ) 346 348 347 - let put_commit conn commit : (Cid.t, exn) Lwt_result.t = 349 + let put_commit t commit : (Cid.t, exn) Lwt_result.t = 348 350 let data = commit |> signed_commit_to_yojson |> Dag_cbor.encode_yojson in 349 351 let cid = Cid.create Dcbor data in 350 - let$! () = Queries.put_commit cid data conn in 352 + let$! () = Queries.put_commit cid data t.db in 351 353 Lwt.return_ok cid 352 354 353 355 (* records *) 354 356 355 - let get_record_by_path conn path : record option Lwt.t = 356 - Queries.get_record_by_path ~path conn 357 + let get_record_by_path t path : record option Lwt.t = 358 + Queries.get_record_by_path ~path t.db 357 359 >$! Option.map (fun (cid, data, since) -> 358 360 {path; cid; value= Lex.of_cbor data; since} ) 359 361 >>= Lwt.return 360 362 361 - let get_record_by_cid conn cid : record option Lwt.t = 362 - Queries.get_record_by_cid ~cid conn 363 + let get_record_by_cid t cid : record option Lwt.t = 364 + Queries.get_record_by_cid ~cid t.db 363 365 >$! Option.map (fun (path, data, since) -> 364 366 {path; cid; value= Lex.of_cbor data; since} ) 365 367 >>= Lwt.return 366 368 367 - let list_records conn ?(limit = 100) ?(cursor = "") ?(reverse = false) 368 - collection : record list Lwt.t = 369 + let list_records t ?(limit = 100) ?(cursor = "") ?(reverse = false) collection : 370 + record list Lwt.t = 369 371 let fn = 370 372 if reverse then Queries.list_records_reverse else Queries.list_records 371 373 in 372 - fn ~collection ~limit ~cursor conn 374 + fn ~collection ~limit ~cursor t.db 373 375 >$! List.map (fun (path, cid, data, since) -> 374 376 {path; cid; value= Lex.of_cbor data; since} ) 375 377 >>= Lwt.return 376 378 377 - let put_record conn record path : (Cid.t * bytes) Lwt.t = 379 + let put_record t record path : (Cid.t * bytes) Lwt.t = 378 380 let cid, data = Lex.to_cbor_block record in 379 381 let since = Tid.now () in 380 - let$! () = Queries.put_record ~path ~cid ~data ~since conn in 382 + let$! () = Queries.put_record ~path ~cid ~data ~since t.db in 381 383 Lwt.return (cid, data) 382 384 383 385 (* blobs *) 384 386 385 - let get_blob conn cid : blob option Lwt.t = unwrap @@ Queries.get_blob conn ~cid 387 + let get_blob t cid : blob_with_contents option Lwt.t = 388 + match%lwt unwrap @@ Queries.get_blob t.db ~cid with 389 + | None -> 390 + Lwt.return_none 391 + | Some blob -> 392 + let {id; cid; mimetype} : blob = blob in 393 + let file = 394 + Filename.concat 395 + (Util.Constants.user_blobs_location t.did) 396 + (Cid.to_string cid) 397 + in 398 + let data = 399 + In_channel.with_open_bin file In_channel.input_all |> Bytes.of_string 400 + in 401 + Lwt.return_some {id; cid; mimetype; data} 386 402 387 - let list_blobs conn ~limit ~cursor : Cid.t list Lwt.t = 388 - unwrap @@ Queries.list_blobs conn ~limit ~cursor 403 + let list_blobs t ~limit ~cursor : Cid.t list Lwt.t = 404 + unwrap @@ Queries.list_blobs t.db ~limit ~cursor 389 405 390 - let put_blob conn cid mimetype data : int Lwt.t = 391 - unwrap @@ Queries.put_blob cid mimetype data conn 406 + let put_blob t cid mimetype data : int Lwt.t = 407 + let file = 408 + Filename.concat 409 + (Util.Constants.user_blobs_location t.did) 410 + (Cid.to_string cid) 411 + in 412 + let _ = Out_channel.with_open_bin file Out_channel.output_bytes data in 413 + unwrap @@ Queries.put_blob cid mimetype t.db 392 414 393 - let list_blob_refs conn path : Cid.t list Lwt.t = 394 - unwrap @@ Queries.list_blob_refs path conn 415 + let list_blob_refs t path : Cid.t list Lwt.t = 416 + unwrap @@ Queries.list_blob_refs path t.db 395 417 396 - let put_blob_ref conn path cid : unit Lwt.t = 397 - unwrap @@ Queries.put_blob_ref path cid conn 418 + let put_blob_ref t path cid : unit Lwt.t = 419 + unwrap @@ Queries.put_blob_ref path cid t.db 398 420 399 - let put_blob_refs conn path cids : (unit, exn) Lwt_result.t = 421 + let put_blob_refs t path cids : (unit, exn) Lwt_result.t = 400 422 Lwt_result.map (fun _ -> ()) 401 - @@ Util.multi_query conn 423 + @@ Util.multi_query t.db 402 424 (List.map 403 - (fun cid -> fun () -> Queries.put_blob_ref cid path conn) 425 + (fun cid -> fun () -> Queries.put_blob_ref cid path t.db) 404 426 cids ) 405 427 406 - let clear_blob_refs conn path cids : unit Lwt.t = 407 - unwrap @@ Queries.clear_blob_refs path cids conn 428 + let clear_blob_refs t path cids : unit Lwt.t = 429 + unwrap @@ Queries.clear_blob_refs path cids t.db
+9 -2
pegasus/lib/util.ml
··· 1 1 module Constants = struct 2 2 let pegasus_db_location = 3 - Filename.concat Env.database_dir "sqlite3://pegasus.db" |> Uri.of_string 3 + Filename.concat Env.data_dir "pegasus.db" 4 + |> Format.sprintf "sqlite3://%s.db" 5 + |> Uri.of_string 4 6 5 7 let user_db_location did = 6 8 did 7 9 |> Str.global_replace (Str.regexp ":") "_" 10 + |> Filename.concat Env.data_dir 8 11 |> Format.sprintf "sqlite3://%s.db" 9 - |> Filename.concat Env.database_dir 10 12 |> Uri.of_string 13 + 14 + let user_blobs_location did = 15 + did 16 + |> Str.global_replace (Str.regexp ":") "_" 17 + |> (Filename.concat Env.data_dir "blobs" |> Filename.concat) 11 18 end 12 19 13 20 module Syntax = struct