A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 8d39daa09d8d9d5066c9fb336c61ef5bfb8a0b1f 205 lines 7.6 kB view raw
1// Package s3 provides S3 client initialization and presigned URL generation 2// for hold services. It supports S3, Storj, and Minio storage backends. 3package s3 4 5import ( 6 "context" 7 "fmt" 8 "log/slog" 9 "strings" 10 "time" 11 12 "github.com/aws/aws-sdk-go-v2/aws" 13 "github.com/aws/aws-sdk-go-v2/config" 14 "github.com/aws/aws-sdk-go-v2/credentials" 15 awss3 "github.com/aws/aws-sdk-go-v2/service/s3" 16) 17 18// S3Client defines the S3 operations used by the hold service. 19// This interface allows mocking S3 for tests without real credentials. 20// Use RealS3Client to wrap *s3.Client, or MockS3Client for testing. 21type S3Client interface { 22 // Multipart upload operations 23 CreateMultipartUpload(ctx context.Context, input *awss3.CreateMultipartUploadInput, opts ...func(*awss3.Options)) (*awss3.CreateMultipartUploadOutput, error) 24 CompleteMultipartUpload(ctx context.Context, input *awss3.CompleteMultipartUploadInput, opts ...func(*awss3.Options)) (*awss3.CompleteMultipartUploadOutput, error) 25 AbortMultipartUpload(ctx context.Context, input *awss3.AbortMultipartUploadInput, opts ...func(*awss3.Options)) (*awss3.AbortMultipartUploadOutput, error) 26 27 // Presigned URL operations - return URL string directly 28 PresignGetObject(ctx context.Context, input *awss3.GetObjectInput, expires time.Duration) (string, error) 29 PresignHeadObject(ctx context.Context, input *awss3.HeadObjectInput, expires time.Duration) (string, error) 30 PresignPutObject(ctx context.Context, input *awss3.PutObjectInput, expires time.Duration) (string, error) 31 PresignUploadPart(ctx context.Context, input *awss3.UploadPartInput, expires time.Duration) (string, error) 32} 33 34// RealS3Client wraps AWS SDK v2 *s3.Client to implement S3Client interface 35type RealS3Client struct { 36 client *awss3.Client 37 presign *awss3.PresignClient 38} 39 40// NewRealS3Client creates a new RealS3Client wrapper 41func NewRealS3Client(client *awss3.Client) *RealS3Client { 42 return &RealS3Client{ 43 client: client, 44 presign: awss3.NewPresignClient(client), 45 } 46} 47 48// CreateMultipartUpload implements S3Client 49func (r *RealS3Client) CreateMultipartUpload(ctx context.Context, input *awss3.CreateMultipartUploadInput, opts ...func(*awss3.Options)) (*awss3.CreateMultipartUploadOutput, error) { 50 return r.client.CreateMultipartUpload(ctx, input, opts...) 51} 52 53// CompleteMultipartUpload implements S3Client 54func (r *RealS3Client) CompleteMultipartUpload(ctx context.Context, input *awss3.CompleteMultipartUploadInput, opts ...func(*awss3.Options)) (*awss3.CompleteMultipartUploadOutput, error) { 55 return r.client.CompleteMultipartUpload(ctx, input, opts...) 56} 57 58// AbortMultipartUpload implements S3Client 59func (r *RealS3Client) AbortMultipartUpload(ctx context.Context, input *awss3.AbortMultipartUploadInput, opts ...func(*awss3.Options)) (*awss3.AbortMultipartUploadOutput, error) { 60 return r.client.AbortMultipartUpload(ctx, input, opts...) 61} 62 63// PresignGetObject implements S3Client 64func (r *RealS3Client) PresignGetObject(ctx context.Context, input *awss3.GetObjectInput, expires time.Duration) (string, error) { 65 result, err := r.presign.PresignGetObject(ctx, input, func(opts *awss3.PresignOptions) { 66 opts.Expires = expires 67 }) 68 if err != nil { 69 return "", err 70 } 71 return result.URL, nil 72} 73 74// PresignHeadObject implements S3Client 75func (r *RealS3Client) PresignHeadObject(ctx context.Context, input *awss3.HeadObjectInput, expires time.Duration) (string, error) { 76 result, err := r.presign.PresignHeadObject(ctx, input, func(opts *awss3.PresignOptions) { 77 opts.Expires = expires 78 }) 79 if err != nil { 80 return "", err 81 } 82 return result.URL, nil 83} 84 85// PresignPutObject implements S3Client 86func (r *RealS3Client) PresignPutObject(ctx context.Context, input *awss3.PutObjectInput, expires time.Duration) (string, error) { 87 result, err := r.presign.PresignPutObject(ctx, input, func(opts *awss3.PresignOptions) { 88 opts.Expires = expires 89 }) 90 if err != nil { 91 return "", err 92 } 93 return result.URL, nil 94} 95 96// PresignUploadPart implements S3Client 97func (r *RealS3Client) PresignUploadPart(ctx context.Context, input *awss3.UploadPartInput, expires time.Duration) (string, error) { 98 result, err := r.presign.PresignUploadPart(ctx, input, func(opts *awss3.PresignOptions) { 99 opts.Expires = expires 100 }) 101 if err != nil { 102 return "", err 103 } 104 return result.URL, nil 105} 106 107// S3Service wraps an S3 client for presigned URL generation 108type S3Service struct { 109 Client S3Client // S3 client for presigned URLs (interface for testability) 110 Bucket string // S3 bucket name 111 PathPrefix string // S3 path prefix (if any) 112} 113 114// NewS3Service initializes the S3 client for presigned URL generation 115// S3 is required - this will return an error if not properly configured 116func NewS3Service(params map[string]any) (*S3Service, error) { 117 // Extract required S3 configuration 118 region, _ := params["region"].(string) 119 if region == "" { 120 region = "us-east-1" // Default region 121 } 122 123 accessKey, _ := params["accesskey"].(string) 124 secretKey, _ := params["secretkey"].(string) 125 bucket, _ := params["bucket"].(string) 126 127 if bucket == "" { 128 return nil, fmt.Errorf("S3 bucket not configured") 129 } 130 131 // Build config options 132 var configOpts []func(*config.LoadOptions) error 133 configOpts = append(configOpts, config.WithRegion(region)) 134 135 // Add credentials if provided (allow IAM role auth if not provided) 136 if accessKey != "" && secretKey != "" { 137 configOpts = append(configOpts, config.WithCredentialsProvider( 138 credentials.NewStaticCredentialsProvider(accessKey, secretKey, ""), 139 )) 140 } 141 142 // Load AWS config 143 cfg, err := config.LoadDefaultConfig(context.Background(), configOpts...) 144 if err != nil { 145 return nil, fmt.Errorf("failed to load AWS config: %w", err) 146 } 147 148 // Custom endpoint for S3-compatible services (Storj, MinIO, R2, etc.) 149 endpoint, _ := params["regionendpoint"].(string) 150 151 // Create S3 client 152 client := awss3.NewFromConfig(cfg, func(o *awss3.Options) { 153 if endpoint != "" { 154 o.BaseEndpoint = aws.String(endpoint) 155 o.UsePathStyle = true 156 } 157 }) 158 159 var s3PathPrefix string 160 // Extract path prefix if configured (rootdirectory in S3 params) 161 if rootDir, ok := params["rootdirectory"].(string); ok && rootDir != "" { 162 s3PathPrefix = strings.TrimPrefix(rootDir, "/") 163 } 164 165 slog.Info("S3 presigned URLs enabled", 166 "bucket", bucket, 167 "region", region, 168 "pathPrefix", s3PathPrefix) 169 170 // Create S3 client wrapped in RealS3Client for interface compatibility 171 return &S3Service{ 172 Client: NewRealS3Client(client), 173 Bucket: bucket, 174 PathPrefix: s3PathPrefix, 175 }, nil 176} 177 178// BlobPath converts a digest (e.g., "sha256:abc123...") or temp path to a storage path 179// Distribution stores blobs as: /docker/registry/v2/blobs/{algorithm}/{xx}/{hash}/data 180// where xx is the first 2 characters of the hash for directory sharding 181// NOTE: Path must start with / for filesystem driver 182// This is used for OCI container layers (content-addressed, globally deduplicated) 183func BlobPath(digest string) string { 184 // Handle temp paths (start with uploads/temp-) 185 if strings.HasPrefix(digest, "uploads/temp-") { 186 return fmt.Sprintf("/docker/registry/v2/%s/data", digest) 187 } 188 189 // Split digest into algorithm and hash 190 parts := strings.SplitN(digest, ":", 2) 191 if len(parts) != 2 { 192 // Fallback for malformed digest 193 return fmt.Sprintf("/docker/registry/v2/blobs/%s/data", digest) 194 } 195 196 algorithm := parts[0] 197 hash := parts[1] 198 199 // Use first 2 characters for sharding 200 if len(hash) < 2 { 201 return fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/data", algorithm, hash) 202 } 203 204 return fmt.Sprintf("/docker/registry/v2/blobs/%s/%s/%s/data", algorithm, hash[:2], hash) 205}