perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Strengthen firehose differential coverage

alice 7ba53f9d 93174bfb

+150 -2
+57
script/differential-validate
··· 448 448 repo_match => (($body->{repo} // q()) eq $did) ? 1 : 0, 449 449 action => $first->{action}, 450 450 path => $first->{path}, 451 + ops_count => 0 + @{ $body->{ops} || [] }, 452 + cid_present => defined($first->{cid}) ? 1 : 0, 453 + prev_present => defined($first->{prev}) ? 1 : 0, 454 + prev_data_present => defined($body->{prevData}) ? 1 : 0, 451 455 has_blocks => length($body->{blocks} // q()) > 0 ? 1 : 0, 452 456 has_commit => defined $body->{commit} ? 1 : 0, 453 457 has_since => exists($body->{since}) ? 1 : 0, 454 458 since_is_stringish => !defined($body->{since}) || !ref($body->{since}) ? 1 : 0, 455 459 seq_is_int => defined($body->{seq}) && $body->{seq} =~ /\A\d+\z/ ? 1 : 0, 460 + too_big => $body->{tooBig} ? 1 : 0, 456 461 blocks_count => $blocks_count, 457 462 commit_block_present => $commit_block_present, 458 463 commit_prev_is_null => $commit_prev_is_null, ··· 1346 1351 check( 1347 1352 same_hash($server{reference}{firehose_commit}, $server{perlsky}{firehose_commit}), 1348 1353 'subscribeRepos emits the same normalized commit semantics as the official reference PDS', 1354 + ); 1355 + 1356 + note('Comparing firehose update behavior'); 1357 + for my $name (sort keys %server) { 1358 + my $path = 'app.bsky.feed.post/firehose-diff'; 1359 + my $frame = next_commit_frame( 1360 + "$server{$name}{origin}/xrpc/com.atproto.sync.subscribeRepos", 1361 + $path, 1362 + sub { 1363 + my $res = post_json($server{$name}{origin}, 'com.atproto.repo.putRecord', { 1364 + repo => $server{$name}{did}, 1365 + collection => 'app.bsky.feed.post', 1366 + rkey => 'firehose-diff', 1367 + record => { 1368 + %{$record}, 1369 + text => "firehose update validation for $name", 1370 + }, 1371 + }, auth_header($server{$name}{access})); 1372 + die "putRecord for firehose failed on $name\n" unless $res->is_success; 1373 + }, 1374 + ); 1375 + $server{$name}{firehose_update_commit} = normalize_commit_frame($frame, $server{$name}{did}); 1376 + check(($server{$name}{firehose_update_commit}{action} // q()) eq 'update', "$name firehose update emits an update op"); 1377 + } 1378 + 1379 + check( 1380 + same_hash($server{reference}{firehose_update_commit}, $server{perlsky}{firehose_update_commit}), 1381 + 'subscribeRepos emits the same normalized update semantics as the official reference PDS', 1382 + ); 1383 + 1384 + note('Comparing firehose delete behavior'); 1385 + for my $name (sort keys %server) { 1386 + my $path = 'app.bsky.feed.post/firehose-diff'; 1387 + my $frame = next_commit_frame( 1388 + "$server{$name}{origin}/xrpc/com.atproto.sync.subscribeRepos", 1389 + $path, 1390 + sub { 1391 + my $res = post_json($server{$name}{origin}, 'com.atproto.repo.deleteRecord', { 1392 + repo => $server{$name}{did}, 1393 + collection => 'app.bsky.feed.post', 1394 + rkey => 'firehose-diff', 1395 + }, auth_header($server{$name}{access})); 1396 + die "deleteRecord for firehose failed on $name\n" unless $res->is_success; 1397 + }, 1398 + ); 1399 + $server{$name}{firehose_delete_commit} = normalize_commit_frame($frame, $server{$name}{did}); 1400 + check(($server{$name}{firehose_delete_commit}{action} // q()) eq 'delete', "$name firehose delete emits a delete op"); 1401 + } 1402 + 1403 + check( 1404 + same_hash($server{reference}{firehose_delete_commit}, $server{perlsky}{firehose_delete_commit}), 1405 + 'subscribeRepos emits the same normalized delete semantics as the official reference PDS', 1349 1406 ); 1350 1407 1351 1408 note('Comparing future cursor error behavior');
+93 -2
t/firehose.t
··· 83 83 $bootstrap->finish_ok; 84 84 85 85 my $baseline_seq = $app->store->latest_event_seq; 86 - my $prior_rev = $app->store->get_latest_commit($did)->{rev}; 86 + my $prior_head = $app->store->get_latest_commit($did); 87 + my $prior_rev = $prior_head->{rev}; 87 88 88 89 $ws->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos'); 89 90 is($ws->message, undef, 'no backlog is emitted when no cursor is supplied'); ··· 115 116 is($decoded->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose', 'operation path is preserved'); 116 117 is($decoded->{body}{ops}[0]{action}, 'create', 'operation action is preserved'); 117 118 is($decoded->{body}{since}, $prior_rev, 'first emitted commit advertises the previous rev'); 119 + is($decoded->{body}{prevData}->to_string, $prior_head->{root_cid}, 'first emitted commit advertises the previous data root'); 118 120 ok(!$decoded->{body}{rebase}, 'commit event is not marked as a rebase'); 119 121 ok(!$decoded->{body}{tooBig}, 'commit event is not marked as too big'); 120 122 like($decoded->{body}{time}, qr/\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z\z/, 'commit event time is ISO8601'); ··· 134 136 135 137 $ws->finish_ok; 136 138 137 - my $latest_seq = $app->store->latest_event_seq; 138 139 my $replay = Test::Mojo->new($app); 139 140 $replay->websocket_ok("/xrpc/com.atproto.sync.subscribeRepos?cursor=$baseline_seq"); 140 141 ··· 145 146 146 147 my $follow = Test::Mojo->new($app); 147 148 $follow->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos'); 149 + my $first_head = $app->store->get_latest_commit($did); 148 150 $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 149 151 Authorization => "Bearer $access", 150 152 } => json => { ··· 165 167 my $second = decode_frame($follow->message->[1]); 166 168 is($second->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose-second', 'second operation path is preserved'); 167 169 is($second->{body}{since}, $initial_rev, 'subsequent commit advertises the previous rev'); 170 + is($second->{body}{prevData}->to_string, $first_head->{root_cid}, 'subsequent commit advertises the previous data root'); 168 171 ok(!$second->{body}{rebase}, 'subsequent commit is not marked as a rebase'); 169 172 ok(!$second->{body}{tooBig}, 'subsequent commit is not marked as too big'); 170 173 like($second->{body}{time}, qr/\A\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z\z/, 'subsequent commit time is ISO8601'); ··· 181 184 is($second_commit->{rev}, $second->{body}{rev}, 'subsequent commit block rev matches the event'); 182 185 $follow->finish_ok; 183 186 187 + my $update_watch = Test::Mojo->new($app); 188 + $update_watch->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos'); 189 + my $second_head = $app->store->get_latest_commit($did); 190 + $t->post_ok('/xrpc/com.atproto.repo.putRecord' => { 191 + Authorization => "Bearer $access", 192 + } => json => { 193 + repo => $did, 194 + collection => 'app.bsky.feed.post', 195 + rkey => 'firehose-second', 196 + record => { 197 + '$type' => 'app.bsky.feed.post', 198 + text => 'follow-up firehose edited', 199 + createdAt => '2026-03-10T00:00:01Z', 200 + }, 201 + })->status_is(200); 202 + my $updated_result = $t->tx->res->json; 203 + 204 + $update_watch->message_ok('received an update firehose frame') 205 + ->message_like({binary => qr/.+/}, 'update firehose frame is binary'); 206 + 207 + my $updated = decode_frame($update_watch->message->[1]); 208 + is($updated->{header}{t}, '#commit', 'update frame is a commit'); 209 + is($updated->{body}{ops}[0]{action}, 'update', 'update commit reports an update op'); 210 + is($updated->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose-second', 'update operation path is preserved'); 211 + is($updated->{body}{ops}[0]{cid}->to_string, $updated_result->{cid}, 'update op exposes the replacement record CID'); 212 + is($updated->{body}{ops}[0]{prev}->to_string, $second_result->{cid}, 'update op exposes the previous record CID'); 213 + is($updated->{body}{since}, $second->{body}{rev}, 'update commit advertises the prior rev'); 214 + is($updated->{body}{prevData}->to_string, $second_head->{root_cid}, 'update commit advertises the previous data root'); 215 + 216 + my $updated_car = read_car($updated->{body}{blocks}); 217 + ok( 218 + scalar(grep { $_->{cid}->to_string eq $updated_result->{cid} } @{ $updated_car->{blocks} || [] }), 219 + 'update firehose CAR includes the replacement record block', 220 + ); 221 + ok( 222 + !scalar(grep { $_->{cid}->to_string eq $second_result->{cid} } @{ $updated_car->{blocks} || [] }), 223 + 'update firehose CAR does not resend the superseded record block', 224 + ); 225 + $update_watch->finish_ok; 226 + 227 + my $delete_watch = Test::Mojo->new($app); 228 + $delete_watch->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos'); 229 + my $updated_head = $app->store->get_latest_commit($did); 230 + $t->post_ok('/xrpc/com.atproto.repo.deleteRecord' => { 231 + Authorization => "Bearer $access", 232 + } => json => { 233 + repo => $did, 234 + collection => 'app.bsky.feed.post', 235 + rkey => 'firehose-second', 236 + })->status_is(200); 237 + 238 + $delete_watch->message_ok('received a delete firehose frame') 239 + ->message_like({binary => qr/.+/}, 'delete firehose frame is binary'); 240 + 241 + my $deleted = decode_frame($delete_watch->message->[1]); 242 + is($deleted->{header}{t}, '#commit', 'delete frame is a commit'); 243 + is($deleted->{body}{ops}[0]{action}, 'delete', 'delete commit reports a delete op'); 244 + is($deleted->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose-second', 'delete operation path is preserved'); 245 + ok(!defined $deleted->{body}{ops}[0]{cid}, 'delete op omits the deleted record CID'); 246 + is($deleted->{body}{ops}[0]{prev}->to_string, $updated_result->{cid}, 'delete op exposes the deleted record CID as prev'); 247 + is($deleted->{body}{since}, $updated->{body}{rev}, 'delete commit advertises the prior rev'); 248 + is($deleted->{body}{prevData}->to_string, $updated_head->{root_cid}, 'delete commit advertises the previous data root'); 249 + 250 + my $deleted_car = read_car($deleted->{body}{blocks}); 251 + ok( 252 + !scalar(grep { $_->{cid}->to_string eq $updated_result->{cid} } @{ $deleted_car->{blocks} || [] }), 253 + 'delete firehose CAR does not include the deleted record block', 254 + ); 255 + ok( 256 + !scalar(grep { $_->{cid}->to_string eq $first_result->{cid} } @{ $deleted_car->{blocks} || [] }), 257 + 'delete firehose CAR does not resend unchanged record blocks', 258 + ); 259 + $delete_watch->finish_ok; 260 + 261 + my $firehose_latest = $app->store->latest_event_seq; 262 + my $exclusive = Test::Mojo->new($app); 263 + $exclusive->websocket_ok("/xrpc/com.atproto.sync.subscribeRepos?cursor=$firehose_latest"); 264 + is($exclusive->message, undef, 'repo cursor replay is exclusive at the current latest event'); 265 + $exclusive->finish_ok; 266 + 184 267 my $future = Test::Mojo->new($app); 185 268 $future->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos?cursor=999999999') 186 269 ->message_ok('future cursor returns an error frame'); ··· 189 272 is($error->{header}{op}, -1, 'future cursor frame is an error'); 190 273 is($error->{body}{error}, 'FutureCursor', 'error type is FutureCursor'); 191 274 $future->finish_ok; 275 + 276 + my $future_edge = Test::Mojo->new($app); 277 + $future_edge->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos?cursor=' . ($app->store->latest_event_seq + 1)) 278 + ->message_ok('latest+1 repo cursor is rejected as future'); 279 + 280 + my $future_edge_error = decode_frame($future_edge->message->[1]); 281 + is($future_edge_error->{body}{error}, 'FutureCursor', 'edge future repo cursor is also rejected'); 282 + $future_edge->finish_ok; 192 283 193 284 my $skip_start = $app->store->latest_event_seq; 194 285 $app->store->append_event(