mount public data from the atmosphere to a virtual filesystem (macos only) pdfs.at
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(atproto/oauth): single-flight refresh + DPoP key GC on re-sign-in

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

+95
+19
Packages/ATProto/Sources/ATProto/OAuth/OAuthAuthTokenProvider.swift
··· 8 8 let clientId: String 9 9 let session: URLSession 10 10 11 + /// Coalesces concurrent `reportTokenRejected` calls onto one refresh Task. 12 + /// Without this, two parallel 401s would fire two refresh POSTs; the 13 + /// second uses the already-rotated refresh_token and fails. 14 + private var inFlightRefresh: Task<Void, Never>? 15 + 11 16 public init( 12 17 did: DID, 13 18 tokenStore: any TokenStore, ··· 57 62 } 58 63 59 64 public func reportTokenRejected() async { 65 + // If a refresh is already in flight, join it instead of starting a 66 + // second one. This is critical: atproto refresh tokens rotate per 67 + // use, so concurrent refresh attempts would race and break the chain. 68 + if let existing = inFlightRefresh { 69 + await existing.value 70 + return 71 + } 72 + let task = Task { await performRefresh() } 73 + inFlightRefresh = task 74 + await task.value 75 + inFlightRefresh = nil 76 + } 77 + 78 + private func performRefresh() async { 60 79 // Attempt a refresh; best-effort. 61 80 do { 62 81 guard let tokens = try await tokenStore.load(did: did),
+9
Packages/ATProto/Sources/ATProto/OAuth/OAuthCoordinator.swift
··· 179 179 dpopKey: dpopKey 180 180 ) 181 181 182 + // Capture the previous DPoP key identifier (if any) so we can GC it 183 + // after the new tokens are saved. We read it before the save so a 184 + // crash between save and delete just leaves a harmless orphan key. 185 + let previousKeyIdentifier = try? await tokenStore.load(did: did)?.dpopKeyIdentifier 186 + 182 187 let stored = StoredTokens( 183 188 did: did, 184 189 accessToken: tokens.accessToken, ··· 189 194 issuer: authMeta.issuer 190 195 ) 191 196 try await tokenStore.save(stored) 197 + 198 + if let previousKeyIdentifier, previousKeyIdentifier != keyIdentifier { 199 + try? await tokenStore.deleteDPoPKey(identifier: previousKeyIdentifier) 200 + } 192 201 } 193 202 194 203 private func redirectSchemeFromRedirectURI(_ uri: String) -> String {
+7
Packages/ATProto/Sources/ATProto/OAuth/Storage/TokenStore.swift
··· 9 9 /// always has access to the matching key. 10 10 func saveDPoPKey(_ key: any DPoPKey, identifier: String) async throws 11 11 func loadDPoPKey(identifier: String) async throws -> (any DPoPKey)? 12 + /// Removes a DPoP key by identifier. Called after re-sign-in so old 13 + /// keys don't accumulate in the store. 14 + func deleteDPoPKey(identifier: String) async throws 12 15 } 13 16 14 17 public actor InMemoryTokenStore: TokenStore { ··· 39 42 40 43 public func loadDPoPKey(identifier: String) async throws -> (any DPoPKey)? { 41 44 keys[identifier] 45 + } 46 + 47 + public func deleteDPoPKey(identifier: String) async throws { 48 + keys.removeValue(forKey: identifier) 42 49 } 43 50 }
+50
Packages/ATProto/Tests/ATProtoTests/OAuth/OAuthAuthTokenProviderTests.swift
··· 68 68 #expect(stored == "n1") 69 69 } 70 70 71 + @Test("concurrent reportTokenRejected calls coalesce into one refresh HTTP call") 72 + func singleFlightRefresh() async throws { 73 + let did = DID("did:plc:abc")! 74 + let key = try InMemoryES256DPoPKey.generate() 75 + let store = InMemoryTokenStore() 76 + try await store.saveDPoPKey(key, identifier: "k1") 77 + try await store.save(StoredTokens( 78 + did: did, accessToken: "OLD", refreshToken: "RT", 79 + expiresAt: Date().addingTimeInterval(3600), 80 + scopes: [.atproto], dpopKeyIdentifier: "k1", 81 + issuer: "https://bsky.social" 82 + )) 83 + 84 + actor Counter { var count = 0; func inc() { count += 1 } } 85 + let counter = Counter() 86 + let session = URLProtocolStub.install { _ in 87 + Task { await counter.inc() } 88 + return .json(#""" 89 + {"access_token":"NEW","refresh_token":"RT2","token_type":"DPoP", 90 + "expires_in":3600,"scope":"atproto","sub":"did:plc:abc"} 91 + """#) 92 + } 93 + defer { URLProtocolStub.reset(session: session) } 94 + 95 + let provider = OAuthAuthTokenProvider( 96 + did: did, tokenStore: store, nonceStore: DPoPNonceStore(), 97 + tokenEndpoint: URL(string: "https://bsky.social/oauth/token")!, 98 + clientId: "https://pdfs.at/oauth/client-metadata.json", 99 + session: session 100 + ) 101 + 102 + // Fire 5 concurrent reportTokenRejected calls. 103 + await withTaskGroup(of: Void.self) { group in 104 + for _ in 0..<5 { 105 + group.addTask { await provider.reportTokenRejected() } 106 + } 107 + } 108 + 109 + // Give any stray async Task.inc() calls a tick to land. 110 + try await Task.sleep(for: .milliseconds(50)) 111 + 112 + // Only ONE HTTP call should have fired despite 5 concurrent callers. 113 + let final = await counter.count 114 + #expect(final == 1) 115 + 116 + // Tokens are still updated once. 117 + let loaded = try await store.load(did: did) 118 + #expect(loaded?.accessToken == "NEW") 119 + } 120 + 71 121 @Test("reportTokenRejected refreshes tokens via refresh_token grant") 72 122 func tokenRejected() async throws { 73 123 let did = DID("did:plc:abc")!
+10
Packages/ATProto/Tests/ATProtoTests/OAuth/TokenStoreTests.swift
··· 36 36 #expect(try await store.load(did: did) == nil) 37 37 } 38 38 39 + @Test("deleteDPoPKey removes the key") 40 + func deleteKey() async throws { 41 + let store = InMemoryTokenStore() 42 + let key = try InMemoryES256DPoPKey.generate() 43 + try await store.saveDPoPKey(key, identifier: "k1") 44 + #expect(try await store.loadDPoPKey(identifier: "k1") != nil) 45 + try await store.deleteDPoPKey(identifier: "k1") 46 + #expect(try await store.loadDPoPKey(identifier: "k1") == nil) 47 + } 48 + 39 49 @Test("list returns all DIDs") 40 50 func list() async throws { 41 51 let store = InMemoryTokenStore()