perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Enforce moderation takedowns across repo, record, and blob flows

alice 731c0b5e 0ce778bc

+582 -20
+55 -8
lib/ATProto/PDS/API/Admin.pm
··· 13 13 use ATProto::PDS::Auth::Password qw(hash_password); 14 14 use ATProto::PDS::Crypto::Secp256k1 qw(signing_did_to_public_key_multibase); 15 15 use ATProto::PDS::Identity qw(account_did_doc normalize_handle); 16 + use ATProto::PDS::Moderation qw(current_record_subject current_subject_status parse_at_uri); 16 17 17 18 our @EXPORT_OK = qw(register_admin_handlers); 18 19 ··· 52 53 $registry->register('com.atproto.admin.getSubjectStatus', sub ($c, $endpoint) { 53 54 require_admin($c); 54 55 my $subject = _subject_from_params($c); 55 - my $status = $c->store->get_subject_status(subject_key($subject)); 56 + my $status = current_subject_status($c, $subject); 57 + xrpc_error(404, 'NotFound', 'Subject not found') unless $status; 56 58 return { 57 - subject => $subject, 58 - ($status && $status->{takedown} ? (takedown => $status->{takedown}) : ()), 59 - ($status && $status->{deactivated} ? (deactivated => $status->{deactivated}) : ()), 59 + subject => $status->{subject}, 60 + ($status->{takedown} ? (takedown => $status->{takedown}) : ()), 61 + ($status->{deactivated} ? (deactivated => $status->{deactivated}) : ()), 60 62 }; 61 63 }); 62 64 63 65 $registry->register('com.atproto.admin.updateSubjectStatus', sub ($c, $endpoint) { 64 66 require_admin($c); 65 67 my $body = $c->req->json || {}; 66 - my $subject = $body->{subject} || {}; 68 + my $subject = _validated_subject($c, $body->{subject} || {}); 69 + my $existing = $c->store->get_subject_status(subject_key($subject)); 67 70 my $status = $c->store->put_subject_status( 68 71 subject_key => subject_key($subject), 69 72 subject => $subject, 70 - takedown => $body->{takedown}, 71 - deactivated => $body->{deactivated}, 73 + takedown => exists($body->{takedown}) ? $body->{takedown} : ($existing ? $existing->{takedown} : undef), 74 + deactivated => exists($body->{deactivated}) ? $body->{deactivated} : ($existing ? $existing->{deactivated} : undef), 72 75 ); 73 - if (exists($subject->{did}) && !exists($subject->{uri}) && !exists($subject->{cid}) && $body->{deactivated}) { 76 + if (exists($subject->{did}) && !exists($subject->{uri}) && !exists($subject->{cid}) && exists($body->{deactivated})) { 74 77 $c->store->update_account( 75 78 $subject->{did}, 76 79 deactivated_at => $body->{deactivated}{applied} ? time : undef, 80 + ); 81 + $c->store->append_event( 82 + did => $subject->{did}, 83 + type => 'account', 84 + rev => ($c->store->get_account_by_did($subject->{did})->{repo_rev} // undef), 85 + payload => { 86 + active => $body->{deactivated}{applied} ? JSON::PP::false : JSON::PP::true, 87 + ($body->{deactivated}{applied} ? (status => 'deactivated') : ()), 88 + }, 89 + ); 90 + } 91 + if (exists($subject->{did}) && exists($subject->{cid})) { 92 + $c->store->update_blob( 93 + $subject->{cid}, 94 + quarantined_at => ($status->{takedown} && $status->{takedown}{applied}) ? time : undef, 77 95 ); 78 96 } 79 97 return { ··· 252 270 cid => $c->param('blob'), 253 271 } if defined $c->param('blob'); 254 272 xrpc_error(400, 'InvalidRequest', 'A subject reference is required'); 273 + } 274 + 275 + sub _validated_subject ($c, $subject) { 276 + if (exists($subject->{did}) && !exists($subject->{uri}) && !exists($subject->{cid})) { 277 + my $account = $c->store->get_account_by_did($subject->{did}); 278 + xrpc_error(404, 'NotFound', 'Subject not found') unless $account; 279 + return { 280 + %{$subject}, 281 + '$type' => ($subject->{'$type'} // 'com.atproto.admin.defs#repoRef'), 282 + }; 283 + } 284 + if (exists $subject->{uri}) { 285 + my $current = current_record_subject($c, $subject->{uri}); 286 + xrpc_error(404, 'NotFound', 'Subject not found') unless $current; 287 + return { 288 + %{$current}, 289 + '$type' => ($subject->{'$type'} // 'com.atproto.repo.strongRef'), 290 + }; 291 + } 292 + if (exists($subject->{did}) && exists($subject->{cid})) { 293 + my $blob = $c->store->get_blob($subject->{cid}); 294 + xrpc_error(404, 'NotFound', 'Subject not found') 295 + unless $blob && ($blob->{did} // q()) eq ($subject->{did} // q()); 296 + return { 297 + %{$subject}, 298 + '$type' => ($subject->{'$type'} // 'com.atproto.admin.defs#repoBlobRef'), 299 + }; 300 + } 301 + xrpc_error(400, 'InvalidRequest', 'Invalid subject'); 255 302 } 256 303 257 304 1;
+1 -10
lib/ATProto/PDS/API/Helpers.pm
··· 11 11 12 12 use ATProto::PDS::API::Util qw(iso8601 xrpc_error); 13 13 use ATProto::PDS::Auth::Password qw(verify_password); 14 + use ATProto::PDS::Moderation qw(subject_key); 14 15 15 16 our @EXPORT_OK = qw( 16 17 account_view ··· 100 101 } @$uses 101 102 ], 102 103 }; 103 - } 104 - 105 - sub subject_key ($subject) { 106 - return 'repo:' . ($subject->{did} // q()) 107 - if ref($subject) eq 'HASH' && exists $subject->{did} && !exists $subject->{uri} && !exists $subject->{cid}; 108 - return 'record:' . ($subject->{uri} // q()) . ':' . ($subject->{cid} // q()) 109 - if ref($subject) eq 'HASH' && exists $subject->{uri}; 110 - return 'blob:' . ($subject->{did} // q()) . ':' . ($subject->{cid} // q()) 111 - if ref($subject) eq 'HASH' && exists $subject->{did} && exists $subject->{cid}; 112 - xrpc_error(400, 'InvalidRequest', 'Unsupported subject payload'); 113 104 } 114 105 115 106 1;
+2
lib/ATProto/PDS/API/Misc.pm
··· 13 13 use ATProto::PDS::API::Util qw(iso8601 xrpc_error); 14 14 use ATProto::PDS::Auth::Password qw(hash_password random_hex); 15 15 use ATProto::PDS::Identity qw(account_did_doc normalize_handle service_did service_did_doc); 16 + use ATProto::PDS::Moderation qw(assert_report_allowed); 16 17 use ATProto::PDS::PLC qw(create_signed_plc_operation is_plc_did plc_rotation_did plc_update_handle recommended_did_credentials refresh_plc_did_doc submit_plc_operation); 17 18 use ATProto::PDS::Repo::CID; 18 19 use ATProto::PDS::Repo::DagCbor qw(encode_dag_cbor); ··· 183 184 $registry->register('com.atproto.moderation.createReport', sub ($c, $endpoint) { 184 185 my (undef, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 185 186 my $body = $c->req->json || {}; 187 + assert_report_allowed($c, $account, $body->{reasonType}); 186 188 my $row = $c->store->create_report( 187 189 reason_type => $body->{reasonType}, 188 190 reason => $body->{reason},
+50 -1
lib/ATProto/PDS/API/Repo.pm
··· 12 12 13 13 use ATProto::PDS::API::Server qw(require_auth); 14 14 use ATProto::PDS::API::Util qw(blob_ref); 15 + use ATProto::PDS::Moderation qw(assert_record_readable assert_repo_readable assert_repo_writable is_record_takedown parse_at_uri); 15 16 use ATProto::PDS::Repo::CID; 16 17 17 18 our @EXPORT_OK = qw(register_repo_handlers); ··· 20 21 $registry->register('com.atproto.repo.describeRepo', sub ($c, $endpoint) { 21 22 my $account = _resolve_repo($c, $c->param('repo')); 22 23 _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 24 + assert_repo_readable($c, $account); 23 25 24 26 return { 25 27 handle => $account->{handle}, ··· 104 106 $registry->register('com.atproto.repo.getRecord', sub ($c, $endpoint) { 105 107 my $account = _resolve_repo($c, $c->param('repo')); 106 108 _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 109 + assert_repo_readable($c, $account); 107 110 my $row = $c->store->get_record($account->{did}, $c->param('collection'), $c->param('rkey')); 108 111 _xrpc_error(404, 'RecordNotFound', 'Record was not found') unless $row; 112 + assert_record_readable($c, "at://$account->{did}/$row->{collection}/$row->{rkey}"); 109 113 return { 110 114 uri => "at://$account->{did}/$row->{collection}/$row->{rkey}", 111 115 cid => $row->{cid}, ··· 116 120 $registry->register('com.atproto.repo.listRecords', sub ($c, $endpoint) { 117 121 my $account = _resolve_repo($c, $c->param('repo')); 118 122 _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 119 - my $page = $c->store->list_records( 123 + assert_repo_readable($c, $account); 124 + my $page = _list_visible_records( 125 + $c, 120 126 $account->{did}, 121 127 $c->param('collection'), 122 128 limit => $c->param('limit') // 50, ··· 139 145 140 146 $registry->register('com.atproto.repo.uploadBlob', sub ($c, $endpoint) { 141 147 my (undef, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 148 + assert_repo_writable($c, $account); 142 149 my $bytes = $c->req->body // q(); 143 150 my $cid = ATProto::PDS::Repo::CID->for_raw($bytes)->to_string; 151 + my $existing = $c->store->get_blob($cid); 152 + _xrpc_error(400, 'BlobTakenDown', 'Blob has been taken down') 153 + if $existing && defined $existing->{quarantined_at}; 144 154 my $data_dir = $c->config_value('data_dir', File::Spec->catdir($c->app->project_root, 'data', 'runtime')); 145 155 my $blob_dir = File::Spec->catdir($data_dir, 'blobs'); 146 156 make_path($blob_dir); ··· 166 176 167 177 $registry->register('com.atproto.repo.listMissingBlobs', sub ($c, $endpoint) { 168 178 my (undef, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 179 + assert_repo_writable($c, $account); 169 180 my $page = { 170 181 items => [], 171 182 cursor => undef, ··· 177 188 178 189 $registry->register('com.atproto.repo.importRepo', sub ($c, $endpoint) { 179 190 my (undef, $account) = require_auth($c, audience => 'access', allow_refresh => 1); 191 + assert_repo_writable($c, $account); 180 192 _xrpc_error(400, 'InvalidRequest', 'Service is not accepting repo imports') 181 193 unless $c->config_value('accepting_imports', 1); 182 194 my $car_bytes = $c->req->body // q(); ··· 210 222 _xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 211 223 my ($claims) = require_auth($c, audience => 'access', allow_refresh => 1); 212 224 _xrpc_error(401, 'AuthRequired', 'Token is not authorized for that repo') unless ($claims->{sub} // '') eq $account->{did}; 225 + assert_repo_writable($c, $account); 213 226 return $account; 227 + } 228 + 229 + sub _list_visible_records ($c, $did, $collection, %args) { 230 + my $limit = $args{limit} // 50; 231 + my $cursor = $args{cursor}; 232 + my $reverse = $args{reverse} ? 1 : 0; 233 + my @visible; 234 + my $next_cursor = $cursor; 235 + while (@visible < $limit + 1) { 236 + my $page = $c->store->list_records( 237 + $did, 238 + $collection, 239 + limit => $limit + 1, 240 + cursor => $next_cursor, 241 + reverse => $reverse, 242 + ); 243 + last unless @{ $page->{items} }; 244 + for my $row (@{ $page->{items} }) { 245 + next if is_record_takedown($c, "at://$did/$row->{collection}/$row->{rkey}"); 246 + push @visible, $row; 247 + last if @visible >= $limit + 1; 248 + } 249 + last unless defined $page->{cursor}; 250 + $next_cursor = $page->{cursor}; 251 + } 252 + 253 + my $out_cursor; 254 + if (@visible > $limit) { 255 + my $last = pop @visible; 256 + $out_cursor = $last->{rkey}; 257 + } 258 + 259 + return { 260 + items => \@visible, 261 + cursor => $out_cursor, 262 + }; 214 263 } 215 264 216 265 sub _xrpc_error ($status, $error, $message) {
+3 -1
lib/ATProto/PDS/API/Server.pm
··· 13 13 use ATProto::PDS::Auth::JWT qw(decode_jwt encode_jwt); 14 14 use ATProto::PDS::Auth::Password qw(hash_password random_hex); 15 15 use ATProto::PDS::Identity qw(account_did account_did_doc normalize_handle service_did); 16 + use ATProto::PDS::Moderation qw(assert_login_allowed); 16 17 use ATProto::PDS::PLC qw(account_did_method create_plc_account is_plc_did refresh_plc_did_doc); 17 18 use ATProto::PDS::Repo::CAR qw(read_car); 18 19 ··· 121 122 my $body = $c->req->json || {}; 122 123 my $account = find_account($c, $body->{identifier} // q()); 123 124 xrpc_error(401, 'AuthRequired', 'Invalid identifier or password') unless $account; 124 - xrpc_error(403, 'AccountDeleted', 'Account has been deleted') if defined $account->{deleted_at}; 125 125 xrpc_error(401, 'AuthRequired', 'Invalid identifier or password') 126 126 unless verify_account_password($c, $account, $body->{password} // q()); 127 + assert_login_allowed($c, $account, allow_takedown => $body->{allowTakendown}); 127 128 return _issue_session($c, $account); 128 129 }); 129 130 ··· 137 138 my $session = $c->store->get_session($claims->{jti}); 138 139 xrpc_error(401, 'InvalidToken', 'Refresh session was not found') unless $session; 139 140 xrpc_error(401, 'ExpiredToken', 'Refresh session has already been revoked') if defined $session->{revoked_at}; 141 + assert_login_allowed($c, $account); 140 142 $c->store->revoke_session($session->{id}); 141 143 return _issue_session($c, $account); 142 144 });
+11
lib/ATProto/PDS/API/Sync.pm
··· 12 12 use ATProto::PDS::EventStream qw(encode_error_frame encode_info_frame encode_message_frame); 13 13 use ATProto::PDS::API::Util qw(iso8601 resolve_did_account xrpc_error); 14 14 use ATProto::PDS::Identity qw(service_host); 15 + use ATProto::PDS::Moderation qw(assert_blob_readable assert_record_readable assert_repo_readable); 15 16 use ATProto::PDS::Repo::CAR qw(write_car); 16 17 use ATProto::PDS::Repo::CID; 17 18 use ATProto::PDS::Repo::Bytes; ··· 22 23 $registry->register('com.atproto.sync.getLatestCommit', sub ($c, $endpoint) { 23 24 my $account = resolve_did_account($c, $c->param('did') // q()); 24 25 xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 26 + assert_repo_readable($c, $account, message => 'Could not find repo for DID: ' . ($c->param('did') // q())); 25 27 my $head = $c->store->get_repo_head($account->{did}); 26 28 xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $head; 27 29 return { ··· 33 35 $registry->register('com.atproto.sync.getHead', sub ($c, $endpoint) { 34 36 my $account = resolve_did_account($c, $c->param('did') // q()); 35 37 xrpc_error(404, 'HeadNotFound', 'Repository head was not found') unless $account; 38 + assert_repo_readable($c, $account, error => 'HeadNotFound', message => 'Repository head was not found'); 36 39 my $head = $c->store->get_repo_head($account->{did}); 37 40 xrpc_error(404, 'HeadNotFound', 'Repository head was not found') unless $head; 38 41 return { ··· 43 46 $registry->register('com.atproto.sync.getRepoStatus', sub ($c, $endpoint) { 44 47 my $account = resolve_did_account($c, $c->param('did') // q()); 45 48 xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 49 + assert_repo_readable($c, $account, message => 'Could not find repo for DID: ' . ($c->param('did') // q())); 46 50 return { 47 51 did => $account->{did}, 48 52 active => defined($account->{deactivated_at}) ? JSON::PP::false : JSON::PP::true, ··· 55 59 $registry->register('com.atproto.sync.getRepo', sub ($c, $endpoint) { 56 60 my $account = resolve_did_account($c, $c->param('did') // q()); 57 61 xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 62 + assert_repo_readable($c, $account, message => 'Could not find repo for DID: ' . ($c->param('did') // q())); 58 63 my $car = $c->store->repo_car($account->{did}); 59 64 xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless defined $car; 60 65 $c->res->headers->content_type('application/vnd.ipld.car'); ··· 65 70 $registry->register('com.atproto.sync.getCheckout', sub ($c, $endpoint) { 66 71 my $account = resolve_did_account($c, $c->param('did') // q()); 67 72 xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 73 + assert_repo_readable($c, $account, message => 'Could not find repo for DID: ' . ($c->param('did') // q())); 68 74 my $car = $c->store->repo_car($account->{did}); 69 75 xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless defined $car; 70 76 $c->res->headers->content_type('application/vnd.ipld.car'); ··· 75 81 $registry->register('com.atproto.sync.getRecord', sub ($c, $endpoint) { 76 82 my $account = resolve_did_account($c, $c->param('did') // q()); 77 83 xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 84 + assert_repo_readable($c, $account, message => 'Could not find repo for DID: ' . ($c->param('did') // q())); 78 85 my $record = $c->store->get_record($account->{did}, $c->param('collection'), $c->param('rkey')); 79 86 xrpc_error(404, 'RecordNotFound', 'Record was not found') unless $record; 87 + assert_record_readable($c, "at://$account->{did}/$record->{collection}/$record->{rkey}"); 80 88 my $car = $c->store->repo_car($account->{did}); 81 89 xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless defined $car; 82 90 $c->res->headers->content_type('application/vnd.ipld.car'); ··· 87 95 $registry->register('com.atproto.sync.getBlocks', sub ($c, $endpoint) { 88 96 my $account = resolve_did_account($c, $c->param('did') // q()); 89 97 xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 98 + assert_repo_readable($c, $account, message => 'Could not find repo for DID: ' . ($c->param('did') // q())); 90 99 my @cids = _flatten_params($c->every_param('cids')); 91 100 xrpc_error(400, 'InvalidRequest', 'At least one CID is required') unless @cids; 92 101 my $rows = $c->store->get_blocks(\@cids); ··· 112 121 my $blob = $c->store->get_blob($c->param('cid') // q()); 113 122 xrpc_error(404, 'BlobNotFound', 'Blob was not found') 114 123 unless $blob && ($blob->{did} // q()) eq $account->{did}; 124 + assert_blob_readable($c, $account, $blob); 115 125 xrpc_error(404, 'BlobNotFound', 'Blob content is not available') 116 126 unless $blob->{storage_path} && -f $blob->{storage_path}; 117 127 open(my $fh, '<:raw', $blob->{storage_path}) or xrpc_error(500, 'StorageFailure', 'Unable to read blob'); ··· 149 159 $registry->register('com.atproto.sync.listBlobs', sub ($c, $endpoint) { 150 160 my $account = resolve_did_account($c, $c->param('did') // q()); 151 161 xrpc_error(404, 'RepoNotFound', 'Repository was not found') unless $account; 162 + assert_repo_readable($c, $account, message => 'Could not find repo for DID: ' . ($c->param('did') // q())); 152 163 my $page = $c->store->list_blobs_by_did( 153 164 $account->{did}, 154 165 limit => $c->param('limit') // 500,
+158
lib/ATProto/PDS/Moderation.pm
··· 1 + package ATProto::PDS::Moderation; 2 + 3 + use v5.34; 4 + use warnings; 5 + use feature 'signatures'; 6 + no warnings 'experimental::signatures'; 7 + 8 + use Exporter 'import'; 9 + use JSON::PP (); 10 + 11 + use ATProto::PDS::API::Util qw(xrpc_error); 12 + use ATProto::PDS::Auth::JWT qw(decode_jwt); 13 + 14 + our @EXPORT_OK = qw( 15 + assert_blob_readable 16 + assert_login_allowed 17 + assert_record_readable 18 + assert_repo_readable 19 + assert_repo_writable 20 + assert_report_allowed 21 + can_read_private_blob 22 + current_record_subject 23 + current_subject_status 24 + is_blob_takedown 25 + is_record_takedown 26 + is_repo_takedown 27 + parse_at_uri 28 + subject_key 29 + ); 30 + 31 + sub subject_key ($subject) { 32 + return 'repo:' . ($subject->{did} // q()) 33 + if ref($subject) eq 'HASH' && exists $subject->{did} && !exists $subject->{uri} && !exists $subject->{cid}; 34 + return 'record:' . ($subject->{uri} // q()) 35 + if ref($subject) eq 'HASH' && exists $subject->{uri}; 36 + return 'blob:' . ($subject->{did} // q()) . ':' . ($subject->{cid} // q()) 37 + if ref($subject) eq 'HASH' && exists $subject->{did} && exists $subject->{cid}; 38 + xrpc_error(400, 'InvalidRequest', 'Unsupported subject payload'); 39 + } 40 + 41 + sub parse_at_uri ($uri) { 42 + return unless defined $uri && $uri =~ m{\Aat://([^/]+)/([^/]+)/([^/?#]+)\z}; 43 + return ($1, $2, $3); 44 + } 45 + 46 + sub current_record_subject ($c, $uri) { 47 + my ($did, $collection, $rkey) = parse_at_uri($uri); 48 + return undef unless defined $did; 49 + my $record = $c->store->get_record($did, $collection, $rkey); 50 + return undef unless $record; 51 + return { 52 + uri => $uri, 53 + cid => $record->{cid}, 54 + }; 55 + } 56 + 57 + sub current_subject_status ($c, $subject) { 58 + my $key = subject_key($subject); 59 + my $status = $c->store->get_subject_status($key); 60 + return undef unless $status; 61 + my $current_subject = $status->{subject} || $subject; 62 + if (exists $subject->{uri}) { 63 + my $current = current_record_subject($c, $subject->{uri}); 64 + return undef unless $current; 65 + $current_subject = { 66 + %{$current}, 67 + ($status->{subject}{'$type'} ? ('$type' => $status->{subject}{'$type'}) : ()), 68 + }; 69 + } 70 + return { 71 + %{$status}, 72 + subject => $current_subject, 73 + }; 74 + } 75 + 76 + sub is_repo_takedown ($c, $did) { 77 + my $status = $c->store->get_subject_status('repo:' . ($did // q())); 78 + return ($status && $status->{takedown} && $status->{takedown}{applied}) ? 1 : 0; 79 + } 80 + 81 + sub is_record_takedown ($c, $uri) { 82 + my $status = $c->store->get_subject_status('record:' . ($uri // q())); 83 + return ($status && $status->{takedown} && $status->{takedown}{applied}) ? 1 : 0; 84 + } 85 + 86 + sub is_blob_takedown ($c, $did, $cid) { 87 + my $status = $c->store->get_subject_status('blob:' . ($did // q()) . ':' . ($cid // q())); 88 + return ($status && $status->{takedown} && $status->{takedown}{applied}) ? 1 : 0; 89 + } 90 + 91 + sub assert_login_allowed ($c, $account, %opts) { 92 + xrpc_error(403, 'AccountDeleted', 'Account has been deleted') if defined $account->{deleted_at}; 93 + if (is_repo_takedown($c, $account->{did}) && !$opts{allow_takedown}) { 94 + xrpc_error(403, 'AccountTakedown', 'Account has been taken down'); 95 + } 96 + if (defined $account->{deactivated_at} && !$opts{allow_deactivated}) { 97 + xrpc_error(403, 'AccountDeactivated', 'Account is deactivated'); 98 + } 99 + return 1; 100 + } 101 + 102 + sub assert_repo_readable ($c, $account, %opts) { 103 + return 1 unless $account; 104 + if (is_repo_takedown($c, $account->{did})) { 105 + xrpc_error( 106 + $opts{status} // 404, 107 + $opts{error} // 'RepoNotFound', 108 + $opts{message} // 'Repository was not found', 109 + ); 110 + } 111 + return 1; 112 + } 113 + 114 + sub assert_record_readable ($c, $uri, %opts) { 115 + if (is_record_takedown($c, $uri)) { 116 + xrpc_error( 117 + $opts{status} // 404, 118 + $opts{error} // 'RecordNotFound', 119 + $opts{message} // 'Record was not found', 120 + ); 121 + } 122 + return 1; 123 + } 124 + 125 + sub assert_repo_writable ($c, $account) { 126 + if (is_repo_takedown($c, $account->{did}) || defined $account->{deactivated_at}) { 127 + xrpc_error(401, 'InvalidToken', 'Bad token scope'); 128 + } 129 + return 1; 130 + } 131 + 132 + sub can_read_private_blob ($c, $did) { 133 + my $auth = $c->req->headers->authorization // q(); 134 + return 1 if defined($c->config_value('admin_password')) && length($c->config_value('admin_password')) 135 + && $auth =~ /\ABearer\s+\Q@{[$c->config_value('admin_password')]}\E\z/; 136 + return 0 unless $auth =~ /\ABearer\s+(.+)\z/i; 137 + my $token = $1; 138 + my $decoded = eval { decode_jwt($token, $c->config_value('jwt_secret', 'perlds-dev-secret')) }; 139 + return 0 unless $decoded && ref($decoded) eq 'HASH'; 140 + my $claims = $decoded->{claims} || {}; 141 + return (($claims->{sub} // q()) eq ($did // q())) ? 1 : 0; 142 + } 143 + 144 + sub assert_blob_readable ($c, $account, $blob) { 145 + my $private_ok = can_read_private_blob($c, $account->{did}); 146 + if ((is_repo_takedown($c, $account->{did}) || is_blob_takedown($c, $account->{did}, $blob->{cid}) || defined $blob->{quarantined_at}) && !$private_ok) { 147 + xrpc_error(404, 'BlobNotFound', 'Blob was not found'); 148 + } 149 + return 1; 150 + } 151 + 152 + sub assert_report_allowed ($c, $account, $reason_type) { 153 + return 1 unless is_repo_takedown($c, $account->{did}); 154 + return 1 if ($reason_type // q()) eq 'com.atproto.moderation.defs#reasonAppeal'; 155 + xrpc_error(403, 'InvalidRequest', 'Report not accepted from takendown account'); 156 + } 157 + 158 + 1;
+40
lib/ATProto/PDS/Repo/Manager.pm
··· 9 9 use Scalar::Util qw(blessed); 10 10 11 11 use ATProto::PDS::Crypto::Secp256k1 qw(generate_keypair sign_compact_low_s); 12 + use ATProto::PDS::API::Util qw(xrpc_error); 12 13 use ATProto::PDS::Repo::Bytes; 13 14 use ATProto::PDS::Repo::CAR qw(read_car write_car); 14 15 use ATProto::PDS::Repo::CID; ··· 81 82 my $collection = $write->{collection}; 82 83 my $rkey = $write->{rkey} // next_tid(); 83 84 my $value = $write->{value} // $write->{record}; 85 + my @blob_cids = _blob_cids($value); 86 + for my $blob_cid (@blob_cids) { 87 + my $blob = $store->get_blob($blob_cid); 88 + die { 89 + status => 400, 90 + error => 'InvalidBlob', 91 + message => "Could not find blob: $blob_cid", 92 + } unless $blob && ($blob->{did} // q()) eq $did; 93 + die { 94 + status => 400, 95 + error => 'BlobTakenDown', 96 + message => "Blob has been taken down: $blob_cid", 97 + } if defined $blob->{quarantined_at}; 98 + } 84 99 my $bytes = encode_dag_cbor($value); 85 100 my $cid = ATProto::PDS::Repo::CID->for_dag_cbor($bytes); 86 101 my $path = $collection . '/' . $rkey; ··· 145 160 ); 146 161 } 147 162 $store->replace_records_for_did($did, [ values %$records ]); 163 + for my $record (values %$records) { 164 + $store->mark_blobs_referenced(_blob_cids($record->{value})); 165 + } 148 166 $store->put_commit( 149 167 did => $did, 150 168 rev => $rev, ··· 175 193 car_bytes => $car_bytes, 176 194 results => \@results, 177 195 }; 196 + } 197 + 198 + sub _blob_cids ($value) { 199 + return () unless defined $value; 200 + if (ref($value) eq 'HASH') { 201 + if (($value->{'$type'} // q()) eq 'blob' && ref($value->{ref}) eq 'HASH' && defined($value->{ref}{'$link'})) { 202 + return ($value->{ref}{'$link'}); 203 + } 204 + my @found; 205 + for my $child (values %$value) { 206 + push @found, _blob_cids($child); 207 + } 208 + return @found; 209 + } 210 + if (ref($value) eq 'ARRAY') { 211 + my @found; 212 + for my $child (@$value) { 213 + push @found, _blob_cids($child); 214 + } 215 + return @found; 216 + } 217 + return (); 178 218 } 179 219 180 220 sub import_repo_car ($self, $account, $car_bytes) {
+33
lib/ATProto/PDS/Store/SQLite.pm
··· 384 384 ); 385 385 } 386 386 387 + sub update_blob ($self, $cid, %args) { 388 + my @sets; 389 + my @bind; 390 + for my $column (qw(did mime_type byte_size storage_path temporary referenced_at quarantined_at)) { 391 + next unless exists $args{$column}; 392 + push @sets, "$column = ?"; 393 + push @bind, $column eq 'temporary' ? ($args{$column} ? 1 : 0) : $args{$column}; 394 + } 395 + return $self->get_blob($cid) unless @sets; 396 + push @bind, $cid; 397 + $self->dbh->do( 398 + 'UPDATE blobs SET ' . join(', ', @sets) . ' WHERE cid = ?', 399 + undef, 400 + @bind, 401 + ); 402 + return $self->get_blob($cid); 403 + } 404 + 405 + sub mark_blobs_referenced ($self, @cids) { 406 + return 0 unless @cids; 407 + my $now = time; 408 + my %seen; 409 + for my $cid (grep { defined && length && !$seen{$_}++ } @cids) { 410 + $self->dbh->do( 411 + q{UPDATE blobs SET referenced_at = ?, temporary = 0 WHERE cid = ?}, 412 + undef, 413 + $now, 414 + $cid, 415 + ); 416 + } 417 + return scalar keys %seen; 418 + } 419 + 387 420 sub list_blobs_by_did ($self, $did, %args) { 388 421 my $limit = $args{limit} // 500; 389 422 my $cursor = $args{cursor};
+229
t/moderation.t
··· 1 + use v5.34; 2 + use warnings; 3 + 4 + use Config (); 5 + use File::Spec; 6 + use File::Temp qw(tempdir); 7 + use FindBin qw($Bin); 8 + use JSON::PP (); 9 + use Test::More; 10 + 11 + BEGIN { 12 + require lib; 13 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 14 + lib->import( 15 + File::Spec->catdir($root, 'lib'), 16 + File::Spec->catdir($root, 'local', 'lib', 'perl5'), 17 + File::Spec->catdir($root, 'local', 'lib', 'perl5', $Config::Config{archname}), 18 + ); 19 + } 20 + 21 + use Test::Mojo; 22 + use Mojo::URL; 23 + use ATProto::PDS; 24 + 25 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 26 + my $tmp = tempdir(CLEANUP => 1); 27 + 28 + my $app = ATProto::PDS->new( 29 + project_root => $root, 30 + settings => { 31 + base_url => 'http://127.0.0.1:7755', 32 + service_handle_domain => 'example.test', 33 + service_did_method => 'did:web', 34 + jwt_secret => 'moderation-secret', 35 + admin_password => 'admin-secret', 36 + data_dir => File::Spec->catdir($tmp, 'data'), 37 + db_path => File::Spec->catfile($tmp, 'perlds.sqlite'), 38 + }, 39 + ); 40 + 41 + my $t = Test::Mojo->new($app); 42 + 43 + $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 44 + handle => 'alice.example.test', 45 + email => 'alice@example.test', 46 + password => 'hunter22', 47 + })->status_is(200); 48 + 49 + my $session = $t->tx->res->json; 50 + my $did = $session->{did}; 51 + my $access = $session->{accessJwt}; 52 + 53 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 54 + Authorization => "Bearer $access", 55 + } => json => { 56 + repo => $did, 57 + collection => 'app.bsky.feed.post', 58 + rkey => 'visible-post', 59 + record => { 60 + '$type' => 'app.bsky.feed.post', 61 + text => 'visible', 62 + createdAt => '2026-03-10T00:00:00Z', 63 + }, 64 + })->status_is(200) 65 + ->json_is('/uri', "at://$did/app.bsky.feed.post/visible-post"); 66 + 67 + my $record_cid = $t->tx->res->json->{cid}; 68 + 69 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 70 + Authorization => 'Bearer admin-secret', 71 + } => json => { 72 + subject => { uri => "at://$did/app.bsky.feed.post/visible-post", cid => $record_cid }, 73 + takedown => { applied => JSON::PP::true }, 74 + })->status_is(200) 75 + ->json_is('/subject/uri', "at://$did/app.bsky.feed.post/visible-post"); 76 + 77 + $t->get_ok("/xrpc/com.atproto.repo.getRecord?repo=$did&collection=app.bsky.feed.post&rkey=visible-post") 78 + ->status_is(404) 79 + ->json_is('/error', 'RecordNotFound'); 80 + 81 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.repo.listRecords')->query( 82 + repo => $did, 83 + collection => 'app.bsky.feed.post', 84 + ))->status_is(200) 85 + ->json_is('/records', []); 86 + 87 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.getRecord')->query( 88 + did => $did, 89 + collection => 'app.bsky.feed.post', 90 + rkey => 'visible-post', 91 + ))->status_is(404) 92 + ->json_is('/error', 'RecordNotFound'); 93 + 94 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 95 + Authorization => 'Bearer admin-secret', 96 + } => json => { 97 + subject => { did => $did }, 98 + takedown => { applied => JSON::PP::true }, 99 + })->status_is(200); 100 + 101 + $t->post_ok('/xrpc/com.atproto.server.createSession' => json => { 102 + identifier => 'alice.example.test', 103 + password => 'hunter22', 104 + })->status_is(403) 105 + ->json_is('/error', 'AccountTakedown'); 106 + 107 + $t->post_ok('/xrpc/com.atproto.server.createSession' => json => { 108 + identifier => 'alice.example.test', 109 + password => 'hunter22', 110 + allowTakendown => JSON::PP::true, 111 + })->status_is(200); 112 + 113 + my $takedown_access = $t->tx->res->json->{accessJwt}; 114 + 115 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 116 + Authorization => "Bearer $takedown_access", 117 + } => json => { 118 + repo => $did, 119 + collection => 'app.bsky.feed.post', 120 + rkey => 'blocked-post', 121 + record => { 122 + '$type' => 'app.bsky.feed.post', 123 + text => 'blocked', 124 + createdAt => '2026-03-10T00:00:01Z', 125 + }, 126 + })->status_is(401) 127 + ->json_is('/error', 'InvalidToken') 128 + ->json_is('/message', 'Bad token scope'); 129 + 130 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.repo.listRecords')->query( 131 + repo => $did, 132 + collection => 'app.bsky.feed.post', 133 + ))->status_is(404) 134 + ->json_is('/error', 'RepoNotFound'); 135 + 136 + $t->post_ok('/xrpc/com.atproto.moderation.createReport' => { 137 + Authorization => "Bearer $takedown_access", 138 + } => json => { 139 + reasonType => 'com.atproto.moderation.defs#reasonRude', 140 + reason => 'not allowed while takendown', 141 + subject => { did => 'did:web:elsewhere.test' }, 142 + })->status_is(403) 143 + ->json_is('/message', 'Report not accepted from takendown account'); 144 + 145 + $t->post_ok('/xrpc/com.atproto.moderation.createReport' => { 146 + Authorization => "Bearer $takedown_access", 147 + } => json => { 148 + reasonType => 'com.atproto.moderation.defs#reasonAppeal', 149 + reason => 'please restore', 150 + subject => { did => $did }, 151 + })->status_is(200) 152 + ->json_is('/reasonType', 'com.atproto.moderation.defs#reasonAppeal'); 153 + 154 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 155 + Authorization => 'Bearer admin-secret', 156 + } => json => { 157 + subject => { did => $did }, 158 + takedown => { applied => JSON::PP::false }, 159 + })->status_is(200); 160 + 161 + my $blob_tx = $t->ua->build_tx( 162 + POST => '/xrpc/com.atproto.repo.uploadBlob' => { 163 + Authorization => "Bearer $access", 164 + 'Content-Type' => 'image/png', 165 + } => 'blob-bytes', 166 + ); 167 + $t->request_ok($blob_tx)->status_is(200); 168 + 169 + my $blob = $t->tx->res->json->{blob}; 170 + my $blob_cid = $blob->{ref}{'$link'}; 171 + 172 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 173 + Authorization => 'Bearer admin-secret', 174 + } => json => { 175 + subject => { did => $did, cid => $blob_cid }, 176 + takedown => { applied => JSON::PP::true }, 177 + })->status_is(200) 178 + ->json_is('/subject/cid', $blob_cid); 179 + 180 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.getBlob')->query( 181 + did => $did, 182 + cid => $blob_cid, 183 + ))->status_is(404) 184 + ->json_is('/error', 'BlobNotFound'); 185 + 186 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.sync.getBlob')->query( 187 + did => $did, 188 + cid => $blob_cid, 189 + ) => { 190 + Authorization => "Bearer $access", 191 + })->status_is(200); 192 + is($t->tx->res->body, 'blob-bytes', 'repo owner can still read quarantined blob'); 193 + 194 + my $blocked_blob_upload = $t->ua->build_tx( 195 + POST => '/xrpc/com.atproto.repo.uploadBlob' => { 196 + Authorization => "Bearer $access", 197 + 'Content-Type' => 'image/png', 198 + } => 'blob-bytes', 199 + ); 200 + $t->request_ok($blocked_blob_upload)->status_is(400) 201 + ->json_is('/error', 'BlobTakenDown'); 202 + 203 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 204 + Authorization => "Bearer $access", 205 + } => json => { 206 + repo => $did, 207 + collection => 'app.bsky.feed.post', 208 + rkey => 'blob-ref', 209 + record => { 210 + '$type' => 'app.bsky.feed.post', 211 + text => 'blob ref', 212 + createdAt => '2026-03-10T00:00:02Z', 213 + embed => { 214 + image => $blob, 215 + }, 216 + }, 217 + })->status_is(400) 218 + ->json_is('/error', 'BlobTakenDown'); 219 + 220 + $t->get_ok('/xrpc/com.atproto.admin.getSubjectStatus' => { 221 + Authorization => 'Bearer admin-secret', 222 + } => form => { 223 + did => $did, 224 + blob => $blob_cid, 225 + })->status_is(200) 226 + ->json_is('/subject/cid', $blob_cid) 227 + ->json_is('/takedown/applied', JSON::PP::true); 228 + 229 + done_testing;