A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
73
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove older endpoints add docs for blob migration to xrpc

+675 -140
-2
cmd/hold/main.go
··· 82 82 http.NotFound(w, r) 83 83 }) 84 84 85 - mux.HandleFunc("/health", service.HealthHandler) 86 - mux.HandleFunc("/register", service.HandleRegister) 87 85 mux.HandleFunc("/presigned-url", service.HandlePresignedURL) 88 86 mux.HandleFunc("/move", service.HandleMove) 89 87
+675
docs/XRPC_BLOB_MIGRATION.md
··· 1 + # XRPC Blob Upload Migration 2 + 3 + This document describes how to migrate from separate legacy multipart upload endpoints to a unified `com.atproto.repo.uploadBlob` endpoint that supports both standard single-blob uploads and OCI container layer multipart uploads. 4 + 5 + ## Current State 6 + 7 + ### Legacy HTTP Endpoints (cmd/hold/main.go) 8 + 9 + ```go 10 + // Multipart upload endpoints 11 + mux.HandleFunc("/start-multipart", service.HandleStartMultipart) 12 + mux.HandleFunc("/part-presigned-url", service.HandleGetPartURL) 13 + mux.HandleFunc("/complete-multipart", service.HandleCompleteMultipart) 14 + mux.HandleFunc("/abort-multipart", service.HandleAbortMultipart) 15 + 16 + // Buffered part upload (when presigned URLs unavailable) 17 + mux.HandleFunc("/multipart-parts/", func(w http.ResponseWriter, r *http.Request) { 18 + // Parse URL: /multipart-parts/{uploadID}/{partNumber} 19 + // ... 20 + service.HandleMultipartPartUpload(w, r, uploadID, partNumber, did, service.MultipartMgr) 21 + }) 22 + ``` 23 + 24 + ### Existing XRPC Endpoint (pkg/hold/pds/xrpc.go) 25 + 26 + ```go 27 + // Current implementation - redirects to presigned URL 28 + func (h *XRPCHandler) HandleUploadBlob(w http.ResponseWriter, r *http.Request) { 29 + digest := r.URL.Query().Get("digest") 30 + uploadURL, err := h.blobStore.GetPresignedUploadURL(digest) 31 + http.Redirect(w, r, uploadURL, http.StatusFound) 32 + } 33 + ``` 34 + 35 + ### Supporting Code 36 + 37 + **pkg/hold/multipart.go:** 38 + - `MultipartManager` - Tracks upload sessions 39 + - `MultipartSession` - State for each upload (parts, mode, etc.) 40 + - Modes: `S3Native` (presigned URLs), `Buffered` (proxy uploads) 41 + 42 + **pkg/hold/blobstore_adapter.go:** 43 + - `HoldServiceBlobStore` - Adapter wrapping HoldService for XRPC handlers 44 + - Implements presigned URL generation 45 + - Currently not used by XRPC handlers 46 + 47 + **pkg/hold/handlers.go:** 48 + - `HandleStartMultipart()` - Starts upload, returns uploadID 49 + - `HandleGetPartURL()` - Returns presigned URL for part 50 + - `HandleCompleteMultipart()` - Finalizes upload, assembles parts 51 + - `HandleAbortMultipart()` - Cancels upload 52 + - `HandleMultipartPartUpload()` - Buffered part upload fallback 53 + 54 + ## New Unified Design 55 + 56 + ### Single Endpoint: `com.atproto.repo.uploadBlob` 57 + 58 + Content-Type discrimination determines operation: 59 + - `application/octet-stream` → Standard blob upload (profile images, small media) 60 + - `application/json` → Multipart operations (large OCI layers) 61 + 62 + ### API Specification 63 + 64 + #### Standard Single Upload (ATProto Spec Compliant) 65 + 66 + ``` 67 + POST /xrpc/com.atproto.repo.uploadBlob 68 + Content-Type: application/octet-stream 69 + 70 + [raw blob bytes] 71 + 72 + Response (200 OK): 73 + { 74 + "blob": { 75 + "$type": "blob", 76 + "ref": { 77 + "$link": "bafyreib..." // CID 78 + }, 79 + "mimeType": "application/octet-stream", 80 + "size": 12345 81 + } 82 + } 83 + ``` 84 + 85 + **Use case:** Profile images, small media (< 10MB), standard ATProto blobs 86 + 87 + #### Multipart Start (ATCR Extension) 88 + 89 + ``` 90 + POST /xrpc/com.atproto.repo.uploadBlob 91 + Content-Type: application/json 92 + 93 + { 94 + "action": "start", 95 + "digest": "sha256:abc123...", 96 + "size": 1234567890 // Optional hint for storage allocation 97 + } 98 + 99 + Response (200 OK): 100 + { 101 + "uploadId": "upload-1634567890", 102 + "expiresAt": "2025-10-16T12:00:00Z", 103 + "mode": "s3-native" // or "buffered" 104 + } 105 + ``` 106 + 107 + **Implementation:** 108 + - Calls `service.StartMultipartUploadWithManager(ctx, digest, multipartMgr)` 109 + - Returns uploadID and mode from MultipartSession 110 + 111 + #### Multipart Get Part URL (ATCR Extension) 112 + 113 + ``` 114 + POST /xrpc/com.atproto.repo.uploadBlob 115 + Content-Type: application/json 116 + 117 + { 118 + "action": "part", 119 + "uploadId": "upload-1634567890", 120 + "partNumber": 1, 121 + "digest": "sha256:abc123..." 122 + } 123 + 124 + Response (200 OK): 125 + { 126 + "url": "https://s3.amazonaws.com/bucket/...?X-Amz-...", 127 + "expiresAt": "2025-10-16T12:15:00Z", 128 + "method": "PUT" 129 + } 130 + 131 + // OR for buffered mode: 132 + { 133 + "url": "https://hold01.atcr.io/xrpc/com.atproto.repo.uploadBlob", 134 + "method": "PUT", 135 + "headers": { 136 + "X-Upload-Id": "upload-1634567890", 137 + "X-Part-Number": "1" 138 + }, 139 + "expiresAt": "2025-10-16T12:15:00Z" 140 + } 141 + ``` 142 + 143 + **Implementation:** 144 + - Retrieve session: `multipartMgr.GetSession(uploadID)` 145 + - S3Native mode: Call `service.GetPartUploadURL(ctx, session, partNumber, did)` 146 + - Buffered mode: Return self-referential URL with headers 147 + 148 + #### Multipart Upload Part (Buffered Mode) 149 + 150 + ``` 151 + PUT /xrpc/com.atproto.repo.uploadBlob 152 + Content-Type: application/octet-stream 153 + X-Upload-Id: upload-1634567890 154 + X-Part-Number: 1 155 + 156 + [part data bytes] 157 + 158 + Response (200 OK): 159 + { 160 + "etag": "abc123def456", 161 + "partNumber": 1 162 + } 163 + ``` 164 + 165 + **Implementation:** 166 + - Extract headers: `X-Upload-Id`, `X-Part-Number` 167 + - Call `service.HandleMultipartPartUpload(w, r, uploadID, partNumber, did, multipartMgr)` 168 + - Return ETag for completion 169 + 170 + #### Multipart Complete (ATCR Extension) 171 + 172 + ``` 173 + POST /xrpc/com.atproto.repo.uploadBlob 174 + Content-Type: application/json 175 + 176 + { 177 + "action": "complete", 178 + "uploadId": "upload-1634567890", 179 + "digest": "sha256:abc123...", 180 + "parts": [ 181 + { "partNumber": 1, "etag": "abc123" }, 182 + { "partNumber": 2, "etag": "def456" } 183 + ] 184 + } 185 + 186 + Response (200 OK): 187 + { 188 + "status": "completed", 189 + "blob": { 190 + "$type": "blob", 191 + "ref": { 192 + "$link": "bafyreib..." // CID computed from digest 193 + }, 194 + "mimeType": "application/octet-stream", 195 + "size": 1234567890 196 + } 197 + } 198 + ``` 199 + 200 + **Implementation:** 201 + - Retrieve session: `multipartMgr.GetSession(uploadID)` 202 + - For S3Native: Record parts via `session.RecordS3Part()` 203 + - Call `service.CompleteMultipartUploadWithManager(ctx, session, multipartMgr)` 204 + - Convert digest to CID for response 205 + 206 + #### Multipart Abort (ATCR Extension) 207 + 208 + ``` 209 + POST /xrpc/com.atproto.repo.uploadBlob 210 + Content-Type: application/json 211 + 212 + { 213 + "action": "abort", 214 + "uploadId": "upload-1634567890", 215 + "digest": "sha256:abc123..." 216 + } 217 + 218 + Response (200 OK): 219 + { 220 + "status": "aborted" 221 + } 222 + ``` 223 + 224 + **Implementation:** 225 + - Retrieve session: `multipartMgr.GetSession(uploadID)` 226 + - Call `service.AbortMultipartUploadWithManager(ctx, session, multipartMgr)` 227 + 228 + ## Implementation Strategy 229 + 230 + ### Phase 1: Add Unified Handler (Keep Legacy Endpoints) 231 + 232 + **File:** `pkg/hold/pds/xrpc.go` 233 + 234 + ```go 235 + // HandleUploadBlob unified handler supporting both single and multipart uploads 236 + func (h *XRPCHandler) HandleUploadBlob(w http.ResponseWriter, r *http.Request) { 237 + if r.Method != http.MethodPost && r.Method != http.MethodPut { 238 + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 239 + return 240 + } 241 + 242 + contentType := r.Header.Get("Content-Type") 243 + 244 + // Buffered multipart part upload (PUT with headers) 245 + if r.Method == http.MethodPut && r.Header.Get("X-Upload-Id") != "" { 246 + h.handleBufferedPartUpload(w, r) 247 + return 248 + } 249 + 250 + // Multipart operations (JSON body) 251 + if strings.Contains(contentType, "application/json") { 252 + h.handleMultipartOperation(w, r) 253 + return 254 + } 255 + 256 + // Standard single blob upload (raw bytes) 257 + h.handleSingleBlobUpload(w, r) 258 + } 259 + 260 + func (h *XRPCHandler) handleMultipartOperation(w http.ResponseWriter, r *http.Request) { 261 + var req struct { 262 + Action string `json:"action"` 263 + Digest string `json:"digest,omitempty"` 264 + Size int64 `json:"size,omitempty"` 265 + UploadID string `json:"uploadId,omitempty"` 266 + PartNumber int `json:"partNumber,omitempty"` 267 + Parts []struct { 268 + PartNumber int `json:"partNumber"` 269 + ETag string `json:"etag"` 270 + } `json:"parts,omitempty"` 271 + } 272 + 273 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 274 + http.Error(w, fmt.Sprintf("invalid JSON: %v", err), http.StatusBadRequest) 275 + return 276 + } 277 + 278 + // TODO: Add authentication check 279 + // user, err := ValidateDPoPRequest(r) 280 + 281 + ctx := r.Context() 282 + 283 + switch req.Action { 284 + case "start": 285 + h.handleMultipartStart(w, r, req.Digest, req.Size) 286 + case "part": 287 + h.handleMultipartPart(w, r, req.UploadID, req.PartNumber, req.Digest) 288 + case "complete": 289 + h.handleMultipartComplete(w, r, req.UploadID, req.Digest, req.Parts) 290 + case "abort": 291 + h.handleMultipartAbort(w, r, req.UploadID, req.Digest) 292 + default: 293 + http.Error(w, "invalid action", http.StatusBadRequest) 294 + } 295 + } 296 + 297 + func (h *XRPCHandler) handleMultipartStart(w http.ResponseWriter, r *http.Request, digest string, size int64) { 298 + ctx := r.Context() 299 + 300 + // Use HoldService multipart manager 301 + // Note: h.blobStore is HoldServiceBlobStore which wraps the service 302 + uploadID, mode, err := h.blobStore.StartMultipart(ctx, digest, size) 303 + if err != nil { 304 + http.Error(w, fmt.Sprintf("failed to start upload: %v", err), http.StatusInternalServerError) 305 + return 306 + } 307 + 308 + response := map[string]any{ 309 + "uploadId": uploadID, 310 + "expiresAt": time.Now().Add(24 * time.Hour), 311 + "mode": mode, // "s3-native" or "buffered" 312 + } 313 + 314 + w.Header().Set("Content-Type", "application/json") 315 + json.NewEncoder(w).Encode(response) 316 + } 317 + 318 + func (h *XRPCHandler) handleMultipartPart(w http.ResponseWriter, r *http.Request, uploadID string, partNumber int, digest string) { 319 + ctx := r.Context() 320 + 321 + // Get part upload URL (presigned S3 or buffered endpoint) 322 + partURL, err := h.blobStore.GetPartUploadURL(ctx, uploadID, partNumber, digest) 323 + if err != nil { 324 + http.Error(w, fmt.Sprintf("failed to get part URL: %v", err), http.StatusInternalServerError) 325 + return 326 + } 327 + 328 + response := map[string]any{ 329 + "url": partURL, 330 + "expiresAt": time.Now().Add(15 * time.Minute), 331 + "method": "PUT", 332 + } 333 + 334 + w.Header().Set("Content-Type", "application/json") 335 + json.NewEncoder(w).Encode(response) 336 + } 337 + 338 + func (h *XRPCHandler) handleMultipartComplete(w http.ResponseWriter, r *http.Request, uploadID string, digest string, parts []struct{ PartNumber int; ETag string }) { 339 + ctx := r.Context() 340 + 341 + // Convert parts format 342 + completedParts := make([]hold.CompletedPart, len(parts)) 343 + for i, p := range parts { 344 + completedParts[i] = hold.CompletedPart{ 345 + PartNumber: p.PartNumber, 346 + ETag: p.ETag, 347 + } 348 + } 349 + 350 + // Complete upload 351 + if err := h.blobStore.CompleteMultipart(ctx, uploadID, digest, completedParts); err != nil { 352 + http.Error(w, fmt.Sprintf("failed to complete upload: %v", err), http.StatusInternalServerError) 353 + return 354 + } 355 + 356 + // Convert digest to CID for ATProto response format 357 + cid, err := digestToCID(digest) 358 + if err != nil { 359 + http.Error(w, fmt.Sprintf("failed to generate CID: %v", err), http.StatusInternalServerError) 360 + return 361 + } 362 + 363 + response := map[string]any{ 364 + "status": "completed", 365 + "blob": map[string]any{ 366 + "$type": "blob", 367 + "ref": map[string]any{ 368 + "$link": cid.String(), 369 + }, 370 + "mimeType": "application/octet-stream", 371 + // Size would need to be tracked in session 372 + }, 373 + } 374 + 375 + w.Header().Set("Content-Type", "application/json") 376 + json.NewEncoder(w).Encode(response) 377 + } 378 + 379 + func (h *XRPCHandler) handleMultipartAbort(w http.ResponseWriter, r *http.Request, uploadID string, digest string) { 380 + ctx := r.Context() 381 + 382 + if err := h.blobStore.AbortMultipart(ctx, uploadID, digest); err != nil { 383 + http.Error(w, fmt.Sprintf("failed to abort upload: %v", err), http.StatusInternalServerError) 384 + return 385 + } 386 + 387 + response := map[string]any{ 388 + "status": "aborted", 389 + } 390 + 391 + w.Header().Set("Content-Type", "application/json") 392 + json.NewEncoder(w).Encode(response) 393 + } 394 + 395 + func (h *XRPCHandler) handleBufferedPartUpload(w http.ResponseWriter, r *http.Request) { 396 + uploadID := r.Header.Get("X-Upload-Id") 397 + partNumberStr := r.Header.Get("X-Part-Number") 398 + 399 + partNumber, err := strconv.Atoi(partNumberStr) 400 + if err != nil { 401 + http.Error(w, "invalid part number", http.StatusBadRequest) 402 + return 403 + } 404 + 405 + // Stream part data to storage 406 + etag, err := h.blobStore.UploadPart(r.Context(), uploadID, partNumber, r.Body) 407 + if err != nil { 408 + http.Error(w, fmt.Sprintf("failed to upload part: %v", err), http.StatusInternalServerError) 409 + return 410 + } 411 + 412 + response := map[string]any{ 413 + "etag": etag, 414 + "partNumber": partNumber, 415 + } 416 + 417 + w.Header().Set("Content-Type", "application/json") 418 + json.NewEncoder(w).Encode(response) 419 + } 420 + 421 + func (h *XRPCHandler) handleSingleBlobUpload(w http.ResponseWriter, r *http.Request) { 422 + // Standard ATProto uploadBlob behavior 423 + // Read blob data 424 + data, err := io.ReadAll(r.Body) 425 + if err != nil { 426 + http.Error(w, "failed to read blob", http.StatusInternalServerError) 427 + return 428 + } 429 + 430 + // Upload to storage (single operation) 431 + cid, size, err := h.blobStore.UploadBlob(r.Context(), bytes.NewReader(data)) 432 + if err != nil { 433 + http.Error(w, fmt.Sprintf("failed to upload blob: %v", err), http.StatusInternalServerError) 434 + return 435 + } 436 + 437 + // Standard ATProto blob response format 438 + response := map[string]any{ 439 + "blob": map[string]any{ 440 + "$type": "blob", 441 + "ref": map[string]any{ 442 + "$link": cid.String(), 443 + }, 444 + "mimeType": "application/octet-stream", 445 + "size": size, 446 + }, 447 + } 448 + 449 + w.Header().Set("Content-Type", "application/json") 450 + json.NewEncoder(w).Encode(response) 451 + } 452 + 453 + // digestToCID converts OCI digest (sha256:abc...) to ATProto CID 454 + func digestToCID(digest string) (cid.Cid, error) { 455 + // Implementation in pkg/hold/cid.go or similar 456 + // Strip "sha256:" prefix, decode hex, construct CIDv1 with sha256 multihash 457 + return cid.Undef, fmt.Errorf("not implemented") 458 + } 459 + ``` 460 + 461 + ### Phase 2: Extend HoldServiceBlobStore (pkg/hold/blobstore_adapter.go) 462 + 463 + The `HoldServiceBlobStore` currently wraps HoldService for presigned URLs. Extend it to support multipart operations: 464 + 465 + ```go 466 + // Add multipart methods to HoldServiceBlobStore 467 + 468 + func (h *HoldServiceBlobStore) StartMultipart(ctx context.Context, digest string, size int64) (uploadID string, mode string, err error) { 469 + uploadID, uploadMode, err := h.service.StartMultipartUploadWithManager(ctx, digest, h.service.MultipartMgr) 470 + if err != nil { 471 + return "", "", err 472 + } 473 + 474 + modeStr := "s3-native" 475 + if uploadMode == hold.Buffered { 476 + modeStr = "buffered" 477 + } 478 + 479 + return uploadID, modeStr, nil 480 + } 481 + 482 + func (h *HoldServiceBlobStore) GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, digest string) (string, error) { 483 + session, err := h.service.MultipartMgr.GetSession(uploadID) 484 + if err != nil { 485 + return "", err 486 + } 487 + 488 + // For S3Native: return presigned URL 489 + // For Buffered: return self-referential URL with upload instructions 490 + if session.Mode == hold.S3Native { 491 + return h.service.GetPartUploadURL(ctx, session, partNumber, h.holdDID) 492 + } 493 + 494 + // Buffered mode: client will PUT to uploadBlob with headers 495 + return fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", h.publicURL), nil 496 + } 497 + 498 + func (h *HoldServiceBlobStore) UploadPart(ctx context.Context, uploadID string, partNumber int, data io.Reader) (string, error) { 499 + // Buffered part upload - streams data to storage 500 + // Used when client PUTs to uploadBlob with X-Upload-Id header 501 + session, err := h.service.MultipartMgr.GetSession(uploadID) 502 + if err != nil { 503 + return "", err 504 + } 505 + 506 + // Stream to storage, return ETag 507 + // This wraps HandleMultipartPartUpload logic 508 + etag, err := h.service.UploadPartBuffered(ctx, session, partNumber, data) 509 + return etag, err 510 + } 511 + 512 + func (h *HoldServiceBlobStore) CompleteMultipart(ctx context.Context, uploadID string, digest string, parts []hold.CompletedPart) error { 513 + session, err := h.service.MultipartMgr.GetSession(uploadID) 514 + if err != nil { 515 + return err 516 + } 517 + 518 + // For S3Native: record parts ETags 519 + if session.Mode == hold.S3Native { 520 + for _, p := range parts { 521 + session.RecordS3Part(p.PartNumber, p.ETag, 0) 522 + } 523 + } 524 + 525 + return h.service.CompleteMultipartUploadWithManager(ctx, session, h.service.MultipartMgr) 526 + } 527 + 528 + func (h *HoldServiceBlobStore) AbortMultipart(ctx context.Context, uploadID string, digest string) error { 529 + session, err := h.service.MultipartMgr.GetSession(uploadID) 530 + if err != nil { 531 + return err 532 + } 533 + 534 + return h.service.AbortMultipartUploadWithManager(ctx, session, h.service.MultipartMgr) 535 + } 536 + 537 + func (h *HoldServiceBlobStore) UploadBlob(ctx context.Context, data io.Reader) (cid.Cid, int64, error) { 538 + // Single blob upload for standard ATProto use case 539 + // Compute digest, store via service driver 540 + // Return CID and size 541 + // Implementation TBD 542 + return cid.Undef, 0, fmt.Errorf("not implemented") 543 + } 544 + ``` 545 + 546 + ### Phase 3: Update AppView Client (pkg/appview/storage/) 547 + 548 + Create new XRPC client or update ProxyBlobStore to use unified endpoint: 549 + 550 + ```go 551 + // In ProxyBlobStore or new XRPCBlobStore 552 + 553 + func (p *ProxyBlobStore) startMultipartUpload(ctx context.Context, digest string) (string, error) { 554 + reqBody := map[string]any{ 555 + "action": "start", 556 + "digest": digest, 557 + } 558 + 559 + body, _ := json.Marshal(reqBody) 560 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.storageEndpoint) 561 + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 562 + req.Header.Set("Content-Type", "application/json") 563 + 564 + resp, err := p.httpClient.Do(req) 565 + // ... parse response, return uploadID 566 + } 567 + 568 + func (p *ProxyBlobStore) getPartPresignedURL(ctx context.Context, digest, uploadID string, partNumber int) (string, error) { 569 + reqBody := map[string]any{ 570 + "action": "part", 571 + "uploadId": uploadID, 572 + "partNumber": partNumber, 573 + "digest": digest, 574 + } 575 + 576 + body, _ := json.Marshal(reqBody) 577 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.storageEndpoint) 578 + req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 579 + req.Header.Set("Content-Type", "application/json") 580 + 581 + resp, err := p.httpClient.Do(req) 582 + // ... parse response, return presigned URL 583 + } 584 + 585 + // Similar for complete, abort 586 + ``` 587 + 588 + ### Phase 4: Testing Period 589 + 590 + **During transition:** 591 + - Both legacy HTTP endpoints AND new XRPC endpoint active 592 + - AppView can use either based on configuration/feature flag 593 + - New deployments use XRPC 594 + - Old deployments continue with legacy 595 + 596 + **Detection logic:** 597 + ```go 598 + func (r *RoutingRepository) Blobs(ctx context.Context) distribution.BlobStore { 599 + // Try XRPC first (check for /.well-known/did.json) 600 + if supportsXRPC(storageEndpoint) { 601 + return NewXRPCBlobStore(storageEndpoint, ...) 602 + } 603 + // Fallback to legacy 604 + return NewProxyBlobStore(storageEndpoint, ...) 605 + } 606 + ``` 607 + 608 + ### Phase 5: Remove Legacy Endpoints 609 + 610 + Once all holds migrated and tested: 611 + 612 + **cmd/hold/main.go - Remove:** 613 + ```go 614 + // DELETE these lines 615 + mux.HandleFunc("/start-multipart", service.HandleStartMultipart) 616 + mux.HandleFunc("/part-presigned-url", service.HandleGetPartURL) 617 + mux.HandleFunc("/complete-multipart", service.HandleCompleteMultipart) 618 + mux.HandleFunc("/abort-multipart", service.HandleAbortMultipart) 619 + mux.HandleFunc("/multipart-parts/", ...) 620 + ``` 621 + 622 + **pkg/hold/handlers.go - Mark as deprecated:** 623 + ```go 624 + // Keep methods for now (used by service internals) 625 + // But remove HTTP handler wrappers 626 + ``` 627 + 628 + ## Key Design Decisions 629 + 630 + 1. **Content-Type discrimination**: Natural way to distinguish single vs multipart uploads 631 + 2. **JSON bodies for multipart**: Follows XRPC conventions (like putRecord, deleteRecord) 632 + 3. **Preserve standard uploadBlob**: Raw bytes still work for profile images, small media 633 + 4. **Reuse existing code**: HoldService multipart logic unchanged, just new HTTP layer 634 + 5. **Backward compatibility**: Both endpoints active during transition 635 + 6. **Action-based routing**: Clear, extensible JSON structure 636 + 637 + ## Benefits 638 + 639 + - ✅ Single endpoint for all blob operations 640 + - ✅ Standard ATProto uploadBlob preserved 641 + - ✅ XRPC-like JSON request/response 642 + - ✅ Reuses existing multipart.go logic 643 + - ✅ Gradual migration path 644 + - ✅ Less endpoints to maintain 645 + - ✅ Cleaner AppView client code 646 + 647 + ## Testing Checklist 648 + 649 + - [ ] Single blob upload (< 10MB, raw bytes) 650 + - [ ] Multipart start → part → complete flow 651 + - [ ] S3Native mode (presigned URLs) 652 + - [ ] Buffered mode (proxy uploads) 653 + - [ ] Multipart abort 654 + - [ ] Large blob upload (> 5GB, many parts) 655 + - [ ] Concurrent uploads 656 + - [ ] Upload resume after network failure 657 + - [ ] Legacy endpoint backward compatibility 658 + - [ ] AppView XRPC client integration 659 + - [ ] Performance comparison (XRPC vs legacy) 660 + 661 + ## Migration Timeline 662 + 663 + 1. **Week 1**: Implement unified uploadBlob handler (Phase 1-2) 664 + 2. **Week 2**: Update AppView client, feature flag (Phase 3) 665 + 3. **Week 3**: Deploy to dev/staging, test both paths (Phase 4) 666 + 4. **Week 4**: Roll out to production (gradual) 667 + 5. **Week 5-6**: Monitor, verify all holds migrated 668 + 6. **Week 7**: Remove legacy endpoints (Phase 5) 669 + 670 + ## References 671 + 672 + - ATProto uploadBlob spec: https://docs.bsky.app/docs/api/com-atproto-repo-upload-blob 673 + - XRPC conventions: https://atproto.com/specs/xrpc 674 + - Existing multipart implementation: pkg/hold/multipart.go 675 + - Blob store adapter: pkg/hold/blobstore_adapter.go
-91
pkg/hold/handlers.go
··· 8 8 "log" 9 9 "net/http" 10 10 "time" 11 - 12 - "atcr.io/pkg/atproto" 13 11 ) 14 12 15 13 // PresignedURLOperation defines the type of presigned URL operation ··· 496 494 "status": "aborted", 497 495 }) 498 496 } 499 - 500 - // RegisterRequest represents a request to register this hold in a user's PDS 501 - type RegisterRequest struct { 502 - DID string `json:"did"` 503 - AccessToken string `json:"access_token"` 504 - PDSEndpoint string `json:"pds_endpoint"` 505 - } 506 - 507 - // RegisterResponse contains the registration result 508 - type RegisterResponse struct { 509 - HoldURI string `json:"hold_uri"` 510 - CrewURI string `json:"crew_uri"` 511 - Message string `json:"message"` 512 - } 513 - 514 - // HandleRegister registers this hold service in a user's PDS (manual endpoint) 515 - func (s *HoldService) HandleRegister(w http.ResponseWriter, r *http.Request) { 516 - if r.Method != http.MethodPost { 517 - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 518 - return 519 - } 520 - 521 - var req RegisterRequest 522 - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 523 - http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) 524 - return 525 - } 526 - 527 - // Validate required fields 528 - if req.DID == "" || req.AccessToken == "" || req.PDSEndpoint == "" { 529 - http.Error(w, "missing required fields: did, access_token, pds_endpoint", http.StatusBadRequest) 530 - return 531 - } 532 - 533 - // Get public URL from config 534 - publicURL := s.config.Server.PublicURL 535 - if publicURL == "" { 536 - // Fallback to constructing URL from request 537 - scheme := "http" 538 - if r.TLS != nil { 539 - scheme = "https" 540 - } 541 - publicURL = fmt.Sprintf("%s://%s", scheme, r.Host) 542 - } 543 - 544 - // Derive hold name from URL 545 - holdName, err := extractHostname(publicURL) 546 - if err != nil { 547 - http.Error(w, fmt.Sprintf("failed to extract hostname: %v", err), http.StatusBadRequest) 548 - return 549 - } 550 - 551 - ctx := r.Context() 552 - 553 - // Create ATProto client with user's credentials 554 - client := atproto.NewClient(req.PDSEndpoint, req.DID, req.AccessToken) 555 - 556 - // Create HoldRecord 557 - holdRecord := atproto.NewHoldRecord(publicURL, req.DID, s.config.Server.Public) 558 - 559 - holdResult, err := client.PutRecord(ctx, atproto.HoldCollection, holdName, holdRecord) 560 - if err != nil { 561 - http.Error(w, fmt.Sprintf("failed to create hold record: %v", err), http.StatusInternalServerError) 562 - return 563 - } 564 - 565 - log.Printf("Created hold record: %s", holdResult.URI) 566 - 567 - // Create HoldCrewRecord for the owner 568 - crewRecord := atproto.NewHoldCrewRecord(holdResult.URI, req.DID, "owner") 569 - 570 - crewRKey := fmt.Sprintf("%s-%s", holdName, req.DID) 571 - crewResult, err := client.PutRecord(ctx, atproto.HoldCrewCollection, crewRKey, crewRecord) 572 - if err != nil { 573 - http.Error(w, fmt.Sprintf("failed to create crew record: %v", err), http.StatusInternalServerError) 574 - return 575 - } 576 - 577 - log.Printf("Created crew record: %s", crewResult.URI) 578 - 579 - resp := RegisterResponse{ 580 - HoldURI: holdResult.URI, 581 - CrewURI: crewResult.URI, 582 - Message: fmt.Sprintf("Successfully registered hold service. Storage endpoint: %s", publicURL), 583 - } 584 - 585 - w.Header().Set("Content-Type", "application/json") 586 - json.NewEncoder(w).Encode(resp) 587 - }
-40
pkg/hold/patterns.go
··· 1 - package hold 2 - 3 - import ( 4 - "regexp" 5 - "strings" 6 - ) 7 - 8 - // matchPattern checks if a handle matches a pattern 9 - // Supports wildcards: "*" (all), "*.domain.com" (suffix), "prefix.*" (prefix), "*.mid.*" (contains) 10 - func matchPattern(pattern, handle string) bool { 11 - if pattern == "*" { 12 - // Wildcard matches all 13 - return true 14 - } 15 - 16 - // Convert glob to regex and match 17 - regex := globToRegex(pattern) 18 - matched, err := regexp.MatchString(regex, handle) 19 - if err != nil { 20 - // Log error but fail closed (don't grant access on regex error) 21 - return false 22 - } 23 - return matched 24 - } 25 - 26 - // globToRegex converts a glob pattern to a regex pattern 27 - // Examples: 28 - // - "*.example.com" → "^.*\.example\.com$" 29 - // - "subdomain.*" → "^subdomain\..*$" 30 - // - "*.bsky.*" → "^.*\.bsky\..*$" 31 - func globToRegex(pattern string) string { 32 - // Escape special regex characters (except *) 33 - escaped := regexp.QuoteMeta(pattern) 34 - 35 - // Replace escaped \* with .* 36 - regex := strings.ReplaceAll(escaped, "\\*", ".*") 37 - 38 - // Anchor to start and end 39 - return "^" + regex + "$" 40 - }
-7
pkg/hold/service.go
··· 4 4 "context" 5 5 "fmt" 6 6 "log" 7 - "net/http" 8 7 "net/url" 9 8 10 9 "atcr.io/pkg/auth" ··· 94 93 return false 95 94 } 96 95 return allowed 97 - } 98 - 99 - // HealthHandler handles health check requests 100 - func (s *HoldService) HealthHandler(w http.ResponseWriter, r *http.Request) { 101 - w.Header().Set("Content-Type", "application/json") 102 - w.Write([]byte(`{"status":"ok"}`)) 103 96 } 104 97 105 98 // extractHostname extracts the hostname from a URL