perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Upgrade firehose stream framing and delivery

alice 922e53ab fe97477a

+326 -40
+58 -38
lib/ATProto/PDS/API/Sync.pm
··· 7 7 8 8 use Exporter 'import'; 9 9 use JSON::PP (); 10 + use Mojo::IOLoop; 10 11 12 + use ATProto::PDS::EventStream qw(encode_error_frame encode_info_frame encode_message_frame); 11 13 use ATProto::PDS::API::Util qw(iso8601 resolve_did_account xrpc_error); 12 14 use ATProto::PDS::Identity qw(service_host); 13 15 use ATProto::PDS::Repo::CAR qw(write_car); 14 16 use ATProto::PDS::Repo::CID; 17 + use ATProto::PDS::Repo::Bytes; 15 18 16 19 our @EXPORT_OK = qw(register_sync_handlers); 17 20 ··· 213 216 }); 214 217 215 218 $registry->register('com.atproto.sync.subscribeRepos', sub ($c, $endpoint) { 216 - my $cursor = int($c->param('cursor') // 0); 219 + my $cursor_param = $c->param('cursor'); 217 220 my $latest = $c->store->latest_event_seq; 218 - if ($cursor > $latest) { 219 - $c->send({ json => { 220 - name => 'OutdatedCursor', 221 - message => 'Cursor is ahead of the local event stream', 222 - }}); 223 - $c->finish(1000); 224 - return; 221 + my $oldest = $c->store->oldest_event_seq; 222 + 223 + my $next_seq; 224 + if (!defined $cursor_param || $cursor_param eq q()) { 225 + $next_seq = $latest + 1; 226 + } else { 227 + my $cursor = int($cursor_param); 228 + if ($cursor > $latest + 1) { 229 + $c->send({ binary => encode_error_frame('FutureCursor', 'Cursor is ahead of the local event stream') }); 230 + $c->finish(1008); 231 + return; 232 + } 233 + if ($oldest && $cursor && $cursor < $oldest) { 234 + $c->send({ binary => encode_info_frame('OutdatedCursor', 'Cursor predates the oldest locally retained event') }); 235 + $next_seq = $oldest; 236 + } else { 237 + $next_seq = $cursor || ($oldest || ($latest + 1)); 238 + } 225 239 } 226 240 227 - my $events = $c->store->list_events_after($cursor, limit => 100); 228 - for my $event (@$events) { 229 - my $message = _event_message($event); 230 - next unless $message; 231 - $c->send({ json => $message }); 232 - } 241 + my $drain; 242 + $drain = sub { 243 + my $events = $c->store->list_events_from($next_seq, limit => 100); 244 + for my $event (@$events) { 245 + my $frame = _event_frame($event); 246 + next unless $frame; 247 + $next_seq = $event->{seq} + 1; 248 + $c->send({ binary => $frame }); 249 + } 250 + }; 233 251 234 - $c->finish(1000); 252 + $drain->(); 253 + my $timer_id = Mojo::IOLoop->recurring(0.25 => sub { $drain->() }); 254 + $c->on(finish => sub ($c, $code, $reason = undef) { 255 + Mojo::IOLoop->remove($timer_id) if defined $timer_id; 256 + }); 235 257 return; 236 258 }); 237 259 } ··· 253 275 }; 254 276 } 255 277 256 - sub _event_message ($event) { 278 + sub _event_frame ($event) { 257 279 if (($event->{type} // q()) eq 'commit') { 258 - my @ops; 259 - my $writes = $event->{payload}{writes} || []; 260 - my $results = $event->{payload}{results} || []; 261 - for my $idx (0 .. $#$writes) { 262 - my $write = $writes->[$idx]; 263 - my $result = $results->[$idx] || {}; 264 - push @ops, { 265 - action => $write->{action}, 266 - path => join('/', grep { defined && length } $write->{collection}, $write->{rkey}), 267 - cid => ($write->{action} eq 'delete' ? undef : $result->{cid}), 268 - }; 269 - } 270 - return { 280 + return encode_message_frame('#commit', { 271 281 seq => 0 + $event->{seq}, 272 282 rebase => JSON::PP::false, 273 283 tooBig => JSON::PP::false, 274 284 repo => $event->{did}, 275 - commit => { '$link' => $event->{commit_cid} }, 285 + commit => ATProto::PDS::Repo::CID->from_string($event->{commit_cid}), 276 286 rev => $event->{rev}, 277 - since => undef, 278 - blocks => unpack('H*', $event->{car_bytes} // q()), 279 - ops => \@ops, 287 + since => $event->{payload}{since}, 288 + blocks => ATProto::PDS::Repo::Bytes->new($event->{car_bytes} // q()), 289 + ops => [ 290 + map { 291 + +{ 292 + action => $_->{action}, 293 + path => $_->{path}, 294 + cid => defined($_->{cid}) ? ATProto::PDS::Repo::CID->from_string($_->{cid}) : undef, 295 + (defined($_->{prev}) ? (prev => ATProto::PDS::Repo::CID->from_string($_->{prev})) : ()), 296 + } 297 + } @{ $event->{payload}{ops} || [] } 298 + ], 280 299 blobs => [], 300 + (defined($event->{payload}{prevData}) ? (prevData => ATProto::PDS::Repo::CID->from_string($event->{payload}{prevData})) : ()), 281 301 time => iso8601($event->{created_at}), 282 - }; 302 + }); 283 303 } 284 304 285 305 if (($event->{type} // q()) eq 'identity') { 286 - return { 306 + return encode_message_frame('#identity', { 287 307 seq => 0 + $event->{seq}, 288 308 did => $event->{did}, 289 309 handle => $event->{payload}{handle}, 290 310 time => iso8601($event->{created_at}), 291 - }; 311 + }); 292 312 } 293 313 294 314 if (($event->{type} // q()) eq 'account') { 295 - return { 315 + return encode_message_frame('#account', { 296 316 seq => 0 + $event->{seq}, 297 317 did => $event->{did}, 298 318 active => $event->{payload}{active} ? JSON::PP::true : JSON::PP::false, 299 319 ($event->{payload}{status} ? (status => $event->{payload}{status}) : ()), 300 320 time => iso8601($event->{created_at}), 301 - }; 321 + }); 302 322 } 303 323 304 324 return undef;
+66
lib/ATProto/PDS/EventStream.pm
··· 1 + package ATProto::PDS::EventStream; 2 + 3 + use v5.34; 4 + use warnings; 5 + use feature 'signatures'; 6 + no warnings 'experimental::signatures'; 7 + 8 + use Exporter 'import'; 9 + use CBOR::XS (); 10 + 11 + use ATProto::PDS::Repo::DagCbor qw(encode_dag_cbor); 12 + use ATProto::PDS::Repo::CID; 13 + 14 + our @EXPORT_OK = qw( 15 + decode_frame 16 + encode_error_frame 17 + encode_info_frame 18 + encode_message_frame 19 + ); 20 + 21 + sub encode_message_frame ($type, $payload) { 22 + my $header = { 23 + op => 1, 24 + t => $type, 25 + }; 26 + return encode_dag_cbor($header) . encode_dag_cbor($payload); 27 + } 28 + 29 + sub encode_error_frame ($error, $message = undef) { 30 + my $header = { 31 + op => -1, 32 + }; 33 + my $body = { 34 + error => $error, 35 + (defined $message ? (message => $message) : ()), 36 + }; 37 + return encode_dag_cbor($header) . encode_dag_cbor($body); 38 + } 39 + 40 + sub encode_info_frame ($name, $message = undef) { 41 + return encode_message_frame('#info', { 42 + name => $name, 43 + (defined $message ? (message => $message) : ()), 44 + }); 45 + } 46 + 47 + sub decode_frame ($bytes) { 48 + my $decoder = CBOR::XS->new->filter(sub { 49 + my ($tag, $value) = @_; 50 + if ($tag == 42 && !ref($value)) { 51 + my $cid_bytes = substr($value, 1); 52 + return ATProto::PDS::Repo::CID->from_bytes($cid_bytes); 53 + } 54 + return; 55 + }); 56 + 57 + my ($header, $header_len) = $decoder->decode_prefix($bytes); 58 + my ($body, $body_len) = $decoder->decode_prefix(substr($bytes, $header_len)); 59 + return { 60 + header => $header, 61 + body => $body, 62 + consumed => $header_len + $body_len, 63 + }; 64 + } 65 + 66 + 1;
+22 -2
lib/ATProto/PDS/Repo/Manager.pm
··· 67 67 } 68 68 } @{ $store->all_records_for_did($did) } 69 69 }; 70 + my %previous_records = map { $_ => { %{ $records->{$_} } } } keys %$records; 70 71 71 72 my @results; 73 + my @ops; 72 74 for my $write (@$writes) { 73 75 my $action = $write->{action} // ''; 74 76 if ($action eq 'delete') { 75 - my $path = $write->{collection} . '/' . $write->{rkey}; 77 + my $path = join('/', grep { defined && length } $write->{collection}, $write->{rkey}); 78 + my $previous = $previous_records{$path}; 76 79 delete $records->{$path}; 77 80 push @results, {}; 81 + push @ops, { 82 + action => 'delete', 83 + path => $path, 84 + cid => undef, 85 + ($previous ? (prev => $previous->{cid}) : ()), 86 + }; 78 87 next; 79 88 } 80 89 ··· 84 93 my $bytes = encode_dag_cbor($value); 85 94 my $cid = ATProto::PDS::Repo::CID->for_dag_cbor($bytes); 86 95 my $path = $collection . '/' . $rkey; 96 + my $previous = $previous_records{$path}; 87 97 $records->{$path} = { 88 98 collection => $collection, 89 99 rkey => $rkey, ··· 95 105 uri => "at://$did/$collection/$rkey", 96 106 cid => $cid->to_string, 97 107 validationStatus => 'unknown', 108 + }; 109 + push @ops, { 110 + action => $previous ? 'update' : 'create', 111 + path => $path, 112 + cid => $cid->to_string, 113 + ($previous ? (prev => $previous->{cid}) : ()), 98 114 }; 99 115 } 100 116 ··· 154 170 type => 'commit', 155 171 rev => $rev, 156 172 commit_cid => $commit_cid->to_string, 157 - payload => { writes => $writes, results => \@results }, 173 + payload => { 174 + ops => \@ops, 175 + since => undef, 176 + prevData => $latest ? $latest->{root_cid} : undef, 177 + }, 158 178 car_bytes => $car_bytes, 159 179 ); 160 180 });
+14
lib/ATProto/PDS/Store/SQLite.pm
··· 730 730 return [ map { _row_from_json_columns($_, qw(payload_json)) } @$rows ]; 731 731 } 732 732 733 + sub list_events_from ($self, $cursor, %args) { 734 + my $limit = $args{limit} // 100; 735 + my $sql = q{SELECT * FROM events WHERE seq >= ? ORDER BY seq LIMIT ?}; 736 + my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, $cursor // 0, $limit); 737 + return [ map { _row_from_json_columns($_, qw(payload_json)) } @$rows ]; 738 + } 739 + 733 740 sub latest_event_seq ($self) { 734 741 return $self->dbh->selectrow_array( 735 742 q{SELECT COALESCE(MAX(seq), 0) FROM events}, 736 743 ) // 0; 744 + } 745 + 746 + sub oldest_event_seq ($self) { 747 + my $value = $self->dbh->selectrow_array( 748 + q{SELECT MIN(seq) FROM events}, 749 + ); 750 + return defined $value ? $value : 0; 737 751 } 738 752 739 753 sub create_action_token ($self, %args) {
+62
script/firehose-probe
··· 1 + #!/usr/bin/env perl 2 + use v5.34; 3 + use warnings; 4 + use feature 'signatures'; 5 + no warnings 'experimental::signatures'; 6 + 7 + use FindBin qw($Bin); 8 + use File::Spec; 9 + use lib File::Spec->catdir($Bin, '..', 'lib'); 10 + use lib File::Spec->catdir($Bin, '..', 'local', 'lib', 'perl5'); 11 + 12 + BEGIN { 13 + require Config; 14 + require lib; 15 + lib->import(File::Spec->catdir($Bin, '..', 'local', 'lib', 'perl5', $Config::Config{archname})); 16 + } 17 + 18 + use Mojo::IOLoop; 19 + use Mojo::URL; 20 + use Mojo::UserAgent; 21 + use ATProto::PDS::EventStream qw(decode_frame); 22 + 23 + my $url = shift(@ARGV) // 'wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos'; 24 + my $max = shift(@ARGV) // 3; 25 + my $timeout = shift(@ARGV) // 10; 26 + 27 + my $ua = Mojo::UserAgent->new; 28 + my $count = 0; 29 + my $done = 0; 30 + 31 + my $timer = Mojo::IOLoop->timer($timeout => sub { 32 + die "timed out after ${timeout}s waiting for firehose frames\n"; 33 + }); 34 + 35 + $ua->websocket( 36 + Mojo::URL->new($url) => sub ($ua, $tx) { 37 + die "websocket handshake failed: " . ($tx->res->error->{message} // 'unknown error') . "\n" 38 + unless $tx->is_websocket; 39 + 40 + $tx->on(binary => sub ($tx, $bytes) { 41 + my $frame = decode_frame($bytes); 42 + my $header = $frame->{header} || {}; 43 + my $body = $frame->{body} || {}; 44 + my $type = defined($header->{t}) ? $header->{t} : '<error>'; 45 + my $seq = defined($body->{seq}) ? $body->{seq} : '-'; 46 + my $repo = $body->{repo} // $body->{did} // '-'; 47 + print "frame $count: op=$header->{op} type=$type seq=$seq repo=$repo\n"; 48 + $count++; 49 + if ($count >= $max) { 50 + $done = 1; 51 + Mojo::IOLoop->remove($timer); 52 + $tx->finish(1000); 53 + } 54 + }); 55 + 56 + $tx->on(finish => sub ($tx, $code, $reason = undef) { 57 + Mojo::IOLoop->stop if $done || $count; 58 + }); 59 + } 60 + ); 61 + 62 + Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+104
t/firehose.t
··· 1 + use v5.34; 2 + use warnings; 3 + 4 + use Config (); 5 + use File::Spec; 6 + use File::Temp qw(tempdir); 7 + use FindBin qw($Bin); 8 + use Test2::V0; 9 + 10 + BEGIN { 11 + require lib; 12 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 13 + lib->import( 14 + File::Spec->catdir($root, 'lib'), 15 + File::Spec->catdir($root, 'local', 'lib', 'perl5'), 16 + File::Spec->catdir($root, 'local', 'lib', 'perl5', $Config::Config{archname}), 17 + ); 18 + } 19 + 20 + use Test::Mojo; 21 + use ATProto::PDS; 22 + use ATProto::PDS::EventStream qw(decode_frame); 23 + 24 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 25 + my $tmp = tempdir(CLEANUP => 1); 26 + 27 + my $app = ATProto::PDS->new( 28 + project_root => $root, 29 + settings => { 30 + base_url => 'http://127.0.0.1:7755', 31 + service_handle_domain => 'example.test', 32 + service_did_method => 'did:web', 33 + jwt_secret => 'firehose-secret', 34 + admin_password => 'admin-secret', 35 + data_dir => File::Spec->catdir($tmp, 'data'), 36 + db_path => File::Spec->catfile($tmp, 'perlds.sqlite'), 37 + }, 38 + ); 39 + 40 + my $t = Test::Mojo->new($app); 41 + my $ws = Test::Mojo->new($app); 42 + 43 + $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 44 + handle => 'alice.example.test', 45 + email => 'alice@example.test', 46 + password => 'hunter22', 47 + })->status_is(200); 48 + 49 + my $session = $t->tx->res->json; 50 + my $did = $session->{did}; 51 + my $access = $session->{accessJwt}; 52 + 53 + my $baseline_seq = $app->store->latest_event_seq; 54 + 55 + $ws->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos'); 56 + is($ws->message, undef, 'no backlog is emitted when no cursor is supplied'); 57 + 58 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 59 + Authorization => "Bearer $access", 60 + } => json => { 61 + repo => $did, 62 + collection => 'app.bsky.feed.post', 63 + rkey => 'firehose', 64 + record => { 65 + '$type' => 'app.bsky.feed.post', 66 + text => 'hello firehose', 67 + createdAt => '2026-03-10T00:00:00Z', 68 + }, 69 + })->status_is(200); 70 + 71 + $ws->message_ok('received a firehose frame') 72 + ->message_like({binary => qr/.+/}, 'firehose frame is binary'); 73 + 74 + my $decoded = decode_frame($ws->message->[1]); 75 + is($decoded->{header}{op}, 1, 'frame is an event message'); 76 + is($decoded->{header}{t}, '#commit', 'frame type is commit'); 77 + is($decoded->{body}{repo}, $did, 'commit identifies repo'); 78 + ok($decoded->{body}{commit}->isa('ATProto::PDS::Repo::CID'), 'commit field is decoded as CID'); 79 + ok(length($decoded->{body}{blocks} // q()) > 0, 'blocks field contains raw bytes'); 80 + is($decoded->{body}{seq}, $baseline_seq + 1, 'sequence advances from prior high water mark'); 81 + is($decoded->{body}{ops}[0]{path}, 'app.bsky.feed.post/firehose', 'operation path is preserved'); 82 + is($decoded->{body}{ops}[0]{action}, 'create', 'operation action is preserved'); 83 + 84 + $ws->finish_ok; 85 + 86 + my $latest_seq = $app->store->latest_event_seq; 87 + my $replay = Test::Mojo->new($app); 88 + $replay->websocket_ok("/xrpc/com.atproto.sync.subscribeRepos?cursor=$latest_seq") 89 + ->message_ok('replayed current cursor event'); 90 + 91 + my $replayed = decode_frame($replay->message->[1]); 92 + is($replayed->{body}{seq}, $latest_seq, 'cursor replay is inclusive'); 93 + $replay->finish_ok; 94 + 95 + my $future = Test::Mojo->new($app); 96 + $future->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos?cursor=999999999') 97 + ->message_ok('future cursor returns an error frame'); 98 + 99 + my $error = decode_frame($future->message->[1]); 100 + is($error->{header}{op}, -1, 'future cursor frame is an error'); 101 + is($error->{body}{error}, 'FutureCursor', 'error type is FutureCursor'); 102 + $future->finish_ok; 103 + 104 + done_testing;