A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

create a shared registrycontext that we can pass around to simplify the parameters functions need

+138 -132
+40 -25
pkg/appview/middleware/registry.go
··· 20 20 "atcr.io/pkg/auth/oauth" 21 21 ) 22 22 23 - // Global refresher instance (set by main.go) 24 - var globalRefresher *oauth.Refresher 23 + // Global variables for initialization only 24 + // These are set by main.go during startup and copied into NamespaceResolver instances. 25 + // After initialization, request handling uses the NamespaceResolver's instance fields. 26 + var ( 27 + globalRefresher *oauth.Refresher 28 + globalDatabase storage.DatabaseMetrics 29 + globalAuthorizer auth.HoldAuthorizer 30 + ) 25 31 26 - // Global database instance (set by main.go for pull tracking) 27 - var globalDatabase interface { 28 - IncrementPullCount(did, repository string) error 29 - IncrementPushCount(did, repository string) error 30 - } 31 - 32 - // Global authorizer instance (set by main.go for hold authorization) 33 - var globalAuthorizer auth.HoldAuthorizer 34 - 35 - // SetGlobalRefresher sets the global OAuth refresher instance 32 + // SetGlobalRefresher sets the OAuth refresher instance during initialization 33 + // Must be called before the registry starts serving requests 36 34 func SetGlobalRefresher(refresher *oauth.Refresher) { 37 35 globalRefresher = refresher 38 36 } 39 37 40 - // SetGlobalDatabase sets the global database instance for metrics tracking 41 - func SetGlobalDatabase(database interface { 42 - IncrementPullCount(did, repository string) error 43 - IncrementPushCount(did, repository string) error 44 - }) { 38 + // SetGlobalDatabase sets the database instance during initialization 39 + // Must be called before the registry starts serving requests 40 + func SetGlobalDatabase(database storage.DatabaseMetrics) { 45 41 globalDatabase = database 46 42 } 47 43 48 - // SetGlobalAuthorizer sets the global authorizer instance for hold access control 44 + // SetGlobalAuthorizer sets the authorizer instance during initialization 45 + // Must be called before the registry starts serving requests 49 46 func SetGlobalAuthorizer(authorizer auth.HoldAuthorizer) { 50 47 globalAuthorizer = authorizer 51 48 } ··· 59 56 type NamespaceResolver struct { 60 57 distribution.Namespace 61 58 directory identity.Directory 62 - defaultHoldDID string // Default hold DID (e.g., "did:web:hold01.atcr.io") 63 - testMode bool // If true, fallback to default hold when user's hold is unreachable 64 - repositories sync.Map // Cache of RoutingRepository instances by key (did:reponame) 59 + defaultHoldDID string // Default hold DID (e.g., "did:web:hold01.atcr.io") 60 + testMode bool // If true, fallback to default hold when user's hold is unreachable 61 + repositories sync.Map // Cache of RoutingRepository instances by key (did:reponame) 62 + refresher *oauth.Refresher // OAuth session manager (copied from global on init) 63 + database storage.DatabaseMetrics // Metrics database (copied from global on init) 64 + authorizer auth.HoldAuthorizer // Hold authorization (copied from global on init) 65 65 } 66 66 67 67 // initATProtoResolver initializes the name resolution middleware ··· 82 82 testMode = tm 83 83 } 84 84 85 + // Copy shared services from globals into the instance 86 + // This avoids accessing globals during request handling 85 87 return &NamespaceResolver{ 86 88 Namespace: ns, 87 89 directory: directory, 88 90 defaultHoldDID: defaultHoldDID, 89 91 testMode: testMode, 92 + refresher: globalRefresher, 93 + database: globalDatabase, 94 + authorizer: globalAuthorizer, 90 95 }, nil 91 96 } 92 97 ··· 155 160 // Fall back to Basic Auth token cache (for users who used app passwords) 156 161 var atprotoClient *atproto.Client 157 162 158 - if globalRefresher != nil { 163 + if nr.refresher != nil { 159 164 // Try OAuth flow first 160 - session, err := globalRefresher.GetSession(ctx, did) 165 + session, err := nr.refresher.GetSession(ctx, did) 161 166 if err == nil { 162 167 // OAuth session available - use indigo's API client (handles DPoP automatically) 163 168 apiClient := session.APIClient() ··· 194 199 195 200 // Create routing repository - routes manifests to ATProto, blobs to hold service 196 201 // The registry is stateless - no local storage is used 197 - // Pass hold DID, user DID, authorizer, and refresher as parameters (can't use context as it gets lost) 198 - routingRepo := storage.NewRoutingRepository(repo, atprotoClient, repositoryName, holdDID, did, globalDatabase, globalAuthorizer, globalRefresher) 202 + // Bundle all context into a single RegistryContext struct 203 + registryCtx := &storage.RegistryContext{ 204 + DID: did, 205 + HoldDID: holdDID, 206 + PDSEndpoint: pdsEndpoint, 207 + Repository: repositoryName, 208 + ATProtoClient: atprotoClient, 209 + Database: nr.database, 210 + Authorizer: nr.authorizer, 211 + Refresher: nr.refresher, 212 + } 213 + routingRepo := storage.NewRoutingRepository(repo, registryCtx) 199 214 200 215 // Cache the repository 201 216 nr.repositories.Store(cacheKey, routingRepo)
+29
pkg/appview/storage/context.go
··· 1 + package storage 2 + 3 + import ( 4 + "atcr.io/pkg/atproto" 5 + "atcr.io/pkg/auth" 6 + "atcr.io/pkg/auth/oauth" 7 + ) 8 + 9 + // DatabaseMetrics interface for tracking pull/push counts 10 + type DatabaseMetrics interface { 11 + IncrementPullCount(did, repository string) error 12 + IncrementPushCount(did, repository string) error 13 + } 14 + 15 + // RegistryContext bundles all the context needed for registry operations 16 + // This includes both per-request data (DID, hold) and shared services 17 + type RegistryContext struct { 18 + // Per-request identity and routing information 19 + DID string // User's DID (e.g., "did:plc:abc123") 20 + HoldDID string // Hold service DID (e.g., "did:web:hold01.atcr.io") 21 + PDSEndpoint string // User's PDS endpoint URL 22 + Repository string // Image repository name (e.g., "debian") 23 + ATProtoClient *atproto.Client // Authenticated ATProto client for this user 24 + 25 + // Shared services (same for all requests) 26 + Database DatabaseMetrics // Metrics tracking database 27 + Authorizer auth.HoldAuthorizer // Hold access authorization 28 + Refresher *oauth.Refresher // OAuth session manager 29 + }
+40 -60
pkg/appview/storage/proxy_blob_store.go
··· 11 11 "sync" 12 12 "time" 13 13 14 - "atcr.io/pkg/auth" 15 - "atcr.io/pkg/auth/oauth" 16 14 "github.com/distribution/distribution/v3" 17 15 "github.com/opencontainers/go-digest" 18 16 ) ··· 32 30 33 31 // ProxyBlobStore proxies blob requests to an external storage service 34 32 type ProxyBlobStore struct { 35 - holdDID string // Hold DID (e.g., "did:web:hold01.atcr.io") 36 - holdURL string // Resolved HTTP URL for XRPC requests 33 + ctx *RegistryContext // All context and services 34 + holdURL string // Resolved HTTP URL for XRPC requests 37 35 httpClient *http.Client 38 - did string 39 - database DatabaseMetrics 40 - repository string 41 - authorizer auth.HoldAuthorizer 42 - refresher *oauth.Refresher // OAuth refresher for authenticating to hold service 43 36 } 44 37 45 38 // NewProxyBlobStore creates a new proxy blob store 46 - func NewProxyBlobStore(holdDID, did string, database DatabaseMetrics, repository string, authorizer auth.HoldAuthorizer, refresher *oauth.Refresher) *ProxyBlobStore { 39 + func NewProxyBlobStore(ctx *RegistryContext) *ProxyBlobStore { 47 40 // Resolve DID to URL once at construction time 48 - holdURL := resolveHoldURL(holdDID) 41 + holdURL := resolveHoldURL(ctx.HoldDID) 49 42 50 43 fmt.Printf("DEBUG [proxy_blob_store]: NewProxyBlobStore created with holdDID=%s, holdURL=%s, userDID=%s, repo=%s\n", 51 - holdDID, holdURL, did, repository) 44 + ctx.HoldDID, holdURL, ctx.DID, ctx.Repository) 52 45 53 46 return &ProxyBlobStore{ 54 - holdDID: holdDID, 47 + ctx: ctx, 55 48 holdURL: holdURL, 56 49 httpClient: &http.Client{ 57 50 Timeout: 5 * time.Minute, // Timeout for presigned URL requests and uploads ··· 63 56 IdleConnTimeout: 90 * time.Second, 64 57 }, 65 58 }, 66 - did: did, 67 - database: database, 68 - repository: repository, 69 - authorizer: authorizer, 70 - refresher: refresher, 71 59 } 72 60 } 73 61 ··· 76 64 // Otherwise, uses the default httpClient without authentication 77 65 func (p *ProxyBlobStore) doAuthenticatedRequest(ctx context.Context, req *http.Request) (*http.Response, error) { 78 66 // Try to get OAuth session for DPoP authentication 79 - if p.refresher != nil { 80 - session, err := p.refresher.GetSession(ctx, p.did) 67 + if p.ctx.Refresher != nil { 68 + session, err := p.ctx.Refresher.GetSession(ctx, p.ctx.DID) 81 69 if err != nil { 82 - fmt.Printf("DEBUG [proxy_blob_store]: Failed to get OAuth session for DID=%s: %v, will attempt without auth\n", p.did, err) 70 + fmt.Printf("DEBUG [proxy_blob_store]: Failed to get OAuth session for DID=%s: %v, will attempt without auth\n", p.ctx.DID, err) 83 71 } else { 84 72 // Use session's DoWithAuth method (adds Authorization + DPoP headers) 85 - fmt.Printf("DEBUG [proxy_blob_store]: Using OAuth session for hold service request, DID=%s\n", p.did) 73 + fmt.Printf("DEBUG [proxy_blob_store]: Using OAuth session for hold service request, DID=%s\n", p.ctx.DID) 86 74 // The endpoint parameter is not used for DPoP signing, just token refresh validation 87 75 // For hold service XRPC requests, we can pass "com.atproto.repo.uploadBlob" 88 76 return session.DoWithAuth(session.Client, req, "com.atproto.repo.uploadBlob") ··· 93 81 return p.httpClient.Do(req) 94 82 } 95 83 96 - // resolveHoldURL converts a hold DID to an HTTP URL for XRPC requests 97 - // did:web:hold01.atcr.io → https://hold01.atcr.io 98 - // did:web:172.28.0.3:8080 → http://172.28.0.3:8080 99 - func resolveHoldURL(holdDID string) string { 100 - hostname := strings.TrimPrefix(holdDID, "did:web:") 101 - 102 - // Use HTTP for localhost/IP addresses with ports, HTTPS for domains 103 - if strings.Contains(hostname, ":") || 104 - strings.Contains(hostname, "127.0.0.1") || 105 - strings.Contains(hostname, "localhost") || 106 - // Check if it's an IP address (contains only digits and dots) 107 - (len(hostname) > 0 && (hostname[0] >= '0' && hostname[0] <= '9')) { 108 - return "http://" + hostname 109 - } 110 - return "https://" + hostname 111 - } 112 - 113 - // checkReadAccess verifies the user has read access to the hold 84 + // checkReadAccess validates that the user has read access to blobs in this hold 114 85 func (p *ProxyBlobStore) checkReadAccess(ctx context.Context) error { 115 - if p.authorizer == nil { 116 - // No authorizer configured - allow access (backward compatibility) 117 - return nil 86 + if p.ctx.Authorizer == nil { 87 + return nil // No authorization check if authorizer not configured 118 88 } 119 - 120 - hasAccess, err := p.authorizer.CheckReadAccess(ctx, p.holdDID, p.did) 89 + allowed, err := p.ctx.Authorizer.CheckReadAccess(ctx, p.ctx.HoldDID, p.ctx.DID) 121 90 if err != nil { 122 91 return fmt.Errorf("authorization check failed: %w", err) 123 92 } 124 - 125 - if !hasAccess { 93 + if !allowed { 126 94 return distribution.ErrBlobUnknown // Return same error as missing blob for security 127 95 } 128 - 129 96 return nil 130 97 } 131 98 132 - // checkWriteAccess verifies the user has write access to the hold 99 + // checkWriteAccess validates that the user has write access to blobs in this hold 133 100 func (p *ProxyBlobStore) checkWriteAccess(ctx context.Context) error { 134 - if p.authorizer == nil { 135 - // No authorizer configured - allow access (backward compatibility) 136 - return nil 101 + if p.ctx.Authorizer == nil { 102 + return nil // No authorization check if authorizer not configured 137 103 } 138 - 139 - hasAccess, err := p.authorizer.CheckWriteAccess(ctx, p.holdDID, p.did) 104 + allowed, err := p.ctx.Authorizer.CheckWriteAccess(ctx, p.ctx.HoldDID, p.ctx.DID) 140 105 if err != nil { 141 106 return fmt.Errorf("authorization check failed: %w", err) 142 107 } 143 - 144 - if !hasAccess { 145 - return fmt.Errorf("write access denied to hold %s", p.holdDID) 108 + if !allowed { 109 + return fmt.Errorf("write access denied to hold %s", p.ctx.HoldDID) 146 110 } 111 + return nil 112 + } 147 113 148 - return nil 114 + // resolveHoldURL converts a hold DID to an HTTP URL for XRPC requests 115 + // did:web:hold01.atcr.io → https://hold01.atcr.io 116 + // did:web:172.28.0.3:8080 → http://172.28.0.3:8080 117 + func resolveHoldURL(holdDID string) string { 118 + hostname := strings.TrimPrefix(holdDID, "did:web:") 119 + 120 + // Use HTTP for localhost/IP addresses with ports, HTTPS for domains 121 + if strings.Contains(hostname, ":") || 122 + strings.Contains(hostname, "127.0.0.1") || 123 + strings.Contains(hostname, "localhost") || 124 + // Check if it's an IP address (contains only digits and dots) 125 + (len(hostname) > 0 && (hostname[0] >= '0' && hostname[0] <= '9')) { 126 + return "http://" + hostname 127 + } 128 + return "https://" + hostname 149 129 } 150 130 151 131 // Stat returns the descriptor for a blob ··· 390 370 // Use XRPC endpoint: GET /xrpc/com.atproto.sync.getBlob?did={holdDID}&cid={digest} 391 371 // Per migration doc: hold accepts OCI digest directly as cid parameter (checks for sha256: prefix) 392 372 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 393 - p.holdURL, p.holdDID, dgst.String()) 373 + p.holdURL, p.ctx.HoldDID, dgst.String()) 394 374 return url, nil 395 375 } 396 376 ··· 399 379 func (p *ProxyBlobStore) getHeadURL(ctx context.Context, dgst digest.Digest) (string, error) { 400 380 // Same as GET - hold service handles HEAD method on getBlob endpoint 401 381 url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 402 - p.holdURL, p.holdDID, dgst.String()) 382 + p.holdURL, p.ctx.HoldDID, dgst.String()) 403 383 return url, nil 404 384 } 405 385
+29 -47
pkg/appview/storage/routing_repository.go
··· 6 6 "time" 7 7 8 8 "atcr.io/pkg/atproto" 9 - "atcr.io/pkg/auth" 10 - "atcr.io/pkg/auth/oauth" 11 9 "github.com/distribution/distribution/v3" 12 10 ) 13 11 14 - // DatabaseMetrics interface for tracking pull/push counts 15 - type DatabaseMetrics interface { 16 - IncrementPullCount(did, repository string) error 17 - IncrementPushCount(did, repository string) error 18 - } 19 - 20 12 // RoutingRepository routes manifests to ATProto and blobs to external hold service 21 13 // The registry (AppView) is stateless and NEVER stores blobs locally 22 14 type RoutingRepository struct { 23 15 distribution.Repository 24 - atprotoClient *atproto.Client 25 - repositoryName string 26 - holdDID string // Hold service DID for blobs (from discovery for push), e.g., "did:web:hold01.atcr.io" 27 - did string // User's DID for authorization 28 - manifestStore *atproto.ManifestStore // Cached manifest store instance 29 - blobStore *ProxyBlobStore // Cached blob store instance 30 - database DatabaseMetrics // Database for metrics tracking 31 - authorizer auth.HoldAuthorizer // Authorization for hold access 32 - refresher *oauth.Refresher // OAuth refresher for authenticating to hold service 16 + ctx *RegistryContext // All context and services 17 + manifestStore *atproto.ManifestStore // Cached manifest store instance 18 + blobStore *ProxyBlobStore // Cached blob store instance 33 19 } 34 20 35 21 // NewRoutingRepository creates a new routing repository 36 - func NewRoutingRepository( 37 - baseRepo distribution.Repository, 38 - atprotoClient *atproto.Client, 39 - repoName string, 40 - holdDID string, 41 - did string, 42 - database DatabaseMetrics, 43 - authorizer auth.HoldAuthorizer, 44 - refresher *oauth.Refresher, 45 - ) *RoutingRepository { 22 + func NewRoutingRepository(baseRepo distribution.Repository, ctx *RegistryContext) *RoutingRepository { 46 23 return &RoutingRepository{ 47 - Repository: baseRepo, 48 - atprotoClient: atprotoClient, 49 - repositoryName: repoName, 50 - holdDID: holdDID, 51 - did: did, 52 - database: database, 53 - authorizer: authorizer, 54 - refresher: refresher, 24 + Repository: baseRepo, 25 + ctx: ctx, 55 26 } 56 27 } 57 28 ··· 64 35 65 36 // ManifestStore needs both DID and URL for backward compat (legacy holdEndpoint field) 66 37 // For now, pass holdDID twice (will be cleaned up in manifest_store.go later) 67 - r.manifestStore = atproto.NewManifestStore(r.atprotoClient, r.repositoryName, r.holdDID, r.holdDID, r.did, blobStore, r.database) 38 + r.manifestStore = atproto.NewManifestStore( 39 + r.ctx.ATProtoClient, 40 + r.ctx.Repository, 41 + r.ctx.HoldDID, 42 + r.ctx.HoldDID, 43 + r.ctx.DID, 44 + blobStore, 45 + r.ctx.Database, 46 + ) 68 47 } 69 48 70 49 // After any manifest operation, cache the hold DID for blob fetches ··· 73 52 time.Sleep(100 * time.Millisecond) // Brief delay to let manifest fetch complete 74 53 if holdDID := r.manifestStore.GetLastFetchedHoldDID(); holdDID != "" { 75 54 // Cache for 10 minutes - should cover typical pull operations 76 - GetGlobalHoldCache().Set(r.did, r.repositoryName, holdDID, 10*time.Minute) 55 + GetGlobalHoldCache().Set(r.ctx.DID, r.ctx.Repository, holdDID, 10*time.Minute) 77 56 fmt.Printf("DEBUG [storage/routing]: Cached hold DID: did=%s, repo=%s, hold=%s\n", 78 - r.did, r.repositoryName, holdDID) 57 + r.ctx.DID, r.ctx.Repository, holdDID) 79 58 } 80 59 }() 81 60 ··· 88 67 // Return cached blob store if available 89 68 if r.blobStore != nil { 90 69 fmt.Printf("DEBUG [storage/blobs]: Returning cached blob store for did=%s, repo=%s\n", 91 - r.did, r.repositoryName) 70 + r.ctx.DID, r.ctx.Repository) 92 71 return r.blobStore 93 72 } 94 73 95 74 // For pull operations, check if we have a cached hold DID from a recent manifest fetch 96 75 // This ensures blobs are fetched from the hold recorded in the manifest, not re-discovered 97 - holdDID := r.holdDID // Default to discovery-based DID 76 + holdDID := r.ctx.HoldDID // Default to discovery-based DID 98 77 99 - if cachedHoldDID, ok := GetGlobalHoldCache().Get(r.did, r.repositoryName); ok { 78 + if cachedHoldDID, ok := GetGlobalHoldCache().Get(r.ctx.DID, r.ctx.Repository); ok { 100 79 // Use cached hold DID from manifest 101 80 holdDID = cachedHoldDID 102 81 fmt.Printf("DEBUG [storage/blobs]: Using cached hold from manifest: did=%s, repo=%s, hold=%s\n", 103 - r.did, r.repositoryName, cachedHoldDID) 82 + r.ctx.DID, r.ctx.Repository, cachedHoldDID) 104 83 } else { 105 84 // No cached hold, use discovery-based DID (for push or first pull) 106 85 fmt.Printf("DEBUG [storage/blobs]: Using discovery-based hold: did=%s, repo=%s, hold=%s\n", 107 - r.did, r.repositoryName, holdDID) 86 + r.ctx.DID, r.ctx.Repository, holdDID) 108 87 } 109 88 110 89 if holdDID == "" { 111 90 // This should never happen if middleware is configured correctly 112 - panic("hold DID not set in RoutingRepository - ensure default_hold_did is configured in middleware") 91 + panic("hold DID not set in RegistryContext - ensure default_hold_did is configured in middleware") 113 92 } 114 93 115 - // Create and cache proxy blob store with authorization and OAuth refresher 116 - r.blobStore = NewProxyBlobStore(holdDID, r.did, r.database, r.repositoryName, r.authorizer, r.refresher) 94 + // Update context with the correct hold DID (may be cached or discovered) 95 + r.ctx.HoldDID = holdDID 96 + 97 + // Create and cache proxy blob store 98 + r.blobStore = NewProxyBlobStore(r.ctx) 117 99 return r.blobStore 118 100 } 119 101 120 102 // Tags returns the tag service 121 103 // Tags are stored in ATProto as io.atcr.tag records 122 104 func (r *RoutingRepository) Tags(ctx context.Context) distribution.TagService { 123 - return atproto.NewTagStore(r.atprotoClient, r.repositoryName) 105 + return atproto.NewTagStore(r.ctx.ATProtoClient, r.ctx.Repository) 124 106 }