A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix pushing and pulling from docker

+146 -137
+58 -67
pkg/appview/storage/proxy_blob_store.go
··· 222 222 return distribution.Descriptor{}, err 223 223 } 224 224 225 - // Get presigned HEAD URL 226 - url, err := p.getHeadURL(ctx, dgst) 225 + method := "HEAD" 226 + 227 + url, err := p.getPresignedURL(ctx, method, dgst) 227 228 if err != nil { 228 229 return distribution.Descriptor{}, distribution.ErrBlobUnknown 229 230 } 230 231 231 - // Make HEAD request with service token authentication 232 - req, err := http.NewRequestWithContext(ctx, "HEAD", url, nil) 232 + // Make HEAD request to presigned URL 233 + req, err := http.NewRequestWithContext(ctx, method, url, nil) 233 234 if err != nil { 234 235 return distribution.Descriptor{}, distribution.ErrBlobUnknown 235 236 } 236 237 237 - resp, err := p.doAuthenticatedRequest(ctx, req) 238 + // Go directly to the presigned URL, no need to authenticate 239 + resp, err := p.httpClient.Do(req) 238 240 if err != nil { 239 241 return distribution.Descriptor{}, distribution.ErrBlobUnknown 240 242 } ··· 264 266 return nil, err 265 267 } 266 268 267 - url, err := p.getDownloadURL(ctx, dgst) 269 + method := "GET" 270 + 271 + url, err := p.getPresignedURL(ctx, method, dgst) 268 272 if err != nil { 269 273 return nil, err 270 274 } 271 275 272 - // Download the blob 273 - resp, err := http.Get(url) 276 + // Download the blob with service token authentication 277 + req, err := http.NewRequestWithContext(ctx, method, url, nil) 274 278 if err != nil { 275 279 return nil, err 276 280 } 281 + 282 + resp, err := p.doAuthenticatedRequest(ctx, req) 283 + if err != nil { 284 + return nil, err 285 + } 286 + 277 287 defer resp.Body.Close() 278 288 279 289 if resp.StatusCode != http.StatusOK { ··· 290 300 return nil, err 291 301 } 292 302 293 - url, err := p.getDownloadURL(ctx, dgst) 303 + method := "GET" 304 + 305 + url, err := p.getPresignedURL(ctx, method, dgst) 294 306 if err != nil { 295 307 return nil, err 296 308 } 297 309 298 310 // Download the blob with service token authentication 299 - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 311 + req, err := http.NewRequestWithContext(ctx, method, url, nil) 300 312 if err != nil { 301 313 return nil, err 302 314 } ··· 370 382 return err 371 383 } 372 384 373 - // For HEAD requests, proxy the response instead of redirecting 374 - // This avoids authentication issues when client follows redirects 375 - if r.Method == http.MethodHead { 376 - url, err := p.getHeadURL(ctx, dgst) 377 - if err != nil { 378 - return err 379 - } 380 - 381 - // Make authenticated HEAD request to hold service 382 - req, err := http.NewRequestWithContext(ctx, "HEAD", url, nil) 383 - if err != nil { 384 - return err 385 - } 386 - 387 - resp, err := p.doAuthenticatedRequest(ctx, req) 388 - if err != nil { 389 - return err 390 - } 391 - defer resp.Body.Close() 392 - 393 - if resp.StatusCode != http.StatusOK { 394 - return fmt.Errorf("blob not found") 395 - } 396 - 397 - // Copy response headers 398 - if contentLength := resp.Header.Get("Content-Length"); contentLength != "" { 399 - w.Header().Set("Content-Length", contentLength) 400 - } 401 - if contentType := resp.Header.Get("Content-Type"); contentType != "" { 402 - w.Header().Set("Content-Type", contentType) 403 - } 404 - if etag := resp.Header.Get("ETag"); etag != "" { 405 - w.Header().Set("ETag", etag) 406 - } 407 - 408 - w.WriteHeader(http.StatusOK) 409 - return nil 410 - } 411 - 412 - // For GET requests, redirect to presigned URL for direct download 413 - url, err := p.getDownloadURL(ctx, dgst) 385 + url, err := p.getPresignedURL(ctx, r.Method, dgst) 414 386 if err != nil { 415 387 return err 416 388 } ··· 483 455 return writer, nil 484 456 } 485 457 486 - // getDownloadURL returns the XRPC getBlob URL for downloading a blob 487 - // The hold service will redirect to a presigned S3 URL 488 - func (p *ProxyBlobStore) getDownloadURL(ctx context.Context, dgst digest.Digest) (string, error) { 489 - // Use XRPC endpoint: GET /xrpc/com.atproto.sync.getBlob?did={userDID}&cid={digest} 458 + // getPresignedURL returns the XRPC endpoint URL for blob operations 459 + func (p *ProxyBlobStore) getPresignedURL(ctx context.Context, operation string, dgst digest.Digest) (string, error) { 460 + // Use XRPC endpoint: /xrpc/com.atproto.sync.getBlob?did={userDID}&cid={digest} 490 461 // The 'did' parameter is the USER's DID (whose blob we're fetching), not the hold service DID 491 462 // Per migration doc: hold accepts OCI digest directly as cid parameter (checks for sha256: prefix) 492 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 493 - p.holdURL, p.ctx.DID, dgst.String()) 494 - return url, nil 495 - } 463 + xrpcURL := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s&method=%s", 464 + p.holdURL, p.ctx.DID, dgst.String(), operation) 465 + 466 + req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil) 467 + if err != nil { 468 + return "", fmt.Errorf("failed to create request: %w", err) 469 + } 470 + 471 + resp, err := p.doAuthenticatedRequest(ctx, req) 472 + if err != nil { 473 + return "", fmt.Errorf("failed to call hold service: %w", err) 474 + } 475 + defer resp.Body.Close() 476 + 477 + if resp.StatusCode != http.StatusOK { 478 + bodyBytes, _ := io.ReadAll(resp.Body) 479 + return "", fmt.Errorf("hold service returned error: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 480 + } 481 + 482 + // Parse JSON response to get presigned HEAD URL 483 + var result struct { 484 + URL string `json:"url"` 485 + } 486 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 487 + return "", fmt.Errorf("failed to parse hold service response: %w", err) 488 + } 489 + 490 + if result.URL == "" { 491 + return "", fmt.Errorf("hold service returned empty URL") 492 + } 496 493 497 - // getHeadURL returns the XRPC getBlob URL for HEAD requests 498 - // The hold service will redirect to a presigned S3 URL 499 - func (p *ProxyBlobStore) getHeadURL(ctx context.Context, dgst digest.Digest) (string, error) { 500 - // Same as GET - hold service handles HEAD method on getBlob endpoint 501 - // The 'did' parameter is the USER's DID (whose blob we're checking), not the hold service DID 502 - url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 503 - p.holdURL, p.ctx.DID, dgst.String()) 504 - return url, nil 494 + fmt.Printf("DEBUG [proxy_blob_store]: Got presigned HEAD URL from hold service: %s\n", result.URL) 495 + return result.URL, nil 505 496 } 506 497 507 498 // startMultipartUpload initiates a multipart upload via XRPC uploadBlob endpoint
+23 -49
pkg/hold/pds/xrpc.go
··· 978 978 digest = cidOrDigest 979 979 } 980 980 981 - // Handle HEAD vs GET differently - they need different presigned URLs 982 - if r.Method == http.MethodHead { 983 - // For HEAD requests: generate HEAD presigned URL and proxy to S3 984 - // AppView expects 200 OK with Content-Length, not a redirect 985 - // Note: HEAD presigned URLs have different signatures than GET URLs 986 - 987 - headURL, err := h.blobStore.GetPresignedURL("HEAD", digest, did) // TODO: Add GetPresignedHeadURL method 988 - if err != nil { 989 - log.Printf("[HandleGetBlob] Failed to get presigned HEAD URL: digest=%s, did=%s, err=%v", digest, did, err) 990 - http.Error(w, "blob not found", http.StatusNotFound) 991 - return 992 - } 993 - 994 - log.Printf("[HandleGetBlob] Proxying HEAD request to: %s", headURL) 995 - 996 - headResp, err := http.Head(headURL) 997 - if err != nil { 998 - log.Printf("[HandleGetBlob] HEAD request failed: %v", err) 999 - http.Error(w, "blob not found", http.StatusNotFound) 1000 - return 1001 - } 1002 - defer headResp.Body.Close() 1003 - 1004 - if headResp.StatusCode != http.StatusOK { 1005 - log.Printf("[HandleGetBlob] HEAD request returned non-200: %d", headResp.StatusCode) 1006 - http.Error(w, "blob not found", http.StatusNotFound) 1007 - return 981 + // Determine presigned URL operation 982 + // Check for ?method=HEAD query parameter first (from AppView), then fall back to request method 983 + // HEAD and GET need different presigned URL signatures 984 + operation := r.URL.Query().Get("method") 985 + if operation == "" { 986 + operation = "GET" 987 + if r.Method == http.MethodHead { 988 + operation = "HEAD" 1008 989 } 990 + } 1009 991 1010 - // Copy relevant headers from S3 response 1011 - if contentLength := headResp.Header.Get("Content-Length"); contentLength != "" { 1012 - w.Header().Set("Content-Length", contentLength) 1013 - } 1014 - if contentType := headResp.Header.Get("Content-Type"); contentType != "" { 1015 - w.Header().Set("Content-Type", contentType) 1016 - } 1017 - if etag := headResp.Header.Get("ETag"); etag != "" { 1018 - w.Header().Set("ETag", etag) 1019 - } 992 + // Generate presigned URL for the operation 993 + presignedURL, err := h.blobStore.GetPresignedURL(operation, digest, did) 994 + if err != nil { 995 + log.Printf("[HandleGetBlob] Failed to get presigned %s URL: digest=%s, did=%s, err=%v", operation, digest, did, err) 996 + http.Error(w, "failed to get presigned URL", http.StatusInternalServerError) 997 + return 998 + } 1020 999 1021 - log.Printf("[HandleGetBlob] HEAD request successful, Content-Length: %s", headResp.Header.Get("Content-Length")) 1022 - w.WriteHeader(http.StatusOK) 1023 - } else { 1024 - // For GET requests: generate GET presigned URL and redirect for direct download from S3 1025 - downloadURL, err := h.blobStore.GetPresignedURL("GET", digest, did) 1026 - if err != nil { 1027 - log.Printf("[HandleGetBlob] Failed to get presigned GET URL: digest=%s, did=%s, err=%v", digest, did, err) 1028 - http.Error(w, "failed to get download URL", http.StatusInternalServerError) 1029 - return 1030 - } 1000 + log.Printf("[HandleGetBlob] Returning presigned %s URL: %s", operation, presignedURL) 1031 1001 1032 - log.Printf("[HandleGetBlob] Redirecting GET request to presigned URL: %s", downloadURL) 1033 - http.Redirect(w, r, downloadURL, http.StatusTemporaryRedirect) 1002 + // Return JSON response with the presigned URL 1003 + // AppView will either redirect (GET) or proxy (HEAD) using this URL 1004 + response := map[string]string{ 1005 + "url": presignedURL, 1034 1006 } 1007 + w.Header().Set("Content-Type", "application/json") 1008 + json.NewEncoder(w).Encode(response) 1035 1009 } 1036 1010 1037 1011 // HandleListRepos lists all repositories in this PDS
+65 -21
pkg/hold/pds/xrpc_test.go
··· 1386 1386 } 1387 1387 1388 1388 func (m *mockBlobStore) GetPresignedURL(operation, digest, did string) (string, error) { 1389 - if operation == "GET" { 1389 + if operation == "GET" || operation == "HEAD" { 1390 + // Both GET and HEAD are download operations, just different HTTP methods 1390 1391 m.downloadCalls = append(m.downloadCalls, digest) 1391 1392 if m.downloadURLError != nil { 1392 1393 return "", m.downloadURLError ··· 1395 1396 return "https://s3.example.com/download/" + digest, nil 1396 1397 } 1397 1398 1399 + // PUT or other upload operations 1398 1400 m.uploadCalls = append(m.uploadCalls, digest) 1399 1401 if m.uploadURLError != nil { 1400 1402 return "", m.uploadURLError ··· 1680 1682 1681 1683 handler.HandleGetBlob(w, req) 1682 1684 1683 - // Should redirect to presigned download URL (307 Temporary Redirect) 1684 - if w.Code != http.StatusTemporaryRedirect { 1685 - t.Errorf("Expected status 307 (Temporary Redirect), got %d", w.Code) 1685 + // Should return 200 OK with JSON response containing presigned URL 1686 + if w.Code != http.StatusOK { 1687 + t.Errorf("Expected status 200 OK, got %d", w.Code) 1688 + } 1689 + 1690 + // Verify Content-Type is JSON 1691 + contentType := w.Header().Get("Content-Type") 1692 + if contentType != "application/json" { 1693 + t.Errorf("Expected Content-Type application/json, got %s", contentType) 1694 + } 1695 + 1696 + // Parse JSON response 1697 + var response map[string]string 1698 + if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil { 1699 + t.Fatalf("Failed to parse JSON response: %v", err) 1686 1700 } 1687 1701 1688 - location := w.Header().Get("Location") 1702 + // Verify URL field exists 1689 1703 expectedURL := "https://s3.example.com/download/" + cid 1690 - if location != expectedURL { 1691 - t.Errorf("Expected redirect to %s, got %s", expectedURL, location) 1704 + if response["url"] != expectedURL { 1705 + t.Errorf("Expected url to be %s, got %s", expectedURL, response["url"]) 1692 1706 } 1693 1707 1694 1708 // Verify blob store was called 1695 1709 if len(blobStore.downloadCalls) != 1 || blobStore.downloadCalls[0] != cid { 1696 - t.Errorf("Expected GetPresignedDownloadURL to be called with %s", cid) 1710 + t.Errorf("Expected GetPresignedURL to be called with %s", cid) 1697 1711 } 1698 1712 } 1699 1713 ··· 1713 1727 1714 1728 handler.HandleGetBlob(w, req) 1715 1729 1716 - // Should redirect to presigned download URL 1717 - if w.Code != http.StatusTemporaryRedirect { 1718 - t.Errorf("Expected status 307, got %d", w.Code) 1730 + // Should return 200 OK with JSON response 1731 + if w.Code != http.StatusOK { 1732 + t.Errorf("Expected status 200 OK, got %d", w.Code) 1733 + } 1734 + 1735 + // Parse JSON response 1736 + var response map[string]string 1737 + if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil { 1738 + t.Fatalf("Failed to parse JSON response: %v", err) 1739 + } 1740 + 1741 + // Verify URL field exists 1742 + if response["url"] == "" { 1743 + t.Errorf("Expected url field in response, got empty") 1719 1744 } 1720 1745 1721 1746 // Verify blob store received the sha256 digest 1722 1747 if len(blobStore.downloadCalls) != 1 || blobStore.downloadCalls[0] != digest { 1723 - t.Errorf("Expected GetPresignedDownloadURL to be called with %s, got %v", digest, blobStore.downloadCalls) 1748 + t.Errorf("Expected GetPresignedURL to be called with %s, got %v", digest, blobStore.downloadCalls) 1724 1749 } 1725 1750 } 1726 1751 1727 1752 // TestHandleGetBlob_HeadMethod tests HEAD request support 1728 - // HEAD requests are proxied (not redirected) to avoid S3 presigned URL signature issues 1729 - // The hold service makes the HEAD request itself and returns 200 OK with headers 1753 + // HEAD requests now return JSON with presigned HEAD URL (same as GET) 1754 + // AppView is responsible for making the actual HEAD request to S3 1730 1755 // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1731 1756 func TestHandleGetBlob_HeadMethod(t *testing.T) { 1732 - handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 1757 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1733 1758 1734 1759 holdDID := "did:web:hold.example.com" 1735 1760 cid := "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke" ··· 1740 1765 1741 1766 handler.HandleGetBlob(w, req) 1742 1767 1743 - // HEAD requests are now proxied - the handler makes an HTTP HEAD to the presigned URL 1744 - // In the test environment, this will fail because the mock returns a fake URL 1745 - // Expect 404 because the handler couldn't reach the mock S3 URL 1746 - if w.Code != http.StatusNotFound { 1747 - t.Errorf("Expected status 404 (mock S3 unreachable), got %d", w.Code) 1768 + // Should return 200 OK with JSON response containing presigned HEAD URL 1769 + if w.Code != http.StatusOK { 1770 + t.Errorf("Expected status 200 OK, got %d", w.Code) 1771 + } 1772 + 1773 + // Verify Content-Type is JSON 1774 + contentType := w.Header().Get("Content-Type") 1775 + if contentType != "application/json" { 1776 + t.Errorf("Expected Content-Type application/json, got %s", contentType) 1777 + } 1778 + 1779 + // Parse JSON response 1780 + var response map[string]string 1781 + if err := json.Unmarshal(w.Body.Bytes(), &response); err != nil { 1782 + t.Fatalf("Failed to parse JSON response: %v", err) 1748 1783 } 1749 1784 1750 - // Note: In production with real S3, HEAD would return 200 OK with Content-Length header 1785 + // Verify URL field exists 1786 + expectedURL := "https://s3.example.com/download/" + cid 1787 + if response["url"] != expectedURL { 1788 + t.Errorf("Expected url to be %s, got %s", expectedURL, response["url"]) 1789 + } 1790 + 1791 + // Verify blob store was called with HEAD operation 1792 + if len(blobStore.downloadCalls) != 1 || blobStore.downloadCalls[0] != cid { 1793 + t.Errorf("Expected GetPresignedURL to be called with %s", cid) 1794 + } 1751 1795 } 1752 1796 1753 1797 // TestHandleGetBlob_MissingParameters tests missing required parameters