perlsky is a Perl 5 implementation of an AT Protocol Personal Data Server.
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add Prometheus metrics and runtime instrumentation

alice 21d93571 d4a14a08

+818 -180
+7
README.md
··· 20 20 - Run `PERLDS_RUN_REFERENCE_DIFF=1 prove -lv t/reference-differential.t` to exercise the same harness from the test suite. 21 21 - Run `PERLDS_RUN_REFERENCE_DIFF=1 prove -lv t/reference-differential-plc.t` to run the PLC-specific reference comparison from the test suite. 22 22 23 + Metrics and observability: 24 + 25 + - `perlds` now exposes Prometheus-compatible metrics at `/metrics`. 26 + - Set `metrics_token` to require `Authorization: Bearer <token>` for scrapes. 27 + - The main runtime signals cover XRPC request counts/latency, websocket subscriptions and emitted frames, crawler notifications, blob ingress/egress bytes, and key store operation timings. 28 + - Detailed operator documentation lives in `docs/METRICS.md`. 29 + 23 30 Relay / crawler discovery: 24 31 25 32 - Configure `hostname` to the public host name you want relays to crawl, for example `pds.example.com`. This should be the host, not the full URL.
+72
docs/METRICS.md
··· 1 + # Metrics 2 + 3 + `perlds` now exposes Prometheus-style metrics at `/metrics`. 4 + 5 + ## Security 6 + 7 + - If `metrics_token` is configured, the endpoint requires `Authorization: Bearer <token>`. 8 + - If `metrics_token` is omitted, the endpoint is public. 9 + - For internet-facing deployments, prefer setting `metrics_token` and/or restricting `/metrics` at the reverse proxy layer. 10 + 11 + ## Main Metrics 12 + 13 + - `perlds_xrpc_requests_total` 14 + Counts HTTP XRPC requests by method, NSID, endpoint type, and status. 15 + - `perlds_xrpc_request_duration_seconds` 16 + Histogram for HTTP XRPC latency with the same labels. 17 + - `perlds_subscription_connections_total` 18 + Counts websocket subscription opens by NSID. 19 + - `perlds_subscription_active` 20 + Gauge of active websocket subscriptions by NSID. 21 + - `perlds_subscription_closes_total` 22 + Counts websocket closes by NSID and close code. 23 + - `perlds_subscription_frames_total` 24 + Counts emitted websocket frames by NSID, frame type, and encoding. 25 + - `perlds_subscription_bytes_total` 26 + Counts emitted websocket bytes by NSID and encoding. 27 + - `perlds_subscription_duration_seconds` 28 + Histogram of websocket lifetime by NSID. 29 + - `perlds_crawler_requests_total` 30 + Counts outbound `com.atproto.sync.requestCrawl` calls by crawler service and result. 31 + - `perlds_crawler_request_duration_seconds` 32 + Histogram of outbound crawler request latency. 33 + - `perlds_blob_ingress_bytes_total` 34 + Counts uploaded blob bytes by MIME type. 35 + - `perlds_blob_egress_bytes_total` 36 + Counts downloaded blob bytes by MIME type. 37 + - `perlds_store_operations_total` 38 + Counts instrumented SQLite-backed store operations by operation and status. 39 + - `perlds_store_operation_duration_seconds` 40 + Histogram of instrumented store operation duration. 41 + - `perlds_build_info` 42 + Static build/service info gauge. 43 + 44 + ## Current Store Coverage 45 + 46 + The store metrics currently cover the highest-signal operations on the live path: 47 + 48 + - transactions 49 + - event append and event stream reads 50 + - event high-watermark reads 51 + - blob put/get 52 + - label put/list 53 + - record list 54 + - repo CAR export 55 + 56 + This is enough to understand the hot PDS paths under load without trying to wrap every SQLite call in the codebase. 57 + 58 + ## Suggested Alerts 59 + 60 + - high error rate on `perlds_xrpc_requests_total` 61 + - sustained increase in `perlds_xrpc_request_duration_seconds` 62 + - non-zero `perlds_subscription_active` with no corresponding frame growth 63 + - crawler errors from `perlds_crawler_requests_total{result="error"}` 64 + - large ingress with low egress or vice versa on blob byte counters 65 + - persistent growth in store latency histograms 66 + 67 + ## Example Scrape 68 + 69 + ```sh 70 + curl -H 'Authorization: Bearer YOUR_TOKEN' \ 71 + http://127.0.0.1:7755/metrics 72 + ```
+1
etc/perlds.example.json
··· 7 7 "service_handle_domain": "localhost", 8 8 "jwt_secret": "change-me", 9 9 "admin_password": "change-me-too", 10 + "metrics_token": "change-me-metrics", 10 11 "crawlers": [], 11 12 "crawler_notify_interval": 1200, 12 13 "data_dir": "data/runtime",
+53
lib/ATProto/PDS.pm
··· 17 17 use ATProto::PDS::Identity qw(account_did_doc service_did); 18 18 use ATProto::PDS::LexiconCatalog qw(endpoint_catalog); 19 19 use ATProto::PDS::LexiconRegistry; 20 + use ATProto::PDS::Metrics; 20 21 use ATProto::PDS::Repo::Manager; 21 22 use ATProto::PDS::Store::SQLite; 22 23 use ATProto::PDS::XRPC::Dispatcher; ··· 29 30 my $config = $self->settings; 30 31 my $root = $self->project_root; 31 32 my $public_url = Mojo::URL->new($config->{base_url} // 'http://127.0.0.1:7755'); 33 + my $metrics = ATProto::PDS::Metrics->new( 34 + service => $config->{service_name} // 'perlds', 35 + ); 32 36 my $crawler_notifier = ATProto::PDS::Crawlers->new( 33 37 hostname => ($config->{hostname} // lc($public_url->host // 'localhost')), 34 38 crawlers => $config->{crawlers} // [], 35 39 min_interval => $config->{crawler_notify_interval} // (20 * 60), 40 + metrics => $metrics, 36 41 ); 37 42 38 43 $self->secrets([$config->{jwt_secret} // 'perlds-dev-secret']); 44 + $self->helper(metrics => sub { $metrics }); 39 45 $self->helper(api_registry => sub { state $registry = ATProto::PDS::API::Registry->new }); 40 46 $self->helper(endpoint_catalog => sub ($c) { endpoint_catalog($root) }); 41 47 $self->helper(config_value => sub ($c, $key, $default = undef) { $c->app->settings->{$key} // $default }); ··· 43 49 $self->helper(store => sub ($c) { 44 50 state $store = ATProto::PDS::Store::SQLite->new( 45 51 path => $c->app->settings->{db_path} || File::Spec->catfile($root, 'data', 'runtime', 'perlds.sqlite'), 52 + metrics => $metrics, 46 53 )->bootstrap; 47 54 }); 48 55 $self->helper(crawler_notifier => sub ($c) { ··· 56 63 $c->crawler_notifier->notify_of_update(last_seq => $seq); 57 64 return $seq; 58 65 }); 66 + $self->helper(subscription_send => sub ($c, %args) { 67 + my $nsid = $args{nsid} // $c->stash('nsid') // 'unknown'; 68 + my $frame_type = $args{frame_type} // 'message'; 69 + my $encoding = exists $args{binary} ? 'binary' : 'json'; 70 + my $payload_size = exists $args{binary} 71 + ? length($args{binary} // q()) 72 + : length(Mojo::JSON::encode_json($args{json} // {})); 73 + 74 + $c->app->metrics->increment_counter('perlds_subscription_frames_total', 1, { 75 + nsid => $nsid, 76 + frame_type => $frame_type, 77 + encoding => $encoding, 78 + }); 79 + $c->app->metrics->increment_counter('perlds_subscription_bytes_total', $payload_size, { 80 + nsid => $nsid, 81 + encoding => $encoding, 82 + }); 83 + 84 + return exists $args{binary} 85 + ? $c->send({ binary => $args{binary} }) 86 + : $c->send({ json => $args{json} }); 87 + }); 88 + $self->helper(observe_blob_ingress => sub ($c, $mime_type, $bytes) { 89 + $c->app->metrics->increment_counter('perlds_blob_ingress_bytes_total', $bytes, { 90 + mime_type => $mime_type || 'application/octet-stream', 91 + }); 92 + }); 93 + $self->helper(observe_blob_egress => sub ($c, $mime_type, $bytes) { 94 + $c->app->metrics->increment_counter('perlds_blob_egress_bytes_total', $bytes, { 95 + mime_type => $mime_type || 'application/octet-stream', 96 + }); 97 + }); 59 98 $self->helper(repo_manager => sub ($c) { 60 99 state $manager = ATProto::PDS::Repo::Manager->new( 61 100 store => $c->store, ··· 79 118 service => 'perlds', 80 119 endpoints => scalar @{ endpoint_catalog($root) }, 81 120 }); 121 + }); 122 + 123 + $routes->get('/metrics')->to(cb => sub ($c) { 124 + my $token = $c->config_value('metrics_token'); 125 + if (defined $token && length $token) { 126 + my $auth = $c->req->headers->authorization // q(); 127 + return $c->render( 128 + status => 401, 129 + text => 'metrics authorization required', 130 + ) unless $auth eq "Bearer $token"; 131 + } 132 + 133 + $c->res->headers->content_type('text/plain; version=0.0.4; charset=utf-8'); 134 + $c->render(data => $c->app->metrics->render_prometheus); 82 135 }); 83 136 84 137 $routes->get('/.well-known/did.json')->to(cb => sub ($c) {
+10 -4
lib/ATProto/PDS/API/Misc.pm
··· 242 242 } else { 243 243 my $cursor = int($cursor_param); 244 244 if ($cursor > $latest + 1) { 245 - $c->send({ binary => encode_error_frame('FutureCursor', 'Cursor is ahead of the local label stream') }); 245 + $c->subscription_send( 246 + binary => encode_error_frame('FutureCursor', 'Cursor is ahead of the local label stream'), 247 + frame_type => 'error', 248 + ); 246 249 $c->finish(1008); 247 250 return; 248 251 } 249 252 if ($oldest && $cursor && $cursor < $oldest) { 250 - $c->send({ binary => encode_info_frame('OutdatedCursor', 'Cursor predates the oldest locally retained event') }); 253 + $c->subscription_send( 254 + binary => encode_info_frame('OutdatedCursor', 'Cursor predates the oldest locally retained event'), 255 + frame_type => 'info', 256 + ); 251 257 $next_seq = $oldest; 252 258 } else { 253 259 $next_seq = $cursor || ($oldest || ($latest + 1)); ··· 262 268 my $labels = $event->{payload}{labels} || []; 263 269 next unless @$labels; 264 270 $next_seq = $event->{seq} + 1; 265 - $c->send({ binary => encode_message_frame('#labels', { 271 + $c->subscription_send(binary => encode_message_frame('#labels', { 266 272 seq => 0 + $event->{seq}, 267 273 labels => $labels, 268 - })}); 274 + }), frame_type => 'label'); 269 275 } 270 276 }; 271 277
+1
lib/ATProto/PDS/API/Repo.pm
··· 166 166 close($fh); 167 167 168 168 my $mime_type = $c->req->headers->content_type || 'application/octet-stream'; 169 + $c->observe_blob_ingress($mime_type, length($bytes)); 169 170 $c->store->put_blob( 170 171 cid => $cid, 171 172 did => $account->{did},
+13 -3
lib/ATProto/PDS/API/Sync.pm
··· 129 129 my $bytes = <$fh>; 130 130 close($fh); 131 131 $c->res->headers->content_type($blob->{mime_type} || 'application/octet-stream'); 132 + $c->observe_blob_egress($blob->{mime_type}, length($bytes)); 132 133 $c->render(data => $bytes); 133 134 return; 134 135 }); ··· 237 238 } else { 238 239 my $cursor = int($cursor_param); 239 240 if ($cursor > $latest + 1) { 240 - $c->send({ binary => encode_error_frame('FutureCursor', 'Cursor is ahead of the local event stream') }); 241 + $c->subscription_send( 242 + binary => encode_error_frame('FutureCursor', 'Cursor is ahead of the local event stream'), 243 + frame_type => 'error', 244 + ); 241 245 $c->finish(1008); 242 246 return; 243 247 } 244 248 if ($oldest && $cursor && $cursor < $oldest) { 245 - $c->send({ binary => encode_info_frame('OutdatedCursor', 'Cursor predates the oldest locally retained event') }); 249 + $c->subscription_send( 250 + binary => encode_info_frame('OutdatedCursor', 'Cursor predates the oldest locally retained event'), 251 + frame_type => 'info', 252 + ); 246 253 $next_seq = $oldest; 247 254 } else { 248 255 $next_seq = $cursor || ($oldest || ($latest + 1)); ··· 256 263 my $frame = _event_frame($event); 257 264 next unless $frame; 258 265 $next_seq = $event->{seq} + 1; 259 - $c->send({ binary => $frame }); 266 + $c->subscription_send( 267 + binary => $frame, 268 + frame_type => $event->{type} // 'message', 269 + ); 260 270 } 261 271 }; 262 272
+19
lib/ATProto/PDS/Crawlers.pm
··· 8 8 use Mojo::IOLoop; 9 9 use Mojo::URL; 10 10 use Mojo::UserAgent; 11 + use Time::HiRes qw(time); 11 12 12 13 sub new ($class, %args) { 13 14 return bless { 14 15 hostname => $args{hostname} // 'localhost', 15 16 crawlers => $args{crawlers} // [], 16 17 store => $args{store}, 18 + metrics => $args{metrics}, 17 19 min_interval => $args{min_interval} // (20 * 60), 18 20 last_notified => $args{last_notified} // 0, 19 21 in_flight => 0, ··· 72 74 } 73 75 74 76 for my $result (@{ $results || [] }) { 77 + if ($self->{metrics}) { 78 + my $result_label = $result->{ok} ? 'ok' : 'error'; 79 + $self->{metrics}->increment_counter('perlds_crawler_requests_total', 1, { 80 + service => $result->{service}, 81 + result => $result_label, 82 + }); 83 + $self->{metrics}->observe_histogram( 84 + 'perlds_crawler_request_duration_seconds', 85 + $result->{duration_seconds} // 0, 86 + { 87 + service => $result->{service}, 88 + result => $result_label, 89 + }, 90 + ); 91 + } 75 92 $self->_touch_status($result->{service}, 76 93 notified_at => time, 77 94 last_seq => $seq, ··· 105 122 106 123 my @results; 107 124 for my $service (@$services) { 125 + my $started = time; 108 126 my $result = { 109 127 service => $service, 110 128 ok => 0, ··· 126 144 } 127 145 128 146 my $res = $tx->result; 147 + $result->{duration_seconds} = time - $started; 129 148 $result->{code} = $res->code; 130 149 if ($res->is_success) { 131 150 $result->{ok} = 1;
+239
lib/ATProto/PDS/Metrics.pm
··· 1 + package ATProto::PDS::Metrics; 2 + 3 + use v5.34; 4 + use warnings; 5 + use feature 'signatures'; 6 + no warnings 'experimental::signatures'; 7 + 8 + use JSON::PP (); 9 + 10 + sub new ($class, %args) { 11 + my $self = bless { 12 + service => $args{service} // 'perlds', 13 + metrics => {}, 14 + }, $class; 15 + 16 + $self->_register_counter( 17 + 'perlds_xrpc_requests_total', 18 + 'Total XRPC HTTP requests by method, endpoint, type, and status.', 19 + [qw(method nsid endpoint_type status)], 20 + ); 21 + $self->_register_histogram( 22 + 'perlds_xrpc_request_duration_seconds', 23 + 'XRPC HTTP request duration in seconds.', 24 + [qw(method nsid endpoint_type status)], 25 + [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10], 26 + ); 27 + $self->_register_counter( 28 + 'perlds_subscription_connections_total', 29 + 'Total websocket subscription connections opened.', 30 + [qw(nsid)], 31 + ); 32 + $self->_register_counter( 33 + 'perlds_subscription_closes_total', 34 + 'Total websocket subscription closes by endpoint and close code.', 35 + [qw(nsid code)], 36 + ); 37 + $self->_register_gauge( 38 + 'perlds_subscription_active', 39 + 'Active websocket subscription connections.', 40 + [qw(nsid)], 41 + ); 42 + $self->_register_counter( 43 + 'perlds_subscription_frames_total', 44 + 'Total websocket frames emitted by endpoint, frame type, and encoding.', 45 + [qw(nsid frame_type encoding)], 46 + ); 47 + $self->_register_counter( 48 + 'perlds_subscription_bytes_total', 49 + 'Total websocket frame bytes emitted by endpoint and encoding.', 50 + [qw(nsid encoding)], 51 + ); 52 + $self->_register_histogram( 53 + 'perlds_subscription_duration_seconds', 54 + 'Subscription connection lifetime in seconds.', 55 + [qw(nsid)], 56 + [0.1, 0.5, 1, 5, 15, 30, 60, 300, 900, 3600], 57 + ); 58 + $self->_register_counter( 59 + 'perlds_crawler_requests_total', 60 + 'Total outbound requestCrawl notifications by crawler service and result.', 61 + [qw(service result)], 62 + ); 63 + $self->_register_histogram( 64 + 'perlds_crawler_request_duration_seconds', 65 + 'Outbound requestCrawl latency in seconds.', 66 + [qw(service result)], 67 + [0.01, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10], 68 + ); 69 + $self->_register_counter( 70 + 'perlds_blob_ingress_bytes_total', 71 + 'Total bytes accepted through repo blob uploads.', 72 + [qw(mime_type)], 73 + ); 74 + $self->_register_counter( 75 + 'perlds_blob_egress_bytes_total', 76 + 'Total bytes served through sync blob downloads.', 77 + [qw(mime_type)], 78 + ); 79 + $self->_register_counter( 80 + 'perlds_store_operations_total', 81 + 'Total instrumented store operations by operation and status.', 82 + [qw(operation status)], 83 + ); 84 + $self->_register_histogram( 85 + 'perlds_store_operation_duration_seconds', 86 + 'Duration of instrumented store operations.', 87 + [qw(operation status)], 88 + [0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1], 89 + ); 90 + $self->_register_gauge( 91 + 'perlds_build_info', 92 + 'Static build information for the running service.', 93 + [qw(service)], 94 + ); 95 + $self->set_gauge('perlds_build_info', 1, { service => $self->{service} }); 96 + 97 + return $self; 98 + } 99 + 100 + sub increment_counter ($self, $name, $value = 1, $labels = {}) { 101 + my $metric = $self->_metric($name, 'counter'); 102 + my $key = _label_key($metric->{labels}, $labels); 103 + $metric->{samples}{$key} += $value; 104 + return $metric->{samples}{$key}; 105 + } 106 + 107 + sub set_gauge ($self, $name, $value, $labels = {}) { 108 + my $metric = $self->_metric($name, 'gauge'); 109 + my $key = _label_key($metric->{labels}, $labels); 110 + $metric->{samples}{$key} = $value; 111 + return $value; 112 + } 113 + 114 + sub add_gauge ($self, $name, $delta, $labels = {}) { 115 + my $metric = $self->_metric($name, 'gauge'); 116 + my $key = _label_key($metric->{labels}, $labels); 117 + $metric->{samples}{$key} += $delta; 118 + return $metric->{samples}{$key}; 119 + } 120 + 121 + sub observe_histogram ($self, $name, $value, $labels = {}) { 122 + my $metric = $self->_metric($name, 'histogram'); 123 + my $key = _label_key($metric->{labels}, $labels); 124 + my $sample = $metric->{samples}{$key} ||= { 125 + sum => 0, 126 + count => 0, 127 + buckets => { map { $_ => 0 } @{ $metric->{buckets} } }, 128 + labels => { %{$labels} }, 129 + }; 130 + 131 + $sample->{sum} += $value; 132 + $sample->{count} += 1; 133 + for my $bucket (@{ $metric->{buckets} }) { 134 + $sample->{buckets}{$bucket} += 1 if $value <= $bucket; 135 + } 136 + return $sample; 137 + } 138 + 139 + sub render_prometheus ($self) { 140 + my @lines; 141 + for my $name (sort keys %{ $self->{metrics} }) { 142 + my $metric = $self->{metrics}{$name}; 143 + push @lines, "# HELP $name $metric->{help}"; 144 + push @lines, "# TYPE $name $metric->{type}"; 145 + 146 + if ($metric->{type} eq 'histogram') { 147 + for my $key (sort keys %{ $metric->{samples} }) { 148 + my $sample = $metric->{samples}{$key}; 149 + my %base = %{ $sample->{labels} || {} }; 150 + for my $bucket (@{ $metric->{buckets} }) { 151 + push @lines, _format_sample( 152 + "${name}_bucket", 153 + { %base, le => $bucket }, 154 + $sample->{buckets}{$bucket} || 0, 155 + ); 156 + } 157 + push @lines, _format_sample("${name}_bucket", { %base, le => '+Inf' }, $sample->{count}); 158 + push @lines, _format_sample("${name}_sum", \%base, $sample->{sum}); 159 + push @lines, _format_sample("${name}_count", \%base, $sample->{count}); 160 + } 161 + next; 162 + } 163 + 164 + for my $key (sort keys %{ $metric->{samples} }) { 165 + push @lines, _format_sample($name, _decode_label_key($key), $metric->{samples}{$key}); 166 + } 167 + } 168 + 169 + return join("\n", @lines) . "\n"; 170 + } 171 + 172 + sub _metric ($self, $name, $expected_type) { 173 + my $metric = $self->{metrics}{$name} 174 + or die "metric $name is not registered"; 175 + die "metric $name is not a $expected_type" 176 + unless $metric->{type} eq $expected_type; 177 + return $metric; 178 + } 179 + 180 + sub _register_counter ($self, $name, $help, $labels) { 181 + $self->{metrics}{$name} = { 182 + type => 'counter', 183 + help => $help, 184 + labels => $labels, 185 + samples => {}, 186 + }; 187 + } 188 + 189 + sub _register_gauge ($self, $name, $help, $labels) { 190 + $self->{metrics}{$name} = { 191 + type => 'gauge', 192 + help => $help, 193 + labels => $labels, 194 + samples => {}, 195 + }; 196 + } 197 + 198 + sub _register_histogram ($self, $name, $help, $labels, $buckets) { 199 + $self->{metrics}{$name} = { 200 + type => 'histogram', 201 + help => $help, 202 + labels => $labels, 203 + buckets => [ sort { $a <=> $b } @$buckets ], 204 + samples => {}, 205 + }; 206 + } 207 + 208 + sub _format_sample ($name, $labels, $value) { 209 + my $label_text = _format_labels($labels); 210 + return defined($label_text) ? "$name$label_text $value" : "$name $value"; 211 + } 212 + 213 + sub _format_labels ($labels) { 214 + my @keys = sort keys %{$labels || {}}; 215 + return undef unless @keys; 216 + my @pairs = map { 217 + my $value = defined $labels->{$_} ? $labels->{$_} : q(); 218 + $value =~ s/\\/\\\\/g; 219 + $value =~ s/"/\\"/g; 220 + $value =~ s/\n/\\n/g; 221 + qq{$_="$value"} 222 + } @keys; 223 + return '{' . join(',', @pairs) . '}'; 224 + } 225 + 226 + sub _label_key ($ordered_labels, $labels) { 227 + state $json = JSON::PP->new->canonical(1)->allow_nonref(1); 228 + my %normalized = map { 229 + my $value = $labels->{$_}; 230 + $_ => defined $value ? "$value" : q() 231 + } @$ordered_labels; 232 + return $json->encode(\%normalized); 233 + } 234 + 235 + sub _decode_label_key ($key) { 236 + return JSON::PP::decode_json($key); 237 + } 238 + 239 + 1;
+50
lib/ATProto/PDS/Metrics/Store.pm
··· 1 + package ATProto::PDS::Metrics::Store; 2 + 3 + use v5.34; 4 + use warnings; 5 + use feature 'signatures'; 6 + no warnings 'experimental::signatures'; 7 + 8 + use Time::HiRes qw(time); 9 + 10 + use Exporter 'import'; 11 + 12 + our @EXPORT_OK = qw(observe_store_operation); 13 + 14 + sub observe_store_operation ($metrics, $operation, $code) { 15 + my $start = time; 16 + my $wantarray = wantarray; 17 + my (@result, $result); 18 + my $status = 'ok'; 19 + 20 + my $ok = eval { 21 + if (!defined $wantarray) { 22 + $code->(); 23 + } elsif ($wantarray) { 24 + @result = $code->(); 25 + } else { 26 + $result = $code->(); 27 + } 28 + 1; 29 + }; 30 + 31 + if (!$ok) { 32 + $status = 'error'; 33 + } 34 + 35 + if ($metrics) { 36 + my $duration = time - $start; 37 + my $labels = { 38 + operation => $operation, 39 + status => $status, 40 + }; 41 + $metrics->increment_counter('perlds_store_operations_total', 1, $labels); 42 + $metrics->observe_histogram('perlds_store_operation_duration_seconds', $duration, $labels); 43 + } 44 + 45 + die $@ unless $ok; 46 + return if !defined $wantarray; 47 + return $wantarray ? @result : $result; 48 + } 49 + 50 + 1;
+192 -168
lib/ATProto/PDS/Store/SQLite.pm
··· 10 10 use File::Basename qw(dirname); 11 11 use File::Path qw(make_path); 12 12 use JSON::PP qw(decode_json encode_json); 13 + use ATProto::PDS::Metrics::Store qw(observe_store_operation); 13 14 14 15 our @EXPORT_OK = qw(default_migrations); 15 16 ··· 18 19 return bless { 19 20 path => $args{path}, 20 21 dbh => undef, 22 + metrics => $args{metrics}, 21 23 }, $class; 22 24 } 23 25 ··· 71 73 } 72 74 73 75 sub txn ($self, $code) { 74 - my $dbh = $self->dbh; 75 - $dbh->begin_work; 76 - my $wantarray = wantarray; 77 - my @result; 78 - my $ok = eval { 79 - if (!defined $wantarray) { 80 - $code->($dbh); 81 - } elsif ($wantarray) { 82 - @result = $code->($dbh); 83 - } else { 84 - $result[0] = $code->($dbh); 76 + return observe_store_operation($self->{metrics}, 'txn', sub { 77 + my $dbh = $self->dbh; 78 + $dbh->begin_work; 79 + my $wantarray = wantarray; 80 + my @result; 81 + my $ok = eval { 82 + if (!defined $wantarray) { 83 + $code->($dbh); 84 + } elsif ($wantarray) { 85 + @result = $code->($dbh); 86 + } else { 87 + $result[0] = $code->($dbh); 88 + } 89 + 1; 90 + }; 91 + if (!$ok) { 92 + my $err = $@ || 'transaction failed'; 93 + eval { $dbh->rollback }; 94 + die $err; 85 95 } 86 - 1; 87 - }; 88 - if (!$ok) { 89 - my $err = $@ || 'transaction failed'; 90 - eval { $dbh->rollback }; 91 - die $err; 92 - } 93 - $dbh->commit; 94 - return if !defined $wantarray; 95 - return $wantarray ? @result : $result[0]; 96 + $dbh->commit; 97 + return if !defined $wantarray; 98 + return $wantarray ? @result : $result[0]; 99 + }); 96 100 } 97 101 98 102 sub create_account ($self, %args) { ··· 345 349 } 346 350 347 351 sub put_blob ($self, %args) { 348 - my $cid = $args{cid} // die 'cid is required'; 349 - my $now = $args{created_at} // time; 350 - $self->dbh->do( 351 - q{ 352 - INSERT INTO blobs ( 353 - cid, did, mime_type, byte_size, storage_path, temporary, 354 - created_at, referenced_at, quarantined_at 355 - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 356 - ON CONFLICT(cid) DO UPDATE SET 357 - did = excluded.did, 358 - mime_type = excluded.mime_type, 359 - byte_size = excluded.byte_size, 360 - storage_path = excluded.storage_path, 361 - temporary = excluded.temporary, 362 - referenced_at = COALESCE(excluded.referenced_at, blobs.referenced_at), 363 - quarantined_at = excluded.quarantined_at 364 - }, 365 - undef, 366 - $cid, 367 - $args{did}, 368 - $args{mime_type}, 369 - $args{byte_size}, 370 - $args{storage_path}, 371 - $args{temporary} ? 1 : 0, 372 - $now, 373 - $args{referenced_at}, 374 - $args{quarantined_at}, 375 - ); 376 - return $self->get_blob($cid); 352 + return observe_store_operation($self->{metrics}, 'put_blob', sub { 353 + my $cid = $args{cid} // die 'cid is required'; 354 + my $now = $args{created_at} // time; 355 + $self->dbh->do( 356 + q{ 357 + INSERT INTO blobs ( 358 + cid, did, mime_type, byte_size, storage_path, temporary, 359 + created_at, referenced_at, quarantined_at 360 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 361 + ON CONFLICT(cid) DO UPDATE SET 362 + did = excluded.did, 363 + mime_type = excluded.mime_type, 364 + byte_size = excluded.byte_size, 365 + storage_path = excluded.storage_path, 366 + temporary = excluded.temporary, 367 + referenced_at = COALESCE(excluded.referenced_at, blobs.referenced_at), 368 + quarantined_at = excluded.quarantined_at 369 + }, 370 + undef, 371 + $cid, 372 + $args{did}, 373 + $args{mime_type}, 374 + $args{byte_size}, 375 + $args{storage_path}, 376 + $args{temporary} ? 1 : 0, 377 + $now, 378 + $args{referenced_at}, 379 + $args{quarantined_at}, 380 + ); 381 + return $self->get_blob($cid); 382 + }); 377 383 } 378 384 379 385 sub get_blob ($self, $cid) { 380 - return $self->dbh->selectrow_hashref( 381 - q{SELECT * FROM blobs WHERE cid = ?}, 382 - undef, 383 - $cid, 384 - ); 386 + return observe_store_operation($self->{metrics}, 'get_blob', sub { 387 + return $self->dbh->selectrow_hashref( 388 + q{SELECT * FROM blobs WHERE cid = ?}, 389 + undef, 390 + $cid, 391 + ); 392 + }); 385 393 } 386 394 387 395 sub update_blob ($self, $cid, %args) { ··· 522 530 } 523 531 524 532 sub list_records ($self, $did, $collection, %args) { 525 - my $limit = $args{limit} // 50; 526 - $limit = 100 if $limit > 100; 527 - my $cursor = $args{cursor}; 528 - my $reverse = $args{reverse} ? 1 : 0; 529 - my @bind = ($did, $collection); 530 - my $sql = q{ 531 - SELECT * FROM records 532 - WHERE did = ? AND collection = ? 533 - }; 534 - if (defined $cursor && length $cursor) { 535 - $sql .= $reverse ? q{ AND rkey < ?} : q{ AND rkey > ?}; 536 - push @bind, $cursor; 537 - } 538 - $sql .= $reverse ? q{ ORDER BY rkey DESC} : q{ ORDER BY rkey ASC}; 539 - $sql .= q{ LIMIT ?}; 540 - push @bind, $limit + 1; 541 - my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @bind); 542 - my $page = _paginate($rows, $limit, 'rkey'); 543 - $page->{items} = [ map { _row_to_record($_) } @{ $page->{items} } ]; 544 - return $page; 533 + return observe_store_operation($self->{metrics}, 'list_records', sub { 534 + my $limit = $args{limit} // 50; 535 + $limit = 100 if $limit > 100; 536 + my $cursor = $args{cursor}; 537 + my $reverse = $args{reverse} ? 1 : 0; 538 + my @bind = ($did, $collection); 539 + my $sql = q{ 540 + SELECT * FROM records 541 + WHERE did = ? AND collection = ? 542 + }; 543 + if (defined $cursor && length $cursor) { 544 + $sql .= $reverse ? q{ AND rkey < ?} : q{ AND rkey > ?}; 545 + push @bind, $cursor; 546 + } 547 + $sql .= $reverse ? q{ ORDER BY rkey DESC} : q{ ORDER BY rkey ASC}; 548 + $sql .= q{ LIMIT ?}; 549 + push @bind, $limit + 1; 550 + my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @bind); 551 + my $page = _paginate($rows, $limit, 'rkey'); 552 + $page->{items} = [ map { _row_to_record($_) } @{ $page->{items} } ]; 553 + return $page; 554 + }); 545 555 } 546 556 547 557 sub all_records_for_did ($self, $did) { ··· 657 667 } 658 668 659 669 sub repo_car ($self, $did) { 660 - my $row = $self->get_latest_commit($did); 661 - return $row ? $row->{car_bytes} : undef; 670 + return observe_store_operation($self->{metrics}, 'repo_car', sub { 671 + my $row = $self->get_latest_commit($did); 672 + return $row ? $row->{car_bytes} : undef; 673 + }); 662 674 } 663 675 664 676 sub list_repos ($self, %args) { ··· 738 750 } 739 751 740 752 sub append_event ($self, %args) { 741 - my $now = $args{created_at} // time; 742 - $self->dbh->do( 743 - q{ 744 - INSERT INTO events ( 745 - did, type, rev, commit_cid, payload_json, car_bytes, created_at 746 - ) VALUES (?, ?, ?, ?, ?, ?, ?) 747 - }, 748 - undef, 749 - $args{did}, 750 - $args{type}, 751 - $args{rev}, 752 - $args{commit_cid}, 753 - _maybe_json($args{payload}), 754 - $args{car_bytes}, 755 - $now, 756 - ); 757 - return $self->dbh->sqlite_last_insert_rowid; 753 + return observe_store_operation($self->{metrics}, 'append_event', sub { 754 + my $now = $args{created_at} // time; 755 + $self->dbh->do( 756 + q{ 757 + INSERT INTO events ( 758 + did, type, rev, commit_cid, payload_json, car_bytes, created_at 759 + ) VALUES (?, ?, ?, ?, ?, ?, ?) 760 + }, 761 + undef, 762 + $args{did}, 763 + $args{type}, 764 + $args{rev}, 765 + $args{commit_cid}, 766 + _maybe_json($args{payload}), 767 + $args{car_bytes}, 768 + $now, 769 + ); 770 + return $self->dbh->sqlite_last_insert_rowid; 771 + }); 758 772 } 759 773 760 774 sub list_events_after ($self, $cursor, %args) { ··· 765 779 } 766 780 767 781 sub list_events_from ($self, $cursor, %args) { 768 - my $limit = $args{limit} // 100; 769 - my $sql = q{SELECT * FROM events WHERE seq >= ? ORDER BY seq LIMIT ?}; 770 - my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, $cursor // 0, $limit); 771 - return [ map { _row_from_json_columns($_, qw(payload_json)) } @$rows ]; 782 + return observe_store_operation($self->{metrics}, 'list_events_from', sub { 783 + my $limit = $args{limit} // 100; 784 + my $sql = q{SELECT * FROM events WHERE seq >= ? ORDER BY seq LIMIT ?}; 785 + my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, $cursor // 0, $limit); 786 + return [ map { _row_from_json_columns($_, qw(payload_json)) } @$rows ]; 787 + }); 772 788 } 773 789 774 790 sub latest_event_seq ($self) { 775 - return $self->dbh->selectrow_array( 776 - q{SELECT COALESCE(MAX(seq), 0) FROM events}, 777 - ) // 0; 791 + return observe_store_operation($self->{metrics}, 'latest_event_seq', sub { 792 + return $self->dbh->selectrow_array( 793 + q{SELECT COALESCE(MAX(seq), 0) FROM events}, 794 + ) // 0; 795 + }); 778 796 } 779 797 780 798 sub oldest_event_seq ($self) { 781 - my $value = $self->dbh->selectrow_array( 782 - q{SELECT MIN(seq) FROM events}, 783 - ); 784 - return defined $value ? $value : 0; 799 + return observe_store_operation($self->{metrics}, 'oldest_event_seq', sub { 800 + my $value = $self->dbh->selectrow_array( 801 + q{SELECT MIN(seq) FROM events}, 802 + ); 803 + return defined $value ? $value : 0; 804 + }); 785 805 } 786 806 787 807 sub create_action_token ($self, %args) { ··· 1040 1060 } 1041 1061 1042 1062 sub put_label ($self, %args) { 1043 - my $subject_key = $args{subject_key} // die 'subject_key is required'; 1044 - my $src = $args{src} // die 'src is required'; 1045 - my $uri = $args{uri} // die 'uri is required'; 1046 - my $val = $args{val} // die 'val is required'; 1047 - my $now = $args{created_at} // time; 1048 - $self->dbh->do( 1049 - q{ 1050 - INSERT INTO labels ( 1051 - subject_key, src, uri, cid, val, exp, sig, created_at, updated_at 1052 - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 1053 - ON CONFLICT(subject_key, src, val) DO UPDATE SET 1054 - uri = excluded.uri, 1055 - cid = excluded.cid, 1056 - exp = excluded.exp, 1057 - sig = excluded.sig, 1058 - updated_at = excluded.updated_at 1059 - }, 1060 - undef, 1061 - $subject_key, 1062 - $src, 1063 - $uri, 1064 - $args{cid}, 1065 - $val, 1066 - $args{exp}, 1067 - $args{sig}, 1068 - $now, 1069 - $args{updated_at} // $now, 1070 - ); 1071 - return $self->get_label( 1072 - subject_key => $subject_key, 1073 - src => $src, 1074 - val => $val, 1075 - ); 1063 + return observe_store_operation($self->{metrics}, 'put_label', sub { 1064 + my $subject_key = $args{subject_key} // die 'subject_key is required'; 1065 + my $src = $args{src} // die 'src is required'; 1066 + my $uri = $args{uri} // die 'uri is required'; 1067 + my $val = $args{val} // die 'val is required'; 1068 + my $now = $args{created_at} // time; 1069 + $self->dbh->do( 1070 + q{ 1071 + INSERT INTO labels ( 1072 + subject_key, src, uri, cid, val, exp, sig, created_at, updated_at 1073 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 1074 + ON CONFLICT(subject_key, src, val) DO UPDATE SET 1075 + uri = excluded.uri, 1076 + cid = excluded.cid, 1077 + exp = excluded.exp, 1078 + sig = excluded.sig, 1079 + updated_at = excluded.updated_at 1080 + }, 1081 + undef, 1082 + $subject_key, 1083 + $src, 1084 + $uri, 1085 + $args{cid}, 1086 + $val, 1087 + $args{exp}, 1088 + $args{sig}, 1089 + $now, 1090 + $args{updated_at} // $now, 1091 + ); 1092 + return $self->get_label( 1093 + subject_key => $subject_key, 1094 + src => $src, 1095 + val => $val, 1096 + ); 1097 + }); 1076 1098 } 1077 1099 1078 1100 sub get_label ($self, %args) { ··· 1103 1125 } 1104 1126 1105 1127 sub list_labels ($self, %args) { 1106 - my $limit = $args{limit} // 50; 1107 - $limit = 250 if $limit > 250; 1108 - my $cursor = $args{cursor}; 1109 - my @where; 1110 - my @bind; 1111 - if (my $sources = $args{sources}) { 1112 - if (@$sources) { 1113 - my $placeholders = join(', ', ('?') x @$sources); 1114 - push @where, "src IN ($placeholders)"; 1115 - push @bind, @$sources; 1128 + return observe_store_operation($self->{metrics}, 'list_labels', sub { 1129 + my $limit = $args{limit} // 50; 1130 + $limit = 250 if $limit > 250; 1131 + my $cursor = $args{cursor}; 1132 + my @where; 1133 + my @bind; 1134 + if (my $sources = $args{sources}) { 1135 + if (@$sources) { 1136 + my $placeholders = join(', ', ('?') x @$sources); 1137 + push @where, "src IN ($placeholders)"; 1138 + push @bind, @$sources; 1139 + } 1140 + } 1141 + if (defined $cursor && length $cursor) { 1142 + push @where, q{id > ?}; 1143 + push @bind, int($cursor); 1144 + } 1145 + my $sql = q{SELECT * FROM labels}; 1146 + $sql .= q{ WHERE } . join(q{ AND }, @where) if @where; 1147 + $sql .= q{ ORDER BY id ASC}; 1148 + my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @bind); 1149 + my @filtered = grep { _matches_uri_patterns($_->{uri}, $args{uri_patterns}) } @$rows; 1150 + my @items = @filtered; 1151 + my $next_cursor; 1152 + if (@items > $limit) { 1153 + @items = @items[0 .. $limit - 1]; 1154 + $next_cursor = $items[-1]{id}; 1116 1155 } 1117 - } 1118 - if (defined $cursor && length $cursor) { 1119 - push @where, q{id > ?}; 1120 - push @bind, int($cursor); 1121 - } 1122 - my $sql = q{SELECT * FROM labels}; 1123 - $sql .= q{ WHERE } . join(q{ AND }, @where) if @where; 1124 - $sql .= q{ ORDER BY id ASC}; 1125 - my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @bind); 1126 - my @filtered = grep { _matches_uri_patterns($_->{uri}, $args{uri_patterns}) } @$rows; 1127 - my @items = @filtered; 1128 - my $next_cursor; 1129 - if (@items > $limit) { 1130 - @items = @items[0 .. $limit - 1]; 1131 - $next_cursor = $items[-1]{id}; 1132 - } 1133 - return { 1134 - items => \@items, 1135 - cursor => $next_cursor, 1136 - }; 1156 + return { 1157 + items => \@items, 1158 + cursor => $next_cursor, 1159 + }; 1160 + }); 1137 1161 } 1138 1162 1139 1163 sub reserve_signing_key ($self, %args) {
+56 -5
lib/ATProto/PDS/XRPC/Dispatcher.pm
··· 4 4 use warnings; 5 5 6 6 use Mojo::Base -base, -signatures; 7 + use Time::HiRes qw(time); 7 8 8 9 has app => undef; 9 10 has routes => undef; ··· 13 14 my %by_id = map { $_->{id} => $_ } @{ $self->catalog }; 14 15 15 16 $self->routes->websocket('/xrpc/*nsid')->to(cb => sub ($c) { 17 + my $started = time; 18 + my $nsid = $c->stash('nsid') // q(); 16 19 my $endpoint = $by_id{ $c->stash('nsid') // q() }; 17 20 return $c->finish(1008) unless $endpoint; 18 21 19 22 if ($endpoint->{type} ne 'subscription') { 20 - $c->send({ json => { 23 + $c->app->metrics->increment_counter('perlds_subscription_closes_total', 1, { 24 + nsid => $endpoint->{id}, 25 + code => 1008, 26 + }); 27 + $c->subscription_send(json => { 21 28 error => 'MethodNotAllowed', 22 29 message => "$endpoint->{id} is not a subscription endpoint", 23 - }}); 30 + }, frame_type => 'error', nsid => $endpoint->{id}); 24 31 return $c->finish(1008); 25 32 } 26 33 34 + my $labels = { nsid => $endpoint->{id} }; 35 + $c->app->metrics->increment_counter('perlds_subscription_connections_total', 1, $labels); 36 + $c->app->metrics->add_gauge('perlds_subscription_active', 1, $labels); 37 + $c->on(finish => sub ($c, $code, $reason = undef) { 38 + $c->app->metrics->add_gauge('perlds_subscription_active', -1, $labels); 39 + $c->app->metrics->increment_counter('perlds_subscription_closes_total', 1, { 40 + %{$labels}, 41 + code => defined($code) ? $code : 0, 42 + }); 43 + $c->app->metrics->observe_histogram( 44 + 'perlds_subscription_duration_seconds', 45 + time - $started, 46 + $labels, 47 + ); 48 + }); 49 + 27 50 my $handler = $c->app->api_registry->handler_for($endpoint->{id}); 28 51 if ($handler) { 29 52 return $handler->($c, $endpoint); 30 53 } 31 54 32 - $c->send({ json => { 55 + $c->subscription_send(json => { 33 56 error => 'NotImplemented', 34 57 message => "No subscription handler registered for $endpoint->{id}", 35 58 nsid => $endpoint->{id}, 36 - }}); 59 + }, frame_type => 'error', nsid => $endpoint->{id}); 37 60 $c->finish(1000); 38 61 }); 39 62 40 63 $self->routes->any('/xrpc/*nsid')->to(cb => sub ($c) { 64 + my $started = time; 65 + my $method = $c->req->method; 66 + my $finish_metrics = sub ($status, $endpoint_type = 'unknown', $nsid = $c->stash('nsid') // 'unknown') { 67 + my $labels = { 68 + method => $method, 69 + nsid => $nsid, 70 + endpoint_type => $endpoint_type, 71 + status => $status, 72 + }; 73 + $c->app->metrics->increment_counter('perlds_xrpc_requests_total', 1, $labels); 74 + $c->app->metrics->observe_histogram( 75 + 'perlds_xrpc_request_duration_seconds', 76 + time - $started, 77 + $labels, 78 + ); 79 + }; 80 + 41 81 my $endpoint = $by_id{ $c->stash('nsid') // q() }; 42 82 unless ($endpoint) { 83 + $finish_metrics->(404); 43 84 return $c->render( 44 85 status => 404, 45 86 json => { ··· 50 91 } 51 92 52 93 if ($endpoint->{type} eq 'subscription') { 94 + $finish_metrics->(426, $endpoint->{type}, $endpoint->{id}); 53 95 return $c->render( 54 96 status => 426, 55 97 json => { ··· 60 102 } 61 103 62 104 if ($endpoint->{type} eq 'query' && $c->req->method ne 'GET') { 105 + $finish_metrics->(405, $endpoint->{type}, $endpoint->{id}); 63 106 return $c->render( 64 107 status => 405, 65 108 json => { ··· 70 113 } 71 114 72 115 if ($endpoint->{type} eq 'procedure' && $c->req->method ne 'POST') { 116 + $finish_metrics->(405, $endpoint->{type}, $endpoint->{id}); 73 117 return $c->render( 74 118 status => 405, 75 119 json => { ··· 81 125 82 126 my $handler = $c->app->api_registry->handler_for($endpoint->{id}); 83 127 unless ($handler) { 128 + $finish_metrics->(501, $endpoint->{type}, $endpoint->{id}); 84 129 return $c->render( 85 130 status => 501, 86 131 json => { ··· 95 140 my $result = eval { $handler->($c, $endpoint) }; 96 141 if (my $err = $@) { 97 142 if (ref($err) eq 'HASH' && $err->{error}) { 143 + $finish_metrics->($err->{status} // 400, $endpoint->{type}, $endpoint->{id}); 98 144 return $c->render( 99 145 status => $err->{status} // 400, 100 146 json => { ··· 106 152 die $err; 107 153 } 108 154 109 - return unless defined $result; 155 + if (!defined $result) { 156 + my $status = $c->res->code || 200; 157 + $finish_metrics->($status, $endpoint->{type}, $endpoint->{id}); 158 + return; 159 + } 160 + $finish_metrics->(200, $endpoint->{type}, $endpoint->{id}); 110 161 return $c->render(json => $result); 111 162 }); 112 163 }
+105
t/metrics.t
··· 1 + use v5.34; 2 + use warnings; 3 + 4 + use Config (); 5 + use File::Spec; 6 + use File::Temp qw(tempdir); 7 + use FindBin qw($Bin); 8 + use Test::More; 9 + 10 + BEGIN { 11 + require lib; 12 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 13 + lib->import( 14 + File::Spec->catdir($root, 'lib'), 15 + File::Spec->catdir($root, 'local', 'lib', 'perl5'), 16 + File::Spec->catdir($root, 'local', 'lib', 'perl5', $Config::Config{archname}), 17 + ); 18 + } 19 + 20 + use Test::Mojo; 21 + use ATProto::PDS; 22 + 23 + my $root = File::Spec->rel2abs(File::Spec->catdir($Bin, '..')); 24 + my $tmp = tempdir(CLEANUP => 1); 25 + 26 + my $app = ATProto::PDS->new( 27 + project_root => $root, 28 + settings => { 29 + base_url => 'http://127.0.0.1:7755', 30 + service_did_method => 'did:web', 31 + service_handle_domain => 'test', 32 + jwt_secret => 'metrics-secret', 33 + admin_password => 'admin-secret', 34 + metrics_token => 'metrics-token', 35 + db_path => File::Spec->catfile($tmp, 'metrics.sqlite'), 36 + data_dir => File::Spec->catdir($tmp, 'data'), 37 + }, 38 + ); 39 + 40 + my $t = Test::Mojo->new($app); 41 + 42 + $t->post_ok('/xrpc/com.atproto.server.createAccount' => json => { 43 + handle => 'alice.test', 44 + email => 'alice@test.com', 45 + password => 'hunter22', 46 + })->status_is(200); 47 + 48 + my $access = $t->tx->res->json->{accessJwt}; 49 + 50 + $t->post_ok('/xrpc/com.atproto.repo.uploadBlob' => { 51 + Authorization => "Bearer $access", 52 + 'Content-Type' => 'text/plain', 53 + } => 'hello')->status_is(200); 54 + 55 + $t->websocket_ok('/xrpc/com.atproto.sync.subscribeRepos') 56 + ->finish_ok; 57 + 58 + $t->get_ok('/metrics') 59 + ->status_is(401) 60 + ->content_is('metrics authorization required'); 61 + 62 + $t->get_ok('/metrics' => { 63 + Authorization => 'Bearer metrics-token', 64 + })->status_is(200) 65 + ->header_like('Content-Type' => qr{text/plain; version=0\.0\.4}, 'metrics use Prometheus exposition format'); 66 + 67 + my $metrics = $t->tx->res->body; 68 + 69 + like( 70 + $metrics, 71 + qr/perlds_xrpc_requests_total\{endpoint_type="procedure",method="POST",nsid="com\.atproto\.server\.createAccount",status="200"\} 1\b/, 72 + 'createAccount request counter is exported', 73 + ); 74 + like( 75 + $metrics, 76 + qr/perlds_xrpc_request_duration_seconds_count\{endpoint_type="procedure",method="POST",nsid="com\.atproto\.server\.createAccount",status="200"\} 1\b/, 77 + 'createAccount latency histogram is exported', 78 + ); 79 + like( 80 + $metrics, 81 + qr/perlds_subscription_connections_total\{nsid="com\.atproto\.sync\.subscribeRepos"\} 1\b/, 82 + 'subscription open count is exported', 83 + ); 84 + like( 85 + $metrics, 86 + qr/perlds_subscription_active\{nsid="com\.atproto\.sync\.subscribeRepos"\} 0\b/, 87 + 'subscription active gauge returns to zero after close', 88 + ); 89 + like( 90 + $metrics, 91 + qr/perlds_blob_ingress_bytes_total\{mime_type="text\/plain"\} 5\b/, 92 + 'blob upload bytes are exported', 93 + ); 94 + like( 95 + $metrics, 96 + qr/perlds_store_operations_total\{operation="append_event",status="ok"\} [1-9]\d*\b/, 97 + 'store operation counters are exported', 98 + ); 99 + like( 100 + $metrics, 101 + qr/perlds_build_info\{service="perlds"\} 1\b/, 102 + 'build info metric is exported', 103 + ); 104 + 105 + done_testing;