perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Align sync blob listing semantics

alice b8f5737e 7f774fc6

+198 -29
+1
lib/ATProto/PDS/API/Sync.pm
··· 161 161 my $account = _readable_repo_by_did($c); 162 162 my $page = $c->store->list_blobs_by_did( 163 163 $account->{did}, 164 + since => $c->param('since'), 164 165 limit => $c->param('limit') // 500, 165 166 cursor => $c->param('cursor'), 166 167 );
+7
lib/ATProto/PDS/Repo/Manager.pm
··· 75 75 rkey => $_->{rkey}, 76 76 cid => $_->{cid}, 77 77 value => $_->{value}, 78 + repo_rev => $_->{repo_rev}, 78 79 record_bytes => $_->{record_bytes}, 79 80 } 80 81 } @{ $store->all_records_for_did($did) } ··· 191 192 my $snapshot_car_bytes = $artifacts->{snapshot_car_bytes}; 192 193 my $car_bytes = $artifacts->{firehose_car_bytes}; 193 194 my $sync_car_bytes = $artifacts->{sync_car_bytes}; 195 + 196 + for my $op (@ops) { 197 + next if ($op->{action} // q()) eq 'delete'; 198 + next unless $records->{ $op->{path} }; 199 + $records->{ $op->{path} }{repo_rev} = $rev; 200 + } 194 201 195 202 $store->txn(sub ($dbh) { 196 203 for my $block (@{ $artifacts->{repo_blocks} }) {
+8 -6
lib/ATProto/PDS/Repo/Manager/Import.pm
··· 50 50 message => 'imported repo belongs to a different DID', 51 51 } unless ($commit->{did} // q()) eq $did; 52 52 53 - my $records = _records_from_import($commit->{data}, \%blocks); 53 + my $records = _records_from_import($commit->{data}, \%blocks, $commit->{rev}); 54 54 my %imported = map { $_->{collection} . '/' . $_->{rkey} => $_ } @$records; 55 55 my %previous = map { 56 56 $_->{collection} . '/' . $_->{rkey} => $_ ··· 207 207 value => $value, 208 208 cid => $cid, 209 209 record_bytes => $record_bytes, 210 + repo_rev => $account->{repo_rev}, 210 211 }; 211 212 } 212 213 ··· 263 264 return $value; 264 265 } 265 266 266 - sub _records_from_import ($root_cid, $blocks) { 267 + sub _records_from_import ($root_cid, $blocks, $import_rev = undef) { 267 268 my @records; 268 - _walk_mst($root_cid, $blocks, \@records); 269 + _walk_mst($root_cid, $blocks, \@records, $import_rev); 269 270 return \@records; 270 271 } 271 272 272 - sub _walk_mst ($cid, $blocks, $records) { 273 + sub _walk_mst ($cid, $blocks, $records, $import_rev = undef) { 273 274 return unless $cid; 274 275 my $block = $blocks->{ _cid_string($cid) } or die { 275 276 status => 400, ··· 277 278 message => 'missing MST block in imported CAR', 278 279 }; 279 280 my $node = decode_dag_cbor($block->{bytes}); 280 - _walk_mst($node->{l}, $blocks, $records) if $node->{l}; 281 + _walk_mst($node->{l}, $blocks, $records, $import_rev) if $node->{l}; 281 282 282 283 my $previous = q(); 283 284 for my $entry (@{ $node->{e} || [] }) { ··· 304 305 cid => _cid_string($record_cid), 305 306 value => decode_dag_cbor($record_block->{bytes}), 306 307 record_bytes => $record_block->{bytes}, 308 + repo_rev => $import_rev, 307 309 created_at => time, 308 310 updated_at => time, 309 311 }; 310 312 311 - _walk_mst($entry->{t}, $blocks, $records) if $entry->{t}; 313 + _walk_mst($entry->{t}, $blocks, $records, $import_rev) if $entry->{t}; 312 314 $previous = $path; 313 315 } 314 316 }
+56 -19
lib/ATProto/PDS/Store/SQLite.pm
··· 795 795 sub list_blobs_by_did ($self, $did, %args) { 796 796 my $limit = $args{limit} // 500; 797 797 my $cursor = $args{cursor}; 798 - my @bind = ($did); 799 - my $sql = q{ 800 - SELECT b.* 801 - FROM blobs b 802 - JOIN blob_owners bo ON bo.cid = b.cid 803 - WHERE bo.did = ? 804 - }; 798 + my $since = $args{since}; 799 + my %seen; 800 + my @rows = map { +{ cid => $_ } } sort grep { !$seen{$_}++ } map { 801 + _record_blob_cids($_->{value}) 802 + } grep { 803 + !defined($since) || !length($since) || (defined($_->{repo_rev}) && $_->{repo_rev} gt $since) 804 + } @{ $self->all_records_for_did($did) }; 805 805 if (defined $cursor && length $cursor) { 806 - $sql .= q{ AND b.cid > ?}; 807 - push @bind, $cursor; 806 + @rows = grep { $_->{cid} gt $cursor } @rows; 808 807 } 809 - $sql .= q{ ORDER BY b.cid LIMIT ?}; 810 - push @bind, $limit + 1; 811 - my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @bind); 812 - return _paginate($rows, $limit, 'cid'); 808 + return _paginate(\@rows, $limit, 'cid'); 813 809 } 814 810 815 811 sub count_blobs_by_did ($self, $did) { ··· 831 827 $self->dbh, 832 828 q{ 833 829 INSERT INTO records ( 834 - did, collection, rkey, cid, value_json, record_bytes, created_at, updated_at 835 - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 830 + did, collection, rkey, cid, value_json, record_bytes, repo_rev, created_at, updated_at 831 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
836 832 ON CONFLICT(did, collection, rkey) DO UPDATE SET 837 833 cid = excluded.cid, 838 834 value_json = excluded.value_json, 839 835 record_bytes = excluded.record_bytes, 836 + repo_rev = excluded.repo_rev, 840 837 updated_at = excluded.updated_at 841 838 }, 842 839 [ ··· 846 843 $cid, 847 844 encode_json($args{value}), 848 845 $args{record_bytes}, 846 + $args{repo_rev}, 849 847 $args{created_at} // $now, 850 848 $now, 851 849 ], 852 850 _blob_bind_positions_for_names([qw( 853 - did collection rkey cid value_json record_bytes created_at updated_at 851 + did collection rkey cid value_json record_bytes repo_rev created_at updated_at 854 852 )], qw(record_bytes)), 855 853 ); 856 854 ··· 865 863 $dbh, 866 864 q{ 867 865 INSERT INTO records ( 868 - did, collection, rkey, cid, value_json, record_bytes, created_at, updated_at 869 - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 866 + did, collection, rkey, cid, value_json, record_bytes, repo_rev, created_at, updated_at 867 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
870 868 }, 871 869 [ 872 870 $did, ··· 875 873 $record->{cid}, 876 874 encode_json($record->{value}), 877 875 $record->{record_bytes}, 876 + $record->{repo_rev}, 878 877 $record->{created_at} // time, 879 878 $record->{updated_at} // time, 880 879 ], 881 880 _blob_bind_positions_for_names([qw( 882 - did collection rkey cid value_json record_bytes created_at updated_at 881 + did collection rkey cid value_json record_bytes repo_rev created_at updated_at 883 882 )], qw(record_bytes)), 884 883 ); 885 884 } ··· 1671 1670 }, 1672 1671 ], 1673 1672 }, 1673 + { 1674 + version => 11, 1675 + statements => [ 1676 + q{ALTER TABLE records ADD COLUMN repo_rev TEXT}, 1677 + q{ 1678 + UPDATE records 1679 + SET repo_rev = ( 1680 + SELECT accounts.repo_rev 1681 + FROM accounts 1682 + WHERE accounts.did = records.did 1683 + ) 1684 + WHERE repo_rev IS NULL 1685 + }, 1686 + q{CREATE INDEX IF NOT EXISTS records_by_did_repo_rev ON records (did, repo_rev, rkey)}, 1687 + ], 1688 + }, 1674 1689 ); 1675 1690 } 1676 1691 ··· 1721 1736 $row->{value} = decode_json($row->{value_json}) if defined $row->{value_json}; 1722 1737 delete $row->{value_json}; 1723 1738 return $row; 1739 + } 1740 + 1741 + sub _record_blob_cids ($value) { 1742 + return () unless defined $value; 1743 + if (ref($value) eq 'HASH') { 1744 + if (($value->{'$type'} // q()) eq 'blob' && ref($value->{ref}) eq 'HASH' && defined($value->{ref}{'$link'})) { 1745 + return ($value->{ref}{'$link'}); 1746 + } 1747 + my @found; 1748 + for my $child (values %$value) { 1749 + push @found, _record_blob_cids($child); 1750 + } 1751 + return @found; 1752 + } 1753 + if (ref($value) eq 'ARRAY') { 1754 + my @found; 1755 + for my $child (@$value) { 1756 + push @found, _record_blob_cids($child); 1757 + } 1758 + return @found; 1759 + } 1760 + return (); 1724 1761 } 1725 1762 1726 1763 sub _execute_sql ($dbh, $sql, $params = undef, $blob_positions = undef) {
+55
script/differential-validate
# NOTE(review): grep is a list operator, so the match is parenthesised below; unparenthesised it swallows the ternary and the following cursor_matches_tail pair, leaving returns_blob undef and the same_hash comparison vacuous.
··· 1273 1273 'getCheckout matches the official reference PDS semantics', 1274 1274 ); 1275 1275 1276 + note('Comparing listBlobs since semantics'); 1277 + for my $name (sort keys %server) { 1278 + my $session = post_json($server{$name}{origin}, 'com.atproto.server.createSession', { 1279 + identifier => $server{$name}{renamed_handle} || $server{$name}{handle}, 1280 + password => 'hunter22', 1281 + }); 1282 + check($session->is_success, "$name createSession succeeds for listBlobs comparison"); 1283 + next unless $session->is_success; 1284 + 1285 + my $upload = post_bytes( 1286 + $server{$name}{origin}, 1287 + 'com.atproto.repo.uploadBlob', 1288 + "blob bytes for $name", 1289 + 'text/plain', 1290 + auth_header(($session->json || {})->{accessJwt}), 1291 + ); 1292 + check($upload->is_success, "$name uploadBlob succeeds for listBlobs comparison"); 1293 + next unless $upload->is_success; 1294 + 1295 + my $blob = ($upload->json || {})->{blob}; 1296 + my $create = post_json( 1297 + $server{$name}{origin}, 1298 + 'com.atproto.repo.createRecord', 1299 + { 1300 + repo => $server{$name}{did}, 1301 + collection => 'com.example.blobtest', 1302 + rkey => 'blobtest', 1303 + record => { 1304 + '$type' => 'com.example.blobtest', 1305 + blob => $blob, 1306 + }, 1307 + }, 1308 + auth_header(($session->json || {})->{accessJwt}), 1309 + ); 1310 + check($create->is_success, "$name createRecord with blob succeeds for listBlobs comparison"); 1311 + 1312 + my $res = get_form($server{$name}{origin}, 'com.atproto.sync.listBlobs', { 1313 + did => $server{$name}{did}, 1314 + since => $server{$name}{latest_commit_raw}{rev}, 1315 + }); 1316 + check($res->is_success, "$name listBlobs with since succeeds"); 1317 + my $json = $res->json || {}; 1318 + my $blob_cid = $blob->{ref}{'$link'}; 1319 + $server{$name}{list_blobs_since} = { 1320 + ok => $res->is_success ? 1 : 0, 1321 + returns_blob => (grep { $_ eq $blob_cid } @{ $json->{cids} || [] }) ? 
1 : 0, 1322 + cursor_matches_tail => (($json->{cursor} // q()) eq (($json->{cids} || [])->[-1] // q())) ? 1 : 0, 1323 + }; 1324 + } 1325 + 1326 + check( 1327 + same_hash($server{reference}{list_blobs_since}, $server{perlsky}{list_blobs_since}), 1328 + 'listBlobs since semantics match the official reference PDS', 1329 + ); 1330 + 1331 1276 note('Comparing listMissingBlobs empty-state semantics'); 1277 1332 for my $name (sort keys %server) { 1278 1333 my $res = get_form($server{$name}{origin}, 'com.atproto.repo.listMissingBlobs', {}, auth_header($server{$name}{access}));
+11
t/extended-api.t
··· 237 237 my $blob = $t->tx->res->json->{blob}; 238 238 my $blob_cid = $blob->{ref}{'$link'}; 239 239 240 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 241 + Authorization => "Bearer $access", 242 + } => json => { 243 + repo => $did, 244 + collection => 'com.example.attach', 245 + record => { 246 + '$type' => 'com.example.attach', 247 + blob => $blob, 248 + }, 249 + })->status_is(200); 250 + 240 251 $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.listBlobs')->query( 241 252 did => $did, 242 253 ))->status_is(200)
+12 -4
t/external-surface.t
··· 169 169 $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.listBlobs')->query( 170 170 did => $second_did, 171 171 ))->status_is(200) 172 - ->json_is('/cids/0' => $blob_cid); 172 + ->json_is('/cids' => []); 173 173 174 174 $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.getBlocks')->query( 175 175 did => $second_did, ··· 184 184 } => 'blob-two')->status_is(200); 185 185 186 186 my $blob_two_cid = $t->tx->res->json->{blob}{ref}{'$link'}; 187 - my @sorted_blob_cids = sort ($blob_cid, $blob_two_cid); 187 + my @sorted_blob_cids = sort ($blob_cid); 188 188 189 189 $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.listBlobs')->query( 190 190 did => $did, 191 191 limit => 1, 192 192 ))->status_is(200) 193 193 ->json_is('/cids/0' => $sorted_blob_cids[0]) 194 - ->json_is('/cursor' => $sorted_blob_cids[0]); 194 + ->json_is('/cursor' => undef); 195 195 196 196 $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.listBlobs')->query( 197 197 did => $did, 198 198 limit => 1, 199 199 cursor => $sorted_blob_cids[0], 200 200 ))->status_is(200) 201 - ->json_is('/cids/0' => $sorted_blob_cids[1]); 201 + ->json_is('/cids' => []); 202 202 203 203 $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.getBlob')->query( 204 204 did => $second_did, ··· 236 236 })->status_is(200); 237 237 238 238 my $nested_record_uri = $t->tx->res->json->{uri}; 239 + my @since_sorted_blob_cids = sort ($blob_cid, $nested_blob_cid); 240 + 241 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.listBlobs')->query( 242 + did => $did, 243 + since => $latest->{rev}, 244 + ))->status_is(200) 245 + ->json_is('/cids/0' => $since_sorted_blob_cids[0]) 246 + ->json_is('/cids/1' => $since_sorted_blob_cids[1]); 239 247 240 248 $t->get_ok('/xrpc/com.atproto.server.checkAccountStatus' => { 241 249 Authorization => "Bearer $access",
+48
t/store-sqlite.t
··· 92 92 byte_size => 1234, 93 93 storage_path => 'blobs/bafk.png', 94 94 ); 95 + $store->put_record( 96 + did => $account->{did}, 97 + collection => 'com.example.attach', 98 + rkey => 'blob-one', 99 + cid => 'bafyattach1', 100 + record_bytes => q(), 101 + value => { 102 + '$type' => 'com.example.attach', 103 + blob => { 104 + '$type' => 'blob', 105 + ref => { '$link' => 'bafkreigh2akiscaildc' }, 106 + mimeType => 'image/png', 107 + size => 1234, 108 + }, 109 + }, 110 + ); 111 + $store->put_record( 112 + did => $second_account->{did}, 113 + collection => 'com.example.attach', 114 + rkey => 'blob-one', 115 + cid => 'bafyattach2', 116 + record_bytes => q(), 117 + value => { 118 + '$type' => 'com.example.attach', 119 + blob => { 120 + '$type' => 'blob', 121 + ref => { '$link' => 'bafkreigh2akiscaildc' }, 122 + mimeType => 'image/png', 123 + size => 1234, 124 + }, 125 + }, 126 + ); 95 127 ok($store->blob_owned_by_did('bafkreigh2akiscaildc', $second_account->{did}), 'second owner is tracked for shared blob'); 96 128 is($store->count_blobs_by_did($account->{did}), 1, 'first account still counts shared blob'); 97 129 is($store->count_blobs_by_did($second_account->{did}), 1, 'second account counts shared blob'); ··· 104 136 mime_type => 'image/jpeg', 105 137 byte_size => 4321, 106 138 storage_path => 'blobs/bafk-second.jpg', 139 + ); 140 + $store->put_record( 141 + did => $account->{did}, 142 + collection => 'com.example.attach', 143 + rkey => 'blob-two', 144 + cid => 'bafyattach3', 145 + record_bytes => q(), 146 + value => { 147 + '$type' => 'com.example.attach', 148 + blob => { 149 + '$type' => 'blob', 150 + ref => { '$link' => 'bafkreighsecondblob' }, 151 + mimeType => 'image/jpeg', 152 + size => 4321, 153 + }, 154 + }, 107 155 ); 108 156 109 157 my $blob_page_one = $store->list_blobs_by_did($account->{did}, limit => 1);