perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Align repo commit ancestry with reference PDS

alice 00a47368 ccc2a1c6

+72 -12
+4 -4
lib/ATProto/PDS/Repo/Manager.pm
··· 135 135 version => 3, 136 136 data => $mst->{root}, 137 137 rev => $rev, 138 - prev => $latest ? ATProto::PDS::Repo::CID->from_string($latest->{cid}) : undef, 138 + prev => undef, 139 139 }; 140 140 my $unsigned_bytes = encode_dag_cbor($unsigned); 141 141 my $sig = sign_compact_low_s($account->{private_key}, $unsigned_bytes); ··· 183 183 commit_cid => $commit_cid->to_string, 184 184 payload => { 185 185 ops => \@ops, 186 - since => undef, 186 + since => $latest ? $latest->{rev} : undef, 187 187 prevData => $latest ? $latest->{root_cid} : undef, 188 188 }, 189 189 car_bytes => $car_bytes, ··· 265 265 version => 3, 266 266 data => $mst->{root}, 267 267 rev => $rev, 268 - prev => $latest ? ATProto::PDS::Repo::CID->from_string($latest->{cid}) : undef, 268 + prev => undef, 269 269 }; 270 270 my $unsigned_bytes = encode_dag_cbor($unsigned); 271 271 my $sig = sign_compact_low_s($account->{private_key}, $unsigned_bytes); ··· 310 310 commit_cid => $commit_cid->to_string, 311 311 payload => { 312 312 ops => \@ops, 313 - since => undef, 313 + since => $latest ? $latest->{rev} : undef, 314 314 prevData => $prev_data, 315 315 }, 316 316 car_bytes => $next_car_bytes,
+25 -8
script/differential-validate
··· 29 29 use Mojo::URL; 30 30 use Mojo::UserAgent; 31 31 use ATProto::PDS::EventStream qw(decode_frame); 32 + use ATProto::PDS::Repo::CAR qw(read_car); 33 + use ATProto::PDS::Repo::DagCbor qw(decode_dag_cbor); 32 34 33 35 my $root = File::Spec->rel2abs(File::Spec->catdir(dirname(__FILE__), '..')); 34 36 my $tmp = tempdir(CLEANUP => 1); ··· 356 358 my $header = $frame->{header} || {}; 357 359 my $body = $frame->{body} || {}; 358 360 my $first = $body->{ops}[0] || {}; 361 + my $commit_prev_is_null = 0; 362 + 363 + if (defined $body->{commit} && defined $body->{blocks}) { 364 + my $car = read_car($body->{blocks}); 365 + my ($commit_block) = grep { 366 + $_->{cid}->to_string eq $body->{commit}->to_string 367 + } @{ $car->{blocks} || [] }; 368 + if ($commit_block) { 369 + my $commit = decode_dag_cbor($commit_block->{bytes}); 370 + $commit_prev_is_null = exists($commit->{prev}) && !defined($commit->{prev}) ? 1 : 0; 371 + } 372 + } 359 373 360 374 return { 361 - op => $header->{op}, 362 - type => $header->{t}, 363 - repo_match => (($body->{repo} // q()) eq $did) ? 1 : 0, 364 - action => $first->{action}, 365 - path => $first->{path}, 366 - has_blocks => length($body->{blocks} // q()) > 0 ? 1 : 0, 367 - has_commit => defined $body->{commit} ? 1 : 0, 368 - seq_is_int => defined($body->{seq}) && $body->{seq} =~ /\A\d+\z/ ? 1 : 0, 375 + op => $header->{op}, 376 + type => $header->{t}, 377 + repo_match => (($body->{repo} // q()) eq $did) ? 1 : 0, 378 + action => $first->{action}, 379 + path => $first->{path}, 380 + has_blocks => length($body->{blocks} // q()) > 0 ? 1 : 0, 381 + has_commit => defined $body->{commit} ? 1 : 0, 382 + has_since => exists($body->{since}) ? 1 : 0, 383 + since_is_stringish => !defined($body->{since}) || !ref($body->{since}) ? 1 : 0, 384 + seq_is_int => defined($body->{seq}) && $body->{seq} =~ /\A\d+\z/ ? 1 : 0, 385 + commit_prev_is_null => $commit_prev_is_null, 369 386 }; 370 387 } 371 388
+43
t/firehose.t
··· 19 19 20 20 use Test::Mojo; 21 21 use ATProto::PDS; 22 + use ATProto::PDS::Repo::CAR qw(read_car); 23 + use ATProto::PDS::Repo::DagCbor qw(decode_dag_cbor); 22 24 use ATProto::PDS::EventStream qw(decode_frame); 23 25 24 26 my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); ··· 51 53 my $access = $session->{accessJwt}; 52 54 53 55 my $baseline_seq = $app->store->latest_event_seq; 56 + my $prior_rev = $app->store->get_latest_commit($did)->{rev}; 54 57 55 58 $ws->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos'); 56 59 is($ws->message, undef, 'no backlog is emitted when no cursor is supplied'); ··· 80 83 is($decoded->{body}{seq}, $baseline_seq + 1, 'sequence advances from prior high water mark'); 81 84 is($decoded->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose', 'operation path is preserved'); 82 85 is($decoded->{body}{ops}[0]{action}, 'create', 'operation action is preserved'); 86 + is($decoded->{body}{since}, $prior_rev, 'first emitted commit advertises the previous rev'); 87 + 88 + my $initial_car = read_car($decoded->{body}{blocks}); 89 + my $initial_commit_block = (grep { $_->{cid}->to_string eq $decoded->{body}{commit}->to_string } @{ $initial_car->{blocks} })[0]; 90 + ok($initial_commit_block, 'initial commit block is present in emitted CAR'); 91 + my $initial_commit = decode_dag_cbor($initial_commit_block->{bytes}); 92 + ok(exists $initial_commit->{prev}, 'initial commit includes prev for compatibility'); 93 + ok(!defined $initial_commit->{prev}, 'initial commit prev is null'); 94 + 95 + my $initial_rev = $decoded->{body}{rev}; 83 96 84 97 $ws->finish_ok; 85 98 ··· 91 104 my $replayed = decode_frame($replay->message->[1]); 92 105 is($replayed->{body}{seq}, $latest_seq, 'cursor replay is inclusive'); 93 106 $replay->finish_ok; 107 + 108 + my $follow = Test::Mojo->new($app); 109 + $follow->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos'); 110 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 111 + Authorization => "Bearer $access", 112 + } => json => { 113 + repo => $did, 114 + collection => 'app.bsky.feed.post', 115 + rkey => 'firehose-second', 116 + record => { 117 + '$type' => 'app.bsky.feed.post', 118 + text => 'follow-up firehose', 119 + createdAt => '2026-03-10T00:00:01Z', 120 + }, 121 + })->status_is(200); 122 + 123 + $follow->message_ok('received a second firehose frame') 124 + ->message_like({binary => qr/.+/}, 'second firehose frame is binary'); 125 + 126 + my $second = decode_frame($follow->message->[1]); 127 + is($second->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose-second', 'second operation path is preserved'); 128 + is($second->{body}{since}, $initial_rev, 'subsequent commit advertises the previous rev'); 129 + 130 + my $second_car = read_car($second->{body}{blocks}); 131 + my $second_commit_block = (grep { $_->{cid}->to_string eq $second->{body}{commit}->to_string } @{ $second_car->{blocks} })[0]; 132 + ok($second_commit_block, 'second commit block is present in emitted CAR'); 133 + my $second_commit = decode_dag_cbor($second_commit_block->{bytes}); 134 + ok(exists $second_commit->{prev}, 'subsequent commit includes prev for compatibility'); 135 + ok(!defined $second_commit->{prev}, 'subsequent commit prev is null'); 136 + $follow->finish_ok; 94 137 95 138 my $future = Test::Mojo->new($app); 96 139 $future->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos?cursor=999999999')