A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

combine s3 into multipart

+590 -701
+6 -9
cmd/hold/main.go
··· 54 54 log.Fatalf("Database path is required for embedded PDS authorization") 55 55 } 56 56 57 - // Create hold service with PDS 58 - service, err := hold.NewHoldService(cfg, holdPDS) 59 - if err != nil { 60 - log.Fatalf("Failed to create hold service: %v", err) 61 - } 62 - 63 57 // Create blob store adapter and XRPC handler 64 58 if holdPDS != nil { 65 - holdDID := holdPDS.DID() 66 - blobStore := hold.NewHoldServiceBlobStore(service, holdDID) 67 - xrpcHandler = pds.NewXRPCHandler(holdPDS, cfg.Server.PublicURL, blobStore, broadcaster, nil) 59 + // Create hold service with PDS 60 + service, err := hold.NewHoldService(cfg, holdPDS) 61 + if err != nil { 62 + log.Fatalf("Failed to create hold service: %v", err) 63 + } 64 + xrpcHandler = pds.NewXRPCHandler(holdPDS, cfg.Server.PublicURL, service, broadcaster, nil) 68 65 } 69 66 70 67 // Setup HTTP routes
-1
pkg/appview/storage/proxy_blob_store.go
··· 528 528 529 529 var result struct { 530 530 UploadID string `json:"uploadId"` 531 - Mode string `json:"mode"` 532 531 } 533 532 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 534 533 return "", err
-198
pkg/hold/blobstore_adapter.go
··· 1 - package hold 2 - 3 - import ( 4 - "bytes" 5 - "context" 6 - "crypto/sha256" 7 - "fmt" 8 - "io" 9 - 10 - "atcr.io/pkg/hold/pds" 11 - "github.com/ipfs/go-cid" 12 - "github.com/multiformats/go-multihash" 13 - ) 14 - 15 - // HoldServiceBlobStore adapts the hold service to implement the pds.BlobStore interface 16 - type HoldServiceBlobStore struct { 17 - service *HoldService 18 - holdDID string 19 - } 20 - 21 - // NewHoldServiceBlobStore creates a blob store adapter for the hold service 22 - func NewHoldServiceBlobStore(service *HoldService, holdDID string) pds.BlobStore { 23 - return &HoldServiceBlobStore{ 24 - service: service, 25 - holdDID: holdDID, 26 - } 27 - } 28 - 29 - // GetPresignedURL returns a presigned URL for the specified operation (GET, HEAD, or PUT) 30 - func (b *HoldServiceBlobStore) GetPresignedURL(operation string, digest, did string) (string, error) { 31 - // Use provided DID if given, otherwise fall back to hold's DID 32 - // ATProto blobs require DID for per-user storage 33 - // OCI blobs (sha256:...) use content-addressed storage 34 - if did == "" { 35 - did = b.holdDID 36 - } 37 - 38 - ctx := context.Background() 39 - // Cast operation string to PresignedURLOperation type 40 - url, err := b.service.GetPresignedURL(ctx, PresignedURLOperation(operation), digest, did) 41 - if err != nil { 42 - return "", err 43 - } 44 - return url, nil 45 - } 46 - 47 - // StartMultipartUpload initiates a multipart upload 48 - func (b *HoldServiceBlobStore) StartMultipartUpload(ctx context.Context, digest string) (string, string, error) { 49 - uploadID, mode, err := b.service.StartMultipartUploadWithManager(ctx, digest, b.service.MultipartMgr) 50 - if err != nil { 51 - return "", "", err 52 - } 53 - 54 - // Convert mode to string for XRPC response 55 - var modeStr string 56 - switch mode { 57 - case S3Native: 58 - modeStr = "s3native" 59 - case Buffered: 60 - modeStr = "buffered" 61 - default: 62 - modeStr = "unknown" 63 - } 64 - 65 - return uploadID, modeStr, nil 66 - } 67 - 68 - // GetPartUploadURL returns structured upload info for uploading a specific part 69 - func (b *HoldServiceBlobStore) GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (*pds.PartUploadInfo, error) { 70 - session, err := b.service.MultipartMgr.GetSession(uploadID) 71 - if err != nil { 72 - return nil, err 73 - } 74 - 75 - // For S3Native mode: return presigned URL 76 - if session.Mode == S3Native { 77 - url, err := b.service.GetPartUploadURL(ctx, session, partNumber, did) 78 - if err != nil { 79 - return nil, err 80 - } 81 - return &pds.PartUploadInfo{ 82 - URL: url, 83 - Method: "PUT", 84 - }, nil 85 - } 86 - 87 - // Buffered mode: return XRPC endpoint with headers 88 - return &pds.PartUploadInfo{ 89 - URL: fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", b.service.config.Server.PublicURL), 90 - Method: "PUT", 91 - Headers: map[string]string{ 92 - "X-Upload-Id": uploadID, 93 - "X-Part-Number": fmt.Sprintf("%d", partNumber), 94 - }, 95 - }, nil 96 - } 97 - 98 - // CompleteMultipartUpload finalizes a multipart upload and moves to final digest location 99 - // finalDigest is the real digest (e.g., "sha256:abc123...") for the final storage location 100 - func (b *HoldServiceBlobStore) CompleteMultipartUpload(ctx context.Context, uploadID string, finalDigest string, parts []pds.PartInfo) error { 101 - session, err := b.service.MultipartMgr.GetSession(uploadID) 102 - if err != nil { 103 - return err 104 - } 105 - 106 - // For S3Native mode, record parts from XRPC request (they have ETags from S3) 107 - if session.Mode == S3Native { 108 - for _, p := range parts { 109 - session.RecordS3Part(p.PartNumber, p.ETag, 0) 110 - } 111 - } 112 - 113 - return b.service.CompleteMultipartUploadWithManager(ctx, session, b.service.MultipartMgr, finalDigest) 114 - } 115 - 116 - // AbortMultipartUpload cancels a multipart upload 117 - func (b *HoldServiceBlobStore) AbortMultipartUpload(ctx context.Context, uploadID string) error { 118 - session, err := b.service.MultipartMgr.GetSession(uploadID) 119 - if err != nil { 120 - return err 121 - } 122 - 123 - return b.service.AbortMultipartUploadWithManager(ctx, session, b.service.MultipartMgr) 124 - } 125 - 126 - // HandleBufferedPartUpload handles uploading a part in buffered mode 127 - func (b *HoldServiceBlobStore) HandleBufferedPartUpload(ctx context.Context, uploadID string, partNumber int, data []byte) (string, error) { 128 - session, err := b.service.MultipartMgr.GetSession(uploadID) 129 - if err != nil { 130 - return "", err 131 - } 132 - 133 - if session.Mode != Buffered { 134 - return "", fmt.Errorf("session is not in buffered mode") 135 - } 136 - 137 - etag := session.StorePart(partNumber, data) 138 - return etag, nil 139 - } 140 - 141 - // UploadBlob receives raw blob bytes, computes CID, and stores via distribution driver 142 - // This is used for standard ATProto blob uploads (profile pics, small media) 143 - func (b *HoldServiceBlobStore) UploadBlob(ctx context.Context, did string, data io.Reader) (cid.Cid, int64, error) { 144 - // Use provided DID if given, otherwise fall back to hold's DID 145 - if did == "" { 146 - did = b.holdDID 147 - } 148 - 149 - // Read all data into memory to compute CID 150 - // For large files, this should use multipart upload instead 151 - blobData, err := io.ReadAll(data) 152 - if err != nil { 153 - return cid.Undef, 0, fmt.Errorf("failed to read blob data: %w", err) 154 - } 155 - 156 - size := int64(len(blobData)) 157 - 158 - // Compute SHA-256 hash 159 - hash := sha256.Sum256(blobData) 160 - 161 - // Create CIDv1 with SHA-256 multihash 162 - mh, err := multihash.EncodeName(hash[:], "sha2-256") 163 - if err != nil { 164 - return cid.Undef, 0, fmt.Errorf("failed to encode multihash: %w", err) 165 - } 166 - 167 - // Create CIDv1 with raw codec (0x55) 168 - // ATProto uses CIDv1 with raw codec for blobs 169 - blobCID := cid.NewCidV1(0x55, mh) 170 - 171 - // Store blob via distribution driver at ATProto path 172 - // Path: /repos/{did}/blobs/{cid}/data 173 - path := atprotoBlobPath(did, blobCID.String()) 174 - 175 - // Write blob to storage using distribution driver 176 - writer, err := b.service.driver.Writer(ctx, path, false) 177 - if err != nil { 178 - return cid.Undef, 0, fmt.Errorf("failed to create writer: %w", err) 179 - } 180 - 181 - // Write data 182 - n, err := io.Copy(writer, bytes.NewReader(blobData)) 183 - if err != nil { 184 - writer.Cancel(ctx) 185 - return cid.Undef, 0, fmt.Errorf("failed to write blob: %w", err) 186 - } 187 - 188 - // Commit the write 189 - if err := writer.Commit(ctx); err != nil { 190 - return cid.Undef, 0, fmt.Errorf("failed to commit blob: %w", err) 191 - } 192 - 193 - if n != size { 194 - return cid.Undef, 0, fmt.Errorf("size mismatch: wrote %d bytes, expected %d", n, size) 195 - } 196 - 197 - return blobCID, size, nil 198 - }
+252 -62
pkg/hold/multipart.go
··· 4 4 "context" 5 5 "crypto/sha256" 6 6 "encoding/hex" 7 + "encoding/json" 7 8 "fmt" 8 9 "log" 10 + "net/http" 11 + "sort" 12 + "strings" 9 13 "sync" 10 14 "time" 11 15 16 + "github.com/aws/aws-sdk-go/service/s3" 12 17 "github.com/google/uuid" 13 18 ) 14 19 ··· 23 28 ) 24 29 25 30 // CompletedPart represents an uploaded part with its ETag 26 - type CompletedPart struct { 31 + type PartInfo struct { 27 32 PartNumber int `json:"part_number"` 28 33 ETag string `json:"etag"` 34 + } 35 + 36 + // PartUploadInfo contains structured information for uploading a part 37 + // Used for both S3 presigned URLs and buffered mode with headers 38 + type PartUploadInfo struct { 39 + URL string `json:"url"` // URL to PUT the part to 40 + Method string `json:"method,omitempty"` // HTTP method (usually "PUT") 41 + Headers map[string]string `json:"headers,omitempty"` // Additional headers required for the request 29 42 } 30 43 31 44 // MultipartSession tracks an in-progress multipart upload ··· 159 172 return etag 160 173 } 161 174 162 - // RecordS3Part records a part uploaded to S3 (for S3Native mode) 163 - func (s *MultipartSession) RecordS3Part(partNumber int, etag string, size int64) { 164 - s.mu.Lock() 165 - defer s.mu.Unlock() 166 - 167 - part := &MultipartPart{ 168 - PartNumber: partNumber, 169 - ETag: etag, 170 - Size: size, 171 - UploadedAt: time.Now(), 172 - } 173 - 174 - s.Parts[partNumber] = part 175 - s.LastActivity = time.Now() 176 - 177 - log.Printf("Recorded S3 part: uploadID=%s, part=%d, size=%d bytes, etag=%s", s.UploadID, partNumber, size, etag) 178 - } 179 - 180 175 // AssembleBufferedParts assembles all buffered parts into a single blob 181 176 // Returns the complete data and total size 182 177 func (s *MultipartSession) AssembleBufferedParts() ([]byte, int64, error) { ··· 215 210 return assembled, totalSize, nil 216 211 } 217 212 218 - // GetCompletedParts returns the list of completed parts for S3 multipart completion 219 - func (s *MultipartSession) GetCompletedParts() []CompletedPart { 220 - s.mu.RLock() 221 - defer s.mu.RUnlock() 222 - 223 - parts := make([]CompletedPart, 0, len(s.Parts)) 224 - for _, part := range s.Parts { 225 - parts = append(parts, CompletedPart{ 226 - PartNumber: part.PartNumber, 227 - ETag: part.ETag, 228 - }) 229 - } 230 - 231 - return parts 232 - } 233 - 234 213 // StartMultipartUploadWithManager initiates a multipart upload using the manager 235 214 // Returns uploadID and mode 236 - func (s *HoldService) StartMultipartUploadWithManager(ctx context.Context, digest string, manager *MultipartManager) (string, MultipartMode, error) { 215 + func (s *HoldService) StartMultipartUploadWithManager(ctx context.Context, digest string) (string, MultipartMode, error) { 237 216 // Check if presigned URLs are disabled for testing 238 217 if s.config.Server.DisablePresignedURLs { 239 218 log.Printf("Presigned URLs disabled (DISABLE_PRESIGNED_URLS=true), using buffered mode") 240 - session := manager.CreateSession(digest, Buffered, "") 219 + session := s.MultipartMgr.CreateSession(digest, Buffered, "") 241 220 log.Printf("Started buffered multipart: uploadID=%s", session.UploadID) 242 221 return session.UploadID, Buffered, nil 243 222 } 244 223 245 224 // Try S3 native multipart first 246 225 if s.s3Client != nil { 247 - s3UploadID, err := s.startMultipartUpload(ctx, digest) 226 + if s.s3Client == nil { 227 + return "", S3Native, fmt.Errorf("S3 not configured") 228 + } 229 + path := blobPath(digest) 230 + s3Key := strings.TrimPrefix(path, "/") 231 + if s.s3PathPrefix != "" { 232 + s3Key = s.s3PathPrefix + "/" + s3Key 233 + } 234 + 235 + result, err := s.s3Client.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{ 236 + Bucket: &s.bucket, 237 + Key: &s3Key, 238 + }) 248 239 if err == nil { 240 + s3UploadID := *result.UploadId 249 241 // S3 native multipart succeeded 250 - session := manager.CreateSession(digest, S3Native, s3UploadID) 251 - log.Printf("Started S3 native multipart: uploadID=%s, s3UploadID=%s", session.UploadID, s3UploadID) 242 + session := s.MultipartMgr.CreateSession(digest, S3Native, s3UploadID) 243 + log.Printf("Started S3 native multipart: digest=%s, uploadID=%s, s3UploadID=%s", digest, session.UploadID, s3UploadID) 252 244 return session.UploadID, S3Native, nil 253 245 } 254 246 log.Printf("S3 native multipart failed, falling back to buffered mode: %v", err) 255 247 } 256 248 257 249 // Fallback to buffered mode 258 - session := manager.CreateSession(digest, Buffered, "") 250 + session := s.MultipartMgr.CreateSession(digest, Buffered, "") 259 251 log.Printf("Started buffered multipart: uploadID=%s", session.UploadID) 260 252 return session.UploadID, Buffered, nil 261 253 } 262 254 263 255 // GetPartUploadURL generates a presigned URL for uploading a part 264 256 // Only used for S3Native mode - Buffered mode is handled by blobstore adapter 265 - func (s *HoldService) GetPartUploadURL(ctx context.Context, session *MultipartSession, partNumber int, did string) (string, error) { 266 - if session.Mode != S3Native { 267 - return "", fmt.Errorf("GetPartUploadURL only supports S3Native mode") 257 + func (s *HoldService) GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (*PartUploadInfo, error) { 258 + session, err := s.MultipartMgr.GetSession(uploadID) 259 + if err != nil { 260 + return nil, err 268 261 } 269 262 270 - // Generate S3 presigned URL for this part 271 - url, err := s.getPartPresignedURL(ctx, session.Digest, session.S3UploadID, partNumber) 272 - if err != nil { 273 - return "", fmt.Errorf("failed to generate S3 part URL: %w", err) 263 + // For S3Native mode: return presigned URL 264 + if session.Mode == S3Native { 265 + if s.s3Client == nil { 266 + return nil, fmt.Errorf("S3 not configured") 267 + } 268 + 269 + path := blobPath(session.Digest) 270 + s3Key := strings.TrimPrefix(path, "/") 271 + if s.s3PathPrefix != "" { 272 + s3Key = s.s3PathPrefix + "/" + s3Key 273 + } 274 + pnum := int64(partNumber) 275 + req, _ := s.s3Client.UploadPartRequest(&s3.UploadPartInput{ 276 + Bucket: &s.bucket, 277 + Key: &s3Key, 278 + UploadId: &uploadID, 279 + PartNumber: &pnum, 280 + }) 281 + 282 + url, err := req.Presign(15 * time.Minute) 283 + if err != nil { 284 + return nil, err 285 + } 286 + 287 + log.Printf("Generated part presigned URL: digest=%s, uploadID=%s, part=%d", session.Digest, uploadID, partNumber) 288 + 289 + return &PartUploadInfo{ 290 + URL: url, 291 + Method: "PUT", 292 + }, nil 274 293 } 275 - return url, nil 294 + 295 + // Buffered mode: return XRPC endpoint with headers 296 + return &PartUploadInfo{ 297 + URL: fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", s.config.Server.PublicURL), 298 + Method: "PUT", 299 + Headers: map[string]string{ 300 + "X-Upload-Id": uploadID, 301 + "X-Part-Number": fmt.Sprintf("%d", partNumber), 302 + }, 303 + }, nil 276 304 } 277 305 278 306 // CompleteMultipartUploadWithManager completes a multipart upload and moves to final location 279 307 // finalDigest is the real digest (e.g., "sha256:abc123...") for the final storage location 280 308 // session.Digest is the temp location (e.g., "uploads/temp-<uuid>") 281 - func (s *HoldService) CompleteMultipartUploadWithManager(ctx context.Context, session *MultipartSession, manager *MultipartManager, finalDigest string) error { 282 - defer manager.DeleteSession(session.UploadID) 309 + func (s *HoldService) CompleteMultipartUploadWithManager(ctx context.Context, uploadID string, finalDigest string, parts []PartInfo) error { 310 + session, err := s.MultipartMgr.GetSession(uploadID) 311 + defer s.MultipartMgr.DeleteSession(uploadID) 312 + if err != nil { 313 + return err 314 + } 283 315 284 316 if session.Mode == S3Native { 285 - // Complete S3 multipart upload at temp location 286 - parts := session.GetCompletedParts() 287 - if err := s.completeMultipartUpload(ctx, session.Digest, session.S3UploadID, parts); err != nil { 288 - return fmt.Errorf("failed to complete S3 multipart: %w", err) 317 + if s.s3Client == nil { 318 + return fmt.Errorf("S3 not configured") 289 319 } 290 - log.Printf("Completed S3 native multipart at temp location: uploadID=%s, parts=%d", session.UploadID, len(parts)) 291 320 292 - // Verify the blob exists at temp location before moving 321 + // Sort parts by part number (S3 requires ascending order) 322 + sort.Slice(parts, func(i, j int) bool { 323 + return parts[i].PartNumber < parts[j].PartNumber 324 + }) 325 + 326 + // Convert to S3 CompletedPart format 327 + // IMPORTANT: S3 requires ETags to be quoted in the CompleteMultipartUpload XML 328 + s3Parts := make([]*s3.CompletedPart, len(parts)) 329 + for i, p := range parts { 330 + etag := normalizeETag(p.ETag) 331 + pnum := int64(p.PartNumber) 332 + s3Parts[i] = &s3.CompletedPart{ 333 + PartNumber: &pnum, 334 + ETag: &etag, 335 + } 336 + } 293 337 sourcePath := blobPath(session.Digest) 338 + s3Key := strings.TrimPrefix(sourcePath, "/") 339 + if s.s3PathPrefix != "" { 340 + s3Key = s.s3PathPrefix + "/" + s3Key 341 + } 342 + 343 + _, err = s.s3Client.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{ 344 + Bucket: &s.bucket, 345 + Key: &s3Key, 346 + UploadId: &uploadID, 347 + MultipartUpload: &s3.CompletedMultipartUpload{ 348 + Parts: s3Parts, 349 + }, 350 + }) 351 + if err != nil { 352 + return fmt.Errorf("failed to complete multipart upload: digest=%s, uploadID=%s, err=%v", session.Digest, uploadID, err) 353 + } 354 + log.Printf("Completed S3 native multipart at temp location: digest=%s, uploadID=%s, parts=%d", session.Digest, session.UploadID, len(s3Parts)) 355 + 356 + // Verify the blob exists at temp location before moving 294 357 destPath := blobPath(finalDigest) 295 358 log.Printf("[DEBUG] About to move: source=%s, dest=%s", sourcePath, destPath) 296 359 ··· 339 402 } 340 403 341 404 // AbortMultipartUploadWithManager aborts a multipart upload 342 - func (s *HoldService) AbortMultipartUploadWithManager(ctx context.Context, session *MultipartSession, manager *MultipartManager) error { 343 - defer manager.DeleteSession(session.UploadID) 405 + func (s *HoldService) AbortMultipartUploadWithManager(ctx context.Context, uploadID string) error { 406 + session, err := s.MultipartMgr.GetSession(uploadID) 407 + defer s.MultipartMgr.DeleteSession(uploadID) 408 + if err != nil { 409 + return err 410 + } 344 411 345 412 if session.Mode == S3Native { 413 + if s.s3Client == nil { 414 + return fmt.Errorf("S3 not configured") 415 + } 416 + path := blobPath(session.Digest) 417 + s3Key := strings.TrimPrefix(path, "/") 418 + if s.s3PathPrefix != "" { 419 + s3Key = s.s3PathPrefix + "/" + s3Key 420 + } 421 + 422 + _, err := s.s3Client.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{ 423 + Bucket: &s.bucket, 424 + Key: &s3Key, 425 + UploadId: &uploadID, 426 + }) 346 427 // Abort S3 multipart upload 347 - if err := s.abortMultipartUpload(ctx, session.Digest, session.S3UploadID); err != nil { 348 - return fmt.Errorf("failed to abort S3 multipart: %w", err) 428 + if err != nil { 429 + return fmt.Errorf("failed to abort multipart upload: digest=%s, uploadID=%s, err=%v", session.Digest, uploadID, err) 349 430 } 350 - log.Printf("Aborted S3 native multipart: uploadID=%s", session.UploadID) 431 + log.Printf("Aborted S3 native multipart: digest=%s, uploadID=%s", session.Digest, session.UploadID) 351 432 return nil 352 433 } 353 434 ··· 355 436 log.Printf("Aborted buffered multipart: uploadID=%s", session.UploadID) 356 437 return nil 357 438 } 439 + 440 + // handleMultipartOperation handles multipart upload operations via JSON request 441 + func (s *HoldService) HandleMultipartOperation(w http.ResponseWriter, r *http.Request, did string) { 442 + ctx := r.Context() 443 + 444 + // Parse JSON body 445 + var req struct { 446 + Action string `json:"action"` 447 + Digest string `json:"digest,omitempty"` 448 + UploadID string `json:"uploadId,omitempty"` 449 + PartNumber int `json:"partNumber,omitempty"` 450 + Parts []PartInfo `json:"parts,omitempty"` 451 + } 452 + 453 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 454 + http.Error(w, fmt.Sprintf("invalid JSON body: %v", err), http.StatusBadRequest) 455 + return 456 + } 457 + 458 + // Route based on action 459 + switch req.Action { 460 + case "start": 461 + // Start multipart upload 462 + if req.Digest == "" { 463 + http.Error(w, "digest required for start action", http.StatusBadRequest) 464 + return 465 + } 466 + 467 + uploadID, _, err := s.StartMultipartUploadWithManager(ctx, req.Digest) 468 + if err != nil { 469 + http.Error(w, fmt.Sprintf("failed to start multipart upload: %v", err), http.StatusInternalServerError) 470 + return 471 + } 472 + 473 + w.Header().Set("Content-Type", "application/json") 474 + json.NewEncoder(w).Encode(map[string]any{ 475 + "uploadId": uploadID, 476 + }) 477 + 478 + case "part": 479 + // Get part upload URL 480 + if req.UploadID == "" || req.PartNumber == 0 { 481 + http.Error(w, "uploadId and partNumber required for part action", http.StatusBadRequest) 482 + return 483 + } 484 + 485 + uploadInfo, err := s.GetPartUploadURL(ctx, req.UploadID, req.PartNumber, did) 486 + if err != nil { 487 + http.Error(w, fmt.Sprintf("failed to get part URL: %v", err), http.StatusInternalServerError) 488 + return 489 + } 490 + 491 + w.Header().Set("Content-Type", "application/json") 492 + json.NewEncoder(w).Encode(uploadInfo) 493 + 494 + case "complete": 495 + // Complete multipart upload 496 + if req.UploadID == "" || len(req.Parts) == 0 { 497 + http.Error(w, "uploadId and parts required for complete action", http.StatusBadRequest) 498 + return 499 + } 500 + if req.Digest == "" { 501 + http.Error(w, "digest required for complete action", http.StatusBadRequest) 502 + return 503 + } 504 + 505 + // Pass the real digest so hold can move temp → final location 506 + if err := s.CompleteMultipartUploadWithManager(ctx, req.UploadID, req.Digest, req.Parts); err != nil { 507 + http.Error(w, fmt.Sprintf("failed to complete multipart upload: %v", err), http.StatusInternalServerError) 508 + return 509 + } 510 + 511 + w.Header().Set("Content-Type", "application/json") 512 + json.NewEncoder(w).Encode(map[string]any{ 513 + "status": "completed", 514 + }) 515 + 516 + case "abort": 517 + // Abort multipart upload 518 + if req.UploadID == "" { 519 + http.Error(w, "uploadId required for abort action", http.StatusBadRequest) 520 + return 521 + } 522 + 523 + if err := s.AbortMultipartUploadWithManager(ctx, req.UploadID); err != nil { 524 + http.Error(w, fmt.Sprintf("failed to abort multipart upload: %v", err), http.StatusInternalServerError) 525 + return 526 + } 527 + 528 + w.Header().Set("Content-Type", "application/json") 529 + json.NewEncoder(w).Encode(map[string]any{ 530 + "status": "aborted", 531 + }) 532 + 533 + default: 534 + http.Error(w, fmt.Sprintf("unknown action: %s", req.Action), http.StatusBadRequest) 535 + } 536 + } 537 + 538 + // normalizeETag ensures an ETag has quotes (required by S3 CompleteMultipartUpload) 539 + // S3 returns ETags with quotes, but HTTP clients may strip them 540 + func normalizeETag(etag string) string { 541 + // Already has quotes 542 + if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") { 543 + return etag 544 + } 545 + // Add quotes 546 + return fmt.Sprintf("\"%s\"", etag) 547 + }
+15 -119
pkg/hold/pds/xrpc.go
··· 25 25 type XRPCHandler struct { 26 26 pds *HoldPDS 27 27 publicURL string 28 - blobStore BlobStore 28 + holdService XRPCHoldService 29 29 broadcaster *EventBroadcaster 30 30 httpClient HTTPClient // For testing - allows injecting mock HTTP client 31 31 } 32 32 33 - // BlobStore interface wraps the existing hold service storage operations 34 - type BlobStore interface { 33 + // interface wraps the existing hold service storage operations 34 + type XRPCHoldService interface { 35 35 // GetPresignedURL returns a presigned URL for the specified operation 36 36 // For ATProto blobs (CID), did is required for per-DID storage 37 37 // For OCI blobs (sha256:...), did may be empty 38 38 // operation can be "GET", "HEAD", or "PUT" 39 - GetPresignedURL(operation string, digest, did string) (string, error) 39 + GetPresignedURL(ctx context.Context, operation string, digest string, did string) (string, error) 40 40 41 41 // UploadBlob receives raw blob bytes, computes CID, and stores via distribution driver 42 42 // Used for standard ATProto blob uploads (profile pics, small media) 43 43 // Returns CID and size of stored blob 44 44 UploadBlob(ctx context.Context, did string, data io.Reader) (cid cid.Cid, size int64, err error) 45 45 46 - // Multipart upload operations (used for OCI container layers only) 47 - // StartMultipartUpload initiates a multipart upload, returns uploadID and mode 48 - StartMultipartUpload(ctx context.Context, digest string) (uploadID string, mode string, err error) 49 - // GetPartUploadURL returns structured upload info (URL + optional headers) for a specific part 50 - GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (*PartUploadInfo, error) 51 - // CompleteMultipartUpload finalizes a multipart upload and moves to final digest location 52 - // finalDigest is the real digest (e.g., "sha256:abc123...") for the final storage location 53 - CompleteMultipartUpload(ctx context.Context, uploadID string, finalDigest string, parts []PartInfo) error 54 - // AbortMultipartUpload cancels a multipart upload 55 - AbortMultipartUpload(ctx context.Context, uploadID string) error 56 - // HandleBufferedPartUpload handles uploading a part in buffered mode 46 + // handles multipart upload operations via JSON request 47 + HandleMultipartOperation(w http.ResponseWriter, r *http.Request, did string) 48 + 49 + // handles uploading a part in buffered mode 57 50 HandleBufferedPartUpload(ctx context.Context, uploadID string, partNumber int, data []byte) (etag string, err error) 58 51 } 59 52 ··· 72 65 } 73 66 74 67 // NewXRPCHandler creates a new XRPC handler 75 - func NewXRPCHandler(pds *HoldPDS, publicURL string, blobStore BlobStore, broadcaster *EventBroadcaster, httpClient HTTPClient) *XRPCHandler { 68 + func NewXRPCHandler(pds *HoldPDS, publicURL string, holdService XRPCHoldService, broadcaster *EventBroadcaster, httpClient HTTPClient) *XRPCHandler { 76 69 return &XRPCHandler{ 77 70 pds: pds, 78 71 publicURL: publicURL, 79 - blobStore: blobStore, 72 + holdService: holdService, 80 73 broadcaster: broadcaster, 81 74 httpClient: httpClient, 82 75 } ··· 751 744 // Use authenticated user's DID for ATProto blob storage (per-DID paths) 752 745 did := user.DID 753 746 754 - // Upload blob directly - blobStore will compute CID and store 755 - blobCID, size, err := h.blobStore.UploadBlob(r.Context(), did, r.Body) 747 + // Upload blob directly - holdService will compute CID and store 748 + blobCID, size, err := h.holdService.UploadBlob(r.Context(), did, r.Body) 756 749 if err != nil { 757 750 http.Error(w, fmt.Sprintf("failed to upload blob: %v", err), http.StatusInternalServerError) 758 751 return ··· 801 794 } 802 795 803 796 // Store part via blob store 804 - etag, err := h.blobStore.HandleBufferedPartUpload(ctx, uploadID, partNumber, data) 797 + etag, err := h.holdService.HandleBufferedPartUpload(ctx, uploadID, partNumber, data) 805 798 if err != nil { 806 799 http.Error(w, fmt.Sprintf("failed to upload part: %v", err), http.StatusInternalServerError) 807 800 return ··· 816 809 817 810 // handleMultipartOperation handles multipart upload operations via JSON request 818 811 func (h *XRPCHandler) handleMultipartOperation(w http.ResponseWriter, r *http.Request) { 819 - ctx := r.Context() 820 - 821 - // Parse JSON body 822 - var req struct { 823 - Action string `json:"action"` 824 - Digest string `json:"digest,omitempty"` 825 - UploadID string `json:"uploadId,omitempty"` 826 - PartNumber int `json:"partNumber,omitempty"` 827 - Parts []PartInfo `json:"parts,omitempty"` 828 - } 829 - 830 - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 831 - http.Error(w, fmt.Sprintf("invalid JSON body: %v", err), http.StatusBadRequest) 832 - return 833 - } 834 - 835 812 // Validate blob write access for all multipart operations 836 813 // This checks DPoP + OAuth tokens and verifies user is captain or crew with blob:write permission 837 814 user, err := ValidateBlobWriteAccess(r, h.pds, h.httpClient) ··· 840 817 return 841 818 } 842 819 843 - // Route based on action 844 - switch req.Action { 845 - case "start": 846 - // Start multipart upload 847 - if req.Digest == "" { 848 - http.Error(w, "digest required for start action", http.StatusBadRequest) 849 - return 850 - } 851 - 852 - uploadID, mode, err := h.blobStore.StartMultipartUpload(ctx, req.Digest) 853 - if err != nil { 854 - http.Error(w, fmt.Sprintf("failed to start multipart upload: %v", err), http.StatusInternalServerError) 855 - return 856 - } 857 - 858 - w.Header().Set("Content-Type", "application/json") 859 - json.NewEncoder(w).Encode(map[string]any{ 860 - "uploadId": uploadID, 861 - "mode": mode, 862 - }) 863 - 864 - case "part": 865 - // Get part upload URL 866 - if req.UploadID == "" || req.PartNumber == 0 { 867 - http.Error(w, "uploadId and partNumber required for part action", http.StatusBadRequest) 868 - return 869 - } 870 - 871 - uploadInfo, err := h.blobStore.GetPartUploadURL(ctx, req.UploadID, req.PartNumber, user.DID) 872 - if err != nil { 873 - http.Error(w, fmt.Sprintf("failed to get part URL: %v", err), http.StatusInternalServerError) 874 - return 875 - } 876 - 877 - w.Header().Set("Content-Type", "application/json") 878 - json.NewEncoder(w).Encode(uploadInfo) 879 - 880 - case "complete": 881 - // Complete multipart upload 882 - if req.UploadID == "" || len(req.Parts) == 0 { 883 - http.Error(w, "uploadId and parts required for complete action", http.StatusBadRequest) 884 - return 885 - } 886 - if req.Digest == "" { 887 - http.Error(w, "digest required for complete action", http.StatusBadRequest) 888 - return 889 - } 890 - 891 - // Pass the real digest so hold can move temp → final location 892 - if err := h.blobStore.CompleteMultipartUpload(ctx, req.UploadID, req.Digest, req.Parts); err != nil { 893 - http.Error(w, fmt.Sprintf("failed to complete multipart upload: %v", err), http.StatusInternalServerError) 894 - return 895 - } 896 - 897 - w.Header().Set("Content-Type", "application/json") 898 - json.NewEncoder(w).Encode(map[string]any{ 899 - "status": "completed", 900 - }) 901 - 902 - case "abort": 903 - // Abort multipart upload 904 - if req.UploadID == "" { 905 - http.Error(w, "uploadId required for abort action", http.StatusBadRequest) 906 - return 907 - } 908 - 909 - if err := h.blobStore.AbortMultipartUpload(ctx, req.UploadID); err != nil { 910 - http.Error(w, fmt.Sprintf("failed to abort multipart upload: %v", err), http.StatusInternalServerError) 911 - return 912 - } 913 - 914 - w.Header().Set("Content-Type", "application/json") 915 - json.NewEncoder(w).Encode(map[string]any{ 916 - "status": "aborted", 917 - }) 918 - 919 - default: 920 - http.Error(w, fmt.Sprintf("unknown action: %s", req.Action), http.StatusBadRequest) 921 - } 820 + h.holdService.HandleMultipartOperation(w, r, user.DID) 922 821 } 923 822 924 823 // HandleGetBlob wraps existing presigned download URL logic ··· 984 883 operation := r.URL.Query().Get("method") 985 884 if operation == "" { 986 885 operation = "GET" 987 - if r.Method == http.MethodHead { 988 - operation = "HEAD" 989 - } 990 886 } 991 887 992 888 // Generate presigned URL for the operation 993 - presignedURL, err := h.blobStore.GetPresignedURL(operation, digest, did) 889 + presignedURL, err := h.holdService.GetPresignedURL(r.Context(), operation, digest, did) 994 890 if err != nil { 995 891 log.Printf("[HandleGetBlob] Failed to get presigned %s URL: digest=%s, did=%s, err=%v", operation, digest, did, err) 996 892 http.Error(w, "failed to get presigned URL", http.StatusInternalServerError)
+12 -12
pkg/hold/pds/xrpc_multipart_test.go
··· 31 31 // TestHandleUploadBlob_MultipartStart tests multipart upload start operation 32 32 // Non-standard ATCR extension for large blob uploads 33 33 func TestHandleUploadBlob_MultipartStart(t *testing.T) { 34 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 34 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 35 35 36 36 digest := "sha256:largefile123" 37 37 body := map[string]string{ ··· 62 62 } 63 63 64 64 // Verify blob store was called 65 - if len(blobStore.startCalls) != 1 || blobStore.startCalls[0] != digest { 65 + if len(holdService.startCalls) != 1 || holdService.startCalls[0] != digest { 66 66 t.Errorf("Expected StartMultipartUpload to be called with %s", digest) 67 67 } 68 68 } ··· 91 91 // TestHandleUploadBlob_MultipartPart tests getting presigned URL for a part 92 92 // Non-standard ATCR extension for multipart uploads 93 93 func TestHandleUploadBlob_MultipartPart(t *testing.T) { 94 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 94 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 95 95 96 96 uploadID := "test-upload-123" 97 97 partNumber := 1 ··· 121 121 } 122 122 123 123 // Verify blob store was called with authenticated user's DID 124 - if len(blobStore.partURLCalls) != 1 { 124 + if len(holdService.partURLCalls) != 1 { 125 125 t.Fatalf("Expected GetPartUploadURL to be called once") 126 126 } 127 - call := blobStore.partURLCalls[0] 127 + call := holdService.partURLCalls[0] 128 128 if call.uploadID != uploadID || call.partNumber != partNumber || call.did != expectedDID { 129 129 t.Errorf("Expected GetPartUploadURL(%s, %d, %s), got (%s, %d, %s)", 130 130 uploadID, partNumber, expectedDID, call.uploadID, call.partNumber, call.did) ··· 183 183 // TestHandleUploadBlob_MultipartComplete tests completing a multipart upload 184 184 // Non-standard ATCR extension for multipart uploads 185 185 func TestHandleUploadBlob_MultipartComplete(t *testing.T) { 186 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 186 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 187 187 188 188 uploadID := "test-upload-123" 189 189 parts := []PartInfo{ ··· 216 216 } 217 217 218 218 // Verify blob store was called 219 - if len(blobStore.completeCalls) != 1 || blobStore.completeCalls[0] != uploadID { 219 + if len(holdService.completeCalls) != 1 || holdService.completeCalls[0] != uploadID { 220 220 t.Errorf("Expected CompleteMultipartUpload to be called with %s", uploadID) 221 221 } 222 222 } ··· 284 284 // TestHandleUploadBlob_MultipartAbort tests aborting a multipart upload 285 285 // Non-standard ATCR extension for multipart uploads 286 286 func TestHandleUploadBlob_MultipartAbort(t *testing.T) { 287 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 287 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 288 288 289 289 uploadID := "test-upload-123" 290 290 ··· 311 311 } 312 312 313 313 // Verify blob store was called 314 - if len(blobStore.abortCalls) != 1 || blobStore.abortCalls[0] != uploadID { 314 + if len(holdService.abortCalls) != 1 || holdService.abortCalls[0] != uploadID { 315 315 t.Errorf("Expected AbortMultipartUpload to be called with %s", uploadID) 316 316 } 317 317 } ··· 340 340 // TestHandleUploadBlob_BufferedPartUpload tests uploading a part in buffered mode 341 341 // Non-standard ATCR extension for multipart uploads without S3 presigned URLs 342 342 func TestHandleUploadBlob_BufferedPartUpload(t *testing.T) { 343 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 343 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 344 344 345 345 uploadID := "test-upload-123" 346 346 partNumber := "1" ··· 366 366 } 367 367 368 368 // Verify blob store was called 369 - if len(blobStore.partUploadCalls) != 1 { 369 + if len(holdService.partUploadCalls) != 1 { 370 370 t.Fatalf("Expected HandleBufferedPartUpload to be called once") 371 371 } 372 - call := blobStore.partUploadCalls[0] 372 + call := holdService.partUploadCalls[0] 373 373 if call.uploadID != uploadID || call.partNumber != 1 || call.dataSize != len(data) { 374 374 t.Errorf("Expected HandleBufferedPartUpload(%s, 1, %d bytes), got (%s, %d, %d bytes)", 375 375 uploadID, len(data), call.uploadID, call.partNumber, call.dataSize)
+143 -39
pkg/hold/pds/xrpc_test.go
··· 1330 1330 } 1331 1331 } 1332 1332 1333 - // Mock BlobStore for testing blob endpoints 1333 + // Mock HoldService for testing blob endpoints 1334 1334 1335 - // mockBlobStore implements BlobStore interface for testing 1336 - type mockBlobStore struct { 1335 + // mockHoldService implements XRPCHoldService interface for testing 1336 + type mockHoldService struct { 1337 1337 // Control behavior 1338 1338 downloadURLError error 1339 1339 uploadURLError error ··· 1372 1372 dataSize int 1373 1373 } 1374 1374 1375 - func newMockBlobStore() *mockBlobStore { 1376 - return &mockBlobStore{ 1375 + func newMockHoldService() *mockHoldService { 1376 + return &mockHoldService{ 1377 1377 downloadCalls: []string{}, 1378 1378 uploadCalls: []string{}, 1379 1379 uploadBlobCalls: []uploadBlobCall{}, ··· 1385 1385 } 1386 1386 } 1387 1387 1388 - func (m *mockBlobStore) GetPresignedURL(operation, digest, did string) (string, error) { 1388 + func (m *mockHoldService) GetPresignedURL(ctx context.Context, operation, digest, did string) (string, error) { 1389 1389 if operation == "GET" || operation == "HEAD" { 1390 1390 // Both GET and HEAD are download operations, just different HTTP methods 1391 1391 m.downloadCalls = append(m.downloadCalls, digest) ··· 1405 1405 return "https://s3.example.com/upload/" + digest, nil 1406 1406 } 1407 1407 1408 - func (m *mockBlobStore) UploadBlob(ctx context.Context, did string, data io.Reader) (cid.Cid, int64, error) { 1408 + func (m *mockHoldService) UploadBlob(ctx context.Context, did string, data io.Reader) (cid.Cid, int64, error) { 1409 1409 // Read data to get size 1410 1410 blobData, err := io.ReadAll(data) 1411 1411 if err != nil { ··· 1426 1426 return testCID, int64(len(blobData)), nil 1427 1427 } 1428 1428 1429 - func (m *mockBlobStore) StartMultipartUpload(ctx context.Context, digest string) (string, string, error) { 1429 + func (m *mockHoldService) StartMultipartUploadWithManager(ctx context.Context, digest string) (string, int, error) { 1430 1430 m.startCalls = append(m.startCalls, digest) 1431 1431 if m.startError != nil { 1432 - return "", "", m.startError 1432 + return "", 0, m.startError 1433 1433 } 1434 - return "test-upload-id", "s3native", nil 1434 + return "test-upload-id", 0, nil // Return 0 for S3Native mode 1435 1435 } 1436 1436 1437 - func (m *mockBlobStore) GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (*PartUploadInfo, error) { 1437 + func (m *mockHoldService) GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (*PartUploadInfo, error) { 1438 1438 m.partURLCalls = append(m.partURLCalls, partURLCall{uploadID, partNumber, did}) 1439 1439 if m.partURLError != nil { 1440 1440 return nil, m.partURLError ··· 1445 1445 }, nil 1446 1446 } 1447 1447 1448 - func (m *mockBlobStore) CompleteMultipartUpload(ctx context.Context, uploadID string, finalDigest string, parts []PartInfo) error { 1448 + func (m *mockHoldService) CompleteMultipartUploadWithManager(ctx context.Context, uploadID string, finalDigest string, parts []PartInfo) error { 1449 1449 m.completeCalls = append(m.completeCalls, uploadID) 1450 1450 if m.completeError != nil { 1451 1451 return m.completeError ··· 1453 1453 return nil 1454 1454 } 1455 1455 1456 - func (m *mockBlobStore) AbortMultipartUpload(ctx context.Context, uploadID string) error { 1456 + func (m *mockHoldService) AbortMultipartUploadWithManager(ctx context.Context, uploadID string) error { 1457 1457 m.abortCalls = append(m.abortCalls, uploadID) 1458 1458 if m.abortError != nil { 1459 1459 return m.abortError ··· 1461 1461 return nil 1462 1462 } 1463 1463 1464 - func (m *mockBlobStore) HandleBufferedPartUpload(ctx context.Context, uploadID string, partNumber int, data []byte) (string, error) { 1464 + func (m *mockHoldService) HandleBufferedPartUpload(ctx context.Context, uploadID string, partNumber int, data []byte) (string, error) { 1465 1465 m.partUploadCalls = append(m.partUploadCalls, partUploadCall{uploadID, partNumber, len(data)}) 1466 1466 if m.partUploadError != nil { 1467 1467 return "", m.partUploadError ··· 1469 1469 return "test-etag-" + uploadID, nil 1470 1470 } 1471 1471 1472 - // setupTestXRPCHandlerWithBlobs creates handler with mock blob store and mock PDS client 1473 - func setupTestXRPCHandlerWithBlobs(t *testing.T) (*XRPCHandler, *mockBlobStore, context.Context) { 1472 + func (m *mockHoldService) HandleMultipartOperation(w http.ResponseWriter, r *http.Request, did string) { 1473 + ctx := r.Context() 1474 + 1475 + // Parse JSON body (same as real implementation) 1476 + var req struct { 1477 + Action string `json:"action"` 1478 + Digest string `json:"digest,omitempty"` 1479 + UploadID string `json:"uploadId,omitempty"` 1480 + PartNumber int `json:"partNumber,omitempty"` 1481 + Parts []PartInfo `json:"parts,omitempty"` 1482 + } 1483 + 1484 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 1485 + http.Error(w, fmt.Sprintf("invalid JSON body: %v", err), http.StatusBadRequest) 1486 + return 1487 + } 1488 + 1489 + // Route based on action 1490 + switch req.Action { 1491 + case "start": 1492 + if req.Digest == "" { 1493 + http.Error(w, "digest required for start action", http.StatusBadRequest) 1494 + return 1495 + } 1496 + 1497 + uploadID, mode, err := m.StartMultipartUploadWithManager(ctx, req.Digest) 1498 + if err != nil { 1499 + http.Error(w, fmt.Sprintf("failed to start multipart upload: %v", err), http.StatusInternalServerError) 1500 + return 1501 + } 1502 + 1503 + // Convert mode to string 1504 + var modeStr string 1505 + switch mode { 1506 + case 0: 1507 + modeStr = "s3native" 1508 + case 1: 1509 + modeStr = "buffered" 1510 + default: 1511 + modeStr = "unknown" 1512 + } 1513 + 1514 + w.Header().Set("Content-Type", "application/json") 1515 + json.NewEncoder(w).Encode(map[string]any{ 1516 + "uploadId": uploadID, 1517 + "mode": modeStr, 1518 + }) 1519 + 1520 + case "part": 1521 + if req.UploadID == "" || req.PartNumber == 0 { 1522 + http.Error(w, "uploadId and partNumber required for part action", http.StatusBadRequest) 1523 + return 1524 + } 1525 + 1526 + uploadInfo, err := m.GetPartUploadURL(ctx, req.UploadID, req.PartNumber, did) 1527 + if err != nil { 1528 + http.Error(w, fmt.Sprintf("failed to get part URL: %v", err), http.StatusInternalServerError) 1529 + return 1530 + } 1531 + 1532 + w.Header().Set("Content-Type", "application/json") 1533 + json.NewEncoder(w).Encode(uploadInfo) 1534 + 1535 + case "complete": 1536 + if req.UploadID == "" || len(req.Parts) == 0 { 1537 + http.Error(w, "uploadId and parts required for complete action", http.StatusBadRequest) 1538 + return 1539 + } 1540 + if req.Digest == "" { 1541 + http.Error(w, "digest required for complete action", http.StatusBadRequest) 1542 + return 1543 + } 1544 + 1545 + if err := m.CompleteMultipartUploadWithManager(ctx, req.UploadID, req.Digest, req.Parts); err != nil { 1546 + http.Error(w, fmt.Sprintf("failed to complete multipart upload: %v", err), http.StatusInternalServerError) 1547 + return 1548 + } 1549 + 1550 + w.Header().Set("Content-Type", "application/json") 1551 + json.NewEncoder(w).Encode(map[string]any{ 1552 + "status": "completed", 1553 + }) 1554 + 1555 + case "abort": 1556 + if req.UploadID == "" { 1557 + http.Error(w, "uploadId required for abort action", http.StatusBadRequest) 1558 + return 1559 + } 1560 + 1561 + if err := m.AbortMultipartUploadWithManager(ctx, req.UploadID); err != nil { 1562 + http.Error(w, fmt.Sprintf("failed to abort multipart upload: %v", err), http.StatusInternalServerError) 1563 + return 1564 + } 1565 + 1566 + w.Header().Set("Content-Type", "application/json") 1567 + json.NewEncoder(w).Encode(map[string]any{ 1568 + "status": "aborted", 1569 + }) 1570 + 1571 + default: 1572 + http.Error(w, fmt.Sprintf("unknown action: %s", req.Action), http.StatusBadRequest) 1573 + } 1574 + } 1575 + 1576 + // setupTestXRPCHandlerWithBlobs creates handler with mock hold service and mock PDS client 1577 + func setupTestXRPCHandlerWithBlobs(t *testing.T) (*XRPCHandler, *mockHoldService, context.Context) { 1474 1578 t.Helper() 1475 1579 1476 1580 ctx := context.Background() ··· 1503 1607 t.Fatalf("Failed to bootstrap PDS: %v", err) 1504 1608 } 1505 1609 1506 - // Create mock blob store 1507 - blobStore := newMockBlobStore() 1610 + // Create mock hold service 1611 + holdService := newMockHoldService() 1508 1612 1509 1613 // Create mock PDS client for DPoP validation 1510 1614 mockClient := &mockPDSClient{} 1511 1615 1512 - // Create XRPC handler with mock blob store and mock HTTP client 1513 - handler := NewXRPCHandler(pds, "https://hold.example.com", blobStore, nil, mockClient) 1616 + // Create XRPC handler with mock hold service and mock HTTP client 1617 + handler := NewXRPCHandler(pds, "https://hold.example.com", holdService, nil, mockClient) 1514 1618 1515 - return handler, blobStore, ctx 1619 + return handler, holdService, ctx 1516 1620 } 1517 1621 1518 1622 // Tests for HandleUploadBlob ··· 1520 1624 // TestHandleUploadBlob tests com.atproto.repo.uploadBlob with direct upload 1521 1625 // Spec: https://docs.bsky.app/docs/api/com-atproto-repo-upload-blob 1522 1626 func TestHandleUploadBlob(t *testing.T) { 1523 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1627 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 1524 1628 1525 1629 // Test data - a simple text blob 1526 1630 blobData := []byte("Hello, ATProto!") ··· 1574 1678 } 1575 1679 1576 1680 // Verify blob store was called 1577 - if len(blobStore.uploadBlobCalls) != 1 { 1578 - t.Errorf("Expected UploadBlob to be called once, got %d calls", len(blobStore.uploadBlobCalls)) 1681 + if len(holdService.uploadBlobCalls) != 1 { 1682 + t.Errorf("Expected UploadBlob to be called once, got %d calls", len(holdService.uploadBlobCalls)) 1579 1683 } 1580 1684 1581 - if blobStore.uploadBlobCalls[0].dataSize != len(blobData) { 1582 - t.Errorf("Expected UploadBlob to receive %d bytes, got %d", len(blobData), blobStore.uploadBlobCalls[0].dataSize) 1685 + if holdService.uploadBlobCalls[0].dataSize != len(blobData) { 1686 + t.Errorf("Expected UploadBlob to receive %d bytes, got %d", len(blobData), holdService.uploadBlobCalls[0].dataSize) 1583 1687 } 1584 1688 } 1585 1689 1586 1690 // TestHandleUploadBlob_EmptyBody tests empty blob upload 1587 1691 // Spec: https://docs.bsky.app/docs/api/com-atproto-repo-upload-blob 1588 1692 func TestHandleUploadBlob_EmptyBody(t *testing.T) { 1589 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1693 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 1590 1694 1591 1695 // Empty blob should succeed (edge case) 1592 1696 req := httptest.NewRequest(http.MethodPost, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte{})) ··· 1612 1716 } 1613 1717 1614 1718 // Verify blob store was called with 0 bytes 1615 - if len(blobStore.uploadBlobCalls) != 1 || blobStore.uploadBlobCalls[0].dataSize != 0 { 1719 + if len(holdService.uploadBlobCalls) != 1 || holdService.uploadBlobCalls[0].dataSize != 0 { 1616 1720 t.Errorf("Expected UploadBlob with 0 bytes") 1617 1721 } 1618 1722 } ··· 1636 1740 // TestHandleUploadBlob_BlobStoreError tests blob store returning error 1637 1741 // Spec: https://docs.bsky.app/docs/api/com-atproto-repo-upload-blob 1638 1742 func TestHandleUploadBlob_BlobStoreError(t *testing.T) { 1639 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1743 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 1640 1744 1641 1745 // Configure mock to return error 1642 - blobStore.uploadBlobError = fmt.Errorf("storage driver unavailable") 1746 + holdService.uploadBlobError = fmt.Errorf("storage driver unavailable") 1643 1747 1644 1748 req := httptest.NewRequest(http.MethodPost, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte("test data"))) 1645 1749 req.Header.Set("Content-Type", "application/octet-stream") ··· 1669 1773 // TestHandleGetBlob tests com.atproto.sync.getBlob 1670 1774 // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1671 1775 func TestHandleGetBlob(t *testing.T) { 1672 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1776 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 1673 1777 1674 1778 holdDID := "did:web:hold.example.com" 1675 1779 cid := "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke" ··· 1706 1810 } 1707 1811 1708 1812 // Verify blob store was called 1709 - if len(blobStore.downloadCalls) != 1 || blobStore.downloadCalls[0] != cid { 1813 + if len(holdService.downloadCalls) != 1 || holdService.downloadCalls[0] != cid { 1710 1814 t.Errorf("Expected GetPresignedURL to be called with %s", cid) 1711 1815 } 1712 1816 } ··· 1714 1818 // TestHandleGetBlob_SHA256Digest tests getBlob with OCI sha256 digest format 1715 1819 // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1716 1820 func TestHandleGetBlob_SHA256Digest(t *testing.T) { 1717 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1821 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 1718 1822 1719 1823 holdDID := "did:web:hold.example.com" 1720 1824 digest := "sha256:abc123def456" // OCI digest format ··· 1744 1848 } 1745 1849 1746 1850 // Verify blob store received the sha256 digest 1747 - if len(blobStore.downloadCalls) != 1 || blobStore.downloadCalls[0] != digest { 1748 - t.Errorf("Expected GetPresignedURL to be called with %s, got %v", digest, blobStore.downloadCalls) 1851 + if len(holdService.downloadCalls) != 1 || holdService.downloadCalls[0] != digest { 1852 + t.Errorf("Expected GetPresignedURL to be called with %s, got %v", digest, holdService.downloadCalls) 1749 1853 } 1750 1854 } 1751 1855 ··· 1754 1858 // AppView is responsible for making the actual HEAD request to S3 1755 1859 // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1756 1860 func TestHandleGetBlob_HeadMethod(t *testing.T) { 1757 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1861 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 1758 1862 1759 1863 holdDID := "did:web:hold.example.com" 1760 1864 cid := "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke" ··· 1789 1893 } 1790 1894 1791 1895 // Verify blob store was called with HEAD operation 1792 - if len(blobStore.downloadCalls) != 1 || blobStore.downloadCalls[0] != cid { 1896 + if len(holdService.downloadCalls) != 1 || holdService.downloadCalls[0] != cid { 1793 1897 t.Errorf("Expected GetPresignedURL to be called with %s", cid) 1794 1898 } 1795 1899 } ··· 1856 1960 // TestHandleGetBlob_BlobStoreError tests blob store returning error 1857 1961 // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1858 1962 func TestHandleGetBlob_BlobStoreError(t *testing.T) { 1859 - handler, blobStore, _ := setupTestXRPCHandlerWithBlobs(t) 1963 + handler, holdService, _ := setupTestXRPCHandlerWithBlobs(t) 1860 1964 1861 1965 // Configure mock to return error 1862 - blobStore.downloadURLError = fmt.Errorf("blob not found in S3") 1966 + holdService.downloadURLError = fmt.Errorf("blob not found in S3") 1863 1967 1864 1968 req := makeXRPCGetRequest("/xrpc/com.atproto.sync.getBlob", map[string]string{ 1865 1969 "did": "did:web:hold.example.com",
-221
pkg/hold/s3.go
··· 1 - package hold 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "log" 7 - "sort" 8 - "strings" 9 - "time" 10 - 11 - "github.com/aws/aws-sdk-go/aws" 12 - "github.com/aws/aws-sdk-go/aws/credentials" 13 - "github.com/aws/aws-sdk-go/aws/session" 14 - "github.com/aws/aws-sdk-go/service/s3" 15 - ) 16 - 17 - // initS3Client initializes the S3 client for presigned URL generation 18 - // Returns nil error if S3 client is successfully initialized 19 - // Returns error if storage is not S3 or if initialization fails (service will fall back to proxy mode) 20 - func (s *HoldService) initS3Client() error { 21 - // Check if presigned URLs are explicitly disabled 22 - if s.config.Server.DisablePresignedURLs { 23 - log.Printf("⚠️ S3 presigned URLs DISABLED by config (DISABLE_PRESIGNED_URLS=true)") 24 - log.Printf(" All uploads will use buffered mode (parts buffered in hold service)") 25 - return nil // Not an error - just using buffered mode 26 - } 27 - 28 - // Check if storage driver is S3 29 - if s.config.Storage.Type() != "s3" { 30 - log.Printf("Storage driver is %s (not S3), presigned URLs disabled", s.config.Storage.Type()) 31 - return nil // Not an error - just using different driver 32 - } 33 - 34 - // Extract S3 configuration from storage parameters 35 - params := s.config.Storage.Parameters() 36 - 37 - // Extract required S3 configuration 38 - region, _ := params["region"].(string) 39 - if region == "" { 40 - region = "us-east-1" // Default region 41 - } 42 - 43 - accessKey, _ := params["accesskey"].(string) 44 - secretKey, _ := params["secretkey"].(string) 45 - bucket, _ := params["bucket"].(string) 46 - 47 - if bucket == "" { 48 - return fmt.Errorf("S3 bucket not configured") 49 - } 50 - 51 - // Build AWS config 52 - awsConfig := &aws.Config{ 53 - Region: aws.String(region), 54 - } 55 - 56 - // Add credentials if provided (allow IAM role auth if not provided) 57 - if accessKey != "" && secretKey != "" { 58 - awsConfig.Credentials = credentials.NewStaticCredentials(accessKey, secretKey, "") 59 - } 60 - 61 - // Add custom endpoint for S3-compatible services (Storj, MinIO, R2, etc.) 62 - if endpoint, ok := params["regionendpoint"].(string); ok && endpoint != "" { 63 - awsConfig.Endpoint = aws.String(endpoint) 64 - awsConfig.S3ForcePathStyle = aws.Bool(true) // Required for MinIO, Storj 65 - } 66 - 67 - // Create AWS session 68 - sess, err := session.NewSession(awsConfig) 69 - if err != nil { 70 - return fmt.Errorf("failed to create AWS session: %w", err) 71 - } 72 - 73 - // Create S3 client 74 - s.s3Client = s3.New(sess) 75 - s.bucket = bucket 76 - 77 - // Extract path prefix if configured (rootdirectory in S3 params) 78 - if rootDir, ok := params["rootdirectory"].(string); ok && rootDir != "" { 79 - s.s3PathPrefix = strings.TrimPrefix(rootDir, "/") 80 - } 81 - 82 - log.Printf("✅ S3 presigned URLs enabled") 83 - 84 - return nil 85 - } 86 - 87 - // startMultipartUpload initiates a multipart upload and returns upload ID 88 - func (s *HoldService) startMultipartUpload(ctx context.Context, digest string) (string, error) { 89 - if s.s3Client == nil { 90 - return "", fmt.Errorf("S3 not configured") 91 - } 92 - 93 - path := blobPath(digest) 94 - s3Key := strings.TrimPrefix(path, "/") 95 - if s.s3PathPrefix != "" { 96 - s3Key = s.s3PathPrefix + "/" + s3Key 97 - } 98 - 99 - result, err := s.s3Client.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{ 100 - Bucket: aws.String(s.bucket), 101 - Key: aws.String(s3Key), 102 - }) 103 - if err != nil { 104 - return "", err 105 - } 106 - 107 - log.Printf("Started multipart upload: digest=%s, uploadID=%s", digest, *result.UploadId) 108 - return *result.UploadId, nil 109 - } 110 - 111 - // getPartPresignedURL generates presigned URL for a specific part 112 - func (s *HoldService) getPartPresignedURL(ctx context.Context, digest, uploadID string, partNumber int) (string, error) { 113 - if s.s3Client == nil { 114 - return "", fmt.Errorf("S3 not configured") 115 - } 116 - 117 - path := blobPath(digest) 118 - s3Key := strings.TrimPrefix(path, "/") 119 - if s.s3PathPrefix != "" { 120 - s3Key = s.s3PathPrefix + "/" + s3Key 121 - } 122 - 123 - req, _ := s.s3Client.UploadPartRequest(&s3.UploadPartInput{ 124 - Bucket: aws.String(s.bucket), 125 - Key: aws.String(s3Key), 126 - UploadId: aws.String(uploadID), 127 - PartNumber: aws.Int64(int64(partNumber)), 128 - }) 129 - 130 - url, err := req.Presign(15 * time.Minute) 131 - if err != nil { 132 - return "", err 133 - } 134 - 135 - log.Printf("Generated part presigned URL: digest=%s, uploadID=%s, part=%d", digest, uploadID, partNumber) 136 - return url, nil 137 - } 138 - 139 - // normalizeETag ensures an ETag has quotes (required by S3 CompleteMultipartUpload) 140 - // S3 returns ETags with quotes, but HTTP clients may strip them 141 - func normalizeETag(etag string) string { 142 - // Already has quotes 143 - if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") { 144 - return etag 145 - } 146 - // Add quotes 147 - return fmt.Sprintf("\"%s\"", etag) 148 - } 149 - 150 - // completeMultipartUpload finalizes the multipart upload 151 - func (s *HoldService) completeMultipartUpload(ctx context.Context, digest, uploadID string, parts []CompletedPart) error { 152 - if s.s3Client == nil { 153 - return fmt.Errorf("S3 not configured") 154 - } 155 - 156 - path := blobPath(digest) 157 - s3Key := strings.TrimPrefix(path, "/") 158 - if s.s3PathPrefix != "" { 159 - s3Key = s.s3PathPrefix + "/" + s3Key 160 - } 161 - 162 - // Sort parts by part number (S3 requires ascending order) 163 - sort.Slice(parts, func(i, j int) bool { 164 - return parts[i].PartNumber < parts[j].PartNumber 165 - }) 166 - 167 - // Convert to S3 CompletedPart format 168 - // IMPORTANT: S3 requires ETags to be quoted in the CompleteMultipartUpload XML 169 - s3Parts := make([]*s3.CompletedPart, len(parts)) 170 - for i, p := range parts { 171 - etag := normalizeETag(p.ETag) 172 - s3Parts[i] = &s3.CompletedPart{ 173 - PartNumber: aws.Int64(int64(p.PartNumber)), 174 - ETag: aws.String(etag), 175 - } 176 - } 177 - 178 - _, err := s.s3Client.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{ 179 - Bucket: aws.String(s.bucket), 180 - Key: aws.String(s3Key), 181 - UploadId: aws.String(uploadID), 182 - MultipartUpload: &s3.CompletedMultipartUpload{ 183 - Parts: s3Parts, 184 - }, 185 - }) 186 - 187 - if err != nil { 188 - log.Printf("Failed to complete multipart upload: digest=%s, uploadID=%s, err=%v", digest, uploadID, err) 189 - return err 190 - } 191 - 192 - log.Printf("Completed multipart upload: digest=%s, uploadID=%s, parts=%d", digest, uploadID, len(parts)) 193 - return nil 194 - } 195 - 196 - // abortMultipartUpload aborts an in-progress multipart upload 197 - func (s *HoldService) abortMultipartUpload(ctx context.Context, digest, uploadID string) error { 198 - if s.s3Client == nil { 199 - return fmt.Errorf("S3 not configured") 200 - } 201 - 202 - path := blobPath(digest) 203 - s3Key := strings.TrimPrefix(path, "/") 204 - if s.s3PathPrefix != "" { 205 - s3Key = s.s3PathPrefix + "/" + s3Key 206 - } 207 - 208 - _, err := s.s3Client.AbortMultipartUploadWithContext(ctx, &s3.AbortMultipartUploadInput{ 209 - Bucket: aws.String(s.bucket), 210 - Key: aws.String(s3Key), 211 - UploadId: aws.String(uploadID), 212 - }) 213 - 214 - if err != nil { 215 - log.Printf("Failed to abort multipart upload: digest=%s, uploadID=%s, err=%v", digest, uploadID, err) 216 - return err 217 - } 218 - 219 - log.Printf("Aborted multipart upload: digest=%s, uploadID=%s", digest, uploadID) 220 - return nil 221 - }
+155 -34
pkg/hold/service.go
··· 4 4 "context" 5 5 "fmt" 6 6 "log" 7 + "strings" 7 8 8 - "atcr.io/pkg/auth" 9 + "github.com/aws/aws-sdk-go/aws" 10 + "github.com/aws/aws-sdk-go/aws/credentials" 11 + "github.com/aws/aws-sdk-go/aws/session" 9 12 "github.com/aws/aws-sdk-go/service/s3" 10 13 storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" 11 14 "github.com/distribution/distribution/v3/registry/storage/driver/factory" 12 - ) 13 15 14 - // HoldPDSInterface is the minimal interface needed from the embedded PDS 15 - // This avoids a circular import between pkg/hold and pkg/hold/pds 16 - type HoldPDSInterface interface { 17 - DID() string 18 - } 16 + "bytes" 17 + "crypto/sha256" 18 + "io" 19 + 20 + "github.com/ipfs/go-cid" 21 + "github.com/multiformats/go-multihash" 22 + ) 19 23 20 24 // HoldService provides presigned URLs for blob storage in a hold 21 25 type HoldService struct { 22 26 driver storagedriver.StorageDriver 23 27 config *Config 24 - s3Client *s3.S3 // S3 client for presigned URLs (nil if not S3 storage) 25 - bucket string // S3 bucket name 26 - s3PathPrefix string // S3 path prefix (if any) 27 - MultipartMgr *MultipartManager // Exported for access in route handlers 28 - pds HoldPDSInterface // Embedded PDS for captain/crew records 29 - authorizer auth.HoldAuthorizer // Authorizer for access control 28 + s3Client *s3.S3 // S3 client for presigned URLs (nil if not S3 storage) 29 + bucket string // S3 bucket name 30 + s3PathPrefix string // S3 path prefix (if any) 31 + MultipartMgr *MultipartManager // Exported for access in route handlers 30 32 } 31 33 32 - // PresignedURLOperation defines the type of presigned URL operation 33 - type PresignedURLOperation string 34 - 35 - const ( 36 - OperationGet PresignedURLOperation = "GET" 37 - OperationHead PresignedURLOperation = "HEAD" 38 - OperationPut PresignedURLOperation = "PUT" 39 - ) 40 - 41 34 // NewHoldService creates a new hold service 42 35 // holdPDS must be a *pds.HoldPDS but we use any to avoid import cycle 43 36 func NewHoldService(cfg *Config, holdPDS any) (*HoldService, error) { ··· 48 41 return nil, fmt.Errorf("failed to create storage driver: %w", err) 49 42 } 50 43 51 - // Create local authorizer using the embedded PDS 52 - // This requires casting holdPDS to the concrete type expected by auth 53 - authorizer := auth.NewLocalHoldAuthorizerFromInterface(holdPDS) 54 - 55 - // Cast to our interface for storage 56 - pdsInterface, ok := holdPDS.(HoldPDSInterface) 57 - if !ok { 58 - return nil, fmt.Errorf("holdPDS must implement HoldPDSInterface") 59 - } 60 - 61 44 service := &HoldService{ 62 45 driver: driver, 63 46 config: cfg, 64 47 MultipartMgr: NewMultipartManager(), 65 - pds: pdsInterface, 66 - authorizer: authorizer, 67 48 } 68 49 69 50 // Initialize S3 client for presigned URLs (if using S3 storage) ··· 73 54 74 55 return service, nil 75 56 } 57 + 58 + // UploadBlob receives raw blob bytes, computes CID, and stores via distribution driver 59 + // This is used for standard ATProto blob uploads (profile pics, small media) 60 + func (h *HoldService) UploadBlob(ctx context.Context, did string, data io.Reader) (cid.Cid, int64, error) { 61 + 62 + // Read all data into memory to compute CID 63 + // For large files, this should use multipart upload instead 64 + blobData, err := io.ReadAll(data) 65 + if err != nil { 66 + return cid.Undef, 0, fmt.Errorf("failed to read blob data: %w", err) 67 + } 68 + 69 + size := int64(len(blobData)) 70 + 71 + // Compute SHA-256 hash 72 + hash := sha256.Sum256(blobData) 73 + 74 + // Create CIDv1 with SHA-256 multihash 75 + mh, err := multihash.EncodeName(hash[:], "sha2-256") 76 + if err != nil { 77 + return cid.Undef, 0, fmt.Errorf("failed to encode multihash: %w", err) 78 + } 79 + 80 + // Create CIDv1 with raw codec (0x55) 81 + // ATProto uses CIDv1 with raw codec for blobs 82 + blobCID := cid.NewCidV1(0x55, mh) 83 + 84 + // Store blob via distribution driver at ATProto path 85 + // Path: /repos/{did}/blobs/{cid}/data 86 + path := atprotoBlobPath(did, blobCID.String()) 87 + 88 + // Write blob to storage using distribution driver 89 + writer, err := h.driver.Writer(ctx, path, false) 90 + if err != nil { 91 + return cid.Undef, 0, fmt.Errorf("failed to create writer: %w", err) 92 + } 93 + 94 + // Write data 95 + n, err := io.Copy(writer, bytes.NewReader(blobData)) 96 + if err != nil { 97 + writer.Cancel(ctx) 98 + return cid.Undef, 0, fmt.Errorf("failed to write blob: %w", err) 99 + } 100 + 101 + // Commit the write 102 + if err := writer.Commit(ctx); err != nil { 103 + return cid.Undef, 0, fmt.Errorf("failed to commit blob: %w", err) 104 + } 105 + 106 + if n != size { 107 + return cid.Undef, 0, fmt.Errorf("size mismatch: wrote %d bytes, expected %d", n, size) 108 + } 109 + 110 + return blobCID, size, nil 111 + } 112 + 113 + // HandleBufferedPartUpload handles uploading a part in buffered mode 114 + func (h *HoldService) HandleBufferedPartUpload(ctx context.Context, uploadID string, partNumber int, data []byte) (string, error) { 115 + session, err := h.MultipartMgr.GetSession(uploadID) 116 + if err != nil { 117 + return "", err 118 + } 119 + 120 + if session.Mode != Buffered { 121 + return "", fmt.Errorf("session is not in buffered mode") 122 + } 123 + 124 + etag := session.StorePart(partNumber, data) 125 + return etag, nil 126 + } 127 + 128 + // initS3Client initializes the S3 client for presigned URL generation 129 + // Returns nil error if S3 client is successfully initialized 130 + // Returns error if storage is not S3 or if initialization fails (service will fall back to proxy mode) 131 + func (s *HoldService) initS3Client() error { 132 + // Check if presigned URLs are explicitly disabled 133 + if s.config.Server.DisablePresignedURLs { 134 + log.Printf("⚠️ S3 presigned URLs DISABLED by config (DISABLE_PRESIGNED_URLS=true)") 135 + log.Printf(" All uploads will use buffered mode (parts buffered in hold service)") 136 + return nil // Not an error - just using buffered mode 137 + } 138 + 139 + // Check if storage driver is S3 140 + if s.config.Storage.Type() != "s3" { 141 + log.Printf("Storage driver is %s (not S3), presigned URLs disabled", s.config.Storage.Type()) 142 + return nil // Not an error - just using different driver 143 + } 144 + 145 + // Extract S3 configuration from storage parameters 146 + params := s.config.Storage.Parameters() 147 + 148 + // Extract required S3 configuration 149 + region, _ := params["region"].(string) 150 + if region == "" { 151 + region = "us-east-1" // Default region 152 + } 153 + 154 + accessKey, _ := params["accesskey"].(string) 155 + secretKey, _ := params["secretkey"].(string) 156 + bucket, _ := params["bucket"].(string) 157 + 158 + if bucket == "" { 159 + return fmt.Errorf("S3 bucket not configured") 160 + } 161 + 162 + // Build AWS config 163 + awsConfig := &aws.Config{ 164 + Region: &region, 165 + } 166 + 167 + // Add credentials if provided (allow IAM role auth if not provided) 168 + if accessKey != "" && secretKey != "" { 169 + awsConfig.Credentials = credentials.NewStaticCredentials(accessKey, secretKey, "") 170 + } 171 + 172 + // Add custom endpoint for S3-compatible services (Storj, MinIO, R2, etc.) 173 + if endpoint, ok := params["regionendpoint"].(string); ok && endpoint != "" { 174 + awsConfig.Endpoint = &endpoint 175 + awsConfig.S3ForcePathStyle = aws.Bool(true) // Required for MinIO, Storj 176 + } 177 + 178 + // Create AWS session 179 + sess, err := session.NewSession(awsConfig) 180 + if err != nil { 181 + return fmt.Errorf("failed to create AWS session: %w", err) 182 + } 183 + 184 + // Create S3 client 185 + s.s3Client = s3.New(sess) 186 + s.bucket = bucket 187 + 188 + // Extract path prefix if configured (rootdirectory in S3 params) 189 + if rootDir, ok := params["rootdirectory"].(string); ok && rootDir != "" { 190 + s.s3PathPrefix = strings.TrimPrefix(rootDir, "/") 191 + } 192 + 193 + log.Printf("✅ S3 presigned URLs enabled") 194 + 195 + return nil 196 + }
+7 -6
pkg/hold/storage.go
··· 4 4 "context" 5 5 "fmt" 6 6 "log" 7 + "net/http" 7 8 "strings" 8 9 "time" 9 10 ··· 53 54 54 55 // getPresignedURL generates a presigned URL for GET, HEAD, or PUT operations 55 56 // Distinguishes between ATProto blobs (per-DID) and OCI blobs (content-addressed) 56 - func (s *HoldService) GetPresignedURL(ctx context.Context, operation PresignedURLOperation, digest string, did string) (string, error) { 57 + func (s *HoldService) GetPresignedURL(ctx context.Context, operation string, digest string, did string) (string, error) { 57 58 var path string 58 59 59 60 // Determine blob type and construct appropriate path ··· 97 98 Presign(time.Duration) (string, error) 98 99 } 99 100 switch operation { 100 - case OperationGet: 101 + case http.MethodGet: 101 102 // Note: Don't use ResponseContentType - not supported by all S3-compatible services 102 103 req, _ = s.s3Client.GetObjectRequest(&s3.GetObjectInput{ 103 104 Bucket: aws.String(s.bucket), 104 105 Key: aws.String(s3Key), 105 106 }) 106 107 107 - case OperationHead: 108 + case http.MethodHead: 108 109 req, _ = s.s3Client.HeadObjectRequest(&s3.HeadObjectInput{ 109 110 Bucket: aws.String(s.bucket), 110 111 Key: aws.String(s3Key), 111 112 }) 112 113 113 - case OperationPut: 114 + case http.MethodPut: 114 115 req, _ = s.s3Client.PutObjectRequest(&s3.PutObjectInput{ 115 116 Bucket: aws.String(s.bucket), 116 117 Key: aws.String(s3Key), ··· 147 148 // getProxyURL returns XRPC endpoint for blob operations (fallback when presigned URLs unavailable) 148 149 // For GET/HEAD operations, returns the XRPC getBlob endpoint 149 150 // For PUT operations, this fallback is no longer supported - use multipart upload instead 150 - func (s *HoldService) getProxyURL(digest, did string, operation PresignedURLOperation) string { 151 + func (s *HoldService) getProxyURL(digest, did string, operation string) string { 151 152 // For read operations, use XRPC getBlob endpoint 152 - if operation == OperationGet || operation == OperationHead { 153 + if operation == http.MethodGet || operation == http.MethodHead { 153 154 // Generate hold DID from public URL using shared function 154 155 holdDID := atproto.ResolveHoldDIDFromURL(s.config.Server.PublicURL) 155 156 return fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s",