A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

locks locks locks locks

+265 -323
+12 -35
cmd/appview/serve.go
··· 14 14 "syscall" 15 15 "time" 16 16 17 - "github.com/bluesky-social/indigo/atproto/syntax" 18 17 "github.com/distribution/distribution/v3/registry" 19 18 "github.com/distribution/distribution/v3/registry/handlers" 20 19 "github.com/spf13/cobra" ··· 186 185 } else { 187 186 // Register UI routes with dependencies 188 187 routes.RegisterUIRoutes(mainRouter, routes.UIDependencies{ 189 - Database: uiDatabase, 190 - ReadOnlyDB: uiReadOnlyDB, 191 - SessionStore: uiSessionStore, 188 + Database: uiDatabase, 189 + ReadOnlyDB: uiReadOnlyDB, 190 + SessionStore: uiSessionStore, 192 191 OAuthClientApp: oauthClientApp, 193 - OAuthStore: oauthStore, 194 - Refresher: refresher, 195 - BaseURL: baseURL, 196 - DeviceStore: deviceStore, 197 - HealthChecker: healthChecker, 198 - ReadmeCache: readmeCache, 199 - Templates: uiTemplates, 192 + OAuthStore: oauthStore, 193 + Refresher: refresher, 194 + BaseURL: baseURL, 195 + DeviceStore: deviceStore, 196 + HealthChecker: healthChecker, 197 + ReadmeCache: readmeCache, 198 + Templates: uiTemplates, 200 199 }) 201 200 } 202 201 } ··· 215 214 oauthServer.SetPostAuthCallback(func(ctx context.Context, did, handle, pdsEndpoint, sessionID string) error { 216 215 slog.Debug("OAuth post-auth callback", "component", "appview/callback", "did", did) 217 216 218 - // Parse DID for session resume 219 - didParsed, err := syntax.ParseDID(did) 220 - if err != nil { 221 - slog.Warn("Failed to parse DID", "component", "appview/callback", "did", did, "error", err) 222 - return nil // Non-fatal 223 - } 224 - 225 - // Resume OAuth session to get authenticated client 226 - session, err := oauthClientApp.ResumeSession(ctx, didParsed, sessionID) 227 - if err != nil { 228 - slog.Warn("Failed to resume session", "component", "appview/callback", "did", did, "error", err) 229 - // Fallback: update user without avatar 230 - _ = db.UpsertUser(uiDatabase, &db.User{ 231 - DID: did, 232 - Handle: handle, 233 - PDSEndpoint: pdsEndpoint, 234 - Avatar: "", 235 - LastSeen: time.Now(), 236 - }) 237 - return nil // Non-fatal 238 - } 239 - 240 - // Create authenticated atproto client using the indigo session's API client 241 - client := atproto.NewClientWithIndigoClient(pdsEndpoint, did, session.APIClient()) 217 + // Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety) 218 + client := atproto.NewClientWithSessionProvider(pdsEndpoint, did, refresher) 242 219 243 220 // Ensure sailor profile exists (creates with default hold if configured) 244 221 slog.Debug("Ensuring profile exists", "component", "appview/callback", "did", did, "default_hold_did", defaultHoldDID)
+9 -37
pkg/appview/handlers/api.go
··· 43 43 return 44 44 } 45 45 46 - // Get OAuth session for the authenticated user 47 - slog.Debug("Getting OAuth session for star", "user_did", user.DID) 48 - session, err := h.Refresher.GetSession(r.Context(), user.DID) 49 - if err != nil { 50 - slog.Warn("Failed to get OAuth session for star", "user_did", user.DID, "error", err) 51 - http.Error(w, fmt.Sprintf("Failed to get OAuth session: %v", err), http.StatusUnauthorized) 52 - return 53 - } 54 - 55 - // Get user's PDS client (use indigo's API client which handles DPoP automatically) 56 - apiClient := session.APIClient() 57 - pdsClient := atproto.NewClientWithIndigoClient(user.PDSEndpoint, user.DID, apiClient) 46 + // Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety) 47 + slog.Debug("Creating PDS client for star", "user_did", user.DID) 48 + pdsClient := atproto.NewClientWithSessionProvider(user.PDSEndpoint, user.DID, h.Refresher) 58 49 59 50 // Create star record 60 51 starRecord := atproto.NewStarRecord(ownerDID, repository) ··· 106 97 return 107 98 } 108 99 109 - // Get OAuth session for the authenticated user 110 - slog.Debug("Getting OAuth session for unstar", "user_did", user.DID) 111 - session, err := h.Refresher.GetSession(r.Context(), user.DID) 112 - if err != nil { 113 - slog.Warn("Failed to get OAuth session for unstar", "user_did", user.DID, "error", err) 114 - http.Error(w, fmt.Sprintf("Failed to get OAuth session: %v", err), http.StatusUnauthorized) 115 - return 116 - } 117 - 118 - // Get user's PDS client (use indigo's API client which handles DPoP automatically) 119 - apiClient := session.APIClient() 120 - pdsClient := atproto.NewClientWithIndigoClient(user.PDSEndpoint, user.DID, apiClient) 100 + // Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety) 101 + slog.Debug("Creating PDS client for unstar", "user_did", user.DID) 102 + pdsClient := atproto.NewClientWithSessionProvider(user.PDSEndpoint, user.DID, h.Refresher) 121 103 122 104 // Delete star record from user's PDS 123 105 rkey := atproto.StarRecordKey(ownerDID, repository) ··· 172 154 return 173 155 } 174 156 175 - // Get OAuth session for the authenticated user 176 - session, err := h.Refresher.GetSession(r.Context(), user.DID) 177 - if err != nil { 178 - slog.Debug("Failed to get OAuth session for check star", "user_did", user.DID, "error", err) 179 - // No OAuth session - return not starred 180 - w.Header().Set("Content-Type", "application/json") 181 - json.NewEncoder(w).Encode(map[string]bool{"starred": false}) 182 - return 183 - } 184 - 185 - // Get user's PDS client (use indigo's API client which handles DPoP automatically) 186 - apiClient := session.APIClient() 187 - pdsClient := atproto.NewClientWithIndigoClient(user.PDSEndpoint, user.DID, apiClient) 157 + // Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety) 158 + // Note: Error handling moves to the PDS call - if session doesn't exist, GetRecord will fail 159 + pdsClient := atproto.NewClientWithSessionProvider(user.PDSEndpoint, user.DID, h.Refresher) 188 160 189 161 // Check if star record exists 190 162 rkey := atproto.StarRecordKey(ownerDID, repository)
+4 -20
pkg/appview/handlers/images.go
··· 30 30 repo := chi.URLParam(r, "repository") 31 31 tag := chi.URLParam(r, "tag") 32 32 33 - // Get OAuth session for the authenticated user 34 - session, err := h.Refresher.GetSession(r.Context(), user.DID) 35 - if err != nil { 36 - http.Error(w, fmt.Sprintf("Failed to get OAuth session: %v", err), http.StatusUnauthorized) 37 - return 38 - } 39 - 40 - // Create ATProto client with OAuth credentials 41 - apiClient := session.APIClient() 42 - pdsClient := atproto.NewClientWithIndigoClient(user.PDSEndpoint, user.DID, apiClient) 33 + // Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety) 34 + pdsClient := atproto.NewClientWithSessionProvider(user.PDSEndpoint, user.DID, h.Refresher) 43 35 44 36 // Compute rkey for tag record (repository_tag with slashes replaced) 45 37 rkey := fmt.Sprintf("%s_%s", repo, tag) ··· 108 100 return 109 101 } 110 102 111 - // Get OAuth session for the authenticated user 112 - session, err := h.Refresher.GetSession(r.Context(), user.DID) 113 - if err != nil { 114 - http.Error(w, fmt.Sprintf("Failed to get OAuth session: %v", err), http.StatusUnauthorized) 115 - return 116 - } 117 - 118 - // Create ATProto client with OAuth credentials 119 - apiClient := session.APIClient() 120 - pdsClient := atproto.NewClientWithIndigoClient(user.PDSEndpoint, user.DID, apiClient) 103 + // Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety) 104 + pdsClient := atproto.NewClientWithSessionProvider(user.PDSEndpoint, user.DID, h.Refresher) 121 105 122 106 // If tagged and confirmed, delete all tags first 123 107 if tagged && confirmed {
+6 -11
pkg/appview/handlers/repository.go
··· 163 163 isStarred := false 164 164 user := middleware.GetUser(r) 165 165 if user != nil && h.Refresher != nil && h.Directory != nil { 166 - // Get OAuth session for the authenticated user 167 - session, err := h.Refresher.GetSession(r.Context(), user.DID) 168 - if err == nil { 169 - // Get user's PDS client 170 - apiClient := session.APIClient() 171 - pdsClient := atproto.NewClientWithIndigoClient(user.PDSEndpoint, user.DID, apiClient) 166 + // Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety) 167 + pdsClient := atproto.NewClientWithSessionProvider(user.PDSEndpoint, user.DID, h.Refresher) 172 168 173 - // Check if star record exists 174 - rkey := atproto.StarRecordKey(owner.DID, repository) 175 - _, err = pdsClient.GetRecord(r.Context(), atproto.StarCollection, rkey) 176 - isStarred = (err == nil) 177 - } 169 + // Check if star record exists 170 + rkey := atproto.StarRecordKey(owner.DID, repository) 171 + _, err := pdsClient.GetRecord(r.Context(), atproto.StarCollection, rkey) 172 + isStarred = (err == nil) 178 173 } 179 174 180 175 // Check if current user is the repository owner
+4 -28
pkg/appview/handlers/settings.go
··· 26 26 return 27 27 } 28 28 29 - // Get OAuth session for the user 30 - session, err := h.Refresher.GetSession(r.Context(), user.DID) 31 - if err != nil { 32 - // OAuth session not found or expired - redirect to re-authenticate 33 - slog.Warn("OAuth session not found, redirecting to login", "component", "settings", "did", user.DID, "error", err) 34 - http.Redirect(w, r, "/auth/oauth/login?return_to=/settings", http.StatusFound) 35 - return 36 - } 37 - 38 - // Use indigo's API client directly - it handles all auth automatically 39 - apiClient := session.APIClient() 40 - 41 - // Create ATProto client with indigo's XRPC client 42 - client := atproto.NewClientWithIndigoClient(user.PDSEndpoint, user.DID, apiClient) 29 + // Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety) 30 + client := atproto.NewClientWithSessionProvider(user.PDSEndpoint, user.DID, h.Refresher) 43 31 44 32 // Fetch sailor profile 45 33 profile, err := storage.GetProfile(r.Context(), client) ··· 96 84 97 85 holdEndpoint := r.FormValue("hold_endpoint") 98 86 99 - // Get OAuth session for the user 100 - session, err := h.Refresher.GetSession(r.Context(), user.DID) 101 - if err != nil { 102 - // OAuth session not found or expired - redirect to re-authenticate 103 - slog.Warn("OAuth session not found, redirecting to login", "component", "settings", "did", user.DID, "error", err) 104 - http.Redirect(w, r, "/auth/oauth/login?return_to=/settings", http.StatusFound) 105 - return 106 - } 107 - 108 - // Use indigo's API client directly - it handles all auth automatically 109 - apiClient := session.APIClient() 110 - 111 - // Create ATProto client with indigo's XRPC client 112 - client := atproto.NewClientWithIndigoClient(user.PDSEndpoint, user.DID, apiClient) 87 + // Create ATProto client with session provider (uses DoWithSession for DPoP nonce safety) 88 + client := atproto.NewClientWithSessionProvider(user.PDSEndpoint, user.DID, h.Refresher) 113 89 114 90 // Fetch existing profile or create new one 115 91 profile, err := storage.GetProfile(r.Context(), client)
+3 -9
pkg/appview/middleware/registry.go
··· 409 409 var atprotoClient *atproto.Client 410 410 411 411 if nr.refresher != nil { 412 - // Try OAuth flow first 413 - session, err := nr.refresher.GetSession(ctx, did) 414 - if err == nil { 415 - // OAuth session available - use indigo's API client (handles DPoP automatically) 416 - apiClient := session.APIClient() 417 - atprotoClient = atproto.NewClientWithIndigoClient(pdsEndpoint, did, apiClient) 418 - } else { 419 - slog.Debug("OAuth refresh failed, falling back to Basic Auth", "component", "registry/middleware", "did", did, "error", err) 420 - } 412 + // Use session provider for locked OAuth sessions 413 + // This prevents DPoP nonce race conditions during concurrent layer uploads 414 + atprotoClient = atproto.NewClientWithSessionProvider(pdsEndpoint, did, nr.refresher) 421 415 } 422 416 423 417 // Fall back to Basic Auth token cache if OAuth not available
+71 -57
pkg/atproto/client.go
··· 12 12 "strings" 13 13 14 14 "github.com/bluesky-social/indigo/atproto/atclient" 15 + indigo_oauth "github.com/bluesky-social/indigo/atproto/auth/oauth" 15 16 ) 16 17 17 18 // Sentinel errors ··· 19 20 ErrRecordNotFound = errors.New("record not found") 20 21 ) 21 22 23 + // SessionProvider provides locked OAuth sessions for PDS operations. 24 + // This interface allows the ATProto client to use DoWithSession() for each PDS call, 25 + // preventing DPoP nonce race conditions during concurrent operations. 26 + type SessionProvider interface { 27 + // DoWithSession executes fn with a locked OAuth session. 28 + // The lock is held for the entire duration, serializing DPoP nonce updates. 29 + DoWithSession(ctx context.Context, did string, fn func(session *indigo_oauth.ClientSession) error) error 30 + } 31 + 22 32 // Client wraps ATProto operations for the registry 23 33 type Client struct { 24 34 pdsEndpoint string 25 35 did string 26 36 accessToken string // For Basic Auth only 27 37 httpClient *http.Client 28 - useIndigoClient bool // true if using indigo's OAuth client (handles auth automatically) 29 - indigoClient *atclient.APIClient // indigo's API client for OAuth requests 38 + sessionProvider SessionProvider // For locked OAuth sessions (prevents DPoP nonce races) 30 39 } 31 40 32 41 // NewClient creates a new ATProto client for Basic Auth tokens (app passwords) ··· 39 48 } 40 49 } 41 50 42 - // NewClientWithIndigoClient creates an ATProto client using indigo's API client 43 - // This uses indigo's native XRPC methods with automatic DPoP handling 44 - func NewClientWithIndigoClient(pdsEndpoint, did string, indigoClient *atclient.APIClient) *Client { 51 + // NewClientWithSessionProvider creates an ATProto client that uses locked OAuth sessions. 52 + // This is the preferred constructor for concurrent operations (e.g., Docker layer uploads) 53 + // as it prevents DPoP nonce race conditions by serializing PDS calls per-DID. 54 + // 55 + // Each PDS call acquires a per-DID lock, ensuring that: 56 + // - Only one goroutine at a time can negotiate DPoP nonces with the PDS 57 + // - The session's nonce is saved to DB before other goroutines load it 58 + // - Concurrent manifest operations don't cause nonce thrashing 59 + func NewClientWithSessionProvider(pdsEndpoint, did string, sessionProvider SessionProvider) *Client { 45 60 return &Client{ 46 61 pdsEndpoint: pdsEndpoint, 47 62 did: did, 48 - useIndigoClient: true, 49 - indigoClient: indigoClient, 50 - httpClient: indigoClient.Client, // Keep for any fallback cases 63 + sessionProvider: sessionProvider, 64 + httpClient: &http.Client{}, 51 65 } 52 66 } 53 67 ··· 67 81 "record": record, 68 82 } 69 83 70 - // Use indigo API client (OAuth with DPoP) 71 - if c.useIndigoClient && c.indigoClient != nil { 84 + // Use session provider (locked OAuth with DPoP) - prevents nonce races 85 + if c.sessionProvider != nil { 72 86 var result Record 73 - err := c.indigoClient.Post(ctx, "com.atproto.repo.putRecord", payload, &result) 87 + err := c.sessionProvider.DoWithSession(ctx, c.did, func(session *indigo_oauth.ClientSession) error { 88 + apiClient := session.APIClient() 89 + return apiClient.Post(ctx, "com.atproto.repo.putRecord", payload, &result) 90 + }) 74 91 if err != nil { 75 92 return nil, fmt.Errorf("putRecord failed: %w", err) 76 93 } ··· 113 130 114 131 // GetRecord retrieves a record from the ATProto repository 115 132 func (c *Client) GetRecord(ctx context.Context, collection, rkey string) (*Record, error) { 116 - // Use indigo API client (OAuth with DPoP) 117 - if c.useIndigoClient && c.indigoClient != nil { 118 - params := map[string]any{ 119 - "repo": c.did, 120 - "collection": collection, 121 - "rkey": rkey, 122 - } 133 + params := map[string]any{ 134 + "repo": c.did, 135 + "collection": collection, 136 + "rkey": rkey, 137 + } 123 138 139 + // Use session provider (locked OAuth with DPoP) - prevents nonce races 140 + if c.sessionProvider != nil { 124 141 var result Record 125 - err := c.indigoClient.Get(ctx, "com.atproto.repo.getRecord", params, &result) 142 + err := c.sessionProvider.DoWithSession(ctx, c.did, func(session *indigo_oauth.ClientSession) error { 143 + apiClient := session.APIClient() 144 + return apiClient.Get(ctx, "com.atproto.repo.getRecord", params, &result) 145 + }) 126 146 if err != nil { 127 147 // Check for RecordNotFound error from indigo's APIError type 128 148 var apiErr *atclient.APIError ··· 187 207 "rkey": rkey, 188 208 } 189 209 190 - // Use indigo API client (OAuth with DPoP) 191 - if c.useIndigoClient && c.indigoClient != nil { 192 - var result map[string]any // deleteRecord returns empty object on success 193 - err := c.indigoClient.Post(ctx, "com.atproto.repo.deleteRecord", payload, &result) 210 + // Use session provider (locked OAuth with DPoP) - prevents nonce races 211 + if c.sessionProvider != nil { 212 + err := c.sessionProvider.DoWithSession(ctx, c.did, func(session *indigo_oauth.ClientSession) error { 213 + apiClient := session.APIClient() 214 + var result map[string]any // deleteRecord returns empty object on success 215 + return apiClient.Post(ctx, "com.atproto.repo.deleteRecord", payload, &result) 216 + }) 194 217 if err != nil { 195 218 return fmt.Errorf("deleteRecord failed: %w", err) 196 219 } ··· 279 302 280 303 // UploadBlob uploads binary data to the PDS and returns a blob reference 281 304 func (c *Client) UploadBlob(ctx context.Context, data []byte, mimeType string) (*ATProtoBlobRef, error) { 282 - // Use indigo API client (OAuth with DPoP) 283 - if c.useIndigoClient && c.indigoClient != nil { 305 + // Use session provider (locked OAuth with DPoP) - prevents nonce races 306 + if c.sessionProvider != nil { 284 307 var result struct { 285 308 Blob ATProtoBlobRef `json:"blob"` 286 309 } 287 310 288 - err := c.indigoClient.LexDo(ctx, 289 - "POST", 290 - mimeType, 291 - "com.atproto.repo.uploadBlob", 292 - nil, 293 - data, 294 - &result, 295 - ) 311 + err := c.sessionProvider.DoWithSession(ctx, c.did, func(session *indigo_oauth.ClientSession) error { 312 + apiClient := session.APIClient() 313 + return apiClient.LexDo(ctx, 314 + "POST", 315 + mimeType, 316 + "com.atproto.repo.uploadBlob", 317 + nil, 318 + data, 319 + &result, 320 + ) 321 + }) 296 322 if err != nil { 297 323 return nil, fmt.Errorf("uploadBlob failed: %w", err) 298 324 } ··· 510 536 // GetActorProfile fetches an actor's profile from their PDS 511 537 // The actor parameter can be a DID or handle 512 538 func (c *Client) GetActorProfile(ctx context.Context, actor string) (*ActorProfile, error) { 513 - // Use indigo API client (OAuth with DPoP) 514 - if c.useIndigoClient && c.indigoClient != nil { 515 - params := map[string]any{ 516 - "actor": actor, 517 - } 518 - 519 - var profile ActorProfile 520 - err := c.indigoClient.Get(ctx, "app.bsky.actor.getProfile", params, &profile) 521 - if err != nil { 522 - return nil, fmt.Errorf("getProfile failed: %w", err) 523 - } 524 - return &profile, nil 525 - } 526 - 527 - // Basic Auth (app passwords) 539 + // Basic Auth (app passwords) or unauthenticated 528 540 url := fmt.Sprintf("%s/xrpc/app.bsky.actor.getProfile?actor=%s", c.pdsEndpoint, actor) 529 541 530 542 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) ··· 563 575 // GetProfileRecord fetches the app.bsky.actor.profile record from PDS 564 576 // This returns the raw profile record with blob references (not CDN URLs) 565 577 func (c *Client) GetProfileRecord(ctx context.Context, did string) (*ProfileRecord, error) { 566 - // Use indigo API client (OAuth with DPoP) 567 - if c.useIndigoClient && c.indigoClient != nil { 568 - params := map[string]any{ 569 - "repo": did, 570 - "collection": "app.bsky.actor.profile", 571 - "rkey": "self", 572 - } 578 + params := map[string]any{ 579 + "repo": did, 580 + "collection": "app.bsky.actor.profile", 581 + "rkey": "self", 582 + } 573 583 584 + // Use session provider (locked OAuth with DPoP) - prevents nonce races 585 + if c.sessionProvider != nil { 574 586 var result struct { 575 587 Value ProfileRecord `json:"value"` 576 588 } 577 - 578 - err := c.indigoClient.Get(ctx, "com.atproto.repo.getRecord", params, &result) 589 + err := c.sessionProvider.DoWithSession(ctx, c.did, func(session *indigo_oauth.ClientSession) error { 590 + apiClient := session.APIClient() 591 + return apiClient.Get(ctx, "com.atproto.repo.getRecord", params, &result) 592 + }) 579 593 if err != nil { 580 594 return nil, fmt.Errorf("getRecord failed: %w", err) 581 595 }
+2 -17
pkg/atproto/client_test.go
··· 23 23 if client.accessToken != "token123" { 24 24 t.Errorf("accessToken = %v, want token123", client.accessToken) 25 25 } 26 - if client.useIndigoClient { 27 - t.Error("useIndigoClient should be false for Basic Auth client") 26 + if client.sessionProvider != nil { 27 + t.Error("sessionProvider should be nil for Basic Auth client") 28 28 } 29 29 } 30 30 ··· 1001 1001 if client.PDSEndpoint() != expectedEndpoint { 1002 1002 t.Errorf("PDSEndpoint() = %v, want %v", client.PDSEndpoint(), expectedEndpoint) 1003 1003 } 1004 - } 1005 - 1006 - // TestNewClientWithIndigoClient tests client initialization with Indigo client 1007 - func TestNewClientWithIndigoClient(t *testing.T) { 1008 - // Note: We can't easily create a real indigo client in tests without complex setup 1009 - // We pass nil for the indigo client, which is acceptable for testing the constructor 1010 - // The actual client.go code will handle nil indigo client by checking before use 1011 - 1012 - // Skip this test for now as it requires a real indigo client 1013 - // The function is tested indirectly through integration tests 1014 - t.Skip("Skipping TestNewClientWithIndigoClient - requires real indigo client setup") 1015 - 1016 - // When properly set up with a real indigo client, the test would look like: 1017 - // client := NewClientWithIndigoClient("https://pds.example.com", "did:plc:test123", indigoClient) 1018 - // if !client.useIndigoClient { t.Error("useIndigoClient should be true") } 1019 1004 } 1020 1005 1021 1006 // TestListRecordsError tests error handling in ListRecords
+41 -19
pkg/auth/oauth/client.go
··· 169 169 r.uiSessionStore = store 170 170 } 171 171 172 - // GetSession gets a fresh OAuth session for a DID 173 - // Loads session from database on every request (database is source of truth) 174 - // Uses per-DID locking to prevent concurrent requests from racing on DPoP nonce updates 172 + // DoWithSession executes a function with a locked OAuth session. 173 + // The lock is held for the entire duration of the function, preventing DPoP nonce races. 174 + // 175 + // This is the preferred way to make PDS requests that require OAuth/DPoP authentication. 176 + // The lock is held through the entire PDS interaction, ensuring that: 177 + // 1. Only one goroutine at a time can negotiate DPoP nonces with the PDS for a given DID 178 + // 2. The session's PersistSessionCallback saves the updated nonce before other goroutines load 179 + // 3. Concurrent layer uploads don't race on stale nonces 175 180 // 176 181 // Why locking is critical: 177 182 // During docker push, multiple layers upload concurrently. Each layer creates a new 178 183 // ClientSession by loading from database. Without locking, this race condition occurs: 179 - // 1. Layer A loads session with stale DPoP nonce from DB 180 - // 2. Layer B loads session with same stale nonce (A hasn't updated DB yet) 181 - // 3. Layer A makes request → 401 "use_dpop_nonce" → gets fresh nonce → saves to DB 182 - // 4. Layer B makes request → 401 "use_dpop_nonce" (using stale nonce from step 2) 183 - // 5. DPoP nonce thrashing continues, eventually causing 500 errors 184 + // 1. Layer A loads session with stale DPoP nonce from DB 185 + // 2. Layer B loads session with same stale nonce (A hasn't updated DB yet) 186 + // 3. Layer A makes request → 401 "use_dpop_nonce" → gets fresh nonce → saves to DB 187 + // 4. Layer B makes request → 401 "use_dpop_nonce" (using stale nonce from step 2) 188 + // 5. DPoP nonce thrashing continues, eventually causing 500 errors 184 189 // 185 190 // With per-DID locking: 186 - // 1. Layer A acquires lock, loads session, handles nonce negotiation, saves, releases lock 187 - // 2. Layer B acquires lock AFTER A releases, loads fresh nonce from DB, succeeds 188 - func (r *Refresher) GetSession(ctx context.Context, did string) (*oauth.ClientSession, error) { 189 - // Get or create a mutex for this DID to prevent concurrent session loads 190 - // This prevents DPoP nonce race conditions when multiple layers upload simultaneously 191 + // 1. Layer A acquires lock, loads session, handles nonce negotiation, saves, releases lock 192 + // 2. Layer B acquires lock AFTER A releases, loads fresh nonce from DB, succeeds 193 + // 194 + // Example usage: 195 + // 196 + // var result MyResult 197 + // err := refresher.DoWithSession(ctx, did, func(session *oauth.ClientSession) error { 198 + // resp, err := session.DoWithAuth(session.Client, req, "com.atproto.server.getServiceAuth") 199 + // if err != nil { 200 + // return err 201 + // } 202 + // // Parse response into result... 203 + // return nil 204 + // }) 205 + func (r *Refresher) DoWithSession(ctx context.Context, did string, fn func(session *oauth.ClientSession) error) error { 206 + // Get or create a mutex for this DID 191 207 mutexInterface, _ := r.didLocks.LoadOrStore(did, &sync.Mutex{}) 192 208 mutex := mutexInterface.(*sync.Mutex) 193 209 194 - // Serialize session loading per DID 210 + // Hold the lock for the ENTIRE operation (load + PDS request + nonce save) 195 211 mutex.Lock() 196 212 defer mutex.Unlock() 197 213 198 - slog.Debug("Acquired session lock for DID", 214 + slog.Debug("Acquired session lock for DoWithSession", 199 215 "component", "oauth/refresher", 200 216 "did", did) 201 217 218 + // Load session while holding lock 202 219 session, err := r.resumeSession(ctx, did) 203 220 if err != nil { 204 - return nil, err 221 + return err 205 222 } 206 223 207 - slog.Debug("Released session lock for DID", 224 + // Execute the function (PDS request) while still holding lock 225 + // The session's PersistSessionCallback will save nonce updates to DB 226 + err = fn(session) 227 + 228 + slog.Debug("Released session lock for DoWithSession", 208 229 "component", "oauth/refresher", 209 - "did", did) 230 + "did", did, 231 + "success", err == nil) 210 232 211 - return session, nil 233 + return err 212 234 } 213 235 214 236 // resumeSession loads a session from storage
+113 -90
pkg/auth/token/servicetoken.go
··· 15 15 "atcr.io/pkg/auth" 16 16 "atcr.io/pkg/auth/oauth" 17 17 "github.com/bluesky-social/indigo/atproto/atclient" 18 + indigo_oauth "github.com/bluesky-social/indigo/atproto/auth/oauth" 18 19 ) 19 20 20 21 // getErrorHint provides context-specific troubleshooting hints based on API error type ··· 47 48 // GetOrFetchServiceToken gets a service token for hold authentication. 48 49 // Checks cache first, then fetches from PDS with OAuth/DPoP if needed. 49 50 // This is the canonical implementation used by both middleware and crew registration. 51 + // 52 + // IMPORTANT: Uses DoWithSession() to hold a per-DID lock through the entire PDS interaction. 53 + // This prevents DPoP nonce race conditions when multiple Docker layers upload concurrently. 50 54 func GetOrFetchServiceToken( 51 55 ctx context.Context, 52 56 refresher *oauth.Refresher, ··· 74 78 slog.Debug("Service token expiring soon, proactively renewing", "did", did) 75 79 } 76 80 77 - session, err := refresher.GetSession(ctx, did) 78 - if err != nil { 79 - // OAuth session unavailable - fail 80 - InvalidateServiceToken(did, holdDID) 81 + // Use DoWithSession to hold the lock through the entire PDS interaction. 82 + // This prevents DPoP nonce races when multiple goroutines try to fetch service tokens. 83 + var serviceToken string 84 + var fetchErr error 81 85 82 - // Try to extract detailed error information 83 - var apiErr *atclient.APIError 84 - if errors.As(err, &apiErr) { 85 - slog.Error("Failed to get OAuth session for service token", 86 - "component", "token/servicetoken", 86 + err := refresher.DoWithSession(ctx, did, func(session *indigo_oauth.ClientSession) error { 87 + // Double-check cache after acquiring lock - another goroutine may have 88 + // populated it while we were waiting (classic double-checked locking pattern) 89 + cachedToken, expiresAt := GetServiceToken(did, holdDID) 90 + if cachedToken != "" && time.Until(expiresAt) > 10*time.Second { 91 + slog.Debug("Service token cache hit after lock acquisition", 87 92 "did", did, 88 - "holdDID", holdDID, 89 - "pdsEndpoint", pdsEndpoint, 90 - "error", err, 91 - "httpStatus", apiErr.StatusCode, 92 - "errorName", apiErr.Name, 93 - "errorMessage", apiErr.Message, 94 - "hint", getErrorHint(apiErr)) 95 - } else { 96 - slog.Error("Failed to get OAuth session for service token", 93 + "expiresIn", time.Until(expiresAt).Round(time.Second)) 94 + serviceToken = cachedToken 95 + return nil 96 + } 97 + 98 + // Cache still empty/expired - proceed with PDS call 99 + // Request 5-minute expiry (PDS may grant less) 100 + // exp must be absolute Unix timestamp, not relative duration 101 + // Note: OAuth scope includes #atcr_hold fragment, but service auth aud must be bare DID 102 + expiryTime := time.Now().Unix() + 300 // 5 minutes from now 103 + serviceAuthURL := fmt.Sprintf("%s%s?aud=%s&lxm=%s&exp=%d", 104 + pdsEndpoint, 105 + atproto.ServerGetServiceAuth, 106 + url.QueryEscape(holdDID), 107 + url.QueryEscape("com.atproto.repo.getRecord"), 108 + expiryTime, 109 + ) 110 + 111 + req, err := http.NewRequestWithContext(ctx, "GET", serviceAuthURL, nil) 112 + if err != nil { 113 + fetchErr = fmt.Errorf("failed to create service auth request: %w", err) 114 + return fetchErr 115 + } 116 + 117 + // Use OAuth session to authenticate to PDS (with DPoP) 118 + // The lock is held, so DPoP nonce negotiation is serialized per-DID 119 + resp, err := session.DoWithAuth(session.Client, req, "com.atproto.server.getServiceAuth") 120 + if err != nil { 121 + // Auth error - may indicate expired tokens or corrupted session 122 + InvalidateServiceToken(did, holdDID) 123 + 124 + // Inspect the error to extract detailed information from indigo's APIError 125 + var apiErr *atclient.APIError 126 + if errors.As(err, &apiErr) { 127 + // Log detailed API error information 128 + slog.Error("OAuth authentication failed during service token request", 129 + "component", "token/servicetoken", 130 + "did", did, 131 + "holdDID", holdDID, 132 + "pdsEndpoint", pdsEndpoint, 133 + "url", serviceAuthURL, 134 + "error", err, 135 + "httpStatus", apiErr.StatusCode, 136 + "errorName", apiErr.Name, 137 + "errorMessage", apiErr.Message, 138 + "hint", getErrorHint(apiErr)) 139 + } else { 140 + // Fallback for non-API errors (network errors, etc.) 141 + slog.Error("OAuth authentication failed during service token request", 142 + "component", "token/servicetoken", 143 + "did", did, 144 + "holdDID", holdDID, 145 + "pdsEndpoint", pdsEndpoint, 146 + "url", serviceAuthURL, 147 + "error", err, 148 + "errorType", fmt.Sprintf("%T", err), 149 + "hint", "Network error or unexpected failure during OAuth request") 150 + } 151 + 152 + fetchErr = fmt.Errorf("OAuth validation failed: %w", err) 153 + return fetchErr 154 + } 155 + defer resp.Body.Close() 156 + 157 + if resp.StatusCode != http.StatusOK { 158 + // Service auth failed 159 + bodyBytes, _ := io.ReadAll(resp.Body) 160 + InvalidateServiceToken(did, holdDID) 161 + slog.Error("Service token request returned non-200 status", 97 162 "component", "token/servicetoken", 98 163 "did", did, 99 164 "holdDID", holdDID, 100 165 "pdsEndpoint", pdsEndpoint, 101 - "error", err, 102 - "errorType", fmt.Sprintf("%T", err), 103 - "hint", "OAuth session not found in database or token refresh failed") 166 + "statusCode", resp.StatusCode, 167 + "responseBody", string(bodyBytes), 168 + "hint", "PDS rejected the service token request - check PDS logs for details") 169 + fetchErr = fmt.Errorf("service auth failed with status %d: %s", resp.StatusCode, string(bodyBytes)) 170 + return fetchErr 104 171 } 105 172 106 - // Delete the stale OAuth session to force re-authentication 107 - // This also invalidates the UI session automatically 108 - if delErr := refresher.DeleteSession(ctx, did); delErr != nil { 109 - slog.Warn("Failed to delete stale OAuth session", 110 - "component", "token/servicetoken", 111 - "did", did, 112 - "error", delErr) 173 + // Parse response to get service token 174 + var result struct { 175 + Token string `json:"token"` 176 + } 177 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 178 + fetchErr = fmt.Errorf("failed to decode service auth response: %w", err) 179 + return fetchErr 113 180 } 114 181 115 - return "", fmt.Errorf("failed to get OAuth session: %w", err) 116 - } 182 + if result.Token == "" { 183 + fetchErr = fmt.Errorf("empty token in service auth response") 184 + return fetchErr 185 + } 117 186 118 - // Call com.atproto.server.getServiceAuth on the user's PDS 119 - // Request 5-minute expiry (PDS may grant less) 120 - // exp must be absolute Unix timestamp, not relative duration 121 - // Note: OAuth scope includes #atcr_hold fragment, but service auth aud must be bare DID 122 - expiryTime := time.Now().Unix() + 300 // 5 minutes from now 123 - serviceAuthURL := fmt.Sprintf("%s%s?aud=%s&lxm=%s&exp=%d", 124 - pdsEndpoint, 125 - atproto.ServerGetServiceAuth, 126 - url.QueryEscape(holdDID), 127 - url.QueryEscape("com.atproto.repo.getRecord"), 128 - expiryTime, 129 - ) 187 + serviceToken = result.Token 188 + return nil 189 + }) 130 190 131 - req, err := http.NewRequestWithContext(ctx, "GET", serviceAuthURL, nil) 132 191 if err != nil { 133 - return "", fmt.Errorf("failed to create service auth request: %w", err) 134 - } 135 - 136 - // Use OAuth session to authenticate to PDS (with DPoP) 137 - resp, err := session.DoWithAuth(session.Client, req, "com.atproto.server.getServiceAuth") 138 - if err != nil { 139 - // Auth error - may indicate expired tokens or corrupted session 192 + // DoWithSession failed (session load or callback error) 140 193 InvalidateServiceToken(did, holdDID) 141 194 142 - // Inspect the error to extract detailed information from indigo's APIError 195 + // Try to extract detailed error information 143 196 var apiErr *atclient.APIError 144 197 if errors.As(err, &apiErr) { 145 - // Log detailed API error information 146 - slog.Error("OAuth authentication failed during service token request", 198 + slog.Error("Failed to get OAuth session for service token", 147 199 "component", "token/servicetoken", 148 200 "did", did, 149 201 "holdDID", holdDID, 150 202 "pdsEndpoint", pdsEndpoint, 151 - "url", serviceAuthURL, 152 203 "error", err, 153 204 "httpStatus", apiErr.StatusCode, 154 205 "errorName", apiErr.Name, 155 206 "errorMessage", apiErr.Message, 156 207 "hint", getErrorHint(apiErr)) 157 - } else { 158 - // Fallback for non-API errors (network errors, etc.) 159 - slog.Error("OAuth authentication failed during service token request", 208 + } else if fetchErr == nil { 209 + // Session load failed (not a fetch error) 210 + slog.Error("Failed to get OAuth session for service token", 160 211 "component", "token/servicetoken", 161 212 "did", did, 162 213 "holdDID", holdDID, 163 214 "pdsEndpoint", pdsEndpoint, 164 - "url", serviceAuthURL, 165 215 "error", err, 166 216 "errorType", fmt.Sprintf("%T", err), 167 - "hint", "Network error or unexpected failure during OAuth request") 217 + "hint", "OAuth session not found in database or token refresh failed") 168 218 } 169 219 170 220 // Delete the stale OAuth session to force re-authentication ··· 176 226 "error", delErr) 177 227 } 178 228 179 - return "", fmt.Errorf("OAuth validation failed: %w", err) 180 - } 181 - defer resp.Body.Close() 182 - 183 - if resp.StatusCode != http.StatusOK { 184 - // Service auth failed 185 - bodyBytes, _ := io.ReadAll(resp.Body) 186 - InvalidateServiceToken(did, holdDID) 187 - slog.Error("Service token request returned non-200 status", 188 - "component", "token/servicetoken", 189 - "did", did, 190 - "holdDID", holdDID, 191 - "pdsEndpoint", pdsEndpoint, 192 - "statusCode", resp.StatusCode, 193 - "responseBody", string(bodyBytes), 194 - "hint", "PDS rejected the service token request - check PDS logs for details") 195 - return "", fmt.Errorf("service auth failed with status %d: %s", resp.StatusCode, string(bodyBytes)) 229 + if fetchErr != nil { 230 + return "", fetchErr 231 + } 232 + return "", fmt.Errorf("failed to get OAuth session: %w", err) 196 233 } 197 - 198 - // Parse response to get service token 199 - var result struct { 200 - Token string `json:"token"` 201 - } 202 - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 203 - return "", fmt.Errorf("failed to decode service auth response: %w", err) 204 - } 205 - 206 - if result.Token == "" { 207 - return "", fmt.Errorf("empty token in service auth response") 208 - } 209 - 210 - serviceToken := result.Token 211 234 212 235 // Cache the token (parses JWT to extract actual expiry) 213 236 if err := SetServiceToken(did, holdDID, serviceToken); err != nil {