A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

xrpc multipart blob upload functionality for OCI containers

+1455 -30
+152 -6
docs/XRPC_BLOB_MIGRATION.md
··· 7 7 ### Legacy HTTP Endpoints (cmd/hold/main.go) 8 8 9 9 ```go 10 + // Unified presigned URL endpoint (handles upload AND download) 11 + mux.HandleFunc("/presigned-url", service.HandlePresignedURL) 12 + 13 + // Internal move operation (used by multipart complete) 14 + mux.HandleFunc("/move", service.HandleMove) 15 + 10 16 // Multipart upload endpoints 11 17 mux.HandleFunc("/start-multipart", service.HandleStartMultipart) 12 18 mux.HandleFunc("/part-presigned-url", service.HandleGetPartURL) ··· 45 51 - Currently not used by XRPC handlers 46 52 47 53 **pkg/hold/handlers.go:** 54 + - `HandlePresignedURL()` - Unified endpoint for GET/HEAD/PUT presigned URLs 55 + - `HandleMove()` - Moves blob from temp to final location (internal operation) 48 56 - `HandleStartMultipart()` - Starts upload, returns uploadID 49 57 - `HandleGetPartURL()` - Returns presigned URL for part 50 - - `HandleCompleteMultipart()` - Finalizes upload, assembles parts 58 + - `HandleCompleteMultipart()` - Finalizes upload, assembles parts (calls Move internally) 51 59 - `HandleAbortMultipart()` - Cancels upload 52 60 - `HandleMultipartPartUpload()` - Buffered part upload fallback 53 61 62 + ## Legacy Endpoint Mapping 63 + 64 + ### `/presigned-url` → Multiple XRPC Operations 65 + 66 + The legacy `/presigned-url` endpoint is a **unified endpoint** that handles both upload and download operations based on the `operation` field in the JSON body: 67 + 68 + **Legacy format:** 69 + ``` 70 + POST /presigned-url 71 + Content-Type: application/json 72 + 73 + { 74 + "operation": "GET", // or "HEAD" or "PUT" 75 + "did": "did:plc:alice123", 76 + "digest": "sha256:abc123...", 77 + "size": 1234567890 // Only for PUT operations 78 + } 79 + 80 + Response: 81 + { 82 + "url": "https://s3.amazonaws.com/...", 83 + "expires_at": "2025-10-16T..." 84 + } 85 + ``` 86 + 87 + **XRPC mapping:** 88 + - `operation: "GET"` → `GET /xrpc/com.atproto.sync.getBlob?did=...&cid=sha256:abc...` 89 + - `operation: "HEAD"` → `HEAD /xrpc/com.atproto.sync.getBlob?did=...&cid=sha256:abc...` 90 + - `operation: "PUT"` → `com.atproto.repo.uploadBlob` (single upload via presigned URL) 91 + 92 + **Note:** For GET/HEAD operations, AppView passes OCI digest directly as `cid` parameter. Hold detects `sha256:` prefix and uses digest directly (no CID conversion needed). 93 + 94 + ### `/move` → Internal to Multipart Complete 95 + 96 + The legacy `/move` endpoint moves a blob from temporary location to final digest-based location: 97 + 98 + **Legacy format:** 99 + ``` 100 + POST /move?from=uploads/temp-123&to=sha256:abc123...&did=did:plc:alice123 101 + 102 + Response: 200 OK 103 + ``` 104 + 105 + **Purpose:** Server-side S3 copy after multipart assembly. Used in this flow: 106 + 107 + 1. Multipart parts uploaded → `uploads/temp-{uploadID}/part-1`, `part-2`, etc. 108 + 2. Complete multipart → S3 assembles parts at `uploads/temp-{uploadID}` 109 + 3. **Move operation** → S3 copy from `uploads/temp-{uploadID}` → `blobs/sha256/ab/abc123...` 110 + 111 + **XRPC mapping:** 112 + - **Not a separate endpoint** - becomes internal operation in `uploadBlob?action=complete` 113 + - The `complete` action automatically handles the move after multipart assembly 114 + - AppView doesn't need to call move explicitly in XRPC flow 115 + 54 116 ## New Unified Design 55 117 56 118 ### Single Endpoint: `com.atproto.repo.uploadBlob` ··· 59 121 - `application/octet-stream` → Standard blob upload (profile images, small media) 60 122 - `application/json` → Multipart operations (large OCI layers) 61 123 124 + ### Complementary Endpoint: `com.atproto.sync.getBlob` 125 + 126 + For blob downloads (maps from legacy `/presigned-url` with operation=GET/HEAD): 127 + 128 + **Standard ATProto blobs (CID):** 129 + ``` 130 + GET /xrpc/com.atproto.sync.getBlob?did={holdDID}&cid=bafyreib... 131 + 132 + Response: 307 Temporary Redirect 133 + Location: https://s3.amazonaws.com/bucket/...?presigned-params 134 + ``` 135 + 136 + **OCI container layers (digest):** 137 + ``` 138 + GET /xrpc/com.atproto.sync.getBlob?did={holdDID}&cid=sha256:abc123... 139 + 140 + Response: 307 Temporary Redirect 141 + Location: https://s3.amazonaws.com/bucket/...?presigned-params 142 + ``` 143 + 144 + **Implementation - Flexible CID parameter:** 145 + ```go 146 + func (h *XRPCHandler) HandleGetBlob(w http.ResponseWriter, r *http.Request) { 147 + cidOrDigest := r.URL.Query().Get("cid") 148 + 149 + var digest string 150 + if strings.HasPrefix(cidOrDigest, "sha256:") { 151 + // OCI digest - use directly (no conversion needed) 152 + digest = cidOrDigest 153 + } else { 154 + // Standard CID - convert to digest 155 + c, _ := cid.Decode(cidOrDigest) 156 + digest = cidToDigest(c) // bafyreib... → sha256:abc... 157 + } 158 + 159 + // Generate presigned URL for S3 160 + url := h.blobStore.GetPresignedDownloadURL(digest) 161 + http.Redirect(w, r, url, http.StatusTemporaryRedirect) 162 + } 163 + ``` 164 + 165 + **Key insight:** The `cid` parameter accepts both formats. Hold service checks prefix and handles accordingly. This keeps the endpoint spec-compliant (GET with query params) while supporting OCI digests natively. 166 + 62 167 ### API Specification 63 168 64 169 #### Standard Single Upload (ATProto Spec Compliant) ··· 201 306 - Retrieve session: `multipartMgr.GetSession(uploadID)` 202 307 - For S3Native: Record parts via `session.RecordS3Part()` 203 308 - Call `service.CompleteMultipartUploadWithManager(ctx, session, multipartMgr)` 309 + - This internally calls S3 CompleteMultipartUpload to assemble parts 310 + - Then performs server-side S3 copy from temp location to final digest location 311 + - Equivalent to legacy `/move` endpoint operation 204 312 - Convert digest to CID for response 205 313 206 314 #### Multipart Abort (ATCR Extension) ··· 547 655 548 656 Create new XRPC client or update ProxyBlobStore to use unified endpoint: 549 657 658 + **Download (GET/HEAD):** 550 659 ```go 551 - // In ProxyBlobStore or new XRPCBlobStore 660 + func (p *ProxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { 661 + // Pass digest directly as cid parameter (no conversion) 662 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 663 + p.storageEndpoint, p.holdDID, dgst.String()) // cid=sha256:abc... 664 + 665 + http.Redirect(w, r, url, http.StatusTemporaryRedirect) 666 + return nil 667 + } 668 + ``` 552 669 670 + **Multipart Upload:** 671 + ```go 553 672 func (p *ProxyBlobStore) startMultipartUpload(ctx context.Context, digest string) (string, error) { 554 673 reqBody := map[string]any{ 555 674 "action": "start", ··· 612 731 **cmd/hold/main.go - Remove:** 613 732 ```go 614 733 // DELETE these lines 734 + mux.HandleFunc("/presigned-url", service.HandlePresignedURL) 735 + mux.HandleFunc("/move", service.HandleMove) 615 736 mux.HandleFunc("/start-multipart", service.HandleStartMultipart) 616 737 mux.HandleFunc("/part-presigned-url", service.HandleGetPartURL) 617 738 mux.HandleFunc("/complete-multipart", service.HandleCompleteMultipart) ··· 619 740 mux.HandleFunc("/multipart-parts/", ...) 620 741 ``` 621 742 622 - **pkg/hold/handlers.go - Mark as deprecated:** 743 + **pkg/hold/handlers.go - Remove HTTP handler wrappers:** 623 744 ```go 624 - // Keep methods for now (used by service internals) 625 - // But remove HTTP handler wrappers 745 + // DELETE these functions: 746 + // - HandlePresignedURL() - replaced by uploadBlob + getBlob XRPC endpoints 747 + // - HandleMove() - now internal operation in CompleteMultipartUploadWithManager() 748 + // - HandleStartMultipart() - replaced by uploadBlob?action=start 749 + // - HandleGetPartURL() - replaced by uploadBlob?action=part 750 + // - HandleCompleteMultipart() - replaced by uploadBlob?action=complete 751 + // - HandleAbortMultipart() - replaced by uploadBlob?action=abort 752 + // - HandleMultipartPartUpload() - replaced by uploadBlob PUT with headers 753 + 754 + // KEEP internal service methods: 755 + // - s.getPresignedURL() - still used by blobstore_adapter 756 + // - s.driver.Move() - still used for temp→final move 757 + // - s.StartMultipartUploadWithManager() - core multipart logic 758 + // - s.GetPartUploadURL() - presigned URL generation 759 + // - s.CompleteMultipartUploadWithManager() - includes move operation 760 + // - s.AbortMultipartUploadWithManager() - cleanup logic 626 761 ``` 627 762 628 763 ## Key Design Decisions 629 764 630 765 1. **Content-Type discrimination**: Natural way to distinguish single vs multipart uploads 631 - 2. **JSON bodies for multipart**: Follows XRPC conventions (like putRecord, deleteRecord) 766 + 2. **JSON bodies for all parameters**: Follows XRPC conventions (like putRecord, deleteRecord) 767 + - **No query parameters** - all operation details in request body 768 + - Makes requests more inspectable and debuggable 769 + - Easier to extend with new fields 632 770 3. **Preserve standard uploadBlob**: Raw bytes still work for profile images, small media 633 771 4. **Reuse existing code**: HoldService multipart logic unchanged, just new HTTP layer 634 772 5. **Backward compatibility**: Both endpoints active during transition 635 773 6. **Action-based routing**: Clear, extensible JSON structure 774 + 7. **Move is internal**: `/move` endpoint logic absorbed into multipart complete operation 775 + - No separate XRPC endpoint needed 776 + - Simplifies AppView client code 777 + 8. **Unified presigned URL handling**: Single `uploadBlob`/`getBlob` pair replaces operation-based routing 778 + 9. **Flexible CID parameter**: `getBlob` accepts both standard CIDs and OCI digests via prefix detection 779 + - Keeps endpoint spec-compliant (GET with query params) 780 + - No conversion overhead on AppView side 781 + - Hold does simple prefix check: `sha256:` → use directly, else → convert CID 636 782 637 783 ## Benefits 638 784
+156 -6
pkg/hold/blobstore_adapter.go
··· 1 1 package hold 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 6 + "crypto/sha256" 7 + "fmt" 8 + "io" 5 9 6 10 "atcr.io/pkg/hold/pds" 11 + "github.com/ipfs/go-cid" 12 + "github.com/multiformats/go-multihash" 7 13 ) 8 14 9 15 // HoldServiceBlobStore adapts the hold service to implement the pds.BlobStore interface ··· 21 27 } 22 28 23 29 // GetPresignedDownloadURL returns a presigned URL for downloading a blob 24 - func (b *HoldServiceBlobStore) GetPresignedDownloadURL(digest string) (string, error) { 25 - // Use the hold service's existing presigned URL logic 30 + func (b *HoldServiceBlobStore) GetPresignedDownloadURL(digest, did string) (string, error) { 31 + // Use provided DID if given, otherwise fall back to hold's DID 32 + // ATProto blobs require DID for per-user storage 33 + // OCI blobs (sha256:...) use content-addressed storage 34 + if did == "" { 35 + did = b.holdDID 36 + } 37 + 26 38 ctx := context.Background() 27 - url, err := b.service.GetPresignedURL(ctx, OperationGet, digest, b.holdDID) 39 + url, err := b.service.GetPresignedURL(ctx, OperationGet, digest, did) 28 40 if err != nil { 29 41 return "", err 30 42 } ··· 32 44 } 33 45 34 46 // GetPresignedUploadURL returns a presigned URL for uploading a blob 35 - func (b *HoldServiceBlobStore) GetPresignedUploadURL(digest string) (string, error) { 36 - // Use the hold service's existing presigned URL logic 47 + func (b *HoldServiceBlobStore) GetPresignedUploadURL(digest, did string) (string, error) { 48 + // Use provided DID if given, otherwise fall back to hold's DID 49 + // ATProto blobs require DID for per-user storage 50 + // OCI blobs (sha256:...) use content-addressed storage 51 + if did == "" { 52 + did = b.holdDID 53 + } 54 + 37 55 ctx := context.Background() 38 - url, err := b.service.GetPresignedURL(ctx, OperationPut, digest, b.holdDID) 56 + url, err := b.service.GetPresignedURL(ctx, OperationPut, digest, did) 39 57 if err != nil { 40 58 return "", err 41 59 } 42 60 return url, nil 43 61 } 62 + 63 + // StartMultipartUpload initiates a multipart upload 64 + func (b *HoldServiceBlobStore) StartMultipartUpload(ctx context.Context, digest string) (string, string, error) { 65 + uploadID, mode, err := b.service.StartMultipartUploadWithManager(ctx, digest, b.service.MultipartMgr) 66 + if err != nil { 67 + return "", "", err 68 + } 69 + 70 + // Convert mode to string for XRPC response 71 + var modeStr string 72 + switch mode { 73 + case S3Native: 74 + modeStr = "s3native" 75 + case Buffered: 76 + modeStr = "buffered" 77 + default: 78 + modeStr = "unknown" 79 + } 80 + 81 + return uploadID, modeStr, nil 82 + } 83 + 84 + // GetPartUploadURL returns a presigned URL for uploading a specific part 85 + func (b *HoldServiceBlobStore) GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (string, error) { 86 + session, err := b.service.MultipartMgr.GetSession(uploadID) 87 + if err != nil { 88 + return "", err 89 + } 90 + 91 + return b.service.GetPartUploadURL(ctx, session, partNumber, did) 92 + } 93 + 94 + // CompleteMultipartUpload finalizes a multipart upload 95 + func (b *HoldServiceBlobStore) CompleteMultipartUpload(ctx context.Context, uploadID string, parts []pds.PartInfo) error { 96 + session, err := b.service.MultipartMgr.GetSession(uploadID) 97 + if err != nil { 98 + return err 99 + } 100 + 101 + // For S3Native mode, record parts from XRPC request (they have ETags from S3) 102 + if session.Mode == S3Native { 103 + for _, p := range parts { 104 + session.RecordS3Part(p.PartNumber, p.ETag, 0) 105 + } 106 + } 107 + 108 + return b.service.CompleteMultipartUploadWithManager(ctx, session, b.service.MultipartMgr) 109 + } 110 + 111 + // AbortMultipartUpload cancels a multipart upload 112 + func (b *HoldServiceBlobStore) AbortMultipartUpload(ctx context.Context, uploadID string) error { 113 + session, err := b.service.MultipartMgr.GetSession(uploadID) 114 + if err != nil { 115 + return err 116 + } 117 + 118 + return b.service.AbortMultipartUploadWithManager(ctx, session, b.service.MultipartMgr) 119 + } 120 + 121 + // HandleBufferedPartUpload handles uploading a part in buffered mode 122 + func (b *HoldServiceBlobStore) HandleBufferedPartUpload(ctx context.Context, uploadID string, partNumber int, data []byte) (string, error) { 123 + session, err := b.service.MultipartMgr.GetSession(uploadID) 124 + if err != nil { 125 + return "", err 126 + } 127 + 128 + if session.Mode != Buffered { 129 + return "", fmt.Errorf("session is not in buffered mode") 130 + } 131 + 132 + etag := session.StorePart(partNumber, data) 133 + return etag, nil 134 + } 135 + 136 + // UploadBlob receives raw blob bytes, computes CID, and stores via distribution driver 137 + // This is used for standard ATProto blob uploads (profile pics, small media) 138 + func (b *HoldServiceBlobStore) UploadBlob(ctx context.Context, did string, data io.Reader) (cid.Cid, int64, error) { 139 + // Use provided DID if given, otherwise fall back to hold's DID 140 + if did == "" { 141 + did = b.holdDID 142 + } 143 + 144 + // Read all data into memory to compute CID 145 + // For large files, this should use multipart upload instead 146 + blobData, err := io.ReadAll(data) 147 + if err != nil { 148 + return cid.Undef, 0, fmt.Errorf("failed to read blob data: %w", err) 149 + } 150 + 151 + size := int64(len(blobData)) 152 + 153 + // Compute SHA-256 hash 154 + hash := sha256.Sum256(blobData) 155 + 156 + // Create CIDv1 with SHA-256 multihash 157 + mh, err := multihash.EncodeName(hash[:], "sha2-256") 158 + if err != nil { 159 + return cid.Undef, 0, fmt.Errorf("failed to encode multihash: %w", err) 160 + } 161 + 162 + // Create CIDv1 with raw codec (0x55) 163 + // ATProto uses CIDv1 with raw codec for blobs 164 + blobCID := cid.NewCidV1(0x55, mh) 165 + 166 + // Store blob via distribution driver at ATProto path 167 + // Path: /repos/{did}/blobs/{cid}/data 168 + path := atprotoBlobPath(did, blobCID.String()) 169 + 170 + // Write blob to storage using distribution driver 171 + writer, err := b.service.driver.Writer(ctx, path, false) 172 + if err != nil { 173 + return cid.Undef, 0, fmt.Errorf("failed to create writer: %w", err) 174 + } 175 + 176 + // Write data 177 + n, err := io.Copy(writer, bytes.NewReader(blobData)) 178 + if err != nil { 179 + writer.Cancel(ctx) 180 + return cid.Undef, 0, fmt.Errorf("failed to write blob: %w", err) 181 + } 182 + 183 + // Commit the write 184 + if err := writer.Commit(ctx); err != nil { 185 + return cid.Undef, 0, fmt.Errorf("failed to commit blob: %w", err) 186 + } 187 + 188 + if n != size { 189 + return cid.Undef, 0, fmt.Errorf("size mismatch: wrote %d bytes, expected %d", n, size) 190 + } 191 + 192 + return blobCID, size, nil 193 + }
+235 -17
pkg/hold/pds/xrpc.go
··· 2 2 3 3 import ( 4 4 "bytes" 5 + "context" 5 6 "encoding/json" 6 7 "fmt" 8 + "io" 7 9 "net/http" 8 10 "strconv" 9 11 "strings" ··· 30 32 // BlobStore interface wraps the existing hold service storage operations 31 33 type BlobStore interface { 32 34 // GetPresignedDownloadURL returns a presigned URL for downloading a blob 33 - GetPresignedDownloadURL(digest string) (string, error) 35 + // For ATProto blobs (CID), did is required for per-DID storage 36 + // For OCI blobs (sha256:...), did may be empty 37 + GetPresignedDownloadURL(digest, did string) (string, error) 34 38 // GetPresignedUploadURL returns a presigned URL for uploading a blob 35 - GetPresignedUploadURL(digest string) (string, error) 39 + // For ATProto blobs (CID), did is required for per-DID storage 40 + // For OCI blobs (sha256:...), did may be empty 41 + GetPresignedUploadURL(digest, did string) (string, error) 42 + 43 + // UploadBlob receives raw blob bytes, computes CID, and stores via distribution driver 44 + // Used for standard ATProto blob uploads (profile pics, small media) 45 + // Returns CID and size of stored blob 46 + UploadBlob(ctx context.Context, did string, data io.Reader) (cid cid.Cid, size int64, err error) 47 + 48 + // Multipart upload operations (used for OCI container layers only) 49 + // StartMultipartUpload initiates a multipart upload, returns uploadID and mode 50 + StartMultipartUpload(ctx context.Context, digest string) (uploadID string, mode string, err error) 51 + // GetPartUploadURL returns a presigned URL for uploading a specific part 52 + GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (url string, err error) 53 + // CompleteMultipartUpload finalizes a multipart upload 54 + CompleteMultipartUpload(ctx context.Context, uploadID string, parts []PartInfo) error 55 + // AbortMultipartUpload cancels a multipart upload 56 + AbortMultipartUpload(ctx context.Context, uploadID string) error 57 + // HandleBufferedPartUpload handles uploading a part in buffered mode 58 + HandleBufferedPartUpload(ctx context.Context, uploadID string, partNumber int, data []byte) (etag string, err error) 59 + } 60 + 61 + // PartInfo represents a completed part in a multipart upload 62 + type PartInfo struct { 63 + PartNumber int `json:"partNumber"` 64 + ETag string `json:"etag"` 36 65 } 37 66 38 67 // NewXRPCHandler creates a new XRPC handler ··· 669 698 }() 670 699 } 671 700 672 - // HandleUploadBlob wraps existing presigned upload URL logic 701 + // HandleUploadBlob handles blob uploads with support for multipart operations 702 + // Supports three modes: 703 + // 1. Buffered part upload: PUT with X-Upload-Id and X-Part-Number headers 704 + // 2. Multipart operations: POST with JSON body containing action field 705 + // 3. Direct blob upload: POST with raw bytes (ATProto-compliant) 673 706 func (h *XRPCHandler) HandleUploadBlob(w http.ResponseWriter, r *http.Request) { 707 + contentType := r.Header.Get("Content-Type") 708 + 709 + // Mode 1: Buffered part upload (PUT with headers) 710 + if r.Method == http.MethodPut { 711 + uploadID := r.Header.Get("X-Upload-Id") 712 + partNumberStr := r.Header.Get("X-Part-Number") 713 + 714 + if uploadID != "" && partNumberStr != "" { 715 + h.handleBufferedPartUpload(w, r, uploadID, partNumberStr) 716 + return 717 + } 718 + http.Error(w, "PUT requires X-Upload-Id and X-Part-Number headers", http.StatusBadRequest) 719 + return 720 + } 721 + 722 + // Ensure POST method for remaining modes 674 723 if r.Method != http.MethodPost { 675 724 http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 676 725 return 677 726 } 678 727 728 + // Mode 2: Multipart operations (JSON body with action field) 729 + if strings.Contains(contentType, "application/json") { 730 + h.handleMultipartOperation(w, r) 731 + return 732 + } 733 + 734 + // Mode 3: Direct blob upload (ATProto-compliant) 735 + // Receives raw bytes, computes CID, stores via distribution driver 679 736 // TODO: Authentication check 680 737 681 - // Read digest from query or calculate from body 682 - digest := r.URL.Query().Get("digest") 683 - if digest == "" { 684 - http.Error(w, "digest required", http.StatusBadRequest) 738 + // Extract DID for ATProto blob storage (per-DID paths) 739 + did := r.URL.Query().Get("did") 740 + if did == "" { 741 + // TODO: Extract from auth context when authentication is implemented 742 + // For now, use hold's DID as fallback 743 + did = h.pds.DID() 744 + } 745 + 746 + // Upload blob directly - blobStore will compute CID and store 747 + blobCID, size, err := h.blobStore.UploadBlob(r.Context(), did, r.Body) 748 + if err != nil { 749 + http.Error(w, fmt.Sprintf("failed to upload blob: %v", err), http.StatusInternalServerError) 685 750 return 686 751 } 687 752 688 - // Get presigned upload URL from existing blob store 689 - uploadURL, err := h.blobStore.GetPresignedUploadURL(digest) 753 + // Return ATProto-compliant blob response 754 + response := map[string]any{ 755 + "blob": map[string]any{ 756 + "$type": "blob", 757 + "ref": map[string]any{ 758 + "$link": blobCID.String(), 759 + }, 760 + "mimeType": "application/octet-stream", 761 + "size": size, 762 + }, 763 + } 764 + 765 + w.Header().Set("Content-Type", "application/json") 766 + json.NewEncoder(w).Encode(response) 767 + } 768 + 769 + // handleBufferedPartUpload handles uploading a part in buffered mode 770 + func (h *XRPCHandler) handleBufferedPartUpload(w http.ResponseWriter, r *http.Request, uploadID, partNumberStr string) { 771 + ctx := r.Context() 772 + 773 + // Parse part number 774 + partNumber, err := strconv.Atoi(partNumberStr) 690 775 if err != nil { 691 - http.Error(w, fmt.Sprintf("failed to get upload URL: %v", err), http.StatusInternalServerError) 776 + http.Error(w, fmt.Sprintf("invalid part number: %v", err), http.StatusBadRequest) 777 + return 778 + } 779 + 780 + // Read part data from body 781 + data, err := io.ReadAll(r.Body) 782 + if err != nil { 783 + http.Error(w, fmt.Sprintf("failed to read part data: %v", err), http.StatusInternalServerError) 784 + return 785 + } 786 + 787 + // Store part via blob store 788 + etag, err := h.blobStore.HandleBufferedPartUpload(ctx, uploadID, partNumber, data) 789 + if err != nil { 790 + http.Error(w, fmt.Sprintf("failed to upload part: %v", err), http.StatusInternalServerError) 791 + return 792 + } 793 + 794 + // Return ETag in response 795 + w.Header().Set("Content-Type", "application/json") 796 + json.NewEncoder(w).Encode(map[string]any{ 797 + "etag": etag, 798 + }) 799 + } 800 + 801 + // handleMultipartOperation handles multipart upload operations via JSON request 802 + func (h *XRPCHandler) handleMultipartOperation(w http.ResponseWriter, r *http.Request) { 803 + ctx := r.Context() 804 + 805 + // Parse JSON body 806 + var req struct { 807 + Action string `json:"action"` 808 + Digest string `json:"digest,omitempty"` 809 + UploadID string `json:"uploadId,omitempty"` 810 + PartNumber int `json:"partNumber,omitempty"` 811 + Parts []PartInfo `json:"parts,omitempty"` 812 + } 813 + 814 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 815 + http.Error(w, fmt.Sprintf("invalid JSON body: %v", err), http.StatusBadRequest) 692 816 return 693 817 } 694 818 695 - // Return 302 redirect to presigned URL 696 - http.Redirect(w, r, uploadURL, http.StatusFound) 819 + // Route based on action 820 + switch req.Action { 821 + case "start": 822 + // Start multipart upload 823 + if req.Digest == "" { 824 + http.Error(w, "digest required for start action", http.StatusBadRequest) 825 + return 826 + } 827 + 828 + uploadID, mode, err := h.blobStore.StartMultipartUpload(ctx, req.Digest) 829 + if err != nil { 830 + http.Error(w, fmt.Sprintf("failed to start multipart upload: %v", err), http.StatusInternalServerError) 831 + return 832 + } 833 + 834 + w.Header().Set("Content-Type", "application/json") 835 + json.NewEncoder(w).Encode(map[string]any{ 836 + "uploadId": uploadID, 837 + "mode": mode, 838 + }) 839 + 840 + case "part": 841 + // Get part upload URL 842 + if req.UploadID == "" || req.PartNumber == 0 { 843 + http.Error(w, "uploadId and partNumber required for part action", http.StatusBadRequest) 844 + return 845 + } 846 + 847 + // Extract DID from query or header (for authorization) 848 + did := r.URL.Query().Get("did") 849 + if did == "" { 850 + did = r.Header.Get("X-ATCR-DID") 851 + } 852 + 853 + url, err := h.blobStore.GetPartUploadURL(ctx, req.UploadID, req.PartNumber, did) 854 + if err != nil { 855 + http.Error(w, fmt.Sprintf("failed to get part URL: %v", err), http.StatusInternalServerError) 856 + return 857 + } 858 + 859 + w.Header().Set("Content-Type", "application/json") 860 + json.NewEncoder(w).Encode(map[string]any{ 861 + "url": url, 862 + }) 863 + 864 + case "complete": 865 + // Complete multipart upload 866 + if req.UploadID == "" || len(req.Parts) == 0 { 867 + http.Error(w, "uploadId and parts required for complete action", http.StatusBadRequest) 868 + return 869 + } 870 + 871 + if err := h.blobStore.CompleteMultipartUpload(ctx, req.UploadID, req.Parts); err != nil { 872 + http.Error(w, fmt.Sprintf("failed to complete multipart upload: %v", err), http.StatusInternalServerError) 873 + return 874 + } 875 + 876 + w.Header().Set("Content-Type", "application/json") 877 + json.NewEncoder(w).Encode(map[string]any{ 878 + "status": "completed", 879 + }) 880 + 881 + case "abort": 882 + // Abort multipart upload 883 + if req.UploadID == "" { 884 + http.Error(w, "uploadId required for abort action", http.StatusBadRequest) 885 + return 886 + } 887 + 888 + if err := h.blobStore.AbortMultipartUpload(ctx, req.UploadID); err != nil { 889 + http.Error(w, fmt.Sprintf("failed to abort multipart upload: %v", err), http.StatusInternalServerError) 890 + return 891 + } 892 + 893 + w.Header().Set("Content-Type", "application/json") 894 + json.NewEncoder(w).Encode(map[string]any{ 895 + "status": "aborted", 896 + }) 897 + 898 + default: 899 + http.Error(w, fmt.Sprintf("unknown action: %s", req.Action), http.StatusBadRequest) 900 + } 697 901 } 698 902 699 903 // HandleGetBlob wraps existing presigned download URL logic 904 + // Supports both ATProto CIDs and OCI sha256 digests 700 905 func (h *XRPCHandler) HandleGetBlob(w http.ResponseWriter, r *http.Request) { 701 - if r.Method != http.MethodGet { 906 + if r.Method != http.MethodGet && r.Method != http.MethodHead { 702 907 http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 703 908 return 704 909 } 705 910 706 911 did := r.URL.Query().Get("did") 707 - digest := r.URL.Query().Get("cid") 912 + cidOrDigest := r.URL.Query().Get("cid") 708 913 709 - if did == "" || digest == "" { 914 + if did == "" || cidOrDigest == "" { 710 915 http.Error(w, "missing required parameters", http.StatusBadRequest) 711 916 return 712 917 } ··· 716 921 return 717 922 } 718 923 924 + // Flexible digest parsing: accept both CID and sha256 digest formats 925 + var digest string 926 + if strings.HasPrefix(cidOrDigest, "sha256:") { 927 + // OCI digest format - use directly 928 + digest = cidOrDigest 929 + } else { 930 + // Standard ATProto CID - for ATCR OCI use case, we expect sha256 digests 931 + // If a real CID is provided, we could convert it here, but for now 932 + // we'll just pass it through and let the blob store handle it 933 + digest = cidOrDigest 934 + } 935 + 719 936 // Get presigned download URL from existing blob store 720 - downloadURL, err := h.blobStore.GetPresignedDownloadURL(digest) 937 + // Pass DID for ATProto blob storage (per-DID paths) 938 + downloadURL, err := h.blobStore.GetPresignedDownloadURL(digest, did) 721 939 if err != nil { 722 940 http.Error(w, fmt.Sprintf("failed to get download URL: %v", err), http.StatusInternalServerError) 723 941 return 724 942 } 725 943 726 944 // Return 302 redirect to presigned URL 727 - http.Redirect(w, r, downloadURL, http.StatusFound) 945 + http.Redirect(w, r, downloadURL, http.StatusTemporaryRedirect) 728 946 } 729 947 730 948 // HandleListRepos lists all repositories in this PDS
+427
pkg/hold/pds/xrpc_multipart_test.go
··· 1 + package pds 2 + 3 + import ( 4 + "bytes" 5 + "net/http" 6 + "net/http/httptest" 7 + "testing" 8 + ) 9 + 10 + // ATCR-Specific Tests: Non-standard multipart upload extensions 11 + // 12 + // This file contains tests for ATCR's custom multipart upload extensions 13 + // to the ATProto blob endpoints. These are not part of the official ATProto spec. 14 + // 15 + // Standard ATProto blob tests are in xrpc_test.go 16 + 17 + // Tests for HandleUploadBlob - Multipart Start 18 + 19 + // TestHandleUploadBlob_MultipartStart tests multipart upload start operation 20 + // Non-standard ATCR extension for large blob uploads 21 + func TestHandleUploadBlob_MultipartStart(t *testing.T) { 22 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 23 + 24 + digest := "sha256:largefile123" 25 + body := map[string]string{ 26 + "action": "start", 27 + "digest": digest, 28 + } 29 + 30 + req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 31 + w := httptest.NewRecorder() 32 + 33 + handler.HandleUploadBlob(w, req) 34 + 35 + if w.Code != http.StatusOK { 36 + t.Errorf("Expected status 200, got %d", w.Code) 37 + } 38 + 39 + // Verify response contains uploadId and mode 40 + result := assertJSONResponse(t, w, http.StatusOK) 41 + 42 + if uploadID, ok := result["uploadId"].(string); !ok || uploadID == "" { 43 + t.Error("Expected uploadId string in response") 44 + } 45 + 46 + if mode, ok := result["mode"].(string); !ok || mode == "" { 47 + t.Error("Expected mode string in response") 48 + } 49 + 50 + // Verify blob store was called 51 + if len(blobStore.startCalls) != 1 || blobStore.startCalls[0] != digest { 52 + t.Errorf("Expected StartMultipartUpload to be called with %s", digest) 53 + } 54 + } 55 + 56 + // TestHandleUploadBlob_MultipartStart_MissingDigest tests missing digest in start operation 57 + func TestHandleUploadBlob_MultipartStart_MissingDigest(t *testing.T) { 58 + handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 59 + 60 + body := map[string]string{ 61 + "action": "start", 62 + } 63 + 64 + req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 65 + w := httptest.NewRecorder() 66 + 67 + handler.HandleUploadBlob(w, req) 68 + 69 + if w.Code != http.StatusBadRequest { 70 + t.Errorf("Expected status 400, got %d", w.Code) 71 + } 72 + } 73 + 74 + // Tests for HandleUploadBlob - Multipart Part URL 75 + 76 + // TestHandleUploadBlob_MultipartPart tests getting presigned URL for a part 77 + // Non-standard ATCR extension for multipart uploads 78 + func TestHandleUploadBlob_MultipartPart(t *testing.T) { 79 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 80 + 81 + uploadID := "test-upload-123" 82 + partNumber := 1 83 + did := "did:plc:testuser" 84 + 85 + body := map[string]any{ 86 + "action": "part", 87 + "uploadId": uploadID, 88 + "partNumber": partNumber, 89 + } 90 + 91 + req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob?did="+did, body) 92 + w := httptest.NewRecorder() 93 + 94 + handler.HandleUploadBlob(w, req) 95 + 96 + if w.Code != http.StatusOK { 97 + t.Errorf("Expected status 200, got %d", w.Code) 98 + } 99 + 100 + // Verify response contains URL 101 + result := assertJSONResponse(t, w, http.StatusOK) 102 + 103 + if url, ok := result["url"].(string); !ok || url == "" { 104 + t.Error("Expected url string in response") 105 + } 106 + 107 + // Verify blob store was called 108 + if len(blobStore.partURLCalls) != 1 { 109 + t.Fatalf("Expected GetPartUploadURL to be called once") 110 + } 111 + call := blobStore.partURLCalls[0] 112 + if call.uploadID != uploadID || call.partNumber != partNumber || call.did != did { 113 + t.Errorf("Expected GetPartUploadURL(%s, %d, %s), got (%s, %d, %s)", 114 + uploadID, partNumber, did, call.uploadID, call.partNumber, call.did) 115 + } 116 + } 117 + 118 + // TestHandleUploadBlob_MultipartPart_MissingParams tests missing parameters 119 + func TestHandleUploadBlob_MultipartPart_MissingParams(t *testing.T) { 120 + handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 121 + 122 + tests := []struct { 123 + name string 124 + body map[string]any 125 + }{ 126 + { 127 + name: "missing uploadId", 128 + body: map[string]any{ 129 + "action": "part", 130 + "partNumber": 1, 131 + }, 132 + }, 133 + { 134 + name: "missing partNumber", 135 + body: map[string]any{ 136 + "action": "part", 137 + "uploadId": "test-123", 138 + }, 139 + }, 140 + { 141 + name: "partNumber zero", 142 + body: map[string]any{ 143 + "action": "part", 144 + "uploadId": "test-123", 145 + "partNumber": 0, 146 + }, 147 + }, 148 + } 149 + 150 + for _, tt := range tests { 151 + t.Run(tt.name, func(t *testing.T) { 152 + req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", tt.body) 153 + w := httptest.NewRecorder() 154 + 155 + handler.HandleUploadBlob(w, req) 156 + 157 + if w.Code != http.StatusBadRequest { 158 + t.Errorf("Expected status 400, got %d", w.Code) 159 + } 160 + }) 161 + } 162 + } 163 + 164 + // Tests for HandleUploadBlob - Multipart Complete 165 + 166 + // TestHandleUploadBlob_MultipartComplete tests completing a multipart upload 167 + // Non-standard ATCR extension for multipart uploads 168 + func TestHandleUploadBlob_MultipartComplete(t *testing.T) { 169 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 170 + 171 + uploadID := "test-upload-123" 172 + parts := []PartInfo{ 173 + {PartNumber: 1, ETag: "etag1"}, 174 + {PartNumber: 2, ETag: "etag2"}, 175 + } 176 + 177 + body := map[string]any{ 178 + "action": "complete", 179 + "uploadId": uploadID, 180 + "parts": parts, 181 + } 182 + 183 + req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 184 + w := httptest.NewRecorder() 185 + 186 + handler.HandleUploadBlob(w, req) 187 + 188 + if w.Code != http.StatusOK { 189 + t.Errorf("Expected status 200, got %d", w.Code) 190 + } 191 + 192 + // Verify response 193 + result := assertJSONResponse(t, w, http.StatusOK) 194 + 195 + if status, ok := result["status"].(string); !ok || status != "completed" { 196 + t.Errorf("Expected status='completed', got %v", result["status"]) 197 + } 198 + 199 + // Verify blob store was called 200 + if len(blobStore.completeCalls) != 1 || blobStore.completeCalls[0] != uploadID { 201 + t.Errorf("Expected CompleteMultipartUpload to be called with %s", uploadID) 202 + } 203 + } 204 + 205 + // TestHandleUploadBlob_MultipartComplete_MissingParams tests missing parameters 206 + func TestHandleUploadBlob_MultipartComplete_MissingParams(t *testing.T) { 207 + handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 208 + 209 + tests := []struct { 210 + name string 211 + body map[string]any 212 + }{ 213 + { 214 + name: "missing uploadId", 215 + body: map[string]any{ 216 + "action": "complete", 217 + "parts": []PartInfo{{PartNumber: 1, ETag: "etag1"}}, 218 + }, 219 + }, 220 + { 221 + name: "missing parts", 222 + body: map[string]any{ 223 + "action": "complete", 224 + "uploadId": "test-123", 225 + }, 226 + }, 227 + { 228 + name: "empty parts array", 229 + body: map[string]any{ 230 + "action": "complete", 231 + "uploadId": "test-123", 232 + "parts": []PartInfo{}, 233 + }, 234 + }, 235 + } 236 + 237 + for _, tt := range tests { 238 + t.Run(tt.name, func(t *testing.T) { 239 + req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", tt.body) 240 + w := httptest.NewRecorder() 241 + 242 + handler.HandleUploadBlob(w, req) 243 + 244 + if w.Code != http.StatusBadRequest { 245 + t.Errorf("Expected status 400, got %d", w.Code) 246 + } 247 + }) 248 + } 249 + } 250 + 251 + // Tests for HandleUploadBlob - Multipart Abort 252 + 253 + // TestHandleUploadBlob_MultipartAbort tests aborting a multipart upload 254 + // Non-standard ATCR extension for multipart uploads 255 + func TestHandleUploadBlob_MultipartAbort(t *testing.T) { 256 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 257 + 258 + uploadID := "test-upload-123" 259 + 260 + body := map[string]string{ 261 + "action": "abort", 262 + "uploadId": uploadID, 263 + } 264 + 265 + req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 266 + w := httptest.NewRecorder() 267 + 268 + handler.HandleUploadBlob(w, req) 269 + 270 + if w.Code != http.StatusOK { 271 + t.Errorf("Expected status 200, got %d", w.Code) 272 + } 273 + 274 + // Verify response 275 + result := assertJSONResponse(t, w, http.StatusOK) 276 + 277 + if status, ok := result["status"].(string); !ok || status != "aborted" { 278 + t.Errorf("Expected status='aborted', got %v", result["status"]) 279 + } 280 + 281 + // Verify blob store was called 282 + if len(blobStore.abortCalls) != 1 || blobStore.abortCalls[0] != uploadID { 283 + t.Errorf("Expected AbortMultipartUpload to be called with %s", uploadID) 284 + } 285 + } 286 + 287 + // TestHandleUploadBlob_MultipartAbort_MissingUploadID tests missing uploadId 288 + func TestHandleUploadBlob_MultipartAbort_MissingUploadID(t *testing.T) { 289 + handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 290 + 291 + body := map[string]string{ 292 + "action": "abort", 293 + } 294 + 295 + req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 296 + w := httptest.NewRecorder() 297 + 298 + handler.HandleUploadBlob(w, req) 299 + 300 + if w.Code != http.StatusBadRequest { 301 + t.Errorf("Expected status 400, got %d", w.Code) 302 + } 303 + } 304 + 305 + // Tests for HandleUploadBlob - Buffered Part Upload 306 + 307 + // TestHandleUploadBlob_BufferedPartUpload tests uploading a part in buffered mode 308 + // Non-standard ATCR extension for multipart uploads without S3 presigned URLs 309 + func TestHandleUploadBlob_BufferedPartUpload(t *testing.T) { 310 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 311 + 312 + uploadID := "test-upload-123" 313 + partNumber := "1" 314 + data := []byte("test data for part 1") 315 + 316 + req := httptest.NewRequest(http.MethodPut, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader(data)) 317 + req.Header.Set("X-Upload-Id", uploadID) 318 + req.Header.Set("X-Part-Number", partNumber) 319 + w := httptest.NewRecorder() 320 + 321 + handler.HandleUploadBlob(w, req) 322 + 323 + if w.Code != http.StatusOK { 324 + t.Errorf("Expected status 200, got %d", w.Code) 325 + } 326 + 327 + // Verify response contains ETag 328 + result := assertJSONResponse(t, w, http.StatusOK) 329 + 330 + if etag, ok := result["etag"].(string); !ok || etag == "" { 331 + t.Error("Expected etag string in response") 332 + } 333 + 334 + // Verify blob store was called 335 + if len(blobStore.partUploadCalls) != 1 { 336 + t.Fatalf("Expected HandleBufferedPartUpload to be called once") 337 + } 338 + call := blobStore.partUploadCalls[0] 339 + if call.uploadID != uploadID || call.partNumber != 1 || call.dataSize != len(data) { 340 + t.Errorf("Expected HandleBufferedPartUpload(%s, 1, %d bytes), got (%s, %d, %d bytes)", 341 + uploadID, len(data), call.uploadID, call.partNumber, call.dataSize) 342 + } 343 + } 344 + 345 + // TestHandleUploadBlob_BufferedPartUpload_MissingHeaders tests missing required headers 346 + func TestHandleUploadBlob_BufferedPartUpload_MissingHeaders(t *testing.T) { 347 + handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 348 + 349 + tests := []struct { 350 + name string 351 + uploadID string 352 + partNumber string 353 + setUploadID bool 354 + setPartNumber bool 355 + }{ 356 + { 357 + name: "missing both headers", 358 + setUploadID: false, 359 + setPartNumber: false, 360 + }, 361 + { 362 + name: "missing X-Part-Number", 363 + uploadID: "test-123", 364 + setUploadID: true, 365 + setPartNumber: false, 366 + }, 367 + { 368 + name: "missing X-Upload-Id", 369 + partNumber: "1", 370 + setUploadID: false, 371 + setPartNumber: true, 372 + }, 373 + } 374 + 375 + for _, tt := range tests { 376 + t.Run(tt.name, func(t *testing.T) { 377 + req := httptest.NewRequest(http.MethodPut, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte("data"))) 378 + if tt.setUploadID { 379 + req.Header.Set("X-Upload-Id", tt.uploadID) 380 + } 381 + if tt.setPartNumber { 382 + req.Header.Set("X-Part-Number", tt.partNumber) 383 + } 384 + w := httptest.NewRecorder() 385 + 386 + handler.HandleUploadBlob(w, req) 387 + 388 + if w.Code != http.StatusBadRequest { 389 + t.Errorf("Expected status 400, got %d", w.Code) 390 + } 391 + }) 392 + } 393 + } 394 + 395 + // TestHandleUploadBlob_BufferedPartUpload_InvalidPartNumber tests invalid part number 396 + func TestHandleUploadBlob_BufferedPartUpload_InvalidPartNumber(t *testing.T) { 397 + handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 398 + 399 + req := httptest.NewRequest(http.MethodPut, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte("data"))) 400 + req.Header.Set("X-Upload-Id", "test-123") 401 + req.Header.Set("X-Part-Number", "not-a-number") 402 + w := httptest.NewRecorder() 403 + 404 + handler.HandleUploadBlob(w, req) 405 + 406 + if w.Code != http.StatusBadRequest { 407 + t.Errorf("Expected status 400 for invalid part number, got %d", w.Code) 408 + } 409 + } 410 + 411 + // TestHandleUploadBlob_UnknownAction tests unknown action value 412 + func TestHandleUploadBlob_UnknownAction(t *testing.T) { 413 + handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 414 + 415 + body := map[string]string{ 416 + "action": "invalid", 417 + } 418 + 419 + req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 420 + w := httptest.NewRecorder() 421 + 422 + handler.HandleUploadBlob(w, req) 423 + 424 + if w.Code != http.StatusBadRequest { 425 + t.Errorf("Expected status 400 for unknown action, got %d", w.Code) 426 + } 427 + }
+459
pkg/hold/pds/xrpc_test.go
··· 4 4 "bytes" 5 5 "context" 6 6 "encoding/json" 7 + "fmt" 7 8 "io" 8 9 "net/http" 9 10 "net/http/httptest" ··· 13 14 "testing" 14 15 15 16 "atcr.io/pkg/atproto" 17 + "github.com/ipfs/go-cid" 16 18 ) 17 19 18 20 // Test helpers ··· 1316 1318 t.Errorf("Expected DID string, got %s", body) 1317 1319 } 1318 1320 } 1321 + 1322 + // Mock BlobStore for testing blob endpoints 1323 + 1324 + // mockBlobStore implements BlobStore interface for testing 1325 + type mockBlobStore struct { 1326 + // Control behavior 1327 + downloadURLError error 1328 + uploadURLError error 1329 + uploadBlobError error 1330 + startError error 1331 + partURLError error 1332 + completeError error 1333 + abortError error 1334 + partUploadError error 1335 + 1336 + // Track calls 1337 + downloadCalls []string // Track digests requested for download 1338 + uploadCalls []string // Track digests requested for upload 1339 + uploadBlobCalls []uploadBlobCall // Track direct blob uploads 1340 + startCalls []string // Track digests for multipart start 1341 + partURLCalls []partURLCall 1342 + completeCalls []string 1343 + abortCalls []string 1344 + partUploadCalls []partUploadCall 1345 + } 1346 + 1347 + type uploadBlobCall struct { 1348 + did string 1349 + dataSize int 1350 + } 1351 + 1352 + type partURLCall struct { 1353 + uploadID string 1354 + partNumber int 1355 + did string 1356 + } 1357 + 1358 + type partUploadCall struct { 1359 + uploadID string 1360 + partNumber int 1361 + dataSize int 1362 + } 1363 + 1364 + func newMockBlobStore() *mockBlobStore { 1365 + return &mockBlobStore{ 1366 + downloadCalls: []string{}, 1367 + uploadCalls: []string{}, 1368 + uploadBlobCalls: []uploadBlobCall{}, 1369 + startCalls: []string{}, 1370 + partURLCalls: []partURLCall{}, 1371 + completeCalls: []string{}, 1372 + abortCalls: []string{}, 1373 + partUploadCalls: []partUploadCall{}, 1374 + } 1375 + } 1376 + 1377 + func (m *mockBlobStore) GetPresignedDownloadURL(digest, did string) (string, error) { 1378 + m.downloadCalls = append(m.downloadCalls, digest) 1379 + if m.downloadURLError != nil { 1380 + return "", m.downloadURLError 1381 + } 1382 + return "https://s3.example.com/download/" + digest, nil 1383 + } 1384 + 1385 + func (m *mockBlobStore) GetPresignedUploadURL(digest, did string) (string, error) { 1386 + m.uploadCalls = append(m.uploadCalls, digest) 1387 + if m.uploadURLError != nil { 1388 + return "", m.uploadURLError 1389 + } 1390 + return "https://s3.example.com/upload/" + digest, nil 1391 + } 1392 + 1393 + func (m *mockBlobStore) UploadBlob(ctx context.Context, did string, data io.Reader) (cid.Cid, int64, error) { 1394 + // Read data to get size 1395 + blobData, err := io.ReadAll(data) 1396 + if err != nil { 1397 + return cid.Undef, 0, err 1398 + } 1399 + 1400 + m.uploadBlobCalls = append(m.uploadBlobCalls, uploadBlobCall{ 1401 + did: did, 1402 + dataSize: len(blobData), 1403 + }) 1404 + 1405 + if m.uploadBlobError != nil { 1406 + return cid.Undef, 0, m.uploadBlobError 1407 + } 1408 + 1409 + // Return a test CID (just use a fixed one for testing) 1410 + testCID, _ := cid.Decode("bafkreihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku") 1411 + return testCID, int64(len(blobData)), nil 1412 + } 1413 + 1414 + func (m *mockBlobStore) StartMultipartUpload(ctx context.Context, digest string) (string, string, error) { 1415 + m.startCalls = append(m.startCalls, digest) 1416 + if m.startError != nil { 1417 + return "", "", m.startError 1418 + } 1419 + return "test-upload-id", "s3native", nil 1420 + } 1421 + 1422 + func (m *mockBlobStore) GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (string, error) { 1423 + m.partURLCalls = append(m.partURLCalls, partURLCall{uploadID, partNumber, did}) 1424 + if m.partURLError != nil { 1425 + return "", m.partURLError 1426 + } 1427 + return "https://s3.example.com/part/" + uploadID, nil 1428 + } 1429 + 1430 + func (m *mockBlobStore) CompleteMultipartUpload(ctx context.Context, uploadID string, parts []PartInfo) error { 1431 + m.completeCalls = append(m.completeCalls, uploadID) 1432 + if m.completeError != nil { 1433 + return m.completeError 1434 + } 1435 + return nil 1436 + } 1437 + 1438 + func (m *mockBlobStore) AbortMultipartUpload(ctx context.Context, uploadID string) error { 1439 + m.abortCalls = append(m.abortCalls, uploadID) 1440 + if m.abortError != nil { 1441 + return m.abortError 1442 + } 1443 + return nil 1444 + } 1445 + 1446 + func (m *mockBlobStore) HandleBufferedPartUpload(ctx context.Context, uploadID string, partNumber int, data []byte) (string, error) { 1447 + m.partUploadCalls = append(m.partUploadCalls, partUploadCall{uploadID, partNumber, len(data)}) 1448 + if m.partUploadError != nil { 1449 + return "", m.partUploadError 1450 + } 1451 + return "test-etag-" + uploadID, nil 1452 + } 1453 + 1454 + // setupTestXRPCHandlerWithBlobs creates handler with mock blob store 1455 + func setupTestXRPCHandlerWithBlobs(t *testing.T) (*XRPCHandler, *mockBlobStore, context.Context) { 1456 + t.Helper() 1457 + 1458 + ctx := context.Background() 1459 + tmpDir := t.TempDir() 1460 + 1461 + dbPath := filepath.Join(tmpDir, "pds.db") 1462 + keyPath := filepath.Join(tmpDir, "signing-key") 1463 + 1464 + pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", "https://hold.example.com", dbPath, keyPath) 1465 + if err != nil { 1466 + t.Fatalf("Failed to create test PDS: %v", err) 1467 + } 1468 + 1469 + // Bootstrap with a test owner, suppressing stdout to avoid log spam 1470 + ownerDID := "did:plc:testowner123" 1471 + 1472 + // Redirect stdout to suppress bootstrap logging 1473 + oldStdout := os.Stdout 1474 + r, w, _ := os.Pipe() 1475 + os.Stdout = w 1476 + 1477 + err = pds.Bootstrap(ctx, ownerDID, true, false) 1478 + 1479 + // Restore stdout 1480 + w.Close() 1481 + os.Stdout = oldStdout 1482 + io.ReadAll(r) // Drain the pipe 1483 + 1484 + if err != nil { 1485 + t.Fatalf("Failed to bootstrap PDS: %v", err) 1486 + } 1487 + 1488 + // Create mock blob store 1489 + blobStore := newMockBlobStore() 1490 + 1491 + // Create XRPC handler with mock blob store 1492 + handler := NewXRPCHandler(pds, "https://hold.example.com", blobStore, nil) 1493 + 1494 + return handler, blobStore, ctx 1495 + } 1496 + 1497 + // Tests for HandleUploadBlob 1498 + 1499 + // TestHandleUploadBlob tests com.atproto.repo.uploadBlob with direct upload 1500 + // Spec: https://docs.bsky.app/docs/api/com-atproto-repo-upload-blob 1501 + func TestHandleUploadBlob(t *testing.T) { 1502 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1503 + 1504 + // Test data - a simple text blob 1505 + blobData := []byte("Hello, ATProto!") 1506 + 1507 + // Test standard single blob upload (POST with raw bytes) 1508 + req := httptest.NewRequest(http.MethodPost, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader(blobData)) 1509 + req.Header.Set("Content-Type", "application/octet-stream") 1510 + w := httptest.NewRecorder() 1511 + 1512 + handler.HandleUploadBlob(w, req) 1513 + 1514 + // Should return 200 OK with blob metadata 1515 + if w.Code != http.StatusOK { 1516 + t.Errorf("Expected status 200 OK, got %d", w.Code) 1517 + } 1518 + 1519 + // Verify response contains blob metadata 1520 + result := assertJSONResponse(t, w, http.StatusOK) 1521 + 1522 + blob, ok := result["blob"].(map[string]any) 1523 + if !ok { 1524 + t.Fatal("Expected blob object in response") 1525 + } 1526 + 1527 + if blobType, ok := blob["$type"].(string); !ok || blobType != "blob" { 1528 + t.Errorf("Expected $type='blob', got %v", blob["$type"]) 1529 + } 1530 + 1531 + ref, ok := blob["ref"].(map[string]any) 1532 + if !ok { 1533 + t.Fatal("Expected ref object in blob") 1534 + } 1535 + 1536 + if link, ok := ref["$link"].(string); !ok || link == "" { 1537 + t.Error("Expected $link (CID) in ref") 1538 + } 1539 + 1540 + if size, ok := blob["size"].(float64); !ok || int(size) != len(blobData) { 1541 + t.Errorf("Expected size=%d, got %v", len(blobData), blob["size"]) 1542 + } 1543 + 1544 + // Verify blob store was called 1545 + if len(blobStore.uploadBlobCalls) != 1 { 1546 + t.Errorf("Expected UploadBlob to be called once, got %d calls", len(blobStore.uploadBlobCalls)) 1547 + } 1548 + 1549 + if blobStore.uploadBlobCalls[0].dataSize != len(blobData) { 1550 + t.Errorf("Expected UploadBlob to receive %d bytes, got %d", len(blobData), blobStore.uploadBlobCalls[0].dataSize) 1551 + } 1552 + } 1553 + 1554 + // TestHandleUploadBlob_EmptyBody tests empty blob upload 1555 + // Spec: https://docs.bsky.app/docs/api/com-atproto-repo-upload-blob 1556 + func TestHandleUploadBlob_EmptyBody(t *testing.T) { 1557 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1558 + 1559 + // Empty blob should succeed (edge case) 1560 + req := httptest.NewRequest(http.MethodPost, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte{})) 1561 + req.Header.Set("Content-Type", "application/octet-stream") 1562 + w := httptest.NewRecorder() 1563 + 1564 + handler.HandleUploadBlob(w, req) 1565 + 1566 + // Should succeed with empty blob 1567 + if w.Code != http.StatusOK { 1568 + t.Errorf("Expected status 200, got %d", w.Code) 1569 + } 1570 + 1571 + // Verify blob store was called with 0 bytes 1572 + if len(blobStore.uploadBlobCalls) != 1 || blobStore.uploadBlobCalls[0].dataSize != 0 { 1573 + t.Errorf("Expected UploadBlob with 0 bytes") 1574 + } 1575 + } 1576 + 1577 + // TestHandleUploadBlob_MethodNotAllowed tests wrong HTTP method 1578 + // Spec: https://docs.bsky.app/docs/api/com-atproto-repo-upload-blob 1579 + func TestHandleUploadBlob_MethodNotAllowed(t *testing.T) { 1580 + handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 1581 + 1582 + // GET is not allowed for upload (only POST and PUT) 1583 + req := httptest.NewRequest(http.MethodGet, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte("test"))) 1584 + w := httptest.NewRecorder() 1585 + 1586 + handler.HandleUploadBlob(w, req) 1587 + 1588 + if w.Code != http.StatusMethodNotAllowed { 1589 + t.Errorf("Expected status 405, got %d", w.Code) 1590 + } 1591 + } 1592 + 1593 + // TestHandleUploadBlob_BlobStoreError tests blob store returning error 1594 + // Spec: https://docs.bsky.app/docs/api/com-atproto-repo-upload-blob 1595 + func TestHandleUploadBlob_BlobStoreError(t *testing.T) { 1596 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1597 + 1598 + // Configure mock to return error 1599 + blobStore.uploadBlobError = fmt.Errorf("storage driver unavailable") 1600 + 1601 + req := httptest.NewRequest(http.MethodPost, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte("test data"))) 1602 + req.Header.Set("Content-Type", "application/octet-stream") 1603 + w := httptest.NewRecorder() 1604 + 1605 + handler.HandleUploadBlob(w, req) 1606 + 1607 + if w.Code != http.StatusInternalServerError { 1608 + t.Errorf("Expected status 500, got %d", w.Code) 1609 + } 1610 + } 1611 + 1612 + // Tests for HandleGetBlob 1613 + 1614 + // TestHandleGetBlob tests com.atproto.sync.getBlob 1615 + // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1616 + func TestHandleGetBlob(t *testing.T) { 1617 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1618 + 1619 + holdDID := "did:web:hold.example.com" 1620 + cid := "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke" 1621 + 1622 + req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getBlob", map[string]string{ 1623 + "did": holdDID, 1624 + "cid": cid, 1625 + }) 1626 + w := httptest.NewRecorder() 1627 + 1628 + handler.HandleGetBlob(w, req) 1629 + 1630 + // Should redirect to presigned download URL (307 Temporary Redirect) 1631 + if w.Code != http.StatusTemporaryRedirect { 1632 + t.Errorf("Expected status 307 (Temporary Redirect), got %d", w.Code) 1633 + } 1634 + 1635 + location := w.Header().Get("Location") 1636 + expectedURL := "https://s3.example.com/download/" + cid 1637 + if location != expectedURL { 1638 + t.Errorf("Expected redirect to %s, got %s", expectedURL, location) 1639 + } 1640 + 1641 + // Verify blob store was called 1642 + if len(blobStore.downloadCalls) != 1 || blobStore.downloadCalls[0] != cid { 1643 + t.Errorf("Expected GetPresignedDownloadURL to be called with %s", cid) 1644 + } 1645 + } 1646 + 1647 + // TestHandleGetBlob_SHA256Digest tests getBlob with OCI sha256 digest format 1648 + // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1649 + func TestHandleGetBlob_SHA256Digest(t *testing.T) { 1650 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1651 + 1652 + holdDID := "did:web:hold.example.com" 1653 + digest := "sha256:abc123def456" // OCI digest format 1654 + 1655 + req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getBlob", map[string]string{ 1656 + "did": holdDID, 1657 + "cid": digest, 1658 + }) 1659 + w := httptest.NewRecorder() 1660 + 1661 + handler.HandleGetBlob(w, req) 1662 + 1663 + // Should redirect to presigned download URL 1664 + if w.Code != http.StatusTemporaryRedirect { 1665 + t.Errorf("Expected status 307, got %d", w.Code) 1666 + } 1667 + 1668 + // Verify blob store received the sha256 digest 1669 + if len(blobStore.downloadCalls) != 1 || blobStore.downloadCalls[0] != digest { 1670 + t.Errorf("Expected GetPresignedDownloadURL to be called with %s, got %v", digest, blobStore.downloadCalls) 1671 + } 1672 + } 1673 + 1674 + // TestHandleGetBlob_HeadMethod tests HEAD request support 1675 + // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1676 + func TestHandleGetBlob_HeadMethod(t *testing.T) { 1677 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1678 + 1679 + holdDID := "did:web:hold.example.com" 1680 + cid := "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke" 1681 + 1682 + // Use HEAD instead of GET 1683 + req := httptest.NewRequest(http.MethodHead, "/xrpc/com.atproto.sync.getBlob?did="+holdDID+"&cid="+cid, nil) 1684 + w := httptest.NewRecorder() 1685 + 1686 + handler.HandleGetBlob(w, req) 1687 + 1688 + // Should still redirect 1689 + if w.Code != http.StatusTemporaryRedirect { 1690 + t.Errorf("Expected status 307 for HEAD request, got %d", w.Code) 1691 + } 1692 + 1693 + // Verify blob store was called 1694 + if len(blobStore.downloadCalls) != 1 { 1695 + t.Errorf("Expected GetPresignedDownloadURL to be called for HEAD request") 1696 + } 1697 + } 1698 + 1699 + // TestHandleGetBlob_MissingParameters tests missing required parameters 1700 + // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1701 + func TestHandleGetBlob_MissingParameters(t *testing.T) { 1702 + handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 1703 + 1704 + tests := []struct { 1705 + name string 1706 + params map[string]string 1707 + }{ 1708 + { 1709 + name: "missing all params", 1710 + params: map[string]string{}, 1711 + }, 1712 + { 1713 + name: "missing cid", 1714 + params: map[string]string{ 1715 + "did": "did:web:hold.example.com", 1716 + }, 1717 + }, 1718 + { 1719 + name: "missing did", 1720 + params: map[string]string{ 1721 + "cid": "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke", 1722 + }, 1723 + }, 1724 + } 1725 + 1726 + for _, tt := range tests { 1727 + t.Run(tt.name, func(t *testing.T) { 1728 + req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getBlob", tt.params) 1729 + w := httptest.NewRecorder() 1730 + 1731 + handler.HandleGetBlob(w, req) 1732 + 1733 + if w.Code != http.StatusBadRequest { 1734 + t.Errorf("Expected status 400, got %d", w.Code) 1735 + } 1736 + }) 1737 + } 1738 + } 1739 + 1740 + // TestHandleGetBlob_InvalidDID tests invalid DID 1741 + // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1742 + func TestHandleGetBlob_InvalidDID(t *testing.T) { 1743 + handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 1744 + 1745 + req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getBlob", map[string]string{ 1746 + "did": "did:plc:wrongdid", 1747 + "cid": "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke", 1748 + }) 1749 + w := httptest.NewRecorder() 1750 + 1751 + handler.HandleGetBlob(w, req) 1752 + 1753 + if w.Code != http.StatusBadRequest { 1754 + t.Errorf("Expected status 400 for invalid DID, got %d", w.Code) 1755 + } 1756 + } 1757 + 1758 + // TestHandleGetBlob_BlobStoreError tests blob store returning error 1759 + // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1760 + func TestHandleGetBlob_BlobStoreError(t *testing.T) { 1761 + handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1762 + 1763 + // Configure mock to return error 1764 + blobStore.downloadURLError = fmt.Errorf("blob not found in S3") 1765 + 1766 + req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getBlob", map[string]string{ 1767 + "did": "did:web:hold.example.com", 1768 + "cid": "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke", 1769 + }) 1770 + w := httptest.NewRecorder() 1771 + 1772 + handler.HandleGetBlob(w, req) 1773 + 1774 + if w.Code != http.StatusInternalServerError { 1775 + t.Errorf("Expected status 500, got %d", w.Code) 1776 + } 1777 + }
+26 -1
pkg/hold/storage.go
··· 11 11 "github.com/aws/aws-sdk-go/service/s3" 12 12 ) 13 13 14 + // atprotoBlobPath creates a per-DID storage path for ATProto blobs 15 + // ATProto spec stores blobs as: /repos/{did}/blobs/{cid}/data 16 + // This provides data sovereignty - each user's blobs are isolated 17 + func atprotoBlobPath(did, cid string) string { 18 + // Clean DID for filesystem safety (replace : with -) 19 + safeDID := strings.ReplaceAll(did, ":", "-") 20 + return fmt.Sprintf("/repos/%s/blobs/%s/data", safeDID, cid) 21 + } 22 + 14 23 // blobPath converts a digest (e.g., "sha256:abc123...") or temp path to a storage path 15 24 // Distribution stores blobs as: /docker/registry/v2/blobs/{algorithm}/{xx}/{hash}/data 16 25 // where xx is the first 2 characters of the hash for directory sharding 17 26 // NOTE: Path must start with / for filesystem driver 27 + // This is used for OCI container layers (content-addressed, globally deduplicated) 18 28 func blobPath(digest string) string { 19 29 // Handle temp paths (start with uploads/temp-) 20 30 if strings.HasPrefix(digest, "uploads/temp-") { ··· 40 50 } 41 51 42 52 // getPresignedURL generates a presigned URL for GET, HEAD, or PUT operations 53 + // Distinguishes between ATProto blobs (per-DID) and OCI blobs (content-addressed) 43 54 func (s *HoldService) getPresignedURL(ctx context.Context, operation PresignedURLOperation, digest string, did string) (string, error) { 44 - path := blobPath(digest) 55 + var path string 56 + 57 + // Determine blob type and construct appropriate path 58 + if strings.HasPrefix(digest, "sha256:") || strings.HasPrefix(digest, "uploads/") { 59 + // OCI container layer (sha256 digest or temp upload path) 60 + // Use content-addressed storage (globally deduplicated) 61 + path = blobPath(digest) 62 + } else { 63 + // ATProto blob (CID format like bafyreib...) 64 + // Use per-DID storage for data sovereignty 65 + if did == "" { 66 + return "", fmt.Errorf("DID required for ATProto blob storage") 67 + } 68 + path = atprotoBlobPath(did, digest) 69 + } 45 70 46 71 // Check blob exists for GET/HEAD operations (not for PUT since blob doesn't exist yet) 47 72 if operation == OperationGet || operation == OperationHead {