perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Harden websocket test assumptions

alice 5a70794c e125b423

+164 -5
+22 -2
t/firehose.t
··· 5 5 use File::Spec; 6 6 use File::Temp qw(tempdir); 7 7 use FindBin qw($Bin); 8 + use Mojo::IOLoop; 9 + use Time::HiRes qw(sleep time); 8 10 use Test::More; 9 11 10 12 BEGIN { ··· 42 44 my $t = Test::Mojo->new($app); 43 45 my $ws = Test::Mojo->new($app); 44 46 47 + sub ws_quiet_ok { 48 + my ($ws, $desc, $timeout) = @_; 49 + $timeout //= 0.1; 50 + my $deadline = time + $timeout; 51 + while (time < $deadline) { 52 + Mojo::IOLoop->one_tick; 53 + if (@{ $ws->{messages} || [] }) { 54 + $ws->message(shift @{ $ws->{messages} }); 55 + fail($desc); 56 + diag('unexpected websocket frame arrived while the stream was expected to stay quiet'); 57 + return; 58 + } 59 + sleep 0.01; 60 + } 61 + pass($desc); 62 + return; 63 + } 64 + 45 65 $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 46 66 handle => 'alice.example.test', 47 67 email => 'alice@example.test', ··· 87 107 my $prior_rev = $prior_head->{rev}; 88 108 89 109 $ws->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos'); 90 - is($ws->message, undef, 'no backlog is emitted when no cursor is supplied'); 110 + ws_quiet_ok($ws, 'no backlog is emitted when no cursor is supplied'); 91 111 92 112 $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 93 113 Authorization => "Bearer $access", ··· 261 281 my $firehose_latest = $app->store->latest_event_seq; 262 282 my $exclusive = Test::Mojo->new($app); 263 283 $exclusive->websocket_ok("/xrpc/com.atproto.sync.subscribeRepos?cursor=$firehose_latest"); 264 - is($exclusive->message, undef, 'repo cursor replay is exclusive at the current latest event'); 284 + ws_quiet_ok($exclusive, 'repo cursor replay is exclusive at the current latest event'); 265 285 $exclusive->finish_ok; 266 286 267 287 my $future = Test::Mojo->new($app);
+142 -3
t/labels.t
··· 6 6 use File::Temp qw(tempdir); 7 7 use FindBin qw($Bin); 8 8 use JSON::PP (); 9 + use Mojo::IOLoop; 10 + use Time::HiRes qw(sleep time); 9 11 use Test::More; 10 12 11 13 BEGIN { ··· 45 47 my $ws = Test::Mojo->new($app); 46 48 my $admin_auth = 'Basic YWRtaW46YWRtaW4tc2VjcmV0'; 47 49 50 + sub ws_quiet_ok { 51 + my ($ws, $desc, $timeout) = @_; 52 + $timeout //= 0.1; 53 + my $deadline = time + $timeout; 54 + while (time < $deadline) { 55 + Mojo::IOLoop->one_tick; 56 + if (@{ $ws->{messages} || [] }) { 57 + $ws->message(shift @{ $ws->{messages} }); 58 + fail($desc); 59 + diag('unexpected websocket frame arrived while the stream was expected to stay quiet'); 60 + return; 61 + } 62 + sleep 0.01; 63 + } 64 + pass($desc); 65 + return; 66 + } 67 + 48 68 $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 49 69 handle => 'alice.example.test', 50 70 email => 'alice@example.test', 51 71 password => 'hunter22', 52 72 })->status_is(200); 53 73 54 - my $did = $t->tx->res->json->{did}; 74 + my $created = $t->tx->res->json; 75 + my $did = $created->{did}; 76 + my $access = $created->{accessJwt}; 55 77 56 78 $ws->websocket_ok('/xrpc/com.atproto.label.subscribeLabels'); 57 - is($ws->message, undef, 'label stream is quiet without a backlog'); 79 + ws_quiet_ok($ws, 'label stream stays quiet without a backlog'); 80 + 81 + $t->post_ok('/xrpc/com.atproto.repo.createRecord' => { 82 + Authorization => "Bearer $access", 83 + } => json => { 84 + repo => $did, 85 + collection => 'app.bsky.feed.post', 86 + rkey => 'labeled-post', 87 + record => { 88 + '$type' => 'app.bsky.feed.post', 89 + text => 'record moderation target', 90 + createdAt => '2026-03-10T00:00:00Z', 91 + }, 92 + })->status_is(200) 93 + ->json_is('/uri', "at://$did/app.bsky.feed.post/labeled-post"); 94 + 95 + my $record_uri = $t->tx->res->json->{uri}; 96 + my $record_cid = $t->tx->res->json->{cid}; 97 + 98 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 99 + Authorization => $admin_auth, 100 + } => json => { 101 + subject => { uri => $record_uri, cid => $record_cid }, 102 + takedown => { applied => JSON::PP::true }, 103 + })->status_is(200); 104 + 105 + $ws->message_ok('received a record label frame') 106 + ->message_like({binary => qr/.+/}, 'record label frame is binary'); 107 + 108 + my $record_frame = decode_frame($ws->message->[1]); 109 + is($record_frame->{body}{labels}[0]{uri}, $record_uri, 'record label targets the record URI'); 110 + is($record_frame->{body}{labels}[0]{cid}, $record_cid, 'record label carries the record CID'); 111 + is($record_frame->{body}{labels}[0]{val}, '!hide', 'record takedown emits !hide'); 112 + ok(!$record_frame->{body}{labels}[0]{neg}, 'record takedown frame is a positive label'); 113 + 114 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.label.queryLabels')->query( 115 + uriPatterns => $record_uri, 116 + sources => $service_did, 117 + ))->status_is(200) 118 + ->json_is('/labels/0/uri', $record_uri) 119 + ->json_is('/labels/0/cid', $record_cid) 120 + ->json_is('/labels/0/val', '!hide'); 121 + 122 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 123 + Authorization => $admin_auth, 124 + } => json => { 125 + subject => { uri => $record_uri, cid => $record_cid }, 126 + takedown => { applied => JSON::PP::false }, 127 + })->status_is(200); 128 + 129 + $ws->message_ok('received a record label negation frame') 130 + ->message_like({binary => qr/.+/}, 'record negation frame is binary'); 131 + 132 + my $record_neg = decode_frame($ws->message->[1]); 133 + is($record_neg->{body}{labels}[0]{uri}, $record_uri, 'record negation targets the same record URI'); 134 + is($record_neg->{body}{labels}[0]{cid}, $record_cid, 'record negation keeps the record CID'); 135 + ok($record_neg->{body}{labels}[0]{neg}, 'record restore emits a negation label'); 136 + 137 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.label.queryLabels')->query( 138 + uriPatterns => $record_uri, 139 + sources => $service_did, 140 + ))->status_is(200) 141 + ->json_is('/labels', []); 142 + 143 + my $blob_tx = $t->ua->build_tx( 144 + POST => '/xrpc/com.atproto.repo.uploadBlob' => { 145 + Authorization => "Bearer $access", 146 + 'Content-Type' => 'image/png', 147 + } => 'blob-bytes', 148 + ); 149 + $t->request_ok($blob_tx)->status_is(200); 150 + 151 + my $blob_cid = $t->tx->res->json->{blob}{ref}{'$link'}; 152 + 153 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 154 + Authorization => $admin_auth, 155 + } => json => { 156 + subject => { did => $did, cid => $blob_cid }, 157 + takedown => { applied => JSON::PP::true }, 158 + })->status_is(200); 159 + 160 + $ws->message_ok('received a blob label frame') 161 + ->message_like({binary => qr/.+/}, 'blob label frame is binary'); 162 + 163 + my $blob_frame = decode_frame($ws->message->[1]); 164 + is($blob_frame->{body}{labels}[0]{uri}, "at://$did", 'blob label targets the repo URI'); 165 + is($blob_frame->{body}{labels}[0]{cid}, $blob_cid, 'blob label carries the blob CID'); 166 + is($blob_frame->{body}{labels}[0]{val}, '!hide', 'blob takedown emits !hide'); 167 + ok(!$blob_frame->{body}{labels}[0]{neg}, 'blob takedown frame is a positive label'); 168 + 169 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.label.queryLabels')->query( 170 + uriPatterns => "at://$did", 171 + sources => $service_did, 172 + ))->status_is(200) 173 + ->json_is('/labels/0/uri', "at://$did") 174 + ->json_is('/labels/0/cid', $blob_cid) 175 + ->json_is('/labels/0/val', '!hide'); 176 + 177 + $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 178 + Authorization => $admin_auth, 179 + } => json => { 180 + subject => { did => $did, cid => $blob_cid }, 181 + takedown => { applied => JSON::PP::false }, 182 + })->status_is(200); 183 + 184 + $ws->message_ok('received a blob label negation frame') 185 + ->message_like({binary => qr/.+/}, 'blob negation frame is binary'); 186 + 187 + my $blob_neg = decode_frame($ws->message->[1]); 188 + is($blob_neg->{body}{labels}[0]{uri}, "at://$did", 'blob negation targets the repo URI'); 189 + is($blob_neg->{body}{labels}[0]{cid}, $blob_cid, 'blob negation keeps the blob CID'); 190 + ok($blob_neg->{body}{labels}[0]{neg}, 'blob restore emits a negation label'); 191 + 192 + $t->get_ok(Mojo::URL->new('/xrpc/com.atproto.label.queryLabels')->query( 193 + uriPatterns => "at://$did", 194 + sources => $service_did, 195 + ))->status_is(200) 196 + ->json_is('/labels', []); 58 197 59 198 $t->post_ok('/xrpc/com.atproto.admin.updateSubjectStatus' => { 60 199 Authorization => $admin_auth, ··· 157 296 158 297 my $exclusive = Test::Mojo->new($app); 159 298 $exclusive->websocket_ok("/xrpc/com.atproto.label.subscribeLabels?cursor=$label_latest"); 160 - is($exclusive->message, undef, 'label cursor replay is exclusive'); 299 + ws_quiet_ok($exclusive, 'label cursor replay is exclusive'); 161 300 $exclusive->finish_ok; 162 301 163 302 my $replay_start = $app->store->latest_event_seq;