A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
80
fork

Configure Feed

Select the types of activity you want to include in your feed.

oauth working

+436 -126
+1 -2
cmd/credential-helper/main.go
··· 85 85 // will validate the session token and issue a registry JWT 86 86 creds := Credentials{ 87 87 ServerURL: serverURL, 88 - Username: "oauth2", // Signals token-based auth to Docker 88 + Username: "oauth2", // Signals token-based auth to Docker 89 89 Secret: session.SessionToken, // Return session token directly 90 90 } 91 91 ··· 280 280 } 281 281 return result.AccessToken, nil 282 282 } 283 -
+73 -23
cmd/hold/main.go
··· 142 142 143 143 // For now, construct direct URL to blob 144 144 // In production, this would use driver-specific presigned URLs 145 - url, err := s.getDownloadURL(ctx, req.Digest) 145 + url, err := s.getDownloadURL(ctx, req.Digest, req.DID) 146 146 if err != nil { 147 147 http.Error(w, fmt.Sprintf("failed to generate URL: %v", err), http.StatusInternalServerError) 148 148 return ··· 258 258 w.Write(content) 259 259 } 260 260 261 + // HandleMove moves a blob from one path to another 262 + // POST /move?from={path}&to={digest}&did={did} 263 + func (s *HoldService) HandleMove(w http.ResponseWriter, r *http.Request) { 264 + if r.Method != http.MethodPost { 265 + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 266 + return 267 + } 268 + 269 + fromPath := r.URL.Query().Get("from") 270 + toDigest := r.URL.Query().Get("to") 271 + did := r.URL.Query().Get("did") 272 + 273 + if fromPath == "" || toDigest == "" { 274 + http.Error(w, "missing from or to parameter", http.StatusBadRequest) 275 + return 276 + } 277 + 278 + // Authorize WRITE access 279 + if !s.isAuthorizedWrite(did) { 280 + if did == "" { 281 + http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 282 + } else { 283 + http.Error(w, "forbidden: write access denied", http.StatusForbidden) 284 + } 285 + return 286 + } 287 + 288 + ctx := r.Context() 289 + sourcePath := blobPath(fromPath) 290 + destPath := blobPath(toDigest) 291 + 292 + // Try to move using driver's Move operation 293 + if err := s.driver.Move(ctx, sourcePath, destPath); err != nil { 294 + log.Printf("HandleMove: failed to move blob: %v", err) 295 + http.Error(w, fmt.Sprintf("failed to move blob: %v", err), http.StatusInternalServerError) 296 + return 297 + } 298 + 299 + log.Printf("HandleMove: successfully moved blob from=%s to=%s", fromPath, toDigest) 300 + w.WriteHeader(http.StatusOK) 301 + } 302 + 261 303 // HandleProxyPut proxies a blob upload through the service 262 304 func (s *HoldService) HandleProxyPut(w http.ResponseWriter, r *http.Request) { 263 - log.Printf("HandleProxyPut: method=%s, path=%s, query=%s", r.Method, r.URL.Path, r.URL.RawQuery) 264 - 265 305 if r.Method != http.MethodPut { 266 306 http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 267 307 return ··· 277 317 if did == "" { 278 318 did = r.Header.Get("X-ATCR-DID") 279 319 } 280 - 281 - log.Printf("HandleProxyPut: digest=%s, did=%s", digest, did) 282 320 283 321 // Authorize WRITE access 284 - authorized := s.isAuthorizedWrite(did) 285 - log.Printf("HandleProxyPut: authorization check: did=%s, authorized=%v", did, authorized) 286 - if !authorized { 322 + if !s.isAuthorizedWrite(did) { 287 323 if did == "" { 288 - log.Printf("HandleProxyPut: rejecting - no DID provided") 289 324 http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 290 325 } else { 291 - log.Printf("HandleProxyPut: rejecting - DID not authorized for write") 292 326 http.Error(w, "forbidden: write access denied", http.StatusForbidden) 293 327 } 294 328 return 295 329 } 296 330 297 - // Write blob to storage 331 + // Stream blob to storage (no buffering) 298 332 ctx := r.Context() 299 333 path := blobPath(digest) 300 334 301 - content, err := io.ReadAll(r.Body) 335 + // Create writer for streaming 336 + writer, err := s.driver.Writer(ctx, path, false) 302 337 if err != nil { 303 - log.Printf("HandleProxyPut: failed to read body: %v", err) 304 - http.Error(w, "failed to read body", http.StatusBadRequest) 338 + log.Printf("HandleProxyPut: failed to create writer: %v", err) 339 + http.Error(w, "failed to create writer", http.StatusInternalServerError) 305 340 return 306 341 } 307 342 308 - log.Printf("HandleProxyPut: writing blob to path=%s, size=%d bytes", path, len(content)) 309 - if err := s.driver.PutContent(ctx, path, content); err != nil { 310 - log.Printf("HandleProxyPut: failed to store blob: %v", err) 311 - http.Error(w, "failed to store blob", http.StatusInternalServerError) 343 + // Stream directly from request body to storage 344 + written, err := io.Copy(writer, r.Body) 345 + if err != nil { 346 + writer.Cancel(ctx) 347 + log.Printf("HandleProxyPut: failed to write blob: %v", err) 348 + http.Error(w, "failed to write blob", http.StatusInternalServerError) 312 349 return 313 350 } 314 351 315 - log.Printf("HandleProxyPut: successfully stored blob digest=%s, size=%d", digest, len(content)) 352 + // Commit the write 353 + if err := writer.Commit(ctx); err != nil { 354 + log.Printf("HandleProxyPut: failed to commit blob: %v", err) 355 + http.Error(w, "failed to commit blob", http.StatusInternalServerError) 356 + return 357 + } 358 + 359 + log.Printf("HandleProxyPut: successfully stored blob path=%s, size=%d", digest, written) 316 360 w.WriteHeader(http.StatusCreated) 317 361 } 318 362 ··· 426 470 } 427 471 428 472 // getDownloadURL generates a download URL for a blob 429 - func (s *HoldService) getDownloadURL(ctx context.Context, digest string) (string, error) { 473 + func (s *HoldService) getDownloadURL(ctx context.Context, digest string, did string) (string, error) { 430 474 // Check if blob exists 431 475 path := blobPath(digest) 432 476 _, err := s.driver.Stat(ctx, path) ··· 435 479 } 436 480 437 481 // For drivers that support presigned URLs (S3), use those 438 - // For now, return a proxy URL through this service 439 - return fmt.Sprintf("%s/blobs/%s", s.config.Server.PublicURL, digest), nil 482 + // For now, return a proxy URL through this service with DID for authorization 483 + return fmt.Sprintf("%s/blobs/%s?did=%s", s.config.Server.PublicURL, digest, did), nil 440 484 } 441 485 442 486 // getUploadURL generates an upload URL for a blob ··· 563 607 mux.HandleFunc("/register", service.HandleRegister) 564 608 mux.HandleFunc("/get-presigned-url", service.HandleGetPresignedURL) 565 609 mux.HandleFunc("/put-presigned-url", service.HandlePutPresignedURL) 610 + mux.HandleFunc("/move", service.HandleMove) 566 611 567 612 // OAuth client metadata endpoint for ATProto OAuth 568 613 clientID := cfg.Server.PublicURL + "/client-metadata.json" ··· 697 742 return defaultValue 698 743 } 699 744 700 - // blobPath converts a digest (e.g., "sha256:abc123...") to a storage path 745 + // blobPath converts a digest (e.g., "sha256:abc123...") or temp path to a storage path 701 746 // Distribution stores blobs as: /docker/registry/v2/blobs/{algorithm}/{xx}/{hash}/data 702 747 // where xx is the first 2 characters of the hash for directory sharding 703 748 // NOTE: Path must start with / for filesystem driver 704 749 func blobPath(digest string) string { 750 + // Handle temp paths (start with uploads/temp-) 751 + if strings.HasPrefix(digest, "uploads/temp-") { 752 + return fmt.Sprintf("/docker/registry/v2/%s/data", digest) 753 + } 754 + 705 755 // Split digest into algorithm and hash 706 756 parts := strings.SplitN(digest, ":", 2) 707 757 if len(parts) != 2 {
+3 -1
cmd/registry/serve.go
··· 112 112 clientIDConfig := oauth.ClientIDConfig{ 113 113 BaseURL: baseURL, 114 114 CallbackPath: "/auth/oauth/callback", 115 - Scopes: []string{"atproto"}, 115 + Scopes: oauth.GetDefaultScopes(), 116 116 } 117 117 clientID, redirectURI := clientIDConfig.MakeClientID() 118 118 ··· 133 133 134 134 // 8. Create OAuth server 135 135 oauthServer := oauth.NewServer(refreshStorage, sessionManager, baseURL) 136 + // Connect server to refresher for cache invalidation 137 + oauthServer.SetRefresher(refresher) 136 138 137 139 // 9. Initialize auth keys and create token issuer 138 140 var issuer *token.Issuer
+1
config/config.yml
··· 38 38 realm: http://127.0.0.1:5000/auth/token 39 39 service: atcr.io 40 40 issuer: atcr.io 41 + expiration: 1800 # 30 minutes (in seconds) 41 42 42 43 # Certificate bundle for validating JWTs 43 44 rootcertbundle: /var/lib/atcr/auth/private-key.crt
+2 -2
docker-compose.yml
··· 1 1 services: 2 - registry: 2 + atcr-registry: 3 3 build: 4 4 context: . 5 5 dockerfile: Dockerfile ··· 24 24 # - OAuth tokens -> Persistent volume (atcr-tokens) 25 25 # Future: Add read_only: true for production deployments 26 26 27 - hold: 27 + atcr-hold: 28 28 environment: 29 29 HOLD_PUBLIC_URL: http://172.28.0.3:8080 30 30 HOLD_OWNER: did:plc:pddp4xt5lgnv2qsegbzzs4xg
+9 -1
pkg/atproto/client.go
··· 231 231 return nil, err 232 232 } 233 233 234 - req.Header.Set("Authorization", c.authHeader()) 234 + // Only set Authorization header if we have an access token 235 + if c.accessToken != "" { 236 + authHeader := c.authHeader() 237 + fmt.Printf("DEBUG [atproto/client]: UploadBlob Authorization header: %q (useDPoP=%v, token_length=%d)\n", authHeader, c.useDPoP, len(c.accessToken)) 238 + req.Header.Set("Authorization", authHeader) 239 + } else { 240 + fmt.Printf("DEBUG [atproto/client]: UploadBlob: No access token available, sending unauthenticated request\n") 241 + return nil, fmt.Errorf("no access token available for authenticated PDS operation - please complete OAuth flow at: http://127.0.0.1:5000/auth/oauth/authorize?handle=<your-handle>") 242 + } 235 243 req.Header.Set("Content-Type", mimeType) 236 244 237 245 resp, err := c.httpClient.Do(req)
+4 -4
pkg/auth/oauth/client.go
··· 10 10 "fmt" 11 11 "net/http" 12 12 13 - atprotoclient "atcr.io/pkg/atproto" 13 + "atcr.io/pkg/atproto" 14 14 "authelia.com/client/oauth2" 15 15 ) 16 16 ··· 19 19 config *oauth2.Config 20 20 dpopKey *ecdsa.PrivateKey 21 21 dpopTransport *DPoPTransport 22 - resolver *atprotoclient.Resolver 22 + resolver *atproto.Resolver 23 23 clientID string 24 24 redirectURI string 25 25 metadata *AuthServerMetadata ··· 36 36 return &Client{ 37 37 dpopKey: dpopKey, 38 38 dpopTransport: NewDPoPTransport(http.DefaultTransport, dpopKey), 39 - resolver: atprotoclient.NewResolver(), 39 + resolver: atproto.NewResolver(), 40 40 clientID: clientID, 41 41 redirectURI: redirectURI, 42 42 }, nil ··· 70 70 PushedAuthURL: metadata.PushedAuthorizationRequestEndpoint, 71 71 }, 72 72 RedirectURL: c.redirectURI, 73 - Scopes: []string{"atproto"}, 73 + Scopes: GetDefaultScopes(), 74 74 } 75 75 76 76 return nil
+2 -2
pkg/auth/oauth/client_id.go
··· 27 27 28 28 // ClientIDConfig helps construct appropriate client IDs for different environments 29 29 type ClientIDConfig struct { 30 - BaseURL string // Base URL (e.g., "http://127.0.0.1:8888" or "https://example.com") 30 + BaseURL string // Base URL (e.g., "http://127.0.0.1:8888" or "https://example.com") 31 31 CallbackPath string // Callback path (e.g., "/oauth/callback") 32 - Scopes []string // OAuth scopes 32 + Scopes []string // OAuth scopes 33 33 } 34 34 35 35 // MakeClientID creates the appropriate client ID based on the environment
+3 -2
pkg/auth/oauth/flow.go
··· 24 24 25 25 // RunInteractiveFlow executes an interactive OAuth authorization code flow 26 26 // The setupCallback function is called TWICE: 27 - // 1. First with authURL="" to start the server (before PAR) 28 - // 2. Then with the actual authURL to display it to the user (after PAR) 27 + // 1. First with authURL="" to start the server (before PAR) 28 + // 2. Then with the actual authURL to display it to the user (after PAR) 29 + // 29 30 // This two-phase approach ensures the server is running before PAR tries to fetch client metadata 30 31 func RunInteractiveFlow(ctx context.Context, cfg InteractiveFlowConfig, 31 32 setupCallback func(authURL string, handler *CallbackHandler, metadata *ClientMetadata) error) (*FlowResult, error) {
+59 -17
pkg/auth/oauth/refresher.go
··· 15 15 type AccessTokenEntry struct { 16 16 Token string 17 17 DPoPKey *ecdsa.PrivateKey 18 + Transport *DPoPTransport // Cache the transport to preserve nonce across requests 18 19 ExpiresAt time.Time 19 20 } 20 21 21 22 // Refresher manages OAuth token refresh for AppView 22 23 type Refresher struct { 23 - storage *RefreshTokenStorage 24 - accessTokens map[string]*AccessTokenEntry 25 - mu sync.RWMutex 26 - clientID string 27 - redirectURI string 24 + storage *RefreshTokenStorage 25 + accessTokens map[string]*AccessTokenEntry 26 + mu sync.RWMutex 27 + refreshLocks map[string]*sync.Mutex // Per-DID locks for refresh operations 28 + refreshLockMu sync.Mutex // Protects refreshLocks map 29 + clientID string 30 + redirectURI string 28 31 } 29 32 30 33 // NewRefresher creates a new token refresher ··· 32 35 return &Refresher{ 33 36 storage: storage, 34 37 accessTokens: make(map[string]*AccessTokenEntry), 38 + refreshLocks: make(map[string]*sync.Mutex), 35 39 clientID: clientID, 36 40 redirectURI: redirectURI, 37 41 } ··· 39 43 40 44 // GetAccessToken gets a fresh access token for a DID 41 45 // Returns cached token if still valid, otherwise refreshes 42 - func (r *Refresher) GetAccessToken(ctx context.Context, did string) (string, *ecdsa.PrivateKey, error) { 43 - // Check cache first 46 + // Returns: accessToken, dpopKey, dpopTransport, error 47 + func (r *Refresher) GetAccessToken(ctx context.Context, did string) (string, *ecdsa.PrivateKey, *DPoPTransport, error) { 48 + // Check cache first (fast path) 44 49 r.mu.RLock() 45 50 entry, ok := r.accessTokens[did] 46 51 r.mu.RUnlock() 47 52 48 53 if ok && time.Now().Before(entry.ExpiresAt) { 49 54 // Token still valid 50 - return entry.Token, entry.DPoPKey, nil 55 + return entry.Token, entry.DPoPKey, entry.Transport, nil 56 + } 57 + 58 + // Token expired or not cached, need to refresh 59 + // Get or create per-DID lock to prevent concurrent refreshes 60 + r.refreshLockMu.Lock() 61 + didLock, ok := r.refreshLocks[did] 62 + if !ok { 63 + didLock = &sync.Mutex{} 64 + r.refreshLocks[did] = didLock 51 65 } 66 + r.refreshLockMu.Unlock() 52 67 53 - // Token expired or not cached, refresh it 68 + // Acquire DID-specific lock 69 + didLock.Lock() 70 + defer didLock.Unlock() 71 + 72 + // Double-check cache after acquiring lock (another goroutine might have refreshed) 73 + r.mu.RLock() 74 + entry, ok = r.accessTokens[did] 75 + r.mu.RUnlock() 76 + 77 + if ok && time.Now().Before(entry.ExpiresAt) { 78 + // Token was refreshed while we waited for the lock 79 + return entry.Token, entry.DPoPKey, entry.Transport, nil 80 + } 81 + 82 + // Actually refresh the token 54 83 return r.RefreshToken(ctx, did) 55 84 } 56 85 57 86 // RefreshToken forces a token refresh for a DID 58 - func (r *Refresher) RefreshToken(ctx context.Context, did string) (string, *ecdsa.PrivateKey, error) { 87 + // Returns: accessToken, dpopKey, dpopTransport, error 88 + func (r *Refresher) RefreshToken(ctx context.Context, did string) (string, *ecdsa.PrivateKey, *DPoPTransport, error) { 59 89 // Get stored refresh token 60 90 entry, err := r.storage.Get(did) 61 91 if err != nil { 62 - return "", nil, fmt.Errorf("failed to get stored refresh token: %w", err) 92 + return "", nil, nil, fmt.Errorf("failed to get stored refresh token: %w", err) 63 93 } 64 94 65 95 // Parse DPoP key 66 96 dpopKey, err := r.storage.GetDPoPKey(did) 67 97 if err != nil { 68 - return "", nil, fmt.Errorf("failed to get DPoP key: %w", err) 98 + return "", nil, nil, fmt.Errorf("failed to get DPoP key: %w", err) 69 99 } 70 100 71 101 // Create OAuth client with DPoP transport ··· 75 105 // Discover PDS OAuth metadata 76 106 metadata, err := DiscoverAuthServer(ctx, entry.PDS) 77 107 if err != nil { 78 - return "", nil, fmt.Errorf("failed to discover auth server: %w", err) 108 + return "", nil, nil, fmt.Errorf("failed to discover auth server: %w", err) 79 109 } 80 110 81 111 // Configure OAuth2 client ··· 87 117 PushedAuthURL: metadata.PushedAuthorizationRequestEndpoint, 88 118 }, 89 119 RedirectURL: r.redirectURI, 90 - Scopes: []string{"atproto"}, 120 + Scopes: GetDefaultScopes(), 91 121 } 92 122 93 123 // Create context with custom HTTP client ··· 98 128 RefreshToken: entry.RefreshToken, 99 129 }).Token() 100 130 if err != nil { 101 - return "", nil, fmt.Errorf("failed to refresh token: %w", err) 131 + return "", nil, nil, fmt.Errorf("failed to refresh token: %w", err) 102 132 } 103 133 104 134 // Update last refresh timestamp ··· 116 146 } 117 147 } 118 148 119 - // Cache the access token 149 + // Set access token on transport for "ath" claim in future DPoP proofs 150 + dpopTransport.SetAccessToken(token.AccessToken) 151 + 152 + // Cache the access token and transport 120 153 // Expire 1 minute early to avoid edge cases 121 154 expiresAt := token.Expiry.Add(-1 * time.Minute) 122 155 ··· 124 157 r.accessTokens[did] = &AccessTokenEntry{ 125 158 Token: token.AccessToken, 126 159 DPoPKey: dpopKey, 160 + Transport: dpopTransport, // Cache transport to preserve nonce 127 161 ExpiresAt: expiresAt, 128 162 } 129 163 r.mu.Unlock() 130 164 131 - return token.AccessToken, dpopKey, nil 165 + return token.AccessToken, dpopKey, dpopTransport, nil 166 + } 167 + 168 + // InvalidateAccessToken removes a cached access token for a DID 169 + // This is useful when a new refresh token is obtained (e.g., after re-authorization) 170 + func (r *Refresher) InvalidateAccessToken(did string) { 171 + r.mu.Lock() 172 + delete(r.accessTokens, did) 173 + r.mu.Unlock() 132 174 } 133 175 134 176 // RevokeToken removes stored refresh token and cached access token
+22
pkg/auth/oauth/scopes.go
··· 1 + package oauth 2 + 3 + import ( 4 + "fmt" 5 + 6 + "atcr.io/pkg/atproto" 7 + ) 8 + 9 + // GetDefaultScopes returns the default OAuth scopes for ATCR registry operations 10 + func GetDefaultScopes() []string { 11 + return []string{ 12 + "atproto", 13 + "transition:generic.full", 14 + "blob:application/vnd.docker.distribution.manifest.v2+json", 15 + "blob:application/vnd.docker.image.rootfs.diff.tar.gzip", 16 + "blob:application/vnd.docker.container.image.v1+json", 17 + fmt.Sprintf("repo:%s?action=create", atproto.ManifestCollection), 18 + fmt.Sprintf("repo:%s?action=update", atproto.ManifestCollection), 19 + fmt.Sprintf("repo:%s?action=create", atproto.TagCollection), 20 + fmt.Sprintf("repo:%s?action=update", atproto.TagCollection), 21 + } 22 + }
+30 -17
pkg/auth/oauth/server.go
··· 17 17 18 18 // Server handles OAuth authorization for the AppView 19 19 type Server struct { 20 - storage *RefreshTokenStorage 21 - sessionManager *session.Manager 22 - resolver *atproto.Resolver 23 - clientID string 24 - redirectURI string 25 - baseURL string 26 - states map[string]*OAuthState 27 - statesMu sync.RWMutex 20 + storage *RefreshTokenStorage 21 + sessionManager *session.Manager 22 + resolver *atproto.Resolver 23 + refresher *Refresher 24 + clientID string 25 + redirectURI string 26 + baseURL string 27 + states map[string]*OAuthState 28 + statesMu sync.RWMutex 28 29 } 29 30 30 31 // OAuthState tracks an in-progress OAuth flow 31 32 type OAuthState struct { 32 - State string 33 - Handle string 34 - DID string 35 - PDSEndpoint string 36 - CodeVerifier string 37 - DPoPKey *ecdsa.PrivateKey 38 - CreatedAt time.Time 33 + State string 34 + Handle string 35 + DID string 36 + PDSEndpoint string 37 + CodeVerifier string 38 + DPoPKey *ecdsa.PrivateKey 39 + CreatedAt time.Time 39 40 } 40 41 41 42 // NewServer creates a new OAuth server ··· 44 45 cfg := ClientIDConfig{ 45 46 BaseURL: baseURL, 46 47 CallbackPath: "/auth/oauth/callback", 47 - Scopes: []string{"atproto"}, 48 + Scopes: GetDefaultScopes(), 48 49 } 49 50 clientID, redirectURI := cfg.MakeClientID() 50 51 ··· 52 53 storage: storage, 53 54 sessionManager: sessionManager, 54 55 resolver: atproto.NewResolver(), 56 + refresher: nil, // Will be set via SetRefresher() 55 57 clientID: clientID, 56 58 redirectURI: redirectURI, 57 59 baseURL: baseURL, 58 60 states: make(map[string]*OAuthState), 59 61 } 62 + } 63 + 64 + // SetRefresher sets the refresher for invalidating access token cache 65 + func (s *Server) SetRefresher(refresher *Refresher) { 66 + s.refresher = refresher 60 67 } 61 68 62 69 // ServeAuthorize handles GET /auth/oauth/authorize ··· 190 197 PushedAuthURL: metadata.PushedAuthorizationRequestEndpoint, 191 198 }, 192 199 RedirectURL: s.redirectURI, 193 - Scopes: []string{"atproto"}, 200 + Scopes: GetDefaultScopes(), 194 201 } 195 202 196 203 // Create context with custom HTTP client ··· 220 227 221 228 if err := s.storage.Store(state.DID, refreshEntry); err != nil { 222 229 return "", fmt.Errorf("failed to store refresh token: %w", err) 230 + } 231 + 232 + // Invalidate cached access token (if any) since we have a new refresh token with new scopes 233 + if s.refresher != nil { 234 + s.refresher.InvalidateAccessToken(state.DID) 235 + fmt.Printf("DEBUG [oauth/server]: Invalidated cached access token for DID=%s after storing new refresh token\n", state.DID) 223 236 } 224 237 225 238 // Create session token for credential helper
+5
pkg/auth/oauth/transport.go
··· 123 123 124 124 // Add DPoP header 125 125 req.Header.Set("DPoP", proofString) 126 + proofPreview := proofString 127 + if len(proofPreview) > 50 { 128 + proofPreview = proofPreview[:50] 129 + } 130 + fmt.Printf("DEBUG [oauth/transport]: Added DPoP proof for %s %s (proof_length=%d, first_50=%q)\n", req.Method, req.URL.String(), len(proofString), proofPreview) 126 131 127 132 return nil 128 133 }
+18 -5
pkg/middleware/registry.go
··· 5 5 "encoding/json" 6 6 "fmt" 7 7 "strings" 8 + "sync" 8 9 9 10 "github.com/distribution/distribution/v3" 10 11 registrymw "github.com/distribution/distribution/v3/registry/middleware/registry" ··· 35 36 distribution.Namespace 36 37 resolver *atproto.Resolver 37 38 defaultStorageEndpoint string 39 + repositories sync.Map // Cache of RoutingRepository instances by key (did:reponame) 38 40 } 39 41 40 42 // initATProtoResolver initializes the name resolution middleware ··· 115 117 116 118 if globalRefresher != nil { 117 119 // Try OAuth flow first 118 - accessToken, dpopKey, err := globalRefresher.GetAccessToken(ctx, did) 120 + accessToken, dpopKey, dpopTransport, err := globalRefresher.GetAccessToken(ctx, did) 119 121 if err == nil { 120 - // OAuth token available - create client with DPoP support 121 - fmt.Printf("DEBUG [registry/middleware]: Using OAuth access token for DID=%s\n", did) 122 - dpopTransport := oauth.NewDPoPTransport(nil, dpopKey) 122 + // OAuth token available - use cached DPoP transport (preserves nonce) 123 + fmt.Printf("DEBUG [registry/middleware]: Using OAuth access token for DID=%s (length=%d, first_20=%q)\n", did, len(accessToken), accessToken[:min(20, len(accessToken))]) 123 124 atprotoClient = atproto.NewClientWithDPoP(pdsEndpoint, did, accessToken, dpopKey, dpopTransport) 124 125 } else { 125 126 fmt.Printf("DEBUG [registry/middleware]: OAuth refresh failed for DID=%s: %v, falling back to Basic Auth\n", did, err) ··· 143 144 // Example: "evan.jarrett.net/debian" -> store as "debian" 144 145 repositoryName := imageName 145 146 146 - fmt.Printf("DEBUG [registry/middleware]: Creating RoutingRepository for image=%s (ATProto repo name)\n", repositoryName) 147 + // Cache key is DID + repository name 148 + cacheKey := did + ":" + repositoryName 149 + 150 + // Check cache first 151 + if cached, ok := nr.repositories.Load(cacheKey); ok { 152 + fmt.Printf("DEBUG [registry/middleware]: Using cached RoutingRepository for %s\n", cacheKey) 153 + return cached.(*storage.RoutingRepository), nil 154 + } 155 + 156 + fmt.Printf("DEBUG [registry/middleware]: Creating new RoutingRepository for image=%s (ATProto repo name)\n", repositoryName) 147 157 148 158 // Create routing repository - routes manifests to ATProto, blobs to hold service 149 159 // The registry is stateless - no local storage is used 150 160 // Pass storage endpoint and DID as parameters (can't use context as it gets lost) 151 161 routingRepo := storage.NewRoutingRepository(repo, atprotoClient, repositoryName, storageEndpoint, did) 162 + 163 + // Cache the repository 164 + nr.repositories.Store(cacheKey, routingRepo) 152 165 153 166 return routingRepo, nil 154 167 }
+193 -48
pkg/storage/proxy_blob_store.go
··· 14 14 "github.com/opencontainers/go-digest" 15 15 ) 16 16 17 + const ( 18 + // maxChunkSize is the maximum buffer size before flushing to hold service 19 + // Matches S3's minimum multipart upload size 20 + maxChunkSize = 5 * 1024 * 1024 // 5MB 21 + ) 22 + 17 23 // Global upload tracking (shared across all ProxyBlobStore instances) 18 24 // This is necessary because distribution creates new repository/blob store instances per request 19 25 var ( ··· 35 41 storageEndpoint: storageEndpoint, 36 42 httpClient: &http.Client{ 37 43 Timeout: 5 * time.Minute, // Timeout for presigned URL requests and uploads 44 + Transport: &http.Transport{ 45 + DisableKeepAlives: false, // Re-enable keep-alive 46 + MaxIdleConns: 100, 47 + MaxIdleConnsPerHost: 100, 48 + MaxConnsPerHost: 0, // unlimited 49 + IdleConnTimeout: 90 * time.Second, 50 + }, 38 51 }, 39 52 did: did, 40 53 } ··· 42 55 43 56 // Stat returns the descriptor for a blob 44 57 func (p *ProxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { 45 - // For simplicity, we'll just check if we can get a download URL 46 - // In production, you'd want a dedicated stat endpoint 47 - url, err := p.getDownloadURL(ctx, dgst) 58 + // Quick HEAD request to hold service to check if blob exists 59 + url := fmt.Sprintf("%s/blobs/%s?did=%s", p.storageEndpoint, dgst.String(), p.did) 60 + req, err := http.NewRequestWithContext(ctx, "HEAD", url, nil) 61 + if err != nil { 62 + return distribution.Descriptor{}, distribution.ErrBlobUnknown 63 + } 64 + 65 + resp, err := p.httpClient.Do(req) 48 66 if err != nil { 49 67 return distribution.Descriptor{}, distribution.ErrBlobUnknown 50 68 } 69 + defer resp.Body.Close() 51 70 52 - // We don't have size info from the storage service 53 - // Return a minimal descriptor 71 + if resp.StatusCode != http.StatusOK { 72 + return distribution.Descriptor{}, distribution.ErrBlobUnknown 73 + } 74 + 75 + // Return a minimal descriptor with size from Content-Length if available 76 + size := int64(0) 77 + if contentLength := resp.Header.Get("Content-Length"); contentLength != "" { 78 + fmt.Sscanf(contentLength, "%d", &size) 79 + } 80 + 54 81 return distribution.Descriptor{ 55 82 Digest: dgst, 83 + Size: size, 56 84 MediaType: "application/octet-stream", 57 - URLs: []string{url}, 58 85 }, nil 59 86 } 60 87 ··· 167 194 } 168 195 } 169 196 170 - // Create proxy blob writer 197 + // Create pipe for streaming upload 198 + pipeReader, pipeWriter := io.Pipe() 199 + uploadErr := make(chan error, 1) 200 + digestChan := make(chan string, 1) 201 + 202 + // Create writer 171 203 writer := &ProxyBlobWriter{ 172 - store: p, 173 - ctx: ctx, 174 - options: opts, 175 - id: fmt.Sprintf("upload-%d", time.Now().UnixNano()), 176 - startedAt: time.Now(), 204 + store: p, 205 + options: opts, 206 + pipeWriter: pipeWriter, 207 + pipeReader: pipeReader, 208 + digestChan: digestChan, 209 + uploadErr: uploadErr, 210 + id: fmt.Sprintf("upload-%d", time.Now().UnixNano()), 211 + startedAt: time.Now(), 177 212 } 178 213 179 214 // Store in global uploads map for resume support ··· 181 216 globalUploads[writer.id] = writer 182 217 globalUploadsMu.Unlock() 183 218 219 + // Start background goroutine that streams to temp location immediately 220 + go func() { 221 + defer pipeReader.Close() 222 + 223 + // Stream to temp location immediately to avoid pipe deadlock 224 + tempPath := fmt.Sprintf("uploads/temp-%s", writer.id) // No leading slash 225 + url := fmt.Sprintf("%s/blobs/%s?did=%s", p.storageEndpoint, tempPath, p.did) 226 + 227 + fmt.Printf("DEBUG [goroutine]: Starting upload to temp: url=%s\n", url) 228 + 229 + // Use context with timeout to prevent hanging forever 230 + uploadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) 231 + defer cancel() 232 + 233 + req, err := http.NewRequestWithContext(uploadCtx, "PUT", url, pipeReader) 234 + if err != nil { 235 + fmt.Printf("DEBUG [goroutine]: Failed to create request: %v\n", err) 236 + // Consume digest channel even on error 237 + <-digestChan 238 + uploadErr <- fmt.Errorf("failed to create request: %w", err) 239 + return 240 + } 241 + req.Header.Set("Content-Type", "application/octet-stream") 242 + 243 + fmt.Printf("DEBUG [goroutine]: Sending PUT request...\n") 244 + // Stream to temp location (this will block until all data is written) 245 + resp, err := p.httpClient.Do(req) 246 + if err != nil { 247 + fmt.Printf("DEBUG [goroutine]: PUT failed: %v\n", err) 248 + <-digestChan 249 + uploadErr <- fmt.Errorf("failed to upload to temp: %w", err) 250 + return 251 + } 252 + defer resp.Body.Close() 253 + 254 + fmt.Printf("DEBUG [goroutine]: Got response status=%d\n", resp.StatusCode) 255 + 256 + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 257 + bodyBytes, _ := io.ReadAll(resp.Body) 258 + fmt.Printf("DEBUG [goroutine]: Upload failed with status %d, body=%s\n", resp.StatusCode, string(bodyBytes)) 259 + <-digestChan 260 + uploadErr <- fmt.Errorf("upload to temp failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 261 + return 262 + } 263 + 264 + fmt.Printf("DEBUG [goroutine]: Upload to temp succeeded, waiting for digest...\n") 265 + // Upload to temp succeeded, now wait for digest from Commit() 266 + digest, ok := <-digestChan 267 + if !ok { 268 + uploadErr <- fmt.Errorf("upload cancelled after streaming to temp") 269 + return 270 + } 271 + 272 + fmt.Printf("DEBUG [goroutine]: Got digest=%s, signaling completion\n", digest) 273 + // Store digest for Commit() to use in move operation 274 + writer.finalDigest = digest 275 + uploadErr <- nil 276 + }() 277 + 184 278 return writer, nil 185 279 } 186 280 ··· 195 289 return nil, distribution.ErrBlobUploadUnknown 196 290 } 197 291 292 + // With streaming, no flush needed - just return the writer 198 293 return writer, nil 199 294 } 200 295 ··· 283 378 284 379 // ProxyBlobWriter implements distribution.BlobWriter for proxy uploads 285 380 type ProxyBlobWriter struct { 286 - store *ProxyBlobStore 287 - ctx context.Context 288 - options distribution.CreateOptions 289 - buffer bytes.Buffer 290 - size int64 291 - closed bool 292 - id string 293 - startedAt time.Time 381 + store *ProxyBlobStore 382 + options distribution.CreateOptions 383 + pipeWriter *io.PipeWriter // Streams directly to hold service 384 + pipeReader *io.PipeReader 385 + digestChan chan string // Sends digest to upload goroutine 386 + uploadErr chan error // Receives upload result from goroutine 387 + finalDigest string // Final digest for move operation 388 + size int64 389 + closed bool 390 + id string // Distribution's upload ID 391 + startedAt time.Time 294 392 } 295 393 296 394 // ID returns the upload ID ··· 304 402 } 305 403 306 404 // Write writes data to the upload 405 + // Streams directly to hold service via pipe 307 406 func (w *ProxyBlobWriter) Write(p []byte) (int, error) { 308 407 if w.closed { 309 408 return 0, fmt.Errorf("writer closed") 310 409 } 311 - n, err := w.buffer.Write(p) 410 + 411 + // Write to pipe - streams immediately to hold service 412 + n, err := w.pipeWriter.Write(p) 413 + if err != nil { 414 + // If write fails (client disconnected), close pipe to unblock goroutine 415 + w.pipeWriter.CloseWithError(err) 416 + return n, err 417 + } 312 418 w.size += int64(n) 313 - return n, err 419 + 420 + return n, nil 314 421 } 315 422 316 423 // ReadFrom reads from a reader ··· 318 425 if w.closed { 319 426 return 0, fmt.Errorf("writer closed") 320 427 } 321 - n, err := w.buffer.ReadFrom(r) 322 - w.size += n 323 - return n, err 428 + 429 + // Read in chunks and flush when needed 430 + buf := make([]byte, 32*1024) // 32KB read buffer 431 + var total int64 432 + 433 + for { 434 + nr, err := r.Read(buf) 435 + if nr > 0 { 436 + nw, werr := w.Write(buf[:nr]) 437 + total += int64(nw) 438 + if werr != nil { 439 + return total, werr 440 + } 441 + } 442 + if err == io.EOF { 443 + break 444 + } 445 + if err != nil { 446 + return total, err 447 + } 448 + } 449 + 450 + return total, nil 324 451 } 325 452 326 453 // Size returns the current size ··· 340 467 delete(globalUploads, w.id) 341 468 globalUploadsMu.Unlock() 342 469 343 - // Upload the buffered content 344 - content := w.buffer.Bytes() 345 - dgst := digest.FromBytes(content) 346 - 347 - // Verify digest matches 348 - if desc.Digest != "" && dgst != desc.Digest { 349 - return distribution.Descriptor{}, fmt.Errorf("digest mismatch") 470 + // Close pipe to signal EOF to upload goroutine 471 + if err := w.pipeWriter.Close(); err != nil { 472 + return distribution.Descriptor{}, fmt.Errorf("failed to close pipe: %w", err) 350 473 } 351 474 352 - // Get upload URL 353 - url, err := w.store.getUploadURL(ctx, dgst, int64(len(content))) 354 - if err != nil { 355 - return distribution.Descriptor{}, err 475 + // Send digest to upload goroutine (it's waiting after temp upload completes) 476 + w.digestChan <- desc.Digest.String() 477 + close(w.digestChan) 478 + 479 + // Wait for upload goroutine to complete 480 + if err := <-w.uploadErr; err != nil { 481 + return distribution.Descriptor{}, fmt.Errorf("upload to temp failed: %w", err) 356 482 } 357 483 358 - // Upload 359 - req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(content)) 484 + // Now move temp → final location 485 + tempPath := fmt.Sprintf("uploads/temp-%s", w.id) // No leading slash 486 + finalPath := desc.Digest.String() 487 + 488 + moveURL := fmt.Sprintf("%s/move?from=%s&to=%s&did=%s", 489 + w.store.storageEndpoint, tempPath, finalPath, w.store.did) 490 + 491 + req, err := http.NewRequestWithContext(context.Background(), "POST", moveURL, nil) 360 492 if err != nil { 361 - return distribution.Descriptor{}, err 493 + return distribution.Descriptor{}, fmt.Errorf("failed to create move request: %w", err) 362 494 } 363 - req.Header.Set("Content-Type", "application/octet-stream") 364 495 365 496 resp, err := w.store.httpClient.Do(req) 366 497 if err != nil { 367 - return distribution.Descriptor{}, err 498 + return distribution.Descriptor{}, fmt.Errorf("failed to move blob: %w", err) 368 499 } 369 500 defer resp.Body.Close() 370 501 371 502 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 372 - return distribution.Descriptor{}, fmt.Errorf("upload failed: status %d", resp.StatusCode) 503 + bodyBytes, _ := io.ReadAll(resp.Body) 504 + return distribution.Descriptor{}, fmt.Errorf("move blob failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 373 505 } 374 506 507 + fmt.Printf("DEBUG [proxy_blob_store]: Committed upload: digest=%s, size=%d (moved from temp)\n", desc.Digest, w.size) 508 + 375 509 return distribution.Descriptor{ 376 - Digest: dgst, 377 - Size: int64(len(content)), 510 + Digest: desc.Digest, 511 + Size: w.size, 378 512 MediaType: desc.MediaType, 379 513 }, nil 380 514 } ··· 388 522 delete(globalUploads, w.id) 389 523 globalUploadsMu.Unlock() 390 524 525 + // Close digest channel without sending digest 526 + close(w.digestChan) 527 + 528 + // Close pipe with error to stop streaming 529 + if w.pipeWriter != nil { 530 + w.pipeWriter.CloseWithError(fmt.Errorf("upload cancelled")) 531 + } 532 + 533 + // Wait for goroutine to finish 534 + <-w.uploadErr 535 + 536 + fmt.Printf("DEBUG [proxy_blob_store]: Cancelled upload: id=%s\n", w.id) 391 537 return nil 392 538 } 393 539 394 540 // Close closes the writer 395 - // NOTE: For resumable uploads, we don't mark as closed here 396 - // Distribution calls Close() after each PATCH, but the upload may continue 397 - // Only Commit() and Cancel() actually finalize the upload 541 + // Just returns - streaming continues via pipe 398 542 func (w *ProxyBlobWriter) Close() error { 399 - // Don't set w.closed = true here - allow resuming 543 + // Don't close pipe here - that happens in Commit() or Cancel() 544 + // Don't set w.closed = true - allow resuming for next PATCH 400 545 return nil 401 546 } 402 547
+11 -2
pkg/storage/routing_repository.go
··· 18 18 storageEndpoint string // Hold service endpoint for blobs (from discovery for push) 19 19 did string // User's DID for authorization 20 20 manifestStore *atproto.ManifestStore // Cached manifest store instance 21 + blobStore *ProxyBlobStore // Cached blob store instance 21 22 } 22 23 23 24 // NewRoutingRepository creates a new routing repository ··· 62 63 // Blobs returns a proxy blob store that routes to external hold service 63 64 // The registry (AppView) NEVER stores blobs locally - all blobs go through hold service 64 65 func (r *RoutingRepository) Blobs(ctx context.Context) distribution.BlobStore { 66 + // Return cached blob store if available 67 + if r.blobStore != nil { 68 + fmt.Printf("DEBUG [storage/blobs]: Returning cached blob store for did=%s, repo=%s\n", 69 + r.did, r.repositoryName) 70 + return r.blobStore 71 + } 72 + 65 73 // For pull operations, check if we have a cached hold endpoint from a recent manifest fetch 66 74 // This ensures blobs are fetched from the hold recorded in the manifest, not re-discovered 67 75 holdEndpoint := r.storageEndpoint // Default to discovery-based endpoint ··· 82 90 panic("storage endpoint not set in RoutingRepository - ensure default_storage_endpoint is configured in middleware") 83 91 } 84 92 85 - // Always use proxy blob store - routes to external hold service 86 - return NewProxyBlobStore(holdEndpoint, r.did) 93 + // Create and cache proxy blob store 94 + r.blobStore = NewProxyBlobStore(holdEndpoint, r.did) 95 + return r.blobStore 87 96 } 88 97 89 98 // Tags returns the tag service