perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add persisted labels with query and subscribe support

alice bc272b47 731c0b5e

+411 -56
+54 -1
lib/ATProto/PDS/API/Admin.pm
··· 12 12 use ATProto::PDS::API::Util qw(xrpc_error); 13 13 use ATProto::PDS::Auth::Password qw(hash_password); 14 14 use ATProto::PDS::Crypto::Secp256k1 qw(signing_did_to_public_key_multibase); 15 - use ATProto::PDS::Identity qw(account_did_doc normalize_handle); 15 + use ATProto::PDS::Identity qw(account_did_doc normalize_handle service_did); 16 16 use ATProto::PDS::Moderation qw(current_record_subject current_subject_status parse_at_uri); 17 17 18 18 our @EXPORT_OK = qw(register_admin_handlers); ··· 73 73 takedown => exists($body->{takedown}) ? $body->{takedown} : ($existing ? $existing->{takedown} : undef), 74 74 deactivated => exists($body->{deactivated}) ? $body->{deactivated} : ($existing ? $existing->{deactivated} : undef), 75 75 ); 76 + _sync_hide_label($c, $subject, $existing, $status); 76 77 if (exists($subject->{did}) && !exists($subject->{uri}) && !exists($subject->{cid}) && exists($body->{deactivated})) { 77 78 $c->store->update_account( 78 79 $subject->{did}, ··· 299 300 }; 300 301 } 301 302 xrpc_error(400, 'InvalidRequest', 'Invalid subject'); 303 + } 304 + 305 + sub _sync_hide_label ($c, $subject, $before, $after) { 306 + my $was = ($before && $before->{takedown} && $before->{takedown}{applied}) ? 1 : 0; 307 + my $now = ($after && $after->{takedown} && $after->{takedown}{applied}) ? 1 : 0; 308 + return if $was == $now; 309 + 310 + my $src = service_did($c->app->settings); 311 + my ($uri, $cid) = _label_uri_and_cid($subject); 312 + my $label = { 313 + ver => 1, 314 + src => $src, 315 + uri => $uri, 316 + (defined $cid ? (cid => $cid) : ()), 317 + val => '!hide', 318 + cts => ATProto::PDS::API::Util::iso8601(time), 319 + ($now ? () : (neg => JSON::PP::true)), 320 + }; 321 + 322 + if ($now) { 323 + $c->store->put_label( 324 + subject_key => subject_key($subject), 325 + src => $src, 326 + uri => $uri, 327 + cid => $cid, 328 + val => '!hide', 329 + ); 330 + } else { 331 + $c->store->delete_label( 332 + subject_key => subject_key($subject), 333 + src => $src, 334 + val => '!hide', 335 + ); 336 + } 337 + 338 + $c->store->append_event( 339 + did => $src, 340 + type => 'label', 341 + payload => { 342 + labels => [ $label ], 343 + }, 344 + ); 345 + } 346 + 347 + sub _label_uri_and_cid ($subject) { 348 + if (exists $subject->{uri}) { 349 + return ($subject->{uri}, $subject->{cid}); 350 + } 351 + if (exists($subject->{did}) && exists($subject->{cid})) { 352 + return ('at://' . $subject->{did}, $subject->{cid}); 353 + } 354 + return ('at://' . ($subject->{did} // q()), undef); 302 355 } 303 356 304 357 1;
+69 -55
lib/ATProto/PDS/API/Misc.pm
··· 7 7 8 8 use Exporter 'import'; 9 9 use JSON::PP (); 10 + use Mojo::IOLoop; 10 11 11 12 use ATProto::PDS::API::Helpers qw(find_account require_admin subject_key); 12 13 use ATProto::PDS::API::Server qw(require_auth); 13 14 use ATProto::PDS::API::Util qw(iso8601 xrpc_error); 14 15 use ATProto::PDS::Auth::Password qw(hash_password random_hex); 16 + use ATProto::PDS::EventStream qw(encode_error_frame encode_info_frame encode_message_frame); 15 17 use ATProto::PDS::Identity qw(account_did_doc normalize_handle service_did service_did_doc); 16 18 use ATProto::PDS::Moderation qw(assert_report_allowed); 17 19 use ATProto::PDS::PLC qw(create_signed_plc_operation is_plc_did plc_rotation_did plc_update_handle recommended_did_credentials refresh_plc_did_doc submit_plc_operation); ··· 204 206 205 207 $registry->register('com.atproto.label.queryLabels', sub ($c, $endpoint) { 206 208 my $patterns = [ _flatten_params($c->every_param('uriPatterns')) ]; 209 + my @sources = _flatten_params($c->every_param('sources')); 207 210 xrpc_error(400, 'InvalidRequest', 'uriPatterns is required') unless @$patterns; 208 - my @labels = grep { _matches_patterns($_->{uri}, $patterns) } @{ _current_labels($c) }; 209 - my $limit = $c->param('limit') // 50; 210 - $limit = 250 if $limit > 250; 211 - my @slice = @labels[0 .. (@labels < $limit ? $#labels : $limit - 1)]; 211 + my $page = $c->store->list_labels( 212 + uri_patterns => $patterns, 213 + (@sources ? (sources => \@sources) : ()), 214 + limit => $c->param('limit') // 50, 215 + cursor => $c->param('cursor'), 216 + ); 212 217 return { 213 - labels => \@slice, 218 + (defined $page->{cursor} ? (cursor => $page->{cursor}) : ()), 219 + labels => [ map { _label_view($_) } @{ $page->{items} } ], 214 220 }; 215 221 }); 216 222 217 223 $registry->register('com.atproto.temp.fetchLabels', sub ($c, $endpoint) { 218 - my @labels = @{ _current_labels($c) }; 219 - my $limit = $c->param('limit') // 50; 220 - $limit = 250 if $limit > 250; 221 - my @slice = @labels[0 .. (@labels < $limit ? $#labels : $limit - 1)]; 224 + my $page = $c->store->list_labels( 225 + limit => $c->param('limit') // 50, 226 + cursor => $c->param('cursor'), 227 + ); 222 228 return { 223 - labels => \@slice, 229 + (defined $page->{cursor} ? (cursor => $page->{cursor}) : ()), 230 + labels => [ map { _label_view($_) } @{ $page->{items} } ], 224 231 }; 225 232 }); 226 233 227 234 $registry->register('com.atproto.label.subscribeLabels', sub ($c, $endpoint) { 228 - my $cursor = int($c->param('cursor') // 0); 229 - my @labels = @{ _current_labels($c) }; 230 - my $seq = $cursor + 1; 231 - if (@labels) { 232 - $c->send({ json => { 233 - seq => $seq, 234 - labels => \@labels, 235 - }}); 235 + my $cursor_param = $c->param('cursor'); 236 + my $latest = $c->store->latest_event_seq; 237 + my $oldest = $c->store->oldest_event_seq; 238 + 239 + my $next_seq; 240 + if (!defined $cursor_param || $cursor_param eq q()) { 241 + $next_seq = $latest + 1; 242 + } else { 243 + my $cursor = int($cursor_param); 244 + if ($cursor > $latest + 1) { 245 + $c->send({ binary => encode_error_frame('FutureCursor', 'Cursor is ahead of the local label stream') }); 246 + $c->finish(1008); 247 + return; 248 + } 249 + if ($oldest && $cursor && $cursor < $oldest) { 250 + $c->send({ binary => encode_info_frame('OutdatedCursor', 'Cursor predates the oldest locally retained event') }); 251 + $next_seq = $oldest; 252 + } else { 253 + $next_seq = $cursor || ($oldest || ($latest + 1)); 254 + } 236 255 } 237 - $c->finish(1000); 256 + 257 + my $drain; 258 + $drain = sub { 259 + my $events = $c->store->list_events_from($next_seq, limit => 100); 260 + for my $event (@$events) { 261 + next unless ($event->{type} // q()) eq 'label'; 262 + my $labels = $event->{payload}{labels} || []; 263 + next unless @$labels; 264 + $next_seq = $event->{seq} + 1; 265 + $c->send({ binary => encode_message_frame('#labels', { 266 + seq => 0 + $event->{seq}, 267 + labels => $labels, 268 + })}); 269 + } 270 + }; 271 + 272 + $drain->(); 273 + my $timer_id = Mojo::IOLoop->recurring(0.25 => sub { $drain->() }); 274 + $c->on(finish => sub ($c, $code, $reason = undef) { 275 + Mojo::IOLoop->remove($timer_id) if defined $timer_id; 276 + }); 238 277 return; 239 278 }); 240 279 ··· 294 333 return @flat; 295 334 } 296 335 297 - sub _current_labels ($c) { 298 - my $src = service_did($c->app->settings); 299 - my @labels; 300 - for my $status (@{ $c->store->list_subject_statuses }) { 301 - next unless $status->{takedown} && $status->{takedown}{applied}; 302 - my ($uri, $cid) = _subject_uri_and_cid($status->{subject}); 303 - push @labels, { 304 - ver => 1, 305 - src => $src, 306 - uri => $uri, 307 - (defined $cid ? (cid => $cid) : ()), 308 - val => '!hide', 309 - cts => iso8601($status->{updated_at}), 310 - }; 311 - } 312 - return \@labels; 313 - } 314 - 315 - sub _subject_uri_and_cid ($subject) { 316 - if (exists $subject->{uri}) { 317 - return ($subject->{uri}, $subject->{cid}); 318 - } 319 - if (exists $subject->{did} && exists $subject->{cid}) { 320 - return ($subject->{recordUri} || ('at://' . $subject->{did}), $subject->{cid}); 321 - } 322 - return ('at://' . ($subject->{did} // q()), undef); 323 - } 324 - 325 - sub _matches_patterns ($uri, $patterns) { 326 - for my $pattern (@$patterns) { 327 - return 1 if $pattern eq $uri; 328 - if ($pattern =~ /\A(.+)\*\z/ && index($uri, $1) == 0) { 329 - return 1; 330 - } 331 - } 332 - return 0; 336 + sub _label_view ($row) { 337 + return { 338 + ver => 1, 339 + src => $row->{src}, 340 + uri => $row->{uri}, 341 + (defined($row->{cid}) ? (cid => $row->{cid}) : ()), 342 + val => $row->{val}, 343 + cts => iso8601($row->{created_at}), 344 + (defined($row->{exp}) ? (exp => iso8601($row->{exp})) : ()), 345 + (defined($row->{sig}) ? (sig => $row->{sig}) : ()), 346 + }; 333 347 } 334 348 335 349 1;
+129
lib/ATProto/PDS/Store/SQLite.pm
··· 1039 1039 return [ map { _row_from_json_columns($_, qw(subject_json takedown_json deactivated_json)) } @$rows ]; 1040 1040 } 1041 1041 1042 + sub put_label ($self, %args) { 1043 + my $subject_key = $args{subject_key} // die 'subject_key is required'; 1044 + my $src = $args{src} // die 'src is required'; 1045 + my $uri = $args{uri} // die 'uri is required'; 1046 + my $val = $args{val} // die 'val is required'; 1047 + my $now = $args{created_at} // time; 1048 + $self->dbh->do( 1049 + q{ 1050 + INSERT INTO labels ( 1051 + subject_key, src, uri, cid, val, exp, sig, created_at, updated_at 1052 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 1053 + ON CONFLICT(subject_key, src, val) DO UPDATE SET 1054 + uri = excluded.uri, 1055 + cid = excluded.cid, 1056 + exp = excluded.exp, 1057 + sig = excluded.sig, 1058 + updated_at = excluded.updated_at 1059 + }, 1060 + undef, 1061 + $subject_key, 1062 + $src, 1063 + $uri, 1064 + $args{cid}, 1065 + $val, 1066 + $args{exp}, 1067 + $args{sig}, 1068 + $now, 1069 + $args{updated_at} // $now, 1070 + ); 1071 + return $self->get_label( 1072 + subject_key => $subject_key, 1073 + src => $src, 1074 + val => $val, 1075 + ); 1076 + } 1077 + 1078 + sub get_label ($self, %args) { 1079 + return $self->dbh->selectrow_hashref( 1080 + q{ 1081 + SELECT * FROM labels 1082 + WHERE subject_key = ? AND src = ? AND val = ? 1083 + }, 1084 + undef, 1085 + $args{subject_key}, 1086 + $args{src}, 1087 + $args{val}, 1088 + ); 1089 + } 1090 + 1091 + sub delete_label ($self, %args) { 1092 + $self->dbh->do( 1093 + q{ 1094 + DELETE FROM labels 1095 + WHERE subject_key = ? AND src = ? AND val = ? 1096 + }, 1097 + undef, 1098 + $args{subject_key}, 1099 + $args{src}, 1100 + $args{val}, 1101 + ); 1102 + return 1; 1103 + } 1104 + 1105 + sub list_labels ($self, %args) { 1106 + my $limit = $args{limit} // 50; 1107 + $limit = 250 if $limit > 250; 1108 + my $cursor = $args{cursor}; 1109 + my @where; 1110 + my @bind; 1111 + if (my $sources = $args{sources}) { 1112 + if (@$sources) { 1113 + my $placeholders = join(', ', ('?') x @$sources); 1114 + push @where, "src IN ($placeholders)"; 1115 + push @bind, @$sources; 1116 + } 1117 + } 1118 + if (defined $cursor && length $cursor) { 1119 + push @where, q{id > ?}; 1120 + push @bind, int($cursor); 1121 + } 1122 + my $sql = q{SELECT * FROM labels}; 1123 + $sql .= q{ WHERE } . join(q{ AND }, @where) if @where; 1124 + $sql .= q{ ORDER BY id ASC}; 1125 + my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @bind); 1126 + my @filtered = grep { _matches_uri_patterns($_->{uri}, $args{uri_patterns}) } @$rows; 1127 + my @items = @filtered; 1128 + my $next_cursor; 1129 + if (@items > $limit) { 1130 + @items = @items[0 .. $limit - 1]; 1131 + $next_cursor = $items[-1]{id}; 1132 + } 1133 + return { 1134 + items => \@items, 1135 + cursor => $next_cursor, 1136 + }; 1137 + } 1138 + 1042 1139 sub reserve_signing_key ($self, %args) { 1043 1140 my $did = $args{did} // die 'did is required'; 1044 1141 my $now = $args{created_at} // time; ··· 1481 1578 q{ALTER TABLE reserved_signing_keys ADD COLUMN signing_key_did TEXT}, 1482 1579 ], 1483 1580 }, 1581 + { 1582 + version => 5, 1583 + statements => [ 1584 + q{ 1585 + CREATE TABLE IF NOT EXISTS labels ( 1586 + id INTEGER PRIMARY KEY AUTOINCREMENT, 1587 + subject_key TEXT NOT NULL, 1588 + src TEXT NOT NULL, 1589 + uri TEXT NOT NULL, 1590 + cid TEXT, 1591 + val TEXT NOT NULL, 1592 + exp INTEGER, 1593 + sig BLOB, 1594 + created_at INTEGER NOT NULL, 1595 + updated_at INTEGER NOT NULL, 1596 + UNIQUE(subject_key, src, val) 1597 + ) 1598 + }, 1599 + q{CREATE INDEX IF NOT EXISTS labels_lookup_idx ON labels (src, uri, id)}, 1600 + ], 1601 + }, 1484 1602 ); 1485 1603 } 1486 1604 ··· 1547 1665 sub _maybe_json ($value) { 1548 1666 return undef unless defined $value; 1549 1667 return ref($value) ? encode_json($value) : $value; 1668 + } 1669 + 1670 + sub _matches_uri_patterns ($uri, $patterns = undef) { 1671 + return 1 unless $patterns && @$patterns; 1672 + for my $pattern (@$patterns) { 1673 + return 1 if $pattern eq $uri; 1674 + if ($pattern =~ /\A(.+)\*\z/ && index($uri, $1) == 0) { 1675 + return 1; 1676 + } 1677 + } 1678 + return 0; 1550 1679 } 1551 1680 1552 1681 sub _random_id {
+159
t/labels.t
··· 1 + use v5.34; 2 + use warnings; 3 + 4 + use Config (); 5 + use File::Spec; 6 + use File::Temp qw(tempdir); 7 + use FindBin qw($Bin); 8 + use JSON::PP (); 9 + use Test::More; 10 + 11 + BEGIN { 12 + require lib; 13 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 14 + lib->import( 15 + File::Spec->catdir($root, 'lib'), 16 + File::Spec->catdir($root, 'local', 'lib', 'perl5'), 17 + File::Spec->catdir($root, 'local', 'lib', 'perl5', $Config::Config{archname}), 18 + ); 19 + } 20 + 21 + use Test::Mojo; 22 + use Mojo::URL; 23 + use ATProto::PDS; 24 + use ATProto::PDS::EventStream qw(decode_frame); 25 + use ATProto::PDS::Identity qw(service_did); 26 + 27 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 28 + my $tmp = tempdir(CLEANUP => 1); 29 + 30 + my $app = ATProto::PDS->new( 31 + project_root => $root, 32 + settings => { 33 + base_url => 'http://127.0.0.1:7755', 34 + service_handle_domain => 'example.test', 35 + service_did_method => 'did:web', 36 + jwt_secret => 'labels-secret', 37 + admin_password => 'admin-secret', 38 + data_dir => File::Spec->catdir($tmp, 'data'), 39 + db_path => File::Spec->catfile($tmp, 'perlds.sqlite'), 40 + }, 41 + ); 42 + 43 + my $service_did = service_did($app->settings); 44 + my $t = Test::Mojo->new($app); 45 + my $ws = Test::Mojo->new($app); 46 + 47 + $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 48 + handle => 'alice.example.test', 49 + email => 'alice@example.test', 50 + password => 'hunter22', 51 + })->status_is(200); 52 + 53 + my $did = $t->tx->res->json->{did}; 54 + 55 + $ws->websocket_ok('/xrpc/com.atproto.label.subscribeLabels'); 56 + is($ws->message, undef, 'label stream is quiet without a backlog'); 57 + 58 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 59 + Authorization => 'Bearer admin-secret', 60 + } => json => { 61 + subject => { did => $did }, 62 + takedown => { applied => JSON::PP::true }, 63 + })->status_is(200); 64 + 65 + $ws->message_ok('received a label frame') 66 + ->message_like({binary => qr/.+/}, 'label frame is binary'); 67 + 68 + my $frame = decode_frame($ws->message->[1]); 69 + is($frame->{header}{t}, '#labels', 'frame type is labels'); 70 + is($frame->{body}{labels}[0]{src}, $service_did, 'label source is the local service DID'); 71 + is($frame->{body}{labels}[0]{uri}, "at://$did", 'repo labels target the repo URI'); 72 + is($frame->{body}{labels}[0]{val}, '!hide', 'repo takedown emits !hide'); 73 + ok(!$frame->{body}{labels}[0]{neg}, 'takedown frame is a positive label'); 74 + 75 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.label.queryLabels')->query( 76 + uriPatterns => "at://$did*", 77 + sources => $service_did, 78 + ))->status_is(200) 79 + ->json_is('/labels/0/src', $service_did) 80 + ->json_is('/labels/0/uri', "at://$did") 81 + ->json_is('/labels/0/val', '!hide'); 82 + 83 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.label.queryLabels')->query( 84 + uriPatterns => "at://$did*", 85 + sources => 'did:web:other.example', 86 + ))->status_is(200) 87 + ->json_is('/labels', []); 88 + 89 + $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 90 + handle => 'bob.example.test', 91 + email => 'bob@example.test', 92 + password => 'hunter22', 93 + })->status_is(200); 94 + 95 + my $bob_did = $t->tx->res->json->{did}; 96 + 97 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 98 + Authorization => 'Bearer admin-secret', 99 + } => json => { 100 + subject => { did => $bob_did }, 101 + takedown => { applied => JSON::PP::true }, 102 + })->status_is(200); 103 + 104 + $ws->message_ok('received bob label frame') 105 + ->message_like({binary => qr/.+/}, 'bob label frame is binary'); 106 + 107 + my $bob_frame = decode_frame($ws->message->[1]); 108 + is($bob_frame->{body}{labels}[0]{uri}, "at://$bob_did", 'second repo takedown streams immediately'); 109 + 110 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.label.queryLabels')->query( 111 + uriPatterns => 'at://*', 112 + limit => 1, 113 + ))->status_is(200) 114 + ->json_has('/cursor') 115 + ->json_is('/labels/0/src', $service_did); 116 + 117 + my $cursor = $t->tx->res->json->{cursor}; 118 + 119 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.label.queryLabels')->query( 120 + uriPatterns => 'at://*', 121 + cursor => $cursor, 122 + limit => 1, 123 + ))->status_is(200) 124 + ->json_has('/labels/0'); 125 + 126 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 127 + Authorization => 'Bearer admin-secret', 128 + } => json => { 129 + subject => { did => $did }, 130 + takedown => { applied => JSON::PP::false }, 131 + })->status_is(200); 132 + 133 + $ws->message_ok('received a label negation frame') 134 + ->message_like({binary => qr/.+/}, 'negation frame is binary'); 135 + 136 + my $neg = decode_frame($ws->message->[1]); 137 + is($neg->{header}{t}, '#labels', 'negation frame type is labels'); 138 + is($neg->{body}{labels}[0]{uri}, "at://$did", 'negation targets the same repo URI'); 139 + is($neg->{body}{labels}[0]{val}, '!hide', 'negation is for !hide'); 140 + ok($neg->{body}{labels}[0]{neg}, 'restore emits a negation label'); 141 + 142 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.label.queryLabels')->query( 143 + uriPatterns => "at://$did*", 144 + sources => $service_did, 145 + ))->status_is(200) 146 + ->json_is('/labels', []); 147 + 148 + $ws->finish_ok; 149 + 150 + my $future = Test::Mojo->new($app); 151 + $future->websocket_ok('/xrpc/com.atproto.label.subscribeLabels?cursor=999999999') 152 + ->message_ok('future label cursor returns an error frame'); 153 + 154 + my $error = decode_frame($future->message->[1]); 155 + is($error->{header}{op}, -1, 'future cursor frame is an error'); 156 + is($error->{body}{error}, 'FutureCursor', 'error type is FutureCursor'); 157 + $future->finish_ok; 158 + 159 + done_testing;