perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Align bootstrap firehose events with reference PDS

alice 19c9aceb 00a47368

+127 -20
+62 -1
lib/ATProto/PDS/API/Server.pm
··· 110 110 $c->store->claim_reserved_signing_key($did) if $reserved && !defined $reserved->{claimed_at}; 111 111 $c->append_event( 112 112 did => $account->{did}, 113 + type => 'identity', 114 + rev => $account->{repo_rev}, 115 + payload => { 116 + did => $account->{did}, 117 + handle => $account->{handle}, 118 + }, 119 + ); 120 + $c->append_event( 121 + did => $account->{did}, 113 122 type => 'account', 114 123 rev => $account->{repo_rev}, 115 124 payload => { 116 125 active => JSON::PP::true, 126 + }, 127 + ); 128 + $c->append_event( 129 + did => $account->{did}, 130 + type => 'commit', 131 + rev => $account->{repo_rev}, 132 + commit_cid => $repo->{cid}, 133 + payload => { 134 + ops => [], 135 + since => undef, 136 + }, 137 + car_bytes => $repo->{car_bytes}, 138 + ); 139 + $c->append_event( 140 + did => $account->{did}, 141 + type => 'sync', 142 + rev => $account->{repo_rev}, 143 + commit_cid => $repo->{cid}, 144 + car_bytes => $repo->{sync_car_bytes}, 145 + payload => { 146 + did => $account->{did}, 117 147 }, 118 148 ); 119 149 ··· 238 268 239 269 $registry->register('com.atproto.server.activateAccount', sub ($c, $endpoint) { 240 270 my (undef, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 241 - $c->store->update_account($account->{did}, deactivated_at => undef); 271 + $account = $c->store->update_account($account->{did}, deactivated_at => undef); 242 272 $c->append_event( 243 273 did => $account->{did}, 244 274 type => 'account', ··· 247 277 active => JSON::PP::true, 248 278 }, 249 279 ); 280 + $c->append_event( 281 + did => $account->{did}, 282 + type => 'identity', 283 + rev => $account->{repo_rev}, 284 + payload => { 285 + did => $account->{did}, 286 + handle => $account->{handle}, 287 + }, 288 + ); 289 + my $commit = $c->store->get_latest_commit($account->{did}); 290 + if ($commit) { 291 + my $sync_car = $commit->{commit_bytes} 292 + ? ATProto::PDS::Repo::CAR::write_car( 293 + ATProto::PDS::Repo::CID->from_string($commit->{cid}), 294 + [{ 295 + cid => ATProto::PDS::Repo::CID->from_string($commit->{cid}), 296 + bytes => $commit->{commit_bytes}, 297 + }], 298 + ) 299 + : undef; 300 + $c->append_event( 301 + did => $account->{did}, 302 + type => 'sync', 303 + rev => $account->{repo_rev}, 304 + commit_cid => $commit->{cid}, 305 + car_bytes => $sync_car, 306 + payload => { 307 + did => $account->{did}, 308 + }, 309 + ) if defined $sync_car; 310 + } 250 311 return {}; 251 312 }); 252 313
+10
lib/ATProto/PDS/API/Sync.pm
··· 323 323 }); 324 324 } 325 325 326 + if (($event->{type} // q()) eq 'sync') { 327 + return encode_message_frame('#sync', { 328 + seq => 0 + $event->{seq}, 329 + did => $event->{did}, 330 + rev => $event->{rev}, 331 + blocks => ATProto::PDS::Repo::Bytes->new($event->{car_bytes} // q()), 332 + time => iso8601($event->{created_at}), 333 + }); 334 + } 335 + 326 336 if (($event->{type} // q()) eq 'identity') { 327 337 return encode_message_frame('#identity', { 328 338 seq => 0 + $event->{seq},
+25 -19
lib/ATProto/PDS/Repo/Manager.pm
··· 35 35 } 36 36 37 37 sub initialize_repo ($self, $account) { 38 - return $self->apply_writes($account, []); 38 + return $self->apply_writes($account, [], emit_event => 0); 39 39 } 40 40 41 41 sub apply_writes ($self, $account, $writes, %opts) { ··· 154 154 } values %$records, 155 155 ); 156 156 my $car_bytes = write_car($commit_cid, \@blocks); 157 + my $sync_car_bytes = write_car($commit_cid, [ 158 + { cid => $commit_cid, bytes => $commit_bytes }, 159 + ]); 157 160 158 161 $store->txn(sub ($dbh) { 159 162 for my $block (@blocks) { ··· 176 179 commit_bytes => $commit_bytes, 177 180 car_bytes => $car_bytes, 178 181 ); 179 - $store->append_event( 180 - did => $did, 181 - type => 'commit', 182 - rev => $rev, 183 - commit_cid => $commit_cid->to_string, 184 - payload => { 185 - ops => \@ops, 186 - since => $latest ? $latest->{rev} : undef, 187 - prevData => $latest ? $latest->{root_cid} : undef, 188 - }, 189 - car_bytes => $car_bytes, 190 - ); 182 + if ($opts{emit_event} // 1) { 183 + $store->append_event( 184 + did => $did, 185 + type => 'commit', 186 + rev => $rev, 187 + commit_cid => $commit_cid->to_string, 188 + payload => { 189 + ops => \@ops, 190 + since => $latest ? $latest->{rev} : undef, 191 + prevData => $latest ? $latest->{root_cid} : undef, 192 + }, 193 + car_bytes => $car_bytes, 194 + ); 195 + } 191 196 }); 192 197 $self->crawler_notifier->notify_of_update() 193 - if $self->crawler_notifier; 198 + if ($opts{emit_event} // 1) && $self->crawler_notifier; 194 199 195 200 return { 196 - cid => $commit_cid->to_string, 197 - rev => $rev, 198 - root_cid => $mst->{root}->to_string, 199 - car_bytes => $car_bytes, 200 - results => \@results, 201 + cid => $commit_cid->to_string, 202 + rev => $rev, 203 + root_cid => $mst->{root}->to_string, 204 + car_bytes => $car_bytes, 205 + sync_car_bytes => $sync_car_bytes, 206 + results => \@results, 201 207 }; 202 208 } 203 209
+30
t/firehose.t
··· 52 52 my $did = $session->{did}; 53 53 my $access = $session->{accessJwt}; 54 54 55 + my $bootstrap = Test::Mojo->new($app); 56 + $bootstrap->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos?cursor=0'); 57 + 58 + $bootstrap->message_ok('bootstrap identity event arrived'); 59 + my $identity = decode_frame($bootstrap->message->[1]); 60 + is($identity->{header}{t}, '#identity', 'bootstrap frame starts with identity'); 61 + is($identity->{body}{did}, $did, 'identity event identifies the account'); 62 + is($identity->{body}{handle}, 'alice.example.test', 'identity event carries the handle'); 63 + 64 + $bootstrap->message_ok('bootstrap account event arrived'); 65 + my $account_evt = decode_frame($bootstrap->message->[1]); 66 + is($account_evt->{header}{t}, '#account', 'bootstrap account event follows identity'); 67 + ok($account_evt->{body}{active}, 'bootstrap account event marks the account active'); 68 + 69 + $bootstrap->message_ok('bootstrap commit event arrived'); 70 + my $bootstrap_commit = decode_frame($bootstrap->message->[1]); 71 + is($bootstrap_commit->{header}{t}, '#commit', 'bootstrap commit event is emitted'); 72 + is_deeply($bootstrap_commit->{body}{ops}, [], 'bootstrap commit contains no record ops'); 73 + ok(!defined $bootstrap_commit->{body}{since}, 'bootstrap commit since is null'); 74 + 75 + $bootstrap->message_ok('bootstrap sync event arrived'); 76 + my $bootstrap_sync = decode_frame($bootstrap->message->[1]); 77 + is($bootstrap_sync->{header}{t}, '#sync', 'bootstrap sync event is emitted'); 78 + is($bootstrap_sync->{body}{did}, $did, 'bootstrap sync identifies the account'); 79 + is($bootstrap_sync->{body}{rev}, $bootstrap_commit->{body}{rev}, 'bootstrap sync rev matches the bootstrap commit'); 80 + my $bootstrap_sync_car = read_car($bootstrap_sync->{body}{blocks}); 81 + is(scalar @{ $bootstrap_sync_car->{blocks} }, 1, 'bootstrap sync CAR contains only the commit block'); 82 + is($bootstrap_sync_car->{roots}[0]->to_string, $bootstrap_commit->{body}{commit}->to_string, 'bootstrap sync CAR roots the bootstrap commit'); 83 + $bootstrap->finish_ok; 84 + 55 85 my $baseline_seq = $app->store->latest_event_seq; 56 86 my $prior_rev = $app->store->get_latest_commit($did)->{rev}; 57 87