perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement repo and sync flows on SQLite

alice 4b7cebe6 c7aaab86

+1593 -1349
+10 -2
lib/ATProto/PDS.pm
··· 6 6 use Mojo::Base 'Mojolicious', -signatures; 7 7 use Mojo::JSON (); 8 8 use ATProto::PDS::API::Builtins qw(register_builtin_handlers); 9 + use ATProto::PDS::API::Repo qw(register_repo_handlers); 9 10 use ATProto::PDS::API::Registry; 10 11 use ATProto::PDS::API::Server qw(register_server_handlers); 11 - use ATProto::PDS::Identity qw(service_did); 12 + use ATProto::PDS::API::Sync qw(register_sync_handlers); 13 + use ATProto::PDS::Identity qw(account_did_doc service_did); 12 14 use ATProto::PDS::LexiconCatalog qw(endpoint_catalog); 13 15 use ATProto::PDS::LexiconRegistry; 16 + use ATProto::PDS::Repo::Manager; 14 17 use ATProto::PDS::Store::SQLite; 15 18 use ATProto::PDS::XRPC::Dispatcher; 16 19 use File::Spec; ··· 32 35 path => $c->app->settings->{db_path} || File::Spec->catfile($root, 'data', 'runtime', 'perlds.sqlite'), 33 36 )->bootstrap; 34 37 }); 38 + $self->helper(repo_manager => sub ($c) { 39 + state $manager = ATProto::PDS::Repo::Manager->new(store => $c->store); 40 + }); 35 41 36 42 my $routes = $self->routes; 37 43 $routes->get('/')->to(cb => sub ($c) { ··· 66 72 $routes->get('/users/:account_id/did.json')->to(cb => sub ($c) { 67 73 my $match = $c->store->get_account_by_id($c->stash('account_id')); 68 74 return $c->render(status => 404, json => { error => 'DidNotFound' }) unless $match; 69 - $c->render(json => $match->{did_doc}); 75 + $c->render(json => account_did_doc($c->app->settings, $match)); 70 76 }); 71 77 72 78 register_builtin_handlers($self->api_registry, $self); 73 79 register_server_handlers($self->api_registry, $self); 80 + register_repo_handlers($self->api_registry, $self); 81 + register_sync_handlers($self->api_registry, $self); 74 82 ATProto::PDS::XRPC::Dispatcher->new( 75 83 app => $self, 76 84 routes => $routes,
+12 -6
lib/ATProto/PDS/API/Builtins.pm
··· 8 8 use Exporter 'import'; 9 9 use Mojo::JSON qw(false true); 10 10 11 - use ATProto::PDS::Identity qw(account_did_doc service_did service_did_doc); 11 + use ATProto::PDS::Identity qw(account_did_doc normalize_handle service_did service_did_doc); 12 12 13 13 our @EXPORT_OK = qw(register_builtin_handlers); 14 14 ··· 31 31 }; 32 32 } 33 33 34 - my $account = $c->store->get_account_by_did($did); 34 + my $account = $c->store->get_account_by_did(_canonical_did($did)); 35 35 die { 36 36 status => 404, 37 37 error => 'DidNotFound', ··· 65 65 my $identifier = lc($c->param('identifier') // ''); 66 66 my $service_did = lc(service_did($c->app->settings)); 67 67 my $service_handle = lc($c->config_value('service_handle_domain', 'localhost')); 68 - if (my $account = $identifier =~ /^did:/ ? $c->store->get_account_by_did($identifier) : $c->store->get_account_by_handle($identifier)) { 68 + if (my $account = $identifier =~ /^did:/ ? $c->store->get_account_by_did(_canonical_did($identifier)) : $c->store->get_account_by_handle($identifier)) { 69 69 return { 70 70 did => $account->{did}, 71 71 handle => $account->{handle}, ··· 88 88 89 89 $registry->register('com.atproto.temp.checkHandleAvailability', sub ($c, $endpoint) { 90 90 my $payload = $c->req->json || {}; 91 - my $handle = lc($payload->{handle} // ''); 91 + my $handle = normalize_handle($payload->{handle} // '', $c->config_value('service_handle_domain', 'localhost')); 92 92 my $service_handle = lc($c->config_value('service_handle_domain', 'localhost')); 93 93 return { 94 - handle => $handle, 95 - available => ($handle ne '' && $handle ne $service_handle && !$c->store->get_account_by_handle($handle) ? true : false), 94 + handle => $handle // ($payload->{handle} // ''), 95 + available => (defined $handle && $handle ne '' && $handle ne $service_handle && !$c->store->get_account_by_handle($handle) ? true : false), 96 96 }; 97 97 }); 98 98 } ··· 104 104 sub _relaxed_did ($did) { 105 105 $did //= ''; 106 106 $did =~ s/%3a/:/ig; 107 + return $did; 108 + } 109 + 110 + sub _canonical_did ($did) { 111 + $did = _relaxed_did($did); 112 + $did =~ s/^(did:web:[^:]+):(\d+)$/$1%3A$2/i; 107 113 return $did; 108 114 } 109 115
+171
lib/ATProto/PDS/API/Repo.pm
··· 1 + package ATProto::PDS::API::Repo; 2 + 3 + use v5.34; 4 + use warnings; 5 + use feature 'signatures'; 6 + no warnings 'experimental::signatures'; 7 + 8 + use Exporter 'import'; 9 + use JSON::PP (); 10 + 11 + use ATProto::PDS::API::Server qw(require_auth); 12 + 13 + our @EXPORT_OK = qw(register_repo_handlers); 14 + 15 + sub register_repo_handlers ($registry, $app) { 16 + $registry->register('com.atproto.repo.describeRepo', sub ($c, $endpoint) { 17 + my $account = _resolve_repo($c, $c->param('repo')); 18 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 19 + 20 + return { 21 + handle => $account->{handle}, 22 + did => $account->{did}, 23 + didDoc => $account->{did_doc}, 24 + collections => $c->store->list_collections_for_did($account->{did}), 25 + handleIsCorrect => JSON::PP::true, 26 + }; 27 + }); 28 + 29 + $registry->register('com.atproto.repo.createRecord', sub ($c, $endpoint) { 30 + my $body = $c->req->json || {}; 31 + my $account = _require_repo_owner($c, $body->{repo}); 32 + my $commit = $c->repo_manager->apply_writes($account, [{ 33 + action => 'create', 34 + collection => $body->{collection}, 35 + rkey => $body->{rkey}, 36 + value => $body->{record}, 37 + }], swap_commit => $body->{swapCommit}); 38 + my $result = $commit->{results}[0]; 39 + return { 40 + %$result, 41 + commit => { 42 + cid => $commit->{cid}, 43 + rev => $commit->{rev}, 44 + }, 45 + }; 46 + }); 47 + 48 + $registry->register('com.atproto.repo.putRecord', sub ($c, $endpoint) { 49 + my $body = $c->req->json || {}; 50 + my $account = _require_repo_owner($c, $body->{repo}); 51 + my $commit = $c->repo_manager->apply_writes($account, [{ 52 + action => 'update', 53 + collection => $body->{collection}, 54 + rkey => $body->{rkey}, 55 + value => $body->{record}, 56 + }], swap_commit => $body->{swapCommit}); 57 + my $result = $commit->{results}[0]; 58 + return { 59 + %$result, 60 + commit => { 61 + cid => $commit->{cid}, 62 + rev => $commit->{rev}, 63 + }, 64 + }; 65 + }); 66 + 67 + $registry->register('com.atproto.repo.deleteRecord', sub ($c, $endpoint) { 68 + my $body = $c->req->json || {}; 69 + my $account = _require_repo_owner($c, $body->{repo}); 70 + my $commit = $c->repo_manager->apply_writes($account, [{ 71 + action => 'delete', 72 + collection => $body->{collection}, 73 + rkey => $body->{rkey}, 74 + }], swap_commit => $body->{swapCommit}); 75 + return { 76 + commit => { 77 + cid => $commit->{cid}, 78 + rev => $commit->{rev}, 79 + }, 80 + }; 81 + }); 82 + 83 + $registry->register('com.atproto.repo.applyWrites', sub ($c, $endpoint) { 84 + my $body = $c->req->json || {}; 85 + my $account = _require_repo_owner($c, $body->{repo}); 86 + my $commit = $c->repo_manager->apply_writes( 87 + $account, 88 + $body->{writes} || [], 89 + swap_commit => $body->{swapCommit}, 90 + ); 91 + return { 92 + commit => { 93 + cid => $commit->{cid}, 94 + rev => $commit->{rev}, 95 + }, 96 + results => $commit->{results}, 97 + }; 98 + }); 99 + 100 + $registry->register('com.atproto.repo.getRecord', sub ($c, $endpoint) { 101 + my $account = _resolve_repo($c, $c->param('repo')); 102 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 103 + my $row = $c->store->get_record($account->{did}, $c->param('collection'), $c->param('rkey')); 104 + _xrpc_error(404, 'RecordNotFound', 'Record was not found') unless $row; 105 + return { 106 + uri => "at://$account->{did}/$row->{collection}/$row->{rkey}", 107 + cid => $row->{cid}, 108 + value => $row->{value}, 109 + }; 110 + }); 111 + 112 + $registry->register('com.atproto.repo.listRecords', sub ($c, $endpoint) { 113 + my $account = _resolve_repo($c, $c->param('repo')); 114 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 115 + my $page = $c->store->list_records( 116 + $account->{did}, 117 + $c->param('collection'), 118 + limit => $c->param('limit') // 50, 119 + cursor => $c->param('cursor'), 120 + reverse => $c->param('reverse') ? 1 : 0, 121 + ); 122 + return { 123 + (defined $page->{cursor} ? (cursor => $page->{cursor}) : ()), 124 + records => [ 125 + map { 126 + +{ 127 + uri => "at://$account->{did}/$_->{collection}/$_->{rkey}", 128 + cid => $_->{cid}, 129 + value => $_->{value}, 130 + } 131 + } @{ $page->{items} } 132 + ], 133 + }; 134 + }); 135 + } 136 + 137 + sub _resolve_repo ($c, $repo) { 138 + return undef unless defined $repo && length $repo; 139 + return $c->store->get_account_by_handle($repo) unless $repo =~ /\Adid:/; 140 + 141 + my $account = $c->store->get_account_by_did($repo); 142 + return $account if $account; 143 + 144 + my $target = lc $repo; 145 + $target =~ s/%3a/:/ig; 146 + for my $row (@{ $c->store->list_accounts }) { 147 + my $candidate = lc($row->{did} // q()); 148 + $candidate =~ s/%3a/:/ig; 149 + return $row if $candidate eq $target; 150 + } 151 + 152 + return undef; 153 + } 154 + 155 + sub _require_repo_owner ($c, $repo) { 156 + my $account = _resolve_repo($c, $repo); 157 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 158 + my ($claims) = require_auth($c, audience => 'access', allow_refresh => 1); 159 + _xrpc_error(401, 'AuthRequired', 'Token is not authorized for that repo') unless ($claims->{sub} // '') eq $account->{did}; 160 + return $account; 161 + } 162 + 163 + sub _xrpc_error ($status, $error, $message) { 164 + die { 165 + status => $status, 166 + error => $error, 167 + message => $message, 168 + }; 169 + } 170 + 171 + 1;
+78 -73
lib/ATProto/PDS/API/Server.pm
··· 9 9 use Mojo::JSON qw(false true); 10 10 11 11 use ATProto::PDS::Auth::JWT qw(decode_jwt encode_jwt); 12 - use ATProto::PDS::Auth::Password qw(hash_password verify_password); 12 + use ATProto::PDS::Auth::Password qw(hash_password random_hex verify_password); 13 13 use ATProto::PDS::Identity qw(account_did account_did_doc normalize_handle service_did); 14 14 15 - our @EXPORT_OK = qw(register_server_handlers); 15 + our @EXPORT_OK = qw(register_server_handlers require_auth session_view issue_session); 16 16 17 17 sub register_server_handlers ($registry, $app) { 18 18 $registry->register('com.atproto.server.createAccount', sub ($c, $endpoint) { ··· 20 20 my $domain = $c->config_value('service_handle_domain', 'localhost'); 21 21 my $handle = normalize_handle($body->{handle}, $domain); 22 22 _xrpc_error(400, 'InvalidHandle', 'Requested handle is invalid') unless defined $handle; 23 - _xrpc_error(400, 'UnsupportedDomain', 'Handle is outside the configured domain') 24 - unless $handle =~ /\.\Q$domain\E\z/ || $handle eq $domain; 23 + _xrpc_error(400, 'HandleNotAvailable', 'That handle is already registered') 24 + if $c->store->get_account_by_handle($handle); 25 25 26 26 my $password = $body->{password} // ''; 27 27 _xrpc_error(400, 'InvalidPassword', 'Passwords must be at least 8 characters long') 28 28 if length($password) < 8; 29 - _xrpc_error(400, 'HandleNotAvailable', 'That handle is already registered') 30 - if $c->store->get_account_by_handle($handle); 31 29 32 - my $account_id = _new_id(); 30 + my $account_id = random_hex(8); 33 31 my $did = $body->{did} || account_did($c->app->settings, $account_id); 34 - my $did_doc = account_did_doc($c->app->settings, { 35 - id => $account_id, 36 - did => $did, 37 - handle => $handle, 32 + my $keys = $c->repo_manager->generate_signing_key; 33 + my $password_record = hash_password($password); 34 + my $did_doc = account_did_doc($c->app->settings, { 35 + account_id => $account_id, 36 + did => $did, 37 + handle => $handle, 38 + public_key_multibase => $keys->{public_key_multibase}, 38 39 }); 39 40 40 41 my $account = $c->store->create_account( 41 - id => $account_id, 42 - did => $did, 43 - handle => $handle, 44 - email => $body->{email}, 45 - password_hash => hash_password($password), 46 - did_doc => $did_doc, 47 - recovery_key => $body->{recoveryKey}, 42 + account_id => $account_id, 43 + did => $did, 44 + handle => $handle, 45 + email => $body->{email}, 46 + password_hash => $password_record->{hash}, 47 + password_salt => $password_record->{salt}, 48 + did_doc => $did_doc, 49 + private_key => $keys->{private_key}, 50 + public_key => $keys->{public_key}, 51 + public_key_multibase => $keys->{public_key_multibase}, 52 + ); 53 + 54 + my $repo = $c->repo_manager->initialize_repo($account); 55 + $account = $c->store->update_account($account->{did}, 56 + repo_commit_cid => $repo->{cid}, 57 + repo_root_cid => $repo->{root_cid}, 58 + repo_rev => $repo->{rev}, 59 + did_doc => account_did_doc($c->app->settings, $account), 48 60 ); 49 61 50 - return _issue_session($c, $account); 62 + return issue_session($c, $account); 51 63 }); 52 64 53 65 $registry->register('com.atproto.server.createSession', sub ($c, $endpoint) { ··· 56 68 _xrpc_error(401, 'AuthRequired', 'Invalid identifier or password') unless $account; 57 69 58 70 my $password = $body->{password} // ''; 59 - my $valid = verify_password($password, $account->{password_hash} // ''); 71 + my $valid = verify_password($password, $account->{password_salt}, $account->{password_hash}); 60 72 unless ($valid) { 61 73 for my $app_password (@{ $c->store->list_app_passwords_by_did($account->{did}) }) { 62 74 next if defined $app_password->{revoked_at}; 63 - if (verify_password($password, $app_password->{password_hash} // '')) { 75 + my ($salt_hex, $hash) = split /:/, ($app_password->{password_hash} // q()), 2; 76 + next unless defined $salt_hex && defined $hash; 77 + my $salt = pack('H*', $salt_hex); 78 + if (verify_password($password, $salt, $hash)) { 64 79 $valid = 1; 65 80 last; 66 81 } ··· 68 83 } 69 84 70 85 _xrpc_error(401, 'AuthRequired', 'Invalid identifier or password') unless $valid; 71 - return _issue_session($c, $account); 86 + return issue_session($c, $account); 72 87 }); 73 88 74 89 $registry->register('com.atproto.server.getSession', sub ($c, $endpoint) { 75 - my ($claims, $account) = _require_auth($c, audience => 'access', allow_refresh => 1); 76 - return _session_view($account); 90 + my ($claims, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 91 + return session_view($account); 77 92 }); 78 93 79 94 $registry->register('com.atproto.server.refreshSession', sub ($c, $endpoint) { 80 - my ($claims, $account) = _require_auth($c, audience => 'refresh'); 95 + my ($claims, $account) = require_auth($c, audience => 'refresh'); 81 96 my $session = $c->store->get_session($claims->{jti}); 82 97 _xrpc_error(401, 'InvalidToken', 'Refresh session was not found') unless $session; 83 98 _xrpc_error(401, 'ExpiredToken', 'Refresh session has already been revoked') if defined $session->{revoked_at}; 84 99 $c->store->revoke_session($session->{id}); 85 - return _issue_session($c, $account); 100 + return issue_session($c, $account); 86 101 }); 87 102 88 103 $registry->register('com.atproto.server.deleteSession', sub ($c, $endpoint) { 89 - my ($claims, $account) = _require_auth($c, audience => 'refresh'); 104 + my ($claims, $account) = require_auth($c, audience => 'refresh', allow_refresh => 1); 90 105 my $session = $c->store->get_session($claims->{jti}); 91 - _xrpc_error(401, 'InvalidToken', 'Refresh session was not found') unless $session; 92 - $c->store->revoke_session($session->{id}); 93 - $c->res->code(200); 106 + $c->store->revoke_session($session->{id}) if $session; 94 107 return {}; 95 108 }); 96 109 110 + $registry->register('com.atproto.server.checkAccountStatus', sub ($c, $endpoint) { 111 + my ($claims, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 112 + return { 113 + active => !defined($account->{deactivated_at}) ? true : false, 114 + status => defined($account->{deactivated_at}) ? 'deactivated' : undef, 115 + }; 116 + }); 117 + 97 118 $registry->register('com.atproto.server.createAppPassword', sub ($c, $endpoint) { 98 - my ($claims, $account) = _require_auth($c, audience => 'access', allow_refresh => 1); 119 + my ($claims, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 99 120 my $body = $c->req->json || {}; 100 121 my $name = $body->{name} // ''; 101 122 _xrpc_error(400, 'InvalidRequest', 'App password name is required') unless length $name; 102 123 103 124 my $password = _new_app_password(); 125 + my $password_record = hash_password($password); 104 126 my $row = $c->store->create_app_password( 105 127 did => $account->{did}, 106 128 name => $name, 107 - password_hash => hash_password($password), 129 + password_hash => unpack('H*', $password_record->{salt}) . ':' . $password_record->{hash}, 108 130 ); 109 131 110 132 return { ··· 116 138 }); 117 139 118 140 $registry->register('com.atproto.server.listAppPasswords', sub ($c, $endpoint) { 119 - my ($claims, $account) = _require_auth($c, audience => 'access', allow_refresh => 1); 141 + my ($claims, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 120 142 my $rows = $c->store->list_app_passwords_by_did($account->{did}); 121 143 return { 122 144 passwords => [ ··· 126 148 createdAt => _iso8601($_->{created_at}), 127 149 privileged => false, 128 150 } 129 - } 130 - grep { !defined $_->{revoked_at} } @$rows 151 + } grep { !defined $_->{revoked_at} } @$rows 131 152 ], 132 153 }; 133 154 }); 134 155 135 156 $registry->register('com.atproto.server.revokeAppPassword', sub ($c, $endpoint) { 136 - my ($claims, $account) = _require_auth($c, audience => 'access', allow_refresh => 1); 157 + my ($claims, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 137 158 my $body = $c->req->json || {}; 138 159 my $name = $body->{name} // ''; 139 160 _xrpc_error(400, 'InvalidRequest', 'App password name is required') unless length $name; 140 - 141 161 my $row = $c->store->get_app_password_by_name($account->{did}, $name); 142 162 _xrpc_error(404, 'AppPasswordNotFound', 'No app password exists with that name') unless $row; 143 163 $c->store->revoke_app_password($row->{id}); 144 - $c->res->code(200); 145 164 return {}; 146 165 }); 147 166 } 148 167 149 - sub _issue_session ($c, $account) { 168 + sub issue_session ($c, $account) { 150 169 my $session = $c->store->create_session( 151 170 did => $account->{did}, 152 171 expires_at => time + (30 * 24 * 60 * 60), ··· 175 194 }, $secret); 176 195 177 196 return { 178 - accessJwt => $access, 197 + accessJwt => $access, 179 198 refreshJwt => $refresh, 180 - %{ _session_view($account) }, 199 + %{ session_view($account) }, 181 200 }; 182 201 } 183 202 184 - sub _session_view ($account) { 203 + sub session_view ($account) { 185 204 return { 186 - handle => $account->{handle}, 187 - did => $account->{did}, 188 - didDoc => $account->{did_doc}, 189 - email => $account->{email}, 190 - emailConfirmed => $account->{email} ? true : false, 205 + handle => $account->{handle}, 206 + did => $account->{did}, 207 + didDoc => $account->{did_doc} || account_did_doc({}, $account), 208 + email => $account->{email}, 209 + emailConfirmed => $account->{email} ? true : false, 191 210 emailAuthFactor => false, 192 - active => !defined($account->{deactivated_at}) ? true : false, 211 + active => !defined($account->{deactivated_at}) ? true : false, 193 212 (defined($account->{deactivated_at}) ? (status => 'deactivated') : ()), 194 213 }; 195 214 } 196 215 197 - sub _find_account ($c, $identifier) { 198 - return undef unless defined $identifier && length $identifier; 199 - return $c->store->get_account_by_did($identifier) if $identifier =~ /\Adid:/; 200 - my $account = $c->store->get_account_by_handle($identifier); 201 - return $account if $account; 202 - return $c->store->get_account_by_email($identifier); 203 - } 204 - 205 - sub _require_auth ($c, %opts) { 216 + sub require_auth ($c, %opts) { 206 217 my $auth = $c->req->headers->authorization // ''; 207 218 _xrpc_error(401, 'AuthRequired', 'Authorization header is required') unless $auth =~ /\ABearer\s+(.+)\z/i; 208 219 my $token = $1; 209 220 210 - my $decoded = eval { 211 - decode_jwt($token, $c->config_value('jwt_secret', 'perlds-dev-secret')) 212 - }; 221 + my $decoded = eval { decode_jwt($token, $c->config_value('jwt_secret', 'perlds-dev-secret')) }; 213 222 if (my $err = $@) { 214 223 my $message = "$err"; 215 - my $code = $message =~ /expired/ ? 'ExpiredToken' : 'InvalidToken'; 224 + my $code = $message =~ /expired/i ? 'ExpiredToken' : 'InvalidToken'; 216 225 _xrpc_error(401, $code, $message); 217 226 } 218 227 ··· 227 236 return ($claims, $account); 228 237 } 229 238 239 + sub _find_account ($c, $identifier) { 240 + return undef unless defined $identifier && length $identifier; 241 + my $account = $c->store->get_account_by_identifier($identifier); 242 + return $account if $account; 243 + return $c->store->get_account_by_email($identifier); 244 + } 245 + 230 246 sub _xrpc_error ($status, $error, $message) { 231 247 die { 232 248 status => $status, ··· 235 251 }; 236 252 } 237 253 238 - sub _new_id { 239 - open(my $fh, '<:raw', '/dev/urandom') or die "open(/dev/urandom): $!"; 240 - my $bytes = q(); 241 - my $read = read($fh, $bytes, 10); 242 - CORE::close($fh); 243 - die 'failed to read random bytes' unless defined $read && $read == 10; 244 - return unpack('H*', $bytes); 245 - } 246 - 247 254 sub _new_app_password { 248 - my @parts; 249 - push @parts, substr(_new_id(), 0, 4) for 1 .. 4; 250 - return join('-', @parts); 255 + return join('-', map { substr(random_hex(4), 0, 4) } 1 .. 4); 251 256 } 252 257 253 258 sub _iso8601 ($epoch) {
+105
lib/ATProto/PDS/API/Sync.pm
··· 1 + package ATProto::PDS::API::Sync; 2 + 3 + use v5.34; 4 + use warnings; 5 + use feature 'signatures'; 6 + no warnings 'experimental::signatures'; 7 + 8 + use Exporter 'import'; 9 + use JSON::PP (); 10 + 11 + our @EXPORT_OK = qw(register_sync_handlers); 12 + 13 + sub register_sync_handlers ($registry, $app) { 14 + $registry->register('com.atproto.sync.getLatestCommit', sub ($c, $endpoint) { 15 + my $account = _account_for_did($c, $c->param('did') // q()); 16 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 17 + my $head = $c->store->get_repo_head($account->{did}); 18 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $head; 19 + return { 20 + cid => $head->{commit_cid}, 21 + rev => $head->{rev}, 22 + }; 23 + }); 24 + 25 + $registry->register('com.atproto.sync.getRepoStatus', sub ($c, $endpoint) { 26 + my $account = _account_for_did($c, $c->param('did') // q()); 27 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 28 + return { 29 + did => $account->{did}, 30 + active => defined($account->{deactivated_at}) ? JSON::PP::false : JSON::PP::true, 31 + (defined($account->{repo_rev}) ? (rev => $account->{repo_rev}) : ()), 32 + (defined($account->{deactivated_at}) ? (status => 'deactivated') : ()), 33 + }; 34 + }); 35 + 36 + $registry->register('com.atproto.sync.getRepo', sub ($c, $endpoint) { 37 + my $account = _account_for_did($c, $c->param('did') // q()); 38 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 39 + my $car = $c->store->repo_car($account->{did}); 40 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless defined $car; 41 + $c->res->headers->content_type('application/vnd.ipld.car'); 42 + $c->render(data => $car); 43 + return; 44 + }); 45 + 46 + $registry->register('com.atproto.sync.getRecord', sub ($c, $endpoint) { 47 + my $account = _account_for_did($c, $c->param('did') // q()); 48 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 49 + my $record = $c->store->get_record($account->{did}, $c->param('collection'), $c->param('rkey')); 50 + _xrpc_error(404, 'RecordNotFound', 'Record was not found') unless $record; 51 + my $car = $c->store->repo_car($account->{did}); 52 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless defined $car; 53 + $c->res->headers->content_type('application/vnd.ipld.car'); 54 + $c->render(data => $car); 55 + return; 56 + }); 57 + 58 + $registry->register('com.atproto.sync.listRepos', sub ($c, $endpoint) { 59 + my $page = $c->store->list_repos( 60 + limit => $c->param('limit') // 500, 61 + cursor => $c->param('cursor'), 62 + ); 63 + return { 64 + (defined $page->{cursor} ? (cursor => $page->{cursor}) : ()), 65 + repos => $page->{items}, 66 + }; 67 + }); 68 + 69 + $registry->register('com.atproto.sync.listBlobs', sub ($c, $endpoint) { 70 + my $account = _account_for_did($c, $c->param('did') // q()); 71 + _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 72 + my $page = $c->store->list_blobs_by_did( 73 + $account->{did}, 74 + limit => $c->param('limit') // 500, 75 + cursor => $c->param('cursor'), 76 + ); 77 + return { 78 + (defined $page->{cursor} ? (cursor => $page->{cursor}) : ()), 79 + cids => [ map { $_->{cid} } @{ $page->{items} } ], 80 + }; 81 + }); 82 + } 83 + 84 + sub _account_for_did ($c, $did) { 85 + my $account = $c->store->get_account_by_did($did); 86 + return $account if $account; 87 + my $target = lc($did // q()); 88 + $target =~ s/%3a/:/ig; 89 + for my $row (@{ $c->store->list_accounts }) { 90 + my $candidate = lc($row->{did} // q()); 91 + $candidate =~ s/%3a/:/ig; 92 + return $row if $candidate eq $target; 93 + } 94 + return undef; 95 + } 96 + 97 + sub _xrpc_error ($status, $error, $message) { 98 + die { 99 + status => $status, 100 + error => $error, 101 + message => $message, 102 + }; 103 + } 104 + 105 + 1;
+33
lib/ATProto/PDS/API/Util.pm
··· 1 + package ATProto::PDS::API::Util; 2 + 3 + use v5.34; 4 + use warnings; 5 + use feature 'signatures'; 6 + no warnings 'experimental::signatures'; 7 + 8 + use Exporter 'import'; 9 + 10 + our @EXPORT_OK = qw(xrpc_error iso8601); 11 + 12 + sub xrpc_error ($status, $error, $message) { 13 + die { 14 + status => $status, 15 + error => $error, 16 + message => $message, 17 + }; 18 + } 19 + 20 + sub iso8601 ($epoch = undef) { 21 + my @gmt = gmtime($epoch // time); 22 + return sprintf( 23 + '%04d-%02d-%02dT%02d:%02d:%02dZ', 24 + $gmt[5] + 1900, 25 + $gmt[4] + 1, 26 + $gmt[3], 27 + $gmt[2], 28 + $gmt[1], 29 + $gmt[0], 30 + ); 31 + } 32 + 33 + 1;
+38 -17
lib/ATProto/PDS/Auth/Password.pm
··· 6 6 no warnings 'experimental::signatures'; 7 7 8 8 use Exporter 'import'; 9 - use Digest::SHA qw(sha256_hex); 9 + use Digest::SHA qw(sha256 sha256_hex); 10 10 11 - our @EXPORT_OK = qw(hash_password verify_password); 11 + our @EXPORT_OK = qw(hash_password random_bytes random_hex verify_password); 12 12 13 - sub hash_password ($password) { 13 + sub random_bytes ($length = 16) { 14 + open(my $fh, '<:raw', '/dev/urandom') or die "open(/dev/urandom): $!"; 15 + my $bytes = q(); 16 + my $read = read($fh, $bytes, $length); 17 + close($fh); 18 + die 'failed to read random bytes' unless defined $read && $read == $length; 19 + return $bytes; 20 + } 21 + 22 + sub random_hex ($length = 16) { 23 + return unpack('H*', random_bytes($length)); 24 + } 25 + 26 + sub hash_password ($password, $salt = undef, %opts) { 14 27 die 'password is required' unless defined $password && length $password; 15 - my $salt = _random_hex(16); 16 - return join(':', 'sha256', $salt, sha256_hex($salt . $password)); 28 + $salt //= random_bytes(16); 29 + my $rounds = $opts{rounds} // 50_000; 30 + my $digest = $salt . $password; 31 + for (1 .. $rounds) { 32 + $digest = sha256($digest . $salt . $password); 33 + } 34 + return { 35 + salt => $salt, 36 + hash => unpack('H*', $digest), 37 + rounds => $rounds, 38 + }; 17 39 } 18 40 19 - sub verify_password ($password, $stored) { 20 - return 0 unless defined $password && defined $stored; 21 - my ($scheme, $salt, $digest) = split(/:/, $stored, 3); 22 - return 0 unless defined $scheme && $scheme eq 'sha256'; 23 - return sha256_hex($salt . $password) eq $digest ? 1 : 0; 41 + sub verify_password ($password, $salt, $expected_hash, %opts) { 42 + my $actual = hash_password($password, $salt, %opts); 43 + return _timing_safe_eq($actual->{hash}, $expected_hash); 24 44 } 25 45 26 - sub _random_hex ($bytes) { 27 - open(my $fh, '<:raw', '/dev/urandom') or die "open(/dev/urandom): $!"; 28 - my $buf = q(); 29 - my $read = read($fh, $buf, $bytes); 30 - CORE::close($fh); 31 - die 'failed to read random bytes' unless defined $read && $read == $bytes; 32 - return unpack('H*', $buf); 46 + sub _timing_safe_eq ($left, $right) { 47 + return 0 unless defined $left && defined $right; 48 + return 0 unless length($left) == length($right); 49 + my $diff = 0; 50 + for my $i (0 .. length($left) - 1) { 51 + $diff |= ord(substr($left, $i, 1)) ^ ord(substr($right, $i, 1)); 52 + } 53 + return $diff == 0 ? 1 : 0; 33 54 } 34 55 35 56 1;
+10 -1
lib/ATProto/PDS/Identity.pm
··· 49 49 sub account_did_doc ($config_or_url, $account) { 50 50 my $config = _coerce_config($config_or_url); 51 51 my $base_url = $config->{base_url} // 'http://127.0.0.1:7755'; 52 - my $did = $account->{did} // account_did($config, $account->{id}); 52 + my $did = $account->{did} // account_did($config, $account->{account_id} // $account->{id}); 53 53 my $handle = $account->{handle}; 54 54 55 55 my %doc = ( ··· 62 62 }], 63 63 ); 64 64 $doc{alsoKnownAs} = ["at://$handle"] if defined $handle && length $handle; 65 + if (my $multibase = $account->{public_key_multibase}) { 66 + $doc{verificationMethod} = [{ 67 + id => "$did#atproto", 68 + type => 'Multikey', 69 + controller => $did, 70 + publicKeyMultibase => $multibase, 71 + }]; 72 + $doc{assertionMethod} = ["$did#atproto"]; 73 + } 65 74 return \%doc; 66 75 } 67 76
+171
lib/ATProto/PDS/Repo/Manager.pm
··· 1 + package ATProto::PDS::Repo::Manager; 2 + 3 + use v5.34; 4 + use warnings; 5 + use feature 'signatures'; 6 + no warnings 'experimental::signatures'; 7 + 8 + use JSON::PP qw(decode_json); 9 + use Crypt::PK::Ed25519; 10 + 11 + use ATProto::PDS::Repo::Bytes; 12 + use ATProto::PDS::Repo::CAR qw(read_car write_car); 13 + use ATProto::PDS::Repo::CID; 14 + use ATProto::PDS::Repo::DagCbor qw(encode_dag_cbor); 15 + use ATProto::PDS::Repo::MST qw(build_mst); 16 + use ATProto::PDS::Util::BaseX qw(encode_base58btc); 17 + use ATProto::PDS::Util::TID qw(next_tid); 18 + 19 + sub new ($class, %args) { 20 + die 'store is required' unless $args{store}; 21 + return bless \%args, $class; 22 + } 23 + 24 + sub store ($self) { 25 + return $self->{store}; 26 + } 27 + 28 + sub generate_signing_key ($self) { 29 + my $pk = Crypt::PK::Ed25519->new; 30 + $pk->generate_key; 31 + my $private = $pk->export_key_raw('private'); 32 + my $public = $pk->export_key_raw('public'); 33 + my $multibase = 'z' . encode_base58btc(pack('C*', 0xed, 0x01) . $public); 34 + return { 35 + private_key => $private, 36 + public_key => $public, 37 + public_key_multibase => $multibase, 38 + }; 39 + } 40 + 41 + sub initialize_repo ($self, $account) { 42 + return $self->apply_writes($account, []); 43 + } 44 + 45 + sub apply_writes ($self, $account, $writes, %opts) { 46 + my $store = $self->store; 47 + my $did = $account->{did}; 48 + my $latest = $store->get_latest_commit($did); 49 + if (defined $opts{swap_commit} && length($opts{swap_commit} // q())) { 50 + my $current = $latest ? $latest->{cid} : undef; 51 + die { 52 + status => 400, 53 + error => 'InvalidSwap', 54 + message => 'swapCommit did not match the current repo head', 55 + } unless defined $current && $current eq $opts{swap_commit}; 56 + } 57 + 58 + my $records = { 59 + map { 60 + my $path = $_->{collection} . '/' . $_->{rkey}; 61 + $path => { 62 + collection => $_->{collection}, 63 + rkey => $_->{rkey}, 64 + cid => $_->{cid}, 65 + value => $_->{value}, 66 + record_bytes => $_->{record_bytes}, 67 + } 68 + } @{ $store->all_records_for_did($did) } 69 + }; 70 + 71 + my @results; 72 + for my $write (@$writes) { 73 + my $action = $write->{action} // ''; 74 + if ($action eq 'delete') { 75 + my $path = $write->{collection} . '/' . $write->{rkey}; 76 + delete $records->{$path}; 77 + push @results, {}; 78 + next; 79 + } 80 + 81 + my $collection = $write->{collection}; 82 + my $rkey = $write->{rkey} // next_tid(); 83 + my $value = $write->{value} // $write->{record}; 84 + my $bytes = encode_dag_cbor($value); 85 + my $cid = ATProto::PDS::Repo::CID->for_dag_cbor($bytes); 86 + my $path = $collection . '/' . $rkey; 87 + $records->{$path} = { 88 + collection => $collection, 89 + rkey => $rkey, 90 + cid => $cid->to_string, 91 + value => $value, 92 + record_bytes => $bytes, 93 + }; 94 + push @results, { 95 + uri => "at://$did/$collection/$rkey", 96 + cid => $cid->to_string, 97 + validationStatus => 'unknown', 98 + }; 99 + } 100 + 101 + my %mst_input = map { 102 + $_ => ATProto::PDS::Repo::CID->from_string($records->{$_}{cid}) 103 + } sort keys %$records; 104 + my $mst = build_mst(\%mst_input); 105 + 106 + my $rev = next_tid($latest ? $latest->{rev} : undef); 107 + my $unsigned = { 108 + did => $did, 109 + version => 3, 110 + data => $mst->{root}, 111 + rev => $rev, 112 + prev => $latest ? ATProto::PDS::Repo::CID->from_string($latest->{cid}) : undef, 113 + }; 114 + my $unsigned_bytes = encode_dag_cbor($unsigned); 115 + my $pk = Crypt::PK::Ed25519->new; 116 + $pk->import_key_raw($account->{private_key}, 'private'); 117 + my $sig = $pk->sign_message($unsigned_bytes); 118 + my $commit = { %$unsigned, sig => ATProto::PDS::Repo::Bytes->new($sig) }; 119 + my $commit_bytes = encode_dag_cbor($commit); 120 + my $commit_cid = ATProto::PDS::Repo::CID->for_dag_cbor($commit_bytes); 121 + 122 + my @blocks = ( 123 + { cid => $commit_cid, bytes => $commit_bytes }, 124 + @{ $mst->{blocks} }, 125 + map { 126 + +{ 127 + cid => ATProto::PDS::Repo::CID->from_string($_->{cid}), 128 + bytes => $_->{record_bytes}, 129 + } 130 + } values %$records, 131 + ); 132 + my $car_bytes = write_car($commit_cid, \@blocks); 133 + 134 + $store->txn(sub ($dbh) { 135 + for my $block (@blocks) { 136 + $store->put_block( 137 + cid => $block->{cid}->to_string, 138 + codec => $block->{cid}->codec, 139 + bytes => $block->{bytes}, 140 + ); 141 + } 142 + $store->replace_records_for_did($did, [ values %$records ]); 143 + $store->put_commit( 144 + did => $did, 145 + rev => $rev, 146 + cid => $commit_cid->to_string, 147 + root_cid => $mst->{root}->to_string, 148 + prev_cid => $latest ? $latest->{cid} : undef, 149 + commit_bytes => $commit_bytes, 150 + car_bytes => $car_bytes, 151 + ); 152 + $store->append_event( 153 + did => $did, 154 + type => 'commit', 155 + rev => $rev, 156 + commit_cid => $commit_cid->to_string, 157 + payload => { writes => $writes, results => \@results }, 158 + car_bytes => $car_bytes, 159 + ); 160 + }); 161 + 162 + return { 163 + cid => $commit_cid->to_string, 164 + rev => $rev, 165 + root_cid => $mst->{root}->to_string, 166 + car_bytes => $car_bytes, 167 + results => \@results, 168 + }; 169 + } 170 + 171 + 1;
+688 -1209
lib/ATProto/PDS/Store/SQLite.pm
··· 6 6 no warnings 'experimental::signatures'; 7 7 8 8 use DBI; 9 - use Digest::SHA qw(sha256 sha256_hex); 9 + use Exporter 'import'; 10 + use File::Basename qw(dirname); 10 11 use File::Path qw(make_path); 11 - use File::Spec; 12 12 use JSON::PP qw(decode_json encode_json); 13 - use Mojo::JSON qw(false true); 14 - 15 - use Crypt::PK::Ed25519; 16 - use Crypt::PRNG qw(random_bytes); 17 13 18 - use ATProto::PDS::Auth::JWT qw(decode_jwt encode_jwt); 19 - use ATProto::PDS::IPLD::Base58 qw(encode_base58btc); 20 - use ATProto::PDS::Identity qw(normalize_handle service_did); 21 - use ATProto::PDS::Repo::Bytes; 22 - use ATProto::PDS::Repo::CAR qw(write_car); 23 - use ATProto::PDS::Repo::CID; 24 - use ATProto::PDS::Repo::DagCbor qw(encode_dag_cbor); 25 - use ATProto::PDS::Repo::MST qw(build_mst); 26 - use ATProto::PDS::Util::TID qw(next_tid); 14 + our @EXPORT_OK = qw(default_migrations); 27 15 28 16 sub new ($class, %args) { 29 - my $self = bless { 30 - config => $args{config} || {}, 17 + die 'path is required' unless defined $args{path} && length $args{path}; 18 + return bless { 19 + path => $args{path}, 20 + dbh => undef, 31 21 }, $class; 32 - 33 - $self->{dbh} = DBI->connect( 34 - 'dbi:SQLite:dbname=' . $self->{config}{db_path}, 35 - '', 36 - '', 37 - { 38 - RaiseError => 1, 39 - PrintError => 0, 40 - AutoCommit => 1, 41 - sqlite_unicode => 1, 42 - sqlite_use_immediate_transaction => 1, 43 - }, 44 - ); 22 + } 45 23 46 - make_path($self->{config}{data_dir}); 47 - make_path($self->_blob_root); 48 - $self->_init_schema; 49 - return $self; 24 + sub path ($self) { 25 + return $self->{path}; 50 26 } 51 27 52 28 sub dbh ($self) { 53 - return $self->{dbh}; 29 + return $self->{dbh} ||= $self->_connect; 54 30 } 55 31 56 - sub _init_schema ($self) { 57 - my $dbh = $self->dbh; 58 - 59 - $dbh->do($_) for ( 60 - q{ 61 - CREATE TABLE IF NOT EXISTS accounts ( 62 - did TEXT PRIMARY KEY, 63 - handle TEXT NOT NULL UNIQUE, 64 - email TEXT, 65 - password_salt TEXT NOT NULL, 66 - password_hash TEXT NOT NULL, 67 - recovery_key TEXT, 68 - signing_private BLOB NOT NULL, 69 - signing_public BLOB NOT NULL, 70 - plc_operation_json TEXT, 71 - email_confirmed INTEGER NOT NULL DEFAULT 0, 72 - active INTEGER NOT NULL DEFAULT 1, 73 - status TEXT, 74 - created_at INTEGER NOT NULL, 75 - updated_at INTEGER NOT NULL 76 - ) 77 - }, 78 - q{ 79 - CREATE TABLE IF NOT EXISTS sessions ( 80 - jti TEXT PRIMARY KEY, 81 - did TEXT NOT NULL, 82 - scope TEXT NOT NULL, 83 - app_password_id TEXT, 84 - created_at INTEGER NOT NULL, 85 - expires_at INTEGER NOT NULL, 86 - revoked_at INTEGER, 87 - FOREIGN KEY(did) REFERENCES accounts(did) 88 - ) 89 - }, 90 - q{ 91 - CREATE TABLE IF NOT EXISTS app_passwords ( 92 - id TEXT PRIMARY KEY, 93 - did TEXT NOT NULL, 94 - name TEXT NOT NULL, 95 - privileged INTEGER NOT NULL DEFAULT 0, 96 - password_salt TEXT NOT NULL, 97 - password_hash TEXT NOT NULL, 98 - created_at INTEGER NOT NULL, 99 - revoked_at INTEGER, 100 - FOREIGN KEY(did) REFERENCES accounts(did) 101 - ) 102 - }, 103 - q{ 104 - CREATE TABLE IF NOT EXISTS invite_codes ( 105 - code TEXT PRIMARY KEY, 106 - created_by TEXT, 107 - disabled INTEGER NOT NULL DEFAULT 0, 108 - created_at INTEGER NOT NULL 109 - ) 110 - }, 111 - q{ 112 - CREATE TABLE IF NOT EXISTS pending_codes ( 113 - id TEXT PRIMARY KEY, 114 - did TEXT NOT NULL, 115 - purpose TEXT NOT NULL, 116 - target TEXT, 117 - code TEXT NOT NULL, 118 - created_at INTEGER NOT NULL, 119 - used_at INTEGER 120 - ) 121 - }, 122 - q{ 123 - CREATE TABLE IF NOT EXISTS blobs ( 124 - cid TEXT PRIMARY KEY, 125 - did TEXT NOT NULL, 126 - mime_type TEXT NOT NULL, 127 - size INTEGER NOT NULL, 128 - sha256 TEXT NOT NULL, 129 - path TEXT NOT NULL, 130 - created_at INTEGER NOT NULL, 131 - FOREIGN KEY(did) REFERENCES accounts(did) 132 - ) 133 - }, 134 - q{ 135 - CREATE TABLE IF NOT EXISTS records ( 136 - did TEXT NOT NULL, 137 - collection TEXT NOT NULL, 138 - rkey TEXT NOT NULL, 139 - uri TEXT NOT NULL, 140 - cid TEXT NOT NULL, 141 - record_json TEXT NOT NULL, 142 - created_at INTEGER NOT NULL, 143 - updated_at INTEGER NOT NULL, 144 - PRIMARY KEY(did, collection, rkey), 145 - FOREIGN KEY(did) REFERENCES accounts(did) 146 - ) 147 - }, 148 - q{ 149 - CREATE TABLE IF NOT EXISTS blocks ( 150 - cid TEXT PRIMARY KEY, 151 - codec INTEGER NOT NULL, 152 - bytes BLOB NOT NULL, 153 - created_at INTEGER NOT NULL 154 - ) 155 - }, 156 - q{ 157 - CREATE TABLE IF NOT EXISTS repo_roots ( 158 - did TEXT PRIMARY KEY, 159 - commit_cid TEXT NOT NULL, 160 - data_cid TEXT NOT NULL, 161 - rev TEXT NOT NULL, 162 - prev_commit_cid TEXT, 163 - updated_at INTEGER NOT NULL, 164 - FOREIGN KEY(did) REFERENCES accounts(did) 165 - ) 166 - }, 167 - q{ 168 - CREATE TABLE IF NOT EXISTS repo_commits ( 169 - cid TEXT PRIMARY KEY, 170 - did TEXT NOT NULL, 171 - rev TEXT NOT NULL, 172 - prev_commit_cid TEXT, 173 - data_cid TEXT NOT NULL, 174 - created_at INTEGER NOT NULL, 175 - FOREIGN KEY(did) REFERENCES accounts(did) 176 - ) 177 - }, 178 - q{ 179 - CREATE TABLE IF NOT EXISTS commit_blocks ( 180 - commit_cid TEXT NOT NULL, 181 - cid TEXT NOT NULL, 182 - ord INTEGER NOT NULL, 183 - PRIMARY KEY(commit_cid, cid) 184 - ) 185 - }, 186 - q{ 187 - CREATE TABLE IF NOT EXISTS moderation_reports ( 188 - id TEXT PRIMARY KEY, 189 - did TEXT NOT NULL, 190 - reason_type TEXT, 191 - reason TEXT, 192 - subject_json TEXT NOT NULL, 193 - created_at INTEGER NOT NULL 194 - ) 195 - } 196 - ); 32 + sub bootstrap ($self) { 33 + $self->migrate; 34 + return $self; 197 35 } 198 36 199 - sub create_account ($self, $input) { 200 - my $config = $self->{config}; 201 - my $did = $input->{did} || $self->_new_plc_did; 202 - my $handle = normalize_handle($input->{handle}, $config->{service_handle_domain} // 'localhost') 203 - or die { 204 - status => 400, 205 - error => 'InvalidHandle', 206 - message => 'Handle is not valid for this service', 207 - }; 208 - my $password = $input->{password} // _token_string(18); 209 - die { 210 - status => 400, 211 - error => 'InvalidPassword', 212 - message => 'Password must be at least 8 characters', 213 - } if length($password) < 8; 37 + sub migrate ($self) { 38 + my $dbh = $self->dbh; 39 + $dbh->do(q{ 40 + CREATE TABLE IF NOT EXISTS schema_migrations ( 41 + version INTEGER PRIMARY KEY, 42 + applied_at INTEGER NOT NULL 43 + ) 44 + }); 214 45 215 - my $salt = _token_string(16); 216 - my $now = time; 217 - my $key = Crypt::PK::Ed25519->new->generate_key; 218 - my $priv = $key->export_key_raw('private'); 219 - my $pub = $key->export_key_raw('public'); 46 + my %applied = map { $_->{version} => 1 } @{ $dbh->selectall_arrayref( 47 + q{SELECT version FROM schema_migrations ORDER BY version}, 48 + { Slice => {} }, 49 + ) }; 220 50 221 - $self->_txn(sub { 222 - my $existing = $self->account_by_handle($handle); 223 - die { 224 - status => 400, 225 - error => 'HandleNotAvailable', 226 - message => "Handle already exists: $handle", 227 - } if $existing; 51 + for my $migration (default_migrations()) { 52 + next if $applied{ $migration->{version} }; 53 + $self->txn(sub ($txn) { 54 + $txn->do($_) for @{ $migration->{statements} }; 55 + $txn->do( 56 + q{INSERT INTO schema_migrations(version, applied_at) VALUES (?, ?)}, 57 + undef, 58 + $migration->{version}, 59 + time, 60 + ); 61 + }); 62 + } 228 63 229 - $self->dbh->do( 230 - q{ 231 - INSERT INTO accounts ( 232 - did, handle, email, password_salt, password_hash, recovery_key, 233 - signing_private, signing_public, plc_operation_json, 234 - email_confirmed, active, status, created_at, updated_at 235 - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 236 - }, 237 - undef, 238 - $did, 239 - $handle, 240 - $input->{email}, 241 - $salt, 242 - $self->_hash_password($password, $salt), 243 - $input->{recoveryKey}, 244 - $priv, 245 - $pub, 246 - encode_json($self->_plc_operation_for($did, $handle, $pub)), 247 - $input->{email} ? 0 : 1, 248 - 1, 249 - undef, 250 - $now, 251 - $now, 252 - ); 253 - 254 - $self->_rebuild_repo($did); 255 - }); 64 + return 1; 65 + } 256 66 257 - my $pair = $self->issue_session_pair($did); 258 - my $account = $self->account_by_did($did); 67 + sub close ($self) { 68 + return unless $self->{dbh}; 69 + $self->{dbh}->disconnect; 70 + $self->{dbh} = undef; 71 + } 259 72 260 - return { 261 - accessJwt => $pair->{accessJwt}, 262 - refreshJwt => $pair->{refreshJwt}, 263 - handle => $account->{handle}, 264 - did => $account->{did}, 265 - didDoc => $self->did_doc($account), 73 + sub txn ($self, $code) { 74 + my $dbh = $self->dbh; 75 + $dbh->begin_work; 76 + my $wantarray = wantarray; 77 + my @result; 78 + my $ok = eval { 79 + if (!defined $wantarray) { 80 + $code->($dbh); 81 + } elsif ($wantarray) { 82 + @result = $code->($dbh); 83 + } else { 84 + $result[0] = $code->($dbh); 85 + } 86 + 1; 266 87 }; 88 + if (!$ok) { 89 + my $err = $@ || 'transaction failed'; 90 + eval { $dbh->rollback }; 91 + die $err; 92 + } 93 + $dbh->commit; 94 + return if !defined $wantarray; 95 + return $wantarray ? @result : $result[0]; 267 96 } 268 97 269 - sub account_by_did ($self, $did) { 270 - return $self->dbh->selectrow_hashref( 271 - q{SELECT * FROM accounts WHERE did = ?}, 98 + sub create_account ($self, %args) { 99 + my $did = $args{did} // die 'did is required'; 100 + my $account_id = $args{account_id} // $args{id} // _random_id(); 101 + my $handle = $args{handle} // die 'handle is required'; 102 + my $now = $args{created_at} // time; 103 + 104 + $self->dbh->do( 105 + q{ 106 + INSERT INTO accounts ( 107 + id, account_id, did, handle, email, password_hash, password_salt, 108 + created_at, updated_at, deactivated_at, deleted_at, 109 + did_doc_json, private_key, public_key, public_key_multibase, 110 + repo_commit_cid, repo_root_cid, repo_rev 111 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 112 + }, 272 113 undef, 114 + $account_id, 115 + $account_id, 273 116 $did, 274 - ); 275 - } 276 - 277 - sub account_by_handle ($self, $handle) { 278 - return $self->dbh->selectrow_hashref( 279 - q{SELECT * FROM accounts WHERE lower(handle) = lower(?)}, 280 - undef, 281 117 $handle, 118 + $args{email}, 119 + $args{password_hash}, 120 + $args{password_salt}, 121 + $now, 122 + $now, 123 + $args{deactivated_at}, 124 + $args{deleted_at}, 125 + _maybe_json($args{did_doc}), 126 + $args{private_key}, 127 + $args{public_key}, 128 + $args{public_key_multibase}, 129 + $args{repo_commit_cid}, 130 + $args{repo_root_cid}, 131 + $args{repo_rev}, 282 132 ); 283 - } 284 133 285 - sub account_by_identifier ($self, $identifier) { 286 - return $identifier =~ /^did:/ 287 - ? $self->account_by_did($identifier) 288 - : $self->account_by_handle($identifier) 289 - || $self->dbh->selectrow_hashref(q{SELECT * FROM accounts WHERE lower(email) = lower(?)}, undef, $identifier); 134 + return $self->get_account_by_did($did); 290 135 } 291 136 292 - sub create_session ($self, $identifier, $password) { 293 - my $account = $self->account_by_identifier($identifier) 294 - or die { 295 - status => 401, 296 - error => 'AuthenticationRequired', 297 - message => 'Invalid identifier or password', 298 - }; 299 - 300 - my $ok = $self->_verify_password($password, $account->{password_salt}, $account->{password_hash}); 301 - my $app_password_id; 302 - if (!$ok) { 303 - my $app_password = $self->_find_app_password($account->{did}, $password); 304 - $ok = !!$app_password; 305 - $app_password_id = $app_password->{id} if $app_password; 137 + sub update_account ($self, $did, %changes) { 138 + my %allowed = map { $_ => 1 } qw( 139 + handle email password_hash password_salt updated_at deactivated_at deleted_at 140 + did_doc private_key public_key public_key_multibase 141 + repo_commit_cid repo_root_cid repo_rev 142 + ); 143 + my (@sets, @bind); 144 + for my $key (sort keys %changes) { 145 + next unless $allowed{$key}; 146 + my $column = $key eq 'did_doc' ? 'did_doc_json' : $key; 147 + push @sets, "$column = ?"; 148 + push @bind, $key eq 'did_doc' ? _maybe_json($changes{$key}) : $changes{$key}; 306 149 } 150 + return $self->get_account_by_did($did) unless @sets; 307 151 308 - die { 309 - status => 401, 310 - error => 'AuthenticationRequired', 311 - message => 'Invalid identifier or password', 312 - } unless $ok; 313 - 314 - my $pair = $self->issue_session_pair($account->{did}, $app_password_id); 315 - return { 316 - %$pair, 317 - handle => $account->{handle}, 318 - did => $account->{did}, 319 - didDoc => $self->did_doc($account), 320 - email => $account->{email}, 321 - emailConfirmed => $account->{email_confirmed} ? true : false, 322 - emailAuthFactor => false, 323 - active => $account->{active} ? true : false, 324 - ($account->{status} ? (status => $account->{status}) : ()), 325 - }; 326 - } 327 - 328 - sub issue_session_pair ($self, $did, $app_password_id = undef) { 329 - my $now = time; 330 - my $access_exp = $now + 3600; 331 - my $refresh_exp = $now + 60 * 60 * 24 * 14; 332 - my $aud = service_did($self->{config}); 333 - 334 - my $access_jti = _token_string(24); 335 - my $refresh_jti = _token_string(24); 336 - 337 - $self->dbh->do( 338 - q{INSERT INTO sessions (jti, did, scope, app_password_id, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?)}, 339 - undef, 340 - $access_jti, $did, 'access', $app_password_id, $now, $access_exp, 341 - ); 152 + push @sets, 'updated_at = ?'; 153 + push @bind, ($changes{updated_at} // time), $did; 342 154 $self->dbh->do( 343 - q{INSERT INTO sessions (jti, did, scope, app_password_id, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?)}, 155 + 'UPDATE accounts SET ' . join(', ', @sets) . ' WHERE did = ?', 344 156 undef, 345 - $refresh_jti, $did, 'refresh', $app_password_id, $now, $refresh_exp, 157 + @bind, 346 158 ); 347 - 348 - my $access = encode_jwt({ 349 - iss => $aud, 350 - aud => $aud, 351 - sub => $did, 352 - scope => 'access', 353 - jti => $access_jti, 354 - iat => $now, 355 - exp => $access_exp, 356 - }, $self->{config}{jwt_secret}); 357 - 358 - my $refresh = encode_jwt({ 359 - iss => $aud, 360 - aud => $aud, 361 - sub => $did, 362 - scope => 'refresh', 363 - jti => $refresh_jti, 364 - iat => $now, 365 - exp => $refresh_exp, 366 - }, $self->{config}{jwt_secret}); 367 - 368 - return { 369 - accessJwt => $access, 370 - refreshJwt => $refresh, 371 - }; 159 + return $self->get_account_by_did($did); 372 160 } 373 161 374 - sub auth_from_bearer ($self, $token, $expected_scope = undef) { 375 - my $decoded = eval { 376 - decode_jwt($token, $self->{config}{jwt_secret}, audience => service_did($self->{config})); 377 - }; 378 - die { 379 - status => 401, 380 - error => 'InvalidToken', 381 - message => 'Invalid bearer token', 382 - } if $@; 383 - 384 - my $claims = $decoded->{claims}; 385 - my $session = $self->dbh->selectrow_hashref( 386 - q{SELECT * FROM sessions WHERE jti = ? AND revoked_at IS NULL}, 162 + sub get_account_by_did ($self, $did) { 163 + return $self->_row_to_account($self->dbh->selectrow_hashref( 164 + q{SELECT * FROM accounts WHERE did = ?}, 387 165 undef, 388 - $claims->{jti}, 389 - ) or die { 390 - status => 401, 391 - error => 'InvalidToken', 392 - message => 'Session is not active', 393 - }; 394 - 395 - if ($expected_scope && $session->{scope} ne $expected_scope) { 396 - die { 397 - status => 401, 398 - error => 'InvalidToken', 399 - message => 'Token scope mismatch', 400 - }; 401 - } 402 - 403 - if ($session->{expires_at} <= time) { 404 - die { 405 - status => 401, 406 - error => 'ExpiredToken', 407 - message => 'Token has expired', 408 - }; 409 - } 410 - 411 - my $account = $self->account_by_did($session->{did}) 412 - or die { 413 - status => 401, 414 - error => 'InvalidToken', 415 - message => 'Account not found for token', 416 - }; 417 - 418 - return { 419 - claims => $claims, 420 - session => $session, 421 - account => $account, 422 - }; 166 + $did, 167 + )); 423 168 } 424 169 425 - sub refresh_session ($self, $refresh_token) { 426 - my $auth = $self->auth_from_bearer($refresh_token, 'refresh'); 427 - my $account = $auth->{account}; 428 - my $pair = $self->issue_session_pair($account->{did}, $auth->{session}{app_password_id}); 429 - 430 - $self->revoke_session_jti($auth->{session}{jti}); 431 - 432 - return { 433 - %$pair, 434 - handle => $account->{handle}, 435 - did => $account->{did}, 436 - didDoc => $self->did_doc($account), 437 - email => $account->{email}, 438 - emailConfirmed => $account->{email_confirmed} ? true : false, 439 - emailAuthFactor => false, 440 - active => $account->{active} ? true : false, 441 - ($account->{status} ? (status => $account->{status}) : ()), 442 - }; 170 + sub get_account_by_id ($self, $account_id) { 171 + return $self->_row_to_account($self->dbh->selectrow_hashref( 172 + q{SELECT * FROM accounts WHERE account_id = ? OR id = ?}, 173 + undef, 174 + $account_id, 175 + $account_id, 176 + )); 443 177 } 444 178 445 - sub revoke_session_jti ($self, $jti) { 446 - $self->dbh->do(q{UPDATE sessions SET revoked_at = ? WHERE jti = ?}, undef, time, $jti); 179 + sub get_account_by_handle ($self, $handle) { 180 + return $self->_row_to_account($self->dbh->selectrow_hashref( 181 + q{SELECT * FROM accounts WHERE handle = ?}, 182 + undef, 183 + $handle, 184 + )); 447 185 } 448 186 449 - sub get_session_view ($self, $access_token) { 450 - my $auth = $self->auth_from_bearer($access_token, 'access'); 451 - my $account = $auth->{account}; 452 - return { 453 - handle => $account->{handle}, 454 - did => $account->{did}, 455 - didDoc => $self->did_doc($account), 456 - email => $account->{email}, 457 - emailConfirmed => $account->{email_confirmed} ? true : false, 458 - emailAuthFactor => false, 459 - active => $account->{active} ? true : false, 460 - ($account->{status} ? (status => $account->{status}) : ()), 461 - }; 187 + sub get_account_by_email ($self, $email) { 188 + return $self->_row_to_account($self->dbh->selectrow_hashref( 189 + q{SELECT * FROM accounts WHERE email = ?}, 190 + undef, 191 + $email, 192 + )); 462 193 } 463 194 464 - sub check_account_status ($self, $access_token) { 465 - my $auth = $self->auth_from_bearer($access_token, 'access'); 466 - my $repo = $self->current_repo($auth->{account}{did}); 467 - return { 468 - activated => $auth->{account}{active} ? true : false, 469 - validDid => true, 470 - repo => $repo ? true : false, 471 - repoRev => $repo ? $repo->{rev} : undef, 472 - repoCommit => $repo ? $repo->{commit_cid} : undef, 473 - }; 195 + sub get_account_by_identifier ($self, $identifier) { 196 + return $self->get_account_by_did($identifier) if defined $identifier && $identifier =~ /^did:/; 197 + return $self->get_account_by_handle($identifier); 474 198 } 475 199 476 - sub did_doc ($self, $account_or_did) { 477 - my $account = ref($account_or_did) eq 'HASH' 478 - ? $account_or_did 479 - : $self->account_by_did($account_or_did); 480 - return undef unless $account; 481 - 482 - my $did = $account->{did}; 483 - my $multikey = 'z' . encode_base58btc("\xed\x01" . $account->{signing_public}); 484 - 485 - return { 486 - '@context' => [ 487 - 'https://www.w3.org/ns/did/v1', 488 - 'https://w3id.org/security/multikey/v1', 489 - ], 490 - id => $did, 491 - alsoKnownAs => [ 'at://' . $account->{handle} ], 492 - verificationMethod => [{ 493 - id => "$did#atproto", 494 - type => 'Multikey', 495 - controller => $did, 496 - publicKeyMultibase => $multikey, 497 - }], 498 - service => [{ 499 - id => '#atproto_pds', 500 - type => 'AtprotoPersonalDataServer', 501 - serviceEndpoint => $self->{config}{base_url}, 502 - }], 503 - }; 200 + sub list_accounts ($self) { 201 + return [ 202 + map { $self->_row_to_account($_) } 203 + @{ $self->dbh->selectall_arrayref( 204 + q{SELECT * FROM accounts ORDER BY created_at, did}, 205 + { Slice => {} }, 206 + ) } 207 + ]; 504 208 } 505 209 506 - sub resolve_handle ($self, $handle) { 507 - my $account = $self->account_by_handle($handle) 508 - or die { 509 - status => 404, 510 - error => 'HandleNotFound', 511 - message => "No DID found for handle $handle", 512 - }; 513 - return { did => $account->{did} }; 514 - } 210 + sub create_session ($self, %args) { 211 + my $did = $args{did} // die 'did is required'; 212 + my $id = $args{id} // _random_id(); 213 + my $now = $args{created_at} // time; 515 214 516 - sub resolve_did ($self, $did) { 517 - if ($did eq service_did($self->{config})) { 518 - return { 519 - didDoc => { 520 - '@context' => ['https://www.w3.org/ns/did/v1'], 521 - id => $did, 522 - service => [{ 523 - id => "$did#atproto_pds", 524 - type => 'AtprotoPersonalDataServer', 525 - serviceEndpoint => $self->{config}{base_url}, 526 - }], 527 - }, 528 - }; 529 - } 215 + $self->dbh->do( 216 + q{ 217 + INSERT INTO sessions ( 218 + id, did, token, kind, scope, created_at, expires_at, 219 + revoked_at, ip, user_agent 220 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 221 + }, 222 + undef, 223 + $id, 224 + $did, 225 + $args{token}, 226 + $args{kind} // 'refresh', 227 + $args{scope} // 'atproto', 228 + $now, 229 + $args{expires_at}, 230 + $args{revoked_at}, 231 + $args{ip}, 232 + $args{user_agent}, 233 + ); 530 234 531 - my $account = $self->account_by_did($did) 532 - or die { 533 - status => 404, 534 - error => 'DidNotFound', 535 - message => "No DID document found for $did", 536 - }; 537 - return { didDoc => $self->did_doc($account) }; 235 + return $self->get_session($id); 538 236 } 539 237 540 - sub resolve_identity ($self, $identifier) { 541 - my $account = $self->account_by_identifier($identifier) 542 - or die { 543 - status => 404, 544 - error => ($identifier =~ /^did:/ ? 'DidNotFound' : 'HandleNotFound'), 545 - message => "No identity found for $identifier", 546 - }; 547 - 548 - return { 549 - did => $account->{did}, 550 - handle => $account->{handle}, 551 - didDoc => $self->did_doc($account), 552 - }; 238 + sub get_session ($self, $id) { 239 + return $self->dbh->selectrow_hashref( 240 + q{SELECT * FROM sessions WHERE id = ?}, 241 + undef, 242 + $id, 243 + ); 553 244 } 554 245 555 - sub get_recommended_did_credentials ($self, $access_token) { 556 - my $auth = $self->auth_from_bearer($access_token, 'access'); 557 - my $doc = $self->did_doc($auth->{account}); 558 - return { 559 - rotationKeys => $auth->{account}{recovery_key} ? [ $auth->{account}{recovery_key} ] : undef, 560 - alsoKnownAs => $doc->{alsoKnownAs}, 561 - verificationMethods => $doc->{verificationMethod}, 562 - services => $doc->{service}, 563 - }; 246 + sub revoke_session ($self, $id, %args) { 247 + $self->dbh->do( 248 + q{UPDATE sessions SET revoked_at = ? WHERE id = ?}, 249 + undef, 250 + $args{revoked_at} // time, 251 + $id, 252 + ); 253 + return $self->get_session($id); 564 254 } 565 255 566 - sub update_handle ($self, $access_token, $handle) { 567 - my $auth = $self->auth_from_bearer($access_token, 'access'); 568 - my $did = $auth->{account}{did}; 569 - my $target = normalize_handle($handle, $self->{config}{service_handle_domain} // 'localhost') 570 - or die { 571 - status => 400, 572 - error => 'InvalidHandle', 573 - message => 'Handle is not valid for this service', 574 - }; 575 - 576 - my $existing = $self->account_by_handle($target); 577 - if ($existing && $existing->{did} ne $did) { 578 - die { 579 - status => 400, 580 - error => 'HandleNotAvailable', 581 - message => "Handle already exists: $target", 582 - }; 583 - } 584 - 256 + sub revoke_sessions_by_did ($self, $did, %args) { 585 257 $self->dbh->do( 586 - q{UPDATE accounts SET handle = ?, updated_at = ?, plc_operation_json = ? WHERE did = ?}, 258 + q{UPDATE sessions SET revoked_at = ? WHERE did = ? AND revoked_at IS NULL}, 587 259 undef, 588 - $target, 589 - time, 590 - encode_json($self->_plc_operation_for($did, $target, $auth->{account}{signing_public})), 260 + $args{revoked_at} // time, 591 261 $did, 592 262 ); 263 + return $self->list_sessions_by_did($did); 264 + } 593 265 594 - return {}; 266 + sub list_sessions_by_did ($self, $did) { 267 + return $self->dbh->selectall_arrayref( 268 + q{SELECT * FROM sessions WHERE did = ? ORDER BY created_at DESC, id DESC}, 269 + { Slice => {} }, 270 + $did, 271 + ); 595 272 } 596 273 597 - sub create_app_password ($self, $access_token, $name, $privileged = 0) { 598 - my $auth = $self->auth_from_bearer($access_token, 'access'); 599 - my $password = join('-', map { substr(_token_string(5), 0, 4) } 1 .. 4); 600 - my $salt = _token_string(16); 601 - my $id = _token_string(24); 602 - my $now = time; 274 + sub create_app_password ($self, %args) { 275 + my $did = $args{did} // die 'did is required'; 276 + my $id = $args{id} // _random_id(); 277 + my $now = $args{created_at} // time; 603 278 604 279 $self->dbh->do( 605 - q{INSERT INTO app_passwords (id, did, name, privileged, password_salt, password_hash, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)}, 280 + q{ 281 + INSERT INTO app_passwords ( 282 + id, did, name, password_hash, created_at, revoked_at 283 + ) VALUES (?, ?, ?, ?, ?, ?) 284 + }, 606 285 undef, 607 - $id, $auth->{account}{did}, $name, $privileged ? 1 : 0, $salt, $self->_hash_password($password, $salt), $now, 286 + $id, 287 + $did, 288 + $args{name} // 'app-password', 289 + $args{password_hash}, 290 + $now, 291 + $args{revoked_at}, 608 292 ); 609 293 610 - return { 611 - name => $name, 612 - password => $password, 613 - createdAt => _iso8601($now), 614 - privileged => $privileged ? true : false, 615 - }; 294 + return $self->get_app_password($id); 616 295 } 617 296 618 - sub list_app_passwords ($self, $access_token) { 619 - my $auth = $self->auth_from_bearer($access_token, 'access'); 620 - my $rows = $self->dbh->selectall_arrayref( 621 - q{SELECT name, privileged, created_at FROM app_passwords WHERE did = ? AND revoked_at IS NULL ORDER BY created_at DESC}, 622 - { Slice => {} }, 623 - $auth->{account}{did}, 297 + sub get_app_password ($self, $id) { 298 + return $self->dbh->selectrow_hashref( 299 + q{SELECT * FROM app_passwords WHERE id = ?}, 300 + undef, 301 + $id, 624 302 ); 625 - 626 - return { 627 - passwords => [ 628 - map +{ 629 - name => $_->{name}, 630 - createdAt => _iso8601($_->{created_at}), 631 - privileged => $_->{privileged} ? true : false, 632 - }, @$rows 633 - ], 634 - }; 635 303 } 636 304 637 - sub revoke_app_password ($self, $access_token, $name) { 638 - my $auth = $self->auth_from_bearer($access_token, 'access'); 639 - $self->dbh->do( 640 - q{UPDATE app_passwords SET revoked_at = ? WHERE did = ? AND name = ? AND revoked_at IS NULL}, 305 + sub get_app_password_by_name ($self, $did, $name) { 306 + return $self->dbh->selectrow_hashref( 307 + q{SELECT * FROM app_passwords WHERE did = ? AND name = ? ORDER BY created_at DESC LIMIT 1}, 641 308 undef, 642 - time, 643 - $auth->{account}{did}, 309 + $did, 644 310 $name, 645 311 ); 646 - return {}; 647 312 } 648 313 649 - sub create_invite_codes ($self, $count = 1, $created_by = undef) { 650 - my @codes; 651 - my $now = time; 652 - for (1 .. $count) { 653 - my $code = join('-', map { substr(_token_string(6), 0, 5) } 1 .. 3); 654 - push @codes, $code; 655 - $self->dbh->do( 656 - q{INSERT OR IGNORE INTO invite_codes (code, created_by, created_at) VALUES (?, ?, ?)}, 657 - undef, 658 - $code, $created_by, $now, 659 - ); 660 - } 661 - return \@codes; 314 + sub revoke_app_password ($self, $id, %args) { 315 + $self->dbh->do( 316 + q{UPDATE app_passwords SET revoked_at = ? WHERE id = ?}, 317 + undef, 318 + $args{revoked_at} // time, 319 + $id, 320 + ); 321 + return $self->get_app_password($id); 662 322 } 663 323 664 - sub list_invite_codes ($self) { 324 + sub list_app_passwords_by_did ($self, $did) { 665 325 return $self->dbh->selectall_arrayref( 666 - q{SELECT code, created_by, disabled, created_at FROM invite_codes ORDER BY created_at DESC}, 326 + q{SELECT * FROM app_passwords WHERE did = ? ORDER BY created_at DESC, id DESC}, 667 327 { Slice => {} }, 328 + $did, 668 329 ); 669 330 } 670 331 671 - sub create_record ($self, $access_token, $payload) { 672 - my $auth = $self->auth_from_bearer($access_token, 'access'); 673 - $self->_assert_repo_owner($auth->{account}, $payload->{repo}); 674 - 675 - my $rkey = $payload->{rkey} || next_tid(); 676 - my $now = time; 677 - my $uri = "at://$auth->{account}{did}/$payload->{collection}/$rkey"; 678 - my $cid = $self->_cid_for_record($payload->{record}); 679 - 680 - $self->_assert_swap_commit($auth->{account}{did}, $payload->{swapCommit}); 681 - 332 + sub put_blob ($self, %args) { 333 + my $cid = $args{cid} // die 'cid is required'; 334 + my $now = $args{created_at} // time; 682 335 $self->dbh->do( 683 336 q{ 684 - INSERT INTO records (did, collection, rkey, uri, cid, record_json, created_at, updated_at) 685 - VALUES (?, ?, ?, ?, ?, ?, ?, ?) 337 + INSERT INTO blobs ( 338 + cid, did, mime_type, byte_size, storage_path, temporary, 339 + created_at, referenced_at, quarantined_at 340 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 341 + ON CONFLICT(cid) DO UPDATE SET 342 + did = excluded.did, 343 + mime_type = excluded.mime_type, 344 + byte_size = excluded.byte_size, 345 + storage_path = excluded.storage_path, 346 + temporary = excluded.temporary, 347 + referenced_at = COALESCE(excluded.referenced_at, blobs.referenced_at), 348 + quarantined_at = excluded.quarantined_at 686 349 }, 687 350 undef, 688 - $auth->{account}{did}, 689 - $payload->{collection}, 690 - $rkey, 691 - $uri, 692 351 $cid, 693 - encode_json($payload->{record}), 694 - $now, 352 + $args{did}, 353 + $args{mime_type}, 354 + $args{byte_size}, 355 + $args{storage_path}, 356 + $args{temporary} ? 1 : 0, 695 357 $now, 358 + $args{referenced_at}, 359 + $args{quarantined_at}, 696 360 ); 361 + return $self->get_blob($cid); 362 + } 697 363 698 - my $repo = $self->_rebuild_repo($auth->{account}{did}); 699 - return { 700 - uri => $uri, 701 - cid => $cid, 702 - commit => { 703 - cid => $repo->{commit_cid}, 704 - rev => $repo->{rev}, 705 - }, 706 - validationStatus => 'unknown', 707 - }; 364 + sub get_blob ($self, $cid) { 365 + return $self->dbh->selectrow_hashref( 366 + q{SELECT * FROM blobs WHERE cid = ?}, 367 + undef, 368 + $cid, 369 + ); 708 370 } 709 371 710 - sub put_record ($self, $access_token, $payload) { 711 - my $auth = $self->auth_from_bearer($access_token, 'access'); 712 - $self->_assert_repo_owner($auth->{account}, $payload->{repo}); 713 - $self->_assert_swap_commit($auth->{account}{did}, $payload->{swapCommit}); 714 - 715 - my $existing = $self->get_record($auth->{account}{did}, $payload->{collection}, $payload->{rkey}, 1); 716 - if (defined $payload->{swapRecord} && $existing && ($existing->{cid} // '') ne $payload->{swapRecord}) { 717 - die { 718 - status => 400, 719 - error => 'InvalidSwap', 720 - message => 'swapRecord did not match current record CID', 721 - }; 372 + sub list_blobs_by_did ($self, $did, %args) { 373 + my $limit = $args{limit} // 500; 374 + my $cursor = $args{cursor}; 375 + my @bind = ($did); 376 + my $sql = q{SELECT * FROM blobs WHERE did = ?}; 377 + if (defined $cursor && length $cursor) { 378 + $sql .= q{ AND cid > ?}; 379 + push @bind, $cursor; 722 380 } 381 + $sql .= q{ ORDER BY cid LIMIT ?}; 382 + push @bind, $limit + 1; 383 + my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @bind); 384 + return _paginate($rows, $limit, 'cid'); 385 + } 723 386 724 - my $cid = $self->_cid_for_record($payload->{record}); 725 - my $uri = "at://$auth->{account}{did}/$payload->{collection}/$payload->{rkey}"; 726 - my $now = time; 387 + sub put_record ($self, %args) { 388 + my $did = $args{did} // die 'did is required'; 389 + my $collection = $args{collection} // die 'collection is required'; 390 + my $rkey = $args{rkey} // die 'rkey is required'; 391 + my $cid = $args{cid} // die 'cid is required'; 392 + my $now = $args{updated_at} // time; 727 393 728 394 $self->dbh->do( 729 395 q{ 730 - INSERT INTO records (did, collection, rkey, uri, cid, record_json, created_at, updated_at) 731 - VALUES (?, ?, ?, ?, ?, ?, ?, ?) 732 - ON CONFLICT(did, collection, rkey) 733 - DO UPDATE SET cid = excluded.cid, record_json = excluded.record_json, updated_at = excluded.updated_at 396 + INSERT INTO records ( 397 + did, collection, rkey, cid, value_json, record_bytes, created_at, updated_at 398 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 399 + ON CONFLICT(did, collection, rkey) DO UPDATE SET 400 + cid = excluded.cid, 401 + value_json = excluded.value_json, 402 + record_bytes = excluded.record_bytes, 403 + updated_at = excluded.updated_at 734 404 }, 735 405 undef, 736 - $auth->{account}{did}, 737 - $payload->{collection}, 738 - $payload->{rkey}, 739 - $uri, 406 + $did, 407 + $collection, 408 + $rkey, 740 409 $cid, 741 - encode_json($payload->{record}), 742 - $now, 410 + encode_json($args{value}), 411 + $args{record_bytes}, 412 + $args{created_at} // $now, 743 413 $now, 744 414 ); 745 415 746 - my $repo = $self->_rebuild_repo($auth->{account}{did}); 747 - return { 748 - uri => $uri, 749 - cid => $cid, 750 - commit => { 751 - cid => $repo->{commit_cid}, 752 - rev => $repo->{rev}, 753 - }, 754 - validationStatus => 'unknown', 755 - }; 416 + return $self->get_record($did, $collection, $rkey); 756 417 } 757 418 758 - sub delete_record ($self, $access_token, $payload) { 759 - my $auth = $self->auth_from_bearer($access_token, 'access'); 760 - $self->_assert_repo_owner($auth->{account}, $payload->{repo}); 761 - $self->_assert_swap_commit($auth->{account}{did}, $payload->{swapCommit}); 762 - 763 - if (defined $payload->{swapRecord}) { 764 - my $existing = $self->get_record($auth->{account}{did}, $payload->{collection}, $payload->{rkey}, 1); 765 - if ($existing && ($existing->{cid} // '') ne $payload->{swapRecord}) { 766 - die { 767 - status => 400, 768 - error => 'InvalidSwap', 769 - message => 'swapRecord did not match current record CID', 770 - }; 771 - } 419 + sub replace_records_for_did ($self, $did, $records) { 420 + my $dbh = $self->dbh; 421 + $dbh->do(q{DELETE FROM records WHERE did = ?}, undef, $did); 422 + for my $record (@$records) { 423 + $dbh->do( 424 + q{ 425 + INSERT INTO records ( 426 + did, collection, rkey, cid, value_json, record_bytes, created_at, updated_at 427 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 428 + }, 429 + undef, 430 + $did, 431 + $record->{collection}, 432 + $record->{rkey}, 433 + $record->{cid}, 434 + encode_json($record->{value}), 435 + $record->{record_bytes}, 436 + $record->{created_at} // time, 437 + $record->{updated_at} // time, 438 + ); 772 439 } 440 + return 1; 441 + } 773 442 443 + sub delete_record ($self, $did, $collection, $rkey) { 774 444 $self->dbh->do( 775 445 q{DELETE FROM records WHERE did = ? AND collection = ? AND rkey = ?}, 776 446 undef, 777 - $auth->{account}{did}, 778 - $payload->{collection}, 779 - $payload->{rkey}, 447 + $did, 448 + $collection, 449 + $rkey, 780 450 ); 781 - 782 - my $repo = $self->_rebuild_repo($auth->{account}{did}); 783 - return { 784 - commit => { 785 - cid => $repo->{commit_cid}, 786 - rev => $repo->{rev}, 787 - }, 788 - }; 451 + return 1; 789 452 } 790 453 791 - sub apply_writes ($self, $access_token, $payload) { 792 - my $auth = $self->auth_from_bearer($access_token, 'access'); 793 - $self->_assert_repo_owner($auth->{account}, $payload->{repo}); 794 - $self->_assert_swap_commit($auth->{account}{did}, $payload->{swapCommit}); 795 - 796 - my @results; 797 - my $now = time; 798 - 799 - for my $write (@{ $payload->{writes} || [] }) { 800 - if (exists $write->{value} && !exists $write->{rkey}) { 801 - $write->{rkey} = next_tid(); 802 - } 803 - 804 - if (exists $write->{value}) { 805 - my $cid = $self->_cid_for_record($write->{value}); 806 - my $uri = "at://$auth->{account}{did}/$write->{collection}/$write->{rkey}"; 807 - $self->dbh->do( 808 - q{ 809 - INSERT INTO records (did, collection, rkey, uri, cid, record_json, created_at, updated_at) 810 - VALUES (?, ?, ?, ?, ?, ?, ?, ?) 811 - ON CONFLICT(did, collection, rkey) 812 - DO UPDATE SET cid = excluded.cid, record_json = excluded.record_json, updated_at = excluded.updated_at 813 - }, 814 - undef, 815 - $auth->{account}{did}, $write->{collection}, $write->{rkey}, $uri, $cid, encode_json($write->{value}), $now, $now, 816 - ); 817 - push @results, { 818 - uri => $uri, 819 - cid => $cid, 820 - validationStatus => 'unknown', 821 - }; 822 - } else { 823 - $self->dbh->do( 824 - q{DELETE FROM records WHERE did = ? AND collection = ? AND rkey = ?}, 825 - undef, 826 - $auth->{account}{did}, $write->{collection}, $write->{rkey}, 827 - ); 828 - push @results, {}; 829 - } 830 - } 831 - 832 - my $repo = $self->_rebuild_repo($auth->{account}{did}); 833 - return { 834 - commit => { 835 - cid => $repo->{commit_cid}, 836 - rev => $repo->{rev}, 454 + sub get_record ($self, $did, $collection, $rkey) { 455 + my $row = $self->dbh->selectrow_hashref( 456 + q{ 457 + SELECT * FROM records 458 + WHERE did = ? AND collection = ? AND rkey = ? 837 459 }, 838 - results => \@results, 839 - }; 840 - } 841 - 842 - sub get_record ($self, $repo, $collection, $rkey, $allow_internal = 0) { 843 - my $did = $allow_internal ? $repo : $self->_resolve_repo($repo)->{did}; 844 - my $row = $self->dbh->selectrow_hashref( 845 - q{SELECT * FROM records WHERE did = ? AND collection = ? AND rkey = ?}, 846 460 undef, 847 - $did, $collection, $rkey, 461 + $did, 462 + $collection, 463 + $rkey, 848 464 ); 849 - return undef if $allow_internal && !$row; 850 - die { 851 - status => 404, 852 - error => 'RecordNotFound', 853 - message => "Record not found for $did/$collection/$rkey", 854 - } unless $row; 855 - 856 - return { 857 - uri => $row->{uri}, 858 - cid => $row->{cid}, 859 - value => decode_json($row->{record_json}), 860 - }; 465 + return _row_to_record($row); 861 466 } 862 467 863 - sub list_records ($self, $repo, $collection, $limit = 50, $cursor = undef, $reverse = 0) { 864 - my $did = $self->_resolve_repo($repo)->{did}; 468 + sub list_records ($self, $did, $collection, %args) { 469 + my $limit = $args{limit} // 50; 470 + $limit = 100 if $limit > 100; 471 + my $cursor = $args{cursor}; 472 + my $reverse = $args{reverse} ? 1 : 0; 865 473 my @bind = ($did, $collection); 866 - my $sql = q{SELECT * FROM records WHERE did = ? AND collection = ?}; 474 + my $sql = q{ 475 + SELECT * FROM records 476 + WHERE did = ? AND collection = ? 477 + }; 867 478 if (defined $cursor && length $cursor) { 868 479 $sql .= $reverse ? q{ AND rkey < ?} : q{ AND rkey > ?}; 869 480 push @bind, $cursor; 870 481 } 871 482 $sql .= $reverse ? q{ ORDER BY rkey DESC} : q{ ORDER BY rkey ASC}; 872 483 $sql .= q{ LIMIT ?}; 873 - push @bind, ($limit > 100 ? 100 : $limit); 874 - 484 + push @bind, $limit + 1; 875 485 my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @bind); 876 - my $next_cursor = @$rows == ($limit > 100 ? 100 : $limit) ? $rows->[-1]{rkey} : undef; 877 - return { 878 - ($next_cursor ? (cursor => $next_cursor) : ()), 879 - records => [ 880 - map +{ 881 - uri => $_->{uri}, 882 - cid => $_->{cid}, 883 - value => decode_json($_->{record_json}), 884 - }, @$rows 885 - ], 886 - }; 486 + my $page = _paginate($rows, $limit, 'rkey'); 487 + $page->{items} = [ map { _row_to_record($_) } @{ $page->{items} } ]; 488 + return $page; 887 489 } 888 490 889 - sub describe_repo ($self, $repo) { 890 - my $account = $self->_resolve_repo($repo); 891 - my $collections = $self->dbh->selectcol_arrayref( 892 - q{SELECT DISTINCT collection FROM records WHERE did = ? ORDER BY collection}, 893 - undef, 894 - $account->{did}, 491 + sub all_records_for_did ($self, $did) { 492 + my $rows = $self->dbh->selectall_arrayref( 493 + q{SELECT * FROM records WHERE did = ? ORDER BY collection, rkey}, 494 + { Slice => {} }, 495 + $did, 895 496 ); 896 - return { 897 - handle => $account->{handle}, 898 - did => $account->{did}, 899 - didDoc => $self->did_doc($account), 900 - collections => $collections, 901 - handleIsCorrect => true, 902 - }; 497 + return [ map { _row_to_record($_) } @$rows ]; 903 498 } 904 499 905 - sub upload_blob ($self, $access_token, $bytes, $mime_type = 'application/octet-stream') { 906 - my $auth = $self->auth_from_bearer($access_token, 'access'); 907 - my $cid = ATProto::PDS::Repo::CID->for_raw($bytes)->to_string; 908 - my $hash = sha256_hex($bytes); 909 - my $path = File::Spec->catfile($self->_blob_root, $cid); 910 - 911 - open(my $fh, '>:raw', $path) or die "open($path): $!"; 912 - print {$fh} $bytes; 913 - close($fh); 500 + sub list_collections_for_did ($self, $did) { 501 + my $rows = $self->dbh->selectall_arrayref( 502 + q{SELECT DISTINCT collection FROM records WHERE did = ? ORDER BY collection}, 503 + { Slice => {} }, 504 + $did, 505 + ); 506 + return [ map { $_->{collection} } @$rows ]; 507 + } 914 508 509 + sub put_block ($self, %args) { 510 + my $cid = $args{cid} // die 'cid is required'; 511 + my $now = $args{created_at} // time; 915 512 $self->dbh->do( 916 513 q{ 917 - INSERT INTO blobs (cid, did, mime_type, size, sha256, path, created_at) 918 - VALUES (?, ?, ?, ?, ?, ?, ?) 919 - ON CONFLICT(cid) DO NOTHING 514 + INSERT INTO blocks (cid, codec, bytes, created_at) 515 + VALUES (?, ?, ?, ?) 516 + ON CONFLICT(cid) DO UPDATE SET 517 + codec = excluded.codec, 518 + bytes = excluded.bytes 920 519 }, 921 520 undef, 922 - $cid, $auth->{account}{did}, $mime_type, length($bytes), $hash, $path, time, 521 + $cid, 522 + $args{codec}, 523 + $args{bytes}, 524 + $now, 923 525 ); 924 - 925 - return { 926 - blob => { 927 - '$type' => 'blob', 928 - ref => { '$link' => $cid }, 929 - mimeType => $mime_type, 930 - size => length($bytes), 931 - }, 932 - }; 526 + return $self->get_block($cid); 933 527 } 934 528 935 - sub get_blob ($self, $did, $cid) { 936 - $self->account_by_did($did) 937 - or die { status => 404, error => 'RepoNotFound', message => "Repo not found: $did" }; 938 - my $blob = $self->dbh->selectrow_hashref( 939 - q{SELECT * FROM blobs WHERE did = ? AND cid = ?}, 529 + sub get_block ($self, $cid) { 530 + return $self->dbh->selectrow_hashref( 531 + q{SELECT * FROM blocks WHERE cid = ?}, 940 532 undef, 941 - $did, $cid, 942 - ) or die { 943 - status => 404, 944 - error => 'BlobNotFound', 945 - message => "Blob not found: $cid", 946 - }; 947 - return $blob; 533 + $cid, 534 + ); 948 535 } 949 536 950 - sub list_blobs ($self, $did, $limit = 500, $cursor = undef) { 951 - $self->account_by_did($did) 952 - or die { status => 404, error => 'RepoNotFound', message => "Repo not found: $did" }; 953 - 954 - my @bind = ($did); 955 - my $sql = q{SELECT cid FROM blobs WHERE did = ?}; 956 - if (defined $cursor && length $cursor) { 957 - $sql .= q{ AND cid > ?}; 958 - push @bind, $cursor; 959 - } 960 - $sql .= q{ ORDER BY cid ASC LIMIT ?}; 961 - push @bind, ($limit > 1000 ? 1000 : $limit); 962 - 963 - my $rows = $self->dbh->selectcol_arrayref($sql, undef, @bind); 964 - my $next_cursor = @$rows == ($limit > 1000 ? 1000 : $limit) ? $rows->[-1] : undef; 965 - return { 966 - ($next_cursor ? (cursor => $next_cursor) : ()), 967 - cids => $rows, 968 - }; 537 + sub get_blocks ($self, $cids) { 538 + return [] unless @$cids; 539 + my $placeholders = join(', ', ('?') x @$cids); 540 + my $rows = $self->dbh->selectall_arrayref( 541 + "SELECT * FROM blocks WHERE cid IN ($placeholders)", 542 + { Slice => {} }, 543 + @$cids, 544 + ); 545 + return $rows; 969 546 } 970 547 971 - sub list_missing_blobs ($self, $repo) { 972 - my $account = $self->_resolve_repo($repo); 973 - return { 974 - blobs => [], 975 - cursor => undef, 976 - did => $account->{did}, 977 - }; 548 + sub put_commit ($self, %args) { 549 + my $did = $args{did} // die 'did is required'; 550 + my $now = $args{created_at} // time; 551 + $self->dbh->do( 552 + q{ 553 + INSERT INTO commits ( 554 + did, rev, cid, root_cid, prev_cid, commit_bytes, car_bytes, created_at 555 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 556 + }, 557 + undef, 558 + $did, 559 + $args{rev}, 560 + $args{cid}, 561 + $args{root_cid}, 562 + $args{prev_cid}, 563 + $args{commit_bytes}, 564 + $args{car_bytes}, 565 + $now, 566 + ); 567 + $self->set_repo_head( 568 + did => $did, 569 + commit_cid => $args{cid}, 570 + rev => $args{rev}, 571 + root_cid => $args{root_cid}, 572 + indexed_at => $now, 573 + car_bytes => $args{car_bytes}, 574 + ); 575 + return $self->get_commit_by_rev($did, $args{rev}); 978 576 } 979 577 980 - sub current_repo ($self, $did) { 981 - return $self->dbh->selectrow_hashref(q{SELECT * FROM repo_roots WHERE did = ?}, undef, $did); 578 + sub get_commit_by_rev ($self, $did, $rev) { 579 + return $self->dbh->selectrow_hashref( 580 + q{SELECT * FROM commits WHERE did = ? AND rev = ?}, 581 + undef, 582 + $did, 583 + $rev, 584 + ); 982 585 } 983 586 984 - sub latest_commit ($self, $did) { 985 - my $repo = $self->current_repo($did) 986 - or die { status => 404, error => 'RepoNotFound', message => "Repo not found: $did" }; 987 - return { 988 - cid => $repo->{commit_cid}, 989 - rev => $repo->{rev}, 990 - }; 587 + sub get_latest_commit ($self, $did) { 588 + return $self->dbh->selectrow_hashref( 589 + q{SELECT * FROM commits WHERE did = ? ORDER BY created_at DESC, rev DESC LIMIT 1}, 590 + undef, 591 + $did, 592 + ); 991 593 } 992 594 993 - sub repo_status ($self, $did) { 994 - my $account = $self->account_by_did($did) 995 - or die { status => 404, error => 'RepoNotFound', message => "Repo not found: $did" }; 996 - my $repo = $self->current_repo($did); 997 - 998 - return { 999 - did => $did, 1000 - active => $account->{active} ? true : false, 1001 - ($account->{status} ? (status => $account->{status}) : ()), 1002 - ($repo ? (rev => $repo->{rev}) : ()), 1003 - }; 595 + sub repo_car ($self, $did) { 596 + my $row = $self->get_latest_commit($did); 597 + return $row ? $row->{car_bytes} : undef; 1004 598 } 1005 599 1006 - sub list_repos ($self, $limit = 500, $cursor = undef) { 600 + sub list_repos ($self, %args) { 601 + my $limit = $args{limit} // 500; 602 + $limit = 1000 if $limit > 1000; 603 + my $cursor = $args{cursor}; 1007 604 my @bind; 1008 605 my $sql = q{ 1009 - SELECT rr.did, rr.commit_cid AS head, rr.rev, a.active, a.status 1010 - FROM repo_roots rr 1011 - JOIN accounts a ON a.did = rr.did 606 + SELECT did, repo_commit_cid AS head, repo_rev AS rev, deleted_at, deactivated_at 607 + FROM accounts 608 + WHERE repo_commit_cid IS NOT NULL 1012 609 }; 1013 610 if (defined $cursor && length $cursor) { 1014 - $sql .= q{ WHERE rr.did > ?}; 611 + $sql .= q{ AND did > ?}; 1015 612 push @bind, $cursor; 1016 613 } 1017 - $sql .= q{ ORDER BY rr.did ASC LIMIT ?}; 1018 - push @bind, ($limit > 1000 ? 1000 : $limit); 1019 - 614 + $sql .= q{ ORDER BY did LIMIT ?}; 615 + push @bind, $limit + 1; 1020 616 my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @bind); 1021 - my $next_cursor = @$rows == ($limit > 1000 ? 1000 : $limit) ? $rows->[-1]{did} : undef; 1022 - return { 1023 - ($next_cursor ? (cursor => $next_cursor) : ()), 1024 - repos => [ 1025 - map +{ 617 + my $page = _paginate($rows, $limit, 'did'); 618 + $page->{items} = [ 619 + map { 620 + +{ 1026 621 did => $_->{did}, 1027 622 head => $_->{head}, 1028 623 rev => $_->{rev}, 1029 - active => $_->{active} ? true : false, 1030 - ($_->{status} ? (status => $_->{status}) : ()), 1031 - }, @$rows 1032 - ], 1033 - }; 1034 - } 1035 - 1036 - sub list_repos_by_collection ($self, $collection, $limit = 500, $cursor = undef) { 1037 - my @bind = ($collection); 1038 - my $sql = q{SELECT DISTINCT did FROM records WHERE collection = ?}; 1039 - if (defined $cursor && length $cursor) { 1040 - $sql .= q{ AND did > ?}; 1041 - push @bind, $cursor; 1042 - } 1043 - $sql .= q{ ORDER BY did ASC LIMIT ?}; 1044 - push @bind, ($limit > 2000 ? 2000 : $limit); 1045 - 1046 - my $rows = $self->dbh->selectcol_arrayref($sql, undef, @bind); 1047 - my $next_cursor = @$rows == ($limit > 2000 ? 2000 : $limit) ? $rows->[-1] : undef; 1048 - return { 1049 - ($next_cursor ? (cursor => $next_cursor) : ()), 1050 - repos => [ map +{ did => $_ }, @$rows ], 1051 - }; 1052 - } 1053 - 1054 - sub get_repo_car ($self, $did) { 1055 - my $repo = $self->current_repo($did) 1056 - or die { status => 404, error => 'RepoNotFound', message => "Repo not found: $did" }; 1057 - my $blocks = $self->_blocks_for_commit($repo->{commit_cid}); 1058 - return write_car(ATProto::PDS::Repo::CID->from_string($repo->{commit_cid}), $blocks); 1059 - } 1060 - 1061 - sub get_record_car ($self, $did, $collection, $rkey) { 1062 - $self->get_record($did, $collection, $rkey, 1) 1063 - or die { status => 404, error => 'RecordNotFound', message => 'Record not found' }; 1064 - return $self->get_repo_car($did); 1065 - } 1066 - 1067 - sub get_blocks_car ($self, $did, $cid_strings) { 1068 - $self->account_by_did($did) 1069 - or die { status => 404, error => 'RepoNotFound', message => "Repo not found: $did" }; 1070 - my @blocks; 1071 - for my $cid (@$cid_strings) { 1072 - my $row = $self->dbh->selectrow_hashref(q{SELECT * FROM blocks WHERE cid = ?}, undef, $cid); 1073 - next unless $row; 1074 - push @blocks, { 1075 - cid => ATProto::PDS::Repo::CID->from_string($row->{cid}), 1076 - bytes => $row->{bytes}, 1077 - }; 1078 - } 1079 - my $repo = $self->current_repo($did); 1080 - return write_car($repo ? ATProto::PDS::Repo::CID->from_string($repo->{commit_cid}) : undef, \@blocks); 1081 - } 1082 - 1083 - sub create_report ($self, $access_token, $payload) { 1084 - my $auth = $self->auth_from_bearer($access_token, 'access'); 1085 - $self->dbh->do( 1086 - q{INSERT INTO moderation_reports (id, did, reason_type, reason, subject_json, created_at) VALUES (?, ?, ?, ?, ?, ?)}, 1087 - undef, 1088 - _token_string(24), 1089 - $auth->{account}{did}, 1090 - $payload->{reasonType}, 1091 - $payload->{reason}, 1092 - encode_json($payload->{subject} // {}), 1093 - time, 1094 - ); 1095 - return {}; 1096 - } 1097 - 1098 - sub admin_accounts ($self) { 1099 - return $self->dbh->selectall_arrayref(q{SELECT * FROM accounts ORDER BY created_at DESC}, { Slice => {} }); 1100 - } 1101 - 1102 - sub admin_send_email ($self, $did, $subject, $content) { 1103 - return { 1104 - sent => true, 1105 - did => $did, 1106 - subject => $subject, 1107 - content => $content, 1108 - }; 624 + active => ($_->{deleted_at} || $_->{deactivated_at}) ? JSON::PP::false : JSON::PP::true, 625 + } 626 + } @{ $page->{items} } 627 + ]; 628 + return $page; 1109 629 } 1110 630 1111 - sub request_code ($self, $did, $purpose, $target = undef) { 1112 - my $id = _token_string(24); 1113 - my $code = substr(_token_string(12), 0, 8); 631 + sub append_event ($self, %args) { 632 + my $now = $args{created_at} // time; 1114 633 $self->dbh->do( 1115 - q{INSERT INTO pending_codes (id, did, purpose, target, code, created_at) VALUES (?, ?, ?, ?, ?, ?)}, 634 + q{ 635 + INSERT INTO events ( 636 + did, type, rev, commit_cid, payload_json, car_bytes, created_at 637 + ) VALUES (?, ?, ?, ?, ?, ?, ?) 638 + }, 1116 639 undef, 1117 - $id, $did, $purpose, $target, $code, time, 640 + $args{did}, 641 + $args{type}, 642 + $args{rev}, 643 + $args{commit_cid}, 644 + _maybe_json($args{payload}), 645 + $args{car_bytes}, 646 + $now, 1118 647 ); 1119 - return { id => $id, code => $code }; 648 + return $self->dbh->sqlite_last_insert_rowid; 1120 649 } 1121 650 1122 - sub verify_code ($self, $did, $purpose, $code) { 1123 - my $row = $self->dbh->selectrow_hashref( 1124 - q{SELECT * FROM pending_codes WHERE did = ? AND purpose = ? AND code = ? AND used_at IS NULL ORDER BY created_at DESC LIMIT 1}, 1125 - undef, 1126 - $did, $purpose, $code, 1127 - ) or return undef; 1128 - $self->dbh->do(q{UPDATE pending_codes SET used_at = ? WHERE id = ?}, undef, time, $row->{id}); 1129 - return $row; 651 + sub list_events_after ($self, $cursor, %args) { 652 + my $limit = $args{limit} // 100; 653 + my $sql = q{SELECT * FROM events WHERE seq > ? ORDER BY seq LIMIT ?}; 654 + return $self->dbh->selectall_arrayref($sql, { Slice => {} }, $cursor // 0, $limit); 1130 655 } 1131 656 1132 - sub set_account_email ($self, $did, $email, $confirmed = 0) { 657 + sub set_repo_head ($self, %args) { 658 + my $did = $args{did} // die 'did is required'; 659 + my $now = $args{indexed_at} // time; 1133 660 $self->dbh->do( 1134 - q{UPDATE accounts SET email = ?, email_confirmed = ?, updated_at = ? WHERE did = ?}, 661 + q{ 662 + INSERT INTO repo_heads (did, commit_cid, rev, root_cid, indexed_at) 663 + VALUES (?, ?, ?, ?, ?) 664 + ON CONFLICT(did) DO UPDATE SET 665 + commit_cid = excluded.commit_cid, 666 + rev = excluded.rev, 667 + root_cid = excluded.root_cid, 668 + indexed_at = excluded.indexed_at 669 + }, 1135 670 undef, 1136 - $email, $confirmed ? 1 : 0, time, $did, 671 + $did, 672 + $args{commit_cid}, 673 + $args{rev}, 674 + $args{root_cid}, 675 + $now, 1137 676 ); 1138 - } 1139 - 1140 - sub set_account_password ($self, $did, $password) { 1141 - my $salt = _token_string(16); 1142 677 $self->dbh->do( 1143 - q{UPDATE accounts SET password_salt = ?, password_hash = ?, updated_at = ? WHERE did = ?}, 678 + q{ 679 + UPDATE accounts 680 + SET repo_commit_cid = ?, repo_root_cid = ?, repo_rev = ?, updated_at = ? 681 + WHERE did = ? 682 + }, 1144 683 undef, 1145 - $salt, $self->_hash_password($password, $salt), time, $did, 684 + $args{commit_cid}, 685 + $args{root_cid}, 686 + $args{rev}, 687 + $now, 688 + $did, 1146 689 ); 690 + return $self->get_repo_head($did); 1147 691 } 1148 692 1149 - sub set_account_active ($self, $did, $active, $status = undef) { 1150 - $self->dbh->do( 1151 - q{UPDATE accounts SET active = ?, status = ?, updated_at = ? WHERE did = ?}, 693 + sub get_repo_head ($self, $did) { 694 + return $self->dbh->selectrow_hashref( 695 + q{SELECT * FROM repo_heads WHERE did = ?}, 1152 696 undef, 1153 - $active ? 1 : 0, $status, time, $did, 1154 - ); 1155 - } 1156 - 1157 - sub delete_account ($self, $did) { 1158 - $self->dbh->do(q{DELETE FROM records WHERE did = ?}, undef, $did); 1159 - $self->dbh->do(q{DELETE FROM repo_roots WHERE did = ?}, undef, $did); 1160 - $self->dbh->do(q{DELETE FROM accounts WHERE did = ?}, undef, $did); 1161 - } 1162 - 1163 - sub _rebuild_repo ($self, $did) { 1164 - my $account = $self->account_by_did($did) or die "unknown account: $did"; 1165 - my $rows = $self->dbh->selectall_arrayref( 1166 - q{SELECT collection, rkey, cid, record_json FROM records WHERE did = ? ORDER BY collection, rkey}, 1167 - { Slice => {} }, 1168 697 $did, 1169 698 ); 1170 - 1171 - my %entries; 1172 - my @blocks; 1173 - my %seen; 1174 - for my $row (@$rows) { 1175 - my $record = decode_json($row->{record_json}); 1176 - my $dag = $self->_lex_to_dag($record); 1177 - my $bytes = encode_dag_cbor($dag); 1178 - my $cid = ATProto::PDS::Repo::CID->for_dag_cbor($bytes); 1179 - $entries{"$row->{collection}/$row->{rkey}"} = $cid; 1180 - push @blocks, { cid => $cid, bytes => $bytes } unless $seen{$cid->to_string}++; 1181 - if ($row->{cid} ne $cid->to_string) { 1182 - $self->dbh->do( 1183 - q{UPDATE records SET cid = ?, updated_at = ? WHERE did = ? AND collection = ? AND rkey = ?}, 1184 - undef, 1185 - $cid->to_string, time, $did, $row->{collection}, $row->{rkey}, 1186 - ); 1187 - } 1188 - } 1189 - 1190 - my $mst = build_mst(\%entries); 1191 - for my $block (@{ $mst->{blocks} }) { 1192 - push @blocks, $block unless $seen{$block->{cid}->to_string}++; 1193 - } 1194 - 1195 - my $prev = $self->current_repo($did); 1196 - my $rev = next_tid($prev ? $prev->{rev} : undef); 1197 - my $commit_unsigned = { 1198 - did => $did, 1199 - version => 3, 1200 - rev => $rev, 1201 - prev => undef, 1202 - data => $mst->{root}, 1203 - }; 1204 - my $unsigned_bytes = encode_dag_cbor($commit_unsigned); 1205 - my $pk = Crypt::PK::Ed25519->new; 1206 - $pk->import_key_raw($account->{signing_private}, 'private'); 1207 - my $sig = $pk->sign_message($unsigned_bytes); 1208 - my $commit = { 1209 - %$commit_unsigned, 1210 - sig => ATProto::PDS::Repo::Bytes->new($sig), 1211 - }; 1212 - my $commit_bytes = encode_dag_cbor($commit); 1213 - my $commit_cid = ATProto::PDS::Repo::CID->for_dag_cbor($commit_bytes); 1214 - unshift @blocks, { cid => $commit_cid, bytes => $commit_bytes }; 1215 - 1216 - my $now = time; 1217 - $self->_txn(sub { 1218 - for my $block (@blocks) { 1219 - $self->dbh->do( 1220 - q{INSERT OR IGNORE INTO blocks (cid, codec, bytes, created_at) VALUES (?, ?, ?, ?)}, 1221 - undef, 1222 - $block->{cid}->to_string, 1223 - $block->{cid}->codec, 1224 - $block->{bytes}, 1225 - $now, 1226 - ); 1227 - } 1228 - 1229 - $self->dbh->do(q{DELETE FROM commit_blocks WHERE commit_cid = ?}, undef, $commit_cid->to_string); 1230 - for my $index (0 .. $#blocks) { 1231 - $self->dbh->do( 1232 - q{INSERT INTO commit_blocks (commit_cid, cid, ord) VALUES (?, ?, ?)}, 1233 - undef, 1234 - $commit_cid->to_string, 1235 - $blocks[$index]{cid}->to_string, 1236 - $index, 1237 - ); 1238 - } 1239 - 1240 - $self->dbh->do( 1241 - q{INSERT OR REPLACE INTO repo_commits (cid, did, rev, prev_commit_cid, data_cid, created_at) VALUES (?, ?, ?, ?, ?, ?)}, 1242 - undef, 1243 - $commit_cid->to_string, 1244 - $did, 1245 - $rev, 1246 - ($prev ? $prev->{commit_cid} : undef), 1247 - $mst->{root}->to_string, 1248 - $now, 1249 - ); 1250 - 1251 - $self->dbh->do( 1252 - q{INSERT OR REPLACE INTO repo_roots (did, commit_cid, data_cid, rev, prev_commit_cid, updated_at) VALUES (?, ?, ?, ?, ?, ?)}, 1253 - undef, 1254 - $did, 1255 - $commit_cid->to_string, 1256 - $mst->{root}->to_string, 1257 - $rev, 1258 - ($prev ? $prev->{commit_cid} : undef), 1259 - $now, 1260 - ); 1261 - }); 1262 - 1263 - return { 1264 - did => $did, 1265 - commit_cid => $commit_cid->to_string, 1266 - data_cid => $mst->{root}->to_string, 1267 - rev => $rev, 1268 - prev_commit_cid => $prev ? $prev->{commit_cid} : undef, 1269 - }; 1270 699 } 1271 700 1272 - sub _blocks_for_commit ($self, $commit_cid) { 1273 - my $rows = $self->dbh->selectall_arrayref( 1274 - q{ 1275 - SELECT b.cid, b.bytes 1276 - FROM commit_blocks cb 1277 - JOIN blocks b ON b.cid = cb.cid 1278 - WHERE cb.commit_cid = ? 1279 - ORDER BY cb.ord ASC 701 + sub default_migrations { 702 + return ( 703 + { 704 + version => 1, 705 + statements => [ 706 + q{ 707 + CREATE TABLE IF NOT EXISTS accounts ( 708 + id TEXT PRIMARY KEY, 709 + did TEXT NOT NULL UNIQUE, 710 + handle TEXT NOT NULL UNIQUE, 711 + email TEXT UNIQUE, 712 + password_hash TEXT, 713 + created_at INTEGER NOT NULL, 714 + updated_at INTEGER NOT NULL, 715 + deactivated_at INTEGER, 716 + deleted_at INTEGER, 717 + did_doc_json TEXT, 718 + signing_key TEXT, 719 + recovery_key TEXT 720 + ) 721 + }, 722 + q{ 723 + CREATE TABLE IF NOT EXISTS sessions ( 724 + id TEXT PRIMARY KEY, 725 + did TEXT NOT NULL, 726 + token TEXT, 727 + kind TEXT NOT NULL, 728 + scope TEXT, 729 + created_at INTEGER NOT NULL, 730 + expires_at INTEGER, 731 + revoked_at INTEGER, 732 + ip TEXT, 733 + user_agent TEXT, 734 + FOREIGN KEY (did) REFERENCES accounts(did) 735 + ) 736 + }, 737 + q{CREATE INDEX IF NOT EXISTS sessions_by_did ON sessions (did, created_at DESC)}, 738 + q{ 739 + CREATE TABLE IF NOT EXISTS app_passwords ( 740 + id TEXT PRIMARY KEY, 741 + did TEXT NOT NULL, 742 + name TEXT NOT NULL, 743 + password_hash TEXT, 744 + created_at INTEGER NOT NULL, 745 + revoked_at INTEGER, 746 + FOREIGN KEY (did) REFERENCES accounts(did) 747 + ) 748 + }, 749 + q{CREATE INDEX IF NOT EXISTS app_passwords_by_did ON app_passwords (did, created_at DESC)}, 750 + q{ 751 + CREATE TABLE IF NOT EXISTS blobs ( 752 + cid TEXT PRIMARY KEY, 753 + did TEXT, 754 + mime_type TEXT, 755 + byte_size INTEGER, 756 + storage_path TEXT, 757 + temporary INTEGER NOT NULL DEFAULT 0, 758 + created_at INTEGER NOT NULL, 759 + referenced_at INTEGER, 760 + quarantined_at INTEGER, 761 + FOREIGN KEY (did) REFERENCES accounts(did) 762 + ) 763 + }, 764 + q{CREATE INDEX IF NOT EXISTS blobs_by_did ON blobs (did, created_at DESC)}, 765 + q{ 766 + CREATE TABLE IF NOT EXISTS repo_heads ( 767 + did TEXT PRIMARY KEY, 768 + commit_cid TEXT, 769 + rev TEXT, 770 + root_cid TEXT, 771 + indexed_at INTEGER NOT NULL, 772 + FOREIGN KEY (did) REFERENCES accounts(did) 773 + ) 774 + }, 775 + ], 1280 776 }, 1281 - { Slice => {} }, 1282 - $commit_cid, 777 + { 778 + version => 2, 779 + statements => [ 780 + q{ALTER TABLE accounts ADD COLUMN account_id TEXT}, 781 + q{ALTER TABLE accounts ADD COLUMN password_salt BLOB}, 782 + q{ALTER TABLE accounts ADD COLUMN private_key BLOB}, 783 + q{ALTER TABLE accounts ADD COLUMN public_key BLOB}, 784 + q{ALTER TABLE accounts ADD COLUMN public_key_multibase TEXT}, 785 + q{ALTER TABLE accounts ADD COLUMN repo_commit_cid TEXT}, 786 + q{ALTER TABLE accounts ADD COLUMN repo_root_cid TEXT}, 787 + q{ALTER TABLE accounts ADD COLUMN repo_rev TEXT}, 788 + q{CREATE UNIQUE INDEX IF NOT EXISTS accounts_account_id_idx ON accounts(account_id)}, 789 + q{ 790 + CREATE TABLE IF NOT EXISTS records ( 791 + did TEXT NOT NULL, 792 + collection TEXT NOT NULL, 793 + rkey TEXT NOT NULL, 794 + cid TEXT NOT NULL, 795 + value_json TEXT NOT NULL, 796 + record_bytes BLOB NOT NULL, 797 + created_at INTEGER NOT NULL, 798 + updated_at INTEGER NOT NULL, 799 + PRIMARY KEY (did, collection, rkey) 800 + ) 801 + }, 802 + q{CREATE INDEX IF NOT EXISTS records_by_collection ON records(did, collection, rkey)}, 803 + q{ 804 + CREATE TABLE IF NOT EXISTS blocks ( 805 + cid TEXT PRIMARY KEY, 806 + codec INTEGER NOT NULL, 807 + bytes BLOB NOT NULL, 808 + created_at INTEGER NOT NULL 809 + ) 810 + }, 811 + q{ 812 + CREATE TABLE IF NOT EXISTS commits ( 813 + did TEXT NOT NULL, 814 + rev TEXT NOT NULL, 815 + cid TEXT NOT NULL UNIQUE, 816 + root_cid TEXT NOT NULL, 817 + prev_cid TEXT, 818 + commit_bytes BLOB NOT NULL, 819 + car_bytes BLOB NOT NULL, 820 + created_at INTEGER NOT NULL, 821 + PRIMARY KEY (did, rev) 822 + ) 823 + }, 824 + q{CREATE INDEX IF NOT EXISTS commits_latest_idx ON commits(did, created_at DESC, rev DESC)}, 825 + q{ 826 + CREATE TABLE IF NOT EXISTS events ( 827 + seq INTEGER PRIMARY KEY AUTOINCREMENT, 828 + did TEXT NOT NULL, 829 + type TEXT NOT NULL, 830 + rev TEXT, 831 + commit_cid TEXT, 832 + payload_json TEXT, 833 + car_bytes BLOB, 834 + created_at INTEGER NOT NULL 835 + ) 836 + }, 837 + q{CREATE INDEX IF NOT EXISTS events_seq_idx ON events(seq)}, 838 + ], 839 + }, 1283 840 ); 1284 - 1285 - return [ 1286 - map +{ 1287 - cid => ATProto::PDS::Repo::CID->from_string($_->{cid}), 1288 - bytes => $_->{bytes}, 1289 - }, @$rows 1290 - ]; 1291 841 } 1292 842 1293 - sub _find_app_password ($self, $did, $password) { 1294 - my $rows = $self->dbh->selectall_arrayref( 1295 - q{SELECT * FROM app_passwords WHERE did = ? AND revoked_at IS NULL}, 1296 - { Slice => {} }, 1297 - $did, 843 + sub _connect ($self) { 844 + make_path(dirname($self->path)); 845 + my $dbh = DBI->connect( 846 + 'dbi:SQLite:dbname=' . $self->path, 847 + q(), 848 + q(), 849 + { 850 + AutoCommit => 1, 851 + RaiseError => 1, 852 + PrintError => 0, 853 + sqlite_unicode => 1, 854 + }, 1298 855 ); 1299 - for my $row (@$rows) { 1300 - return $row if $self->_verify_password($password, $row->{password_salt}, $row->{password_hash}); 1301 - } 1302 - return undef; 856 + $dbh->do('PRAGMA foreign_keys = ON'); 857 + $dbh->do('PRAGMA busy_timeout = 5000'); 858 + $dbh->do('PRAGMA journal_mode = WAL'); 859 + return $dbh; 1303 860 } 1304 861 1305 - sub _assert_repo_owner ($self, $account, $repo_identifier) { 1306 - my $resolved = $self->_resolve_repo($repo_identifier); 1307 - die { 1308 - status => 403, 1309 - error => 'AuthenticationRequired', 1310 - message => 'Authenticated user does not own this repo', 1311 - } unless $resolved->{did} eq $account->{did}; 1312 - } 1313 - 1314 - sub _resolve_repo ($self, $repo) { 1315 - return $repo =~ /^did:/ 1316 - ? ($self->account_by_did($repo) || die { status => 404, error => 'RepoNotFound', message => "Repo not found: $repo" }) 1317 - : ($self->account_by_handle($repo) || die { status => 404, error => 'RepoNotFound', message => "Repo not found: $repo" }); 1318 - } 1319 - 1320 - sub _assert_swap_commit ($self, $did, $swap_commit) { 1321 - return unless defined $swap_commit && length $swap_commit; 1322 - my $repo = $self->current_repo($did); 1323 - if (!$repo || $repo->{commit_cid} ne $swap_commit) { 1324 - die { 1325 - status => 400, 1326 - error => 'InvalidSwap', 1327 - message => 'swapCommit did not match current repo commit', 1328 - }; 862 + sub _row_to_account ($self, $row) { 863 + return undef unless $row; 864 + if (defined $row->{did_doc_json} && length $row->{did_doc_json}) { 865 + $row->{did_doc} = decode_json($row->{did_doc_json}); 1329 866 } 867 + delete $row->{did_doc_json}; 868 + $row->{account_id} //= $row->{id}; 869 + return $row; 1330 870 } 1331 871 1332 - sub _cid_for_record ($self, $record) { 1333 - return ATProto::PDS::Repo::CID->for_dag_cbor(encode_dag_cbor($self->_lex_to_dag($record)))->to_string; 872 + sub _row_to_record ($row) { 873 + return undef unless $row; 874 + $row->{value} = decode_json($row->{value_json}) if defined $row->{value_json}; 875 + delete $row->{value_json}; 876 + return $row; 1334 877 } 1335 878 1336 - sub _lex_to_dag ($self, $value) { 1337 - return undef unless defined $value; 1338 - 1339 - if (ref($value) eq 'ARRAY') { 1340 - return [ map { $self->_lex_to_dag($_) } @$value ]; 879 + sub _paginate ($rows, $limit, $cursor_key) { 880 + my @items = @$rows; 881 + my $cursor; 882 + if (@items > $limit) { 883 + my $last = pop @items; 884 + $cursor = $last->{$cursor_key}; 1341 885 } 1342 - 1343 - if (ref($value) eq 'HASH') { 1344 - if ((keys %$value) == 1 && exists $value->{'$link'}) { 1345 - return ATProto::PDS::Repo::CID->from_string($value->{'$link'}); 1346 - } 1347 - my %copy; 1348 - for my $key (keys %$value) { 1349 - $copy{$key} = $self->_lex_to_dag($value->{$key}); 1350 - } 1351 - return \%copy; 1352 - } 1353 - 1354 - return $value; 1355 - } 1356 - 1357 - sub _plc_operation_for ($self, $did, $handle, $public_key) { 1358 - my $multikey = 'z' . encode_base58btc("\xed\x01" . $public_key); 1359 886 return { 1360 - did => $did, 1361 - alsoKnownAs => [ 'at://' . $handle ], 1362 - verificationMethods => { 1363 - atproto => $multikey, 1364 - }, 1365 - services => { 1366 - atproto_pds => { 1367 - type => 'AtprotoPersonalDataServer', 1368 - endpoint => $self->{config}{base_url}, 1369 - }, 1370 - }, 1371 - }; 1372 - } 1373 - 1374 - sub _new_plc_did ($self) { 1375 - my $alphabet = '234567abcdefghijklmnopqrstuvwxyz'; 1376 - my $bytes = random_bytes(24); 1377 - my $suffix = join '', map { substr($alphabet, ord($_) % length($alphabet), 1) } split //, $bytes; 1378 - return 'did:plc:' . substr($suffix, 0, 24); 1379 - } 1380 - 1381 - sub _blob_root ($self) { 1382 - return File::Spec->catdir($self->{config}{data_dir}, 'blobs'); 1383 - } 1384 - 1385 - sub _hash_password ($self, $password, $salt) { 1386 - return sha256_hex(join(':', $salt, $password)); 1387 - } 1388 - 1389 - sub _verify_password ($self, $password, $salt, $hash) { 1390 - return sha256_hex(join(':', $salt, $password)) eq $hash; 1391 - } 1392 - 1393 - sub _txn ($self, $code) { 1394 - my $dbh = $self->dbh; 1395 - my $ok = eval { 1396 - $dbh->begin_work; 1397 - $code->(); 1398 - $dbh->commit; 1399 - 1; 887 + items => \@items, 888 + cursor => $cursor, 1400 889 }; 1401 - if (!$ok) { 1402 - my $error = $@; 1403 - eval { $dbh->rollback }; 1404 - die $error; 1405 - } 1406 890 } 1407 891 1408 - sub _iso8601 ($epoch) { 1409 - my @parts = gmtime($epoch); 1410 - return sprintf( 1411 - '%04d-%02d-%02dT%02d:%02d:%02dZ', 1412 - $parts[5] + 1900, 1413 - $parts[4] + 1, 1414 - $parts[3], 1415 - $parts[2], 1416 - $parts[1], 1417 - $parts[0], 1418 - ); 892 + sub _maybe_json ($value) { 893 + return undef unless defined $value; 894 + return ref($value) ? encode_json($value) : $value; 1419 895 } 1420 896 1421 - sub _token_string ($length = 24) { 1422 - my $alphabet = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'; 1423 - my $bytes = random_bytes($length); 1424 - return join '', map { substr($alphabet, ord($_) % length($alphabet), 1) } split //, $bytes; 897 + sub _random_id { 898 + open(my $fh, '<:raw', '/dev/urandom') or die "open(/dev/urandom): $!"; 899 + my $bytes = q(); 900 + my $read = read($fh, $bytes, 12); 901 + CORE::close($fh); 902 + die 'failed to read random bytes' unless defined $read && $read == 12; 903 + return unpack('H*', $bytes); 1425 904 } 1426 905 1427 906 1;
+86 -38
lib/ATProto/PDS/XRPC/Dispatcher.pm
··· 4 4 use warnings; 5 5 6 6 use Mojo::Base -base, -signatures; 7 - use Mojo::JSON (); 8 7 9 8 has app => undef; 10 9 has routes => undef; 11 10 has catalog => sub { [] }; 12 11 13 12 sub register_routes ($self) { 14 - for my $endpoint (@{ $self->catalog }) { 15 - if ($endpoint->{type} eq 'subscription') { 16 - $self->routes->websocket($endpoint->{path})->to(cb => sub ($c) { 17 - my $handler = $c->app->api_registry->handler_for($endpoint->{id}); 18 - return $handler->($c, $endpoint) if $handler; 13 + my %by_id = map { $_->{id} => $_ } @{ $self->catalog }; 19 14 20 - $c->send({ json => { 21 - error => 'NotImplemented', 22 - message => "No subscription handler registered for $endpoint->{id}", 23 - nsid => $endpoint->{id}, 24 - }}); 25 - $c->finish(1000); 26 - }); 27 - next; 15 + $self->routes->websocket('/xrpc/*nsid')->to(cb => sub ($c) { 16 + my $endpoint = $by_id{ $c->stash('nsid') // q() }; 17 + return $c->finish(1008) unless $endpoint; 18 + 19 + if ($endpoint->{type} ne 'subscription') { 20 + $c->send({ json => { 21 + error => 'MethodNotAllowed', 22 + message => "$endpoint->{id} is not a subscription endpoint", 23 + }}); 24 + return $c->finish(1008); 28 25 } 29 26 30 - my $route = $endpoint->{type} eq 'query' 31 - ? $self->routes->get($endpoint->{path}) 32 - : $self->routes->post($endpoint->{path}); 27 + my $handler = $c->app->api_registry->handler_for($endpoint->{id}); 28 + if ($handler) { 29 + return $handler->($c, $endpoint); 30 + } 33 31 34 - $route->to(cb => sub ($c) { 35 - my $handler = $c->app->api_registry->handler_for($endpoint->{id}); 36 - if ($handler) { 37 - my $result = eval { $handler->($c, $endpoint) }; 38 - if (my $err = $@) { 39 - if (ref($err) eq 'HASH' && $err->{error}) { 40 - return $c->render( 41 - status => $err->{status} // 400, 42 - json => { 43 - error => $err->{error}, 44 - message => $err->{message} // $err->{error}, 45 - }, 46 - ); 47 - } 48 - die $err; 49 - } 50 - return $c->render(json => $result); 51 - } 32 + $c->send({ json => { 33 + error => 'NotImplemented', 34 + message => "No subscription handler registered for $endpoint->{id}", 35 + nsid => $endpoint->{id}, 36 + }}); 37 + $c->finish(1000); 38 + }); 52 39 53 - $c->render( 40 + $self->routes->any('/xrpc/*nsid')->to(cb => sub ($c) { 41 + my $endpoint = $by_id{ $c->stash('nsid') // q() }; 42 + unless ($endpoint) { 43 + return $c->render( 44 + status => 404, 45 + json => { 46 + error => 'UnknownMethod', 47 + message => 'Unknown XRPC method', 48 + }, 49 + ); 50 + } 51 + 52 + if ($endpoint->{type} eq 'subscription') { 53 + return $c->render( 54 + status => 426, 55 + json => { 56 + error => 'UpgradeRequired', 57 + message => "$endpoint->{id} requires a websocket upgrade", 58 + }, 59 + ); 60 + } 61 + 62 + if ($endpoint->{type} eq 'query' && $c->req->method ne 'GET') { 63 + return $c->render( 64 + status => 405, 65 + json => { 66 + error => 'MethodNotAllowed', 67 + message => "$endpoint->{id} expects GET", 68 + }, 69 + ); 70 + } 71 + 72 + if ($endpoint->{type} eq 'procedure' && $c->req->method ne 'POST') { 73 + return $c->render( 74 + status => 405, 75 + json => { 76 + error => 'MethodNotAllowed', 77 + message => "$endpoint->{id} expects POST", 78 + }, 79 + ); 80 + } 81 + 82 + my $handler = $c->app->api_registry->handler_for($endpoint->{id}); 83 + unless ($handler) { 84 + return $c->render( 54 85 status => 501, 55 86 json => { 56 87 error => 'NotImplemented', ··· 59 90 type => $endpoint->{type}, 60 91 }, 61 92 ); 62 - }); 63 - } 93 + } 94 + 95 + my $result = eval { $handler->($c, $endpoint) }; 96 + if (my $err = $@) { 97 + if (ref($err) eq 'HASH' && $err->{error}) { 98 + return $c->render( 99 + status => $err->{status} // 400, 100 + json => { 101 + error => $err->{error}, 102 + message => $err->{message} // $err->{error}, 103 + }, 104 + ); 105 + } 106 + die $err; 107 + } 108 + 109 + return unless defined $result; 110 + return $c->render(json => $result); 111 + }); 64 112 } 65 113 66 114 1;
+2 -3
t/app-routes.t
··· 40 40 ->json_like('/did' => qr/\Adid:web:/); 41 41 42 42 $t->post_ok('/xrpc/com.atproto.repo.createRecord' => json => {}) 43 - ->status_is(501) 44 - ->json_is('/error' => 'NotImplemented') 45 - ->json_is('/nsid' => 'com.atproto.repo.createRecord'); 43 + ->status_is(404) 44 + ->json_is('/error' => 'RepoNotFound'); 46 45 47 46 $t->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos') 48 47 ->finish_ok;
+91
t/pds_smoke.t
··· 1 + use v5.34; 2 + use warnings; 3 + 4 + use Config (); 5 + use FindBin qw($Bin); 6 + use File::Spec; 7 + use File::Temp qw(tempdir); 8 + use Test2::V0; 9 + 10 + BEGIN { 11 + require lib; 12 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 13 + lib->import( 14 + File::Spec->catdir($root, 'lib'), 15 + File::Spec->catdir($root, 'local', 'lib', 'perl5'), 16 + File::Spec->catdir($root, 'local', 'lib', 'perl5', $Config::Config{archname}), 17 + ); 18 + } 19 + 20 + use Test::Mojo; 21 + use ATProto::PDS; 22 + 23 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 24 + my $tmp = tempdir(CLEANUP => 1); 25 + 26 + my $app = ATProto::PDS->new( 27 + project_root => $root, 28 + settings => { 29 + base_url => 'http://127.0.0.1:7755', 30 + service_handle_domain => 'example.test', 31 + service_did_method => 'did:web', 32 + jwt_secret => 'smoke-secret', 33 + admin_password => 'admin-secret', 34 + db_path => File::Spec->catfile($tmp, 'perlds.sqlite'), 35 + }, 36 + ); 37 + 38 + my $t = Test::Mojo->new($app); 39 + 40 + $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 41 + handle => 'alice.example.test', 42 + email => 'alice@example.test', 43 + password => 'hunter22', 44 + })->status_is(200); 45 + 46 + my $created = $t->tx->res->json; 47 + ok($created->{accessJwt}, 'account creation returns access token'); 48 + ok($created->{refreshJwt}, 'account creation returns refresh token'); 49 + is($created->{handle}, 'alice.example.test', 'account creation returns normalized handle'); 50 + 51 + my $access = $created->{accessJwt}; 52 + my $did = $created->{did}; 53 + 54 + $t->get_ok('/xrpc/com.atproto.server.getSession' => { 55 + Authorization => "Bearer $access", 56 + })->status_is(200) 57 + ->json_is('/did', $did); 58 + 59 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 60 + Authorization => "Bearer $access", 61 + } => json => { 62 + repo => $did, 63 + collection => 'app.bsky.feed.post', 64 + record => { 65 + '$type' => 'app.bsky.feed.post', 66 + text => 'hello from perl', 67 + }, 68 + })->status_is(200) 69 + ->json_has('/uri') 70 + ->json_has('/cid'); 71 + 72 + $t->get_ok('/xrpc/com.atproto.repo.listRecords' => form => { 73 + repo => $did, 74 + collection => 'app.bsky.feed.post', 75 + })->status_is(200) 76 + ->json_is('/records/0/value/text', 'hello from perl'); 77 + 78 + $t->get_ok('/xrpc/com.atproto.sync.getLatestCommit' => form => { 79 + did => $did, 80 + })->status_is(200) 81 + ->json_has('/cid') 82 + ->json_has('/rev'); 83 + 84 + $t->get_ok('/xrpc/com.atproto.sync.getRepo' => form => { 85 + did => $did, 86 + })->status_is(200); 87 + 88 + like($t->tx->res->headers->content_type // '', qr{application/vnd\.ipld\.car}, 'repo export is served as CAR'); 89 + ok(length($t->tx->res->body) > 0, 'repo export is non-empty'); 90 + 91 + done_testing;
+98
t/repo-api.t
··· 1 + use v5.34; 2 + use warnings; 3 + 4 + use Config (); 5 + use File::Path qw(remove_tree); 6 + use File::Spec; 7 + use FindBin qw($Bin); 8 + use Test2::V0; 9 + 10 + BEGIN { 11 + require lib; 12 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 13 + lib->import( 14 + File::Spec->catdir($root, 'lib'), 15 + File::Spec->catdir($root, 'local', 'lib', 'perl5'), 16 + File::Spec->catdir($root, 'local', 'lib', 'perl5', $Config::Config{archname}), 17 + ); 18 + } 19 + 20 + use Test::Mojo; 21 + use ATProto::PDS; 22 + 23 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 24 + my $tmp = File::Spec->catdir($root, 'data', 'tmp-tests', 'repo-api'); 25 + remove_tree($tmp) if -d $tmp; 26 + 27 + my $t = Test::Mojo->new(ATProto::PDS->new( 28 + project_root => $root, 29 + settings => { 30 + base_url => 'http://127.0.0.1:7755', 31 + service_did_method => 'did:web', 32 + service_handle_domain => 'localhost', 33 + jwt_secret => 'repo-secret', 34 + data_dir => $tmp, 35 + db_path => File::Spec->catfile($tmp, 'perlds.sqlite'), 36 + }, 37 + )); 38 + 39 + $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 40 + handle => 'repo-owner', 41 + email => 'repo@example.com', 42 + password => 'password123', 43 + })->status_is(200); 44 + 45 + my $session = $t->tx->res->json; 46 + my $did = $session->{did}; 47 + my $access = $session->{accessJwt}; 48 + 49 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { Authorization => "Bearer $access" } => json => { 50 + repo => $did, 51 + collection => 'app.bsky.feed.post', 52 + rkey => 'first-post', 53 + record => { 54 + '$type' => 'app.bsky.feed.post', 55 + text => 'hello from perl', 56 + createdAt => '2026-03-10T00:00:00Z', 57 + }, 58 + })->status_is(200) 59 + ->json_like('/cid' => qr/\Ab/); 60 + 61 + $t->get_ok("/xrpc/com.atproto.repo.getRecord?repo=$did&collection=app.bsky.feed.post&rkey=first-post") 62 + ->status_is(200) 63 + ->json_is('/value/text' => 'hello from perl'); 64 + 65 + $t->get_ok("/xrpc/com.atproto.repo.listRecords?repo=$did&collection=app.bsky.feed.post") 66 + ->status_is(200) 67 + ->json_is('/records/0/value/text' => 'hello from perl'); 68 + 69 + $t->get_ok("/xrpc/com.atproto.sync.getLatestCommit?did=$did") 70 + ->status_is(200) 71 + ->json_like('/cid' => qr/\Ab/) 72 + ->json_has('/rev'); 73 + 74 + $t->get_ok("/xrpc/com.atproto.sync.getRepoStatus?did=$did") 75 + ->status_is(200) 76 + ->json_is('/did' => $did) 77 + ->json_has('/active'); 78 + 79 + $t->get_ok('/xrpc/com.atproto.sync.listRepos') 80 + ->status_is(200) 81 + ->json_is('/repos/0/did' => $did); 82 + 83 + $t->get_ok("/xrpc/com.atproto.sync.getRepo?did=$did") 84 + ->status_is(200) 85 + ->content_type_like(qr{application/vnd\.ipld\.car}) 86 + ->content_like(qr/.+/s); 87 + 88 + $t->post_ok('/xrpc/com.atproto.repo.deleteRecord' => { Authorization => "Bearer $access" } => json => { 89 + repo => $did, 90 + collection => 'app.bsky.feed.post', 91 + rkey => 'first-post', 92 + })->status_is(200); 93 + 94 + $t->get_ok("/xrpc/com.atproto.repo.getRecord?repo=$did&collection=app.bsky.feed.post&rkey=first-post") 95 + ->status_is(404) 96 + ->json_is('/error' => 'RecordNotFound'); 97 + 98 + done_testing;