perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement importRepo and reference snapshot restore checks

alice 0ce778bc fcf832e0

+360 -3
+1 -1
README.md
··· 13 13 14 14 Reference differential validation: 15 15 16 - - Run `script/differential-validate` to compare `perlds` against the official published `@atproto/pds` on a focused set of account, repo, sync, and firehose behaviors. 16 + - Run `script/differential-validate` to compare `perlds` against the official published `@atproto/pds` on a focused set of account, repo, sync, firehose, and `importRepo` snapshot-restore behaviors. 17 17 - Run `PERLDS_DIFF_ACCOUNT_DID_METHOD=did:plc script/differential-validate` to exercise the same harness in PLC-account mode, including recommended DID credentials, PLC signature requests, PLC handle updates, and token-gated PLC signing behavior. 18 18 - The helper installs the reference runtime into `.tools/reference-runtime` with Node 20 via `fnm`. 19 19 - Run `PERLDS_RUN_REFERENCE_DIFF=1 prove -lv t/reference-differential.t` to exercise the same harness from the test suite.
+7 -1
lib/ATProto/PDS/API/Repo.pm
··· 177 177 178 178 $registry->register('com.atproto.repo.importRepo', sub ($c, $endpoint) { 179 179 my (undef, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 180 - _xrpc_error(400, 'UnsupportedRepoImport', "Repo import is not yet supported for $account->{did}"); 180 + _xrpc_error(400, 'InvalidRequest', 'Service is not accepting repo imports') 181 + unless $c->config_value('accepting_imports', 1); 182 + my $car_bytes = $c->req->body // q(); 183 + _xrpc_error(400, 'InvalidRequest', 'Repo import requires a CAR payload') 184 + unless length $car_bytes; 185 + $c->repo_manager->import_repo_car($account, $car_bytes); 186 + return {}; 181 187 }); 182 188 } 183 189
+194 -1
lib/ATProto/PDS/Repo/Manager.pm
··· 6 6 no warnings 'experimental::signatures'; 7 7 8 8 use JSON::PP qw(decode_json); 9 + use Scalar::Util qw(blessed); 9 10 10 11 use ATProto::PDS::Crypto::Secp256k1 qw(generate_keypair sign_compact_low_s); 11 12 use ATProto::PDS::Repo::Bytes; 12 13 use ATProto::PDS::Repo::CAR qw(read_car write_car); 13 14 use ATProto::PDS::Repo::CID; 14 - use ATProto::PDS::Repo::DagCbor qw(encode_dag_cbor); 15 + use ATProto::PDS::Repo::DagCbor qw(decode_dag_cbor encode_dag_cbor); 15 16 use ATProto::PDS::Repo::MST qw(build_mst); 16 17 use ATProto::PDS::Util::TID qw(next_tid); 17 18 ··· 174 175 car_bytes => $car_bytes, 175 176 results => \@results, 176 177 }; 178 + } 179 + 180 + sub import_repo_car ($self, $account, $car_bytes) { 181 + my $store = $self->store; 182 + my $did = $account->{did}; 183 + my $car = read_car($car_bytes); 184 + die { 185 + status => 400, 186 + error => 'InvalidRepoImport', 187 + message => 'expected one root', 188 + } unless @{ $car->{roots} || [] } == 1; 189 + 190 + my %blocks = map { $_->{cid}->to_string => $_ } @{ $car->{blocks} || [] }; 191 + my $import_root_cid = $car->{roots}[0]; 192 + my $commit_block = $blocks{ $import_root_cid->to_string } or die { 193 + status => 400, 194 + error => 'InvalidRepoImport', 195 + message => 'root commit block is missing from the CAR', 196 + }; 197 + my $commit = decode_dag_cbor($commit_block->{bytes}); 198 + die { 199 + status => 400, 200 + error => 'InvalidRepoImport', 201 + message => 'imported repo belongs to a different DID', 202 + } unless ($commit->{did} // q()) eq $did; 203 + 204 + my $records = _records_from_import($commit->{data}, \%blocks); 205 + my %imported = map { $_->{collection} . '/' . $_->{rkey} => $_ } @$records; 206 + my %previous = map { 207 + $_->{collection} . '/' . $_->{rkey} => $_ 208 + } @{ $store->all_records_for_did($did) }; 209 + my @ops = _diff_record_sets(\%previous, \%imported); 210 + my $latest = $store->get_latest_commit($did); 211 + my $prev_data = $latest ? $latest->{root_cid} : undef; 212 + my %mst_input = map { 213 + $_->{collection} . '/' . $_->{rkey} => ATProto::PDS::Repo::CID->from_string($_->{cid}) 214 + } @$records; 215 + my $mst = build_mst(\%mst_input); 216 + my $rev = next_tid($latest ? $latest->{rev} : undef); 217 + my $unsigned = { 218 + did => $did, 219 + version => 3, 220 + data => $mst->{root}, 221 + rev => $rev, 222 + prev => $latest ? ATProto::PDS::Repo::CID->from_string($latest->{cid}) : undef, 223 + }; 224 + my $unsigned_bytes = encode_dag_cbor($unsigned); 225 + my $sig = sign_compact_low_s($account->{private_key}, $unsigned_bytes); 226 + my $next_commit = { %$unsigned, sig => ATProto::PDS::Repo::Bytes->new($sig) }; 227 + my $commit_bytes = encode_dag_cbor($next_commit); 228 + my $commit_cid = ATProto::PDS::Repo::CID->for_dag_cbor($commit_bytes); 229 + my $root_cid = $mst->{root}->to_string; 230 + my @repo_blocks = ( 231 + { cid => $commit_cid, bytes => $commit_bytes }, 232 + @{ $mst->{blocks} }, 233 + map { 234 + +{ 235 + cid => ATProto::PDS::Repo::CID->from_string($_->{cid}), 236 + bytes => $_->{record_bytes}, 237 + } 238 + } @$records, 239 + ); 240 + my $next_car_bytes = write_car($commit_cid, \@repo_blocks); 241 + 242 + $store->txn(sub ($dbh) { 243 + for my $block (values %blocks, @repo_blocks) { 244 + $store->put_block( 245 + cid => $block->{cid}->to_string, 246 + codec => $block->{cid}->codec, 247 + bytes => $block->{bytes}, 248 + ); 249 + } 250 + $store->replace_records_for_did($did, $records); 251 + $store->put_commit( 252 + did => $did, 253 + rev => $rev, 254 + cid => $commit_cid->to_string, 255 + root_cid => $root_cid, 256 + prev_cid => $latest ? $latest->{cid} : undef, 257 + commit_bytes => $commit_bytes, 258 + car_bytes => $next_car_bytes, 259 + ); 260 + $store->append_event( 261 + did => $did, 262 + type => 'commit', 263 + rev => $rev, 264 + commit_cid => $commit_cid->to_string, 265 + payload => { 266 + ops => \@ops, 267 + since => undef, 268 + prevData => $prev_data, 269 + }, 270 + car_bytes => $next_car_bytes, 271 + ); 272 + }); 273 + 274 + return { 275 + cid => $commit_cid->to_string, 276 + rev => $rev, 277 + root_cid => $root_cid, 278 + records => $records, 279 + }; 280 + } 281 + 282 + sub _records_from_import ($root_cid, $blocks) { 283 + my @records; 284 + _walk_mst($root_cid, $blocks, \@records); 285 + return \@records; 286 + } 287 + 288 + sub _walk_mst ($cid, $blocks, $records) { 289 + return unless $cid; 290 + my $block = $blocks->{ _cid_string($cid) } or die { 291 + status => 400, 292 + error => 'InvalidRepoImport', 293 + message => 'missing MST block in imported CAR', 294 + }; 295 + my $node = decode_dag_cbor($block->{bytes}); 296 + _walk_mst($node->{l}, $blocks, $records) if $node->{l}; 297 + 298 + my $previous = q(); 299 + for my $entry (@{ $node->{e} || [] }) { 300 + my $suffix = blessed($entry->{k}) && $entry->{k}->isa('ATProto::PDS::Repo::Bytes') 301 + ? $entry->{k}->bytes 302 + : ($entry->{k} // q()); 303 + my $path = substr($previous, 0, $entry->{p} // 0) . $suffix; 304 + my ($collection, $rkey) = split m{/}, $path, 2; 305 + die { 306 + status => 400, 307 + error => 'InvalidRepoImport', 308 + message => "invalid repo path '$path' in imported MST", 309 + } unless defined($collection) && length($collection) && defined($rkey) && length($rkey); 310 + 311 + my $record_cid = $entry->{v}; 312 + my $record_block = $blocks->{ _cid_string($record_cid) } or die { 313 + status => 400, 314 + error => 'InvalidRepoImport', 315 + message => "missing record block for '$path' in imported CAR", 316 + }; 317 + push @$records, { 318 + collection => $collection, 319 + rkey => $rkey, 320 + cid => _cid_string($record_cid), 321 + value => decode_dag_cbor($record_block->{bytes}), 322 + record_bytes => $record_block->{bytes}, 323 + created_at => time, 324 + updated_at => time, 325 + }; 326 + 327 + _walk_mst($entry->{t}, $blocks, $records) if $entry->{t}; 328 + $previous = $path; 329 + } 330 + } 331 + 332 + sub _diff_record_sets ($previous, $imported) { 333 + my %paths = map { $_ => 1 } (keys %$previous, keys %$imported); 334 + my @ops; 335 + for my $path (sort keys %paths) { 336 + my $before = $previous->{$path}; 337 + my $after = $imported->{$path}; 338 + if ($before && !$after) { 339 + push @ops, { 340 + action => 'delete', 341 + path => $path, 342 + cid => undef, 343 + prev => $before->{cid}, 344 + }; 345 + next; 346 + } 347 + if (!$before && $after) { 348 + push @ops, { 349 + action => 'create', 350 + path => $path, 351 + cid => $after->{cid}, 352 + }; 353 + next; 354 + } 355 + next if ($before->{cid} // q()) eq ($after->{cid} // q()); 356 + push @ops, { 357 + action => 'update', 358 + path => $path, 359 + cid => $after->{cid}, 360 + prev => $before->{cid}, 361 + }; 362 + } 363 + return @ops; 364 + } 365 + 366 + sub _cid_string ($cid) { 367 + return undef unless $cid; 368 + return $cid->to_string if blessed($cid) && $cid->isa('ATProto::PDS::Repo::CID'); 369 + return $cid; 177 370 } 178 371 179 372 1;
+54
script/differential-validate
··· 170 170 return $tx->result; 171 171 } 172 172 173 + sub post_bytes ($origin, $nsid, $bytes, $content_type, $headers = {}) { 174 + my $ua = Mojo::UserAgent->new(max_redirects => 0); 175 + my $tx = $ua->post( 176 + "$origin/xrpc/$nsid" => { 177 + 'Content-Type' => $content_type, 178 + %{$headers}, 179 + } => $bytes, 180 + ); 181 + return $tx->result; 182 + } 183 + 173 184 sub get_form ($origin, $nsid, $query, $headers = {}) { 174 185 my $ua = Mojo::UserAgent->new(max_redirects => 0); 175 186 my $url = Mojo::URL->new("$origin/xrpc/$nsid")->query($query); ··· 696 707 my $ctype = $res->headers->content_type // q(); 697 708 check($ctype =~ m{application/vnd\.ipld\.car}, "$name getRepo returns CAR bytes"); 698 709 check(length($res->body // q()) > 0, "$name getRepo CAR payload is non-empty"); 710 + $server{$name}{repo_snapshot_car} = $res->body; 699 711 } 712 + 713 + note('Comparing importRepo snapshot restore'); 714 + for my $name (sort keys %server) { 715 + my $res = post_json($server{$name}{origin}, 'com.atproto.repo.createRecord', { 716 + repo => $server{$name}{did}, 717 + collection => 'app.bsky.feed.post', 718 + rkey => 'import-diff', 719 + record => { 720 + %{$record}, 721 + text => "import validation for $name", 722 + }, 723 + }, auth_header($server{$name}{access})); 724 + check($res->is_success, "$name creates an extra record before importRepo"); 725 + next unless $res->is_success; 726 + 727 + $res = post_bytes( 728 + $server{$name}{origin}, 729 + 'com.atproto.repo.importRepo', 730 + $server{$name}{repo_snapshot_car}, 731 + 'application/vnd.ipld.car', 732 + auth_header($server{$name}{access}), 733 + ); 734 + check($res->is_success, "$name importRepo succeeds"); 735 + next unless $res->is_success; 736 + 737 + $res = get_form($server{$name}{origin}, 'com.atproto.repo.listRecords', { 738 + repo => $server{$name}{did}, 739 + collection => 'app.bsky.feed.post', 740 + }); 741 + check($res->is_success, "$name listRecords succeeds after importRepo"); 742 + next unless $res->is_success; 743 + my $records = $res->json->{records} || []; 744 + $server{$name}{import_repo_state} = { 745 + record_count => 0 + @$records, 746 + texts => [ map { $_->{value}{text} } @$records ], 747 + }; 748 + } 749 + 750 + check( 751 + same_hash($server{reference}{import_repo_state}, $server{perlds}{import_repo_state}), 752 + 'importRepo restores the same normalized repo state as the official reference PDS', 753 + ); 700 754 701 755 note('Comparing firehose live follow behavior'); 702 756 for my $name (sort keys %server) {
+104
t/import-repo.t
··· 1 + use v5.34; 2 + use warnings; 3 + 4 + use Config (); 5 + use File::Spec; 6 + use File::Temp qw(tempdir); 7 + use FindBin qw($Bin); 8 + use Test::More; 9 + 10 + BEGIN { 11 + require lib; 12 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 13 + lib->import( 14 + File::Spec->catdir($root, 'lib'), 15 + File::Spec->catdir($root, 'local', 'lib', 'perl5'), 16 + File::Spec->catdir($root, 'local', 'lib', 'perl5', $Config::Config{archname}), 17 + ); 18 + } 19 + 20 + use Test::Mojo; 21 + use ATProto::PDS; 22 + 23 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 24 + my $tmp = tempdir(CLEANUP => 1); 25 + 26 + my $app = ATProto::PDS->new( 27 + project_root => $root, 28 + settings => { 29 + base_url => 'http://127.0.0.1:7755', 30 + service_handle_domain => 'example.test', 31 + service_did_method => 'did:web', 32 + accepting_imports => 1, 33 + jwt_secret => 'import-secret', 34 + admin_password => 'admin-secret', 35 + data_dir => File::Spec->catdir($tmp, 'data'), 36 + db_path => File::Spec->catfile($tmp, 'perlds.sqlite'), 37 + }, 38 + ); 39 + 40 + my $t = Test::Mojo->new($app); 41 + 42 + $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 43 + handle => 'alice.example.test', 44 + email => 'alice@example.test', 45 + password => 'hunter22', 46 + })->status_is(200); 47 + 48 + my $created = $t->tx->res->json; 49 + my $did = $created->{did}; 50 + my $access = $created->{accessJwt}; 51 + 52 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 53 + Authorization => "Bearer $access", 54 + } => json => { 55 + repo => $did, 56 + collection => 'app.bsky.feed.post', 57 + rkey => 'before-import', 58 + record => { 59 + '$type' => 'app.bsky.feed.post', 60 + text => 'state before import', 61 + createdAt => '2026-03-10T00:00:00Z', 62 + }, 63 + })->status_is(200); 64 + 65 + $t->get_ok('/xrpc/com.atproto.sync.getRepo' => form => { 66 + did => $did, 67 + })->status_is(200); 68 + my $snapshot = $t->tx->res->body; 69 + 70 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 71 + Authorization => "Bearer $access", 72 + } => json => { 73 + repo => $did, 74 + collection => 'app.bsky.feed.post', 75 + rkey => 'after-import', 76 + record => { 77 + '$type' => 'app.bsky.feed.post', 78 + text => 'state after import', 79 + createdAt => '2026-03-10T00:00:01Z', 80 + }, 81 + })->status_is(200); 82 + 83 + $t->get_ok('/xrpc/com.atproto.repo.listRecords' => form => { 84 + repo => $did, 85 + collection => 'app.bsky.feed.post', 86 + })->status_is(200); 87 + is(scalar @{ $t->tx->res->json->{records} || [] }, 2, 'repo contains the extra write before import'); 88 + 89 + $t->post_ok('/xrpc/com.atproto.repo.importRepo' => { 90 + Authorization => "Bearer $access", 91 + 'Content-Type' => 'application/vnd.ipld.car', 92 + } => $snapshot)->status_is(200); 93 + 94 + $t->get_ok('/xrpc/com.atproto.repo.listRecords' => form => { 95 + repo => $did, 96 + collection => 'app.bsky.feed.post', 97 + })->status_is(200); 98 + 99 + my $records = $t->tx->res->json->{records} || []; 100 + is(scalar @$records, 1, 'importRepo restores the earlier repo snapshot'); 101 + is($records->[0]{uri}, "at://$did/app.bsky.feed.post/before-import", 'imported repo keeps the earlier record URI'); 102 + is($records->[0]{value}{text}, 'state before import', 'imported repo restores the earlier record body'); 103 + 104 + done_testing;