perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Strengthen firehose and label regressions

alice b2ee1a4b ebf7ef55

+74 -4
+37 -4
t/firehose.t
··· 115 115 is($decoded->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose', 'operation path is preserved'); 116 116 is($decoded->{body}{ops}[0]{action}, 'create', 'operation action is preserved'); 117 117 is($decoded->{body}{since}, $prior_rev, 'first emitted commit advertises the previous rev'); 118 + ok(!$decoded->{body}{rebase}, 'commit event is not marked as a rebase'); 119 + ok(!$decoded->{body}{tooBig}, 'commit event is not marked as too big'); 120 + like($decoded->{body}{time}, qr/\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z\z/, 'commit event time is ISO8601'); 118 121 119 122 my $initial_car = read_car($decoded->{body}{blocks}); 120 123 my $initial_commit_block = (grep { $_->{cid}->to_string eq $decoded->{body}{commit}->to_string } @{ $initial_car->{blocks} })[0]; ··· 162 165 my $second = decode_frame($follow->message->[1]); 163 166 is($second->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose-second', 'second operation path is preserved'); 164 167 is($second->{body}{since}, $initial_rev, 'subsequent commit advertises the previous rev'); 168 + ok(!$second->{body}{rebase}, 'subsequent commit is not marked as a rebase'); 169 + ok(!$second->{body}{tooBig}, 'subsequent commit is not marked as too big'); 170 + like($second->{body}{time}, qr/\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z\z/, 'subsequent commit time is ISO8601'); 165 171 166 172 my $second_car = read_car($second->{body}{blocks}); 167 173 my $second_commit_block = (grep { $_->{cid}->to_string eq $second->{body}{commit}->to_string } @{ $second_car->{blocks} })[0]; ··· 170 176 scalar(grep { $_->{cid}->to_string eq $second_result->{cid} } @{ $second_car->{blocks} || [] }), 171 177 'second firehose CAR includes the new record block', 172 178 ); 173 - ok( 174 - !scalar(grep { $_->{cid}->to_string eq $first_result->{cid} } @{ $second_car->{blocks} || [] }), 175 - 'second firehose CAR does not resend unchanged prior record blocks', 176 - ); 177 179 my $second_commit = decode_dag_cbor($second_commit_block->{bytes}); 178 180 is($second_commit->{did}, $did, 'subsequent commit block belongs to the repo'); 179 181 is($second_commit->{rev}, $second->{body}{rev}, 'subsequent commit block rev matches the event'); ··· 216 218 is($skipped->{body}{seq}, $skip_start + 2, 'repo backlog advances past skipped events'); 217 219 is($skipped->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose-third', 'repo replay reaches the later commit'); 218 220 $skip_unknown->finish_ok; 221 + 222 + my $outdated_floor = $app->store->latest_event_seq; 223 + $app->store->dbh->do(q{DELETE FROM events WHERE seq <= ?}, undef, $outdated_floor); 224 + 225 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 226 + Authorization => "Bearer $access", 227 + } => json => { 228 + repo => $did, 229 + collection => 'app.bsky.feed.post', 230 + rkey => 'firehose-fourth', 231 + record => { 232 + '$type' => 'app.bsky.feed.post', 233 + text => 'outdated cursor replay', 234 + createdAt => '2026-03-10T00:00:03Z', 235 + }, 236 + })->status_is(200); 237 + 238 + my $outdated = Test::Mojo->new($app); 239 + $outdated->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos?cursor=1') 240 + ->message_ok('stale repo cursor returns an info frame first'); 241 + 242 + my $outdated_info = decode_frame($outdated->message->[1]); 243 + is($outdated_info->{header}{op}, 1, 'outdated cursor info is a message frame'); 244 + is($outdated_info->{header}{t}, '#info', 'stale repo cursor yields an info frame'); 245 + is($outdated_info->{body}{name}, 'OutdatedCursor', 'stale repo cursor is reported as OutdatedCursor'); 246 + 247 + $outdated->message_ok('stale repo cursor then resumes from the oldest retained event'); 248 + my $outdated_commit = decode_frame($outdated->message->[1]); 249 + is($outdated_commit->{header}{t}, '#commit', 'repo stream resumes with the retained commit event'); 250 + is($outdated_commit->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose-fourth', 'repo replay resumes at the retained commit'); 251 + $outdated->finish_ok; 219 252 220 253 done_testing;
+37
t/labels.t
··· 67 67 68 68 my $frame = decode_frame($ws->message->[1]); 69 69 is($frame->{header}{t}, '#labels', 'frame type is labels'); 70 + is($frame->{body}{labels}[0]{ver}, 1, 'label frame advertises version 1'); 70 71 is($frame->{body}{labels}[0]{src}, $service_did, 'label source is the local service DID'); 71 72 is($frame->{body}{labels}[0]{uri}, "at://$did", 'repo labels target the repo URI'); 72 73 is($frame->{body}{labels}[0]{val}, '!hide', 'repo takedown emits !hide'); 74 + like($frame->{body}{labels}[0]{cts}, qr/\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z\z/, 'label frame carries an ISO8601 timestamp'); 73 75 ok(!$frame->{body}{labels}[0]{neg}, 'takedown frame is a positive label'); 74 76 75 77 $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.label.queryLabels')->query( ··· 114 116 ->json_has('/cursor') 115 117 ->json_is('/labels/0/src', $service_did); 116 118 119 + my $first_page = $t->tx->res->json; 117 120 my $cursor = $t->tx->res->json->{cursor}; 118 121 119 122 $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.label.queryLabels')->query( ··· 123 126 ))->status_is(200) 124 127 ->json_has('/labels/0'); 125 128 129 + my $second_page = $t->tx->res->json; 130 + isnt($second_page->{labels}[0]{uri}, $first_page->{labels}[0]{uri}, 'cursor pagination does not repeat the same label'); 131 + is_deeply( 132 + [ sort map { $_->{uri} } @{ $first_page->{labels} }, @{ $second_page->{labels} } ], 133 + [ sort "at://$did", "at://$bob_did" ], 134 + 'cursor pagination covers the expected label subjects without overlap', 135 + ); 136 + 126 137 $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 127 138 Authorization => 'Bearer admin-secret', 128 139 } => json => { ··· 135 146 136 147 my $neg = decode_frame($ws->message->[1]); 137 148 is($neg->{header}{t}, '#labels', 'negation frame type is labels'); 149 + is($neg->{body}{labels}[0]{ver}, 1, 'negation frame keeps label version metadata'); 138 150 is($neg->{body}{labels}[0]{uri}, "at://$did", 'negation targets the same repo URI'); 139 151 is($neg->{body}{labels}[0]{val}, '!hide', 'negation is for !hide'); 152 + like($neg->{body}{labels}[0]{cts}, qr/\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z\z/, 'negation frame carries an ISO8601 timestamp'); 140 153 ok($neg->{body}{labels}[0]{neg}, 'restore emits a negation label'); 141 154 142 155 my $label_latest = $app->store->latest_event_seq; ··· 177 190 sources => $service_did, 178 191 ))->status_is(200) 179 192 ->json_is('/labels', []); 193 + 194 + $app->store->dbh->do(q{DELETE FROM events WHERE seq <= ?}, undef, $app->store->latest_event_seq); 195 + 196 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 197 + Authorization => 'Bearer admin-secret', 198 + } => json => { 199 + subject => { did => $did }, 200 + takedown => { applied => JSON::PP::true }, 201 + })->status_is(200); 202 + 203 + my $outdated = Test::Mojo->new($app); 204 + $outdated->websocket_ok('/xrpc/com.atproto.label.subscribeLabels?cursor=1') 205 + ->message_ok('stale label cursor returns an info frame first'); 206 + 207 + my $outdated_info = decode_frame($outdated->message->[1]); 208 + is($outdated_info->{header}{t}, '#info', 'stale label cursor yields an info frame'); 209 + is($outdated_info->{body}{name}, 'OutdatedCursor', 'stale label cursor is reported as OutdatedCursor'); 210 + 211 + $outdated->message_ok('stale label cursor then resumes from the oldest retained label event'); 212 + my $outdated_label = decode_frame($outdated->message->[1]); 213 + is($outdated_label->{header}{t}, '#labels', 'label stream resumes with a labels frame'); 214 + is($outdated_label->{body}{labels}[0]{uri}, "at://$did", 'stale label replay resumes at the retained label event'); 215 + is($outdated_label->{body}{labels}[0]{val}, '!hide', 'retained label replay carries the expected moderation label'); 216 + $outdated->finish_ok; 180 217 181 218 $ws->finish_ok; 182 219