perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Extract differential harness transport helpers

alice f4d4a647 459713ed

+309 -249
+213
lib/ATProto/PDS/Test/Differential/Firehose.pm
··· 1 + package ATProto::PDS::Test::Differential::Firehose; 2 + 3 + use v5.34; 4 + use warnings; 5 + use feature 'signatures'; 6 + no warnings 'experimental::signatures'; 7 + 8 + use Exporter 'import'; 9 + use Mojo::UserAgent; 10 + use Time::HiRes qw(time); 11 + 12 + use ATProto::PDS::EventStream qw(decode_frame); 13 + 14 + our @EXPORT_OK = qw( 15 + first_frame 16 + frames_until_quiet 17 + next_commit_frame 18 + quiet_firehose 19 + ); 20 + 21 + sub quiet_firehose ($url, $quiet_seconds = 0.5) { 22 + my $ua = Mojo::UserAgent->new; 23 + my $got_frame = 0; 24 + my $error; 25 + my $done = 0; 26 + 27 + $ua->websocket($url => sub ($ua, $tx) { 28 + unless ($tx->is_websocket) { 29 + $error = $tx->res->error->{message} // 'websocket handshake failed'; 30 + $done = 1; 31 + Mojo::IOLoop->stop; 32 + return; 33 + } 34 + 35 + my $timer = Mojo::IOLoop->timer($quiet_seconds => sub { 36 + $done = 1; 37 + $tx->finish(1000); 38 + }); 39 + 40 + $tx->on(binary => sub ($tx, $bytes) { 41 + $got_frame = 1; 42 + Mojo::IOLoop->remove($timer); 43 + $tx->finish(1000); 44 + }); 45 + 46 + $tx->on(finish => sub ($tx, $code, $reason = undef) { 47 + Mojo::IOLoop->stop if $done || $got_frame || defined $error; 48 + }); 49 + }); 50 + 51 + Mojo::IOLoop->start unless Mojo::IOLoop->is_running; 52 + die "$error\n" if defined $error; 53 + return !$got_frame; 54 + } 55 + 56 + sub next_commit_frame ($url, $expected_path, $trigger, $timeout = 10) { 57 + my $ua = Mojo::UserAgent->new; 58 + my $frame; 59 + my $error; 60 + my $deadline = time + $timeout; 61 + my $triggered = 0; 62 + my $done = 0; 63 + 64 + $ua->websocket($url => sub ($ua, $tx) { 65 + unless ($tx->is_websocket) { 66 + $error = $tx->res->error->{message} // 'websocket handshake failed'; 67 + $done = 1; 68 + Mojo::IOLoop->stop; 69 + return; 70 + } 71 + 72 + my $timer; 73 + $timer = Mojo::IOLoop->recurring(0.1 => sub { 74 + if (time >= $deadline) { 75 + $error = "timed out waiting for firehose commit at $expected_path"; 76 + Mojo::IOLoop->remove($timer); 77 + $done = 1; 78 + $tx->finish(1000); 79 + } 80 + }); 81 + 82 + $tx->on(binary => sub ($tx, $bytes) { 83 + my $decoded = decode_frame($bytes); 84 + my $header = $decoded->{header} || {}; 85 + my $body = $decoded->{body} || {}; 86 + my @ops = @{ $body->{ops} || [] }; 87 + my $match = grep { ($_->{path} // q()) eq $expected_path } @ops; 88 + if (($header->{t} // q()) eq '#commit' && $match) { 89 + $frame = $decoded; 90 + Mojo::IOLoop->remove($timer); 91 + $done = 1; 92 + $tx->finish(1000); 93 + } 94 + }); 95 + 96 + Mojo::IOLoop->next_tick(sub { 97 + return if $triggered; 98 + $triggered = 1; 99 + eval { $trigger->() }; 100 + if ($@) { 101 + $error = $@; 102 + Mojo::IOLoop->remove($timer); 103 + $done = 1; 104 + $tx->finish(1011); 105 + } 106 + }); 107 + 108 + $tx->on(finish => sub ($tx, $code, $reason = undef) { 109 + Mojo::IOLoop->stop if $done || defined $error; 110 + }); 111 + }); 112 + 113 + Mojo::IOLoop->start unless Mojo::IOLoop->is_running; 114 + die "$error\n" if defined $error; 115 + return $frame; 116 + } 117 + 118 + sub first_frame ($url, $timeout = 5) { 119 + my $ua = Mojo::UserAgent->new; 120 + my $frame; 121 + my $error; 122 + my $done = 0; 123 + my $deadline = time + $timeout; 124 + 125 + $ua->websocket($url => sub ($ua, $tx) { 126 + unless ($tx->is_websocket) { 127 + $error = $tx->res->error->{message} // 'websocket handshake failed'; 128 + $done = 1; 129 + Mojo::IOLoop->stop; 130 + return; 131 + } 132 + 133 + my $timer; 134 + $timer = Mojo::IOLoop->recurring(0.1 => sub { 135 + if (time >= $deadline) { 136 + $error = 'timed out waiting for firehose frame'; 137 + Mojo::IOLoop->remove($timer); 138 + $done = 1; 139 + $tx->finish(1000); 140 + } 141 + }); 142 + 143 + $tx->on(binary => sub ($tx, $bytes) { 144 + $frame = decode_frame($bytes); 145 + Mojo::IOLoop->remove($timer); 146 + $done = 1; 147 + $tx->finish(1000); 148 + }); 149 + 150 + $tx->on(finish => sub ($tx, $code, $reason = undef) { 151 + Mojo::IOLoop->stop if $done || defined $error; 152 + }); 153 + }); 154 + 155 + Mojo::IOLoop->start unless Mojo::IOLoop->is_running; 156 + die "$error\n" if defined $error; 157 + return $frame; 158 + } 159 + 160 + sub frames_until_quiet ($url, $quiet_seconds = 0.25, $timeout = 5) { 161 + my $ua = Mojo::UserAgent->new; 162 + my @frames; 163 + my $error; 164 + my $done = 0; 165 + my $deadline = time + $timeout; 166 + 167 + $ua->websocket($url => sub ($ua, $tx) { 168 + unless ($tx->is_websocket) { 169 + $error = $tx->res->error->{message} // 'websocket handshake failed'; 170 + $done = 1; 171 + Mojo::IOLoop->stop; 172 + return; 173 + } 174 + 175 + my $quiet_timer; 176 + my $reset_quiet = sub { 177 + Mojo::IOLoop->remove($quiet_timer) if defined $quiet_timer; 178 + $quiet_timer = Mojo::IOLoop->timer($quiet_seconds => sub { 179 + $done = 1; 180 + $tx->finish(1000); 181 + }); 182 + }; 183 + $reset_quiet->(); 184 + 185 + my $watchdog; 186 + $watchdog = Mojo::IOLoop->recurring(0.1 => sub { 187 + if (time >= $deadline) { 188 + $error = "timed out waiting for websocket frames at $url"; 189 + Mojo::IOLoop->remove($watchdog); 190 + Mojo::IOLoop->remove($quiet_timer) if defined $quiet_timer; 191 + $done = 1; 192 + $tx->finish(1000); 193 + } 194 + }); 195 + 196 + $tx->on(binary => sub ($tx, $bytes) { 197 + push @frames, decode_frame($bytes); 198 + $reset_quiet->(); 199 + }); 200 + 201 + $tx->on(finish => sub ($tx, $code, $reason = undef) { 202 + Mojo::IOLoop->remove($watchdog) if defined $watchdog; 203 + Mojo::IOLoop->remove($quiet_timer) if defined $quiet_timer; 204 + Mojo::IOLoop->stop if $done || defined $error; 205 + }); 206 + }); 207 + 208 + Mojo::IOLoop->start unless Mojo::IOLoop->is_running; 209 + die "$error\n" if defined $error; 210 + return \@frames; 211 + } 212 + 213 + 1;
+79
lib/ATProto/PDS/Test/Differential/HTTP.pm
··· 1 + package ATProto::PDS::Test::Differential::HTTP; 2 + 3 + use v5.34; 4 + use warnings; 5 + use feature 'signatures'; 6 + no warnings 'experimental::signatures'; 7 + 8 + use Exporter 'import'; 9 + use MIME::Base64 qw(encode_base64); 10 + use Mojo::URL; 11 + use Mojo::UserAgent; 12 + 13 + our @EXPORT_OK = qw( 14 + admin_auth_header 15 + auth_header 16 + get_form 17 + get_json 18 + get_json_url 19 + post_bytes 20 + post_empty 21 + post_json 22 + ); 23 + 24 + sub post_json ($origin, $nsid, $payload, $headers = {}) { 25 + my $ua = Mojo::UserAgent->new(max_redirects => 0); 26 + my $tx = $ua->post( 27 + "$origin/xrpc/$nsid" => { 28 + 'Content-Type' => 'application/json', 29 + %{$headers}, 30 + } => json => $payload, 31 + ); 32 + return $tx->result; 33 + } 34 + 35 + sub post_empty ($origin, $nsid, $headers = {}) { 36 + my $ua = Mojo::UserAgent->new(max_redirects => 0); 37 + my $tx = $ua->post("$origin/xrpc/$nsid" => $headers); 38 + return $tx->result; 39 + } 40 + 41 + sub post_bytes ($origin, $nsid, $bytes, $content_type, $headers = {}) { 42 + my $ua = Mojo::UserAgent->new(max_redirects => 0); 43 + my $tx = $ua->post( 44 + "$origin/xrpc/$nsid" => { 45 + 'Content-Type' => $content_type, 46 + %{$headers}, 47 + } => $bytes, 48 + ); 49 + return $tx->result; 50 + } 51 + 52 + sub get_form ($origin, $nsid, $query, $headers = {}) { 53 + my $ua = Mojo::UserAgent->new(max_redirects => 0); 54 + my $url = Mojo::URL->new("$origin/xrpc/$nsid")->query($query); 55 + my $tx = $ua->get($url => $headers); 56 + return $tx->result; 57 + } 58 + 59 + sub get_json ($origin, $nsid, $query = undef, $headers = {}) { 60 + return defined $query 61 + ? get_form($origin, $nsid, $query, $headers) 62 + : Mojo::UserAgent->new(max_redirects => 0)->get("$origin/xrpc/$nsid" => $headers)->result; 63 + } 64 + 65 + sub get_json_url ($url) { 66 + return Mojo::UserAgent->new(max_redirects => 0)->get($url)->result; 67 + } 68 + 69 + sub auth_header ($token) { 70 + return { Authorization => "Bearer $token" }; 71 + } 72 + 73 + sub admin_auth_header ($password) { 74 + return { 75 + Authorization => 'Basic ' . encode_base64("admin:$password", q()), 76 + }; 77 + } 78 + 79 + 1;
+17 -249
script/differential-validate
··· 11 11 use File::Temp qw(tempdir); 12 12 use IO::Socket::INET; 13 13 use JSON::PP (); 14 - use MIME::Base64 qw(decode_base64 encode_base64); 14 + use MIME::Base64 qw(decode_base64); 15 15 use POSIX qw(WNOHANG); 16 16 use Time::HiRes qw(sleep time); 17 17 ··· 26 26 } 27 27 28 28 use Mojo::JSON qw(decode_json encode_json true false); 29 - use Mojo::URL; 30 29 use Mojo::UserAgent; 31 - use ATProto::PDS::EventStream qw(decode_frame); 32 30 use ATProto::PDS::Repo::CAR qw(read_car); 33 31 use ATProto::PDS::Repo::DagCbor qw(decode_dag_cbor); 32 + use ATProto::PDS::Test::Differential::Firehose qw( 33 + first_frame 34 + frames_until_quiet 35 + next_commit_frame 36 + quiet_firehose 37 + ); 38 + use ATProto::PDS::Test::Differential::HTTP qw( 39 + admin_auth_header 40 + auth_header 41 + get_form 42 + get_json 43 + get_json_url 44 + post_bytes 45 + post_empty 46 + post_json 47 + ); 34 48 35 49 my $root = File::Spec->rel2abs(File::Spec->catdir(dirname(__FILE__), '..')); 36 50 my $tmp = tempdir(CLEANUP => 1); ··· 170 184 return unpack('H*', $buf); 171 185 } 172 186 173 - sub post_json ($origin, $nsid, $payload, $headers = {}) { 174 - my $ua = Mojo::UserAgent->new(max_redirects => 0); 175 - my $tx = $ua->post( 176 - "$origin/xrpc/$nsid" => { 177 - 'Content-Type' => 'application/json', 178 - %{$headers}, 179 - } => json => $payload, 180 - ); 181 - return $tx->result; 182 - } 183 - 184 - sub post_empty ($origin, $nsid, $headers = {}) { 185 - my $ua = Mojo::UserAgent->new(max_redirects => 0); 186 - my $tx = $ua->post("$origin/xrpc/$nsid" => $headers); 187 - return $tx->result; 188 - } 189 - 190 - sub post_bytes ($origin, $nsid, $bytes, $content_type, $headers = {}) { 191 - my $ua = Mojo::UserAgent->new(max_redirects => 0); 192 - my $tx = $ua->post( 193 - "$origin/xrpc/$nsid" => { 194 - 'Content-Type' => $content_type, 195 - %{$headers}, 196 - } => $bytes, 197 - ); 198 - return $tx->result; 199 - } 200 - 201 - sub get_form ($origin, $nsid, $query, $headers = {}) { 202 - my $ua = Mojo::UserAgent->new(max_redirects => 0); 203 - my $url = Mojo::URL->new("$origin/xrpc/$nsid")->query($query); 204 - my $tx = $ua->get($url => $headers); 205 - return $tx->result; 206 - } 207 - 208 - sub get_json ($origin, $nsid, $query = undef, $headers = {}) { 209 - my $res = defined $query ? get_form($origin, $nsid, $query, $headers) : Mojo::UserAgent->new(max_redirects => 0)->get("$origin/xrpc/$nsid" => $headers)->result; 210 - return $res; 211 - } 212 - 213 - sub get_json_url ($url) { 214 - return Mojo::UserAgent->new(max_redirects => 0)->get($url)->result; 215 - } 216 - 217 - sub auth_header ($token) { 218 - return { Authorization => "Bearer $token" }; 219 - } 220 - 221 - sub admin_auth_header ($password) { 222 - return { 223 - Authorization => 'Basic ' . encode_base64("admin:$password", q()), 224 - }; 225 - } 226 - 227 187 sub normalized_domains ($res) { 228 188 my $json = $res->json || {}; 229 189 return [ sort map { /^\./ ? $_ : ".$_" } @{ $json->{availableUserDomains} || [] } ]; 230 - } 231 - 232 - sub quiet_firehose ($url, $quiet_seconds = 0.5) { 233 - my $ua = Mojo::UserAgent->new; 234 - my $got_frame = 0; 235 - my $error; 236 - my $done = 0; 237 - 238 - $ua->websocket($url => sub ($ua, $tx) { 239 - unless ($tx->is_websocket) { 240 - $error = $tx->res->error->{message} // 'websocket handshake failed'; 241 - $done = 1; 242 - Mojo::IOLoop->stop; 243 - return; 244 - } 245 - 246 - my $timer = Mojo::IOLoop->timer($quiet_seconds => sub { 247 - $done = 1; 248 - $tx->finish(1000); 249 - }); 250 - 251 - $tx->on(binary => sub ($tx, $bytes) { 252 - $got_frame = 1; 253 - Mojo::IOLoop->remove($timer); 254 - $tx->finish(1000); 255 - }); 256 - 257 - $tx->on(finish => sub ($tx, $code, $reason = undef) { 258 - Mojo::IOLoop->stop if $done || $got_frame || defined $error; 259 - }); 260 - }); 261 - 262 - Mojo::IOLoop->start unless Mojo::IOLoop->is_running; 263 - die "$error\n" if defined $error; 264 - return !$got_frame; 265 - } 266 - 267 - sub next_commit_frame ($url, $expected_path, $trigger, $timeout = 10) { 268 - my $ua = Mojo::UserAgent->new; 269 - my $frame; 270 - my $error; 271 - my $deadline = time + $timeout; 272 - my $triggered = 0; 273 - my $done = 0; 274 - 275 - $ua->websocket($url => sub ($ua, $tx) { 276 - unless ($tx->is_websocket) { 277 - $error = $tx->res->error->{message} // 'websocket handshake failed'; 278 - $done = 1; 279 - Mojo::IOLoop->stop; 280 - return; 281 - } 282 - 283 - my $timer; 284 - $timer = Mojo::IOLoop->recurring(0.1 => sub { 285 - if (time >= $deadline) { 286 - $error = "timed out waiting for firehose commit at $expected_path"; 287 - Mojo::IOLoop->remove($timer); 288 - $done = 1; 289 - $tx->finish(1000); 290 - } 291 - }); 292 - 293 - $tx->on(binary => sub ($tx, $bytes) { 294 - my $decoded = decode_frame($bytes); 295 - my $header = $decoded->{header} || {}; 296 - my $body = $decoded->{body} || {}; 297 - my @ops = @{ $body->{ops} || [] }; 298 - my $match = grep { ($_->{path} // q()) eq $expected_path } @ops; 299 - if (($header->{t} // q()) eq '#commit' && $match) { 300 - $frame = $decoded; 301 - Mojo::IOLoop->remove($timer); 302 - $done = 1; 303 - $tx->finish(1000); 304 - } 305 - }); 306 - 307 - Mojo::IOLoop->next_tick(sub { 308 - return if $triggered; 309 - $triggered = 1; 310 - eval { $trigger->() }; 311 - if ($@) { 312 - $error = $@; 313 - Mojo::IOLoop->remove($timer); 314 - $done = 1; 315 - $tx->finish(1011); 316 - } 317 - }); 318 - 319 - $tx->on(finish => sub ($tx, $code, $reason = undef) { 320 - Mojo::IOLoop->stop if $done || defined $error; 321 - }); 322 - }); 323 - 324 - Mojo::IOLoop->start unless Mojo::IOLoop->is_running; 325 - die "$error\n" if defined $error; 326 - return $frame; 327 - } 328 - 329 - sub first_frame ($url, $timeout = 5) { 330 - my $ua = Mojo::UserAgent->new; 331 - my $frame; 332 - my $error; 333 - my $done = 0; 334 - my $deadline = time + $timeout; 335 - 336 - $ua->websocket($url => sub ($ua, $tx) { 337 - unless ($tx->is_websocket) { 338 - $error = $tx->res->error->{message} // 'websocket handshake failed'; 339 - $done = 1; 340 - Mojo::IOLoop->stop; 341 - return; 342 - } 343 - 344 - my $timer; 345 - $timer = Mojo::IOLoop->recurring(0.1 => sub { 346 - if (time >= $deadline) { 347 - $error = "timed out waiting for firehose frame"; 348 - Mojo::IOLoop->remove($timer); 349 - $done = 1; 350 - $tx->finish(1000); 351 - } 352 - }); 353 - 354 - $tx->on(binary => sub ($tx, $bytes) { 355 - $frame = decode_frame($bytes); 356 - Mojo::IOLoop->remove($timer); 357 - $done = 1; 358 - $tx->finish(1000); 359 - }); 360 - 361 - $tx->on(finish => sub ($tx, $code, $reason = undef) { 362 - Mojo::IOLoop->stop if $done || defined $error; 363 - }); 364 - }); 365 - 366 - Mojo::IOLoop->start unless Mojo::IOLoop->is_running; 367 - die "$error\n" if defined $error; 368 - return $frame; 369 - } 370 - 371 - sub frames_until_quiet ($url, $quiet_seconds = 0.25, $timeout = 5) { 372 - my $ua = Mojo::UserAgent->new; 373 - my @frames; 374 - my $error; 375 - my $done = 0; 376 - my $deadline = time + $timeout; 377 - 378 - $ua->websocket($url => sub ($ua, $tx) { 379 - unless ($tx->is_websocket) { 380 - $error = $tx->res->error->{message} // 'websocket handshake failed'; 381 - $done = 1; 382 - Mojo::IOLoop->stop; 383 - return; 384 - } 385 - 386 - my $quiet_timer; 387 - my $reset_quiet = sub { 388 - Mojo::IOLoop->remove($quiet_timer) if defined $quiet_timer; 389 - $quiet_timer = Mojo::IOLoop->timer($quiet_seconds => sub { 390 - $done = 1; 391 - $tx->finish(1000); 392 - }); 393 - }; 394 - $reset_quiet->(); 395 - 396 - my $watchdog; 397 - $watchdog = Mojo::IOLoop->recurring(0.1 => sub { 398 - if (time >= $deadline) { 399 - $error = "timed out waiting for websocket frames at $url"; 400 - Mojo::IOLoop->remove($watchdog); 401 - Mojo::IOLoop->remove($quiet_timer) if defined $quiet_timer; 402 - $done = 1; 403 - $tx->finish(1000); 404 - } 405 - }); 406 - 407 - $tx->on(binary => sub ($tx, $bytes) { 408 - push @frames, decode_frame($bytes); 409 - $reset_quiet->(); 410 - }); 411 - 412 - $tx->on(finish => sub ($tx, $code, $reason = undef) { 413 - Mojo::IOLoop->remove($watchdog) if defined $watchdog; 414 - Mojo::IOLoop->remove($quiet_timer) if defined $quiet_timer; 415 - Mojo::IOLoop->stop if $done || defined $error; 416 - }); 417 - }); 418 - 419 - Mojo::IOLoop->start unless Mojo::IOLoop->is_running; 420 - die "$error\n" if defined $error; 421 - return \@frames; 422 190 } 423 191 424 192 sub normalize_commit_frame ($frame, $did) {