perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Track shared blobs across multiple repo owners

alice 95519510 f64ff173

+156 -11
+1 -1
lib/ATProto/PDS/API/Admin.pm
··· 286 286 if (exists($subject->{did}) && exists($subject->{cid})) { 287 287 my $blob = $c->store->get_blob($subject->{cid}); 288 288 xrpc_error(404, 'NotFound', 'Subject not found') 289 - unless $blob && ($blob->{did} // q()) eq ($subject->{did} // q()); 289 + unless $blob && $c->store->blob_owned_by_did($subject->{cid}, $subject->{did}); 290 290 return { 291 291 %{$subject}, 292 292 '$type' => ($subject->{'$type'} // 'com.atproto.admin.defs#repoBlobRef'),
+1 -1
lib/ATProto/PDS/API/Sync.pm
··· 104 104 my $account = _repo_by_did_or_error($c); 105 105 my $blob = $c->store->get_blob($c->param('cid') // q()); 106 106 xrpc_error(404, 'BlobNotFound', 'Blob was not found') 107 - unless $blob && ($blob->{did} // q()) eq $account->{did}; 107 + unless $blob && $c->store->blob_owned_by_did($c->param('cid') // q(), $account->{did}); 108 108 assert_blob_readable($c, $account, $blob); 109 109 xrpc_error(404, 'BlobNotFound', 'Blob content is not available') 110 110 unless $blob->{storage_path} && -f $blob->{storage_path};
+2 -2
lib/ATProto/PDS/Repo/Manager.pm
··· 95 95 status => 400, 96 96 error => 'InvalidBlob', 97 97 message => "Could not find blob: $blob_cid", 98 - } unless $blob && ($blob->{did} // q()) eq $did; 98 + } unless $blob && $store->blob_owned_by_did($blob_cid, $did); 99 99 die { 100 100 status => 400, 101 101 error => 'BlobTakenDown', ··· 187 187 } 188 188 $store->replace_records_for_did($did, [ values %$records ]); 189 189 for my $record (values %$records) { 190 - $store->mark_blobs_referenced(_blob_cids($record->{value})); 190 + $store->mark_blobs_referenced($did, _blob_cids($record->{value})); 191 191 } 192 192 $store->put_commit( 193 193 did => $did,
+90 -7
lib/ATProto/PDS/Store/SQLite.pm
··· 362 362 sub put_blob ($self, %args) { 363 363 return observe_store_operation($self->{metrics}, 'put_blob', sub { 364 364 my $cid = $args{cid} // die 'cid is required'; 365 + my $did = $args{did} // die 'did is required'; 365 366 my $now = $args{created_at} // time; 366 367 $self->dbh->do( 367 368 q{ ··· 370 371 created_at, referenced_at, quarantined_at 371 372 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 372 373 ON CONFLICT(cid) DO UPDATE SET 373 - did = excluded.did, 374 + did = COALESCE(blobs.did, excluded.did), 374 375 mime_type = excluded.mime_type, 375 376 byte_size = excluded.byte_size, 376 377 storage_path = excluded.storage_path, ··· 380 381 }, 381 382 undef, 382 383 $cid, 383 - $args{did}, 384 + $did, 384 385 $args{mime_type}, 385 386 $args{byte_size}, 386 387 $args{storage_path}, ··· 389 390 $args{referenced_at}, 390 391 $args{quarantined_at}, 391 392 ); 393 + $self->dbh->do( 394 + q{ 395 + INSERT INTO blob_owners (cid, did, created_at, referenced_at) 396 + VALUES (?, ?, ?, ?) 397 + ON CONFLICT(cid, did) DO UPDATE SET 398 + referenced_at = COALESCE(excluded.referenced_at, blob_owners.referenced_at) 399 + }, 400 + undef, 401 + $cid, 402 + $did, 403 + $now, 404 + $args{referenced_at}, 405 + ); 392 406 return $self->get_blob($cid); 393 407 }); 394 408 } ··· 421 435 return $self->get_blob($cid); 422 436 } 423 437 424 - sub mark_blobs_referenced ($self, @cids) { 438 + sub blob_owned_by_did ($self, $cid, $did) { 439 + return 0 unless defined $cid && length $cid && defined $did && length $did; 440 + return !!($self->dbh->selectrow_array( 441 + q{SELECT 1 FROM blob_owners WHERE cid = ? AND did = ?}, 442 + undef, 443 + $cid, 444 + $did, 445 + ) // 0); 446 + } 447 + 448 + sub mark_blobs_referenced ($self, $did, @cids) { 449 + if (!defined($did) || ref($did) || (!length($did) && @cids)) { 450 + unshift @cids, $did if defined $did; 451 + undef $did; 452 + } 425 453 return 0 unless @cids; 426 454 my $now = time; 427 455 my %seen; ··· 432 460 $now, 433 461 $cid, 434 462 ); 463 + if (defined $did && length $did) { 464 + $self->dbh->do( 465 + q{ 466 + INSERT INTO blob_owners (cid, did, created_at, referenced_at) 467 + VALUES (?, ?, ?, ?) 468 + ON CONFLICT(cid, did) DO UPDATE SET 469 + referenced_at = excluded.referenced_at 470 + }, 471 + undef, 472 + $cid, 473 + $did, 474 + $now, 475 + $now, 476 + ); 477 + } 435 478 } 436 479 return scalar keys %seen; 437 480 } ··· 440 483 my $limit = $args{limit} // 500; 441 484 my $cursor = $args{cursor}; 442 485 my @bind = ($did); 443 - my $sql = q{SELECT * FROM blobs WHERE did = ?}; 486 + my $sql = q{ 487 + SELECT b.* 488 + FROM blobs b 489 + JOIN blob_owners bo ON bo.cid = b.cid 490 + WHERE bo.did = ? 491 + }; 444 492 if (defined $cursor && length $cursor) { 445 - $sql .= q{ AND cid > ?}; 493 + $sql .= q{ AND b.cid > ?}; 446 494 push @bind, $cursor; 447 495 } 448 - $sql .= q{ ORDER BY cid LIMIT ?}; 496 + $sql .= q{ ORDER BY b.cid LIMIT ?}; 449 497 push @bind, $limit + 1; 450 498 my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @bind); 451 499 return _paginate($rows, $limit, 'cid'); ··· 453 501 454 502 sub count_blobs_by_did ($self, $did) { 455 503 return $self->dbh->selectrow_array( 456 - q{SELECT COUNT(*) FROM blobs WHERE did = ?}, 504 + q{SELECT COUNT(*) FROM blob_owners WHERE did = ?}, 457 505 undef, 458 506 $did, 459 507 ) // 0; ··· 1632 1680 }, 1633 1681 q{CREATE INDEX IF NOT EXISTS blobs_by_did ON blobs (did, created_at DESC)}, 1634 1682 q{ 1683 + CREATE TABLE IF NOT EXISTS blob_owners ( 1684 + cid TEXT NOT NULL, 1685 + did TEXT NOT NULL, 1686 + created_at INTEGER NOT NULL, 1687 + referenced_at INTEGER, 1688 + PRIMARY KEY (cid, did), 1689 + FOREIGN KEY (cid) REFERENCES blobs(cid), 1690 + FOREIGN KEY (did) REFERENCES accounts(did) 1691 + ) 1692 + }, 1693 + q{CREATE INDEX IF NOT EXISTS blob_owners_by_did ON blob_owners (did, created_at DESC)}, 1694 + q{ 1635 1695 CREATE TABLE IF NOT EXISTS repo_heads ( 1636 1696 did TEXT PRIMARY KEY, 1637 1697 commit_cid TEXT, ··· 1848 1908 ) 1849 1909 }, 1850 1910 q{CREATE INDEX IF NOT EXISTS preferences_lookup_idx ON preferences (did, namespace, pref_type)}, 1911 + ], 1912 + }, 1913 + { 1914 + version => 7, 1915 + statements => [ 1916 + q{ 1917 + CREATE TABLE IF NOT EXISTS blob_owners ( 1918 + cid TEXT NOT NULL, 1919 + did TEXT NOT NULL, 1920 + created_at INTEGER NOT NULL, 1921 + referenced_at INTEGER, 1922 + PRIMARY KEY (cid, did), 1923 + FOREIGN KEY (cid) REFERENCES blobs(cid), 1924 + FOREIGN KEY (did) REFERENCES accounts(did) 1925 + ) 1926 + }, 1927 + q{CREATE INDEX IF NOT EXISTS blob_owners_by_did ON blob_owners (did, created_at DESC)}, 1928 + q{ 1929 + INSERT OR IGNORE INTO blob_owners (cid, did, created_at, referenced_at) 1930 + SELECT cid, did, created_at, referenced_at 1931 + FROM blobs 1932 + WHERE did IS NOT NULL 1933 + }, 1851 1934 ], 1852 1935 }, 1853 1936 );
+39
t/external-surface.t
··· 120 120 ->content_type_is('text/plain') 121 121 ->content_is('blob-bytes'); 122 122 123 + $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 124 + handle => 'bob.example.test', 125 + email => 'bob@example.test', 126 + password => 'hunter22', 127 + })->status_is(200); 128 + 129 + my $second = $t->tx->res->json; 130 + my $second_did = $second->{did}; 131 + my $second_access = $second->{accessJwt}; 132 + 133 + $t->post_ok('/xrpc/com.atproto.repo.uploadBlob' => { 134 + Authorization => "Bearer $second_access", 135 + 'Content-Type' => 'text/plain', 136 + } => 'blob-bytes')->status_is(200) 137 + ->json_is('/blob/ref/$link' => $blob_cid); 138 + 139 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.listBlobs')->query( 140 + did => $did, 141 + ))->status_is(200) 142 + ->json_is('/cids/0' => $blob_cid); 143 + 144 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.listBlobs')->query( 145 + did => $second_did, 146 + ))->status_is(200) 147 + ->json_is('/cids/0' => $blob_cid); 148 + 149 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.getBlob')->query( 150 + did => $second_did, 151 + cid => $blob_cid, 152 + ))->status_is(200) 153 + ->content_type_is('text/plain') 154 + ->content_is('blob-bytes'); 155 + 123 156 $t->get_ok('/xrpc/com.atproto.server.checkAccountStatus' => { 124 157 Authorization => "Bearer $access", 125 158 })->status_is(200) ··· 130 163 ->json_has('/indexedRecords') 131 164 ->json_has('/expectedBlobs') 132 165 ->json_has('/importedBlobs'); 166 + 167 + $t->get_ok('/xrpc/com.atproto.server.checkAccountStatus' => { 168 + Authorization => "Bearer $second_access", 169 + })->status_is(200) 170 + ->json_is('/expectedBlobs' => 1) 171 + ->json_is('/importedBlobs' => 1); 133 172 134 173 $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.admin.getAccountInfo')->query( 135 174 did => $did,
+23
t/store-sqlite.t
··· 37 37 did_doc => { id => 'did:web:pds.example.com:users:alice' }, 38 38 ); 39 39 40 + my $second_account = $store->create_account( 41 + id => 'acct-2', 42 + did => 'did:web:pds.example.com:users:bob', 43 + handle => 'bob.example.com', 44 + email => 'bob@example.com', 45 + password_hash => 'sha256:def', 46 + did_doc => { id => 'did:web:pds.example.com:users:bob' }, 47 + ); 48 + 40 49 is($account->{handle}, 'alice.example.com', 'account round-trips'); 41 50 is($store->get_account_by_email('alice@example.com')->{did}, $account->{did}, 'lookup by email works'); 42 51 ··· 66 75 storage_path => 'blobs/bafk.png', 67 76 ); 68 77 is($store->get_blob('bafkreigh2akiscaildc')->{byte_size}, 1234, 'blob metadata is stored'); 78 + ok($store->blob_owned_by_did('bafkreigh2akiscaildc', $account->{did}), 'primary owner is tracked'); 79 + 80 + $store->put_blob( 81 + cid => 'bafkreigh2akiscaildc', 82 + did => $second_account->{did}, 83 + mime_type => 'image/png', 84 + byte_size => 1234, 85 + storage_path => 'blobs/bafk.png', 86 + ); 87 + ok($store->blob_owned_by_did('bafkreigh2akiscaildc', $second_account->{did}), 'second owner is tracked for shared blob'); 88 + is($store->count_blobs_by_did($account->{did}), 1, 'first account still counts shared blob'); 89 + is($store->count_blobs_by_did($second_account->{did}), 1, 'second account counts shared blob'); 90 + is($store->list_blobs_by_did($account->{did})->{items}[0]{cid}, 'bafkreigh2akiscaildc', 'shared blob lists for first owner'); 91 + is($store->list_blobs_by_did($second_account->{did})->{items}[0]{cid}, 'bafkreigh2akiscaildc', 'shared blob lists for second owner'); 69 92 70 93 $store->set_repo_head( 71 94 did => $account->{did},