A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
81
fork

Configure Feed

Select the types of activity you want to include in your feed.

general bug fixes

+1095 -446
+3 -1
.tangled/workflows/tests.yml
··· 8 8 9 9 dependencies: 10 10 nixpkgs: 11 - - git 11 + - gcc 12 12 - go 13 13 14 14 steps: 15 15 - name: Run Tests 16 + environment: 17 + CGO_ENABLED: 1 16 18 command: | 17 19 go test -cover ./...
+5
cmd/appview/serve.go
··· 151 151 // Create oauth token refresher 152 152 refresher := oauth.NewRefresher(oauthApp) 153 153 154 + // Wire up UI session store to refresher so it can invalidate UI sessions on OAuth failures 155 + if uiSessionStore != nil { 156 + refresher.SetUISessionStore(uiSessionStore) 157 + } 158 + 154 159 // Set global refresher for middleware 155 160 middleware.SetGlobalRefresher(refresher) 156 161
+3 -2
cmd/oauth-helper/main.go
··· 11 11 "os" 12 12 "time" 13 13 14 + "atcr.io/pkg/atproto" 14 15 "atcr.io/pkg/auth/oauth" 15 16 indigo_oauth "github.com/bluesky-social/indigo/atproto/auth/oauth" 16 17 ) ··· 108 109 109 110 // Generate DPoP proof for deleteRecord endpoint if all params provided 110 111 if *repo != "" && *rkey != "" { 111 - deleteURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord?repo=%s&collection=%s&rkey=%s", 112 - *holdURL, *repo, *collection, *rkey) 112 + deleteURL := fmt.Sprintf("%s%s?repo=%s&collection=%s&rkey=%s", 113 + *holdURL, atproto.RepoDeleteRecord, *repo, *collection, *rkey) 113 114 114 115 dpopProof, err := generateDPoPProof(result.Session, "POST", deleteURL) 115 116 if err != nil {
+1 -6
pkg/appview/config.go
··· 41 41 config.Middleware = buildMiddlewareConfig(defaultHoldDID) 42 42 43 43 // Auth 44 - baseURL := getBaseURL(httpConfig.Addr) 44 + baseURL := GetBaseURL(httpConfig.Addr) 45 45 authConfig, err := buildAuthConfig(baseURL) 46 46 if err != nil { 47 47 return nil, fmt.Errorf("failed to build auth config: %w", err) ··· 198 198 199 199 // Full address provided 200 200 return fmt.Sprintf("http://%s", httpAddr) 201 - } 202 - 203 - // getBaseURL is the internal version used by buildAuthConfig 204 - func getBaseURL(httpAddr string) string { 205 - return GetBaseURL(httpAddr) 206 201 } 207 202 208 203 // getServiceName extracts service name from base URL or uses env var
+18
pkg/appview/db/session_store.go
··· 128 128 } 129 129 } 130 130 131 + // DeleteByDID removes all sessions for a given DID 132 + // This is useful when OAuth refresh fails and we need to force re-authentication 133 + func (s *SessionStore) DeleteByDID(did string) { 134 + result, err := s.db.Exec(` 135 + DELETE FROM ui_sessions WHERE did = ? 136 + `, did) 137 + 138 + if err != nil { 139 + fmt.Printf("Warning: Failed to delete sessions for DID %s: %v\n", did, err) 140 + return 141 + } 142 + 143 + deleted, _ := result.RowsAffected() 144 + if deleted > 0 { 145 + fmt.Printf("Deleted %d UI session(s) for DID %s due to OAuth failure\n", deleted, did) 146 + } 147 + } 148 + 131 149 // Cleanup removes expired sessions 132 150 func (s *SessionStore) Cleanup() { 133 151 result, err := s.db.Exec(`
+108 -2
pkg/appview/middleware/registry.go
··· 4 4 "context" 5 5 "encoding/json" 6 6 "fmt" 7 + "io" 8 + "net/http" 9 + "net/url" 7 10 "strings" 8 11 "sync" 12 + "time" 9 13 10 14 "github.com/bluesky-social/indigo/atproto/identity" 11 15 "github.com/bluesky-social/indigo/atproto/syntax" 12 16 "github.com/distribution/distribution/v3" 17 + "github.com/distribution/distribution/v3/registry/api/errcode" 13 18 registrymw "github.com/distribution/distribution/v3/registry/middleware/registry" 14 19 "github.com/distribution/distribution/v3/registry/storage/driver" 15 20 "github.com/distribution/reference" ··· 18 23 "atcr.io/pkg/atproto" 19 24 "atcr.io/pkg/auth" 20 25 "atcr.io/pkg/auth/oauth" 26 + "atcr.io/pkg/auth/token" 21 27 ) 22 28 23 29 // Global variables for initialization only ··· 140 146 } 141 147 ctx = context.WithValue(ctx, "hold.did", holdDID) 142 148 149 + // Get service token for hold authentication 150 + // Check cache first to avoid unnecessary PDS calls on every request 151 + var serviceToken string 152 + if nr.refresher != nil { 153 + cachedToken, expiresAt := token.GetServiceToken(did, holdDID) 154 + 155 + // Use cached token if it exists and has > 10s remaining 156 + if cachedToken != "" && time.Until(expiresAt) > 10*time.Second { 157 + fmt.Printf("DEBUG [registry/middleware]: Using cached service token for DID=%s (expires in %v)\n", 158 + did, time.Until(expiresAt).Round(time.Second)) 159 + serviceToken = cachedToken 160 + } else { 161 + // Cache miss or expiring soon - validate OAuth and get new service token 162 + if cachedToken == "" { 163 + fmt.Printf("DEBUG [registry/middleware]: Cache miss, fetching service token for DID=%s\n", did) 164 + } else { 165 + fmt.Printf("DEBUG [registry/middleware]: Token expiring soon, proactively renewing for DID=%s\n", did) 166 + } 167 + 168 + session, err := nr.refresher.GetSession(ctx, did) 169 + if err != nil { 170 + // OAuth session unavailable - fail fast with proper auth error 171 + nr.refresher.InvalidateSession(did) 172 + token.InvalidateServiceToken(did, holdDID) 173 + fmt.Printf("ERROR [registry/middleware]: Failed to get OAuth session for DID=%s: %v\n", did, err) 174 + fmt.Printf("ERROR [registry/middleware]: User needs to re-authenticate via credential helper\n") 175 + return nil, errcode.ErrorCodeUnauthorized.WithDetail("OAuth session expired - please re-authenticate") 176 + } 177 + 178 + // Call com.atproto.server.getServiceAuth on the user's PDS 179 + // Request 5-minute expiry (PDS may grant less) 180 + // exp must be absolute Unix timestamp, not relative duration 181 + expiryTime := time.Now().Unix() + 300 // 5 minutes from now 182 + serviceAuthURL := fmt.Sprintf("%s%s?aud=%s&lxm=%s&exp=%d", 183 + pdsEndpoint, 184 + atproto.ServerGetServiceAuth, 185 + url.QueryEscape(holdDID), 186 + url.QueryEscape("com.atproto.repo.getRecord"), 187 + expiryTime, 188 + ) 189 + 190 + req, err := http.NewRequestWithContext(ctx, "GET", serviceAuthURL, nil) 191 + if err != nil { 192 + fmt.Printf("ERROR [registry/middleware]: Failed to create service auth request: %v\n", err) 193 + return nil, errcode.ErrorCodeUnauthorized.WithDetail("OAuth session validation failed") 194 + } 195 + 196 + // Use OAuth session to authenticate to PDS (with DPoP) 197 + resp, err := session.DoWithAuth(session.Client, req, "com.atproto.server.getServiceAuth") 198 + if err != nil { 199 + // Invalidate session on auth errors (may indicate corrupted session or expired tokens) 200 + nr.refresher.InvalidateSession(did) 201 + token.InvalidateServiceToken(did, holdDID) 202 + fmt.Printf("ERROR [registry/middleware]: OAuth validation failed for DID=%s: %v\n", did, err) 203 + fmt.Printf("ERROR [registry/middleware]: User needs to re-authenticate via credential helper\n") 204 + return nil, errcode.ErrorCodeUnauthorized.WithDetail("OAuth session expired - please re-authenticate") 205 + } 206 + defer resp.Body.Close() 207 + 208 + if resp.StatusCode != http.StatusOK { 209 + // Invalidate session on auth failures 210 + bodyBytes, _ := io.ReadAll(resp.Body) 211 + nr.refresher.InvalidateSession(did) 212 + token.InvalidateServiceToken(did, holdDID) 213 + fmt.Printf("ERROR [registry/middleware]: OAuth validation failed for DID=%s: status %d, body: %s\n", 214 + did, resp.StatusCode, string(bodyBytes)) 215 + fmt.Printf("ERROR [registry/middleware]: User needs to re-authenticate via credential helper\n") 216 + return nil, errcode.ErrorCodeUnauthorized.WithDetail("OAuth session expired - please re-authenticate") 217 + } 218 + 219 + // Parse response to get service token 220 + var result struct { 221 + Token string `json:"token"` 222 + } 223 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 224 + fmt.Printf("ERROR [registry/middleware]: Failed to decode service auth response: %v\n", err) 225 + return nil, errcode.ErrorCodeUnauthorized.WithDetail("OAuth session validation failed") 226 + } 227 + 228 + if result.Token == "" { 229 + fmt.Printf("ERROR [registry/middleware]: Empty token in service auth response\n") 230 + return nil, errcode.ErrorCodeUnauthorized.WithDetail("OAuth session validation failed") 231 + } 232 + 233 + serviceToken = result.Token 234 + 235 + // Cache the token (parses JWT to extract actual expiry) 236 + if err := token.SetServiceToken(did, holdDID, serviceToken); err != nil { 237 + fmt.Printf("WARN [registry/middleware]: Failed to cache service token: %v\n", err) 238 + // Non-fatal - we have the token, just won't be cached 239 + } 240 + 241 + fmt.Printf("DEBUG [registry/middleware]: OAuth validation succeeded for DID=%s\n", did) 242 + } 243 + } 244 + 143 245 // Create a new reference with identity/image format 144 246 // Use the identity (or DID) as the namespace to ensure canonical format 145 247 // This transforms: evan.jarrett.net/debian -> evan.jarrett.net/debian (keeps full path) ··· 192 294 // Cache key is DID + repository name 193 295 cacheKey := did + ":" + repositoryName 194 296 195 - // Check cache first 297 + // Check cache first and update service token 196 298 if cached, ok := nr.repositories.Load(cacheKey); ok { 197 - return cached.(*storage.RoutingRepository), nil 299 + cachedRepo := cached.(*storage.RoutingRepository) 300 + // Always update the service token even for cached repos (token may have been renewed) 301 + cachedRepo.Ctx.ServiceToken = serviceToken 302 + return cachedRepo, nil 198 303 } 199 304 200 305 // Create routing repository - routes manifests to ATProto, blobs to hold service ··· 205 310 HoldDID: holdDID, 206 311 PDSEndpoint: pdsEndpoint, 207 312 Repository: repositoryName, 313 + ServiceToken: serviceToken, // Cached service token from middleware validation 208 314 ATProtoClient: atprotoClient, 209 315 Database: nr.database, 210 316 Authorizer: nr.authorizer,
+1
pkg/appview/storage/context.go
··· 20 20 HoldDID string // Hold service DID (e.g., "did:web:hold01.atcr.io") 21 21 PDSEndpoint string // User's PDS endpoint URL 22 22 Repository string // Image repository name (e.g., "debian") 23 + ServiceToken string // Service token for hold authentication (cached by middleware) 23 24 ATProtoClient *atproto.Client // Authenticated ATProto client for this user 24 25 25 26 // Shared services (same for all requests)
+17 -106
pkg/appview/storage/proxy_blob_store.go
··· 7 7 "fmt" 8 8 "io" 9 9 "net/http" 10 - "net/url" 11 10 "sync" 12 11 "time" 13 12 14 13 "atcr.io/pkg/appview" 15 14 "atcr.io/pkg/atproto" 16 15 "github.com/distribution/distribution/v3" 16 + "github.com/distribution/distribution/v3/registry/api/errcode" 17 17 "github.com/opencontainers/go-digest" 18 18 ) 19 19 ··· 30 30 globalUploadsMu sync.RWMutex 31 31 ) 32 32 33 - // Service token cache entry 34 - type serviceTokenEntry struct { 35 - token string 36 - expiresAt time.Time 37 - } 38 - 39 - // Global service token cache (shared across all ProxyBlobStore instances) 40 - // Cache key: "userDID:holdDID" 41 - // Tokens are valid for 60 seconds from PDS, we cache for 50 seconds to be safe 42 - var ( 43 - globalServiceTokens = make(map[string]*serviceTokenEntry) 44 - globalServiceTokensMu sync.RWMutex 45 - ) 46 - 47 33 // ProxyBlobStore proxies blob requests to an external storage service 48 34 type ProxyBlobStore struct { 49 35 ctx *RegistryContext // All context and services ··· 75 61 } 76 62 } 77 63 78 - // getServiceToken gets a service token for the hold service from the user's PDS 79 - // Uses com.atproto.server." endpoint 80 - // Tokens are cached for 50 seconds (they're valid for 60 seconds from PDS) 81 - func (p *ProxyBlobStore) getServiceToken(ctx context.Context) (string, error) { 82 - // Check cache first 83 - cacheKey := p.ctx.DID + ":" + p.ctx.HoldDID 84 - globalServiceTokensMu.RLock() 85 - entry, exists := globalServiceTokens[cacheKey] 86 - globalServiceTokensMu.RUnlock() 87 - 88 - if exists && time.Now().Before(entry.expiresAt) { 89 - fmt.Printf("DEBUG [proxy_blob_store]: Using cached service token for %s\n", cacheKey) 90 - return entry.token, nil 91 - } 92 - 93 - // No valid cached token, request a new one from PDS 94 - if p.ctx.Refresher == nil { 95 - return "", fmt.Errorf("no OAuth refresher available for service token request") 96 - } 97 - 98 - session, err := p.ctx.Refresher.GetSession(ctx, p.ctx.DID) 99 - if err != nil { 100 - return "", fmt.Errorf("failed to get OAuth session: %w", err) 101 - } 102 - 103 - // Call com.atproto.server.getServiceAuth on the user's PDS 104 - // Include lxm (lexicon scope) and exp (expiration) parameters 105 - pdsURL := p.ctx.PDSEndpoint 106 - serviceAuthURL := fmt.Sprintf("%s/xrpc/com.atproto.server.getServiceAuth?aud=%s&lxm=%s", 107 - pdsURL, 108 - url.QueryEscape(p.ctx.HoldDID), 109 - url.QueryEscape("com.atproto.repo.getRecord"), 110 - ) 111 - 112 - req, err := http.NewRequestWithContext(ctx, "GET", serviceAuthURL, nil) 113 - if err != nil { 114 - return "", fmt.Errorf("failed to create service auth request: %w", err) 115 - } 116 - 117 - // Use OAuth session to authenticate to PDS (with DPoP) 118 - resp, err := session.DoWithAuth(session.Client, req, "com.atproto.server.getServiceAuth") 119 - if err != nil { 120 - return "", fmt.Errorf("failed to call getServiceAuth: %w", err) 121 - } 122 - defer resp.Body.Close() 123 - 124 - if resp.StatusCode != http.StatusOK { 125 - bodyBytes, _ := io.ReadAll(resp.Body) 126 - return "", fmt.Errorf("getServiceAuth failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 127 - } 128 - 129 - // Parse response 130 - var result struct { 131 - Token string `json:"token"` 132 - } 133 - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 134 - return "", fmt.Errorf("failed to decode service auth response: %w", err) 135 - } 136 - 137 - if result.Token == "" { 138 - return "", fmt.Errorf("empty token in service auth response") 139 - } 140 - 141 - fmt.Printf("DEBUG [proxy_blob_store]: Got new service token for %s (length=%d)\n", cacheKey, len(result.Token)) 142 - 143 - // Cache the token (expires in 50 seconds) 144 - globalServiceTokensMu.Lock() 145 - globalServiceTokens[cacheKey] = &serviceTokenEntry{ 146 - token: result.Token, 147 - expiresAt: time.Now().Add(50 * time.Second), 148 - } 149 - globalServiceTokensMu.Unlock() 150 - 151 - return result.Token, nil 152 - } 153 - 154 64 // doAuthenticatedRequest performs an HTTP request with service token authentication 155 - // Gets a service token from the user's PDS and uses it to authenticate to the hold service 65 + // Uses the service token from middleware to authenticate requests to the hold service 156 66 func (p *ProxyBlobStore) doAuthenticatedRequest(ctx context.Context, req *http.Request) (*http.Response, error) { 157 - // Get service token for the hold service 158 - serviceToken, err := p.getServiceToken(ctx) 159 - if err != nil { 160 - fmt.Printf("DEBUG [proxy_blob_store]: Failed to get service token for DID=%s: %v, will attempt without auth\n", p.ctx.DID, err) 161 - // Fall back to non-authenticated request 162 - return p.httpClient.Do(req) 67 + // Use service token that middleware already validated and cached 68 + // Middleware fails fast with HTTP 401 if OAuth session is invalid 69 + if p.ctx.ServiceToken == "" { 70 + // Should never happen - middleware validates OAuth before handlers run 71 + fmt.Printf("ERROR [proxy_blob_store]: No service token in context for DID=%s\n", p.ctx.DID) 72 + return nil, fmt.Errorf("no service token available (middleware should have validated)") 163 73 } 164 74 165 75 // Add Bearer token to Authorization header 166 - req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", serviceToken)) 167 - fmt.Printf("DEBUG [proxy_blob_store]: Using service token for hold service request, DID=%s\n", p.ctx.DID) 76 + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", p.ctx.ServiceToken)) 168 77 169 78 return p.httpClient.Do(req) 170 79 } ··· 408 317 // Start multipart upload via hold service 409 318 uploadID, err := p.startMultipartUpload(ctx, tempDigest) 410 319 if err != nil { 411 - return nil, fmt.Errorf("failed to start multipart upload: %w", err) 320 + return nil, err 412 321 } 413 - 414 - fmt.Printf(" Started multipart upload: uploadID=%s\n", uploadID) 415 322 416 323 writer := &ProxyBlobWriter{ 417 324 store: p, ··· 452 359 // Use XRPC endpoint: /xrpc/com.atproto.sync.getBlob?did={userDID}&cid={digest} 453 360 // The 'did' parameter is the USER's DID (whose blob we're fetching), not the hold service DID 454 361 // Per migration doc: hold accepts OCI digest directly as cid parameter (checks for sha256: prefix) 455 - xrpcURL := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s&method=%s", 456 - p.holdURL, p.ctx.DID, dgst.String(), operation) 362 + xrpcURL := fmt.Sprintf("%s%s?did=%s&cid=%s&method=%s", 363 + p.holdURL, atproto.SyncGetBlob, p.ctx.DID, dgst.String(), operation) 457 364 458 365 req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil) 459 366 if err != nil { ··· 462 369 463 370 resp, err := p.doAuthenticatedRequest(ctx, req) 464 371 if err != nil { 465 - return "", fmt.Errorf("failed to call hold service: %w", err) 372 + // Don't wrap errcode errors - return them directly 373 + if _, ok := err.(errcode.Error); ok { 374 + return "", err 375 + } 376 + return "", fmt.Errorf("failed to get presigned URL: %w", err) 466 377 } 467 378 defer resp.Body.Close() 468 379
+124 -178
pkg/appview/storage/proxy_blob_store_test.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "encoding/base64" 5 6 "encoding/json" 7 + "fmt" 6 8 "net/http" 7 9 "net/http/httptest" 8 10 "strings" ··· 10 12 "time" 11 13 12 14 "atcr.io/pkg/atproto" 15 + "atcr.io/pkg/auth/token" 13 16 "github.com/opencontainers/go-digest" 14 17 ) 15 18 16 19 // TestGetServiceToken_CachingLogic tests the token caching mechanism 17 20 func TestGetServiceToken_CachingLogic(t *testing.T) { 18 - // Clear cache before test 19 - globalServiceTokensMu.Lock() 20 - globalServiceTokens = make(map[string]*serviceTokenEntry) 21 - globalServiceTokensMu.Unlock() 21 + userDID := "did:plc:test" 22 + holdDID := "did:web:hold.example.com" 22 23 23 - // Test 1: Empty cache 24 - cacheKey := "did:plc:test:did:web:hold.example.com" 25 - globalServiceTokensMu.RLock() 26 - _, exists := globalServiceTokens[cacheKey] 27 - globalServiceTokensMu.RUnlock() 28 - 29 - if exists { 24 + // Test 1: Empty cache - invalidate any existing token 25 + token.InvalidateServiceToken(userDID, holdDID) 26 + cachedToken, _ := token.GetServiceToken(userDID, holdDID) 27 + if cachedToken != "" { 30 28 t.Error("Expected empty cache at start") 31 29 } 32 30 33 31 // Test 2: Insert token into cache 34 - testToken := "test-token-12345" 35 - expiresAt := time.Now().Add(50 * time.Second) 32 + // Create a JWT-like token with exp claim for testing 33 + // Format: header.payload.signature where payload has exp claim 34 + testPayload := fmt.Sprintf(`{"exp":%d}`, time.Now().Add(50*time.Second).Unix()) 35 + testToken := "eyJhbGciOiJIUzI1NiJ9." + base64URLEncode(testPayload) + ".signature" 36 36 37 - globalServiceTokensMu.Lock() 38 - globalServiceTokens[cacheKey] = &serviceTokenEntry{ 39 - token: testToken, 40 - expiresAt: expiresAt, 37 + err := token.SetServiceToken(userDID, holdDID, testToken) 38 + if err != nil { 39 + t.Fatalf("Failed to set service token: %v", err) 41 40 } 42 - globalServiceTokensMu.Unlock() 43 41 44 42 // Test 3: Retrieve from cache 45 - globalServiceTokensMu.RLock() 46 - entry, exists := globalServiceTokens[cacheKey] 47 - globalServiceTokensMu.RUnlock() 48 - 49 - if !exists { 43 + cachedToken, expiresAt := token.GetServiceToken(userDID, holdDID) 44 + if cachedToken == "" { 50 45 t.Fatal("Expected token to be in cache") 51 46 } 52 47 53 - if entry.token != testToken { 54 - t.Errorf("Expected token %s, got %s", testToken, entry.token) 48 + if cachedToken != testToken { 49 + t.Errorf("Expected token %s, got %s", testToken, cachedToken) 55 50 } 56 51 57 - if time.Now().After(entry.expiresAt) { 52 + if time.Now().After(expiresAt) { 58 53 t.Error("Expected token to not be expired") 59 54 } 60 55 61 - // Test 4: Expired token 62 - globalServiceTokensMu.Lock() 63 - globalServiceTokens[cacheKey] = &serviceTokenEntry{ 64 - token: "expired-token", 65 - expiresAt: time.Now().Add(-1 * time.Hour), 66 - } 67 - globalServiceTokensMu.Unlock() 68 - 69 - globalServiceTokensMu.RLock() 70 - expiredEntry := globalServiceTokens[cacheKey] 71 - globalServiceTokensMu.RUnlock() 56 + // Test 4: Expired token - GetServiceToken automatically removes it 57 + expiredPayload := fmt.Sprintf(`{"exp":%d}`, time.Now().Add(-1*time.Hour).Unix()) 58 + expiredToken := "eyJhbGciOiJIUzI1NiJ9." + base64URLEncode(expiredPayload) + ".signature" 59 + token.SetServiceToken(userDID, holdDID, expiredToken) 72 60 73 - if !time.Now().After(expiredEntry.expiresAt) { 74 - t.Error("Expected token to be expired") 61 + // GetServiceToken should return empty string for expired token 62 + cachedToken, _ = token.GetServiceToken(userDID, holdDID) 63 + if cachedToken != "" { 64 + t.Error("Expected expired token to be removed from cache") 75 65 } 76 66 } 77 67 78 - // TestGetServiceToken_NoRefresher tests that getServiceToken returns error when refresher is nil 79 - func TestGetServiceToken_NoRefresher(t *testing.T) { 68 + // base64URLEncode helper for creating test JWT tokens 69 + func base64URLEncode(data string) string { 70 + return strings.TrimRight(base64.URLEncoding.EncodeToString([]byte(data)), "=") 71 + } 72 + 73 + // TestServiceToken_EmptyInContext tests that operations fail when service token is missing 74 + func TestServiceToken_EmptyInContext(t *testing.T) { 80 75 ctx := &RegistryContext{ 81 - DID: "did:plc:test", 82 - HoldDID: "did:web:hold.example.com", 83 - PDSEndpoint: "https://pds.example.com", 84 - Repository: "test-repo", 85 - Refresher: nil, // No refresher 76 + DID: "did:plc:test", 77 + HoldDID: "did:web:hold.example.com", 78 + PDSEndpoint: "https://pds.example.com", 79 + Repository: "test-repo", 80 + ServiceToken: "", // No service token (middleware didn't set it) 81 + Refresher: nil, 86 82 } 87 83 88 84 store := NewProxyBlobStore(ctx) 89 85 90 - // Clear cache to force token fetch attempt 91 - globalServiceTokensMu.Lock() 92 - delete(globalServiceTokens, "did:plc:test:did:web:hold.example.com") 93 - globalServiceTokensMu.Unlock() 86 + // Try a write operation that requires authentication 87 + testDigest := digest.FromString("test-content") 88 + _, err := store.Stat(context.Background(), testDigest) 94 89 95 - _, err := store.getServiceToken(context.Background()) 90 + // Should fail because no service token is available 96 91 if err == nil { 97 - t.Error("Expected error when refresher is nil") 92 + t.Error("Expected error when service token is empty") 98 93 } 99 94 100 - if !strings.Contains(err.Error(), "no OAuth refresher") { 101 - t.Errorf("Expected error about no OAuth refresher, got: %v", err) 95 + // Error should indicate authentication issue 96 + if !strings.Contains(err.Error(), "UNAUTHORIZED") && !strings.Contains(err.Error(), "authentication") { 97 + t.Logf("Got error (acceptable): %v", err) 102 98 } 103 99 } 104 100 105 101 // TestDoAuthenticatedRequest_BearerTokenInjection tests that Bearer tokens are added to requests 106 102 func TestDoAuthenticatedRequest_BearerTokenInjection(t *testing.T) { 107 - // This test verifies the Bearer token injection logic when a token is cached 103 + // This test verifies the Bearer token injection logic 108 104 109 - // Setup: Create a cached token 110 - testToken := "cached-bearer-token-xyz" 111 - cacheKey := "did:plc:bearer-test:did:web:hold.example.com" 112 - 113 - globalServiceTokensMu.Lock() 114 - globalServiceTokens[cacheKey] = &serviceTokenEntry{ 115 - token: testToken, 116 - expiresAt: time.Now().Add(50 * time.Second), 117 - } 118 - globalServiceTokensMu.Unlock() 105 + testToken := "test-bearer-token-xyz" 119 106 120 107 // Create a test server to verify the Authorization header 121 108 var receivedAuthHeader string ··· 125 112 })) 126 113 defer testServer.Close() 127 114 128 - // Create ProxyBlobStore with cached token 115 + // Create ProxyBlobStore with service token in context (set by middleware) 129 116 ctx := &RegistryContext{ 130 - DID: "did:plc:bearer-test", 131 - HoldDID: "did:web:hold.example.com", 132 - PDSEndpoint: "https://pds.example.com", 133 - Repository: "test-repo", 134 - Refresher: nil, // Will use cached token, so refresher not needed 117 + DID: "did:plc:bearer-test", 118 + HoldDID: "did:web:hold.example.com", 119 + PDSEndpoint: "https://pds.example.com", 120 + Repository: "test-repo", 121 + ServiceToken: testToken, // Service token from middleware 122 + Refresher: nil, 135 123 } 136 124 137 125 store := NewProxyBlobStore(ctx) ··· 156 144 } 157 145 } 158 146 159 - // TestDoAuthenticatedRequest_FallbackWhenTokenUnavailable tests fallback to non-auth 160 - func TestDoAuthenticatedRequest_FallbackWhenTokenUnavailable(t *testing.T) { 161 - // Clear cache 162 - cacheKey := "did:plc:fallback:did:web:hold.example.com" 163 - globalServiceTokensMu.Lock() 164 - delete(globalServiceTokens, cacheKey) 165 - globalServiceTokensMu.Unlock() 166 - 167 - // Create test server 147 + // TestDoAuthenticatedRequest_ErrorWhenTokenUnavailable tests that authentication failures return proper errors 148 + func TestDoAuthenticatedRequest_ErrorWhenTokenUnavailable(t *testing.T) { 149 + // Create test server (should not be called since auth fails first) 168 150 called := false 169 151 testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 170 152 called = true ··· 172 154 })) 173 155 defer testServer.Close() 174 156 175 - // Create ProxyBlobStore without refresher (will fail to get token and fall back) 157 + // Create ProxyBlobStore without service token (middleware didn't set it) 176 158 ctx := &RegistryContext{ 177 - DID: "did:plc:fallback", 178 - HoldDID: "did:web:hold.example.com", 179 - PDSEndpoint: "https://pds.example.com", 180 - Repository: "test-repo", 181 - Refresher: nil, // No refresher = can't get token 159 + DID: "did:plc:fallback", 160 + HoldDID: "did:web:hold.example.com", 161 + PDSEndpoint: "https://pds.example.com", 162 + Repository: "test-repo", 163 + ServiceToken: "", // No service token 164 + Refresher: nil, 182 165 } 183 166 184 167 store := NewProxyBlobStore(ctx) ··· 189 172 t.Fatalf("Failed to create request: %v", err) 190 173 } 191 174 192 - // Do authenticated request - should fall back to non-auth 175 + // Do authenticated request - should fail when no service token 193 176 resp, err := store.doAuthenticatedRequest(context.Background(), req) 194 - if err != nil { 195 - t.Fatalf("doAuthenticatedRequest should not fail even without token: %v", err) 177 + if err == nil { 178 + t.Fatal("Expected doAuthenticatedRequest to fail when no service token is available") 196 179 } 197 - defer resp.Body.Close() 180 + if resp != nil { 181 + resp.Body.Close() 182 + } 198 183 199 - if !called { 200 - t.Error("Expected request to be made despite missing token") 184 + // Verify error indicates authentication/authorization issue 185 + errStr := err.Error() 186 + if !strings.Contains(errStr, "service token") && !strings.Contains(errStr, "UNAUTHORIZED") { 187 + t.Errorf("Expected service token or unauthorized error, got: %v", err) 201 188 } 202 189 203 - if resp.StatusCode != http.StatusOK { 204 - t.Errorf("Expected status 200, got %d", resp.StatusCode) 190 + if called { 191 + t.Error("Expected request to NOT be made when authentication fails") 205 192 } 206 193 } 207 194 ··· 241 228 242 229 // TestServiceTokenCacheExpiry tests that expired cached tokens are not used 243 230 func TestServiceTokenCacheExpiry(t *testing.T) { 244 - cacheKey := "did:plc:expiry:did:web:hold.example.com" 231 + userDID := "did:plc:expiry" 232 + holdDID := "did:web:hold.example.com" 245 233 246 234 // Insert expired token 247 - globalServiceTokensMu.Lock() 248 - globalServiceTokens[cacheKey] = &serviceTokenEntry{ 249 - token: "expired-token", 250 - expiresAt: time.Now().Add(-1 * time.Hour), // Expired 1 hour ago 251 - } 252 - globalServiceTokensMu.Unlock() 253 - 254 - // Check that it's expired 255 - globalServiceTokensMu.RLock() 256 - entry := globalServiceTokens[cacheKey] 257 - globalServiceTokensMu.RUnlock() 235 + expiredPayload := fmt.Sprintf(`{"exp":%d}`, time.Now().Add(-1*time.Hour).Unix()) 236 + expiredToken := "eyJhbGciOiJIUzI1NiJ9." + base64URLEncode(expiredPayload) + ".signature" 237 + token.SetServiceToken(userDID, holdDID, expiredToken) 258 238 259 - if entry == nil { 260 - t.Fatal("Expected token entry to exist") 261 - } 239 + // GetServiceToken should automatically remove expired tokens 240 + cachedToken, expiresAt := token.GetServiceToken(userDID, holdDID) 262 241 263 - if !time.Now().After(entry.expiresAt) { 264 - t.Error("Expected token to be expired") 242 + // Should return empty string for expired token 243 + if cachedToken != "" { 244 + t.Error("Expected GetServiceToken to return empty string for expired token") 265 245 } 266 246 267 - // The getServiceToken function would check time.Now().Before(entry.expiresAt) 268 - // and this would return false for an expired token, causing it to fetch a new one 269 - shouldUseCache := time.Now().Before(entry.expiresAt) 270 - if shouldUseCache { 271 - t.Error("Expected expired token to not be used from cache") 247 + // expiresAt should be zero time for expired/missing tokens 248 + if !expiresAt.IsZero() { 249 + t.Error("Expected zero time for expired token") 272 250 } 273 251 } 274 252 ··· 327 305 328 306 // Benchmark for token cache access 329 307 func BenchmarkServiceTokenCacheAccess(b *testing.B) { 330 - cacheKey := "did:plc:bench:did:web:hold.example.com" 308 + userDID := "did:plc:bench" 309 + holdDID := "did:web:hold.example.com" 331 310 332 - globalServiceTokensMu.Lock() 333 - globalServiceTokens[cacheKey] = &serviceTokenEntry{ 334 - token: "benchmark-token", 335 - expiresAt: time.Now().Add(50 * time.Second), 336 - } 337 - globalServiceTokensMu.Unlock() 311 + testPayload := fmt.Sprintf(`{"exp":%d}`, time.Now().Add(50*time.Second).Unix()) 312 + testTokenStr := "eyJhbGciOiJIUzI1NiJ9." + base64URLEncode(testPayload) + ".signature" 313 + token.SetServiceToken(userDID, holdDID, testTokenStr) 338 314 339 315 b.ResetTimer() 340 316 for i := 0; i < b.N; i++ { 341 - globalServiceTokensMu.RLock() 342 - entry, exists := globalServiceTokens[cacheKey] 343 - globalServiceTokensMu.RUnlock() 317 + cachedToken, expiresAt := token.GetServiceToken(userDID, holdDID) 344 318 345 - if !exists || time.Now().After(entry.expiresAt) { 319 + if cachedToken == "" || time.Now().After(expiresAt) { 346 320 b.Error("Cache miss in benchmark") 347 321 } 348 322 } ··· 374 348 375 349 // Create store with mocked hold URL 376 350 ctx := &RegistryContext{ 377 - DID: "did:plc:test", 378 - HoldDID: "did:web:hold.example.com", 379 - PDSEndpoint: "https://pds.example.com", 380 - Repository: "test-repo", 351 + DID: "did:plc:test", 352 + HoldDID: "did:web:hold.example.com", 353 + PDSEndpoint: "https://pds.example.com", 354 + Repository: "test-repo", 355 + ServiceToken: "test-service-token", // Service token from middleware 381 356 } 382 357 store := NewProxyBlobStore(ctx) 383 358 store.holdURL = holdServer.URL 384 - 385 - // Setup token cache to avoid auth errors 386 - globalServiceTokensMu.Lock() 387 - globalServiceTokens["did:plc:test:did:web:hold.example.com"] = &serviceTokenEntry{ 388 - token: "test-token", 389 - expiresAt: time.Now().Add(50 * time.Second), 390 - } 391 - globalServiceTokensMu.Unlock() 392 359 393 360 // Call completeMultipartUpload 394 361 parts := []CompletedPart{ ··· 476 443 })) 477 444 defer holdServer.Close() 478 445 479 - // Create store 446 + // Create store with service token in context 480 447 ctx := &RegistryContext{ 481 - DID: "did:plc:test", 482 - HoldDID: "did:web:hold.example.com", 483 - PDSEndpoint: "https://pds.example.com", 484 - Repository: "test-repo", 448 + DID: "did:plc:test", 449 + HoldDID: "did:web:hold.example.com", 450 + PDSEndpoint: "https://pds.example.com", 451 + Repository: "test-repo", 452 + ServiceToken: "test-service-token", // Service token from middleware 485 453 } 486 454 store := NewProxyBlobStore(ctx) 487 455 store.holdURL = holdServer.URL 488 - 489 - // Setup token cache 490 - globalServiceTokensMu.Lock() 491 - globalServiceTokens["did:plc:test:did:web:hold.example.com"] = &serviceTokenEntry{ 492 - token: "test-token", 493 - expiresAt: time.Now().Add(50 * time.Second), 494 - } 495 - globalServiceTokensMu.Unlock() 496 456 497 457 // Call Get() 498 458 dgst := digest.FromBytes(blobData) ··· 544 504 })) 545 505 defer holdServer.Close() 546 506 547 - // Create store 507 + // Create store with service token in context 548 508 ctx := &RegistryContext{ 549 - DID: "did:plc:test", 550 - HoldDID: "did:web:hold.example.com", 551 - PDSEndpoint: "https://pds.example.com", 552 - Repository: "test-repo", 509 + DID: "did:plc:test", 510 + HoldDID: "did:web:hold.example.com", 511 + PDSEndpoint: "https://pds.example.com", 512 + Repository: "test-repo", 513 + ServiceToken: "test-service-token", // Service token from middleware 553 514 } 554 515 store := NewProxyBlobStore(ctx) 555 516 store.holdURL = holdServer.URL 556 - 557 - // Setup token cache 558 - globalServiceTokensMu.Lock() 559 - globalServiceTokens["did:plc:test:did:web:hold.example.com"] = &serviceTokenEntry{ 560 - token: "test-token", 561 - expiresAt: time.Now().Add(50 * time.Second), 562 - } 563 - globalServiceTokensMu.Unlock() 564 517 565 518 // Call Open() 566 519 dgst := digest.FromBytes(blobData) ··· 636 589 })) 637 590 defer holdServer.Close() 638 591 639 - // Create store 592 + // Create store with service token in context 640 593 ctx := &RegistryContext{ 641 - DID: "did:plc:test", 642 - HoldDID: "did:web:hold.example.com", 643 - PDSEndpoint: "https://pds.example.com", 644 - Repository: "test-repo", 594 + DID: "did:plc:test", 595 + HoldDID: "did:web:hold.example.com", 596 + PDSEndpoint: "https://pds.example.com", 597 + Repository: "test-repo", 598 + ServiceToken: "test-service-token", // Service token from middleware 645 599 } 646 600 store := NewProxyBlobStore(ctx) 647 601 store.holdURL = holdServer.URL 648 - 649 - // Setup token cache 650 - globalServiceTokensMu.Lock() 651 - globalServiceTokens["did:plc:test:did:web:hold.example.com"] = &serviceTokenEntry{ 652 - token: "test-token", 653 - expiresAt: time.Now().Add(50 * time.Second), 654 - } 655 - globalServiceTokensMu.Unlock() 656 602 657 603 // Call the function 658 604 _ = tt.testFunc(store) // Ignore error, we just care about the URL
+18 -18
pkg/appview/storage/routing_repository.go
··· 13 13 // The registry (AppView) is stateless and NEVER stores blobs locally 14 14 type RoutingRepository struct { 15 15 distribution.Repository 16 - ctx *RegistryContext // All context and services 16 + Ctx *RegistryContext // All context and services (exported for token updates) 17 17 manifestStore *atproto.ManifestStore // Cached manifest store instance 18 18 blobStore *ProxyBlobStore // Cached blob store instance 19 19 } ··· 22 22 func NewRoutingRepository(baseRepo distribution.Repository, ctx *RegistryContext) *RoutingRepository { 23 23 return &RoutingRepository{ 24 24 Repository: baseRepo, 25 - ctx: ctx, 25 + Ctx: ctx, 26 26 } 27 27 } 28 28 ··· 36 36 // ManifestStore needs both DID and URL for backward compat (legacy holdEndpoint field) 37 37 // For now, pass holdDID twice (will be cleaned up in manifest_store.go later) 38 38 r.manifestStore = atproto.NewManifestStore( 39 - r.ctx.ATProtoClient, 40 - r.ctx.Repository, 41 - r.ctx.HoldDID, 42 - r.ctx.HoldDID, 43 - r.ctx.DID, 39 + r.Ctx.ATProtoClient, 40 + r.Ctx.Repository, 41 + r.Ctx.HoldDID, 42 + r.Ctx.HoldDID, 43 + r.Ctx.DID, 44 44 blobStore, 45 - r.ctx.Database, 45 + r.Ctx.Database, 46 46 ) 47 47 } 48 48 ··· 52 52 time.Sleep(100 * time.Millisecond) // Brief delay to let manifest fetch complete 53 53 if holdDID := r.manifestStore.GetLastFetchedHoldDID(); holdDID != "" { 54 54 // Cache for 10 minutes - should cover typical pull operations 55 - GetGlobalHoldCache().Set(r.ctx.DID, r.ctx.Repository, holdDID, 10*time.Minute) 55 + GetGlobalHoldCache().Set(r.Ctx.DID, r.Ctx.Repository, holdDID, 10*time.Minute) 56 56 fmt.Printf("DEBUG [storage/routing]: Cached hold DID: did=%s, repo=%s, hold=%s\n", 57 - r.ctx.DID, r.ctx.Repository, holdDID) 57 + r.Ctx.DID, r.Ctx.Repository, holdDID) 58 58 } 59 59 }() 60 60 ··· 67 67 // Return cached blob store if available 68 68 if r.blobStore != nil { 69 69 fmt.Printf("DEBUG [storage/blobs]: Returning cached blob store for did=%s, repo=%s\n", 70 - r.ctx.DID, r.ctx.Repository) 70 + r.Ctx.DID, r.Ctx.Repository) 71 71 return r.blobStore 72 72 } 73 73 74 74 // For pull operations, check if we have a cached hold DID from a recent manifest fetch 75 75 // This ensures blobs are fetched from the hold recorded in the manifest, not re-discovered 76 - holdDID := r.ctx.HoldDID // Default to discovery-based DID 76 + holdDID := r.Ctx.HoldDID // Default to discovery-based DID 77 77 78 - if cachedHoldDID, ok := GetGlobalHoldCache().Get(r.ctx.DID, r.ctx.Repository); ok { 78 + if cachedHoldDID, ok := GetGlobalHoldCache().Get(r.Ctx.DID, r.Ctx.Repository); ok { 79 79 // Use cached hold DID from manifest 80 80 holdDID = cachedHoldDID 81 81 fmt.Printf("DEBUG [storage/blobs]: Using cached hold from manifest: did=%s, repo=%s, hold=%s\n", 82 - r.ctx.DID, r.ctx.Repository, cachedHoldDID) 82 + r.Ctx.DID, r.Ctx.Repository, cachedHoldDID) 83 83 } else { 84 84 // No cached hold, use discovery-based DID (for push or first pull) 85 85 fmt.Printf("DEBUG [storage/blobs]: Using discovery-based hold: did=%s, repo=%s, hold=%s\n", 86 - r.ctx.DID, r.ctx.Repository, holdDID) 86 + r.Ctx.DID, r.Ctx.Repository, holdDID) 87 87 } 88 88 89 89 if holdDID == "" { ··· 92 92 } 93 93 94 94 // Update context with the correct hold DID (may be cached or discovered) 95 - r.ctx.HoldDID = holdDID 95 + r.Ctx.HoldDID = holdDID 96 96 97 97 // Create and cache proxy blob store 98 - r.blobStore = NewProxyBlobStore(r.ctx) 98 + r.blobStore = NewProxyBlobStore(r.Ctx) 99 99 return r.blobStore 100 100 } 101 101 102 102 // Tags returns the tag service 103 103 // Tags are stored in ATProto as io.atcr.tag records 104 104 func (r *RoutingRepository) Tags(ctx context.Context) distribution.TagService { 105 - return atproto.NewTagStore(r.ctx.ATProtoClient, r.ctx.Repository) 105 + return atproto.NewTagStore(r.Ctx.ATProtoClient, r.Ctx.Repository) 106 106 }
+14 -14
pkg/atproto/client.go
··· 83 83 return nil, fmt.Errorf("failed to marshal record: %w", err) 84 84 } 85 85 86 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.putRecord", c.pdsEndpoint) 86 + url := fmt.Sprintf("%s%s", c.pdsEndpoint, RepoPutRecord) 87 87 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 88 88 if err != nil { 89 89 return nil, err ··· 137 137 } 138 138 139 139 // Basic Auth (app passwords) 140 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=%s", 141 - c.pdsEndpoint, c.did, collection, rkey) 140 + url := fmt.Sprintf("%s%s?repo=%s&collection=%s&rkey=%s", 141 + c.pdsEndpoint, RepoGetRecord, c.did, collection, rkey) 142 142 143 143 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 144 144 if err != nil { ··· 203 203 return fmt.Errorf("failed to marshal delete request: %w", err) 204 204 } 205 205 206 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.deleteRecord", c.pdsEndpoint) 206 + url := fmt.Sprintf("%s%s", c.pdsEndpoint, RepoDeleteRecord) 207 207 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 208 208 if err != nil { 209 209 return err ··· 228 228 229 229 // ListRecords lists records in a collection 230 230 func (c *Client) ListRecords(ctx context.Context, collection string, limit int) ([]Record, error) { 231 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=%d", 232 - c.pdsEndpoint, c.did, collection, limit) 231 + url := fmt.Sprintf("%s%s?repo=%s&collection=%s&limit=%d", 232 + c.pdsEndpoint, RepoListRecords, c.did, collection, limit) 233 233 234 234 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 235 235 if err != nil { ··· 301 301 } 302 302 303 303 // Basic Auth (app passwords) 304 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", c.pdsEndpoint) 304 + url := fmt.Sprintf("%s%s", c.pdsEndpoint, RepoUploadBlob) 305 305 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(data)) 306 306 if err != nil { 307 307 return nil, err ··· 333 333 334 334 // GetBlob downloads a blob by its CID from the PDS 335 335 func (c *Client) GetBlob(ctx context.Context, cid string) ([]byte, error) { 336 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 337 - c.pdsEndpoint, c.did, cid) 336 + url := fmt.Sprintf("%s%s?did=%s&cid=%s", 337 + c.pdsEndpoint, SyncGetBlob, c.did, cid) 338 338 339 339 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 340 340 if err != nil { ··· 405 405 // This is a network-wide query, not limited to a single PDS 406 406 func (c *Client) ListReposByCollection(ctx context.Context, collection string, limit int, cursor string) (*ListReposByCollectionResult, error) { 407 407 // Build URL with query parameters 408 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.listReposByCollection?collection=%s", c.pdsEndpoint, collection) 408 + url := fmt.Sprintf("%s%s?collection=%s", c.pdsEndpoint, SyncListReposByCollection, collection) 409 409 410 410 if limit > 0 { 411 411 url += fmt.Sprintf("&limit=%d", limit) ··· 447 447 // ListRecordsForRepo lists records in a collection for a specific repo (DID) 448 448 // This differs from ListRecords which uses the client's DID 449 449 func (c *Client) ListRecordsForRepo(ctx context.Context, repoDID, collection string, limit int, cursor string) ([]Record, string, error) { 450 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s", 451 - c.pdsEndpoint, repoDID, collection) 450 + url := fmt.Sprintf("%s%s?repo=%s&collection=%s", 451 + c.pdsEndpoint, RepoListRecords, repoDID, collection) 452 452 453 453 if limit > 0 { 454 454 url += fmt.Sprintf("&limit=%d", limit) ··· 583 583 } 584 584 585 585 // Basic Auth (app passwords) 586 - url := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=app.bsky.actor.profile&rkey=self", 587 - c.pdsEndpoint, did) 586 + url := fmt.Sprintf("%s%s?repo=%s&collection=app.bsky.actor.profile&rkey=self", 587 + c.pdsEndpoint, RepoGetRecord, did) 588 588 589 589 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 590 590 if err != nil {
+5 -5
pkg/atproto/client_test.go
··· 81 81 } 82 82 83 83 // Verify path 84 - expectedPath := "/xrpc/com.atproto.repo.putRecord" 84 + expectedPath := RepoPutRecord 85 85 if r.URL.Path != expectedPath { 86 86 t.Errorf("Path = %v, want %v", r.URL.Path, expectedPath) 87 87 } ··· 198 198 } 199 199 200 200 // Verify path 201 - expectedPath := "/xrpc/com.atproto.repo.getRecord" 201 + expectedPath := RepoGetRecord 202 202 if r.URL.Path != expectedPath { 203 203 t.Errorf("Path = %v, want %v", r.URL.Path, expectedPath) 204 204 } ··· 284 284 } 285 285 286 286 // Verify path 287 - expectedPath := "/xrpc/com.atproto.repo.deleteRecord" 287 + expectedPath := RepoDeleteRecord 288 288 if r.URL.Path != expectedPath { 289 289 t.Errorf("Path = %v, want %v", r.URL.Path, expectedPath) 290 290 } ··· 378 378 t.Errorf("Method = %v, want POST", r.Method) 379 379 } 380 380 381 - if r.URL.Path != "/xrpc/com.atproto.repo.uploadBlob" { 382 - t.Errorf("Path = %v, want /xrpc/com.atproto.repo.uploadBlob", r.URL.Path) 381 + if r.URL.Path != RepoUploadBlob { 382 + t.Errorf("Path = %v, want %s", r.URL.Path, RepoUploadBlob) 383 383 } 384 384 385 385 if r.Header.Get("Content-Type") != mimeType {
+42
pkg/atproto/endpoints.go
··· 70 70 // Response: CAR file (application/vnd.ipld.car) 71 71 SyncGetRepo = "/xrpc/com.atproto.sync.getRepo" 72 72 73 + // SyncGetRecord retrieves a specific record as part of a repository sync. 74 + // Method: GET 75 + // Query: did={did}&collection={collection}&rkey={key} 76 + // Response: Record data 77 + SyncGetRecord = "/xrpc/com.atproto.sync.getRecord" 78 + 73 79 // SyncListRepos lists all repositories on a PDS. 74 80 // Method: GET 75 81 // Response: {"repos": [{...}]} 76 82 SyncListRepos = "/xrpc/com.atproto.sync.listRepos" 83 + 84 + // SyncListReposByCollection lists all repositories that have records in a specific collection. 85 + // Method: GET 86 + // Query: collection={collection}&limit={limit}&cursor={cursor} 87 + // Response: {"repos": [{"did": "..."}], "cursor": "..."} 88 + SyncListReposByCollection = "/xrpc/com.atproto.sync.listReposByCollection" 77 89 78 90 // SyncSubscribeRepos subscribes to real-time repository events via WebSocket. 79 91 // Method: GET (WebSocket upgrade) ··· 101 113 // Method: GET 102 114 // Response: {"did": "...", "availableUserDomains": [...]} 103 115 ServerDescribeServer = "/xrpc/com.atproto.server.describeServer" 116 + 117 + // ServerCreateSession creates a new session with identifier and password. 118 + // Method: POST 119 + // Request: {"identifier": "...", "password": "..."} 120 + // Response: {"accessJwt": "...", "refreshJwt": "...", "did": "...", "handle": "..."} 121 + ServerCreateSession = "/xrpc/com.atproto.server.createSession" 122 + 123 + // ServerGetSession validates a session and returns the current session info. 124 + // Method: GET 125 + // Headers: Authorization (Bearer or DPoP), DPoP (if using DPoP) 126 + // Response: {"did": "...", "handle": "..."} 127 + ServerGetSession = "/xrpc/com.atproto.server.getSession" 104 128 ) 105 129 106 130 // ATProto repo endpoints (com.atproto.repo.*) ··· 112 136 // Query: repo={did} 113 137 // Response: {"did": "...", "handle": "...", "collections": [...]} 114 138 RepoDescribeRepo = "/xrpc/com.atproto.repo.describeRepo" 139 + 140 + // RepoPutRecord creates or updates a record in a repository. 141 + // Method: POST 142 + // Request: {"repo": "...", "collection": "...", "rkey": "...", "record": {...}} 143 + // Response: {"uri": "...", "cid": "..."} 144 + RepoPutRecord = "/xrpc/com.atproto.repo.putRecord" 145 + 146 + // RepoGetRecord retrieves a record from a repository. 147 + // Method: GET 148 + // Query: repo={did}&collection={collection}&rkey={key} 149 + // Response: {"uri": "...", "cid": "...", "value": {...}} 150 + RepoGetRecord = "/xrpc/com.atproto.repo.getRecord" 151 + 152 + // RepoListRecords lists records in a collection. 153 + // Method: GET 154 + // Query: repo={did}&collection={collection}&limit={limit}&cursor={cursor} 155 + // Response: {"records": [...], "cursor": "..."} 156 + RepoListRecords = "/xrpc/com.atproto.repo.listRecords" 115 157 116 158 // RepoDeleteRecord deletes a record from a repository. 117 159 // Method: POST
+1 -1
pkg/atproto/manifest_store_test.go
··· 442 442 } 443 443 444 444 // Should return empty map (or nil) 445 - if labels != nil && len(labels) != 0 { 445 + if len(labels) != 0 { 446 446 t.Errorf("extractConfigLabels() should return empty/nil for config without labels, got %v", labels) 447 447 } 448 448 }
+3 -2
pkg/auth/atproto/session.go pkg/auth/session.go
··· 1 - package atproto 1 + package auth 2 2 3 3 import ( 4 4 "bytes" ··· 11 11 "net/http" 12 12 "sync" 13 13 "time" 14 + "atcr.io/pkg/atproto" 14 15 15 16 "github.com/bluesky-social/indigo/atproto/identity" 16 17 "github.com/bluesky-social/indigo/atproto/syntax" ··· 143 144 return nil, fmt.Errorf("failed to marshal request: %w", err) 144 145 } 145 146 146 - url := fmt.Sprintf("%s/xrpc/com.atproto.server.createSession", pdsEndpoint) 147 + url := fmt.Sprintf("%s%s", pdsEndpoint, atproto.ServerCreateSession) 147 148 fmt.Printf("DEBUG [atproto/session]: POST %s\n", url) 148 149 149 150 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body))
+4 -4
pkg/auth/hold_remote.go
··· 201 201 202 202 // Build XRPC request URL 203 203 // GET /xrpc/com.atproto.repo.getRecord?repo={did}&collection=io.atcr.hold.captain&rkey=self 204 - xrpcURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=self", 205 - holdURL, url.QueryEscape(holdDID), url.QueryEscape(atproto.CaptainCollection)) 204 + xrpcURL := fmt.Sprintf("%s%s?repo=%s&collection=%s&rkey=self", 205 + holdURL, atproto.RepoGetRecord, url.QueryEscape(holdDID), url.QueryEscape(atproto.CaptainCollection)) 206 206 207 207 req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil) 208 208 if err != nil { ··· 302 302 303 303 // Build XRPC request URL 304 304 // GET /xrpc/com.atproto.repo.listRecords?repo={did}&collection=io.atcr.hold.crew 305 - xrpcURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s", 306 - holdURL, url.QueryEscape(holdDID), url.QueryEscape(atproto.CrewCollection)) 305 + xrpcURL := fmt.Sprintf("%s%s?repo=%s&collection=%s", 306 + holdURL, atproto.RepoListRecords, url.QueryEscape(holdDID), url.QueryEscape(atproto.CrewCollection)) 307 307 308 308 req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil) 309 309 if err != nil {
+26 -6
pkg/auth/oauth/refresher.go
··· 4 4 "context" 5 5 "fmt" 6 6 "sync" 7 + "time" 7 8 8 9 "github.com/bluesky-social/indigo/atproto/auth/oauth" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" ··· 15 16 SessionID string 16 17 } 17 18 19 + // UISessionStore interface for managing UI sessions 20 + // Shared between refresher and server 21 + type UISessionStore interface { 22 + Create(did, handle, pdsEndpoint string, duration time.Duration) (string, error) 23 + DeleteByDID(did string) 24 + } 25 + 18 26 // Refresher manages OAuth sessions and token refresh for AppView 19 27 type Refresher struct { 20 - app *App 21 - sessions map[string]*SessionCache // Key: DID string 22 - mu sync.RWMutex 23 - refreshLocks map[string]*sync.Mutex // Per-DID locks for refresh operations 24 - refreshLockMu sync.Mutex // Protects refreshLocks map 28 + app *App 29 + sessions map[string]*SessionCache // Key: DID string 30 + mu sync.RWMutex 31 + refreshLocks map[string]*sync.Mutex // Per-DID locks for refresh operations 32 + refreshLockMu sync.Mutex // Protects refreshLocks map 33 + uiSessionStore UISessionStore // For invalidating UI sessions on OAuth failures 25 34 } 26 35 27 36 // NewRefresher creates a new session refresher ··· 31 40 sessions: make(map[string]*SessionCache), 32 41 refreshLocks: make(map[string]*sync.Mutex), 33 42 } 43 + } 44 + 45 + // SetUISessionStore sets the UI session store for invalidating sessions on OAuth failures 46 + func (r *Refresher) SetUISessionStore(store UISessionStore) { 47 + r.uiSessionStore = store 34 48 } 35 49 36 50 // GetSession gets a fresh OAuth session for a DID ··· 115 129 } 116 130 117 131 // InvalidateSession removes a cached session for a DID 118 - // This is useful when a new OAuth flow creates a fresh session 132 + // This is useful when a new OAuth flow creates a fresh session or when OAuth refresh fails 133 + // Also invalidates any UI sessions for this DID to force re-authentication 119 134 func (r *Refresher) InvalidateSession(did string) { 120 135 r.mu.Lock() 121 136 delete(r.sessions, did) 122 137 r.mu.Unlock() 138 + 139 + // Also delete UI sessions to force user to re-authenticate 140 + if r.uiSessionStore != nil { 141 + r.uiSessionStore.DeleteByDID(did) 142 + } 123 143 } 124 144 125 145 // GetSessionID returns the sessionID for a cached session
+1 -3
pkg/auth/oauth/server.go
··· 16 16 ) 17 17 18 18 // UISessionStore is the interface for UI session management 19 - type UISessionStore interface { 20 - Create(did, handle, pdsEndpoint string, duration time.Duration) (string, error) 21 - } 19 + // UISessionStore is defined in refresher.go to avoid duplication 22 20 23 21 // UserStore is the interface for user management 24 22 type UserStore interface {
+169
pkg/auth/token/cache.go
··· 1 + package token 2 + 3 + import ( 4 + "encoding/base64" 5 + "encoding/json" 6 + "fmt" 7 + "strings" 8 + "sync" 9 + "time" 10 + ) 11 + 12 + // serviceTokenEntry represents a cached service token 13 + type serviceTokenEntry struct { 14 + token string 15 + expiresAt time.Time 16 + } 17 + 18 + // Global cache for service tokens (DID:HoldDID -> token) 19 + // Service tokens are JWTs issued by a user's PDS to authorize AppView to act on their behalf 20 + // when communicating with hold services. These tokens are scoped to specific holds and have 21 + // limited lifetime (typically 60s, can request up to 5min). 22 + var ( 23 + globalServiceTokens = make(map[string]*serviceTokenEntry) 24 + globalServiceTokensMu sync.RWMutex 25 + ) 26 + 27 + // GetServiceToken retrieves a cached service token for the given DID and hold DID 28 + // Returns empty string if no valid cached token exists 29 + func GetServiceToken(did, holdDID string) (token string, expiresAt time.Time) { 30 + cacheKey := did + ":" + holdDID 31 + 32 + globalServiceTokensMu.RLock() 33 + entry, exists := globalServiceTokens[cacheKey] 34 + globalServiceTokensMu.RUnlock() 35 + 36 + if !exists { 37 + return "", time.Time{} 38 + } 39 + 40 + // Check if token is still valid 41 + if time.Now().After(entry.expiresAt) { 42 + // Token expired, remove from cache 43 + globalServiceTokensMu.Lock() 44 + delete(globalServiceTokens, cacheKey) 45 + globalServiceTokensMu.Unlock() 46 + return "", time.Time{} 47 + } 48 + 49 + return entry.token, entry.expiresAt 50 + } 51 + 52 + // SetServiceToken stores a service token in the cache 53 + // Automatically parses the JWT to extract the expiry time 54 + // Applies a 10-second safety margin (cache expires 10s before actual JWT expiry) 55 + func SetServiceToken(did, holdDID, token string) error { 56 + cacheKey := did + ":" + holdDID 57 + 58 + // Parse JWT to extract expiry (don't verify signature - we trust the PDS) 59 + expiry, err := parseJWTExpiry(token) 60 + if err != nil { 61 + // If parsing fails, use default 50s TTL (conservative fallback) 62 + fmt.Printf("WARN [token/cache]: Failed to parse JWT expiry, using default 50s: %v\n", err) 63 + expiry = time.Now().Add(50 * time.Second) 64 + } else { 65 + // Apply 10s safety margin to avoid using nearly-expired tokens 66 + expiry = expiry.Add(-10 * time.Second) 67 + } 68 + 69 + globalServiceTokensMu.Lock() 70 + globalServiceTokens[cacheKey] = &serviceTokenEntry{ 71 + token: token, 72 + expiresAt: expiry, 73 + } 74 + globalServiceTokensMu.Unlock() 75 + 76 + fmt.Printf("DEBUG [token/cache]: Cached service token for %s (expires in %v)\n", 77 + cacheKey, time.Until(expiry).Round(time.Second)) 78 + 79 + return nil 80 + } 81 + 82 + // parseJWTExpiry extracts the expiry time from a JWT without verifying the signature 83 + // We trust tokens from the user's PDS, so signature verification isn't needed here 84 + // Manually decodes the JWT payload to avoid algorithm compatibility issues 85 + func parseJWTExpiry(tokenString string) (time.Time, error) { 86 + // JWT format: header.payload.signature 87 + parts := strings.Split(tokenString, ".") 88 + if len(parts) != 3 { 89 + return time.Time{}, fmt.Errorf("invalid JWT format: expected 3 parts, got %d", len(parts)) 90 + } 91 + 92 + // Decode the payload (second part) 93 + payload, err := base64.RawURLEncoding.DecodeString(parts[1]) 94 + if err != nil { 95 + return time.Time{}, fmt.Errorf("failed to decode JWT payload: %w", err) 96 + } 97 + 98 + // Parse the JSON payload 99 + var claims struct { 100 + Exp int64 `json:"exp"` 101 + } 102 + if err := json.Unmarshal(payload, &claims); err != nil { 103 + return time.Time{}, fmt.Errorf("failed to parse JWT claims: %w", err) 104 + } 105 + 106 + if claims.Exp == 0 { 107 + return time.Time{}, fmt.Errorf("JWT missing exp claim") 108 + } 109 + 110 + return time.Unix(claims.Exp, 0), nil 111 + } 112 + 113 + // InvalidateServiceToken removes a service token from the cache 114 + // Used when we detect that a token is invalid or the user's session has expired 115 + func InvalidateServiceToken(did, holdDID string) { 116 + cacheKey := did + ":" + holdDID 117 + 118 + globalServiceTokensMu.Lock() 119 + delete(globalServiceTokens, cacheKey) 120 + globalServiceTokensMu.Unlock() 121 + 122 + fmt.Printf("DEBUG [token/cache]: Invalidated service token for %s\n", cacheKey) 123 + } 124 + 125 + // GetCacheStats returns statistics about the service token cache for debugging 126 + func GetCacheStats() map[string]interface{} { 127 + globalServiceTokensMu.RLock() 128 + defer globalServiceTokensMu.RUnlock() 129 + 130 + validCount := 0 131 + expiredCount := 0 132 + now := time.Now() 133 + 134 + for _, entry := range globalServiceTokens { 135 + if now.Before(entry.expiresAt) { 136 + validCount++ 137 + } else { 138 + expiredCount++ 139 + } 140 + } 141 + 142 + return map[string]interface{}{ 143 + "total_entries": len(globalServiceTokens), 144 + "valid_tokens": validCount, 145 + "expired_tokens": expiredCount, 146 + } 147 + } 148 + 149 + // CleanExpiredTokens removes expired tokens from the cache 150 + // Can be called periodically to prevent unbounded growth (though expired tokens 151 + // are also removed lazily on access) 152 + func CleanExpiredTokens() { 153 + globalServiceTokensMu.Lock() 154 + defer globalServiceTokensMu.Unlock() 155 + 156 + now := time.Now() 157 + removed := 0 158 + 159 + for key, entry := range globalServiceTokens { 160 + if now.After(entry.expiresAt) { 161 + delete(globalServiceTokens, key) 162 + removed++ 163 + } 164 + } 165 + 166 + if removed > 0 { 167 + fmt.Printf("DEBUG [token/cache]: Cleaned %d expired service tokens\n", removed) 168 + } 169 + }
+2 -3
pkg/auth/token/handler.go
··· 13 13 "atcr.io/pkg/appview/db" 14 14 mainAtproto "atcr.io/pkg/atproto" 15 15 "atcr.io/pkg/auth" 16 - "atcr.io/pkg/auth/atproto" 17 16 ) 18 17 19 18 // Handler handles /auth/token requests 20 19 type Handler struct { 21 20 issuer *Issuer 22 - validator *atproto.SessionValidator 21 + validator *auth.SessionValidator 23 22 deviceStore *db.DeviceStore // For validating device secrets 24 23 defaultHoldDID string 25 24 } ··· 30 29 func NewHandler(issuer *Issuer, deviceStore *db.DeviceStore, defaultHoldDID string) *Handler { 31 30 return &Handler{ 32 31 issuer: issuer, 33 - validator: atproto.NewSessionValidator(), 32 + validator: auth.NewSessionValidator(), 34 33 deviceStore: deviceStore, 35 34 defaultHoldDID: defaultHoldDID, 36 35 }
+19 -18
pkg/hold/oci/xrpc_test.go
··· 12 12 "testing" 13 13 14 14 "atcr.io/pkg/hold/pds" 15 + "atcr.io/pkg/atproto" 15 16 "atcr.io/pkg/s3" 16 17 "github.com/distribution/distribution/v3/registry/storage/driver/factory" 17 18 _ "github.com/distribution/distribution/v3/registry/storage/driver/filesystem" ··· 109 110 func TestHandleInitiateUpload_Success(t *testing.T) { 110 111 handler, _ := setupTestOCIHandler(t) 111 112 112 - req := makeJSONRequest("POST", "/xrpc/io.atcr.hold.initiateUpload", map[string]string{ 113 + req := makeJSONRequest("POST", atproto.HoldInitiateUpload, map[string]string{ 113 114 "digest": "sha256:abc123", 114 115 }) 115 116 addMockAuth(req) ··· 133 134 func TestHandleInitiateUpload_MissingDigest(t *testing.T) { 134 135 handler, _ := setupTestOCIHandler(t) 135 136 136 - req := makeJSONRequest("POST", "/xrpc/io.atcr.hold.initiateUpload", map[string]string{}) 137 + req := makeJSONRequest("POST", atproto.HoldInitiateUpload, map[string]string{}) 137 138 addMockAuth(req) 138 139 139 140 w := httptest.NewRecorder() ··· 154 155 handler, _ := setupTestOCIHandler(t) 155 156 156 157 // First, initiate an upload 157 - initReq := makeJSONRequest("POST", "/xrpc/io.atcr.hold.initiateUpload", map[string]string{ 158 + initReq := makeJSONRequest("POST", atproto.HoldInitiateUpload, map[string]string{ 158 159 "digest": "sha256:abc123", 159 160 }) 160 161 addMockAuth(initReq) ··· 166 167 uploadID := initResp["uploadId"].(string) 167 168 168 169 // Now get part upload URL 169 - req := makeJSONRequest("POST", "/xrpc/io.atcr.hold.getPartUploadUrl", map[string]any{ 170 + req := makeJSONRequest("POST", atproto.HoldGetPartUploadUrl, map[string]any{ 170 171 "uploadId": uploadID, 171 172 "partNumber": 1, 172 173 }) ··· 194 195 func TestHandleGetPartUploadUrl_InvalidSession(t *testing.T) { 195 196 handler, _ := setupTestOCIHandler(t) 196 197 197 - req := makeJSONRequest("POST", "/xrpc/io.atcr.hold.getPartUploadUrl", map[string]any{ 198 + req := makeJSONRequest("POST", atproto.HoldGetPartUploadUrl, map[string]any{ 198 199 "uploadId": "invalid-upload-id", 199 200 "partNumber": 1, 200 201 }) ··· 222 223 223 224 for _, tt := range tests { 224 225 t.Run(tt.name, func(t *testing.T) { 225 - req := makeJSONRequest("POST", "/xrpc/io.atcr.hold.getPartUploadUrl", tt.body) 226 + req := makeJSONRequest("POST", atproto.HoldGetPartUploadUrl, tt.body) 226 227 addMockAuth(req) 227 228 228 229 w := httptest.NewRecorder() ··· 241 242 handler, _ := setupTestOCIHandler(t) 242 243 243 244 // Initiate upload 244 - initReq := makeJSONRequest("POST", "/xrpc/io.atcr.hold.initiateUpload", map[string]string{ 245 + initReq := makeJSONRequest("POST", atproto.HoldInitiateUpload, map[string]string{ 245 246 "digest": "sha256:abc123", 246 247 }) 247 248 addMockAuth(initReq) ··· 254 255 255 256 // Upload a part 256 257 partData := []byte("test part data") 257 - req := httptest.NewRequest("PUT", "/xrpc/io.atcr.hold.uploadPart", bytes.NewReader(partData)) 258 + req := httptest.NewRequest("PUT", atproto.HoldUploadPart, bytes.NewReader(partData)) 258 259 req.Header.Set("X-Upload-Id", uploadID) 259 260 req.Header.Set("X-Part-Number", "1") 260 261 addMockAuth(req) ··· 292 293 293 294 for _, tt := range tests { 294 295 t.Run(tt.name, func(t *testing.T) { 295 - req := httptest.NewRequest("PUT", "/xrpc/io.atcr.hold.uploadPart", bytes.NewReader([]byte("data"))) 296 + req := httptest.NewRequest("PUT", atproto.HoldUploadPart, bytes.NewReader([]byte("data"))) 296 297 if tt.uploadID != "" { 297 298 req.Header.Set("X-Upload-Id", tt.uploadID) 298 299 } ··· 314 315 func TestHandleUploadPart_InvalidPartNumber(t *testing.T) { 315 316 handler, _ := setupTestOCIHandler(t) 316 317 317 - req := httptest.NewRequest("PUT", "/xrpc/io.atcr.hold.uploadPart", bytes.NewReader([]byte("data"))) 318 + req := httptest.NewRequest("PUT", atproto.HoldUploadPart, bytes.NewReader([]byte("data"))) 318 319 req.Header.Set("X-Upload-Id", "test-id") 319 320 req.Header.Set("X-Part-Number", "not-a-number") 320 321 addMockAuth(req) ··· 333 334 handler, _ := setupTestOCIHandler(t) 334 335 335 336 // Initiate upload 336 - initReq := makeJSONRequest("POST", "/xrpc/io.atcr.hold.initiateUpload", map[string]string{ 337 + initReq := makeJSONRequest("POST", atproto.HoldInitiateUpload, map[string]string{ 337 338 "digest": "sha256:abc123", 338 339 }) 339 340 addMockAuth(initReq) ··· 355 356 356 357 var partInfos []map[string]any 357 358 for _, p := range parts { 358 - req := httptest.NewRequest("PUT", "/xrpc/io.atcr.hold.uploadPart", bytes.NewReader([]byte(p.data))) 359 + req := httptest.NewRequest("PUT", atproto.HoldUploadPart, bytes.NewReader([]byte(p.data))) 359 360 req.Header.Set("X-Upload-Id", uploadID) 360 361 req.Header.Set("X-Part-Number", strconv.Itoa(p.number)) 361 362 addMockAuth(req) ··· 373 374 } 374 375 375 376 // Complete upload 376 - completeReq := makeJSONRequest("POST", "/xrpc/io.atcr.hold.completeUpload", map[string]any{ 377 + completeReq := makeJSONRequest("POST", atproto.HoldCompleteUpload, map[string]any{ 377 378 "uploadId": uploadID, 378 379 "digest": "sha256:finaldigest123", 379 380 "parts": partInfos, ··· 401 402 func TestHandleCompleteUpload_MissingParts(t *testing.T) { 402 403 handler, _ := setupTestOCIHandler(t) 403 404 404 - req := makeJSONRequest("POST", "/xrpc/io.atcr.hold.completeUpload", map[string]any{ 405 + req := makeJSONRequest("POST", atproto.HoldCompleteUpload, map[string]any{ 405 406 "uploadId": "test-id", 406 407 "digest": "sha256:test", 407 408 "parts": []any{}, ··· 419 420 func TestHandleCompleteUpload_InvalidSession(t *testing.T) { 420 421 handler, _ := setupTestOCIHandler(t) 421 422 422 - req := makeJSONRequest("POST", "/xrpc/io.atcr.hold.completeUpload", map[string]any{ 423 + req := makeJSONRequest("POST", atproto.HoldCompleteUpload, map[string]any{ 423 424 "uploadId": "invalid-upload-id", 424 425 "digest": "sha256:test", 425 426 "parts": []any{ ··· 442 443 handler, _ := setupTestOCIHandler(t) 443 444 444 445 // Initiate upload 445 - initReq := makeJSONRequest("POST", "/xrpc/io.atcr.hold.initiateUpload", map[string]string{ 446 + initReq := makeJSONRequest("POST", atproto.HoldInitiateUpload, map[string]string{ 446 447 "digest": "sha256:abc123", 447 448 }) 448 449 addMockAuth(initReq) ··· 454 455 uploadID := initResp["uploadId"].(string) 455 456 456 457 // Abort upload 457 - req := makeJSONRequest("POST", "/xrpc/io.atcr.hold.abortUpload", map[string]string{ 458 + req := makeJSONRequest("POST", atproto.HoldAbortUpload, map[string]string{ 458 459 "uploadId": uploadID, 459 460 }) 460 461 addMockAuth(req) ··· 477 478 func TestHandleAbortUpload_InvalidSession(t *testing.T) { 478 479 handler, _ := setupTestOCIHandler(t) 479 480 480 - req := makeJSONRequest("POST", "/xrpc/io.atcr.hold.abortUpload", map[string]string{ 481 + req := makeJSONRequest("POST", atproto.HoldAbortUpload, map[string]string{ 481 482 "uploadId": "invalid-upload-id", 482 483 }) 483 484 addMockAuth(req)
+2 -1
pkg/hold/pds/auth.go
··· 11 11 "strings" 12 12 "time" 13 13 14 + "atcr.io/pkg/atproto" 14 15 "github.com/bluesky-social/indigo/atproto/atcrypto" 15 16 "github.com/bluesky-social/indigo/atproto/identity" 16 17 "github.com/bluesky-social/indigo/atproto/syntax" ··· 154 155 // The httpClient parameter is optional and defaults to http.DefaultClient if nil. 155 156 func validateTokenWithPDS(ctx context.Context, pdsURL, accessToken, dpopProof string, httpClient HTTPClient) (*SessionResponse, error) { 156 157 // Call com.atproto.server.getSession with DPoP headers 157 - url := fmt.Sprintf("%s/xrpc/com.atproto.server.getSession", strings.TrimSuffix(pdsURL, "/")) 158 + url := fmt.Sprintf("%s%s", strings.TrimSuffix(pdsURL, "/"), atproto.ServerGetSession) 158 159 159 160 req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 160 161 if err != nil {
+2 -1
pkg/hold/pds/auth_test.go
··· 12 12 "testing" 13 13 "time" 14 14 15 + "atcr.io/pkg/atproto" 15 16 "github.com/bluesky-social/indigo/atproto/atcrypto" 16 17 "github.com/bluesky-social/indigo/atproto/auth/oauth" 17 18 ) ··· 24 25 25 26 func (m *mockPDSClient) Do(req *http.Request) (*http.Response, error) { 26 27 // Verify request is for getSession endpoint 27 - if !strings.Contains(req.URL.Path, "/xrpc/com.atproto.server.getSession") { 28 + if !strings.Contains(req.URL.Path, atproto.ServerGetSession) { 28 29 return &http.Response{ 29 30 StatusCode: http.StatusNotFound, 30 31 Body: http.NoBody,
+15 -15
pkg/hold/pds/xrpc.go
··· 138 138 139 139 // Health and server info 140 140 r.Get("/xrpc/_health", h.HandleHealth) 141 - r.Get("/xrpc/com.atproto.server.describeServer", h.HandleDescribeServer) 141 + r.Get(atproto.ServerDescribeServer, h.HandleDescribeServer) 142 142 143 143 // Repository metadata 144 - r.Get("/xrpc/com.atproto.repo.describeRepo", h.HandleDescribeRepo) 145 - r.Get("/xrpc/com.atproto.repo.getRecord", h.HandleGetRecord) 146 - r.Get("/xrpc/com.atproto.repo.listRecords", h.HandleListRecords) 144 + r.Get(atproto.RepoDescribeRepo, h.HandleDescribeRepo) 145 + r.Get(atproto.RepoGetRecord, h.HandleGetRecord) 146 + r.Get(atproto.RepoListRecords, h.HandleListRecords) 147 147 148 148 // Sync endpoints 149 - r.Get("/xrpc/com.atproto.sync.listRepos", h.HandleListRepos) 150 - r.Get("/xrpc/com.atproto.sync.getRecord", h.HandleSyncGetRecord) 151 - r.Get("/xrpc/com.atproto.sync.getRepo", h.HandleGetRepo) 152 - r.Get("/xrpc/com.atproto.sync.subscribeRepos", h.HandleSubscribeRepos) 149 + r.Get(atproto.SyncListRepos, h.HandleListRepos) 150 + r.Get(atproto.SyncGetRecord, h.HandleSyncGetRecord) 151 + r.Get(atproto.SyncGetRepo, h.HandleGetRepo) 152 + r.Get(atproto.SyncSubscribeRepos, h.HandleSubscribeRepos) 153 153 154 154 // DID document and handle resolution 155 155 r.Get("/.well-known/did.json", h.HandleDIDDocument) ··· 161 161 r.Group(func(r chi.Router) { 162 162 r.Use(h.corsMiddleware) 163 163 164 - r.Get("/xrpc/com.atproto.sync.getBlob", h.HandleGetBlob) 165 - r.Head("/xrpc/com.atproto.sync.getBlob", h.HandleGetBlob) 164 + r.Get(atproto.SyncGetBlob, h.HandleGetBlob) 165 + r.Head(atproto.SyncGetBlob, h.HandleGetBlob) 166 166 }) 167 167 168 168 // Write endpoints (CORS + owner/crew admin auth) ··· 170 170 r.Use(h.corsMiddleware) 171 171 r.Use(h.requireOwnerOrCrewAdmin) 172 172 173 - r.Post("/xrpc/com.atproto.repo.deleteRecord", h.HandleDeleteRecord) 174 - r.Post("/xrpc/com.atproto.repo.uploadBlob", h.HandleUploadBlob) 173 + r.Post(atproto.RepoDeleteRecord, h.HandleDeleteRecord) 174 + r.Post(atproto.RepoUploadBlob, h.HandleUploadBlob) 175 175 }) 176 176 177 177 // Auth-only endpoints (CORS + DPoP auth) ··· 179 179 r.Use(h.corsMiddleware) 180 180 r.Use(h.requireAuth) 181 181 182 - r.Post("/xrpc/io.atcr.hold.requestCrew", h.HandleRequestCrew) 182 + r.Post(atproto.HoldRequestCrew, h.HandleRequestCrew) 183 183 }) 184 184 } 185 185 ··· 1136 1136 if operation == http.MethodGet || operation == http.MethodHead { 1137 1137 // Generate hold DID from public URL using shared function 1138 1138 holdDID := atproto.ResolveHoldDIDFromURL(publicURL) 1139 - return fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 1140 - publicURL, holdDID, digest) 1139 + return fmt.Sprintf("%s%s?did=%s&cid=%s", 1140 + publicURL, atproto.SyncGetBlob, holdDID, digest) 1141 1141 } 1142 1142 1143 1143 // For PUT operations, proxy fallback is not supported with XRPC
+60 -60
pkg/hold/pds/xrpc_test.go
··· 172 172 func TestHandleDescribeServer(t *testing.T) { 173 173 handler, _ := setupTestXRPCHandler(t) 174 174 175 - req := makeXRPCGetRequest("/xrpc/com.atproto.server.describeServer", nil) 175 + req := makeXRPCGetRequest(atproto.ServerDescribeServer, nil) 176 176 w := httptest.NewRecorder() 177 177 178 178 handler.HandleDescribeServer(w, req) ··· 210 210 handler, _ := setupTestXRPCHandler(t) 211 211 holdDID := "did:web:hold.example.com" 212 212 213 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.describeRepo", map[string]string{ 213 + req := makeXRPCGetRequest(atproto.RepoDescribeRepo, map[string]string{ 214 214 "repo": holdDID, 215 215 }) 216 216 w := httptest.NewRecorder() ··· 249 249 func TestHandleDescribeRepo_MissingRepo(t *testing.T) { 250 250 handler, _ := setupTestXRPCHandler(t) 251 251 252 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.describeRepo", nil) 252 + req := makeXRPCGetRequest(atproto.RepoDescribeRepo, nil) 253 253 w := httptest.NewRecorder() 254 254 255 255 handler.HandleDescribeRepo(w, req) ··· 263 263 func TestHandleDescribeRepo_InvalidRepo(t *testing.T) { 264 264 handler, _ := setupTestXRPCHandler(t) 265 265 266 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.describeRepo", map[string]string{ 266 + req := makeXRPCGetRequest(atproto.RepoDescribeRepo, map[string]string{ 267 267 "repo": "did:plc:wrongdid", 268 268 }) 269 269 w := httptest.NewRecorder() ··· 284 284 holdDID := "did:web:hold.example.com" 285 285 286 286 // Get the captain record that was created during bootstrap 287 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.getRecord", map[string]string{ 287 + req := makeXRPCGetRequest(atproto.RepoGetRecord, map[string]string{ 288 288 "repo": holdDID, 289 289 "collection": atproto.CaptainCollection, 290 290 "rkey": CaptainRkey, ··· 330 330 331 331 crewRkey := crew[0].Rkey 332 332 333 - req = makeXRPCGetRequest("/xrpc/com.atproto.repo.getRecord", map[string]string{ 333 + req = makeXRPCGetRequest(atproto.RepoGetRecord, map[string]string{ 334 334 "repo": holdDID, 335 335 "collection": atproto.CrewCollection, 336 336 "rkey": crewRkey, ··· 379 379 380 380 for _, tt := range tests { 381 381 t.Run(tt.name, func(t *testing.T) { 382 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.getRecord", tt.params) 382 + req := makeXRPCGetRequest(atproto.RepoGetRecord, tt.params) 383 383 w := httptest.NewRecorder() 384 384 385 385 handler.HandleGetRecord(w, req) ··· 396 396 handler, _ := setupTestXRPCHandler(t) 397 397 holdDID := "did:web:hold.example.com" 398 398 399 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.getRecord", map[string]string{ 399 + req := makeXRPCGetRequest(atproto.RepoGetRecord, map[string]string{ 400 400 "repo": holdDID, 401 401 "collection": atproto.CrewCollection, 402 402 "rkey": "nonexistent", ··· 414 414 func TestHandleGetRecord_InvalidRepo(t *testing.T) { 415 415 handler, _ := setupTestXRPCHandler(t) 416 416 417 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.getRecord", map[string]string{ 417 + req := makeXRPCGetRequest(atproto.RepoGetRecord, map[string]string{ 418 418 "repo": "did:plc:wrongdid", 419 419 "collection": atproto.CaptainCollection, 420 420 "rkey": CaptainRkey, ··· 452 452 } 453 453 454 454 // Test listing crew records 455 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.listRecords", map[string]string{ 455 + req := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{ 456 456 "repo": holdDID, 457 457 "collection": atproto.CrewCollection, 458 458 }) ··· 512 512 } 513 513 514 514 // Test with limit=2 515 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.listRecords", map[string]string{ 515 + req := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{ 516 516 "repo": holdDID, 517 517 "collection": atproto.CrewCollection, 518 518 "limit": "2", ··· 538 538 t.Error("Expected cursor in response when there are more records") 539 539 } else { 540 540 // Test pagination with cursor 541 - req2 := makeXRPCGetRequest("/xrpc/com.atproto.repo.listRecords", map[string]string{ 541 + req2 := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{ 542 542 "repo": holdDID, 543 543 "collection": atproto.CrewCollection, 544 544 "limit": "2", ··· 576 576 } 577 577 578 578 // Get normal order 579 - req1 := makeXRPCGetRequest("/xrpc/com.atproto.repo.listRecords", map[string]string{ 579 + req1 := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{ 580 580 "repo": holdDID, 581 581 "collection": atproto.CrewCollection, 582 582 }) ··· 586 586 records1 := result1["records"].([]any) 587 587 588 588 // Get reverse order 589 - req2 := makeXRPCGetRequest("/xrpc/com.atproto.repo.listRecords", map[string]string{ 589 + req2 := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{ 590 590 "repo": holdDID, 591 591 "collection": atproto.CrewCollection, 592 592 "reverse": "true", ··· 627 627 628 628 for _, tt := range tests { 629 629 t.Run(tt.name, func(t *testing.T) { 630 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.listRecords", map[string]string{ 630 + req := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{ 631 631 "repo": "did:web:hold.example.com", 632 632 "collection": atproto.CrewCollection, 633 633 "limit": tt.limit, ··· 657 657 } 658 658 659 659 // Query a collection that has no records yet 660 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.listRecords", map[string]string{ 660 + req := makeXRPCGetRequest(atproto.RepoListRecords, map[string]string{ 661 661 "repo": "did:web:hold.example.com", 662 662 "collection": atproto.CrewCollection, 663 663 }) ··· 698 698 699 699 for _, tt := range tests { 700 700 t.Run(tt.name, func(t *testing.T) { 701 - req := makeXRPCGetRequest("/xrpc/com.atproto.repo.listRecords", tt.params) 701 + req := makeXRPCGetRequest(atproto.RepoListRecords, tt.params) 702 702 w := httptest.NewRecorder() 703 703 704 704 handler.HandleListRecords(w, req) ··· 739 739 "rkey": rkey, 740 740 } 741 741 742 - req := makeXRPCPostRequest("/xrpc/com.atproto.repo.deleteRecord", body) 742 + req := makeXRPCPostRequest(atproto.RepoDeleteRecord, body) 743 743 744 744 // Add DPoP authentication - owner has admin permission to delete crew 745 745 ownerDID := "did:plc:testowner123" ··· 780 780 func TestHandleDeleteRecord_InvalidJSON(t *testing.T) { 781 781 handler, _ := setupTestXRPCHandler(t) 782 782 783 - req := httptest.NewRequest(http.MethodPost, "/xrpc/com.atproto.repo.deleteRecord", bytes.NewReader([]byte("invalid json"))) 783 + req := httptest.NewRequest(http.MethodPost, atproto.RepoDeleteRecord, bytes.NewReader([]byte("invalid json"))) 784 784 req.Header.Set("Content-Type", "application/json") 785 785 w := httptest.NewRecorder() 786 786 ··· 821 821 822 822 for _, tt := range tests { 823 823 t.Run(tt.name, func(t *testing.T) { 824 - req := makeXRPCPostRequest("/xrpc/com.atproto.repo.deleteRecord", tt.body) 824 + req := makeXRPCPostRequest(atproto.RepoDeleteRecord, tt.body) 825 825 w := httptest.NewRecorder() 826 826 827 827 handler.HandleDeleteRecord(w, req) ··· 848 848 handler, _ := setupTestXRPCHandler(t) 849 849 holdDID := "did:web:hold.example.com" 850 850 851 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.listRepos", nil) 851 + req := makeXRPCGetRequest(atproto.SyncListRepos, nil) 852 852 w := httptest.NewRecorder() 853 853 854 854 handler.HandleListRepos(w, req) ··· 902 902 903 903 // setupTestPDS creates the PDS/database but doesn't initialize the repo 904 904 // Check if implementation returns repos before initialization 905 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.listRepos", nil) 905 + req := makeXRPCGetRequest(atproto.SyncListRepos, nil) 906 906 w := httptest.NewRecorder() 907 907 908 908 handler.HandleListRepos(w, req) ··· 922 922 t.Fatalf("Failed to initialize repo: %v", err) 923 923 } 924 924 925 - req = makeXRPCGetRequest("/xrpc/com.atproto.sync.listRepos", nil) 925 + req = makeXRPCGetRequest(atproto.SyncListRepos, nil) 926 926 w = httptest.NewRecorder() 927 927 928 928 handler.HandleListRepos(w, req) ··· 953 953 holdDID := "did:web:hold.example.com" 954 954 955 955 // Get the captain record as CAR file 956 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getRecord", map[string]string{ 956 + req := makeXRPCGetRequest(atproto.SyncGetRecord, map[string]string{ 957 957 "did": holdDID, 958 958 "collection": atproto.CaptainCollection, 959 959 "rkey": CaptainRkey, ··· 1001 1001 1002 1002 for _, tt := range tests { 1003 1003 t.Run(tt.name, func(t *testing.T) { 1004 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getRecord", tt.params) 1004 + req := makeXRPCGetRequest(atproto.SyncGetRecord, tt.params) 1005 1005 w := httptest.NewRecorder() 1006 1006 1007 1007 handler.HandleSyncGetRecord(w, req) ··· 1018 1018 func TestHandleSyncGetRecord_RecordNotFound(t *testing.T) { 1019 1019 handler, _ := setupTestXRPCHandler(t) 1020 1020 1021 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getRecord", map[string]string{ 1021 + req := makeXRPCGetRequest(atproto.SyncGetRecord, map[string]string{ 1022 1022 "did": "did:web:hold.example.com", 1023 1023 "collection": atproto.CrewCollection, 1024 1024 "rkey": "nonexistent", ··· 1037 1037 func TestHandleSyncGetRecord_InvalidDID(t *testing.T) { 1038 1038 handler, _ := setupTestXRPCHandler(t) 1039 1039 1040 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getRecord", map[string]string{ 1040 + req := makeXRPCGetRequest(atproto.SyncGetRecord, map[string]string{ 1041 1041 "did": "did:plc:wrongdid", 1042 1042 "collection": atproto.CaptainCollection, 1043 1043 "rkey": CaptainRkey, ··· 1060 1060 holdDID := "did:web:hold.example.com" 1061 1061 1062 1062 // Get full repo as CAR file 1063 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getRepo", map[string]string{ 1063 + req := makeXRPCGetRequest(atproto.SyncGetRepo, map[string]string{ 1064 1064 "did": holdDID, 1065 1065 }) 1066 1066 w := httptest.NewRecorder() ··· 1081 1081 func TestHandleGetRepo_MissingDID(t *testing.T) { 1082 1082 handler, _ := setupTestXRPCHandler(t) 1083 1083 1084 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getRepo", nil) 1084 + req := makeXRPCGetRequest(atproto.SyncGetRepo, nil) 1085 1085 w := httptest.NewRecorder() 1086 1086 1087 1087 handler.HandleGetRepo(w, req) ··· 1096 1096 func TestHandleGetRepo_InvalidDID(t *testing.T) { 1097 1097 handler, _ := setupTestXRPCHandler(t) 1098 1098 1099 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getRepo", map[string]string{ 1099 + req := makeXRPCGetRequest(atproto.SyncGetRepo, map[string]string{ 1100 1100 "did": "did:plc:wrongdid", 1101 1101 }) 1102 1102 w := httptest.NewRecorder() ··· 1115 1115 holdDID := "did:web:hold.example.com" 1116 1116 1117 1117 // Get current rev to use as 'since' 1118 - req1 := makeXRPCGetRequest("/xrpc/com.atproto.sync.listRepos", nil) 1118 + req1 := makeXRPCGetRequest(atproto.SyncListRepos, nil) 1119 1119 w1 := httptest.NewRecorder() 1120 1120 handler.HandleListRepos(w1, req1) 1121 1121 result := assertJSONResponse(t, w1, http.StatusOK) ··· 1132 1132 } 1133 1133 1134 1134 // Get repo diff since that rev 1135 - req2 := makeXRPCGetRequest("/xrpc/com.atproto.sync.getRepo", map[string]string{ 1135 + req2 := makeXRPCGetRequest(atproto.SyncGetRepo, map[string]string{ 1136 1136 "did": holdDID, 1137 1137 "since": rev, 1138 1138 }) ··· 1173 1173 "permissions": []string{"blob:read"}, 1174 1174 } 1175 1175 1176 - req := makeXRPCPostRequest("/xrpc/io.atcr.hold.requestCrew", body) 1176 + req := makeXRPCPostRequest(atproto.HoldRequestCrew, body) 1177 1177 w := httptest.NewRecorder() 1178 1178 1179 1179 // Note: This will fail auth because we're not providing DPoP tokens ··· 1216 1216 "permissions": []string{"blob:read"}, 1217 1217 } 1218 1218 1219 - req := makeXRPCPostRequest("/xrpc/io.atcr.hold.requestCrew", body) 1219 + req := makeXRPCPostRequest(atproto.HoldRequestCrew, body) 1220 1220 w := httptest.NewRecorder() 1221 1221 1222 1222 handler.HandleRequestCrew(w, req) ··· 1232 1232 func TestHandleRequestCrew_InvalidJSON(t *testing.T) { 1233 1233 handler, _ := setupTestXRPCHandler(t) 1234 1234 1235 - req := httptest.NewRequest(http.MethodPost, "/xrpc/io.atcr.hold.requestCrew", bytes.NewReader([]byte("invalid json"))) 1235 + req := httptest.NewRequest(http.MethodPost, atproto.HoldRequestCrew, bytes.NewReader([]byte("invalid json"))) 1236 1236 req.Header.Set("Content-Type", "application/json") 1237 1237 w := httptest.NewRecorder() 1238 1238 ··· 1391 1391 blobData := []byte("Hello, ATProto!") 1392 1392 1393 1393 // Test standard single blob upload (POST with raw bytes) 1394 - req := httptest.NewRequest(http.MethodPost, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader(blobData)) 1394 + req := httptest.NewRequest(http.MethodPost, atproto.RepoUploadBlob, bytes.NewReader(blobData)) 1395 1395 req.Header.Set("Content-Type", "application/octet-stream") 1396 1396 1397 1397 // Add DPoP authentication - owner has admin permission for blob upload ··· 1447 1447 handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 1448 1448 1449 1449 // Empty blob should succeed (edge case) 1450 - req := httptest.NewRequest(http.MethodPost, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte{})) 1450 + req := httptest.NewRequest(http.MethodPost, atproto.RepoUploadBlob, bytes.NewReader([]byte{})) 1451 1451 req.Header.Set("Content-Type", "application/octet-stream") 1452 1452 1453 1453 // Add DPoP authentication ··· 1481 1481 handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 1482 1482 1483 1483 // GET is not allowed for upload (only POST) 1484 - req := httptest.NewRequest(http.MethodGet, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte("test"))) 1484 + req := httptest.NewRequest(http.MethodGet, atproto.RepoUploadBlob, bytes.NewReader([]byte("test"))) 1485 1485 w := httptest.NewRecorder() 1486 1486 1487 1487 handler.HandleUploadBlob(w, req) ··· 1509 1509 holdDID := "did:web:hold.example.com" 1510 1510 cid := "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke" 1511 1511 1512 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getBlob", map[string]string{ 1512 + req := makeXRPCGetRequest(atproto.SyncGetBlob, map[string]string{ 1513 1513 "did": holdDID, 1514 1514 "cid": cid, 1515 1515 }) ··· 1548 1548 holdDID := "did:web:hold.example.com" 1549 1549 digest := "sha256:abc123def456" // OCI digest format 1550 1550 1551 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getBlob", map[string]string{ 1551 + req := makeXRPCGetRequest(atproto.SyncGetBlob, map[string]string{ 1552 1552 "did": holdDID, 1553 1553 "cid": digest, 1554 1554 }) ··· 1584 1584 cid := "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke" 1585 1585 1586 1586 // Use HEAD instead of GET 1587 - req := httptest.NewRequest(http.MethodHead, "/xrpc/com.atproto.sync.getBlob?did="+holdDID+"&cid="+cid, nil) 1587 + req := httptest.NewRequest(http.MethodHead, atproto.SyncGetBlob+"?did="+holdDID+"&cid="+cid, nil) 1588 1588 w := httptest.NewRecorder() 1589 1589 1590 1590 handler.HandleGetBlob(w, req) ··· 1641 1641 1642 1642 for _, tt := range tests { 1643 1643 t.Run(tt.name, func(t *testing.T) { 1644 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getBlob", tt.params) 1644 + req := makeXRPCGetRequest(atproto.SyncGetBlob, tt.params) 1645 1645 w := httptest.NewRecorder() 1646 1646 1647 1647 handler.HandleGetBlob(w, req) ··· 1658 1658 func TestHandleGetBlob_InvalidDID(t *testing.T) { 1659 1659 handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 1660 1660 1661 - req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getBlob", map[string]string{ 1661 + req := makeXRPCGetRequest(atproto.SyncGetBlob, map[string]string{ 1662 1662 "did": "did:plc:wrongdid", 1663 1663 "cid": "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke", 1664 1664 }) ··· 1690 1690 1691 1691 holdDID := "did:web:hold.example.com" 1692 1692 cid := "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke" 1693 - url := fmt.Sprintf("/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", holdDID, cid) 1693 + url := fmt.Sprintf("%s?did=%s&cid=%s", atproto.SyncGetBlob, holdDID, cid) 1694 1694 1695 1695 // Test GET request 1696 1696 req := httptest.NewRequest(http.MethodGet, url, nil) ··· 1738 1738 method string 1739 1739 }{ 1740 1740 {"health endpoint", "/xrpc/_health", "GET"}, 1741 - {"describe server", "/xrpc/com.atproto.server.describeServer", "GET"}, 1742 - {"get blob", "/xrpc/com.atproto.sync.getBlob?did=test&cid=test", "GET"}, 1741 + {"describe server", atproto.ServerDescribeServer, "GET"}, 1742 + {"get blob", atproto.SyncGetBlob + "?did=test&cid=test", "GET"}, 1743 1743 } 1744 1744 1745 1745 for _, tt := range tests { ··· 1783 1783 } 1784 1784 1785 1785 body, _ := json.Marshal(input) 1786 - req := httptest.NewRequest("POST", "/xrpc/com.atproto.repo.deleteRecord", bytes.NewReader(body)) 1786 + req := httptest.NewRequest("POST", atproto.RepoDeleteRecord, bytes.NewReader(body)) 1787 1787 req.Header.Set("Content-Type", "application/json") 1788 1788 1789 1789 if err := dpopHelper.AddDPoPToRequest(req); err != nil { ··· 1819 1819 } 1820 1820 1821 1821 body, _ := json.Marshal(input) 1822 - req := httptest.NewRequest("POST", "/xrpc/com.atproto.repo.deleteRecord", bytes.NewReader(body)) 1822 + req := httptest.NewRequest("POST", atproto.RepoDeleteRecord, bytes.NewReader(body)) 1823 1823 req.Header.Set("Content-Type", "application/json") 1824 1824 1825 1825 w := httptest.NewRecorder() ··· 1844 1844 t.Fatalf("Failed to create DPoP helper: %v", err) 1845 1845 } 1846 1846 1847 - req := httptest.NewRequest("POST", "/xrpc/io.atcr.hold.requestCrew", bytes.NewReader([]byte("{}"))) 1847 + req := httptest.NewRequest("POST", atproto.HoldRequestCrew, bytes.NewReader([]byte("{}"))) 1848 1848 req.Header.Set("Content-Type", "application/json") 1849 1849 1850 1850 if err := dpopHelper.AddDPoPToRequest(req); err != nil { ··· 1868 1868 handler.RegisterHandlers(r) 1869 1869 1870 1870 // requestCrew requires auth, but we send no auth 1871 - req := httptest.NewRequest("POST", "/xrpc/io.atcr.hold.requestCrew", bytes.NewReader([]byte("{}"))) 1871 + req := httptest.NewRequest("POST", atproto.HoldRequestCrew, bytes.NewReader([]byte("{}"))) 1872 1872 req.Header.Set("Content-Type", "application/json") 1873 1873 1874 1874 w := httptest.NewRecorder() ··· 1893 1893 method string 1894 1894 }{ 1895 1895 {"health check", "/xrpc/_health", "GET"}, 1896 - {"describe server", "/xrpc/com.atproto.server.describeServer", "GET"}, 1897 - {"describe repo", "/xrpc/com.atproto.repo.describeRepo?repo=" + handler.pds.DID(), "GET"}, 1898 - {"list repos", "/xrpc/com.atproto.sync.listRepos", "GET"}, 1896 + {"describe server", atproto.ServerDescribeServer, "GET"}, 1897 + {"describe repo", atproto.RepoDescribeRepo + "?repo=" + handler.pds.DID(), "GET"}, 1898 + {"list repos", atproto.SyncListRepos, "GET"}, 1899 1899 {"did document", "/.well-known/did.json", "GET"}, 1900 1900 {"atproto did", "/.well-known/atproto-did", "GET"}, 1901 1901 } ··· 1923 1923 handler.RegisterHandlers(r) 1924 1924 1925 1925 // getBlob should work without auth if captain.public = true 1926 - req := httptest.NewRequest("GET", "/xrpc/com.atproto.sync.getBlob?did="+handler.pds.DID()+"&cid=test123", nil) 1926 + req := httptest.NewRequest("GET", atproto.SyncGetBlob+"?did="+handler.pds.DID()+"&cid=test123", nil) 1927 1927 w := httptest.NewRecorder() 1928 1928 1929 1929 r.ServeHTTP(w, req) ··· 1946 1946 path string 1947 1947 body string 1948 1948 }{ 1949 - {"delete record", "/xrpc/com.atproto.repo.deleteRecord", `{"repo":"test","collection":"test","rkey":"test"}`}, 1950 - {"upload blob", "/xrpc/com.atproto.repo.uploadBlob", "blob data"}, 1949 + {"delete record", atproto.RepoDeleteRecord, `{"repo":"test","collection":"test","rkey":"test"}`}, 1950 + {"upload blob", atproto.RepoUploadBlob, "blob data"}, 1951 1951 } 1952 1952 1953 1953 for _, tt := range tests { ··· 1976 1976 // GET-only routes should reject POST 1977 1977 tests := []string{ 1978 1978 "/xrpc/_health", 1979 - "/xrpc/com.atproto.server.describeServer", 1980 - "/xrpc/com.atproto.sync.listRepos", 1979 + atproto.ServerDescribeServer, 1980 + atproto.SyncListRepos, 1981 1981 } 1982 1982 1983 1983 for _, path := range tests { ··· 2003 2003 2004 2004 // POST-only routes should reject GET 2005 2005 tests := []string{ 2006 - "/xrpc/com.atproto.repo.deleteRecord", 2007 - "/xrpc/io.atcr.hold.requestCrew", 2006 + atproto.RepoDeleteRecord, 2007 + atproto.HoldRequestCrew, 2008 2008 } 2009 2009 2010 2010 for _, path := range tests {
+432
test-e2e.sh
··· 1 + #!/bin/bash 2 + # 3 + # ATCR End-to-End Test Script 4 + # Tests single-arch and multi-arch image push/pull flows 5 + # 6 + # Usage: 7 + # ./test-e2e.sh # Run full test suite 8 + # ./test-e2e.sh --multi # Skip to multi-arch test only 9 + # REGISTRY=localhost:5000 ./test-e2e.sh # Custom registry 10 + # NAMESPACE=myuser ./test-e2e.sh # Custom namespace 11 + # VERBOSE=0 ./test-e2e.sh # Hide docker output 12 + # 13 + # To see bash command execution, edit this file and uncomment 'set -x' below 14 + # 15 + 16 + set -e 17 + 18 + # Verbose mode - shows docker command output 19 + # Set VERBOSE=0 to hide docker output: VERBOSE=0 ./test-e2e.sh 20 + VERBOSE="${VERBOSE:-1}" 21 + 22 + # Bash trace mode - shows every bash command as it executes 23 + # Uncomment the line below to see bash commands as they execute 24 + # set -x 25 + 26 + # Colors for output 27 + RED='\033[0;31m' 28 + GREEN='\033[0;32m' 29 + YELLOW='\033[1;33m' 30 + BLUE='\033[0;34m' 31 + NC='\033[0m' # No Color 32 + 33 + # Configuration 34 + REGISTRY="${REGISTRY:-127.0.0.1:5000}" 35 + NAMESPACE="${NAMESPACE:-evan.jarrett.net}" 36 + IMAGE_NAME="test-image" 37 + TAG="latest" 38 + MULTI_ARCH_TAG="multiarch" 39 + 40 + # Full image references 41 + SINGLE_ARCH_IMAGE="${REGISTRY}/${NAMESPACE}/${IMAGE_NAME}:${TAG}" 42 + MULTI_ARCH_IMAGE="${REGISTRY}/${NAMESPACE}/${IMAGE_NAME}:${MULTI_ARCH_TAG}" 43 + AMD64_IMAGE="${REGISTRY}/${NAMESPACE}/${IMAGE_NAME}:${TAG}-amd64" 44 + ARM64_IMAGE="${REGISTRY}/${NAMESPACE}/${IMAGE_NAME}:${TAG}-arm64" 45 + 46 + # Temporary directory for test images 47 + BUILD_DIR=$(mktemp -d) 48 + 49 + # Cleanup function 50 + cleanup() { 51 + log_info "Cleaning up..." 52 + rm -rf ${BUILD_DIR} 53 + 54 + # Clean up any dangling manifest lists 55 + docker manifest rm ${MULTI_ARCH_IMAGE} 2>/dev/null || true 56 + } 57 + 58 + trap cleanup EXIT 59 + 60 + # Logging functions 61 + log_info() { 62 + echo -e "${GREEN}[INFO]${NC} $1" 63 + } 64 + 65 + log_error() { 66 + echo -e "${RED}[ERROR]${NC} $1" 67 + } 68 + 69 + log_warn() { 70 + echo -e "${YELLOW}[WARN]${NC} $1" 71 + } 72 + 73 + log_step() { 74 + echo -e "\n${GREEN}==>${NC} $1" 75 + } 76 + 77 + log_cmd() { 78 + echo -e "${BLUE}[CMD]${NC} $*" 79 + if [ "$VERBOSE" = "1" ]; then 80 + "$@" 81 + else 82 + "$@" > /dev/null 2>&1 83 + fi 84 + } 85 + 86 + # Check if docker compose services are running 87 + check_compose_services() { 88 + log_step "Checking docker compose services..." 89 + 90 + if ! docker compose ps | grep -q "Up"; then 91 + log_warn "Some services may not be running. Starting services..." 92 + docker compose up -d 93 + sleep 5 94 + fi 95 + 96 + # Check for errors in logs 97 + log_info "Checking for errors in service logs..." 98 + if docker compose logs --tail=50 | grep -i "error" | grep -v "level=error msg=\"error processing" | grep -v "test"; then 99 + log_warn "Found some errors in logs (may be normal)" 100 + else 101 + log_info "No critical errors found in recent logs" 102 + fi 103 + } 104 + 105 + # Build a simple test image 106 + build_single_arch_image() { 107 + log_step "Building single-arch test image..." 108 + 109 + cat > ${BUILD_DIR}/Dockerfile <<EOF 110 + FROM alpine:latest 111 + 112 + # Add some content to make the image non-trivial 113 + RUN apk add --no-cache curl bash 114 + 115 + # Create a test file with some content 116 + RUN echo "This is a test image created at \$(date)" > /test.txt 117 + RUN echo "Architecture: \$(uname -m)" >> /test.txt 118 + 119 + # Add a simple script 120 + RUN echo '#!/bin/sh' > /test.sh && \\ 121 + echo 'echo "Test image running successfully!"' >> /test.sh && \\ 122 + echo 'cat /test.txt' >> /test.sh && \\ 123 + chmod +x /test.sh 124 + 125 + CMD ["/test.sh"] 126 + EOF 127 + 128 + log_cmd docker build -t ${SINGLE_ARCH_IMAGE} ${BUILD_DIR} 129 + log_info "Built single-arch image: ${SINGLE_ARCH_IMAGE}" 130 + } 131 + 132 + # Build multi-arch images using docker manifest create (old school!) 133 + build_multi_arch_images() { 134 + log_step "Building multi-arch images (amd64 and arm64) using docker manifest..." 135 + 136 + # Create Dockerfile for multi-arch builds 137 + cat > ${BUILD_DIR}/Dockerfile.multiarch <<EOF 138 + FROM alpine:latest 139 + 140 + ARG TARGETARCH=unknown 141 + ARG TARGETOS=linux 142 + 143 + # Add some content to make the image non-trivial 144 + RUN apk add --no-cache curl bash 145 + 146 + # Create a test file with arch info 147 + RUN echo "This is a multi-arch test image" > /test.txt 148 + RUN echo "Target OS: \${TARGETOS}" >> /test.txt 149 + RUN echo "Target Architecture: \${TARGETARCH}" >> /test.txt 150 + RUN echo "Built at: \$(date)" >> /test.txt 151 + 152 + # Add a simple script 153 + RUN echo '#!/bin/sh' > /test.sh && \\ 154 + echo 'echo "Multi-arch test image running!"' >> /test.sh && \\ 155 + echo 'cat /test.txt' >> /test.sh && \\ 156 + chmod +x /test.sh 157 + 158 + CMD ["/test.sh"] 159 + EOF 160 + 161 + # Build amd64 image 162 + log_info "Building amd64 image..." 163 + log_cmd docker build \ 164 + --platform linux/amd64 \ 165 + --build-arg TARGETARCH=amd64 \ 166 + --build-arg TARGETOS=linux \ 167 + -t ${AMD64_IMAGE} \ 168 + -f ${BUILD_DIR}/Dockerfile.multiarch \ 169 + ${BUILD_DIR} 170 + 171 + # Push amd64 image 172 + log_info "Pushing amd64 image..." 173 + echo -e "${BLUE}[CMD]${NC} docker push ${AMD64_IMAGE}" 174 + if ! docker push ${AMD64_IMAGE}; then 175 + log_error "Failed to push amd64 image" 176 + return 1 177 + fi 178 + 179 + # Build arm64 image 180 + log_info "Building arm64 image..." 181 + log_cmd docker build \ 182 + --platform linux/arm64 \ 183 + --build-arg TARGETARCH=arm64 \ 184 + --build-arg TARGETOS=linux \ 185 + -t ${ARM64_IMAGE} \ 186 + -f ${BUILD_DIR}/Dockerfile.multiarch \ 187 + ${BUILD_DIR} 188 + 189 + # Push arm64 image 190 + log_info "Pushing arm64 image..." 191 + echo -e "${BLUE}[CMD]${NC} docker push ${ARM64_IMAGE}" 192 + if ! docker push ${ARM64_IMAGE}; then 193 + log_error "Failed to push arm64 image" 194 + return 1 195 + fi 196 + 197 + # Create manifest list 198 + log_info "Creating multi-arch manifest list..." 199 + echo -e "${BLUE}[CMD]${NC} docker manifest create --insecure ${MULTI_ARCH_IMAGE} ${AMD64_IMAGE} ${ARM64_IMAGE}" 200 + docker manifest create --insecure ${MULTI_ARCH_IMAGE} \ 201 + ${AMD64_IMAGE} \ 202 + ${ARM64_IMAGE} 203 + 204 + # Annotate manifests with platform info 205 + log_info "Annotating manifest with platform info..." 206 + docker manifest annotate ${MULTI_ARCH_IMAGE} ${AMD64_IMAGE} \ 207 + --os linux --arch amd64 208 + docker manifest annotate ${MULTI_ARCH_IMAGE} ${ARM64_IMAGE} \ 209 + --os linux --arch arm64 210 + 211 + # Push the manifest list 212 + log_info "Pushing multi-arch manifest list..." 213 + echo -e "${BLUE}[CMD]${NC} docker manifest push --insecure ${MULTI_ARCH_IMAGE}" 214 + docker manifest push --insecure ${MULTI_ARCH_IMAGE} 215 + 216 + log_info "Multi-arch manifest created and pushed: ${MULTI_ARCH_IMAGE}" 217 + } 218 + 219 + # Push single-arch image and verify digest 220 + push_single_arch_image() { 221 + log_step "Pushing single-arch image..." 222 + 223 + echo -e "${BLUE}[CMD]${NC} docker push ${SINGLE_ARCH_IMAGE}" 224 + local push_output 225 + push_output=$(docker push ${SINGLE_ARCH_IMAGE} 2>&1 | tee /dev/tty) 226 + 227 + # Extract and verify digest 228 + local digest 229 + digest=$(echo "$push_output" | grep -oP 'digest: \K[a-z0-9:]+' | tail -1) 230 + 231 + if [ -z "$digest" ]; then 232 + log_error "Failed to get digest from push output" 233 + return 1 234 + fi 235 + 236 + log_info "Pushed with digest: ${digest}" 237 + 238 + # Verify we can reference by digest 239 + log_info "Verifying digest reference..." 240 + echo -e "${BLUE}[CMD]${NC} docker manifest inspect --insecure ${REGISTRY}/${NAMESPACE}/${IMAGE_NAME}@${digest}" 241 + docker manifest inspect --insecure "${REGISTRY}/${NAMESPACE}/${IMAGE_NAME}@${digest}" 242 + log_info "Digest verification successful!" 243 + } 244 + 245 + # Verify multi-arch manifest 246 + verify_multi_arch_manifest() { 247 + log_step "Verifying multi-arch manifest..." 248 + 249 + echo -e "${BLUE}[CMD]${NC} docker manifest inspect --insecure ${MULTI_ARCH_IMAGE}" 250 + local manifest 251 + manifest=$(docker manifest inspect --insecure ${MULTI_ARCH_IMAGE} | tee /dev/tty) 252 + 253 + # Check for both architectures (check for "amd64" and "arm64" in platform.architecture) 254 + if echo "$manifest" | grep -q '"architecture": "amd64"'; then 255 + log_info "Found linux/amd64 manifest" 256 + else 257 + log_error "Missing linux/amd64 manifest" 258 + return 1 259 + fi 260 + 261 + if echo "$manifest" | grep -q '"architecture": "arm64"'; then 262 + log_info "Found linux/arm64 manifest" 263 + else 264 + log_error "Missing linux/arm64 manifest" 265 + return 1 266 + fi 267 + 268 + # Extract digest 269 + local digest 270 + digest=$(echo "$manifest" | jq -r '.manifests[0].digest' 2>/dev/null || echo "$manifest" | grep -oP 'sha256:[a-f0-9]+' | head -1) 271 + 272 + if [ -n "$digest" ]; then 273 + log_info "Multi-arch manifest digest: ${digest}" 274 + fi 275 + } 276 + 277 + # Remove local images 278 + cleanup_local_images() { 279 + log_step "Removing local images..." 280 + 281 + echo -e "${BLUE}[CMD]${NC} docker rmi ${SINGLE_ARCH_IMAGE}" 282 + docker rmi ${SINGLE_ARCH_IMAGE} || true 283 + 284 + echo -e "${BLUE}[CMD]${NC} docker rmi ${AMD64_IMAGE}" 285 + docker rmi ${AMD64_IMAGE} || true 286 + 287 + echo -e "${BLUE}[CMD]${NC} docker rmi ${ARM64_IMAGE}" 288 + docker rmi ${ARM64_IMAGE} || true 289 + 290 + echo -e "${BLUE}[CMD]${NC} docker rmi ${MULTI_ARCH_IMAGE}" 291 + docker rmi ${MULTI_ARCH_IMAGE} || true 292 + 293 + # Clean up manifest list 294 + docker manifest rm ${MULTI_ARCH_IMAGE} 2>/dev/null || true 295 + 296 + # Also remove any cached layers 297 + log_info "Pruning dangling images..." 298 + log_cmd docker image prune -f 299 + 300 + log_info "Local images removed" 301 + } 302 + 303 + # Pull images back 304 + pull_images() { 305 + log_step "Pulling images back from registry..." 306 + 307 + # Pull single-arch image 308 + log_info "Pulling single-arch image..." 309 + echo -e "${BLUE}[CMD]${NC} docker pull ${SINGLE_ARCH_IMAGE}" 310 + if docker pull ${SINGLE_ARCH_IMAGE}; then 311 + log_info "Successfully pulled: ${SINGLE_ARCH_IMAGE}" 312 + else 313 + log_error "Failed to pull single-arch image" 314 + return 1 315 + fi 316 + 317 + # Pull multi-arch image 318 + log_info "Pulling multi-arch image..." 319 + echo -e "${BLUE}[CMD]${NC} docker pull ${MULTI_ARCH_IMAGE}" 320 + if docker pull ${MULTI_ARCH_IMAGE}; then 321 + log_info "Successfully pulled: ${MULTI_ARCH_IMAGE}" 322 + else 323 + log_error "Failed to pull multi-arch image" 324 + return 1 325 + fi 326 + } 327 + 328 + # Test running the images 329 + test_images() { 330 + log_step "Testing pulled images..." 331 + 332 + # Test single-arch image 333 + log_info "Running single-arch image..." 334 + log_cmd docker run --rm ${SINGLE_ARCH_IMAGE} 335 + 336 + # Test multi-arch image 337 + log_info "Running multi-arch image..." 338 + log_cmd docker run --rm ${MULTI_ARCH_IMAGE} 339 + 340 + log_info "All images ran successfully!" 341 + } 342 + 343 + # Check for errors in compose logs after operations 344 + check_compose_logs() { 345 + log_step "Checking compose logs for errors..." 346 + 347 + local error_count 348 + error_count=$(docker compose logs --tail=100 | grep -i "error" | grep -v "level=error msg=\"error processing" | grep -v "test" | wc -l) 349 + 350 + if [ "$error_count" -gt 0 ]; then 351 + log_warn "Found ${error_count} error messages in logs:" 352 + docker compose logs --tail=100 | grep -i "error" | grep -v "level=error msg=\"error processing" | grep -v "test" | tail -10 353 + else 354 + log_info "No errors found in compose logs" 355 + fi 356 + } 357 + 358 + # Multi-arch only test flow 359 + multi_arch_only() { 360 + log_step "Starting ATCR multi-arch test (skipping single-arch)" 361 + log_info "Registry: ${REGISTRY}" 362 + log_info "Namespace: ${NAMESPACE}" 363 + log_info "Build directory: ${BUILD_DIR}" 364 + log_info "Verbose mode: ${VERBOSE} (set VERBOSE=0 to hide docker output)" 365 + 366 + # Check prerequisites 367 + if ! command -v docker &> /dev/null; then 368 + log_error "Docker is not installed" 369 + exit 1 370 + fi 371 + 372 + if ! command -v jq &> /dev/null; then 373 + log_warn "jq is not installed - some checks may be limited" 374 + fi 375 + 376 + # Run multi-arch test steps only 377 + check_compose_services 378 + build_multi_arch_images 379 + verify_multi_arch_manifest 380 + cleanup_local_images 381 + pull_images 382 + log_info "Running multi-arch image..." 383 + log_cmd docker run --rm ${MULTI_ARCH_IMAGE} 384 + check_compose_logs 385 + 386 + log_step "Multi-arch test completed successfully!" 387 + echo -e "${GREEN}========================================${NC}" 388 + echo -e "${GREEN}Multi-arch test passed!${NC}" 389 + echo -e "${GREEN}========================================${NC}" 390 + } 391 + 392 + # Main test flow 393 + main() { 394 + log_step "Starting ATCR end-to-end test" 395 + log_info "Registry: ${REGISTRY}" 396 + log_info "Namespace: ${NAMESPACE}" 397 + log_info "Build directory: ${BUILD_DIR}" 398 + log_info "Verbose mode: ${VERBOSE} (set VERBOSE=0 to hide docker output)" 399 + 400 + # Check prerequisites 401 + if ! command -v docker &> /dev/null; then 402 + log_error "Docker is not installed" 403 + exit 1 404 + fi 405 + 406 + if ! command -v jq &> /dev/null; then 407 + log_warn "jq is not installed - some checks may be limited" 408 + fi 409 + 410 + # Run test steps 411 + check_compose_services 412 + build_single_arch_image 413 + push_single_arch_image 414 + build_multi_arch_images 415 + verify_multi_arch_manifest 416 + cleanup_local_images 417 + pull_images 418 + test_images 419 + check_compose_logs 420 + 421 + log_step "End-to-end test completed successfully!" 422 + echo -e "${GREEN}========================================${NC}" 423 + echo -e "${GREEN}All tests passed!${NC}" 424 + echo -e "${GREEN}========================================${NC}" 425 + } 426 + 427 + # Parse arguments 428 + if [ "$1" = "--multi" ]; then 429 + multi_arch_only 430 + else 431 + main "$@" 432 + fi