Select the types of activity you want to include in your feed.
Stateless auth proxy that converts AT Protocol native apps from public to confidential OAuth clients. Deploy once, get 180-day refresh tokens instead of 24-hour ones.
···11+package main
22+33+import (
44+ "context"
55+ "crypto/sha256"
66+ "encoding/hex"
77+ "sync"
88+ "time"
99+)
const (
	// refreshCacheTTL is how long a successful token response stays
	// replayable after the leader finalizes it; once expired, a retry
	// must make its own upstream call.
	refreshCacheTTL = 10 * time.Minute
	// refreshCacheMaxSize caps the entry map to bound memory under
	// pathological load; sweepLocked evicts settled entries (never
	// in-flight ones) when the cap is reached.
	refreshCacheMaxSize = 10_000
)
// cachedTokenResponse is the portion of a successful /oauth/token proxy
// response we replay to clients whose original response was lost in transit
// (TCP reset, app backgrounded before reading the body, network blip between
// proxy and client, etc.). Because AT Protocol refresh tokens rotate on use,
// the client only has one shot — if the original response never lands, the
// token is consumed on the authorization server but the client is stuck with
// the old value and every retry gets "invalid_grant: Refresh token replayed".
type cachedTokenResponse struct {
	// response is the upstream response replayed to the retrying client.
	response *upstreamResponse
	// usedKeyID names the signing key used for the original request; it
	// is echoed back in the auth-proxy key-ID header on replay.
	usedKeyID string
}
// refreshCacheEntry is one single-flight slot in the refresh cache.
type refreshCacheEntry struct {
	// ready is closed exactly once by the leader — via finalize (success)
	// or release (non-cacheable outcome). Followers block on it in wait.
	ready chan struct{}
	// result is set under refreshCache.mu before ready is closed; nil
	// means the entry was released without a cacheable result.
	result *cachedTokenResponse
	// expires is when a settled entry becomes eligible for sweeping.
	expires time.Time
}
// refreshCache combines a short-TTL idempotency cache with a single-flight
// gate. Concurrent requests for the same refresh token coalesce onto one
// upstream call; retries after a lost response pick up the cached result.
// Negative outcomes (4xx/5xx from upstream, or upstream transport failures)
// are not cached — the authorization server is authoritative on whether a
// given token is still alive.
type refreshCache struct {
	// mu guards entries and every entry's result field.
	mu sync.Mutex
	// entries is keyed by the SHA-256 hex digest of the refresh token
	// (see refreshCacheKey) so raw tokens are never held in memory here.
	entries map[string]*refreshCacheEntry
}
4444+4545+func newRefreshCache() *refreshCache {
4646+ return &refreshCache{entries: make(map[string]*refreshCacheEntry)}
4747+}
// refreshCacheKey derives the map key for a refresh token. Only the
// hex-encoded SHA-256 digest is used, so the raw token is never stored.
func refreshCacheKey(refreshToken string) string {
	hasher := sha256.New()
	hasher.Write([]byte(refreshToken))
	return hex.EncodeToString(hasher.Sum(nil))
}
5353+5454+// acquire returns the cache entry for the given refresh token and reports
5555+// whether the caller is the leader (must perform the upstream call and then
5656+// call finalize or release) or a follower (should wait on the entry).
5757+func (c *refreshCache) acquire(refreshToken string) (entry *refreshCacheEntry, isLeader bool) {
5858+ key := refreshCacheKey(refreshToken)
5959+ c.mu.Lock()
6060+ defer c.mu.Unlock()
6161+ c.sweepLocked()
6262+6363+ if existing, ok := c.entries[key]; ok {
6464+ return existing, false
6565+ }
6666+6767+ entry = &refreshCacheEntry{
6868+ ready: make(chan struct{}),
6969+ expires: time.Now().Add(refreshCacheTTL),
7070+ }
7171+ c.entries[key] = entry
7272+ return entry, true
7373+}
// finalize records a successful upstream response and wakes any followers.
// The entry remains in the cache for its TTL so later retries whose original
// response was lost can pick up the same rotated token.
func (c *refreshCache) finalize(entry *refreshCacheEntry, result *cachedTokenResponse) {
	// Publish the result under the same mutex wait() reads it with, before
	// signaling readiness.
	c.mu.Lock()
	entry.result = result
	c.mu.Unlock()
	// Close outside the lock so woken followers don't immediately contend
	// on mu. finalize is called at most once per entry (leader contract),
	// so the close cannot panic on a closed channel.
	close(entry.ready)
}
8484+8585+// release removes a non-cacheable entry and wakes any followers with no
8686+// cached result so they can fall through to a fresh upstream attempt.
8787+func (c *refreshCache) release(refreshToken string, entry *refreshCacheEntry) {
8888+ key := refreshCacheKey(refreshToken)
8989+ c.mu.Lock()
9090+ if current, ok := c.entries[key]; ok && current == entry {
9191+ delete(c.entries, key)
9292+ }
9393+ c.mu.Unlock()
9494+ close(entry.ready)
9595+}
9696+9797+// wait blocks until the entry is finalized or released, honoring context
9898+// cancellation. Returns nil if the entry was released or the context ended.
9999+func (c *refreshCache) wait(ctx context.Context, entry *refreshCacheEntry) *cachedTokenResponse {
100100+ select {
101101+ case <-entry.ready:
102102+ case <-ctx.Done():
103103+ return nil
104104+ }
105105+ c.mu.Lock()
106106+ defer c.mu.Unlock()
107107+ return entry.result
108108+}
109109+110110+// sweepLocked drops expired entries that have already settled. In-flight
111111+// entries are left alone regardless of TTL — the leader is responsible for
112112+// calling finalize or release, after which the next sweep will reap them.
113113+// Called under c.mu.
114114+func (c *refreshCache) sweepLocked() {
115115+ now := time.Now()
116116+ for key, entry := range c.entries {
117117+ select {
118118+ case <-entry.ready:
119119+ if now.After(entry.expires) {
120120+ delete(c.entries, key)
121121+ }
122122+ default:
123123+ }
124124+ }
125125+ if len(c.entries) < refreshCacheMaxSize {
126126+ return
127127+ }
128128+ // Bound memory under pathological load: drop settled entries until we
129129+ // are back under the cap. In-flight entries are never evicted.
130130+ for key, entry := range c.entries {
131131+ if len(c.entries) < refreshCacheMaxSize {
132132+ break
133133+ }
134134+ select {
135135+ case <-entry.ready:
136136+ delete(c.entries, key)
137137+ default:
138138+ }
139139+ }
140140+}
+38-1
handler_token.go
···1818 RefreshToken string `json:"refresh_token,omitempty"`
1919}
20202121-func HandleToken(signers *SignerSet, clientID string) http.HandlerFunc {
2121+func HandleToken(signers *SignerSet, clientID string, cache *refreshCache) http.HandlerFunc {
2222 return func(w http.ResponseWriter, r *http.Request) {
2323 var req tokenRequest
2424 if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
···4242 if err := ValidateTokenEndpointForIssuer(r.Context(), req.Issuer, req.TokenEndpoint); err != nil {
4343 writeAPIError(w, err)
4444 return
4545+ }
4646+4747+ // Idempotency: refresh tokens rotate single-use. If a client's original
4848+ // response was lost in transit, the token is spent upstream but the
4949+ // client still holds the old value. Coalesce concurrent duplicates and
5050+ // briefly cache successful upstream responses so retries recover the
5151+ // rotated token instead of getting "Refresh token replayed".
5252+ var cacheSlot *refreshCacheEntry
5353+ if req.GrantType == "refresh_token" && req.RefreshToken != "" {
5454+ entry, isLeader := cache.acquire(req.RefreshToken)
5555+ if !isLeader {
5656+ if cached := cache.wait(r.Context(), entry); cached != nil {
5757+ w.Header().Set(authProxyKeyIDHeader, cached.usedKeyID)
5858+ if err := WriteProxiedResponse(w, cached.response); err != nil {
5959+ log.Printf("failed to write proxied response: %v", err)
6060+ }
6161+ return
6262+ }
6363+ // Leader released (non-cacheable outcome) or context ended.
6464+ // Fall through and make our own upstream attempt without
6565+ // holding a cache slot; avoids looping on contention.
6666+ } else {
6767+ cacheSlot = entry
6868+ defer func() {
6969+ if cacheSlot != nil {
7070+ cache.release(req.RefreshToken, cacheSlot)
7171+ }
7272+ }()
7373+ }
4574 }
46754776 candidateKeyIDs, err := signers.CandidateKeyIDs(req.KeyID)
···103132 }
104133105134 break
135135+ }
136136+137137+ // Finalize the cache entry before writing to the client so concurrent
138138+ // followers can unblock during the write. Cache only 2xx responses —
139139+ // negative outcomes are authoritative and shouldn't mask a later retry.
140140+ if cacheSlot != nil && proxied != nil && proxied.statusCode >= 200 && proxied.statusCode < 300 {
141141+ cache.finalize(cacheSlot, &cachedTokenResponse{response: proxied, usedKeyID: usedKeyID})
142142+ cacheSlot = nil
106143 }
107144108145 w.Header().Set(authProxyKeyIDHeader, usedKeyID)