perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Repair legacy TIDs and firehose cursor semantics

alice dd616345 b28a3fc0

+376 -11
+4 -2
lib/ATProto/PDS/API/Sync.pm
··· 237 237 $next_seq = $latest + 1; 238 238 } else { 239 239 my $cursor = int($cursor_param); 240 - if ($cursor > $latest + 1) { 240 + if ($cursor > $latest) { 241 241 $c->subscription_send( 242 242 binary => encode_error_frame('FutureCursor', 'Cursor is ahead of the local event stream'), 243 243 frame_type => 'error', ··· 252 252 ); 253 253 $next_seq = $oldest; 254 254 } else { 255 - $next_seq = $cursor || ($oldest || ($latest + 1)); 255 + $next_seq = defined($cursor_param) && length($cursor_param) 256 + ? ($cursor + 1) 257 + : ($oldest || ($latest + 1)); 256 258 } 257 259 } 258 260
+178 -2
lib/ATProto/PDS/Repo/Manager.pm
··· 5 5 use feature 'signatures'; 6 6 no warnings 'experimental::signatures'; 7 7 8 - use JSON::PP qw(decode_json); 8 + use JSON::PP qw(encode_json); 9 9 use Scalar::Util qw(blessed); 10 10 11 11 use ATProto::PDS::Crypto::Secp256k1 qw(generate_keypair sign_compact_low_s); ··· 15 15 use ATProto::PDS::Repo::CID; 16 16 use ATProto::PDS::Repo::DagCbor qw(decode_dag_cbor encode_dag_cbor); 17 17 use ATProto::PDS::Repo::MST qw(build_mst); 18 - use ATProto::PDS::Util::TID qw(next_tid); 18 + use ATProto::PDS::Util::TID qw(is_valid_tid next_tid repair_tid); 19 19 20 20 sub new ($class, %args) { 21 21 die 'store is required' unless $args{store}; ··· 366 366 root_cid => $root_cid, 367 367 records => $records, 368 368 }; 369 + } 370 + 371 + sub repair_invalid_tids ($self, $account, %opts) { 372 + my $store = $self->store; 373 + my $did = $account->{did}; 374 + my $latest = $store->get_latest_commit($did); 375 + my $records = $store->all_records_for_did($did); 376 + 377 + my $repaired = _repair_records_for_repo($account, $records); 378 + my $rev_needs_repair = defined($latest->{rev}) && !is_valid_tid($latest->{rev}) && defined(repair_tid($latest->{rev})); 379 + my $needs_repair = $repaired->{changed} || $rev_needs_repair || ($opts{force} // 0); 380 + 381 + return { 382 + changed => 0, 383 + repaired_paths => $repaired->{repaired_paths}, 384 + rewritten_refs => $repaired->{rewritten_refs}, 385 + rev_repaired => $rev_needs_repair ? 1 : 0, 386 + imported => undef, 387 + } unless $needs_repair; 388 + 389 + my $snapshot_car = _build_snapshot_car( 390 + $account, 391 + $repaired->{records}, 392 + $latest ? (repair_tid($latest->{rev}) // $latest->{rev}) : undef, 393 + ); 394 + my $imported = $self->import_repo_car($account, $snapshot_car); 395 + 396 + return { 397 + changed => 1, 398 + repaired_paths => $repaired->{repaired_paths}, 399 + rewritten_refs => $repaired->{rewritten_refs}, 400 + rev_repaired => $rev_needs_repair ? 1 : 0, 401 + imported => $imported, 402 + }; 403 + } 404 + 405 + sub _repair_records_for_repo ($account, $records) { 406 + my $did = $account->{did}; 407 + my $handle = $account->{handle}; 408 + my %path_map; 409 + my %occupied = map { 410 + $_->{collection} . '/' . $_->{rkey} => 1 411 + } @$records; 412 + 413 + my $repaired_paths = 0; 414 + for my $record (@$records) { 415 + my $old_path = $record->{collection} . '/' . $record->{rkey}; 416 + my $repaired_rkey = repair_tid($record->{rkey}); 417 + next unless defined $repaired_rkey && $repaired_rkey ne $record->{rkey}; 418 + 419 + my $new_path = $record->{collection} . '/' . $repaired_rkey; 420 + die { 421 + status => 500, 422 + error => 'RepoRepairCollision', 423 + message => "repair would collide at '$new_path'", 424 + } if $occupied{$new_path} && $new_path ne $old_path; 425 + 426 + delete $occupied{$old_path}; 427 + $occupied{$new_path} = 1; 428 + $path_map{$old_path} = $new_path; 429 + $repaired_paths++; 430 + } 431 + 432 + my $rewritten_refs = 0; 433 + my @repaired; 434 + for my $record (@$records) { 435 + my $old_path = $record->{collection} . '/' . $record->{rkey}; 436 + my $new_path = $path_map{$old_path} // $old_path; 437 + my ($collection, $rkey) = split m{/}, $new_path, 2; 438 + 439 + my $counter = 0; 440 + my $value = _rewrite_owned_at_uris( 441 + $record->{value}, 442 + { 443 + $did => 1, 444 + ($handle ? ($handle => 1) : ()), 445 + }, 446 + \%path_map, 447 + \$counter, 448 + ); 449 + $rewritten_refs += $counter; 450 + 451 + my $record_bytes = encode_dag_cbor($value); 452 + my $cid = ATProto::PDS::Repo::CID->for_dag_cbor($record_bytes)->to_string; 453 + push @repaired, { 454 + %$record, 455 + collection => $collection, 456 + rkey => $rkey, 457 + value => $value, 458 + cid => $cid, 459 + record_bytes => $record_bytes, 460 + }; 461 + } 462 + 463 + my $changed = $repaired_paths || $rewritten_refs; 464 + if (!$changed) { 465 + for my $idx (0 .. $#repaired) { 466 + my $before = $records->[$idx]; 467 + my $after = $repaired[$idx]; 468 + if (($before->{collection} // q()) ne ($after->{collection} // q()) 469 + || ($before->{rkey} // q()) ne ($after->{rkey} // q()) 470 + || ($before->{cid} // q()) ne ($after->{cid} // q()) 471 + ) { 472 + $changed = 1; 473 + last; 474 + } 475 + } 476 + } 477 + 478 + return { 479 + changed => $changed ? 1 : 0, 480 + repaired_paths => $repaired_paths, 481 + rewritten_refs => $rewritten_refs, 482 + records => \@repaired, 483 + path_map => \%path_map, 484 + }; 485 + } 486 + 487 + sub _rewrite_owned_at_uris ($value, $hosts, $path_map, $counter_ref) { 488 + return undef unless defined $value; 489 + 490 + if (!ref($value)) { 491 + if ($value =~ m{\Aat://([^/]+)/([^/]+/[^/?#]+)\z}) { 492 + my ($host, $path) = ($1, $2); 493 + if ($hosts->{$host} && defined $path_map->{$path}) { 494 + $$counter_ref++ if $counter_ref; 495 + return "at://$host/$path_map->{$path}"; 496 + } 497 + } 498 + return $value; 499 + } 500 + 501 + if (ref($value) eq 'ARRAY') { 502 + return [ map { _rewrite_owned_at_uris($_, $hosts, $path_map, $counter_ref) } @$value ]; 503 + } 504 + 505 + if (ref($value) eq 'HASH') { 506 + return { 507 + map { 508 + $_ => _rewrite_owned_at_uris($value->{$_}, $hosts, $path_map, $counter_ref) 509 + } sort keys %$value 510 + }; 511 + } 512 + 513 + return $value; 514 + } 515 + 516 + sub _build_snapshot_car ($account, $records, $rev = undef) { 517 + my $did = $account->{did}; 518 + my %mst_input = map { 519 + $_->{collection} . '/' . $_->{rkey} => ATProto::PDS::Repo::CID->from_string($_->{cid}) 520 + } @$records; 521 + my $mst = build_mst(\%mst_input); 522 + my $unsigned = { 523 + did => $did, 524 + version => 3, 525 + data => $mst->{root}, 526 + rev => $rev // next_tid(), 527 + prev => undef, 528 + }; 529 + my $unsigned_bytes = encode_dag_cbor($unsigned); 530 + my $sig = sign_compact_low_s($account->{private_key}, $unsigned_bytes); 531 + my $commit = { %$unsigned, sig => ATProto::PDS::Repo::Bytes->new($sig) }; 532 + my $commit_bytes = encode_dag_cbor($commit); 533 + my $commit_cid = ATProto::PDS::Repo::CID->for_dag_cbor($commit_bytes); 534 + my @blocks = ( 535 + { cid => $commit_cid, bytes => $commit_bytes }, 536 + @{ $mst->{blocks} }, 537 + map { 538 + +{ 539 + cid => ATProto::PDS::Repo::CID->from_string($_->{cid}), 540 + bytes => $_->{record_bytes}, 541 + } 542 + } @$records, 543 + ); 544 + return write_car($commit_cid, \@blocks); 369 545 } 370 546 371 547 sub _records_from_import ($root_cid, $blocks) {
+32 -4
lib/ATProto/PDS/Util/TID.pm
··· 5 5 6 6 use Exporter 'import'; 7 7 8 - our @EXPORT_OK = qw(next_tid); 8 + our @EXPORT_OK = qw( 9 + is_valid_tid 10 + next_tid 11 + repair_tid 12 + ); 9 13 10 14 my $S32_CHAR = '234567abcdefghijklmnopqrstuvwxyz'; 11 15 my %S32_INDEX = map { substr($S32_CHAR, $_, 1) => $_ } 0 .. length($S32_CHAR) - 1; ··· 16 20 17 21 sub next_tid { 18 22 my ($prev) = @_; 23 + my $normalized_prev = repair_tid($prev) // $prev; 19 24 20 25 my $time = int(Time::HiRes::time() * 1000); 21 26 $time = $last_timestamp if $time < $last_timestamp; ··· 30 35 $clockid //= int(rand(32)); 31 36 my $tid = _from_time($time * 1000 + $timestamp_count, $clockid); 32 37 33 - if (defined $prev && $tid le $prev) { 34 - return _from_time(_s32decode(substr($prev, 0, 11)) + 1, $clockid); 38 + if (defined $normalized_prev && $tid le $normalized_prev) { 39 + return _from_time(_s32decode(substr($normalized_prev, 0, 11)) + 1, $clockid); 35 40 } 36 41 37 42 return $tid; 38 43 } 39 44 45 + sub is_valid_tid { 46 + my ($tid) = @_; 47 + return 0 unless defined $tid && length($tid) == 13; 48 + return $tid =~ m{\A[234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{12}\z} ? 1 : 0; 49 + } 50 + 51 + sub repair_tid { 52 + my ($tid) = @_; 53 + return undef unless defined $tid && length($tid) == 13; 54 + return $tid if is_valid_tid($tid); 55 + 56 + # Older perlsky builds incorrectly zero-padded the 2-char clock id, producing 57 + # otherwise-valid TIDs ending in `0x`. Repair that losslessly to `2x`. 58 + if ($tid =~ m{\A([234567abcdefghij][234567abcdefghijklmnopqrstuvwxyz]{10})0([234567abcdefghijklmnopqrstuvwxyz])\z}) { 59 + my $repaired = $1 . '2' . $2; 60 + return $repaired if is_valid_tid($repaired); 61 + } 62 + 63 + return undef; 64 + } 65 + 40 66 sub _from_time { 41 67 my ($timestamp, $clock) = @_; 42 - return _s32encode($timestamp) . sprintf('%02s', _s32encode($clock)) =~ s/ /2/gr; 68 + my $clock_s32 = _s32encode($clock); 69 + $clock_s32 = ('2' x (2 - length($clock_s32))) . $clock_s32 if length($clock_s32) < 2; 70 + return _s32encode($timestamp) . $clock_s32; 43 71 } 44 72 45 73 sub _s32encode {
+40
script/perlsky-admin
··· 34 34 use ATProto::PDS::Store::SQLite; 35 35 use ATProto::PDS::Auth::Password qw(random_hex); 36 36 use ATProto::PDS::Identity qw(service_did); 37 + use ATProto::PDS::Repo::Manager; 37 38 38 39 my $root = File::Spec->rel2abs(File::Spec->catdir($FindBin::RealBin, '..')); 39 40 my $config_path = $ENV{PERLSKY_CONFIG} || File::Spec->catfile($root, 'etc', 'perlsky.example.json'); ··· 78 79 exit 0; 79 80 } 80 81 82 + if ($command eq 'repair-invalid-tids') { 83 + my $did; 84 + my $force = 0; 85 + GetOptionsFromArray( 86 + \@ARGV, 87 + 'did=s' => \$did, 88 + 'force!' => \$force, 89 + ) or die usage(); 90 + 91 + my $store = ATProto::PDS::Store::SQLite->new( 92 + path => $config->{db_path} || File::Spec->catfile($root, 'data', 'runtime', 'perlsky.sqlite'), 93 + )->bootstrap; 94 + my $manager = ATProto::PDS::Repo::Manager->new(store => $store); 95 + 96 + my @accounts = $did 97 + ? do { 98 + my $account = $store->get_account_by_did($did) 99 + or die "account not found for did=$did\n"; 100 + ($account); 101 + } 102 + : @{ $store->list_accounts }; 103 + 104 + for my $account (@accounts) { 105 + my $result = $manager->repair_invalid_tids($account, force => $force); 106 + my $label = $account->{handle} // $account->{did}; 107 + print join( 108 + q{ }, 109 + $label, 110 + 'changed=' . ($result->{changed} // 0), 111 + 'repaired_paths=' . ($result->{repaired_paths} // 0), 112 + 'rewritten_refs=' . ($result->{rewritten_refs} // 0), 113 + 'rev_repaired=' . ($result->{rev_repaired} // 0), 114 + (defined($result->{imported}{rev}) ? ('new_rev=' . $result->{imported}{rev}) : ()), 115 + ), "\n"; 116 + } 117 + exit 0; 118 + } 119 + 81 120 die usage(); 82 121 83 122 sub _new_invite_code { ··· 89 128 usage: 90 129 perlsky-admin create-invite [--use-count N] [--for-account DID] [--created-by DID] 91 130 perlsky-admin repair-binary-columns 131 + perlsky-admin repair-invalid-tids [--did DID] [--force] 92 132 EOF 93 133 }
+3 -3
t/firehose.t
··· 133 133 134 134 my $latest_seq = $app->store->latest_event_seq; 135 135 my $replay = Test::Mojo->new($app); 136 - $replay->websocket_ok("/xrpc/com.atproto.sync.subscribeRepos?cursor=$latest_seq") 137 - ->message_ok('replayed current cursor event'); 136 + $replay->websocket_ok("/xrpc/com.atproto.sync.subscribeRepos?cursor=$baseline_seq"); 138 137 138 + $replay->message_ok('replayed event after the cursor'); 139 139 my $replayed = decode_frame($replay->message->[1]); 140 - is($replayed->{body}{seq}, $latest_seq, 'cursor replay is inclusive'); 140 + is($replayed->{body}{seq}, $baseline_seq + 1, 'cursor replay is exclusive'); 141 141 $replay->finish_ok; 142 142 143 143 my $follow = Test::Mojo->new($app);
+119
t/tid-repair.t
··· 1 + use v5.34; 2 + use warnings; 3 + 4 + use Config (); 5 + use File::Spec; 6 + use File::Temp qw(tempdir); 7 + use FindBin qw($Bin); 8 + use Test::More; 9 + 10 + BEGIN { 11 + require lib; 12 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 13 + lib->import( 14 + File::Spec->catdir($root, 'lib'), 15 + File::Spec->catdir($root, 'local', 'lib', 'perl5'), 16 + File::Spec->catdir($root, 'local', 'lib', 'perl5', $Config::Config{archname}), 17 + ); 18 + } 19 + 20 + use Test::Mojo; 21 + use ATProto::PDS; 22 + use ATProto::PDS::Util::TID qw(is_valid_tid next_tid repair_tid); 23 + 24 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 25 + my $tmp = tempdir(CLEANUP => 1); 26 + 27 + my $legacy_tid = '3mgqt45pjds0s'; 28 + is(repair_tid($legacy_tid), '3mgqt45pjds2s', 'legacy zero-padded TID repairs losslessly'); 29 + ok(!is_valid_tid($legacy_tid), 'legacy zero-padded TID is invalid'); 30 + ok(is_valid_tid(repair_tid($legacy_tid)), 'repaired legacy TID is valid'); 31 + 32 + my $next = next_tid($legacy_tid); 33 + ok(is_valid_tid($next), 'next_tid emits a valid TID after an invalid predecessor'); 34 + ok($next gt repair_tid($legacy_tid), 'next_tid stays monotonic across repaired predecessor'); 35 + 36 + my $app = ATProto::PDS->new( 37 + project_root => $root, 38 + settings => { 39 + base_url => 'http://127.0.0.1:7755', 40 + service_handle_domain => 'example.test', 41 + service_did_method => 'did:web', 42 + jwt_secret => 'tid-repair-secret', 43 + admin_password => 'admin-secret', 44 + data_dir => File::Spec->catdir($tmp, 'data'), 45 + db_path => File::Spec->catfile($tmp, 'perlsky.sqlite'), 46 + }, 47 + ); 48 + 49 + my $t = Test::Mojo->new($app); 50 + $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 51 + handle => 'alice.example.test', 52 + email => 'alice@example.test', 53 + password => 'hunter22', 54 + })->status_is(200); 55 + 56 + my $session = $t->tx->res->json; 57 + my $did = $session->{did}; 58 + my $account = $app->store->get_account_by_did($did); 59 + 60 + my $post_rkey = '3mgqt45piek0s'; 61 + my $reply_rkey = '3mgqt45pjds0s'; 62 + 63 + $app->repo_manager->apply_writes($account, [ 64 + { 65 + action => 'create', 66 + collection => 'app.bsky.feed.post', 67 + rkey => $post_rkey, 68 + record => { 69 + '$type' => 'app.bsky.feed.post', 70 + text => 'legacy tid root', 71 + createdAt => '2026-03-11T02:00:00Z', 72 + }, 73 + }, 74 + { 75 + action => 'create', 76 + collection => 'app.bsky.feed.post', 77 + rkey => $reply_rkey, 78 + record => { 79 + '$type' => 'app.bsky.feed.post', 80 + text => 'legacy tid reply', 81 + createdAt => '2026-03-11T02:00:01Z', 82 + reply => { 83 + root => { uri => "at://$did/app.bsky.feed.post/$post_rkey" }, 84 + parent => { uri => "at://$did/app.bsky.feed.post/$post_rkey" }, 85 + }, 86 + }, 87 + }, 88 + ]); 89 + 90 + ok($app->store->get_record($did, 'app.bsky.feed.post', $post_rkey), 'legacy root post exists before repair'); 91 + ok($app->store->get_record($did, 'app.bsky.feed.post', $reply_rkey), 'legacy reply post exists before repair'); 92 + 93 + my $repair = $app->repo_manager->repair_invalid_tids($account); 94 + ok($repair->{changed}, 'repair migrates the repo'); 95 + is($repair->{repaired_paths}, 2, 'repair updated both legacy TID record keys'); 96 + 97 + my $fixed_post_rkey = repair_tid($post_rkey); 98 + my $fixed_reply_rkey = repair_tid($reply_rkey); 99 + ok($app->store->get_record($did, 'app.bsky.feed.post', $fixed_post_rkey), 'root post moved to repaired rkey'); 100 + ok($app->store->get_record($did, 'app.bsky.feed.post', $fixed_reply_rkey), 'reply post moved to repaired rkey'); 101 + ok(!$app->store->get_record($did, 'app.bsky.feed.post', $post_rkey), 'old root post rkey no longer exists'); 102 + ok(!$app->store->get_record($did, 'app.bsky.feed.post', $reply_rkey), 'old reply post rkey no longer exists'); 103 + 104 + my $reply = $app->store->get_record($did, 'app.bsky.feed.post', $fixed_reply_rkey); 105 + is( 106 + $reply->{value}{reply}{root}{uri}, 107 + "at://$did/app.bsky.feed.post/$fixed_post_rkey", 108 + 'repair rewrites self-referential root URIs', 109 + ); 110 + is( 111 + $reply->{value}{reply}{parent}{uri}, 112 + "at://$did/app.bsky.feed.post/$fixed_post_rkey", 113 + 'repair rewrites self-referential parent URIs', 114 + ); 115 + 116 + my $latest = $app->store->get_latest_commit($did); 117 + ok(is_valid_tid($latest->{rev}), 'latest repo rev is valid after repair'); 118 + 119 + done_testing;