perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Notify configured crawlers about local repo activity

alice d4a14a08 146ffe6b

+191 -9
+23 -1
lib/ATProto/PDS.pm
··· 5 5 6 6 use Mojo::Base 'Mojolicious', -signatures; 7 7 use Mojo::JSON (); 8 + use Mojo::URL; 8 9 use ATProto::PDS::API::Admin qw(register_admin_handlers); 9 10 use ATProto::PDS::API::Builtins qw(register_builtin_handlers); 10 11 use ATProto::PDS::API::Misc qw(register_misc_handlers); ··· 12 13 use ATProto::PDS::API::Registry; 13 14 use ATProto::PDS::API::Server qw(register_server_handlers); 14 15 use ATProto::PDS::API::Sync qw(register_sync_handlers); 16 + use ATProto::PDS::Crawlers; 15 17 use ATProto::PDS::Identity qw(account_did_doc service_did); 16 18 use ATProto::PDS::LexiconCatalog qw(endpoint_catalog); 17 19 use ATProto::PDS::LexiconRegistry; ··· 26 28 sub startup ($self) { 27 29 my $config = $self->settings; 28 30 my $root = $self->project_root; 31 + my $public_url = Mojo::URL->new($config->{base_url} // 'http://127.0.0.1:7755'); 32 + my $crawler_notifier = ATProto::PDS::Crawlers->new( 33 + hostname => ($config->{hostname} // lc($public_url->host // 'localhost')), 34 + crawlers => $config->{crawlers} // [], 35 + min_interval => $config->{crawler_notify_interval} // (20 * 60), 36 + ); 29 37 30 38 $self->secrets([$config->{jwt_secret} // 'perlds-dev-secret']); 31 39 $self->helper(api_registry => sub { state $registry = ATProto::PDS::API::Registry->new }); ··· 37 45 path => $c->app->settings->{db_path} || File::Spec->catfile($root, 'data', 'runtime', 'perlds.sqlite'), 38 46 )->bootstrap; 39 47 }); 48 + $self->helper(crawler_notifier => sub ($c) { 49 + state $notifier = do { 50 + $crawler_notifier->{store} = $c->store; 51 + $crawler_notifier; 52 + }; 53 + }); 54 + $self->helper(append_event => sub ($c, %args) { 55 + my $seq = $c->store->append_event(%args); 56 + $c->crawler_notifier->notify_of_update(last_seq => $seq); 57 + return $seq; 58 + }); 40 59 $self->helper(repo_manager => sub ($c) { 41 - state $manager = ATProto::PDS::Repo::Manager->new(store => $c->store); 60 + state $manager = ATProto::PDS::Repo::Manager->new( 61 + store => $c->store, 62 + 
crawler_notifier => $c->crawler_notifier, 63 + ); 42 64 }); 43 65 44 66 my $routes = $self->routes;
+2 -2
lib/ATProto/PDS/API/Admin.pm
··· 79 79 $subject->{did}, 80 80 deactivated_at => $body->{deactivated}{applied} ? time : undef, 81 81 ); 82 - $c->store->append_event( 82 + $c->append_event( 83 83 did => $subject->{did}, 84 84 type => 'account', 85 85 rev => ($c->store->get_account_by_did($subject->{did})->{repo_rev} // undef), ··· 335 335 ); 336 336 } 337 337 338 - $c->store->append_event( 338 + $c->append_event( 339 339 did => $src, 340 340 type => 'label', 341 341 payload => {
+2 -2
lib/ATProto/PDS/API/Misc.pm
··· 127 127 my $did_doc = refresh_plc_did_doc($c->app->settings, $account->{did}); 128 128 $c->store->update_account($account->{did}, did_doc => $did_doc); 129 129 $account = $c->store->update_account($account->{did}, did_doc => $did_doc); 130 - $c->store->append_event( 130 + $c->append_event( 131 131 did => $account->{did}, 132 132 type => 'identity', 133 133 rev => $account->{repo_rev}, ··· 158 158 handle => $handle, 159 159 did_doc => $did_doc, 160 160 ); 161 - $c->store->append_event( 161 + $c->append_event( 162 162 did => $updated->{did}, 163 163 type => 'identity', 164 164 rev => $updated->{repo_rev},
+4 -4
lib/ATProto/PDS/API/Server.pm
··· 106 106 used_by => $account->{did}, 107 107 ) if $invite; 108 108 $c->store->claim_reserved_signing_key($did) if $reserved && !defined $reserved->{claimed_at}; 109 - $c->store->append_event( 109 + $c->append_event( 110 110 did => $account->{did}, 111 111 type => 'account', 112 112 rev => $account->{repo_rev}, ··· 222 222 $registry->register('com.atproto.server.deactivateAccount', sub ($c, $endpoint) { 223 223 my (undef, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 224 224 $c->store->update_account($account->{did}, deactivated_at => time); 225 - $c->store->append_event( 225 + $c->append_event( 226 226 did => $account->{did}, 227 227 type => 'account', 228 228 rev => $account->{repo_rev}, ··· 237 237 $registry->register('com.atproto.server.activateAccount', sub ($c, $endpoint) { 238 238 my (undef, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 239 239 $c->store->update_account($account->{did}, deactivated_at => undef); 240 - $c->store->append_event( 240 + $c->append_event( 241 241 did => $account->{did}, 242 242 type => 'account', 243 243 rev => $account->{repo_rev}, ··· 408 408 $c->store->revoke_sessions_by_did($account->{did}); 409 409 $c->store->revoke_app_passwords_by_did($account->{did}); 410 410 $c->store->consume_action_token($token->{token}); 411 - $c->store->append_event( 411 + $c->append_event( 412 412 did => $account->{did}, 413 413 type => 'account', 414 414 rev => $account->{repo_rev},
+152
lib/ATProto/PDS/Crawlers.pm
package ATProto::PDS::Crawlers;

# Notifies the configured crawler services about local repo activity by
# POSTing com.atproto.sync.requestCrawl to each of them.  Notifications are
# rate limited (min_interval), never overlap (in_flight), and are sent from a
# Mojo::IOLoop subprocess.  When a store is attached, the outcome for every
# service is recorded through its touch_host_notice method.

use v5.34;
use warnings;
use feature 'signatures';
no warnings 'experimental::signatures';

use Mojo::IOLoop;
use Mojo::URL;
use Mojo::UserAgent;

# Constructor.
#
# Named arguments (all optional):
#   hostname      - hostname announced to the crawlers (default 'localhost')
#   crawlers      - array ref of crawler service URLs (default: none)
#   store         - object used for touch_host_notice / latest_event_seq
#   min_interval  - minimum number of seconds between notifications
#                   (default 20 minutes)
#   last_notified - epoch seconds of the previous notification (default 0)
sub new ($class, %args) {
    return bless {
        hostname      => $args{hostname}      // 'localhost',
        crawlers      => $args{crawlers}      // [],
        store         => $args{store},
        min_interval  => $args{min_interval}  // (20 * 60),
        last_notified => $args{last_notified} // 0,
        in_flight     => 0,
    }, $class;
}

# Returns the number of configured crawler services (0 means "none").
sub has_crawlers ($self) {
    return scalar @{ $self->{crawlers} || [] };
}

# Requests a crawl from every configured service unless throttled.
#
# Named arguments (all optional):
#   now      - epoch seconds to use as the current time (default: time)
#   force    - true to bypass the min_interval throttle
#   last_seq - event sequence number to record with the status rows; when
#              omitted it is looked up from the store, if one is attached
#
# Returns 1 when a notification batch was started, 0 when it was skipped
# (no crawlers configured, a batch is already in flight, or throttled).
sub notify_of_update ($self, %args) {
    return 0 unless $self->has_crawlers;

    my $now = $args{now} // time;
    return 0 if $self->{in_flight};
    return 0
        if !$args{force}
        && ($now - ($self->{last_notified} // 0) < ($self->{min_interval} // 0));

    my $hostname = $self->{hostname};
    my @services = @{ $self->{crawlers} || [] };
    my $seq      = $args{last_seq};

    # The sequence lookup is best effort: a failure leaves $seq undefined.
    $seq = eval { $self->{store}->latest_event_seq }
        if !defined($seq) && $self->{store};

    # Record the pending state BEFORE flagging the batch as in flight.  If a
    # store write dies here, the exception still propagates to the caller but
    # in_flight is not left stuck at 1, which would silently disable every
    # later notification.
    for my $service (@services) {
        $self->_touch_status($service,
            requested_at => $now,
            last_seq     => $seq,
            status       => {
                status  => 'pending',
                service => $service,
            },
        );
    }

    $self->{in_flight}     = 1;
    $self->{last_notified} = $now;

    my $subprocess = Mojo::IOLoop->subprocess;
    $subprocess->run(
        # Runs in the child process: perform the blocking HTTP requests.
        sub ($sp) {
            return _request_crawl_batch($hostname, \@services);
        },
        # Runs in the parent once the child has finished.
        sub ($sp, $err, $results) {
            $self->{in_flight} = 0;

            if ($err) {
                # The whole batch failed; mark every service as errored.
                for my $service (@services) {
                    $self->_touch_status($service,
                        last_seq => $seq,
                        status   => {
                            status    => 'error',
                            service   => $service,
                            lastError => "$err",
                        },
                    );
                }
                return;
            }

            for my $result (@{ $results || [] }) {
                $self->_touch_status($result->{service},
                    notified_at => time,
                    last_seq    => $seq,
                    status      => {
                        status       => $result->{ok} ? 'active' : 'error',
                        service      => $result->{service},
                        responseCode => $result->{code},
                        ($result->{error} ? (lastError => $result->{error}) : ()),
                    },
                );
            }
            return;
        },
    );

    return 1;
}

# Forwards a status update for $service to the store, keyed by the service's
# normalized host.  Does nothing when no store is attached.
sub _touch_status ($self, $service, %args) {
    return unless $self->{store};
    my $host = _service_host($service);
    $self->{store}->touch_host_notice(
        hostname => $host,
        %args,
    );
    return;
}

# POSTs com.atproto.sync.requestCrawl to every service in @$services and
# returns an array ref with one hash per service:
#   { service => $url, ok => 0|1, code => $http_status, error => $message }
# 'code' is only present when a response was received and 'error' only on
# failure.  Each service is handled in isolation, so one unreachable or
# misbehaving crawler cannot abort the requests to the others.
sub _request_crawl_batch ($hostname, $services) {
    my $ua = Mojo::UserAgent->new(max_redirects => 0);
    $ua->request_timeout(10);
    $ua->inactivity_timeout(10);

    my @results;
    for my $service (@$services) {
        my $result = {
            service => $service,
            ok      => 0,
        };

        my $completed = eval {
            my $tx = $ua->post(
                Mojo::URL->new($service)->path('/xrpc/com.atproto.sync.requestCrawl')->to_string => {
                    'Content-Type' => 'application/json',
                } => json => {
                    hostname => $hostname,
                }
            );

            # result() dies on connection errors, so it has to stay inside
            # this eval to keep the failure local to the current service.
            my $res = $tx->result;
            $result->{code} = $res->code;

            if ($res->is_success) {
                $result->{ok} = 1;
            }
            else {
                # Error bodies are not guaranteed to be JSON objects; only
                # read "message" when the body decoded to a hash.
                my $body    = $res->json;
                my $message = ref $body eq 'HASH' ? $body->{message} : undef;
                $result->{error} = $message // $res->message // 'request failed';
            }
            1;
        };

        if (!$completed) {
            my $error = $@;
            $result->{ok}    = 0;
            $result->{error} = (defined $error && length "$error") ? "$error" : 'request failed';
        }

        push @results, $result;
    }

    return \@results;
}

# Normalizes a crawler service URL to "host" or "host:port".  The host is
# lowercased and the port is kept only when it differs from the scheme's
# default (443 for https, 80 otherwise).  A value without a parsable host is
# returned lowercased as-is.
sub _service_host ($service) {
    my $url     = Mojo::URL->new($service);
    my $host    = lc($url->host // $service);
    my $scheme  = lc($url->scheme // 'http');
    my $port    = $url->port;
    my $default = $scheme eq 'https' ? 443 : 80;
    $host .= ':' . $port if defined $port && $port != $default;
    return $host;
}

1;
+8
lib/ATProto/PDS/Repo/Manager.pm
··· 26 26 return $self->{store}; 27 27 } 28 28 29 + sub crawler_notifier ($self) { 30 + return $self->{crawler_notifier}; 31 + } 32 + 29 33 sub generate_signing_key ($self) { 30 34 return generate_keypair(); 31 35 } ··· 185 189 car_bytes => $car_bytes, 186 190 ); 187 191 }); 192 + $self->crawler_notifier->notify_of_update() 193 + if $self->crawler_notifier; 188 194 189 195 return { 190 196 cid => $commit_cid->to_string, ··· 310 316 car_bytes => $next_car_bytes, 311 317 ); 312 318 }); 319 + $self->crawler_notifier->notify_of_update() 320 + if $self->crawler_notifier; 313 321 314 322 return { 315 323 cid => $commit_cid->to_string,