perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 187 lines 5.2 kB view raw
# Integration test for crawler notifications.
#
# Starts a mock crawler as a child process, boots the PDS application
# in-process with that crawler configured, and then checks that:
#   * creating an account triggers a crawl request whose hostname carries no port,
#   * a second write inside the configured notify interval does not trigger
#     another request, and
#   * com.atproto.sync.getHostStatus reports the crawler host as active.

use v5.34;
use warnings;
use Config ();
use File::Spec;
use File::Temp qw(tempdir);
use FindBin qw($Bin);
use IO::Socket::INET;
use JSON::PP qw(decode_json);
use POSIX qw(WNOHANG);
use Test::More;
use Time::HiRes qw(sleep time);

# Make the project's own lib/ and its local::lib-style dependency tree
# visible before the Mojo / ATProto modules below are compiled.
BEGIN {
    require lib;
    my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..'));
    lib->import(
        File::Spec->catdir($root, 'lib'),
        File::Spec->catdir($root, 'local', 'lib', 'perl5'),
        File::Spec->catdir($root, 'local', 'lib', 'perl5', $Config::Config{archname}),
    );
}

use Mojo::URL;
use Mojo::UserAgent;
use Test::Mojo;
use ATProto::PDS;

my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..'));
my $tmp  = tempdir(CLEANUP => 1);

# Every spawned helper process is recorded here as { pid => N } so the END
# block can shut it down.
my @children;

# Stop all helper processes: polite TERM first, up to ~4 seconds of polling,
# then KILL for anything still alive.
END {
    # waitpid() overwrites $?. Localizing it means the value that was in
    # place when this block started is restored when the block ends, so the
    # reaping below cannot alter the exit status of the test run.
    local $?;
    for my $child (reverse @children) {
        my $pid = $child->{pid} or next;
        next unless kill 0, $pid;
        kill 'TERM', $pid;
        for (1 .. 40) {
            last if waitpid($pid, WNOHANG) == $pid;
            sleep 0.1;
        }
        kill 'KILL', $pid if kill 0, $pid;
        waitpid($pid, 0);
    }
}

# Ask the kernel for an unused TCP port on 127.0.0.1 and return its number.
# The probing socket is closed before returning, so another process could
# in principle grab the port before the caller binds it.
sub free_port {
    my $sock = IO::Socket::INET->new(
        LocalAddr => '127.0.0.1',
        LocalPort => 0,
        Proto     => 'tcp',
        Listen    => 5,
        ReuseAddr => 1,
    ) or die "unable to allocate a port: $!";
    my $port = $sock->sockport;
    close $sock;
    return $port;
}

# Return the entire contents of $path as a single string.
# Dies if the file cannot be opened or closed.
sub slurp {
    my ($path) = @_;
    open my $fh, '<', $path or die "open($path): $!";
    # Read in scalar context so the caller always receives exactly one
    # value, whatever context slurp() itself was called in.
    my $content = do { local $/; <$fh> };
    close $fh or die "close($path): $!";
    return $content;
}

# Fork and exec the Node.js crawler mock (tools/differential/crawler-mock.cjs,
# run through `fnm exec --using=20`).
#
#   $ready_file - exported to the child as PERLSKY_READY_FILE
#   $log_file   - receives the child's STDOUT and STDERR
#   $port       - exported to the child as PERLSKY_CRAWLER_PORT
#
# Returns the child's pid in the parent and records it in @children.
# Dies in the parent if fork fails.
sub spawn_crawler_mock {
    my ($ready_file, $log_file, $port) = @_;
    my $pid = fork;
    die "fork failed: $!" unless defined $pid;

    if ($pid == 0) {
        # Child. Any failure here must not unwind through die/exit: that
        # would run the parent's END blocks and destructors in this process.
        # Trap the error, write it to STDERR (the log file once the
        # redirection has succeeded) and leave via _exit.
        eval {
            open STDOUT, '>', $log_file or die "open($log_file): $!";
            open STDERR, '>&', \*STDOUT or die "dup stdout failed";
            chdir $root or die "chdir($root): $!";
            $ENV{PERLSKY_READY_FILE}   = $ready_file;
            $ENV{PERLSKY_CRAWLER_PORT} = $port;
            $ENV{PERLSKY_CRAWLER_HOST} = '127.0.0.1';
            exec 'fnm', 'exec', '--using=20', '--', 'node',
                File::Spec->catfile($root, 'tools', 'differential', 'crawler-mock.cjs')
                or die "exec failed: $!";
        };
        # syswrite sidesteps any question of buffering on the reopened handle.
        syswrite STDERR, ($@ || "crawler mock setup failed\n");
        POSIX::_exit(127);
    }

    push @children, { pid => $pid };
    return $pid;
}

# Poll until $path exists and holds decodable JSON, then return the decoded
# value. $timeout is in seconds and defaults to 20.
#
# The file may be observed while it is still empty or only partly written,
# so a decode failure is treated as "not ready yet" and retried until the
# deadline. Dies on timeout, including the last decode error if there was one.
sub wait_for_ready {
    my ($path, $timeout) = @_;
    $timeout //= 20;
    my $deadline   = time + $timeout;
    my $last_error = '';
    while (time < $deadline) {
        if (-f $path) {
            my $payload = eval { decode_json(slurp($path)) };
            return $payload if defined $payload;
            $last_error = $@;
        }
        sleep 0.1;
    }
    chomp $last_error;
    my $detail = length $last_error ? " (last error: $last_error)" : '';
    die "timed out waiting for $path$detail";
}

# GET "$origin/requests" from the crawler mock and return the decoded JSON
# body, or an empty hash ref when the body yields nothing.
# Dies if the response is not a success.
sub crawler_state {
    my ($origin) = @_;
    my $res = Mojo::UserAgent->new(max_redirects => 0)->get("$origin/requests")->result;
    die "crawler state fetch failed for $origin" unless $res->is_success;
    return $res->json || {};
}

# Poll the crawler mock until its reported `count` reaches $minimum
# (default 1) and return that state. $timeout is in seconds (default 10).
# Fetch errors while polling are ignored; dies on timeout.
sub wait_for_requests {
    my ($origin, $minimum, $timeout) = @_;
    $minimum //= 1;
    $timeout //= 10;
    my $deadline = time + $timeout;
    while (time < $deadline) {
        my $state = eval { crawler_state($origin) };
        if ($state && (($state->{count} // 0) >= $minimum)) {
            return $state;
        }
        sleep 0.1;
    }
    die "timed out waiting for crawler requests at $origin";
}

# --- start the mock crawler ------------------------------------------------

my $crawler_port  = free_port();
my $crawler_ready = File::Spec->catfile($tmp, 'crawler.ready.json');
my $crawler_log   = File::Spec->catfile($tmp, 'crawler.log');
spawn_crawler_mock($crawler_ready, $crawler_log, $crawler_port);
my $crawler = wait_for_ready($crawler_ready);

# --- boot the PDS with that crawler configured -----------------------------

my $app = ATProto::PDS->new(
    project_root => $root,
    settings     => {
        base_url                => 'http://127.0.0.1:7755',
        service_handle_domain   => 'test',
        service_did_method      => 'did:web',
        jwt_secret              => 'crawl-secret',
        admin_password          => 'admin-secret',
        crawlers                => [$crawler->{origin}],
        # Interval far longer than the test run, so the second write below
        # falls inside it.
        crawler_notify_interval => 3600,
        db_path                 => File::Spec->catfile($tmp, 'crawlers.sqlite'),
        data_dir                => File::Spec->catdir($tmp, 'data'),
    },
);

my $t = Test::Mojo->new($app);

# --- first write: account creation should notify the crawler ---------------

$t->post_ok('/xrpc/com.atproto.server.createAccount' => json => {
    handle   => 'alice.test',
    email    => 'alice@test.com',
    password => 'hunter22',
})->status_is(200);

my $created = $t->tx->res->json;
my $access  = $created->{accessJwt};
my $did     = $created->{did};

my $state = wait_for_requests($crawler->{origin});
is($state->{requests}[0]{body}{hostname}, '127.0.0.1', 'crawl requests use the public hostname without the port');

# --- second write inside the interval: no additional notification ----------

$t->post_ok('/xrpc/com.atproto.repo.createRecord' => {
    Authorization => "Bearer $access",
} => json => {
    repo       => $did,
    collection => 'app.bsky.feed.post',
    rkey       => 'crawler-test',
    record     => {
        '$type'   => 'app.bsky.feed.post',
        text      => 'crawler notification test',
        createdAt => '2026-03-10T00:00:00Z',
    },
})->status_is(200);

# Give a (wrongly sent) second notification time to arrive before counting.
sleep 0.5;
$state = crawler_state($crawler->{origin});
is($state->{count}, 1, 'crawler notifications are throttled inside the configured interval');

# --- host status for the crawler --------------------------------------------

# Build "host[:port]" from the crawler origin, omitting the port when it is 80.
my $crawler_url  = Mojo::URL->new($crawler->{origin});
my $crawler_host = lc($crawler_url->host // '127.0.0.1');
$crawler_host .= ':' . $crawler_url->port if defined($crawler_url->port) && $crawler_url->port != 80;

$t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.getHostStatus')->query(
    hostname => $crawler_host,
))->status_is(200)
  ->json_is('/hostname', $crawler_host)
  ->json_is('/status', 'active');

done_testing;