A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.

try and lock session get/update

+47 -5
pkg/auth/oauth/client.go
··· 9 9 "fmt" 10 10 "log/slog" 11 11 "strings" 12 + "sync" 12 13 "time" 13 14 14 15 "atcr.io/pkg/atproto" ··· 146 147 type Refresher struct { 147 148 clientApp *oauth.ClientApp 148 149 uiSessionStore UISessionStore // For invalidating UI sessions on OAuth failures 150 + didLocks sync.Map // Per-DID mutexes to prevent concurrent DPoP nonce races 149 151 } 150 152 151 153 // NewRefresher creates a new session refresher ··· 162 164 163 165 // GetSession gets a fresh OAuth session for a DID 164 166 // Loads session from database on every request (database is source of truth) 167 + // Uses per-DID locking to prevent concurrent requests from racing on DPoP nonce updates 168 + // 169 + // Why locking is critical: 170 + // During docker push, multiple layers upload concurrently. Each layer creates a new 171 + // ClientSession by loading from database. Without locking, this race condition occurs: 172 + // 1. Layer A loads session with stale DPoP nonce from DB 173 + // 2. Layer B loads session with same stale nonce (A hasn't updated DB yet) 174 + // 3. Layer A makes request → 401 "use_dpop_nonce" → gets fresh nonce → saves to DB 175 + // 4. Layer B makes request → 401 "use_dpop_nonce" (using stale nonce from step 2) 176 + // 5. DPoP nonce thrashing continues, eventually causing 500 errors 177 + // 178 + // With per-DID locking: 179 + // 1. Layer A acquires lock, loads session, handles nonce negotiation, saves, releases lock 180 + // 2. Layer B acquires lock AFTER A releases, loads fresh nonce from DB, succeeds 165 181 func (r *Refresher) GetSession(ctx context.Context, did string) (*oauth.ClientSession, error) { 166 - return r.resumeSession(ctx, did) 182 + // Get or create a mutex for this DID to prevent concurrent session loads 183 + // This prevents DPoP nonce race conditions when multiple layers upload simultaneously 184 + mutexInterface, _ := r.didLocks.LoadOrStore(did, &sync.Mutex{}) 185 + mutex := mutexInterface.(*sync.Mutex) 186 + 187 + // Serialize session loading per DID 188 + mutex.Lock() 189 + defer mutex.Unlock() 190 + 191 + slog.Debug("Acquired session lock for DID", 192 + "component", "oauth/refresher", 193 + "did", did) 194 + 195 + session, err := r.resumeSession(ctx, did) 196 + if err != nil { 197 + return nil, err 198 + } 199 + 200 + slog.Debug("Released session lock for DID", 201 + "component", "oauth/refresher", 202 + "did", did) 203 + 204 + return session, nil 167 205 } 168 206 169 207 // resumeSession loads a session from storage ··· 213 251 } 214 252 215 253 // Set up callback to persist token updates to SQLite 216 - // This ensures that when indigo automatically refreshes tokens, 217 - // the new tokens are saved to the database immediately 254 + // This ensures that when indigo automatically refreshes tokens or updates DPoP nonces, 255 + // the new state is saved to the database immediately 218 256 session.PersistSessionCallback = func(callbackCtx context.Context, updatedData *oauth.ClientSessionData) { 219 257 if err := r.clientApp.Store.SaveSession(callbackCtx, *updatedData); err != nil { 220 258 slog.Error("Failed to persist OAuth session update", ··· 223 261 "sessionID", sessionID, 224 262 "error", err) 225 263 } else { 226 - slog.Debug("Persisted OAuth token refresh to database", 264 + // Log session updates (token refresh, DPoP nonce updates, etc.) 
265 + // Note: updatedData contains the full session state including DPoP nonce, 266 + // but we don't log sensitive data like tokens or nonces themselves 267 + slog.Debug("Persisted OAuth session update to database", 227 268 "component", "oauth/refresher", 228 269 "did", did, 229 - "sessionID", sessionID) 270 + "sessionID", sessionID, 271 + "hint", "This includes token refresh and DPoP nonce updates") 230 272 } 231 273 } 232 274 return session, nil
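For context outside the diff, here is a minimal, self-contained sketch of the per-DID locking pattern this commit introduces. The sync.Map LoadOrStore idiom mirrors GetSession above, but Session, loadSession, and the did:plc:example DID are simplified stand-ins for the real oauth.ClientSession and resumeSession, so treat this as an illustration under those assumptions rather than the actual atcr.io code.

package main

import (
	"context"
	"fmt"
	"sync"
)

// Session stands in for oauth.ClientSession; only the nonce matters here.
type Session struct {
	DPoPNonce string
}

type Refresher struct {
	didLocks sync.Map // map[string]*sync.Mutex, one mutex per DID
}

// GetSession serializes session loads per DID so concurrent callers
// (e.g. parallel layer uploads during docker push) cannot race on the nonce.
func (r *Refresher) GetSession(ctx context.Context, did string) (*Session, error) {
	// LoadOrStore returns the existing mutex for this DID, or stores and
	// returns a fresh one; losers of the race share the winner's mutex.
	mutexInterface, _ := r.didLocks.LoadOrStore(did, &sync.Mutex{})
	mutex := mutexInterface.(*sync.Mutex)

	mutex.Lock()
	defer mutex.Unlock()

	// Under the lock: load the session and complete nonce negotiation.
	// The real code does this in resumeSession; this stub just fakes it.
	return r.loadSession(ctx, did)
}

// loadSession is a hypothetical stand-in for resumeSession.
func (r *Refresher) loadSession(ctx context.Context, did string) (*Session, error) {
	return &Session{DPoPNonce: "fresh-nonce-for-" + did}, nil
}

func main() {
	r := &Refresher{}
	var wg sync.WaitGroup
	// Simulate four layer uploads hitting the same DID concurrently.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(layer int) {
			defer wg.Done()
			s, err := r.GetSession(context.Background(), "did:plc:example")
			if err != nil {
				fmt.Println("layer", layer, "error:", err)
				return
			}
			fmt.Println("layer", layer, "got nonce", s.DPoPNonce)
		}(i)
	}
	wg.Wait()
}

One tradeoff of this pattern worth noting: mutexes stored in didLocks are never deleted, so the map grows with the number of distinct DIDs seen. That is typically fine for a registry serving a bounded set of users, since each entry is just a mutex.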