An Elixir toolkit for the AT Protocol. hexdocs.pm/atex
elixir bluesky atproto decentralization
25
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: telemetry integration

+1056 -178
+3
CHANGELOG.md
··· 31 31 `Atex.OAuth.session_active_session_name/0` expose Plug session key atoms 32 32 - `Atex.IdentityResolver` now has full module and function documentation 33 33 - `Atex.XRPC.LoginClient` now has a `@moduledoc` 34 + - Optional `:telemetry` instrumentation via `Atex.Telemetry`. Add `{:telemetry, "~> 1.0"}` to 35 + your deps to receive events from XRPC requests, identity resolution, OAuth flows, and service 36 + auth validation. See `Atex.Telemetry` for the full event catalogue. 34 37 35 38 ### Fixed 36 39
+25 -7
lib/atex/identity_resolver.ex
··· 47 47 def resolve(identifier, opts \\ []) do 48 48 opts = Keyword.validate!(opts, skip_cache: false) 49 49 skip_cache = Keyword.get(opts, :skip_cache) 50 + identifier_type = if String.starts_with?(identifier, "did:"), do: :did, else: :handle 50 51 51 - cache_result = if skip_cache, do: {:error, :not_found}, else: Cache.get(identifier) 52 + Atex.Telemetry.span( 53 + [:atex, :identity_resolver, :resolve], 54 + %{identifier: identifier, identifier_type: identifier_type}, 55 + fn -> 56 + cache_result = if skip_cache, do: {:error, :not_found}, else: Cache.get(identifier) 52 57 53 - # If cache fetch succeeds, then the ok tuple will be retuned by the default `with` behaviour 54 - with {:error, :not_found} <- cache_result, 55 - {:ok, identity} <- do_resolve(identifier), 56 - identity <- Cache.insert(identity) do 57 - {:ok, identity} 58 - end 58 + cache_event = if match?({:ok, _}, cache_result), do: :hit, else: :miss 59 + 60 + Atex.Telemetry.execute( 61 + [:atex, :identity_resolver, :cache, cache_event], 62 + %{system_time: System.system_time()}, 63 + %{identifier: identifier} 64 + ) 65 + 66 + # If cache fetch succeeds, then the ok tuple will be retuned by the default `with` behaviour 67 + result = 68 + with {:error, :not_found} <- cache_result, 69 + {:ok, identity} <- do_resolve(identifier), 70 + identity <- Cache.insert(identity) do 71 + {:ok, identity} 72 + end 73 + 74 + {result, %{}} 75 + end 76 + ) 59 77 end 60 78 61 79 @spec do_resolve(identity :: String.t()) ::
+153 -120
lib/atex/oauth/flow.ex
··· 149 149 list(create_authorization_url_option()) 150 150 ) :: {:ok, String.t()} | {:error, any()} 151 151 def create_authorization_url(authz_metadata, state, code_verifier, login_hint, opts \\ []) do 152 - opts = Keyword.validate!(opts, [:key, :client_id, :redirect_uri, :scopes]) 152 + Atex.Telemetry.span( 153 + [:atex, :oauth, :authorization_url], 154 + %{issuer: Map.get(authz_metadata, :issuer)}, 155 + fn -> 156 + opts = Keyword.validate!(opts, [:key, :client_id, :redirect_uri, :scopes]) 157 + key = Keyword.get_lazy(opts, :key, &Config.get_key/0) 158 + client_id = Keyword.get_lazy(opts, :client_id, &Config.client_id/0) 159 + redirect_uri = Keyword.get_lazy(opts, :redirect_uri, &Config.redirect_uri/0) 160 + scopes = Keyword.get_lazy(opts, :scopes, &Config.scopes/0) 153 161 154 - key = Keyword.get_lazy(opts, :key, &Config.get_key/0) 155 - client_id = Keyword.get_lazy(opts, :client_id, &Config.client_id/0) 156 - redirect_uri = Keyword.get_lazy(opts, :redirect_uri, &Config.redirect_uri/0) 157 - scopes = Keyword.get_lazy(opts, :scopes, &Config.scopes/0) 162 + code_challenge = :crypto.hash(:sha256, code_verifier) |> Base.url_encode64(padding: false) 163 + client_assertion = create_client_assertion(key, client_id, authz_metadata.issuer) 158 164 159 - code_challenge = :crypto.hash(:sha256, code_verifier) |> Base.url_encode64(padding: false) 160 - client_assertion = create_client_assertion(key, client_id, authz_metadata.issuer) 165 + body = %{ 166 + response_type: "code", 167 + client_id: client_id, 168 + redirect_uri: redirect_uri, 169 + state: state, 170 + code_challenge_method: "S256", 171 + code_challenge: code_challenge, 172 + scope: scopes, 173 + client_assertion_type: "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 174 + client_assertion: client_assertion, 175 + login_hint: login_hint 176 + } 161 177 162 - body = %{ 163 - response_type: "code", 164 - client_id: client_id, 165 - redirect_uri: redirect_uri, 166 - state: state, 167 - code_challenge_method: "S256", 168 - code_challenge: code_challenge, 169 - scope: scopes, 170 - client_assertion_type: "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 171 - client_assertion: client_assertion, 172 - login_hint: login_hint 173 - } 178 + result = 179 + case Req.post(authz_metadata.par_endpoint, form: body) do 180 + {:ok, %{body: %{"request_uri" => request_uri}}} -> 181 + query = %{client_id: client_id, request_uri: request_uri} |> URI.encode_query() 182 + {:ok, "#{authz_metadata.authorization_endpoint}?#{query}"} 174 183 175 - case Req.post(authz_metadata.par_endpoint, form: body) do 176 - {:ok, %{body: %{"request_uri" => request_uri}}} -> 177 - query = %{client_id: client_id, request_uri: request_uri} |> URI.encode_query() 178 - {:ok, "#{authz_metadata.authorization_endpoint}?#{query}"} 184 + {:ok, _} -> 185 + {:error, :invalid_par_response} 179 186 180 - {:ok, _} -> 181 - {:error, :invalid_par_response} 187 + err -> 188 + err 189 + end 182 190 183 - err -> 184 - err 185 - end 191 + {result, %{}} 192 + end 193 + ) 186 194 end 187 195 188 196 @doc """ ··· 213 221 list(validate_authorization_code_option()) 214 222 ) :: {:ok, tokens(), String.t() | nil} | {:error, any()} 215 223 def validate_authorization_code(authz_metadata, dpop_key, code, code_verifier, opts \\ []) do 216 - opts = Keyword.validate!(opts, [:key, :client_id, :redirect_uri, :scopes]) 224 + Atex.Telemetry.span( 225 + [:atex, :oauth, :code_exchange], 226 + %{issuer: Map.get(authz_metadata, :issuer)}, 227 + fn -> 228 + opts = Keyword.validate!(opts, [:key, :client_id, :redirect_uri, :scopes]) 229 + key = Keyword.get_lazy(opts, :key, &Config.get_key/0) 230 + client_id = Keyword.get_lazy(opts, :client_id, &Config.client_id/0) 231 + redirect_uri = Keyword.get_lazy(opts, :redirect_uri, &Config.redirect_uri/0) 217 232 218 - key = Keyword.get_lazy(opts, :key, &Config.get_key/0) 219 - client_id = Keyword.get_lazy(opts, :client_id, &Config.client_id/0) 220 - redirect_uri = Keyword.get_lazy(opts, :redirect_uri, &Config.redirect_uri/0) 233 + client_assertion = create_client_assertion(key, client_id, authz_metadata.issuer) 221 234 222 - client_assertion = create_client_assertion(key, client_id, authz_metadata.issuer) 235 + body = %{ 236 + grant_type: "authorization_code", 237 + client_id: client_id, 238 + redirect_uri: redirect_uri, 239 + code: code, 240 + code_verifier: code_verifier 241 + } 223 242 224 - body = %{ 225 - grant_type: "authorization_code", 226 - client_id: client_id, 227 - redirect_uri: redirect_uri, 228 - code: code, 229 - code_verifier: code_verifier 230 - } 243 + body = 244 + if Config.localhost?(), 245 + do: body, 246 + else: 247 + Map.merge(body, %{ 248 + client_assertion_type: "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 249 + client_assertion: client_assertion 250 + }) 231 251 232 - body = 233 - if Config.localhost?(), 234 - do: body, 235 - else: 236 - Map.merge(body, %{ 237 - client_assertion_type: "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 238 - client_assertion: client_assertion 239 - }) 252 + result = 253 + Req.new(method: :post, url: authz_metadata.token_endpoint, form: body) 254 + |> DPoP.send_oauth_dpop_request(dpop_key) 255 + |> case do 256 + {:ok, 257 + %{ 258 + "access_token" => access_token, 259 + "refresh_token" => refresh_token, 260 + "expires_in" => expires_in, 261 + "sub" => did 262 + }, nonce} -> 263 + expires_at = NaiveDateTime.utc_now() |> NaiveDateTime.add(expires_in, :second) 240 264 241 - Req.new(method: :post, url: authz_metadata.token_endpoint, form: body) 242 - |> DPoP.send_oauth_dpop_request(dpop_key) 243 - |> case do 244 - {:ok, 245 - %{ 246 - "access_token" => access_token, 247 - "refresh_token" => refresh_token, 248 - "expires_in" => expires_in, 249 - "sub" => did 250 - }, nonce} -> 251 - expires_at = NaiveDateTime.utc_now() |> NaiveDateTime.add(expires_in, :second) 265 + {:ok, 266 + %{ 267 + access_token: access_token, 268 + refresh_token: refresh_token, 269 + did: did, 270 + expires_at: expires_at 271 + }, nonce} 252 272 253 - {:ok, 254 - %{ 255 - access_token: access_token, 256 - refresh_token: refresh_token, 257 - did: did, 258 - expires_at: expires_at 259 - }, nonce} 273 + {:error, reason, _nonce} -> 274 + {:error, reason} 275 + end 260 276 261 - {:error, reason, _nonce} -> 262 - {:error, reason} 263 - end 277 + {result, %{}} 278 + end 279 + ) 264 280 end 265 281 266 282 @doc """ ··· 285 301 list(refresh_token_option()) 286 302 ) :: {:ok, tokens(), String.t() | nil} | {:error, any()} 287 303 def refresh_token(refresh_token, dpop_key, issuer, token_endpoint, opts \\ []) do 288 - opts = Keyword.validate!(opts, [:key, :client_id]) 304 + Atex.Telemetry.span( 305 + [:atex, :oauth, :token_refresh], 306 + %{issuer: issuer}, 307 + fn -> 308 + opts = Keyword.validate!(opts, [:key, :client_id]) 309 + key = Keyword.get_lazy(opts, :key, &Config.get_key/0) 310 + client_id = Keyword.get_lazy(opts, :client_id, &Config.client_id/0) 289 311 290 - key = Keyword.get_lazy(opts, :key, &Config.get_key/0) 291 - client_id = Keyword.get_lazy(opts, :client_id, &Config.client_id/0) 312 + client_assertion = create_client_assertion(key, client_id, issuer) 292 313 293 - client_assertion = create_client_assertion(key, client_id, issuer) 314 + body = %{ 315 + grant_type: "refresh_token", 316 + refresh_token: refresh_token, 317 + client_id: client_id, 318 + client_assertion_type: "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 319 + client_assertion: client_assertion 320 + } 294 321 295 - body = %{ 296 - grant_type: "refresh_token", 297 - refresh_token: refresh_token, 298 - client_id: client_id, 299 - client_assertion_type: "urn:ietf:params:oauth:client-assertion-type:jwt-bearer", 300 - client_assertion: client_assertion 301 - } 322 + result = 323 + Req.new(method: :post, url: token_endpoint, form: body) 324 + |> DPoP.send_oauth_dpop_request(dpop_key) 325 + |> case do 326 + {:ok, 327 + %{ 328 + "access_token" => access_token, 329 + "refresh_token" => refresh_token, 330 + "expires_in" => expires_in, 331 + "sub" => did 332 + }, nonce} -> 333 + expires_at = NaiveDateTime.utc_now() |> NaiveDateTime.add(expires_in, :second) 302 334 303 - Req.new(method: :post, url: token_endpoint, form: body) 304 - |> DPoP.send_oauth_dpop_request(dpop_key) 305 - |> case do 306 - {:ok, 307 - %{ 308 - "access_token" => access_token, 309 - "refresh_token" => refresh_token, 310 - "expires_in" => expires_in, 311 - "sub" => did 312 - }, nonce} -> 313 - expires_at = NaiveDateTime.utc_now() |> NaiveDateTime.add(expires_in, :second) 335 + {:ok, 336 + %{ 337 + access_token: access_token, 338 + refresh_token: refresh_token, 339 + did: did, 340 + expires_at: expires_at 341 + }, nonce} 314 342 315 - {:ok, 316 - %{ 317 - access_token: access_token, 318 - refresh_token: refresh_token, 319 - did: did, 320 - expires_at: expires_at 321 - }, nonce} 343 + {:error, reason, _nonce} -> 344 + {:error, reason} 345 + end 322 346 323 - {:error, reason, _nonce} -> 324 - {:error, reason} 325 - end 347 + {result, %{}} 348 + end 349 + ) 326 350 end 327 351 328 352 @doc """ ··· 343 367 """ 344 368 @spec revoke_tokens(Session.t(), authorization_metadata()) :: :ok 345 369 def revoke_tokens(%Session{} = session, authz_metadata) do 346 - client_id = Config.client_id() 370 + Atex.Telemetry.span( 371 + [:atex, :oauth, :token_revocation], 372 + %{issuer: Map.get(authz_metadata, :issuer)}, 373 + fn -> 374 + client_id = Config.client_id() 347 375 348 - body = %{ 349 - client_id: client_id, 350 - token: session.refresh_token, 351 - token_type_hint: "refresh_token" 352 - } 376 + body = %{ 377 + client_id: client_id, 378 + token: session.refresh_token, 379 + token_type_hint: "refresh_token" 380 + } 353 381 354 - case Req.post(authz_metadata.revocation_endpoint, form: body) do 355 - {:ok, %{status: status}} when status in [200, 204] -> 356 - :ok 382 + result = 383 + case Req.post(authz_metadata.revocation_endpoint, form: body) do 384 + {:ok, %{status: status}} when status in [200, 204] -> 385 + :ok 357 386 358 - {:ok, %{body: %{"error" => error}}} -> 359 - Logger.warning("Token revocation failed: #{error}") 360 - :ok 387 + {:ok, %{body: %{"error" => error}}} -> 388 + Logger.warning("Token revocation failed: #{error}") 389 + :ok 361 390 362 - {:error, reason} -> 363 - Logger.warning("Token revocation request failed: #{inspect(reason)}") 364 - :ok 391 + {:error, reason} -> 392 + Logger.warning("Token revocation request failed: #{inspect(reason)}") 393 + :ok 394 + 395 + unexpected -> 396 + Logger.warning("Unexpected token revocation response: #{inspect(unexpected)}") 397 + :ok 398 + end 365 399 366 - unexpected -> 367 - Logger.warning("Unexpected token revocation response: #{inspect(unexpected)}") 368 - :ok 369 - end 400 + {result, %{}} 401 + end 402 + ) 370 403 end 371 404 372 405 @spec random_b64(integer()) :: String.t()
+61 -35
lib/atex/service_auth.ex
··· 105 105 106 106 peek_result = 107 107 try do 108 - peeked = JOSE.JWT.peek(jwt) 109 - {:ok, peeked} 108 + {:ok, JOSE.JWT.peek(jwt)} 110 109 rescue 111 110 _ -> {:error, :invalid_jwt} 112 111 end 113 112 114 - case peek_result do 115 - {:error, _} = err -> 116 - err 113 + {span_iss, span_lxm} = 114 + case peek_result do 115 + {:ok, %{fields: fields}} -> {Map.get(fields, "iss"), Map.get(fields, "lxm")} 116 + _ -> {nil, nil} 117 + end 117 118 118 - {:ok, 119 - %{ 120 - fields: 121 - %{ 122 - "aud" => target_aud, 123 - "iat" => iat, 124 - "exp" => exp, 125 - "iss" => issuing_did, 126 - "jti" => nonce 127 - } = fields 128 - }} -> 129 - target_lxm = Map.get(fields, "lxm") 119 + Atex.Telemetry.span( 120 + [:atex, :service_auth, :validate], 121 + %{iss: span_iss, lxm: span_lxm}, 122 + fn -> 123 + result = do_validate_jwt(jwt, peek_result, expected_aud, expected_lxm) 124 + {result, %{}} 125 + end 126 + ) 127 + end 130 128 131 - with :ok <- validate_aud(expected_aud, target_aud), 132 - :ok <- validate_lxm(expected_lxm, target_lxm), 133 - :ok <- validate_token_times(iat, exp), 134 - # Resolve JWT's issuer to: a) make sure it's a real identity, b) get 135 - # the signing key from their DID document to verify the token 136 - {:ok, identity} <- Atex.IdentityResolver.resolve(issuing_did), 137 - user_jwk when not is_nil(user_jwk) <- 138 - Atex.DID.Document.get_atproto_signing_key(identity.document), 139 - {true, %JOSE.JWT{} = jwt_struct, _jws} <- JOSE.JWT.verify(user_jwk, jwt), 140 - # Record the nonce atomically after successful verification. insert_new 141 - # is used under the hood so this returns :seen if the jti was already 142 - # consumed, preventing replay attacks. 143 - :ok <- Atex.ServiceAuth.JTICache.put(nonce, exp) do 144 - {:ok, jwt_struct} 145 - else 146 - :seen -> {:error, :replayed_token} 147 - err -> err 148 - end 129 + @spec do_validate_jwt( 130 + String.t(), 131 + {:ok, JOSE.JWT.t()} | {:error, atom()}, 132 + String.t(), 133 + String.t() | nil 134 + ) :: {:ok, JOSE.JWT.t()} | {:error, atom()} 135 + defp do_validate_jwt(_jwt, {:error, _} = err, _expected_aud, _expected_lxm), do: err 136 + 137 + defp do_validate_jwt( 138 + jwt, 139 + {:ok, 140 + %{ 141 + fields: 142 + %{ 143 + "aud" => target_aud, 144 + "iat" => iat, 145 + "exp" => exp, 146 + "iss" => issuing_did, 147 + "jti" => nonce 148 + } = fields 149 + }}, 150 + expected_aud, 151 + expected_lxm 152 + ) do 153 + target_lxm = Map.get(fields, "lxm") 154 + 155 + with :ok <- validate_aud(expected_aud, target_aud), 156 + :ok <- validate_lxm(expected_lxm, target_lxm), 157 + :ok <- validate_token_times(iat, exp), 158 + # Resolve JWT's issuer to: a) make sure it's a real identity, b) get 159 + # the signing key from their DID document to verify the token 160 + {:ok, identity} <- Atex.IdentityResolver.resolve(issuing_did), 161 + user_jwk when not is_nil(user_jwk) <- 162 + Atex.DID.Document.get_atproto_signing_key(identity.document), 163 + {true, %JOSE.JWT{} = jwt_struct, _jws} <- JOSE.JWT.verify(user_jwk, jwt), 164 + # Record the nonce atomically after successful verification. insert_new 165 + # is used under the hood so this returns :seen if the jti was already 166 + # consumed, preventing replay attacks. 167 + :ok <- Atex.ServiceAuth.JTICache.put(nonce, exp) do 168 + {:ok, jwt_struct} 169 + else 170 + :seen -> {:error, :replayed_token} 171 + err -> err 149 172 end 150 173 end 174 + 175 + defp do_validate_jwt(_jwt, {:ok, _unmatched}, _expected_aud, _expected_lxm), 176 + do: {:error, :invalid_jwt} 151 177 152 178 @spec validate_token_times(integer(), integer()) :: :ok | {:error, reason :: atom()} 153 179 defp validate_token_times(iat, exp) do
+282
lib/atex/telemetry.ex
··· 1 + defmodule Atex.Telemetry do 2 + @moduledoc """ 3 + Telemetry instrumentation for Atex. 4 + 5 + Atex emits `:telemetry` events throughout its subsystems. To receive events, 6 + attach handlers using `:telemetry.attach/4` or `:telemetry.attach_many/4`. 7 + 8 + `:telemetry` is an **optional dependency**. If it is not present in your 9 + application's deps, all instrumentation calls compile to no-ops with zero 10 + runtime overhead. Add it to your `mix.exs` to enable instrumentation: 11 + 12 + {:telemetry, "~> 1.0"} 13 + 14 + ## Event catalogue 15 + 16 + ### XRPC 17 + 18 + #### `[:atex, :xrpc, :request, :start | :stop | :exception]` 19 + 20 + Emitted for every outgoing XRPC HTTP request (all client types). 21 + 22 + - **start measurements:** `%{system_time: integer()}` 23 + - **stop measurements:** `%{duration: integer()}` 24 + - **exception measurements:** `%{duration: integer()}` 25 + - **metadata (all events):** `%{method: :get | :post, resource: String.t(), endpoint: String.t(), client_type: :login | :oauth | :service_auth | :unauthed}` 26 + - **stop additional metadata:** `%{status: integer()}` 27 + - **exception additional metadata:** `%{kind: :error, reason: term(), stacktrace: list()}` 28 + 29 + #### `[:atex, :xrpc, :token_refresh, :start | :stop | :exception]` 30 + 31 + Emitted when a client performs a token refresh (`LoginClient` or `OAuthClient`). 32 + 33 + - **start measurements:** `%{system_time: integer()}` 34 + - **stop measurements:** `%{duration: integer()}` 35 + - **metadata:** `%{client_type: :login | :oauth}` 36 + 37 + ### Identity Resolver 38 + 39 + #### `[:atex, :identity_resolver, :resolve, :start | :stop | :exception]` 40 + 41 + Emitted for every call to `Atex.IdentityResolver.resolve/2`. 42 + 43 + - **start measurements:** `%{system_time: integer()}` 44 + - **stop measurements:** `%{duration: integer()}` 45 + - **metadata:** `%{identifier: String.t(), identifier_type: :did | :handle}` 46 + 47 + #### `[:atex, :identity_resolver, :cache, :hit | :miss]` 48 + 49 + Emitted at the cache check branch inside `resolve/2`. `:hit` means the result 50 + was served from cache; `:miss` means a fresh resolution was performed (including 51 + when `skip_cache: true` is passed). 52 + 53 + - **measurements:** `%{system_time: integer()}` 54 + - **metadata:** `%{identifier: String.t()}` 55 + 56 + ### OAuth 57 + 58 + All OAuth spans share: 59 + 60 + - **start measurements:** `%{system_time: integer()}` 61 + - **stop measurements:** `%{duration: integer()}` 62 + - **metadata:** `%{issuer: String.t() | nil}` 63 + 64 + #### `[:atex, :oauth, :authorization_url, :start | :stop | :exception]` 65 + 66 + Wraps `Atex.OAuth.Flow.create_authorization_url/5` (PAR request + URL construction). 67 + 68 + #### `[:atex, :oauth, :code_exchange, :start | :stop | :exception]` 69 + 70 + Wraps `Atex.OAuth.Flow.validate_authorization_code/5`. 71 + 72 + #### `[:atex, :oauth, :token_refresh, :start | :stop | :exception]` 73 + 74 + Wraps `Atex.OAuth.Flow.refresh_token/5`. 75 + 76 + #### `[:atex, :oauth, :token_revocation, :start | :stop | :exception]` 77 + 78 + Wraps `Atex.OAuth.Flow.revoke_tokens/2`. 79 + 80 + ### Service Auth 81 + 82 + #### `[:atex, :service_auth, :validate, :start | :stop | :exception]` 83 + 84 + Wraps `Atex.ServiceAuth.validate_jwt/2`. 85 + 86 + - **start measurements:** `%{system_time: integer()}` 87 + - **stop measurements:** `%{duration: integer()}` 88 + - **metadata:** `%{iss: String.t() | nil, lxm: String.t() | nil}` 89 + 90 + ## Example handler 91 + 92 + :telemetry.attach_many( 93 + "my-app-atex-handler", 94 + [ 95 + [:atex, :xrpc, :request, :stop], 96 + [:atex, :identity_resolver, :resolve, :stop] 97 + ], 98 + fn event, measurements, metadata, _config -> 99 + Logger.debug( 100 + "Atex event: \#{inspect(event)} " <> 101 + "duration=\#{measurements.duration} " <> 102 + "metadata=\#{inspect(metadata)}" 103 + ) 104 + end, 105 + nil 106 + ) 107 + """ 108 + 109 + @telemetry_available Code.ensure_loaded?(:telemetry) 110 + 111 + if @telemetry_available do 112 + @doc """ 113 + Execute a telemetry event. 114 + 115 + Delegates to `:telemetry.execute/3`. No-op when `:telemetry` is not loaded. 116 + """ 117 + @spec execute(list(atom()), map(), map()) :: :ok 118 + def execute(event, measurements, metadata), 119 + do: :telemetry.execute(event, measurements, metadata) 120 + 121 + @doc """ 122 + Span a block with telemetry start/stop/exception events. 123 + 124 + Emits `event_prefix ++ [:start]` before calling `fun` and `event_prefix ++ [:stop]` 125 + after it returns. The `:start` event carries `%{system_time: System.system_time()}` as 126 + measurements and `start_metadata` as metadata. The `:stop` event carries 127 + `%{duration: duration}` (in native time units) and the result of merging 128 + `start_metadata` with the extra metadata returned by `fun`. 129 + 130 + `fun` must return `{result, extra_stop_metadata}`. This function returns `result`. 131 + 132 + No-op (calls `fun` and returns result) when `:telemetry` is not loaded. 133 + """ 134 + @spec span(list(atom()), map(), (-> {result, map()})) :: result when result: any() 135 + def span(event_prefix, start_metadata, fun) do 136 + start_time = System.monotonic_time() 137 + 138 + :telemetry.execute( 139 + event_prefix ++ [:start], 140 + %{system_time: System.system_time()}, 141 + start_metadata 142 + ) 143 + 144 + try do 145 + {result, extra_stop_metadata} = fun.() 146 + duration = System.monotonic_time() - start_time 147 + 148 + :telemetry.execute( 149 + event_prefix ++ [:stop], 150 + %{duration: duration}, 151 + Map.merge(start_metadata, extra_stop_metadata) 152 + ) 153 + 154 + result 155 + rescue 156 + exception -> 157 + duration = System.monotonic_time() - start_time 158 + 159 + :telemetry.execute( 160 + event_prefix ++ [:exception], 161 + %{duration: duration}, 162 + Map.merge(start_metadata, %{ 163 + kind: :error, 164 + reason: exception, 165 + stacktrace: __STACKTRACE__ 166 + }) 167 + ) 168 + 169 + reraise exception, __STACKTRACE__ 170 + catch 171 + kind, reason -> 172 + duration = System.monotonic_time() - start_time 173 + 174 + :telemetry.execute( 175 + event_prefix ++ [:exception], 176 + %{duration: duration}, 177 + Map.merge(start_metadata, %{ 178 + kind: kind, 179 + reason: reason, 180 + stacktrace: __STACKTRACE__ 181 + }) 182 + ) 183 + 184 + :erlang.raise(kind, reason, __STACKTRACE__) 185 + end 186 + end 187 + 188 + @doc """ 189 + Attach telemetry instrumentation to a `Req.Request`. 190 + 191 + Adds request and response steps that emit `[:atex, :xrpc, :request, ...]` 192 + events. Pass `client_type:` to identify the XRPC client variant. 193 + 194 + No-op when `:telemetry` is not loaded. 195 + 196 + ## Options 197 + 198 + - `:client_type` — one of `:login`, `:oauth`, `:service_auth`, `:unauthed` 199 + (default: `:unknown`) 200 + 201 + ## Example 202 + 203 + Req.new(method: :get, url: url) 204 + |> Atex.Telemetry.attach_req_plugin(client_type: :login) 205 + |> Req.request() 206 + """ 207 + @spec attach_req_plugin(Req.Request.t(), keyword()) :: Req.Request.t() 208 + def attach_req_plugin(req, opts \\ []) do 209 + client_type = Keyword.get(opts, :client_type, :unknown) 210 + 211 + req 212 + |> Req.Request.append_request_steps(atex_telemetry_start: &run_start_step(&1, client_type)) 213 + |> Req.Request.prepend_response_steps(atex_telemetry_stop: &run_stop_step/1) 214 + |> Req.Request.prepend_error_steps(atex_telemetry_stop: &run_stop_step/1) 215 + end 216 + 217 + defp run_start_step(req, client_type) do 218 + start_time = System.monotonic_time() 219 + path = req.url.path || "" 220 + resource = String.replace_prefix(path, "/xrpc/", "") 221 + 222 + endpoint = 223 + URI.to_string(%URI{scheme: req.url.scheme, host: req.url.host, port: req.url.port}) 224 + 225 + :telemetry.execute( 226 + [:atex, :xrpc, :request, :start], 227 + %{system_time: System.system_time()}, 228 + %{method: req.method, resource: resource, endpoint: endpoint, client_type: client_type} 229 + ) 230 + 231 + req 232 + |> Req.Request.put_private(:atex_start_time, start_time) 233 + |> Req.Request.put_private(:atex_metadata, %{ 234 + method: req.method, 235 + resource: resource, 236 + endpoint: endpoint, 237 + client_type: client_type 238 + }) 239 + end 240 + 241 + defp run_stop_step({req, response}) do 242 + start_time = Req.Request.get_private(req, :atex_start_time) 243 + base_metadata = Req.Request.get_private(req, :atex_metadata) || %{} 244 + 245 + if start_time do 246 + duration = System.monotonic_time() - start_time 247 + 248 + if match?(%Req.Response{}, response) do 249 + :telemetry.execute( 250 + [:atex, :xrpc, :request, :stop], 251 + %{duration: duration}, 252 + Map.put(base_metadata, :status, response.status) 253 + ) 254 + else 255 + :telemetry.execute( 256 + [:atex, :xrpc, :request, :exception], 257 + %{duration: duration}, 258 + # stacktrace unavailable in Req error steps — transport errors don't have one 259 + Map.merge(base_metadata, %{kind: :error, reason: response, stacktrace: []}) 260 + ) 261 + end 262 + end 263 + 264 + {req, response} 265 + end 266 + else 267 + @doc false 268 + @spec execute(list(atom()), map(), map()) :: :ok 269 + def execute(_event, _measurements, _metadata), do: :ok 270 + 271 + @doc false 272 + @spec span(list(atom()), map(), (-> {result, map()})) :: result when result: any() 273 + def span(_event_prefix, _start_metadata, fun) do 274 + {result, _meta} = fun.() 275 + result 276 + end 277 + 278 + @doc false 279 + @spec attach_req_plugin(Req.Request.t(), keyword()) :: Req.Request.t() 280 + def attach_req_plugin(req, _opts \\ []), do: req 281 + end 282 + end
+8 -2
lib/atex/xrpc.ex
··· 162 162 @spec unauthed_get(String.t(), String.t(), keyword()) :: 163 163 {:ok, Req.Response.t()} | {:error, any()} 164 164 def unauthed_get(endpoint, name, opts \\ []) do 165 - Req.get(url(endpoint, name), opts) 165 + (opts ++ [method: :get, url: url(endpoint, name)]) 166 + |> Req.new() 167 + |> Atex.Telemetry.attach_req_plugin(client_type: :unauthed) 168 + |> Req.request() 166 169 end 167 170 168 171 @doc """ ··· 171 174 @spec unauthed_post(String.t(), String.t(), keyword()) :: 172 175 {:ok, Req.Response.t()} | {:error, any()} 173 176 def unauthed_post(endpoint, name, opts \\ []) do 174 - Req.post(url(endpoint, name), opts) 177 + (opts ++ [method: :post, url: url(endpoint, name)]) 178 + |> Req.new() 179 + |> Atex.Telemetry.attach_req_plugin(client_type: :unauthed) 180 + |> Req.request() 175 181 end 176 182 177 183 @doc """
+25 -12
lib/atex/xrpc/login_client.ex
··· 80 80 """ 81 81 @spec refresh(t()) :: {:ok, t()} | {:error, any()} 82 82 def refresh(%__MODULE__{endpoint: endpoint, refresh_token: refresh_token} = client) do 83 - request = 84 - Req.new(method: :post, url: XRPC.url(endpoint, "com.atproto.server.refreshSession")) 85 - |> put_auth(refresh_token) 83 + Atex.Telemetry.span( 84 + [:atex, :xrpc, :token_refresh], 85 + %{client_type: :login}, 86 + fn -> 87 + request = 88 + Req.new(method: :post, url: XRPC.url(endpoint, "com.atproto.server.refreshSession")) 89 + |> put_auth(refresh_token) 90 + 91 + result = 92 + case Req.request(request) do 93 + {:ok, %{body: %{"accessJwt" => access_token, "refreshJwt" => refresh_token}}} -> 94 + {:ok, %{client | access_token: access_token, refresh_token: refresh_token}} 86 95 87 - case Req.request(request) do 88 - {:ok, %{body: %{"accessJwt" => access_token, "refreshJwt" => refresh_token}}} -> 89 - {:ok, %{client | access_token: access_token, refresh_token: refresh_token}} 96 + {:ok, response} -> 97 + {:error, response} 90 98 91 - {:ok, response} -> 92 - {:error, response} 99 + err -> 100 + err 101 + end 93 102 94 - err -> 95 - err 96 - end 103 + {result, %{}} 104 + end 105 + ) 97 106 end 98 107 99 108 @impl true ··· 109 118 @spec request(t(), keyword()) :: {:ok, Req.Response.t(), t()} | {:error, any()} 110 119 defp request(client, opts) do 111 120 with {:ok, client} <- validate_client(client) do 112 - request = opts |> Req.new() |> put_auth(client.access_token) 121 + request = 122 + opts 123 + |> Req.new() 124 + |> put_auth(client.access_token) 125 + |> Atex.Telemetry.attach_req_plugin(client_type: :login) 113 126 114 127 case Req.request(request) do 115 128 {:ok, %{status: 200} = response} ->
+13 -1
lib/atex/xrpc/oauth_client.ex
··· 142 142 end 143 143 144 144 @spec do_refresh(t()) :: {:ok, OAuth.Session.t()} | {:error, any()} 145 - defp do_refresh(%__MODULE__{session_key: session_key}) do 145 + defp do_refresh(%__MODULE__{} = client) do 146 + Atex.Telemetry.span( 147 + [:atex, :xrpc, :token_refresh], 148 + %{client_type: :oauth}, 149 + fn -> 150 + {do_refresh_impl(client), %{}} 151 + end 152 + ) 153 + end 154 + 155 + @spec do_refresh_impl(t()) :: {:ok, OAuth.Session.t()} | {:error, any()} 156 + defp do_refresh_impl(%__MODULE__{session_key: session_key}) do 146 157 with {:ok, session} <- OAuth.SessionStore.get(session_key), 147 158 {:ok, authz_server} <- Discovery.get_authorization_server(session.aud), 148 159 {:ok, %{token_endpoint: token_endpoint}} <- ··· 227 238 |> Keyword.put(:url, url) 228 239 |> Req.new() 229 240 |> Req.Request.put_header("authorization", "DPoP #{session.access_token}") 241 + |> Atex.Telemetry.attach_req_plugin(client_type: :oauth) 230 242 231 243 case DPoP.request_protected_dpop_resource( 232 244 request,
+5 -1
lib/atex/xrpc/service_auth_client.ex
··· 51 51 52 52 @spec request(t(), keyword()) :: {:ok, Req.Response.t(), t()} | {:error, any(), t()} 53 53 defp request(client, opts) do 54 - req = opts |> Req.new() |> put_auth(client.token) 54 + req = 55 + opts 56 + |> Req.new() 57 + |> put_auth(client.token) 58 + |> Atex.Telemetry.attach_req_plugin(client_type: :service_auth) 55 59 56 60 case Req.request(req) do 57 61 {:ok, response} -> {:ok, response, client}
+2
lib/atex/xrpc/unauthed_client.ex
··· 27 27 def get(%__MODULE__{endpoint: endpoint} = client, resource, opts \\ []) do 28 28 (opts ++ [method: :get, url: Atex.XRPC.url(endpoint, resource)]) 29 29 |> Req.new() 30 + |> Atex.Telemetry.attach_req_plugin(client_type: :unauthed) 30 31 |> Req.request() 31 32 |> case do 32 33 {:ok, response} -> {:ok, response, client} ··· 38 39 def post(%__MODULE__{endpoint: endpoint} = client, resource, opts \\ []) do 39 40 (opts ++ [method: :post, url: Atex.XRPC.url(endpoint, resource)]) 40 41 |> Req.new() 42 + |> Atex.Telemetry.attach_req_plugin(client_type: :unauthed) 41 43 |> Req.request() 42 44 |> case do 43 45 {:ok, response} -> {:ok, response, client}
+1
mix.exs
··· 46 46 {:bandit, "~> 1.0", only: [:dev, :test]}, 47 47 {:con_cache, "~> 1.1"}, 48 48 {:mutex, "~> 3.0"}, 49 + {:telemetry, "~> 1.0", optional: true}, 49 50 {:dasl, "~> 0.1"}, 50 51 {:mst, "~> 0.1"}, 51 52 {:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false},
+86
test/atex/identity_resolver_telemetry_test.exs
··· 1 + defmodule Atex.IdentityResolverTelemetryTest do 2 + use ExUnit.Case, async: true 3 + 4 + describe "resolve/2 telemetry" do 5 + test "emits resolve start/stop and cache miss on first call" do 6 + ref = make_ref() 7 + identifier = "did:plc:test-#{inspect(ref)}" 8 + 9 + :telemetry.attach_many( 10 + "test-resolver-#{inspect(ref)}", 11 + [ 12 + [:atex, :identity_resolver, :resolve, :start], 13 + [:atex, :identity_resolver, :resolve, :stop], 14 + [:atex, :identity_resolver, :cache, :miss], 15 + [:atex, :identity_resolver, :cache, :hit] 16 + ], 17 + fn event, measurements, metadata, _ -> 18 + send(self(), {:telemetry, event, measurements, metadata}) 19 + end, 20 + nil 21 + ) 22 + 23 + on_exit(fn -> :telemetry.detach("test-resolver-#{inspect(ref)}") end) 24 + 25 + # resolve will fail because it's not a real DID, but telemetry still fires 26 + Atex.IdentityResolver.resolve(identifier) 27 + 28 + assert_receive {:telemetry, [:atex, :identity_resolver, :resolve, :start], 29 + %{system_time: _}, %{identifier: ^identifier, identifier_type: :did}} 30 + 31 + assert_receive {:telemetry, [:atex, :identity_resolver, :cache, :miss], %{system_time: _}, 32 + %{identifier: ^identifier}} 33 + 34 + assert_receive {:telemetry, [:atex, :identity_resolver, :resolve, :stop], %{duration: _}, _} 35 + end 36 + 37 + test "emits cache hit event on repeated call for same identifier" do 38 + ref = make_ref() 39 + # Use a DID-style identifier that we can pre-populate in the cache 40 + identifier = "did:plc:cached-#{inspect(ref)}" 41 + 42 + :telemetry.attach( 43 + "test-resolver-cache-#{inspect(ref)}", 44 + [:atex, :identity_resolver, :cache, :hit], 45 + fn _event, _measurements, metadata, _ -> 46 + send(self(), {:cache_hit, metadata}) 47 + end, 48 + nil 49 + ) 50 + 51 + on_exit(fn -> :telemetry.detach("test-resolver-cache-#{inspect(ref)}") end) 52 + 53 + # First call — cache miss, resolution fails (not a real DID) 54 + Atex.IdentityResolver.resolve(identifier) 55 + 56 + # Pre-populate the cache directly so the second call gets a hit 57 + identity = %Atex.IdentityResolver.Identity{did: identifier, handle: nil, document: nil} 58 + Atex.IdentityResolver.Cache.insert(identity) 59 + 60 + # Second call — cache hit 61 + Atex.IdentityResolver.resolve(identifier) 62 + 63 + assert_receive {:cache_hit, %{identifier: ^identifier}} 64 + end 65 + 66 + test "identifier_type is :handle for non-did identifiers" do 67 + ref = make_ref() 68 + identifier = "user.bsky.social" 69 + 70 + :telemetry.attach( 71 + "test-resolver-handle-#{inspect(ref)}", 72 + [:atex, :identity_resolver, :resolve, :start], 73 + fn _event, _measurements, metadata, _ -> 74 + send(self(), {:start, metadata}) 75 + end, 76 + nil 77 + ) 78 + 79 + on_exit(fn -> :telemetry.detach("test-resolver-handle-#{inspect(ref)}") end) 80 + 81 + Atex.IdentityResolver.resolve(identifier) 82 + 83 + assert_receive {:start, %{identifier_type: :handle}} 84 + end 85 + end 86 + end
+95
test/atex/oauth/flow_test.exs
··· 81 81 {true, %JOSE.JWT{}, _} = JOSE.JWT.verify(JOSE.JWK.to_public(key), token) 82 82 end 83 83 end 84 + 85 + describe "telemetry" do 86 + setup do 87 + key = JOSE.JWK.generate_key({:ec, "P-256"}) 88 + key = %{key | fields: Map.put(key.fields, "kid", "test-kid")} 89 + %{key: key} 90 + end 91 + 92 + test "create_authorization_url/5 emits authorization_url start/stop events", %{key: key} do 93 + ref = make_ref() 94 + 95 + :telemetry.attach_many( 96 + "test-oauth-authz-url-#{inspect(ref)}", 97 + [ 98 + [:atex, :oauth, :authorization_url, :start], 99 + [:atex, :oauth, :authorization_url, :stop] 100 + ], 101 + fn event, measurements, metadata, _ -> 102 + send(self(), {:telemetry, event, measurements, metadata}) 103 + end, 104 + nil 105 + ) 106 + 107 + on_exit(fn -> :telemetry.detach("test-oauth-authz-url-#{inspect(ref)}") end) 108 + 109 + authz_metadata = %{ 110 + issuer: "https://bsky.social", 111 + par_endpoint: "https://bsky.social/oauth/par", 112 + token_endpoint: "https://bsky.social/oauth/token", 113 + authorization_endpoint: "https://bsky.social/oauth/authorize", 114 + revocation_endpoint: "https://bsky.social/oauth/revoke" 115 + } 116 + 117 + # This will fail (no real PAR server) but telemetry still fires 118 + Flow.create_authorization_url(authz_metadata, "state", "verifier", "user.bsky.social", 119 + key: key, 120 + client_id: "https://example.com/client", 121 + redirect_uri: "https://example.com/callback", 122 + scopes: "atproto" 123 + ) 124 + 125 + assert_receive {:telemetry, [:atex, :oauth, :authorization_url, :start], %{system_time: _}, 126 + %{issuer: "https://bsky.social"}} 127 + 128 + assert_receive {:telemetry, [:atex, :oauth, :authorization_url, :stop], %{duration: _}, 129 + %{issuer: "https://bsky.social"}} 130 + end 131 + 132 + test "revoke_tokens/2 emits token_revocation start/stop events" do 133 + ref = make_ref() 134 + 135 + :telemetry.attach_many( 136 + "test-oauth-revoke-#{inspect(ref)}", 137 + [ 138 + [:atex, :oauth, :token_revocation, :start], 139 + [:atex, :oauth, :token_revocation, :stop] 140 + ], 141 + fn event, measurements, metadata, _ -> 142 + send(self(), {:telemetry, event, measurements, metadata}) 143 + end, 144 + nil 145 + ) 146 + 147 + on_exit(fn -> :telemetry.detach("test-oauth-revoke-#{inspect(ref)}") end) 148 + 149 + session = %Atex.OAuth.Session{ 150 + iss: "https://bsky.social", 151 + aud: "https://bsky.social", 152 + sub: "did:plc:abc", 153 + nonce: "nonce", 154 + access_token: "token", 155 + refresh_token: "refresh", 156 + expires_at: NaiveDateTime.utc_now(), 157 + dpop_key: JOSE.JWK.generate_key({:ec, "P-256"}), 158 + dpop_nonce: nil 159 + } 160 + 161 + authz_metadata = %{ 162 + issuer: "https://bsky.social", 163 + par_endpoint: "https://bsky.social/oauth/par", 164 + token_endpoint: "https://bsky.social/oauth/token", 165 + authorization_endpoint: "https://bsky.social/oauth/authorize", 166 + revocation_endpoint: "https://bsky.social/oauth/revoke" 167 + } 168 + 169 + # Will fail (no real server) but telemetry fires 170 + Flow.revoke_tokens(session, authz_metadata) 171 + 172 + assert_receive {:telemetry, [:atex, :oauth, :token_revocation, :start], %{system_time: _}, 173 + %{issuer: "https://bsky.social"}} 174 + 175 + assert_receive {:telemetry, [:atex, :oauth, :token_revocation, :stop], %{duration: _}, 176 + %{issuer: "https://bsky.social"}} 177 + end 178 + end 84 179 end
+28
test/atex/service_auth_test.exs
··· 12 12 Atex.ServiceAuth.validate_jwt("", aud: "did:web:example.com") 13 13 end 14 14 end 15 + 16 + describe "validate_jwt/2 telemetry" do 17 + test "emits validate start and stop events even for invalid JWT" do 18 + ref = make_ref() 19 + 20 + :telemetry.attach_many( 21 + "test-service-auth-#{inspect(ref)}", 22 + [ 23 + [:atex, :service_auth, :validate, :start], 24 + [:atex, :service_auth, :validate, :stop] 25 + ], 26 + fn event, measurements, metadata, _ -> 27 + send(self(), {:telemetry, event, measurements, metadata}) 28 + end, 29 + nil 30 + ) 31 + 32 + on_exit(fn -> :telemetry.detach("test-service-auth-#{inspect(ref)}") end) 33 + 34 + Atex.ServiceAuth.validate_jwt("not.a.valid.jwt", aud: "did:web:example.com") 35 + 36 + assert_receive {:telemetry, [:atex, :service_auth, :validate, :start], %{system_time: _}, 37 + %{iss: nil, lxm: nil}} 38 + 39 + assert_receive {:telemetry, [:atex, :service_auth, :validate, :stop], %{duration: _}, 40 + %{iss: nil, lxm: nil}} 41 + end 42 + end 15 43 end
+269
test/atex/telemetry_test.exs
··· 1 + defmodule Atex.TelemetryTest do 2 + use ExUnit.Case, async: true 3 + 4 + describe "execute/3" do 5 + test "emits a telemetry event" do 6 + ref = make_ref() 7 + 8 + :telemetry.attach( 9 + "test-execute-#{inspect(ref)}", 10 + [:atex, :test, :event], 11 + fn event, measurements, metadata, _ -> 12 + send(self(), {:telemetry, event, measurements, metadata}) 13 + end, 14 + nil 15 + ) 16 + 17 + on_exit(fn -> :telemetry.detach("test-execute-#{inspect(ref)}") end) 18 + 19 + Atex.Telemetry.execute([:atex, :test, :event], %{count: 1}, %{key: "val"}) 20 + 21 + assert_receive {:telemetry, [:atex, :test, :event], %{count: 1}, %{key: "val"}} 22 + end 23 + end 24 + 25 + describe "span/3" do 26 + test "emits start and stop events and returns the result" do 27 + ref = make_ref() 28 + 29 + :telemetry.attach_many( 30 + "test-span-#{inspect(ref)}", 31 + [[:atex, :test, :span, :start], [:atex, :test, :span, :stop]], 32 + fn event, measurements, metadata, _ -> 33 + send(self(), {:telemetry, event, measurements, metadata}) 34 + end, 35 + nil 36 + ) 37 + 38 + on_exit(fn -> :telemetry.detach("test-span-#{inspect(ref)}") end) 39 + 40 + result = 41 + Atex.Telemetry.span([:atex, :test, :span], %{key: "start_val"}, fn -> 42 + {{:ok, :returned_value}, %{extra: "stop_val"}} 43 + end) 44 + 45 + assert result == {:ok, :returned_value} 46 + 47 + assert_receive {:telemetry, [:atex, :test, :span, :start], %{system_time: _}, 48 + %{key: "start_val"}} 49 + 50 + assert_receive {:telemetry, [:atex, :test, :span, :stop], %{duration: _}, 51 + %{key: "start_val", extra: "stop_val"}} 52 + end 53 + end 54 + 55 + defmodule TransportErrorPlug do 56 + @moduledoc false 57 + def init(opts), do: opts 58 + def call(conn, _opts), do: Req.Test.transport_error(conn, :closed) 59 + end 60 + 61 + defmodule OkPlug do 62 + @moduledoc false 63 + import Plug.Conn 64 + def init(opts), do: opts 65 + 66 + def call(conn, _opts) do 67 + conn |> send_resp(200, Jason.encode!(%{ok: true})) 68 + end 69 + end 70 + 71 + defmodule ErrorPlug do 72 + @moduledoc false 73 + import Plug.Conn 74 + def init(opts), do: opts 75 + 76 + def call(conn, _opts) do 77 + conn |> send_resp(500, Jason.encode!(%{error: "ServerError"})) 78 + end 79 + end 80 + 81 + describe "attach_req_plugin/2" do 82 + test "emits start and stop events on success" do 83 + ref = make_ref() 84 + 85 + :telemetry.attach_many( 86 + "test-plugin-success-#{inspect(ref)}", 87 + [[:atex, :xrpc, :request, :start], [:atex, :xrpc, :request, :stop]], 88 + fn event, measurements, metadata, _ -> 89 + send(self(), {:telemetry, event, measurements, metadata}) 90 + end, 91 + nil 92 + ) 93 + 94 + on_exit(fn -> :telemetry.detach("test-plugin-success-#{inspect(ref)}") end) 95 + 96 + req = 97 + Req.new( 98 + method: :get, 99 + url: "http://bsky.social/xrpc/app.bsky.actor.getProfile", 100 + plug: OkPlug 101 + ) 102 + |> Atex.Telemetry.attach_req_plugin(client_type: :login) 103 + 104 + {:ok, _response} = Req.request(req) 105 + 106 + assert_receive {:telemetry, [:atex, :xrpc, :request, :start], %{system_time: _}, 107 + %{ 108 + method: :get, 109 + resource: "app.bsky.actor.getProfile", 110 + endpoint: "http://bsky.social", 111 + client_type: :login 112 + }} 113 + 114 + assert_receive {:telemetry, [:atex, :xrpc, :request, :stop], %{duration: _}, 115 + %{ 116 + status: 200, 117 + method: :get, 118 + resource: "app.bsky.actor.getProfile", 119 + endpoint: "http://bsky.social", 120 + client_type: :login 121 + }} 122 + end 123 + 124 + test "includes status code from non-200 responses in stop event" do 125 + ref = make_ref() 126 + 127 + :telemetry.attach( 128 + "test-plugin-error-#{inspect(ref)}", 129 + [:atex, :xrpc, :request, :stop], 130 + fn _event, _measurements, metadata, _ -> 131 + send(self(), {:stop, metadata}) 132 + end, 133 + nil 134 + ) 135 + 136 + on_exit(fn -> :telemetry.detach("test-plugin-error-#{inspect(ref)}") end) 137 + 138 + req = 139 + Req.new( 140 + method: :get, 141 + url: "http://bsky.social/xrpc/app.bsky.actor.getProfile", 142 + plug: ErrorPlug, 143 + retry: false 144 + ) 145 + |> Atex.Telemetry.attach_req_plugin(client_type: :login) 146 + 147 + {:ok, _response} = Req.request(req) 148 + 149 + assert_receive {:stop, 150 + %{status: 500, resource: "app.bsky.actor.getProfile", client_type: :login}} 151 + end 152 + 153 + test "emits exception event on transport error" do 154 + ref = make_ref() 155 + 156 + :telemetry.attach( 157 + "test-plugin-exception-#{inspect(ref)}", 158 + [:atex, :xrpc, :request, :exception], 159 + fn _event, _measurements, metadata, _ -> 160 + send(self(), {:exception, metadata}) 161 + end, 162 + nil 163 + ) 164 + 165 + on_exit(fn -> :telemetry.detach("test-plugin-exception-#{inspect(ref)}") end) 166 + 167 + req = 168 + Req.new( 169 + method: :get, 170 + url: "http://bsky.social/xrpc/app.bsky.actor.getProfile", 171 + plug: TransportErrorPlug, 172 + retry: false 173 + ) 174 + |> Atex.Telemetry.attach_req_plugin(client_type: :login) 175 + 176 + {:error, _reason} = Req.request(req) 177 + 178 + assert_receive {:exception, 179 + %{ 180 + kind: :error, 181 + reason: %Req.TransportError{reason: :closed}, 182 + resource: "app.bsky.actor.getProfile", 183 + client_type: :login 184 + }} 185 + end 186 + 187 + test "no-op when telemetry not attached — request still succeeds" do 188 + req = 189 + Req.new( 190 + method: :get, 191 + url: "http://bsky.social/xrpc/app.bsky.actor.getProfile", 192 + plug: OkPlug 193 + ) 194 + |> Atex.Telemetry.attach_req_plugin(client_type: :login) 195 + 196 + assert {:ok, %{status: 200}} = Req.request(req) 197 + end 198 + end 199 + 200 + describe "XRPC client instrumentation" do 201 + defmodule XRPCPlug do 202 + @moduledoc false 203 + import Plug.Conn 204 + def init(opts), do: opts 205 + 206 + def call(conn, _opts) do 207 + conn |> send_resp(200, Jason.encode!(%{did: "did:plc:abc", handle: "user.bsky.social"})) 208 + end 209 + end 210 + 211 + test "LoginClient emits request start/stop events" do 212 + ref = make_ref() 213 + 214 + :telemetry.attach_many( 215 + "test-login-client-#{inspect(ref)}", 216 + [[:atex, :xrpc, :request, :start], [:atex, :xrpc, :request, :stop]], 217 + fn event, measurements, metadata, _ -> 218 + send(self(), {:telemetry, event, measurements, metadata}) 219 + end, 220 + nil 221 + ) 222 + 223 + on_exit(fn -> :telemetry.detach("test-login-client-#{inspect(ref)}") end) 224 + 225 + client = Atex.XRPC.LoginClient.new("http://bsky.social", "fake-token", nil) 226 + 227 + Atex.XRPC.get(client, "com.atproto.identity.resolveHandle", 228 + plug: XRPCPlug, 229 + params: [handle: "user.bsky.social"] 230 + ) 231 + 232 + assert_receive {:telemetry, [:atex, :xrpc, :request, :start], %{system_time: _}, 233 + %{ 234 + method: :get, 235 + resource: "com.atproto.identity.resolveHandle", 236 + client_type: :login 237 + }} 238 + 239 + assert_receive {:telemetry, [:atex, :xrpc, :request, :stop], %{duration: _}, 240 + %{status: 200, client_type: :login}} 241 + end 242 + 243 + test "unauthed_get emits request start/stop events" do 244 + ref = make_ref() 245 + 246 + :telemetry.attach_many( 247 + "test-unauthed-#{inspect(ref)}", 248 + [[:atex, :xrpc, :request, :start], [:atex, :xrpc, :request, :stop]], 249 + fn event, measurements, metadata, _ -> 250 + send(self(), {:telemetry, event, measurements, metadata}) 251 + end, 252 + nil 253 + ) 254 + 255 + on_exit(fn -> :telemetry.detach("test-unauthed-#{inspect(ref)}") end) 256 + 257 + Atex.XRPC.unauthed_get("http://bsky.social", "com.atproto.identity.resolveHandle", 258 + plug: XRPCPlug, 259 + params: [handle: "user.bsky.social"] 260 + ) 261 + 262 + assert_receive {:telemetry, [:atex, :xrpc, :request, :start], %{system_time: _}, 263 + %{resource: "com.atproto.identity.resolveHandle", client_type: :unauthed}} 264 + 265 + assert_receive {:telemetry, [:atex, :xrpc, :request, :stop], %{duration: _}, 266 + %{status: 200, client_type: :unauthed}} 267 + end 268 + end 269 + end