An Elixir toolkit for the AT Protocol. hexdocs.pm/atex
elixir bluesky atproto decentralization
25
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(service_auth): return {:error, :invalid_jwt} for malformed JWTs instead of raising

+56 -28
+41 -28
lib/atex/service_auth.ex
··· 103 103 def validate_jwt(jwt, opts \\ []) do 104 104 {expected_aud, expected_lxm} = options(opts) 105 105 106 - %{ 107 - fields: 108 - %{ 109 - "aud" => target_aud, 110 - "iat" => iat, 111 - "exp" => exp, 112 - "iss" => issuing_did, 113 - "jti" => nonce 114 - } = fields 115 - } = JOSE.JWT.peek(jwt) 106 + peek_result = 107 + try do 108 + peeked = JOSE.JWT.peek(jwt) 109 + {:ok, peeked} 110 + rescue 111 + _ -> {:error, :invalid_jwt} 112 + end 113 + 114 + case peek_result do 115 + {:error, _} = err -> 116 + err 116 117 117 - target_lxm = Map.get(fields, "lxm") 118 + {:ok, 119 + %{ 120 + fields: 121 + %{ 122 + "aud" => target_aud, 123 + "iat" => iat, 124 + "exp" => exp, 125 + "iss" => issuing_did, 126 + "jti" => nonce 127 + } = fields 128 + }} -> 129 + target_lxm = Map.get(fields, "lxm") 118 130 119 - with :ok <- validate_aud(expected_aud, target_aud), 120 - :ok <- validate_lxm(expected_lxm, target_lxm), 121 - :ok <- validate_token_times(iat, exp), 122 - # Resolve JWT's issuer to: a) make sure it's a real identity, b) get 123 - # the signing key from their DID document to verify the token 124 - {:ok, identity} <- Atex.IdentityResolver.resolve(issuing_did), 125 - user_jwk when not is_nil(user_jwk) <- 126 - Atex.DID.Document.get_atproto_signing_key(identity.document), 127 - {true, %JOSE.JWT{} = jwt_struct, _jws} <- JOSE.JWT.verify(user_jwk, jwt), 128 - # Record the nonce atomically after successful verification. insert_new 129 - # is used under the hood so this returns :seen if the jti was already 130 - # consumed, preventing replay attacks. 131 - :ok <- Atex.ServiceAuth.JTICache.put(nonce, exp) do 132 - {:ok, jwt_struct} 133 - else 134 - :seen -> {:error, :replayed_token} 135 - err -> err 131 + with :ok <- validate_aud(expected_aud, target_aud), 132 + :ok <- validate_lxm(expected_lxm, target_lxm), 133 + :ok <- validate_token_times(iat, exp), 134 + # Resolve JWT's issuer to: a) make sure it's a real identity, b) get 135 + # the signing key from their DID document to verify the token 136 + {:ok, identity} <- Atex.IdentityResolver.resolve(issuing_did), 137 + user_jwk when not is_nil(user_jwk) <- 138 + Atex.DID.Document.get_atproto_signing_key(identity.document), 139 + {true, %JOSE.JWT{} = jwt_struct, _jws} <- JOSE.JWT.verify(user_jwk, jwt), 140 + # Record the nonce atomically after successful verification. insert_new 141 + # is used under the hood so this returns :seen if the jti was already 142 + # consumed, preventing replay attacks. 143 + :ok <- Atex.ServiceAuth.JTICache.put(nonce, exp) do 144 + {:ok, jwt_struct} 145 + else 146 + :seen -> {:error, :replayed_token} 147 + err -> err 148 + end 136 149 end 137 150 end 138 151
+15
test/atex/service_auth_test.exs
··· 1 + defmodule Atex.ServiceAuthTest do 2 + use ExUnit.Case, async: true 3 + 4 + describe "validate_jwt/2" do 5 + test "returns {:error, :invalid_jwt} for a malformed token string" do 6 + assert {:error, :invalid_jwt} = 7 + Atex.ServiceAuth.validate_jwt("not.a.valid.jwt", aud: "did:web:example.com") 8 + end 9 + 10 + test "returns {:error, :invalid_jwt} for an empty string" do 11 + assert {:error, :invalid_jwt} = 12 + Atex.ServiceAuth.validate_jwt("", aud: "did:web:example.com") 13 + end 14 + end 15 + end