A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
81
fork

Configure Feed

Select the types of activity you want to include in your feed.

try and use multipart uploads

+1278 -136
+1 -2
cmd/appview/serve.go
··· 411 411 return nil, nil, nil 412 412 } 413 413 414 - fmt.Printf("UI database initialized at %s\n", dbPath) 415 - fmt.Printf("Read-only connection with authorizer created (blocks: oauth_sessions, ui_sessions, devices, etc.)\n") 414 + fmt.Printf("UI database (readonly) initialized at %s\n", dbPath) 416 415 417 416 // Create SQLite-backed session store 418 417 sessionStore := db.NewSessionStore(database)
+313
cmd/hold/main.go
··· 194 194 ExpiresAt time.Time `json:"expires_at"` 195 195 } 196 196 197 + // StartMultipartUploadRequest initiates a multipart upload 198 + type StartMultipartUploadRequest struct { 199 + DID string `json:"did"` 200 + Digest string `json:"digest"` 201 + } 202 + 203 + // StartMultipartUploadResponse contains the upload ID 204 + type StartMultipartUploadResponse struct { 205 + UploadID string `json:"upload_id"` 206 + ExpiresAt time.Time `json:"expires_at"` 207 + } 208 + 209 + // GetPartURLRequest requests a presigned URL for a specific part 210 + type GetPartURLRequest struct { 211 + DID string `json:"did"` 212 + Digest string `json:"digest"` 213 + UploadID string `json:"upload_id"` 214 + PartNumber int `json:"part_number"` 215 + } 216 + 217 + // GetPartURLResponse contains the presigned URL for the part 218 + type GetPartURLResponse struct { 219 + URL string `json:"url"` 220 + ExpiresAt time.Time `json:"expires_at"` 221 + } 222 + 223 + // CompletedPart represents a completed multipart upload part 224 + type CompletedPart struct { 225 + PartNumber int `json:"part_number"` 226 + ETag string `json:"etag"` 227 + } 228 + 229 + // CompleteMultipartRequest completes a multipart upload 230 + type CompleteMultipartRequest struct { 231 + DID string `json:"did"` 232 + Digest string `json:"digest"` 233 + UploadID string `json:"upload_id"` 234 + Parts []CompletedPart `json:"parts"` 235 + } 236 + 237 + // AbortMultipartRequest aborts an in-progress upload 238 + type AbortMultipartRequest struct { 239 + DID string `json:"did"` 240 + Digest string `json:"digest"` 241 + UploadID string `json:"upload_id"` 242 + } 243 + 197 244 // HandleGetPresignedURL handles requests for download URLs 198 245 func (s *HoldService) HandleGetPresignedURL(w http.ResponseWriter, r *http.Request) { 199 246 if r.Method != http.MethodPost { ··· 642 689 return fmt.Sprintf("%s/blobs/%s?did=%s", s.config.Server.PublicURL, digest, did) 643 690 } 644 691 692 + // startMultipartUpload initiates a multipart upload and returns upload ID 693 + func (s *HoldService) startMultipartUpload(ctx context.Context, digest string) (string, error) { 694 + if s.s3Client == nil { 695 + return "", fmt.Errorf("S3 not configured for multipart uploads") 696 + } 697 + 698 + path := blobPath(digest) 699 + s3Key := strings.TrimPrefix(path, "/") 700 + if s.s3PathPrefix != "" { 701 + s3Key = s.s3PathPrefix + "/" + s3Key 702 + } 703 + 704 + result, err := s.s3Client.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{ 705 + Bucket: aws.String(s.bucket), 706 + Key: aws.String(s3Key), 707 + }) 708 + if err != nil { 709 + return "", fmt.Errorf("failed to create multipart upload: %w", err) 710 + } 711 + 712 + log.Printf("Started multipart upload: key=%s, uploadId=%s", s3Key, *result.UploadId) 713 + return *result.UploadId, nil 714 + } 715 + 716 + // getPartPresignedURL generates presigned URL for a specific part 717 + func (s *HoldService) getPartPresignedURL(ctx context.Context, digest, uploadID string, partNumber int) (string, error) { 718 + if s.s3Client == nil { 719 + return "", fmt.Errorf("S3 not configured for multipart uploads") 720 + } 721 + 722 + path := blobPath(digest) 723 + s3Key := strings.TrimPrefix(path, "/") 724 + if s.s3PathPrefix != "" { 725 + s3Key = s.s3PathPrefix + "/" + s3Key 726 + } 727 + 728 + req, _ := s.s3Client.UploadPartRequest(&s3.UploadPartInput{ 729 + Bucket: aws.String(s.bucket), 730 + Key: aws.String(s3Key), 731 + UploadId: aws.String(uploadID), 732 + PartNumber: aws.Int64(int64(partNumber)), 733 + }) 734 + 735 + url, err := req.Presign(15 * time.Minute) 736 + if err != nil { 737 + return "", fmt.Errorf("failed to presign part URL: %w", err) 738 + } 739 + 740 + log.Printf("Generated presigned URL for part %d: key=%s, uploadId=%s", partNumber, s3Key, uploadID) 741 + return url, nil 742 + } 743 + 744 + // completeMultipartUpload finalizes the multipart upload 745 + func (s *HoldService) completeMultipartUpload(ctx context.Context, digest, uploadID string, parts []CompletedPart) error { 746 + if s.s3Client == nil { 747 + return fmt.Errorf("S3 not configured for multipart uploads") 748 + } 749 + 750 + path := blobPath(digest) 751 + s3Key := strings.TrimPrefix(path, "/") 752 + if s.s3PathPrefix != "" { 753 + s3Key = s.s3PathPrefix + "/" + s3Key 754 + } 755 + 756 + // Convert to S3 CompletedPart format 757 + s3Parts := make([]*s3.CompletedPart, len(parts)) 758 + for i, p := range parts { 759 + s3Parts[i] = &s3.CompletedPart{ 760 + PartNumber: aws.Int64(int64(p.PartNumber)), 761 + ETag: aws.String(p.ETag), 762 + } 763 + } 764 + 765 + _, err := s.s3Client.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{ 766 + Bucket: aws.String(s.bucket), 767 + Key: aws.String(s3Key), 768 + UploadId: aws.String(uploadID), 769 + MultipartUpload: &s3.CompletedMultipartUpload{ 770 + Parts: s3Parts, 771 + }, 772 + }) 773 + 774 + if err != nil { 775 + return fmt.Errorf("failed to complete multipart upload: %w", err) 776 + } 777 + 778 + log.Printf("Completed multipart upload: key=%s, uploadId=%s, parts=%d", s3Key, uploadID, len(parts)) 779 + return nil 780 + } 781 + 782 + // abortMultipartUpload cancels an in-progress multipart upload 783 + func (s *HoldService) abortMultipartUpload(ctx context.Context, digest, uploadID string) error { 784 + if s.s3Client == nil { 785 + return fmt.Errorf("S3 not configured for multipart uploads") 786 + } 787 + 788 + path := blobPath(digest) 789 + s3Key := strings.TrimPrefix(path, "/") 790 + if s.s3PathPrefix != "" { 791 + s3Key = s.s3PathPrefix + "/" + s3Key 792 + } 793 + 794 + _, err := s.s3Client.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{ 795 + Bucket: aws.String(s.bucket), 796 + Key: aws.String(s3Key), 797 + UploadId: aws.String(uploadID), 798 + }) 799 + 800 + if err != nil { 801 + return fmt.Errorf("failed to abort multipart upload: %w", err) 802 + } 803 + 804 + log.Printf("Aborted multipart upload: key=%s, uploadId=%s", s3Key, uploadID) 805 + return nil 806 + } 807 + 808 + // HandleStartMultipart initiates a multipart upload 809 + func (s *HoldService) HandleStartMultipart(w http.ResponseWriter, r *http.Request) { 810 + if r.Method != http.MethodPost { 811 + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 812 + return 813 + } 814 + 815 + var req StartMultipartUploadRequest 816 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 817 + http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) 818 + return 819 + } 820 + 821 + // Validate DID authorization for WRITE 822 + if !s.isAuthorizedWrite(req.DID) { 823 + if req.DID == "" { 824 + http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 825 + } else { 826 + http.Error(w, "forbidden: write access denied", http.StatusForbidden) 827 + } 828 + return 829 + } 830 + 831 + ctx := r.Context() 832 + uploadID, err := s.startMultipartUpload(ctx, req.Digest) 833 + if err != nil { 834 + http.Error(w, fmt.Sprintf("failed to start multipart upload: %v", err), http.StatusInternalServerError) 835 + return 836 + } 837 + 838 + resp := StartMultipartUploadResponse{ 839 + UploadID: uploadID, 840 + ExpiresAt: time.Now().Add(24 * time.Hour), // Multipart uploads expire in 24h 841 + } 842 + 843 + w.Header().Set("Content-Type", "application/json") 844 + json.NewEncoder(w).Encode(resp) 845 + } 846 + 847 + // HandleGetPartURL generates a presigned URL for uploading a specific part 848 + func (s *HoldService) HandleGetPartURL(w http.ResponseWriter, r *http.Request) { 849 + if r.Method != http.MethodPost { 850 + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 851 + return 852 + } 853 + 854 + var req GetPartURLRequest 855 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 856 + http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) 857 + return 858 + } 859 + 860 + // Validate DID authorization for WRITE 861 + if !s.isAuthorizedWrite(req.DID) { 862 + if req.DID == "" { 863 + http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 864 + } else { 865 + http.Error(w, "forbidden: write access denied", http.StatusForbidden) 866 + } 867 + return 868 + } 869 + 870 + ctx := r.Context() 871 + url, err := s.getPartPresignedURL(ctx, req.Digest, req.UploadID, req.PartNumber) 872 + if err != nil { 873 + http.Error(w, fmt.Sprintf("failed to generate part URL: %v", err), http.StatusInternalServerError) 874 + return 875 + } 876 + 877 + resp := GetPartURLResponse{ 878 + URL: url, 879 + ExpiresAt: time.Now().Add(15 * time.Minute), 880 + } 881 + 882 + w.Header().Set("Content-Type", "application/json") 883 + json.NewEncoder(w).Encode(resp) 884 + } 885 + 886 + // HandleCompleteMultipart completes a multipart upload 887 + func (s *HoldService) HandleCompleteMultipart(w http.ResponseWriter, r *http.Request) { 888 + if r.Method != http.MethodPost { 889 + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 890 + return 891 + } 892 + 893 + var req CompleteMultipartRequest 894 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 895 + http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) 896 + return 897 + } 898 + 899 + // Validate DID authorization for WRITE 900 + if !s.isAuthorizedWrite(req.DID) { 901 + if req.DID == "" { 902 + http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 903 + } else { 904 + http.Error(w, "forbidden: write access denied", http.StatusForbidden) 905 + } 906 + return 907 + } 908 + 909 + ctx := r.Context() 910 + if err := s.completeMultipartUpload(ctx, req.Digest, req.UploadID, req.Parts); err != nil { 911 + http.Error(w, fmt.Sprintf("failed to complete multipart upload: %v", err), http.StatusInternalServerError) 912 + return 913 + } 914 + 915 + w.WriteHeader(http.StatusOK) 916 + json.NewEncoder(w).Encode(map[string]string{"status": "completed"}) 917 + } 918 + 919 + // HandleAbortMultipart aborts a multipart upload 920 + func (s *HoldService) HandleAbortMultipart(w http.ResponseWriter, r *http.Request) { 921 + if r.Method != http.MethodPost { 922 + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 923 + return 924 + } 925 + 926 + var req AbortMultipartRequest 927 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 928 + http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) 929 + return 930 + } 931 + 932 + // Validate DID authorization for WRITE 933 + if !s.isAuthorizedWrite(req.DID) { 934 + if req.DID == "" { 935 + http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 936 + } else { 937 + http.Error(w, "forbidden: write access denied", http.StatusForbidden) 938 + } 939 + return 940 + } 941 + 942 + ctx := r.Context() 943 + if err := s.abortMultipartUpload(ctx, req.Digest, req.UploadID); err != nil { 944 + http.Error(w, fmt.Sprintf("failed to abort multipart upload: %v", err), http.StatusInternalServerError) 945 + return 946 + } 947 + 948 + w.WriteHeader(http.StatusOK) 949 + json.NewEncoder(w).Encode(map[string]string{"status": "aborted"}) 950 + } 951 + 645 952 // RegisterRequest represents a request to register this hold in a user's PDS 646 953 type RegisterRequest struct { 647 954 DID string `json:"did"` ··· 759 1066 mux.HandleFunc("/get-presigned-url", service.HandleGetPresignedURL) 760 1067 mux.HandleFunc("/put-presigned-url", service.HandlePutPresignedURL) 761 1068 mux.HandleFunc("/move", service.HandleMove) 1069 + 1070 + // Multipart upload endpoints 1071 + mux.HandleFunc("/start-multipart", service.HandleStartMultipart) 1072 + mux.HandleFunc("/part-presigned-url", service.HandleGetPartURL) 1073 + mux.HandleFunc("/complete-multipart", service.HandleCompleteMultipart) 1074 + mux.HandleFunc("/abort-multipart", service.HandleAbortMultipart) 762 1075 763 1076 // Pre-register OAuth callback route (will be populated by auto-registration) 764 1077 var oauthCallbackHandler http.HandlerFunc
+570
docs/MULTIPART.md
··· 1 + S3 Multipart Upload Implementation Plan 2 + 3 + Problem Summary 4 + 5 + Current implementation uses a single presigned URL with a pipe for chunked uploads (PATCH). This causes: 6 + - Docker PATCH requests block waiting for pipe writes 7 + - S3 upload happens in background via single presigned URL 8 + - Docker times out → "client disconnected during blob PATCH" 9 + - Root cause: Single presigned URLs don't support OCI's chunked upload protocol 10 + 11 + Solution: S3 Multipart Upload API 12 + 13 + Implement proper S3 multipart upload to support Docker's chunked PATCH operations: 14 + - Each PATCH → separate S3 part upload with its own presigned URL 15 + - On Commit → complete multipart upload 16 + - No buffering, no pipes, no blocking 17 + 18 + --- 19 + Architecture Changes 20 + 21 + Current (Broken) Flow 22 + 23 + POST /blobs/uploads/ → Create() → Single presigned URL to temp location 24 + PATCH → Write to pipe → [blocks] → Background goroutine uploads via single URL 25 + PATCH → [blocks on pipe] → Docker timeout → disconnect ❌ 26 + 27 + New (Multipart) Flow 28 + 29 + POST /blobs/uploads/ → Create() → Initiate multipart upload, get upload ID 30 + PATCH #1 → Get presigned URL for part 1 → Upload part 1 to S3 → Store ETag 31 + PATCH #2 → Get presigned URL for part 2 → Upload part 2 to S3 → Store ETag 32 + PUT (commit) → Complete multipart upload with ETags → Done ✅ 33 + 34 + --- 35 + Implementation Details 36 + 37 + 1. Hold Service: Add Multipart Upload Endpoints 38 + 39 + File: cmd/hold/main.go 40 + 41 + New Request/Response Types 42 + 43 + // StartMultipartUploadRequest initiates a multipart upload 44 + type StartMultipartUploadRequest struct { 45 + DID string `json:"did"` 46 + Digest string `json:"digest"` 47 + } 48 + 49 + type StartMultipartUploadResponse struct { 50 + UploadID string `json:"upload_id"` 51 + ExpiresAt time.Time `json:"expires_at"` 52 + } 53 + 54 + // GetPartURLRequest requests a presigned URL for a specific part 55 + type GetPartURLRequest struct { 56 + DID string `json:"did"` 57 + Digest string `json:"digest"` 58 + UploadID string `json:"upload_id"` 59 + PartNumber int `json:"part_number"` 60 + } 61 + 62 + type GetPartURLResponse struct { 63 + URL string `json:"url"` 64 + ExpiresAt time.Time `json:"expires_at"` 65 + } 66 + 67 + // CompleteMultipartRequest completes a multipart upload 68 + type CompleteMultipartRequest struct { 69 + DID string `json:"did"` 70 + Digest string `json:"digest"` 71 + UploadID string `json:"upload_id"` 72 + Parts []CompletedPart `json:"parts"` 73 + } 74 + 75 + type CompletedPart struct { 76 + PartNumber int `json:"part_number"` 77 + ETag string `json:"etag"` 78 + } 79 + 80 + // AbortMultipartRequest aborts an in-progress upload 81 + type AbortMultipartRequest struct { 82 + DID string `json:"did"` 83 + Digest string `json:"digest"` 84 + UploadID string `json:"upload_id"` 85 + } 86 + 87 + New Endpoints 88 + 89 + POST /start-multipart 90 + func (s *HoldService) HandleStartMultipart(w http.ResponseWriter, r *http.Request) { 91 + // Validate DID authorization for WRITE 92 + // Build S3 key from digest 93 + // Call s3.CreateMultipartUploadRequest() 94 + // Generate presigned URL if needed, or return upload ID 95 + // Return upload ID to client 96 + } 97 + 98 + POST /part-presigned-url 99 + func (s *HoldService) HandleGetPartURL(w http.ResponseWriter, r *http.Request) { 100 + // Validate DID authorization for WRITE 101 + // Build S3 key from digest 102 + // Call s3.UploadPartRequest() with part number and upload ID 103 + // Generate presigned URL 104 + // Return presigned URL for this specific part 105 + } 106 + 107 + POST /complete-multipart 108 + func (s *HoldService) HandleCompleteMultipart(w http.ResponseWriter, r *http.Request) { 109 + // Validate DID authorization for WRITE 110 + // Build S3 key from digest 111 + // Prepare CompletedPart array with part numbers and ETags 112 + // Call s3.CompleteMultipartUpload() 113 + // Return success 114 + } 115 + 116 + POST /abort-multipart (for cleanup) 117 + func (s *HoldService) HandleAbortMultipart(w http.ResponseWriter, r *http.Request) { 118 + // Validate DID authorization for WRITE 119 + // Call s3.AbortMultipartUpload() 120 + // Return success 121 + } 122 + 123 + S3 Implementation 124 + 125 + // startMultipartUpload initiates a multipart upload and returns upload ID 126 + func (s *HoldService) startMultipartUpload(ctx context.Context, digest string) (string, error) { 127 + if s.s3Client == nil { 128 + return "", fmt.Errorf("S3 not configured") 129 + } 130 + 131 + path := blobPath(digest) 132 + s3Key := strings.TrimPrefix(path, "/") 133 + if s.s3PathPrefix != "" { 134 + s3Key = s.s3PathPrefix + "/" + s3Key 135 + } 136 + 137 + result, err := s.s3Client.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{ 138 + Bucket: aws.String(s.bucket), 139 + Key: aws.String(s3Key), 140 + }) 141 + if err != nil { 142 + return "", err 143 + } 144 + 145 + return *result.UploadId, nil 146 + } 147 + 148 + // getPartPresignedURL generates presigned URL for a specific part 149 + func (s *HoldService) getPartPresignedURL(ctx context.Context, digest, uploadID string, partNumber int) (string, error) { 150 + if s.s3Client == nil { 151 + return "", fmt.Errorf("S3 not configured") 152 + } 153 + 154 + path := blobPath(digest) 155 + s3Key := strings.TrimPrefix(path, "/") 156 + if s.s3PathPrefix != "" { 157 + s3Key = s.s3PathPrefix + "/" + s3Key 158 + } 159 + 160 + req, _ := s.s3Client.UploadPartRequest(&s3.UploadPartInput{ 161 + Bucket: aws.String(s.bucket), 162 + Key: aws.String(s3Key), 163 + UploadId: aws.String(uploadID), 164 + PartNumber: aws.Int64(int64(partNumber)), 165 + }) 166 + 167 + return req.Presign(15 * time.Minute) 168 + } 169 + 170 + // completeMultipartUpload finalizes the multipart upload 171 + func (s *HoldService) completeMultipartUpload(ctx context.Context, digest, uploadID string, parts []CompletedPart) error { 172 + if s.s3Client == nil { 173 + return fmt.Errorf("S3 not configured") 174 + } 175 + 176 + path := blobPath(digest) 177 + s3Key := strings.TrimPrefix(path, "/") 178 + if s.s3PathPrefix != "" { 179 + s3Key = s.s3PathPrefix + "/" + s3Key 180 + } 181 + 182 + // Convert to S3 CompletedPart format 183 + s3Parts := make([]*s3.CompletedPart, len(parts)) 184 + for i, p := range parts { 185 + s3Parts[i] = &s3.CompletedPart{ 186 + PartNumber: aws.Int64(int64(p.PartNumber)), 187 + ETag: aws.String(p.ETag), 188 + } 189 + } 190 + 191 + _, err := s.s3Client.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{ 192 + Bucket: aws.String(s.bucket), 193 + Key: aws.String(s3Key), 194 + UploadId: aws.String(uploadID), 195 + MultipartUpload: &s3.CompletedMultipartUpload{ 196 + Parts: s3Parts, 197 + }, 198 + }) 199 + 200 + return err 201 + } 202 + 203 + --- 204 + 2. AppView: Rewrite ProxyBlobStore for Multipart 205 + 206 + File: pkg/storage/proxy_blob_store.go 207 + 208 + Remove Current Implementation 209 + 210 + - Remove pipe-based streaming 211 + - Remove background goroutine with single presigned URL 212 + - Remove global upload tracking map 213 + 214 + New ProxyBlobWriter Structure 215 + 216 + type ProxyBlobWriter struct { 217 + store *ProxyBlobStore 218 + options distribution.CreateOptions 219 + uploadID string // S3 multipart upload ID 220 + parts []CompletedPart // Track uploaded parts with ETags 221 + partNumber int // Current part number (starts at 1) 222 + buffer *bytes.Buffer // Buffer for current part 223 + size int64 // Total bytes written 224 + closed bool 225 + id string // Distribution's upload ID (for state) 226 + startedAt time.Time 227 + finalDigest string // Set on Commit 228 + } 229 + 230 + type CompletedPart struct { 231 + PartNumber int 232 + ETag string 233 + } 234 + 235 + New Create() - Initiate Multipart Upload 236 + 237 + func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { 238 + var opts distribution.CreateOptions 239 + for _, option := range options { 240 + if err := option.Apply(&opts); err != nil { 241 + return nil, err 242 + } 243 + } 244 + 245 + // Use temp digest for upload location 246 + writerID := fmt.Sprintf("upload-%d", time.Now().UnixNano()) 247 + tempDigest := digest.Digest(fmt.Sprintf("uploads/temp-%s", writerID)) 248 + 249 + // Start multipart upload via hold service 250 + uploadID, err := p.startMultipartUpload(ctx, tempDigest) 251 + if err != nil { 252 + return nil, fmt.Errorf("failed to start multipart upload: %w", err) 253 + } 254 + 255 + writer := &ProxyBlobWriter{ 256 + store: p, 257 + options: opts, 258 + uploadID: uploadID, 259 + parts: make([]CompletedPart, 0), 260 + partNumber: 1, 261 + buffer: bytes.NewBuffer(make([]byte, 0, 5*1024*1024)), // 5MB buffer 262 + id: writerID, 263 + startedAt: time.Now(), 264 + } 265 + 266 + // Store in global map for Resume() 267 + globalUploadsMu.Lock() 268 + globalUploads[writer.id] = writer 269 + globalUploadsMu.Unlock() 270 + 271 + return writer, nil 272 + } 273 + 274 + New Write() - Buffer and Flush Parts 275 + 276 + func (w *ProxyBlobWriter) Write(p []byte) (int, error) { 277 + if w.closed { 278 + return 0, fmt.Errorf("writer closed") 279 + } 280 + 281 + n, err := w.buffer.Write(p) 282 + w.size += int64(n) 283 + 284 + // Flush if buffer reaches 5MB (S3 minimum part size) 285 + if w.buffer.Len() >= 5*1024*1024 { 286 + if err := w.flushPart(); err != nil { 287 + return n, err 288 + } 289 + } 290 + 291 + return n, err 292 + } 293 + 294 + func (w *ProxyBlobWriter) flushPart() error { 295 + if w.buffer.Len() == 0 { 296 + return nil 297 + } 298 + 299 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 300 + defer cancel() 301 + 302 + // Get presigned URL for this part 303 + tempDigest := digest.Digest(fmt.Sprintf("uploads/temp-%s", w.id)) 304 + url, err := w.store.getPartPresignedURL(ctx, tempDigest, w.uploadID, w.partNumber) 305 + if err != nil { 306 + return fmt.Errorf("failed to get part presigned URL: %w", err) 307 + } 308 + 309 + // Upload part to S3 310 + req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(w.buffer.Bytes())) 311 + if err != nil { 312 + return err 313 + } 314 + 315 + resp, err := w.store.httpClient.Do(req) 316 + if err != nil { 317 + return err 318 + } 319 + defer resp.Body.Close() 320 + 321 + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 322 + return fmt.Errorf("part upload failed: status %d", resp.StatusCode) 323 + } 324 + 325 + // Store ETag for completion 326 + etag := resp.Header.Get("ETag") 327 + if etag == "" { 328 + return fmt.Errorf("no ETag in response") 329 + } 330 + 331 + w.parts = append(w.parts, CompletedPart{ 332 + PartNumber: w.partNumber, 333 + ETag: etag, 334 + }) 335 + 336 + // Reset buffer and increment part number 337 + w.buffer.Reset() 338 + w.partNumber++ 339 + 340 + return nil 341 + } 342 + 343 + New Commit() - Complete Multipart and Move 344 + 345 + func (w *ProxyBlobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error) { 346 + if w.closed { 347 + return distribution.Descriptor{}, fmt.Errorf("writer closed") 348 + } 349 + w.closed = true 350 + 351 + // Flush any remaining buffered data 352 + if w.buffer.Len() > 0 { 353 + if err := w.flushPart(); err != nil { 354 + // Try to abort multipart on error 355 + w.store.abortMultipartUpload(ctx, w.uploadID) 356 + return distribution.Descriptor{}, err 357 + } 358 + } 359 + 360 + // Complete multipart upload at temp location 361 + tempDigest := digest.Digest(fmt.Sprintf("uploads/temp-%s", w.id)) 362 + if err := w.store.completeMultipartUpload(ctx, tempDigest, w.uploadID, w.parts); err != nil { 363 + return distribution.Descriptor{}, err 364 + } 365 + 366 + // Move from temp → final location (server-side S3 copy) 367 + tempPath := fmt.Sprintf("uploads/temp-%s", w.id) 368 + finalPath := desc.Digest.String() 369 + 370 + moveURL := fmt.Sprintf("%s/move?from=%s&to=%s&did=%s", 371 + w.store.storageEndpoint, tempPath, finalPath, w.store.did) 372 + 373 + req, err := http.NewRequestWithContext(ctx, "POST", moveURL, nil) 374 + if err != nil { 375 + return distribution.Descriptor{}, err 376 + } 377 + 378 + resp, err := w.store.httpClient.Do(req) 379 + if err != nil { 380 + return distribution.Descriptor{}, err 381 + } 382 + defer resp.Body.Close() 383 + 384 + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 385 + bodyBytes, _ := io.ReadAll(resp.Body) 386 + return distribution.Descriptor{}, fmt.Errorf("move failed: %d, %s", resp.StatusCode, bodyBytes) 387 + } 388 + 389 + // Remove from global map 390 + globalUploadsMu.Lock() 391 + delete(globalUploads, w.id) 392 + globalUploadsMu.Unlock() 393 + 394 + return distribution.Descriptor{ 395 + Digest: desc.Digest, 396 + Size: w.size, 397 + MediaType: desc.MediaType, 398 + }, nil 399 + } 400 + 401 + Add Hold Service Client Methods 402 + 403 + func (p *ProxyBlobStore) startMultipartUpload(ctx context.Context, dgst digest.Digest) (string, error) { 404 + reqBody := map[string]any{ 405 + "did": p.did, 406 + "digest": dgst.String(), 407 + } 408 + body, _ := json.Marshal(reqBody) 409 + 410 + url := fmt.Sprintf("%s/start-multipart", p.storageEndpoint) 411 + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 412 + req.Header.Set("Content-Type", "application/json") 413 + 414 + resp, err := p.httpClient.Do(req) 415 + if err != nil { 416 + return "", err 417 + } 418 + defer resp.Body.Close() 419 + 420 + var result struct { 421 + UploadID string `json:"upload_id"` 422 + } 423 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 424 + return "", err 425 + } 426 + 427 + return result.UploadID, nil 428 + } 429 + 430 + func (p *ProxyBlobStore) getPartPresignedURL(ctx context.Context, dgst digest.Digest, uploadID string, partNumber int) (string, error) { 431 + reqBody := map[string]any{ 432 + "did": p.did, 433 + "digest": dgst.String(), 434 + "upload_id": uploadID, 435 + "part_number": partNumber, 436 + } 437 + body, _ := json.Marshal(reqBody) 438 + 439 + url := fmt.Sprintf("%s/part-presigned-url", p.storageEndpoint) 440 + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 441 + req.Header.Set("Content-Type", "application/json") 442 + 443 + resp, err := p.httpClient.Do(req) 444 + if err != nil { 445 + return "", err 446 + } 447 + defer resp.Body.Close() 448 + 449 + var result struct { 450 + URL string `json:"url"` 451 + } 452 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 453 + return "", err 454 + } 455 + 456 + return result.URL, nil 457 + } 458 + 459 + func (p *ProxyBlobStore) completeMultipartUpload(ctx context.Context, dgst digest.Digest, uploadID string, parts []CompletedPart) error { 460 + reqBody := map[string]any{ 461 + "did": p.did, 462 + "digest": dgst.String(), 463 + "upload_id": uploadID, 464 + "parts": parts, 465 + } 466 + body, _ := json.Marshal(reqBody) 467 + 468 + url := fmt.Sprintf("%s/complete-multipart", p.storageEndpoint) 469 + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 470 + req.Header.Set("Content-Type", "application/json") 471 + 472 + resp, err := p.httpClient.Do(req) 473 + if err != nil { 474 + return err 475 + } 476 + defer resp.Body.Close() 477 + 478 + if resp.StatusCode != http.StatusOK { 479 + return fmt.Errorf("complete multipart failed: status %d", resp.StatusCode) 480 + } 481 + 482 + return nil 483 + } 484 + 485 + --- 486 + Testing Plan 487 + 488 + 1. Unit Tests 489 + 490 + - Test multipart upload initiation 491 + - Test part upload with presigned URLs 492 + - Test completion with ETags 493 + - Test abort on errors 494 + 495 + 2. Integration Tests 496 + 497 + - Push small images (< 5MB, single part) 498 + - Push medium images (10MB, 2 parts) 499 + - Push large images (100MB, 20 parts) 500 + - Test with Upcloud S3 501 + - Test with Storj S3 502 + 503 + 3. Validation 504 + 505 + - Monitor logs for "client disconnected" errors (should be gone) 506 + - Check Docker push success rate 507 + - Verify blobs stored correctly in S3 508 + - Check bandwidth usage on hold service (should be minimal) 509 + 510 + --- 511 + Migration & Deployment 512 + 513 + Backward Compatibility 514 + 515 + - Keep /put-presigned-url endpoint for fallback 516 + - Keep /move endpoint (still needed) 517 + - New multipart endpoints are additive 518 + 519 + Deployment Steps 520 + 521 + 1. Update hold service with new endpoints 522 + 2. Update AppView ProxyBlobStore 523 + 3. Deploy hold service first 524 + 4. Deploy AppView 525 + 5. Test with sample push 526 + 6. Monitor logs 527 + 528 + Rollback Plan 529 + 530 + - Revert AppView to previous version (uses old presigned URL method) 531 + - Hold service keeps both old and new endpoints 532 + 533 + --- 534 + Documentation Updates 535 + 536 + Update docs/PRESIGNED_URLS.md 537 + 538 + - Add section "Multipart Upload for Chunked Data" 539 + - Explain why single presigned URLs don't work with PATCH 540 + - Document new endpoints and flow 541 + - Add S3 part size recommendations (5MB-64MB for Storj) 542 + 543 + Add Troubleshooting Section 544 + 545 + - "Client disconnected during PATCH" → resolved by multipart 546 + - Storj-specific considerations (64MB parts recommended) 547 + - Upcloud compatibility notes 548 + 549 + --- 550 + Performance Impact 551 + 552 + Before (Broken) 553 + 554 + - Docker PATCH → blocks on pipe → timeout → retry → fail 555 + - Unable to push large images reliably 556 + 557 + After (Multipart) 558 + 559 + - Each PATCH → independent part upload → immediate response 560 + - No blocking, no timeouts 561 + - Parallel part uploads possible (future optimization) 562 + - Reliable pushes for any image size 563 + 564 + Bandwidth 565 + 566 + - Hold service: Only API calls (~1KB per part) 567 + - Direct S3 uploads: Full blob data 568 + - S3 copy for move: Server-side (no hold bandwidth) 569 + 570 + Estimated savings: 99.98% hold service bandwidth reduction (same as before, but now actually works!)
+141 -3
docs/PRESIGNED_URLS.md
··· 110 110 **Move path:** S3 internal copy (no data transfer!) 111 111 **Hold service bandwidth:** ~2KB (presigned URL + CopyObject API) 112 112 113 + ### For Chunked Uploads (Multipart Upload) 114 + 115 + **Large blobs with OCI chunked protocol (Docker PATCH requests):** 116 + 117 + The OCI Distribution Spec uses chunked uploads via multiple PATCH requests. Single presigned URLs don't support this - we need **S3 Multipart Upload**. 118 + 119 + 1. **Docker starts upload:** `POST /v2/alice/myapp/blobs/uploads/` 120 + 2. **AppView initiates multipart:** 121 + ```json 122 + POST /start-multipart 123 + {"did": "...", "digest": "uploads/temp-{uuid}"} 124 + → Returns: {"upload_id": "xyz123"} 125 + ``` 126 + 3. **Docker sends chunk 1:** `PATCH /v2/.../uploads/{uuid}` (5MB data) 127 + 4. **AppView gets part URL:** 128 + ```json 129 + POST /part-presigned-url 130 + {"did": "...", "digest": "uploads/temp-{uuid}", "upload_id": "xyz123", "part_number": 1} 131 + → Returns: {"url": "https://s3.../part?uploadId=xyz123&partNumber=1&..."} 132 + ``` 133 + 5. **AppView uploads part 1** using presigned URL → Gets ETag 134 + 6. **Docker sends chunk 2:** `PATCH /v2/.../uploads/{uuid}` (5MB data) 135 + 7. **Repeat steps 4-5** for part 2 (and subsequent parts) 136 + 8. **Docker finalizes:** `PUT /v2/.../uploads/{uuid}?digest=sha256:abc123` 137 + 9. **AppView completes multipart:** 138 + ```json 139 + POST /complete-multipart 140 + {"did": "...", "digest": "uploads/temp-{uuid}", "upload_id": "xyz123", 141 + "parts": [{"part_number": 1, "etag": "..."}, {"part_number": 2, "etag": "..."}]} 142 + ``` 143 + 10. **AppView requests move:** `POST /move?from=uploads/temp-{uuid}&to=sha256:abc123` 144 + 11. **Hold service executes S3 server-side copy** (same as above) 145 + 146 + **Data path:** Docker → AppView (buffers 5MB) → S3 (via presigned URL per part) 147 + **Each PATCH:** Independent, non-blocking, immediate response 148 + **Hold service bandwidth:** ~1KB per part + ~1KB for completion 149 + 150 + **Why This Fixes "Client Disconnected" Errors:** 151 + - Previous implementation: Single presigned URL + pipe → PATCH blocks → Docker timeout 152 + - New implementation: Each PATCH → separate part upload → immediate response → no blocking 153 + 113 154 ## Why the Temp → Final Move is Required 114 155 115 156 This is **not an ATCR implementation detail** — it's required by the [OCI Distribution Specification](https://github.com/opencontainers/distribution-spec/blob/main/spec.md#push). ··· 305 346 } 306 347 ``` 307 348 308 - ### 3. No Changes Needed for Move Operation 349 + ### 3. Multipart Upload Endpoints (Required for Chunked Uploads) 350 + 351 + **File: `cmd/hold/main.go`** 352 + 353 + #### Start Multipart Upload 354 + 355 + ```go 356 + func (s *HoldService) HandleStartMultipart(w http.ResponseWriter, r *http.Request) { 357 + var req StartMultipartUploadRequest // {did, digest} 358 + 359 + // Validate DID authorization for WRITE 360 + if !s.isAuthorizedWrite(req.DID) { 361 + // Return 403 Forbidden 362 + } 363 + 364 + // Initiate S3 multipart upload 365 + result, err := s.s3Client.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{ 366 + Bucket: aws.String(s.bucket), 367 + Key: aws.String(s3Key), 368 + }) 369 + 370 + // Return upload ID 371 + json.NewEncoder(w).Encode(StartMultipartUploadResponse{ 372 + UploadID: *result.UploadId, 373 + ExpiresAt: time.Now().Add(24 * time.Hour), 374 + }) 375 + } 376 + ``` 377 + 378 + **Route:** `POST /start-multipart` 379 + 380 + #### Get Part Presigned URL 381 + 382 + ```go 383 + func (s *HoldService) HandleGetPartURL(w http.ResponseWriter, r *http.Request) { 384 + var req GetPartURLRequest // {did, digest, upload_id, part_number} 385 + 386 + // Generate presigned URL for specific part 387 + req, _ := s.s3Client.UploadPartRequest(&s3.UploadPartInput{ 388 + Bucket: aws.String(s.bucket), 389 + Key: aws.String(s3Key), 390 + UploadId: aws.String(uploadID), 391 + PartNumber: aws.Int64(int64(partNumber)), 392 + }) 393 + 394 + url, err := req.Presign(15 * time.Minute) 395 + 396 + json.NewEncoder(w).Encode(GetPartURLResponse{URL: url}) 397 + } 398 + ``` 399 + 400 + **Route:** `POST /part-presigned-url` 401 + 402 + #### Complete Multipart Upload 403 + 404 + ```go 405 + func (s *HoldService) HandleCompleteMultipart(w http.ResponseWriter, r *http.Request) { 406 + var req CompleteMultipartRequest // {did, digest, upload_id, parts: [{part_number, etag}]} 407 + 408 + // Convert parts to S3 format 409 + s3Parts := make([]*s3.CompletedPart, len(req.Parts)) 410 + for i, p := range req.Parts { 411 + s3Parts[i] = &s3.CompletedPart{ 412 + PartNumber: aws.Int64(int64(p.PartNumber)), 413 + ETag: aws.String(p.ETag), 414 + } 415 + } 416 + 417 + // Complete multipart upload 418 + _, err := s.s3Client.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{ 419 + Bucket: aws.String(s.bucket), 420 + Key: aws.String(s3Key), 421 + UploadId: aws.String(uploadID), 422 + MultipartUpload: &s3.CompletedMultipartUpload{Parts: s3Parts}, 423 + }) 424 + } 425 + ``` 426 + 427 + **Route:** `POST /complete-multipart` 428 + 429 + #### Abort Multipart Upload 430 + 431 + ```go 432 + func (s *HoldService) HandleAbortMultipart(w http.ResponseWriter, r *http.Request) { 433 + var req AbortMultipartRequest // {did, digest, upload_id} 434 + 435 + // Abort and cleanup parts 436 + _, err := s.s3Client.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{ 437 + Bucket: aws.String(s.bucket), 438 + Key: aws.String(s3Key), 439 + UploadId: aws.String(uploadID), 440 + }) 441 + } 442 + ``` 443 + 444 + **Route:** `POST /abort-multipart` 445 + 446 + ### 4. Move Operation (No Changes) 309 447 310 448 The existing `/move` endpoint already uses `driver.Move()`, which for S3: 311 449 - Calls `s3.CopyObject()` (server-side copy) 312 450 - Calls `s3.DeleteObject()` (delete source) 313 451 - No data transfer through hold service! 314 452 315 - **File: `cmd/hold/main.go:296` (already exists, no changes needed)** 453 + **File: `cmd/hold/main.go:393` (already exists, no changes needed)** 316 454 317 455 ```go 318 456 func (s *HoldService) HandleMove(w http.ResponseWriter, r *http.Request) { ··· 328 466 } 329 467 ``` 330 468 331 - ### 4. AppView Changes (Optional Optimization) 469 + ### 5. AppView Changes (Multipart Upload Implementation) 332 470 333 471 **File: `pkg/storage/proxy_blob_store.go:228`** 334 472
-5
pkg/auth/token/handler.go
··· 95 95 96 96 did = device.DID 97 97 handle = device.Handle 98 - fmt.Printf("DEBUG [token/handler]: Device secret validated for DID=%s, handle=%s\n", did, handle) 99 - 100 98 // Device is linked to OAuth session via DID 101 99 // OAuth refresher will provide access token when needed via middleware 102 100 } else { ··· 150 148 return 151 149 } 152 150 153 - fmt.Printf("DEBUG [token/handler]: Access validated for DID=%s\n", did) 154 - 155 151 // Issue JWT token 156 152 tokenString, err := h.issuer.Issue(did, access) 157 153 if err != nil { ··· 161 157 } 162 158 163 159 fmt.Printf("DEBUG [token/handler]: Issued JWT token (length=%d) for DID=%s\n", len(tokenString), did) 164 - fmt.Printf("DEBUG [token/handler]: JWT Token: %s\n", tokenString) 165 160 166 161 // Return token response 167 162 now := time.Now()
-5
pkg/middleware/registry.go
··· 117 117 return nil, fmt.Errorf("no storage endpoint configured: ensure default_storage_endpoint is set in middleware config") 118 118 } 119 119 ctx = context.WithValue(ctx, "storage.endpoint", storageEndpoint) 120 - fmt.Printf("DEBUG [registry/middleware]: Using storage endpoint: %s\n", storageEndpoint) 121 120 122 121 // Create a new reference with identity/image format 123 122 // Use the identity (or DID) as the namespace to ensure canonical format ··· 145 144 if err == nil { 146 145 // OAuth session available - use indigo's API client (handles DPoP automatically) 147 146 apiClient := session.APIClient() 148 - fmt.Printf("DEBUG [registry/middleware]: Using OAuth session with indigo API client for DID=%s\n", did) 149 147 atprotoClient = atproto.NewClientWithIndigoClient(pdsEndpoint, did, apiClient) 150 148 } else { 151 149 fmt.Printf("DEBUG [registry/middleware]: OAuth refresh failed for DID=%s: %v, falling back to Basic Auth\n", did, err) ··· 174 172 175 173 // Check cache first 176 174 if cached, ok := nr.repositories.Load(cacheKey); ok { 177 - fmt.Printf("DEBUG [registry/middleware]: Using cached RoutingRepository for %s\n", cacheKey) 178 175 return cached.(*storage.RoutingRepository), nil 179 176 } 180 - 181 - fmt.Printf("DEBUG [registry/middleware]: Creating new RoutingRepository for image=%s (ATProto repo name)\n", repositoryName) 182 177 183 178 // Create routing repository - routes manifests to ATProto, blobs to hold service 184 179 // The registry is stateless - no local storage is used
+253 -121
pkg/storage/proxy_blob_store.go
··· 7 7 "fmt" 8 8 "io" 9 9 "net/http" 10 + "strings" 10 11 "sync" 11 12 "time" 12 13 ··· 15 16 ) 16 17 17 18 const ( 18 - // maxChunkSize is the maximum buffer size before flushing to hold service 19 - // Matches S3's minimum multipart upload size 20 - maxChunkSize = 5 * 1024 * 1024 // 5MB 19 + // minPartSize is S3's minimum part size for multipart uploads 20 + // Parts must be at least 5MB (except the last part) 21 + minPartSize = 5 * 1024 * 1024 // 5MB 21 22 ) 23 + 24 + // CompletedPart represents a completed multipart upload part 25 + type CompletedPart struct { 26 + PartNumber int `json:"part_number"` 27 + ETag string `json:"etag"` 28 + } 22 29 23 30 // Global upload tracking (shared across all ProxyBlobStore instances) 24 31 // This is necessary because distribution creates new repository/blob store instances per request ··· 198 205 } 199 206 } 200 207 201 - // Create pipe for streaming upload 202 - pipeReader, pipeWriter := io.Pipe() 203 - uploadErr := make(chan error, 1) 204 - digestChan := make(chan string, 1) 208 + // Use temp digest for upload location 209 + writerID := fmt.Sprintf("upload-%d", time.Now().UnixNano()) 210 + tempPath := fmt.Sprintf("uploads/temp-%s", writerID) 211 + tempDigest := digest.Digest(tempPath) 212 + 213 + // Start multipart upload via hold service 214 + uploadID, err := p.startMultipartUpload(ctx, tempDigest) 215 + if err != nil { 216 + return nil, fmt.Errorf("failed to start multipart upload: %w", err) 217 + } 218 + 219 + fmt.Printf("DEBUG [proxy_blob_store/Create]: Started multipart upload: id=%s, uploadID=%s\n", writerID, uploadID) 205 220 206 - // Create writer 207 221 writer := &ProxyBlobWriter{ 208 222 store: p, 209 223 options: opts, 210 - pipeWriter: pipeWriter, 211 - pipeReader: pipeReader, 212 - digestChan: digestChan, 213 - uploadErr: uploadErr, 214 - id: fmt.Sprintf("upload-%d", time.Now().UnixNano()), 224 + uploadID: uploadID, 225 + parts: make([]CompletedPart, 0), 226 + partNumber: 1, 227 + buffer: bytes.NewBuffer(make([]byte, 0, minPartSize)), 228 + id: writerID, 215 229 startedAt: time.Now(), 230 + tempDigest: tempDigest, 216 231 } 217 232 218 - // Store in global uploads map for resume support 233 + // Store in global map for Resume() 219 234 globalUploadsMu.Lock() 220 235 globalUploads[writer.id] = writer 221 236 globalUploadsMu.Unlock() 222 237 223 - // Start background goroutine that streams to temp location immediately 224 - go func() { 225 - defer pipeReader.Close() 226 - 227 - // Stream to temp location immediately to avoid pipe deadlock 228 - tempPath := fmt.Sprintf("uploads/temp-%s", writer.id) // No leading slash 229 - url := fmt.Sprintf("%s/blobs/%s?did=%s", p.storageEndpoint, tempPath, p.did) 230 - 231 - fmt.Printf("DEBUG [goroutine]: Starting upload to temp: url=%s\n", url) 232 - 233 - // Use context with timeout to prevent hanging forever 234 - uploadCtx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) 235 - defer cancel() 236 - 237 - req, err := http.NewRequestWithContext(uploadCtx, "PUT", url, pipeReader) 238 - if err != nil { 239 - fmt.Printf("DEBUG [goroutine]: Failed to create request: %v\n", err) 240 - // Consume digest channel even on error 241 - <-digestChan 242 - uploadErr <- fmt.Errorf("failed to create request: %w", err) 243 - return 244 - } 245 - req.Header.Set("Content-Type", "application/octet-stream") 246 - 247 - fmt.Printf("DEBUG [goroutine]: Sending PUT request...\n") 248 - // Stream to temp location (this will block until all data is written) 249 - resp, err := p.httpClient.Do(req) 250 - if err != nil { 251 - fmt.Printf("DEBUG [goroutine]: PUT failed: %v\n", err) 252 - <-digestChan 253 - uploadErr <- fmt.Errorf("failed to upload to temp: %w", err) 254 - return 255 - } 256 - defer resp.Body.Close() 257 - 258 - fmt.Printf("DEBUG [goroutine]: Got response status=%d\n", resp.StatusCode) 259 - 260 - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 261 - bodyBytes, _ := io.ReadAll(resp.Body) 262 - fmt.Printf("DEBUG [goroutine]: Upload failed with status %d, body=%s\n", resp.StatusCode, string(bodyBytes)) 263 - <-digestChan 264 - uploadErr <- fmt.Errorf("upload to temp failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 265 - return 266 - } 267 - 268 - fmt.Printf("DEBUG [goroutine]: Upload to temp succeeded, waiting for digest...\n") 269 - // Upload to temp succeeded, now wait for digest from Commit() 270 - digest, ok := <-digestChan 271 - if !ok { 272 - uploadErr <- fmt.Errorf("upload cancelled after streaming to temp") 273 - return 274 - } 275 - 276 - fmt.Printf("DEBUG [goroutine]: Got digest=%s, signaling completion\n", digest) 277 - // Store digest for Commit() to use in move operation 278 - writer.finalDigest = digest 279 - uploadErr <- nil 280 - }() 281 - 282 238 return writer, nil 283 239 } 284 240 ··· 293 249 return nil, distribution.ErrBlobUploadUnknown 294 250 } 295 251 296 - // With streaming, no flush needed - just return the writer 297 252 return writer, nil 298 253 } 299 254 ··· 384 339 type ProxyBlobWriter struct { 385 340 store *ProxyBlobStore 386 341 options distribution.CreateOptions 387 - pipeWriter *io.PipeWriter // Streams directly to hold service 388 - pipeReader *io.PipeReader 389 - digestChan chan string // Sends digest to upload goroutine 390 - uploadErr chan error // Receives upload result from goroutine 391 - finalDigest string // Final digest for move operation 392 - size int64 342 + uploadID string // S3 multipart upload ID 343 + parts []CompletedPart // Track uploaded parts with ETags 344 + partNumber int // Current part number (starts at 1) 345 + buffer *bytes.Buffer // Buffer for current part 346 + size int64 // Total bytes written 393 347 closed bool 394 - id string // Distribution's upload ID 348 + id string // Distribution's upload ID (for state) 395 349 startedAt time.Time 350 + finalDigest string // Set on Commit 351 + tempDigest digest.Digest // Temp location digest 396 352 } 397 353 398 354 // ID returns the upload ID ··· 406 362 } 407 363 408 364 // Write writes data to the upload 409 - // Streams directly to hold service via pipe 365 + // Buffers data and flushes parts when buffer reaches minPartSize 410 366 func (w *ProxyBlobWriter) Write(p []byte) (int, error) { 411 367 if w.closed { 412 368 return 0, fmt.Errorf("writer closed") 413 369 } 414 370 415 - // Write to pipe - streams immediately to hold service 416 - n, err := w.pipeWriter.Write(p) 371 + n, err := w.buffer.Write(p) 372 + w.size += int64(n) 373 + 374 + // Flush if buffer reaches minimum part size (5MB) 375 + if w.buffer.Len() >= minPartSize { 376 + if err := w.flushPart(); err != nil { 377 + return n, fmt.Errorf("failed to flush part: %w", err) 378 + } 379 + } 380 + 381 + return n, err 382 + } 383 + 384 + // flushPart uploads the current buffer as a multipart upload part 385 + func (w *ProxyBlobWriter) flushPart() error { 386 + if w.buffer.Len() == 0 { 387 + return nil 388 + } 389 + 390 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 391 + defer cancel() 392 + 393 + // Get presigned URL for this part 394 + url, err := w.store.getPartPresignedURL(ctx, w.tempDigest, w.uploadID, w.partNumber) 417 395 if err != nil { 418 - // If write fails (client disconnected), close pipe to unblock goroutine 419 - w.pipeWriter.CloseWithError(err) 420 - return n, err 396 + return fmt.Errorf("failed to get part presigned URL: %w", err) 397 + } 398 + 399 + // Upload part to S3 400 + req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(w.buffer.Bytes())) 401 + if err != nil { 402 + return err 421 403 } 422 - w.size += int64(n) 404 + req.Header.Set("Content-Type", "application/octet-stream") 405 + 406 + fmt.Printf("DEBUG [proxy_blob_store/flushPart]: Uploading part %d, size=%d bytes\n", w.partNumber, w.buffer.Len()) 423 407 424 - return n, nil 408 + resp, err := w.store.httpClient.Do(req) 409 + if err != nil { 410 + return fmt.Errorf("part upload failed: %w", err) 411 + } 412 + defer resp.Body.Close() 413 + 414 + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 415 + bodyBytes, _ := io.ReadAll(resp.Body) 416 + return fmt.Errorf("part upload failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 417 + } 418 + 419 + // Store ETag for completion 420 + etag := resp.Header.Get("ETag") 421 + if etag == "" { 422 + return fmt.Errorf("no ETag in response") 423 + } 424 + 425 + // Remove quotes from ETag if present (S3 sometimes adds them) 426 + etag = strings.Trim(etag, "\"") 427 + 428 + w.parts = append(w.parts, CompletedPart{ 429 + PartNumber: w.partNumber, 430 + ETag: etag, 431 + }) 432 + 433 + fmt.Printf("DEBUG [proxy_blob_store/flushPart]: Part %d uploaded successfully, ETag=%s\n", w.partNumber, etag) 434 + 435 + // Reset buffer and increment part number 436 + w.buffer.Reset() 437 + w.partNumber++ 438 + 439 + return nil 425 440 } 426 441 427 442 // ReadFrom reads from a reader ··· 466 481 } 467 482 w.closed = true 468 483 469 - // Remove from global uploads map 470 - globalUploadsMu.Lock() 471 - delete(globalUploads, w.id) 472 - globalUploadsMu.Unlock() 484 + // Flush any remaining buffered data as the final part 485 + if w.buffer.Len() > 0 { 486 + if err := w.flushPart(); err != nil { 487 + // Try to abort multipart on error 488 + w.store.abortMultipartUpload(ctx, w.tempDigest, w.uploadID) 489 + return distribution.Descriptor{}, fmt.Errorf("failed to flush final part: %w", err) 490 + } 491 + } 473 492 474 - // Close pipe to signal EOF to upload goroutine 475 - if err := w.pipeWriter.Close(); err != nil { 476 - return distribution.Descriptor{}, fmt.Errorf("failed to close pipe: %w", err) 493 + // Complete multipart upload at temp location 494 + if err := w.store.completeMultipartUpload(ctx, w.tempDigest, w.uploadID, w.parts); err != nil { 495 + return distribution.Descriptor{}, fmt.Errorf("failed to complete multipart upload: %w", err) 477 496 } 478 497 479 - // Send digest to upload goroutine (it's waiting after temp upload completes) 480 - w.digestChan <- desc.Digest.String() 481 - close(w.digestChan) 482 - 483 - // Wait for upload goroutine to complete 484 - if err := <-w.uploadErr; err != nil { 485 - return distribution.Descriptor{}, fmt.Errorf("upload to temp failed: %w", err) 486 - } 498 + fmt.Printf("DEBUG [proxy_blob_store/Commit]: Completed multipart upload with %d parts, total size=%d\n", len(w.parts), w.size) 487 499 488 - // Now move temp → final location 489 - tempPath := fmt.Sprintf("uploads/temp-%s", w.id) // No leading slash 500 + // Move from temp → final location (server-side S3 copy) 501 + tempPath := fmt.Sprintf("uploads/temp-%s", w.id) 490 502 finalPath := desc.Digest.String() 491 503 492 504 moveURL := fmt.Sprintf("%s/move?from=%s&to=%s&did=%s", 493 505 w.store.storageEndpoint, tempPath, finalPath, w.store.did) 494 506 495 - req, err := http.NewRequestWithContext(context.Background(), "POST", moveURL, nil) 507 + req, err := http.NewRequestWithContext(ctx, "POST", moveURL, nil) 496 508 if err != nil { 497 509 return distribution.Descriptor{}, fmt.Errorf("failed to create move request: %w", err) 498 510 } ··· 505 517 506 518 if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 507 519 bodyBytes, _ := io.ReadAll(resp.Body) 508 - return distribution.Descriptor{}, fmt.Errorf("move blob failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 520 + return distribution.Descriptor{}, fmt.Errorf("move failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 509 521 } 510 522 511 - fmt.Printf("DEBUG [proxy_blob_store]: Committed upload: digest=%s, size=%d (moved from temp)\n", desc.Digest, w.size) 523 + // Remove from global map 524 + globalUploadsMu.Lock() 525 + delete(globalUploads, w.id) 526 + globalUploadsMu.Unlock() 527 + 528 + fmt.Printf("DEBUG [proxy_blob_store/Commit]: Successfully committed: digest=%s, size=%d\n", desc.Digest, w.size) 512 529 513 530 return distribution.Descriptor{ 514 531 Digest: desc.Digest, ··· 526 543 delete(globalUploads, w.id) 527 544 globalUploadsMu.Unlock() 528 545 529 - // Close digest channel without sending digest 530 - close(w.digestChan) 531 - 532 - // Close pipe with error to stop streaming 533 - if w.pipeWriter != nil { 534 - w.pipeWriter.CloseWithError(fmt.Errorf("upload cancelled")) 546 + // Abort multipart upload on S3 547 + if err := w.store.abortMultipartUpload(ctx, w.tempDigest, w.uploadID); err != nil { 548 + fmt.Printf("DEBUG [proxy_blob_store/Cancel]: Failed to abort multipart upload: %v\n", err) 549 + // Continue anyway - we still want to clean up 535 550 } 536 551 537 - // Wait for goroutine to finish 538 - <-w.uploadErr 539 - 540 - fmt.Printf("DEBUG [proxy_blob_store]: Cancelled upload: id=%s\n", w.id) 552 + fmt.Printf("DEBUG [proxy_blob_store/Cancel]: Cancelled upload: id=%s, uploadID=%s\n", w.id, w.uploadID) 541 553 return nil 542 554 } 543 555 544 556 // Close closes the writer 545 - // Just returns - streaming continues via pipe 557 + // Does nothing - actual completion happens in Commit() or Cancel() 546 558 func (w *ProxyBlobWriter) Close() error { 547 - // Don't close pipe here - that happens in Commit() or Cancel() 548 559 // Don't set w.closed = true - allow resuming for next PATCH 560 + return nil 561 + } 562 + 563 + // startMultipartUpload initiates a multipart upload via hold service 564 + func (p *ProxyBlobStore) startMultipartUpload(ctx context.Context, dgst digest.Digest) (string, error) { 565 + reqBody := map[string]any{ 566 + "did": p.did, 567 + "digest": dgst.String(), 568 + } 569 + body, _ := json.Marshal(reqBody) 570 + 571 + url := fmt.Sprintf("%s/start-multipart", p.storageEndpoint) 572 + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 573 + req.Header.Set("Content-Type", "application/json") 574 + 575 + resp, err := p.httpClient.Do(req) 576 + if err != nil { 577 + return "", err 578 + } 579 + defer resp.Body.Close() 580 + 581 + if resp.StatusCode != http.StatusOK { 582 + return "", fmt.Errorf("failed to start multipart upload: status %d", resp.StatusCode) 583 + } 584 + 585 + var result struct { 586 + UploadID string `json:"upload_id"` 587 + } 588 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 589 + return "", err 590 + } 591 + 592 + return result.UploadID, nil 593 + } 594 + 595 + // getPartPresignedURL gets a presigned URL for uploading a specific part 596 + func (p *ProxyBlobStore) getPartPresignedURL(ctx context.Context, dgst digest.Digest, uploadID string, partNumber int) (string, error) { 597 + reqBody := map[string]any{ 598 + "did": p.did, 599 + "digest": dgst.String(), 600 + "upload_id": uploadID, 601 + "part_number": partNumber, 602 + } 603 + body, _ := json.Marshal(reqBody) 604 + 605 + url := fmt.Sprintf("%s/part-presigned-url", p.storageEndpoint) 606 + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 607 + req.Header.Set("Content-Type", "application/json") 608 + 609 + resp, err := p.httpClient.Do(req) 610 + if err != nil { 611 + return "", err 612 + } 613 + defer resp.Body.Close() 614 + 615 + if resp.StatusCode != http.StatusOK { 616 + return "", fmt.Errorf("failed to get part presigned URL: status %d", resp.StatusCode) 617 + } 618 + 619 + var result struct { 620 + URL string `json:"url"` 621 + } 622 + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 623 + return "", err 624 + } 625 + 626 + return result.URL, nil 627 + } 628 + 629 + // completeMultipartUpload completes a multipart upload 630 + func (p *ProxyBlobStore) completeMultipartUpload(ctx context.Context, dgst digest.Digest, uploadID string, parts []CompletedPart) error { 631 + reqBody := map[string]any{ 632 + "did": p.did, 633 + "digest": dgst.String(), 634 + "upload_id": uploadID, 635 + "parts": parts, 636 + } 637 + body, _ := json.Marshal(reqBody) 638 + 639 + url := fmt.Sprintf("%s/complete-multipart", p.storageEndpoint) 640 + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 641 + req.Header.Set("Content-Type", "application/json") 642 + 643 + resp, err := p.httpClient.Do(req) 644 + if err != nil { 645 + return err 646 + } 647 + defer resp.Body.Close() 648 + 649 + if resp.StatusCode != http.StatusOK { 650 + bodyBytes, _ := io.ReadAll(resp.Body) 651 + return fmt.Errorf("complete multipart failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 652 + } 653 + 654 + return nil 655 + } 656 + 657 + // abortMultipartUpload aborts a multipart upload 658 + func (p *ProxyBlobStore) abortMultipartUpload(ctx context.Context, dgst digest.Digest, uploadID string) error { 659 + reqBody := map[string]any{ 660 + "did": p.did, 661 + "digest": dgst.String(), 662 + "upload_id": uploadID, 663 + } 664 + body, _ := json.Marshal(reqBody) 665 + 666 + url := fmt.Sprintf("%s/abort-multipart", p.storageEndpoint) 667 + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 668 + req.Header.Set("Content-Type", "application/json") 669 + 670 + resp, err := p.httpClient.Do(req) 671 + if err != nil { 672 + return err 673 + } 674 + defer resp.Body.Close() 675 + 676 + if resp.StatusCode != http.StatusOK { 677 + bodyBytes, _ := io.ReadAll(resp.Body) 678 + return fmt.Errorf("abort multipart failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 679 + } 680 + 549 681 return nil 550 682 } 551 683