perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Tighten firehose diffs and public browser smoke

alice f2ecb2ba 19c9aceb

+304 -6
+41 -6
lib/ATProto/PDS/Repo/Manager.pm
··· 143 143 my $commit_bytes = encode_dag_cbor($commit); 144 144 my $commit_cid = ATProto::PDS::Repo::CID->for_dag_cbor($commit_bytes); 145 145 146 - my @blocks = ( 146 + my @snapshot_blocks = ( 147 147 { cid => $commit_cid, bytes => $commit_bytes }, 148 148 @{ $mst->{blocks} }, 149 149 map { ··· 153 153 } 154 154 } values %$records, 155 155 ); 156 - my $car_bytes = write_car($commit_cid, \@blocks); 156 + my %firehose_paths = map { 157 + my $path = $_->{path} // q(); 158 + (($_->{action} // q()) ne 'delete' && length $path) ? ($path => 1) : (); 159 + } @ops; 160 + my @firehose_blocks = ( 161 + { cid => $commit_cid, bytes => $commit_bytes }, 162 + @{ $mst->{blocks} }, 163 + map { 164 + my $record = $records->{$_}; 165 + +{ 166 + cid => ATProto::PDS::Repo::CID->from_string($record->{cid}), 167 + bytes => $record->{record_bytes}, 168 + } 169 + } sort keys %firehose_paths, 170 + ); 171 + my $snapshot_car_bytes = write_car($commit_cid, \@snapshot_blocks); 172 + my $car_bytes = write_car($commit_cid, \@firehose_blocks); 157 173 my $sync_car_bytes = write_car($commit_cid, [ 158 174 { cid => $commit_cid, bytes => $commit_bytes }, 159 175 ]); 160 176 161 177 $store->txn(sub ($dbh) { 162 - for my $block (@blocks) { 178 + for my $block (@snapshot_blocks) { 163 179 $store->put_block( 164 180 cid => $block->{cid}->to_string, 165 181 codec => $block->{cid}->codec, ··· 177 193 root_cid => $mst->{root}->to_string, 178 194 prev_cid => $latest ? $latest->{cid} : undef, 179 195 commit_bytes => $commit_bytes, 180 - car_bytes => $car_bytes, 196 + car_bytes => $snapshot_car_bytes, 181 197 ); 182 198 if ($opts{emit_event} // 1) { 183 199 $store->append_event( ··· 289 305 } 290 306 } @$records, 291 307 ); 292 - my $next_car_bytes = write_car($commit_cid, \@repo_blocks); 308 + my %firehose_paths = map { 309 + my $path = $_->{path} // q(); 310 + (($_->{action} // q()) ne 'delete' && length $path) ? ($path => 1) : (); 311 + } @ops; 312 + my %records_by_path = map { 313 + $_->{collection} . '/' . $_->{rkey} => $_ 314 + } @$records; 315 + my @firehose_blocks = ( 316 + { cid => $commit_cid, bytes => $commit_bytes }, 317 + @{ $mst->{blocks} }, 318 + map { 319 + my $record = $records_by_path{$_}; 320 + +{ 321 + cid => ATProto::PDS::Repo::CID->from_string($record->{cid}), 322 + bytes => $record->{record_bytes}, 323 + } 324 + } sort keys %firehose_paths, 325 + ); 326 + my $next_snapshot_car_bytes = write_car($commit_cid, \@repo_blocks); 327 + my $next_car_bytes = write_car($commit_cid, \@firehose_blocks); 293 328 294 329 $store->txn(sub ($dbh) { 295 330 for my $block (values %blocks, @repo_blocks) { ··· 307 342 root_cid => $root_cid, 308 343 prev_cid => $latest ? $latest->{cid} : undef, 309 344 commit_bytes => $commit_bytes, 310 - car_bytes => $next_car_bytes, 345 + car_bytes => $next_snapshot_car_bytes, 311 346 ); 312 347 $store->append_event( 313 348 did => $did,
+6
script/differential-validate
··· 359 359 my $body = $frame->{body} || {}; 360 360 my $first = $body->{ops}[0] || {}; 361 361 my $commit_prev_is_null = 0; 362 + my $blocks_count = 0; 363 + my $commit_block_present = 0; 362 364 363 365 if (defined $body->{commit} && defined $body->{blocks}) { 364 366 my $car = read_car($body->{blocks}); 367 + $blocks_count = scalar @{ $car->{blocks} || [] }; 365 368 my ($commit_block) = grep { 366 369 $_->{cid}->to_string eq $body->{commit}->to_string 367 370 } @{ $car->{blocks} || [] }; 368 371 if ($commit_block) { 372 + $commit_block_present = 1; 369 373 my $commit = decode_dag_cbor($commit_block->{bytes}); 370 374 $commit_prev_is_null = exists($commit->{prev}) && !defined($commit->{prev}) ? 1 : 0; 371 375 } ··· 382 386 has_since => exists($body->{since}) ? 1 : 0, 383 387 since_is_stringish => !defined($body->{since}) || !ref($body->{since}) ? 1 : 0, 384 388 seq_is_int => defined($body->{seq}) && $body->{seq} =~ /\A\d+\z/ ? 1 : 0, 389 + blocks_count => $blocks_count, 390 + commit_block_present => $commit_block_present, 385 391 commit_prev_is_null => $commit_prev_is_null, 386 392 }; 387 393 }
+20
script/perlsky-browser-smoke
··· 23 23 target_handle => $ENV{PERLSKY_BROWSER_TARGET_HANDLE} || 'alice.mosphere.at', 24 24 artifacts_dir => $ENV{PERLSKY_BROWSER_ARTIFACTS} || $default_artifacts, 25 25 birthdate => $ENV{PERLSKY_BROWSER_BIRTHDATE} || '1990-01-01', 26 + public_api_url => $ENV{PERLSKY_BROWSER_PUBLIC_API_URL} || 'https://public.api.bsky.app', 27 + public_check_timeout_ms => $ENV{PERLSKY_BROWSER_PUBLIC_CHECK_TIMEOUT_MS} || 180_000, 26 28 post_text => $ENV{PERLSKY_BROWSER_POST_TEXT}, 27 29 quote_text => $ENV{PERLSKY_BROWSER_QUOTE_TEXT}, 28 30 reply_text => $ENV{PERLSKY_BROWSER_REPLY_TEXT}, 29 31 profile_note => $ENV{PERLSKY_BROWSER_PROFILE_NOTE}, 30 32 headful => $ENV{PERLSKY_BROWSER_HEADFUL} ? 1 : 0, 31 33 edit_profile => $ENV{PERLSKY_BROWSER_EDIT_PROFILE} ? 1 : 0, 34 + public_checks => $ENV{PERLSKY_BROWSER_SKIP_PUBLIC_CHECKS} ? 0 : 1, 35 + strict_errors => $ENV{PERLSKY_BROWSER_STRICT_ERRORS} ? 1 : 0, 32 36 ); 33 37 34 38 my $cmd = shift(@ARGV) // 'run'; ··· 47 51 --pds-url URL 48 52 --artifacts-dir DIR 49 53 --birthdate YYYY-MM-DD 54 + --public-api-url URL 55 + --public-check-timeout-ms MS 50 56 --post-text TEXT 51 57 --quote-text TEXT 52 58 --reply-text TEXT 53 59 --profile-note TEXT 54 60 --headful 55 61 --edit-profile 62 + --public-checks 63 + --strict-errors 56 64 57 65 Environment: 58 66 PERLSKY_BROWSER_HANDLE ··· 62 70 PERLSKY_BROWSER_PDS_URL 63 71 PERLSKY_BROWSER_ARTIFACTS 64 72 PERLSKY_BROWSER_BIRTHDATE 73 + PERLSKY_BROWSER_PUBLIC_API_URL 74 + PERLSKY_BROWSER_PUBLIC_CHECK_TIMEOUT_MS 65 75 PERLSKY_BROWSER_POST_TEXT 66 76 PERLSKY_BROWSER_QUOTE_TEXT 67 77 PERLSKY_BROWSER_REPLY_TEXT 68 78 PERLSKY_BROWSER_PROFILE_NOTE 69 79 PERLSKY_BROWSER_HEADFUL=1 70 80 PERLSKY_BROWSER_EDIT_PROFILE=1 81 + PERLSKY_BROWSER_SKIP_PUBLIC_CHECKS=1 82 + PERLSKY_BROWSER_STRICT_ERRORS=1 71 83 USAGE 72 84 exit 0; 73 85 } ··· 81 93 'pds-url=s' => \$opt{pds_url}, 82 94 'artifacts-dir=s' => \$opt{artifacts_dir}, 83 95 'birthdate=s' => \$opt{birthdate}, 96 + 'public-api-url=s' => \$opt{public_api_url}, 97 + 'public-check-timeout-ms=i' => \$opt{public_check_timeout_ms}, 84 98 'post-text=s' => \$opt{post_text}, 85 99 'quote-text=s' => \$opt{quote_text}, 86 100 'reply-text=s' => \$opt{reply_text}, 87 101 'profile-note=s' => \$opt{profile_note}, 88 102 'headful!' => \$opt{headful}, 89 103 'edit-profile!' => \$opt{edit_profile}, 104 + 'public-checks!' => \$opt{public_checks}, 105 + 'strict-errors!' => \$opt{strict_errors}, 90 106 ) or die "invalid options\n"; 91 107 92 108 if ($cmd eq 'install') { ··· 120 136 targetHandle => $opt{target_handle}, 121 137 artifactsDir => $artifacts_dir, 122 138 birthdate => $opt{birthdate}, 139 + publicApiUrl => $opt{public_api_url}, 140 + publicCheckTimeoutMs => 0 + $opt{public_check_timeout_ms}, 141 + publicChecks => $opt{public_checks} ? JSON::PP::true : JSON::PP::false, 142 + strictErrors => $opt{strict_errors} ? JSON::PP::true : JSON::PP::false, 123 143 postText => $opt{post_text}, 124 144 quoteText => $opt{quote_text}, 125 145 replyText => $opt{reply_text},
+14
t/firehose.t
··· 100 100 createdAt => '2026-03-10T00:00:00Z', 101 101 }, 102 102 })->status_is(200); 103 + my $first_result = $t->tx->res->json; 103 104 104 105 $ws->message_ok('received a firehose frame') 105 106 ->message_like({binary => qr/.+/}, 'firehose frame is binary'); ··· 118 119 my $initial_car = read_car($decoded->{body}{blocks}); 119 120 my $initial_commit_block = (grep { $_->{cid}->to_string eq $decoded->{body}{commit}->to_string } @{ $initial_car->{blocks} })[0]; 120 121 ok($initial_commit_block, 'initial commit block is present in emitted CAR'); 122 + ok( 123 + scalar(grep { $_->{cid}->to_string eq $first_result->{cid} } @{ $initial_car->{blocks} || [] }), 124 + 'initial firehose CAR includes the created record block', 125 + ); 121 126 my $initial_commit = decode_dag_cbor($initial_commit_block->{bytes}); 122 127 ok(exists $initial_commit->{prev}, 'initial commit includes prev for compatibility'); 123 128 ok(!defined $initial_commit->{prev}, 'initial commit prev is null'); ··· 149 154 createdAt => '2026-03-10T00:00:01Z', 150 155 }, 151 156 })->status_is(200); 157 + my $second_result = $t->tx->res->json; 152 158 153 159 $follow->message_ok('received a second firehose frame') 154 160 ->message_like({binary => qr/.+/}, 'second firehose frame is binary'); ··· 160 166 my $second_car = read_car($second->{body}{blocks}); 161 167 my $second_commit_block = (grep { $_->{cid}->to_string eq $second->{body}{commit}->to_string } @{ $second_car->{blocks} })[0]; 162 168 ok($second_commit_block, 'second commit block is present in emitted CAR'); 169 + ok( 170 + scalar(grep { $_->{cid}->to_string eq $second_result->{cid} } @{ $second_car->{blocks} || [] }), 171 + 'second firehose CAR includes the new record block', 172 + ); 173 + ok( 174 + !scalar(grep { $_->{cid}->to_string eq $first_result->{cid} } @{ $second_car->{blocks} || [] }), 175 + 'second firehose CAR does not resend unchanged prior record blocks', 176 + ); 163 177 my $second_commit = decode_dag_cbor($second_commit_block->{bytes}); 164 178 ok(exists $second_commit->{prev}, 'subsequent commit includes prev for compatibility'); 165 179 ok(!defined $second_commit->{prev}, 'subsequent commit prev is null');
+108
t/repo-firehose-car.t
··· 1 + use v5.34; 2 + use warnings; 3 + 4 + use Config (); 5 + use File::Spec; 6 + use File::Temp qw(tempdir); 7 + use FindBin qw($Bin); 8 + use Test::More; 9 + 10 + BEGIN { 11 + require lib; 12 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 13 + lib->import( 14 + File::Spec->catdir($root, 'lib'), 15 + File::Spec->catdir($root, 'local', 'lib', 'perl5'), 16 + File::Spec->catdir($root, 'local', 'lib', 'perl5', $Config::Config{archname}), 17 + ); 18 + } 19 + 20 + use ATProto::PDS; 21 + use ATProto::PDS::Repo::CAR qw(read_car); 22 + 23 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 24 + my $tmp = tempdir(CLEANUP => 1); 25 + 26 + my $app = ATProto::PDS->new( 27 + project_root => $root, 28 + settings => { 29 + base_url => 'http://127.0.0.1:7755', 30 + service_handle_domain => 'example.test', 31 + service_did_method => 'did:web', 32 + jwt_secret => 'repo-firehose-secret', 33 + admin_password => 'admin-secret', 34 + data_dir => File::Spec->catdir($tmp, 'data'), 35 + db_path => File::Spec->catfile($tmp, 'perlsky.sqlite'), 36 + }, 37 + ); 38 + 39 + my $keys = $app->repo_manager->generate_signing_key; 40 + my $account = $app->store->create_account( 41 + account_id => 'acct-1', 42 + did => 'did:plc:repofirehosecartestacct', 43 + handle => 'alice.example.test', 44 + private_key => $keys->{private_key}, 45 + public_key => $keys->{public_key}, 46 + public_key_multibase => $keys->{public_key_multibase}, 47 + signing_key_did => $keys->{signing_key_did}, 48 + ); 49 + 50 + my $init = $app->repo_manager->initialize_repo($account); 51 + $account = $app->store->update_account( 52 + $account->{did}, 53 + repo_commit_cid => $init->{cid}, 54 + repo_root_cid => $init->{root_cid}, 55 + repo_rev => $init->{rev}, 56 + ); 57 + 58 + my $first = $app->repo_manager->apply_writes($account, [{ 59 + action => 'create', 60 + collection => 'app.bsky.feed.post', 61 + rkey => 'first', 62 + value => { 63 + '$type' => 'app.bsky.feed.post', 64 + text => 'first post', 65 + createdAt => '2026-03-11T00:00:00Z', 66 + }, 67 + }]); 68 + 69 + my $first_cid = $first->{results}[0]{cid}; 70 + my $first_car = read_car($first->{car_bytes}); 71 + ok( 72 + scalar(grep { $_->{cid}->to_string eq $first_cid } @{ $first_car->{blocks} || [] }), 73 + 'first firehose CAR includes the created record block', 74 + ); 75 + 76 + my $second = $app->repo_manager->apply_writes($account, [{ 77 + action => 'create', 78 + collection => 'app.bsky.feed.post', 79 + rkey => 'second', 80 + value => { 81 + '$type' => 'app.bsky.feed.post', 82 + text => 'second post', 83 + createdAt => '2026-03-11T00:00:01Z', 84 + }, 85 + }]); 86 + 87 + my $second_cid = $second->{results}[0]{cid}; 88 + my $second_car = read_car($second->{car_bytes}); 89 + ok( 90 + scalar(grep { $_->{cid}->to_string eq $second_cid } @{ $second_car->{blocks} || [] }), 91 + 'second firehose CAR includes the new record block', 92 + ); 93 + ok( 94 + !scalar(grep { $_->{cid}->to_string eq $first_cid } @{ $second_car->{blocks} || [] }), 95 + 'second firehose CAR does not resend the unchanged first record block', 96 + ); 97 + 98 + my $snapshot_car = read_car($app->store->repo_car($account->{did})); 99 + ok( 100 + scalar(grep { $_->{cid}->to_string eq $first_cid } @{ $snapshot_car->{blocks} || [] }), 101 + 'repo snapshot CAR still includes the first record block', 102 + ); 103 + ok( 104 + scalar(grep { $_->{cid}->to_string eq $second_cid } @{ $snapshot_car->{blocks} || [] }), 105 + 'repo snapshot CAR still includes the second record block', 106 + ); 107 + 108 + done_testing;
+115
tools/browser-automation/smoke.mjs
··· 15 15 startedAt: new Date().toISOString(), 16 16 appUrl: config.appUrl, 17 17 pdsUrl: config.pdsUrl, 18 + publicApiUrl: config.publicApiUrl, 18 19 handle: config.handle, 19 20 targetHandle: config.targetHandle, 20 21 steps: [], ··· 25 26 xrpc: [], 26 27 notes: [], 27 28 }; 29 + 30 + const ignoredConsole = [ 31 + /events\.bsky\.app\/.*ERR_BLOCKED_BY_CLIENT/i, 32 + /slider-vertical/i, 33 + ]; 34 + 35 + const ignoredRequestFailure = [ 36 + { url: /events\.bsky\.app\//i, error: /ERR_BLOCKED_BY_CLIENT/i }, 37 + { url: /workers\.dev\/api\/config/i, error: /ERR_ABORTED/i }, 38 + ]; 39 + 40 + const ignoredHttpFailure = [ 41 + { url: /c\.1password\.com\/richicons/i, status: 404 }, 42 + ]; 28 43 29 44 const browser = await chromium.launch({ headless: config.headless !== false }); 30 45 const context = await browser.newContext({ ··· 90 105 }); 91 106 }; 92 107 108 + const isIgnoredConsole = (entry) => 109 + ignoredConsole.some((pattern) => pattern.test(entry.text || '')); 110 + 111 + const isIgnoredRequestFailure = (entry) => 112 + ignoredRequestFailure.some( 113 + (rule) => rule.url.test(entry.url || '') && rule.error.test(entry.errorText || ''), 114 + ); 115 + 116 + const isIgnoredHttpFailure = (entry) => 117 + ignoredHttpFailure.some( 118 + (rule) => rule.url.test(entry.url || '') && (!rule.status || rule.status === entry.status), 119 + ); 120 + 93 121 const step = async (name, fn, { optional = false } = {}) => { 94 122 try { 95 123 const result = await fn(); ··· 110 138 }; 111 139 112 140 const wait = (ms) => page.waitForTimeout(ms); 141 + const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); 142 + 143 + const fetchJson = async (url) => { 144 + const res = await fetch(url, { 145 + headers: { accept: 'application/json' }, 146 + }); 147 + const text = await res.text(); 148 + let json; 149 + try { 150 + json = text ? JSON.parse(text) : null; 151 + } catch { 152 + json = null; 153 + } 154 + return { ok: res.ok, status: res.status, text, json }; 155 + }; 156 + 157 + const pollJson = async (name, buildUrl, predicate, timeoutMs) => { 158 + const started = Date.now(); 159 + let last; 160 + while (Date.now() - started < timeoutMs) { 161 + last = await fetchJson(buildUrl()); 162 + if (predicate(last)) { 163 + return last; 164 + } 165 + await sleep(5000); 166 + } 167 + throw new Error(`${name} did not succeed before timeout; last status=${last?.status ?? 'none'}`); 168 + }; 113 169 114 170 const closeWelcomeModal = async () => { 115 171 const close = page.getByRole('button', { name: 'Close welcome modal' }); ··· 260 316 await wait(4000); 261 317 }; 262 318 319 + const verifyPublicHandleResolution = async () => { 320 + const result = await pollJson( 321 + 'public handle resolution', 322 + () => `${config.publicApiUrl}/xrpc/com.atproto.identity.resolveHandle?handle=${encodeURIComponent(config.handle)}`, 323 + ({ ok, json }) => ok && typeof json?.did === 'string' && json.did.length > 0, 324 + config.publicCheckTimeoutMs ?? 180000, 325 + ); 326 + return { did: result.json.did }; 327 + }; 328 + 329 + const verifyPublicAuthorFeed = async () => { 330 + const result = await pollJson( 331 + 'public author feed indexing', 332 + () => `${config.publicApiUrl}/xrpc/app.bsky.feed.getAuthorFeed?actor=${encodeURIComponent(config.handle)}&limit=20`, 333 + ({ ok, json }) => 334 + ok && Array.isArray(json?.feed) && json.feed.some((item) => item?.post?.record?.text === config.postText), 335 + config.publicCheckTimeoutMs ?? 180000, 336 + ); 337 + const matching = result.json.feed.find((item) => item?.post?.record?.text === config.postText); 338 + return { 339 + uri: matching?.post?.uri, 340 + cid: matching?.post?.cid, 341 + }; 342 + }; 343 + 344 + const verifyPublicProfile = async () => { 345 + const result = await pollJson( 346 + 'public profile indexing', 347 + () => `${config.publicApiUrl}/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent(config.handle)}`, 348 + ({ ok, json }) => ok && typeof json?.postsCount === 'number' && json.postsCount > 0, 349 + config.publicCheckTimeoutMs ?? 180000, 350 + ); 351 + return { 352 + postsCount: result.json.postsCount, 353 + followersCount: result.json.followersCount, 354 + followsCount: result.json.followsCount, 355 + }; 356 + }; 357 + 263 358 const editProfile = async () => { 264 359 const edit = page.getByRole('button', { name: /edit profile/i }); 265 360 if (!(await edit.count())) { ··· 282 377 await step('login', login); 283 378 await step('age-assurance', completeAgeAssuranceIfNeeded, { optional: true }); 284 379 await step('compose-own-post', () => composePost(config.postText)); 380 + if (config.publicChecks !== false) { 381 + await step('public-resolve-handle', verifyPublicHandleResolution); 382 + await step('public-profile', verifyPublicProfile); 383 + await step('public-author-feed', verifyPublicAuthorFeed); 384 + } 285 385 await step('own-profile', openOwnProfile); 286 386 287 387 const ownPost = await step('find-own-post', async () => { ··· 327 427 } 328 428 329 429 summary.finishedAt = new Date().toISOString(); 430 + summary.unexpected = { 431 + console: summary.console.filter((entry) => !isIgnoredConsole(entry)), 432 + requestFailures: summary.requestFailures.filter((entry) => !isIgnoredRequestFailure(entry)), 433 + httpFailures: summary.httpFailures.filter((entry) => !isIgnoredHttpFailure(entry)), 434 + pageErrors: summary.pageErrors, 435 + }; 436 + summary.unexpected.total = 437 + summary.unexpected.console.length + 438 + summary.unexpected.requestFailures.length + 439 + summary.unexpected.httpFailures.length + 440 + summary.unexpected.pageErrors.length; 441 + if (!summary.fatal && config.strictErrors !== false && summary.unexpected.total > 0) { 442 + summary.fatal = `Unexpected browser/runtime errors: ${summary.unexpected.total}`; 443 + } 444 + summary.ok = !summary.fatal; 330 445 await screenshot('final').catch(() => undefined); 331 446 await fs.writeFile( 332 447 path.join(config.artifactsDir, 'summary.json'),