A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
81
fork

Configure Feed

Select the types of activity you want to include in your feed.

lots of unit testing for xrpc endpoints. start pointing appview to the new endpoints. remove legacy api endpoints

+1840 -851
-8
CLAUDE.md
··· 368 368 369 369 Key insight: "Private" gates anonymous access, not authenticated access. This reflects ATProto's current limitation (no private PDS records yet). 370 370 371 - **Endpoints:** 372 - - `POST /get-presigned-url` - Get download URL for blob 373 - - `POST /put-presigned-url` - Get upload URL for blob 374 - - `GET /blobs/{digest}` - Proxy download (fallback if no presigned URL support) 375 - - `PUT /blobs/{digest}` - Proxy upload (fallback) 376 - - `POST /register` - Manual registration endpoint 377 - - `GET /health` - Health check 378 - 379 371 **Embedded PDS Endpoints:** 380 372 381 373 Each hold service includes an embedded PDS (Personal Data Server) that stores captain + crew records:
+1 -1
Dockerfile.appview
··· 1 - FROM golang:1.25.2-trixie AS builder 1 + FROM docker.io/golang:1.25.2-trixie AS builder 2 2 3 3 RUN apt-get update && \ 4 4 apt-get install -y --no-install-recommends sqlite3 libsqlite3-dev && \
+1 -1
Dockerfile.hold
··· 1 - FROM golang:1.25.2-trixie AS builder 1 + FROM docker.io/golang:1.25.2-trixie AS builder 2 2 3 3 RUN apt-get update && \ 4 4 apt-get install -y --no-install-recommends sqlite3 libsqlite3-dev && \
+1 -51
cmd/hold/main.go
··· 5 5 "fmt" 6 6 "log" 7 7 "net/http" 8 - "strconv" 9 - "strings" 10 8 11 9 "atcr.io/pkg/hold" 12 10 "atcr.io/pkg/hold/pds" ··· 66 64 if holdPDS != nil { 67 65 holdDID := holdPDS.DID() 68 66 blobStore := hold.NewHoldServiceBlobStore(service, holdDID) 69 - xrpcHandler = pds.NewXRPCHandler(holdPDS, cfg.Server.PublicURL, blobStore, broadcaster) 67 + xrpcHandler = pds.NewXRPCHandler(holdPDS, cfg.Server.PublicURL, blobStore, broadcaster, nil) 70 68 } 71 69 72 70 // Setup HTTP routes ··· 80 78 return 81 79 } 82 80 http.NotFound(w, r) 83 - }) 84 - 85 - mux.HandleFunc("/presigned-url", service.HandlePresignedURL) 86 - mux.HandleFunc("/move", service.HandleMove) 87 - 88 - // Multipart upload endpoints 89 - mux.HandleFunc("/start-multipart", service.HandleStartMultipart) 90 - mux.HandleFunc("/part-presigned-url", service.HandleGetPartURL) 91 - mux.HandleFunc("/complete-multipart", service.HandleCompleteMultipart) 92 - mux.HandleFunc("/abort-multipart", service.HandleAbortMultipart) 93 - 94 - // Buffered multipart part upload endpoint (for when presigned URLs are disabled/unavailable) 95 - mux.HandleFunc("/multipart-parts/", func(w http.ResponseWriter, r *http.Request) { 96 - if r.Method != http.MethodPut { 97 - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 98 - return 99 - } 100 - 101 - // Parse URL: /multipart-parts/{uploadID}/{partNumber} 102 - path := r.URL.Path[len("/multipart-parts/"):] 103 - parts := strings.Split(path, "/") 104 - if len(parts) != 2 { 105 - http.Error(w, "invalid path format, expected /multipart-parts/{uploadID}/{partNumber}", http.StatusBadRequest) 106 - return 107 - } 108 - 109 - uploadID := parts[0] 110 - partNumber, err := strconv.Atoi(parts[1]) 111 - if err != nil { 112 - http.Error(w, fmt.Sprintf("invalid part number: %v", err), http.StatusBadRequest) 113 - return 114 - } 115 - 116 - // Get DID from query param 117 - did := r.URL.Query().Get("did") 118 - 119 - service.HandleMultipartPartUpload(w, r, uploadID, partNumber, did, service.MultipartMgr) 120 - }) 121 - 122 - mux.HandleFunc("/blobs/", func(w http.ResponseWriter, r *http.Request) { 123 - switch r.Method { 124 - case http.MethodGet, http.MethodHead: 125 - service.HandleProxyGet(w, r) 126 - case http.MethodPut: 127 - service.HandleProxyPut(w, r) 128 - default: 129 - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 130 - } 131 81 }) 132 82 133 83 // Register XRPC/ATProto PDS endpoints if PDS is initialized
+1 -1
pkg/appview/middleware/registry.go
··· 60 60 distribution.Namespace 61 61 directory identity.Directory 62 62 defaultStorageEndpoint string 63 - testMode bool // If true, fallback to default hold when user's hold is unreachable 63 + testMode bool // If true, fallback to default hold when user's hold is unreachable 64 64 repositories sync.Map // Cache of RoutingRepository instances by key (did:reponame) 65 65 } 66 66
+109 -144
pkg/appview/storage/proxy_blob_store.go
··· 202 202 }, nil 203 203 } 204 204 205 - // Put stores a blob 205 + // Put stores a blob using the multipart upload flow 206 + // This ensures all uploads go through the same XRPC path 206 207 func (p *ProxyBlobStore) Put(ctx context.Context, mediaType string, content []byte) (distribution.Descriptor, error) { 207 - // Check write access 208 + // Check write access (fast-fail before starting multipart upload) 208 209 if err := p.checkWriteAccess(ctx); err != nil { 209 210 return distribution.Descriptor{}, err 210 211 } ··· 212 213 // Calculate digest 213 214 dgst := digest.FromBytes(content) 214 215 215 - // Get upload URL 216 - url, err := p.getUploadURL(ctx, dgst, int64(len(content))) 216 + // Use Create() flow for all uploads (goes through multipart XRPC endpoints) 217 + writer, err := p.Create(ctx) 217 218 if err != nil { 218 - fmt.Printf("[proxy_blob_store/Put] Failed to get upload URL: digest=%s, error=%v\n", dgst, err) 219 + fmt.Printf("[proxy_blob_store/Put] Failed to create writer: %v\n", err) 219 220 return distribution.Descriptor{}, err 220 221 } 221 222 222 - // Upload the blob 223 - req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(content)) 224 - if err != nil { 225 - fmt.Printf("[proxy_blob_store/Put] Failed to create request: %v\n", err) 223 + // Write the content 224 + if _, err := writer.Write(content); err != nil { 225 + writer.Cancel(ctx) 226 + fmt.Printf("[proxy_blob_store/Put] Failed to write content: %v\n", err) 226 227 return distribution.Descriptor{}, err 227 228 } 228 - req.Header.Set("Content-Type", "application/octet-stream") 229 229 230 - resp, err := p.httpClient.Do(req) 230 + // Commit with the calculated digest 231 + desc, err := writer.Commit(ctx, distribution.Descriptor{ 232 + Digest: dgst, 233 + Size: int64(len(content)), 234 + MediaType: mediaType, 235 + }) 231 236 if err != nil { 232 - fmt.Printf("[proxy_blob_store/Put] HTTP request failed: %v\n", err) 237 + fmt.Printf("[proxy_blob_store/Put] Failed to commit: %v\n", err) 233 238 return distribution.Descriptor{}, err 234 239 } 235 - defer resp.Body.Close() 236 - 237 - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 238 - bodyBytes, _ := io.ReadAll(resp.Body) 239 - fmt.Printf(" Error Body: %s\n", string(bodyBytes)) 240 - return distribution.Descriptor{}, fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(bodyBytes)) 241 - } 242 240 243 241 fmt.Printf("[proxy_blob_store/Put] Upload successful: digest=%s, size=%d\n", dgst, len(content)) 244 - 245 - return distribution.Descriptor{ 246 - Digest: dgst, 247 - Size: int64(len(content)), 248 - MediaType: mediaType, 249 - }, nil 242 + return desc, nil 250 243 } 251 244 252 245 // Delete removes a blob ··· 348 341 return writer, nil 349 342 } 350 343 351 - // getPresignedURL requests a presigned URL from the storage service for any operation 352 - func (p *ProxyBlobStore) getPresignedURL(ctx context.Context, operation, dgst string, size int64) (string, error) { 353 - reqBody := map[string]any{ 354 - "operation": operation, 355 - "did": p.did, 356 - "digest": dgst, 357 - } 358 - 359 - // Only include size for PUT operations 360 - if size > 0 { 361 - reqBody["size"] = size 362 - } 363 - 364 - body, err := json.Marshal(reqBody) 365 - if err != nil { 366 - return "", err 367 - } 368 - 369 - url := fmt.Sprintf("%s/presigned-url", p.storageEndpoint) 370 - req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 371 - if err != nil { 372 - return "", err 373 - } 374 - req.Header.Set("Content-Type", "application/json") 375 - 376 - resp, err := p.httpClient.Do(req) 377 - if err != nil { 378 - return "", err 379 - } 380 - defer resp.Body.Close() 381 - 382 - if resp.StatusCode != http.StatusOK { 383 - return "", fmt.Errorf("failed to get presigned URL: status %d", resp.StatusCode) 384 - } 385 - 386 - var result struct { 387 - URL string `json:"url"` 388 - } 389 - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 390 - return "", err 391 - } 392 - 393 - return result.URL, nil 394 - } 395 - 396 - // getDownloadURL requests a presigned download URL from the storage service 344 + // getDownloadURL returns the XRPC getBlob URL for downloading a blob 345 + // The hold service will redirect to a presigned S3 URL 397 346 func (p *ProxyBlobStore) getDownloadURL(ctx context.Context, dgst digest.Digest) (string, error) { 398 - return p.getPresignedURL(ctx, "GET", dgst.String(), 0) 347 + // Use XRPC endpoint: GET /xrpc/com.atproto.sync.getBlob?did={holdDID}&cid={digest} 348 + // Per migration doc: hold accepts OCI digest directly as cid parameter (checks for sha256: prefix) 349 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 350 + p.storageEndpoint, p.holdDID, dgst.String()) 351 + return url, nil 399 352 } 400 353 401 - // getHeadURL requests a presigned HEAD URL from the storage service 354 + // getHeadURL returns the XRPC getBlob URL for HEAD requests 355 + // The hold service will redirect to a presigned S3 URL 402 356 func (p *ProxyBlobStore) getHeadURL(ctx context.Context, dgst digest.Digest) (string, error) { 403 - return p.getPresignedURL(ctx, "HEAD", dgst.String(), 0) 357 + // Same as GET - hold service handles HEAD method on getBlob endpoint 358 + url := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 359 + p.storageEndpoint, p.holdDID, dgst.String()) 360 + return url, nil 404 361 } 405 362 406 - // getUploadURL requests a presigned upload URL from the storage service 363 + // getUploadURL is deprecated - single blob uploads should use Create() instead 364 + // XRPC migration: No direct presigned upload URL endpoint, use multipart flow for all uploads 407 365 func (p *ProxyBlobStore) getUploadURL(ctx context.Context, dgst digest.Digest, size int64) (string, error) { 408 - fmt.Printf("DEBUG [proxy_blob_store/getUploadURL]: storageEndpoint=%s, digest=%s\n", p.storageEndpoint, dgst) 409 - url, err := p.getPresignedURL(ctx, "PUT", dgst.String(), size) 410 - if err == nil { 411 - fmt.Printf("DEBUG [proxy_blob_store/getUploadURL]: Got presigned URL=%s\n", url) 412 - } 413 - return url, err 366 + return "", fmt.Errorf("single blob upload via Put() not supported with XRPC endpoints - use Create() instead") 414 367 } 415 368 416 - // startMultipartUpload initiates a multipart upload via hold service 369 + // startMultipartUpload initiates a multipart upload via XRPC uploadBlob endpoint 417 370 func (p *ProxyBlobStore) startMultipartUpload(ctx context.Context, digest string) (string, error) { 418 371 reqBody := map[string]any{ 419 - "did": p.did, 372 + "action": "start", 420 373 "digest": digest, 421 374 } 422 375 ··· 425 378 return "", err 426 379 } 427 380 428 - url := fmt.Sprintf("%s/start-multipart", p.storageEndpoint) 381 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.storageEndpoint) 429 382 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 430 383 if err != nil { 431 384 return "", err ··· 444 397 } 445 398 446 399 var result struct { 447 - UploadID string `json:"upload_id"` 400 + UploadID string `json:"uploadId"` 401 + Mode string `json:"mode"` 448 402 } 449 403 if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 450 404 return "", err ··· 453 407 return result.UploadID, nil 454 408 } 455 409 456 - // getPartPresignedURL gets a presigned URL for uploading a specific part 457 - func (p *ProxyBlobStore) getPartPresignedURL(ctx context.Context, digest, uploadID string, partNumber int) (string, error) { 410 + // PartUploadInfo contains structured information for uploading a part 411 + type PartUploadInfo struct { 412 + URL string `json:"url"` 413 + Method string `json:"method,omitempty"` 414 + Headers map[string]string `json:"headers,omitempty"` 415 + } 416 + 417 + // getPartUploadInfo gets structured upload info for uploading a specific part via XRPC 418 + func (p *ProxyBlobStore) getPartUploadInfo(ctx context.Context, digest, uploadID string, partNumber int) (*PartUploadInfo, error) { 458 419 reqBody := map[string]any{ 459 - "did": p.did, 460 - "digest": digest, 461 - "upload_id": uploadID, 462 - "part_number": partNumber, 420 + "action": "part", 421 + "uploadId": uploadID, 422 + "partNumber": partNumber, 423 + "digest": digest, 463 424 } 464 425 465 426 body, err := json.Marshal(reqBody) 466 427 if err != nil { 467 - return "", err 428 + return nil, err 468 429 } 469 430 470 - url := fmt.Sprintf("%s/part-presigned-url", p.storageEndpoint) 431 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.storageEndpoint) 471 432 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 472 433 if err != nil { 473 - return "", err 434 + return nil, err 474 435 } 475 436 req.Header.Set("Content-Type", "application/json") 476 437 477 438 resp, err := p.httpClient.Do(req) 478 439 if err != nil { 479 - return "", err 440 + return nil, err 480 441 } 481 442 defer resp.Body.Close() 482 443 483 444 if resp.StatusCode != http.StatusOK { 484 445 bodyBytes, _ := io.ReadAll(resp.Body) 485 - return "", fmt.Errorf("get part URL failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 446 + return nil, fmt.Errorf("get part URL failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 486 447 } 487 448 488 - var result struct { 489 - URL string `json:"url"` 490 - } 491 - if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { 492 - return "", err 449 + var uploadInfo PartUploadInfo 450 + if err := json.NewDecoder(resp.Body).Decode(&uploadInfo); err != nil { 451 + return nil, err 493 452 } 494 453 495 - return result.URL, nil 454 + return &uploadInfo, nil 496 455 } 497 456 498 - // completeMultipartUpload completes a multipart upload via hold service 457 + // completeMultipartUpload completes a multipart upload via XRPC uploadBlob endpoint 458 + // The XRPC complete action handles the move from temp to final location internally 499 459 func (p *ProxyBlobStore) completeMultipartUpload(ctx context.Context, digest, uploadID string, parts []CompletedPart) error { 460 + // Convert parts to XRPC format (partNumber instead of part_number) 461 + xrpcParts := make([]map[string]any, len(parts)) 462 + for i, part := range parts { 463 + xrpcParts[i] = map[string]any{ 464 + "partNumber": part.PartNumber, 465 + "etag": part.ETag, 466 + } 467 + } 468 + 500 469 reqBody := map[string]any{ 501 - "did": p.did, 502 - "digest": digest, 503 - "upload_id": uploadID, 504 - "parts": parts, 470 + "action": "complete", 471 + "uploadId": uploadID, 472 + "digest": digest, 473 + "parts": xrpcParts, 505 474 } 506 475 507 476 body, err := json.Marshal(reqBody) ··· 509 478 return err 510 479 } 511 480 512 - url := fmt.Sprintf("%s/complete-multipart", p.storageEndpoint) 481 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.storageEndpoint) 513 482 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 514 483 if err != nil { 515 484 return err ··· 530 499 return nil 531 500 } 532 501 533 - // abortMultipartUpload aborts a multipart upload via hold service 502 + // abortMultipartUpload aborts a multipart upload via XRPC uploadBlob endpoint 534 503 func (p *ProxyBlobStore) abortMultipartUpload(ctx context.Context, digest, uploadID string) error { 535 504 reqBody := map[string]any{ 536 - "did": p.did, 537 - "digest": digest, 538 - "upload_id": uploadID, 505 + "action": "abort", 506 + "uploadId": uploadID, 507 + "digest": digest, 539 508 } 540 509 541 510 body, err := json.Marshal(reqBody) ··· 543 512 return err 544 513 } 545 514 546 - url := fmt.Sprintf("%s/abort-multipart", p.storageEndpoint) 515 + url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", p.storageEndpoint) 547 516 req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) 548 517 if err != nil { 549 518 return err ··· 624 593 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) 625 594 defer cancel() 626 595 627 - // Get presigned URL for this part 596 + // Get structured upload info for this part 628 597 tempDigest := fmt.Sprintf("uploads/temp-%s", w.id) 629 - url, err := w.store.getPartPresignedURL(ctx, tempDigest, w.uploadID, w.partNumber) 598 + uploadInfo, err := w.store.getPartUploadInfo(ctx, tempDigest, w.uploadID, w.partNumber) 630 599 if err != nil { 631 - return fmt.Errorf("failed to get part presigned URL: %w", err) 600 + return fmt.Errorf("failed to get part upload info: %w", err) 632 601 } 633 602 634 - // Upload part to S3 635 - req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewReader(w.buffer.Bytes())) 603 + // Determine HTTP method (default to PUT) 604 + method := uploadInfo.Method 605 + if method == "" { 606 + method = "PUT" 607 + } 608 + 609 + // Upload part (either to S3 presigned URL or back to XRPC with headers) 610 + req, err := http.NewRequestWithContext(ctx, method, uploadInfo.URL, bytes.NewReader(w.buffer.Bytes())) 636 611 if err != nil { 637 612 return err 638 613 } 639 614 req.Header.Set("Content-Type", "application/octet-stream") 615 + 616 + // Apply any additional headers from the response (for buffered mode) 617 + for key, value := range uploadInfo.Headers { 618 + req.Header.Set(key, value) 619 + } 640 620 641 621 resp, err := w.store.httpClient.Do(req) 642 622 if err != nil { ··· 650 630 } 651 631 652 632 // Store ETag for completion 633 + // For buffered mode, ETag might be in JSON response body 653 634 etag := resp.Header.Get("ETag") 654 635 if etag == "" { 655 - return fmt.Errorf("no ETag in response") 636 + // Try to parse JSON response for buffered mode 637 + var result struct { 638 + ETag string `json:"etag"` 639 + } 640 + if err := json.NewDecoder(resp.Body).Decode(&result); err == nil && result.ETag != "" { 641 + etag = result.ETag 642 + } else { 643 + return fmt.Errorf("no ETag in response") 644 + } 656 645 } 657 646 658 647 w.parts = append(w.parts, CompletedPart{ ··· 727 716 } 728 717 } 729 718 730 - // Complete multipart upload at temp location 719 + // Complete multipart upload - XRPC complete action handles move internally 731 720 tempDigest := fmt.Sprintf("uploads/temp-%s", w.id) 732 721 fmt.Printf("🔒 [Commit] Completing multipart upload: uploadID=%s, parts=%d\n", w.uploadID, len(w.parts)) 733 722 if err := w.store.completeMultipartUpload(ctx, tempDigest, w.uploadID, w.parts); err != nil { 734 723 return distribution.Descriptor{}, fmt.Errorf("failed to complete multipart upload: %w", err) 735 - } 736 - 737 - // Move from temp → final location (server-side S3 copy) 738 - tempPath := fmt.Sprintf("uploads/temp-%s", w.id) 739 - finalPath := desc.Digest.String() 740 - 741 - fmt.Printf("[Commit] Moving blob: %s → %s\n", tempPath, finalPath) 742 - moveURL := fmt.Sprintf("%s/move?from=%s&to=%s&did=%s", 743 - w.store.storageEndpoint, tempPath, finalPath, w.store.did) 744 - 745 - req, err := http.NewRequestWithContext(ctx, "POST", moveURL, nil) 746 - if err != nil { 747 - return distribution.Descriptor{}, fmt.Errorf("failed to create move request: %w", err) 748 - } 749 - 750 - resp, err := w.store.httpClient.Do(req) 751 - if err != nil { 752 - return distribution.Descriptor{}, fmt.Errorf("failed to move blob: %w", err) 753 - } 754 - defer resp.Body.Close() 755 - 756 - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { 757 - bodyBytes, _ := io.ReadAll(resp.Body) 758 - return distribution.Descriptor{}, fmt.Errorf("move blob failed: status %d, body: %s", resp.StatusCode, string(bodyBytes)) 759 724 } 760 725 761 726 fmt.Printf("[Commit] Upload completed successfully: digest=%s, size=%d, parts=%d\n", desc.Digest, w.size, len(w.parts))
+1 -1
pkg/atproto/client.go
··· 25 25 did string 26 26 accessToken string // For Basic Auth only 27 27 httpClient *http.Client 28 - useIndigoClient bool // true if using indigo's OAuth client (handles auth automatically) 28 + useIndigoClient bool // true if using indigo's OAuth client (handles auth automatically) 29 29 indigoClient *atclient.APIClient // indigo's API client for OAuth requests 30 30 } 31 31
+6 -6
pkg/atproto/lexicon.go
··· 394 394 // Uses CBOR encoding for efficient storage in hold's carstore 395 395 type CaptainRecord struct { 396 396 Type string `json:"$type" cborgen:"$type"` 397 - Owner string `json:"owner" cborgen:"owner"` // DID of hold owner 398 - Public bool `json:"public" cborgen:"public"` // Public read access 399 - AllowAllCrew bool `json:"allowAllCrew" cborgen:"allowAllCrew"` // Allow any authenticated user to register as crew 400 - DeployedAt string `json:"deployedAt" cborgen:"deployedAt"` // RFC3339 timestamp 401 - Region string `json:"region,omitempty" cborgen:"region,omitempty"` // S3 region (optional) 402 - Provider string `json:"provider,omitempty" cborgen:"provider,omitempty"` // Deployment provider (optional) 397 + Owner string `json:"owner" cborgen:"owner"` // DID of hold owner 398 + Public bool `json:"public" cborgen:"public"` // Public read access 399 + AllowAllCrew bool `json:"allowAllCrew" cborgen:"allowAllCrew"` // Allow any authenticated user to register as crew 400 + DeployedAt string `json:"deployedAt" cborgen:"deployedAt"` // RFC3339 timestamp 401 + Region string `json:"region,omitempty" cborgen:"region,omitempty"` // S3 region (optional) 402 + Provider string `json:"provider,omitempty" cborgen:"provider,omitempty"` // Deployment provider (optional) 403 403 } 404 404 405 405 // CrewRecord represents a crew member in the hold
+4 -4
pkg/auth/hold_remote.go
··· 544 544 // This function handles second+ denials: 1m, 5m, 15m, 1h 545 545 func getBackoffDuration(denialCount int) time.Duration { 546 546 backoffs := []time.Duration{ 547 - 1 * time.Minute, // 1st DB denial (2nd overall) - being added soon 548 - 5 * time.Minute, // 2nd DB denial (3rd overall) - probably not happening 549 - 15 * time.Minute, // 3rd DB denial (4th overall) - definitely not soon 550 - 60 * time.Minute, // 4th+ DB denial (5th+ overall) - stop hammering 547 + 1 * time.Minute, // 1st DB denial (2nd overall) - being added soon 548 + 5 * time.Minute, // 2nd DB denial (3rd overall) - probably not happening 549 + 15 * time.Minute, // 3rd DB denial (4th overall) - definitely not soon 550 + 60 * time.Minute, // 4th+ DB denial (5th+ overall) - stop hammering 551 551 } 552 552 553 553 idx := denialCount - 1
+1 -1
pkg/auth/oauth/client.go
··· 129 129 func GetDefaultScopes() []string { 130 130 return []string{ 131 131 "atproto", 132 - "blob:application/vnd.oci.image.manifest.v1+json", 132 + "blob:application/vnd.oci.image.manifest.v1+json", 133 133 "blob:application/vnd.docker.distribution.manifest.v2+json", 134 134 fmt.Sprintf("repo:%s", atproto.ManifestCollection), 135 135 fmt.Sprintf("repo:%s", atproto.TagCollection),
+4 -4
pkg/auth/token/handler.go
··· 18 18 19 19 // Handler handles /auth/token requests 20 20 type Handler struct { 21 - issuer *Issuer 22 - validator *atproto.SessionValidator 23 - deviceStore *db.DeviceStore // For validating device secrets 24 - defaultHoldDID string 21 + issuer *Issuer 22 + validator *atproto.SessionValidator 23 + deviceStore *db.DeviceStore // For validating device secrets 24 + defaultHoldDID string 25 25 } 26 26 27 27 // NewHandler creates a new token handler
+24 -4
pkg/hold/blobstore_adapter.go
··· 81 81 return uploadID, modeStr, nil 82 82 } 83 83 84 - // GetPartUploadURL returns a presigned URL for uploading a specific part 85 - func (b *HoldServiceBlobStore) GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (string, error) { 84 + // GetPartUploadURL returns structured upload info for uploading a specific part 85 + func (b *HoldServiceBlobStore) GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (*pds.PartUploadInfo, error) { 86 86 session, err := b.service.MultipartMgr.GetSession(uploadID) 87 87 if err != nil { 88 - return "", err 88 + return nil, err 89 89 } 90 90 91 - return b.service.GetPartUploadURL(ctx, session, partNumber, did) 91 + // For S3Native mode: return presigned URL 92 + if session.Mode == S3Native { 93 + url, err := b.service.GetPartUploadURL(ctx, session, partNumber, did) 94 + if err != nil { 95 + return nil, err 96 + } 97 + return &pds.PartUploadInfo{ 98 + URL: url, 99 + Method: "PUT", 100 + }, nil 101 + } 102 + 103 + // Buffered mode: return XRPC endpoint with headers 104 + return &pds.PartUploadInfo{ 105 + URL: fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", b.service.config.Server.PublicURL), 106 + Method: "PUT", 107 + Headers: map[string]string{ 108 + "X-Upload-Id": uploadID, 109 + "X-Part-Number": fmt.Sprintf("%d", partNumber), 110 + }, 111 + }, nil 92 112 } 93 113 94 114 // CompleteMultipartUpload finalizes a multipart upload
+4 -494
pkg/hold/handlers.go
··· 1 1 package hold 2 2 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "fmt" 7 - "io" 8 - "log" 9 - "net/http" 10 - "time" 11 - ) 12 - 13 - // PresignedURLOperation defines the type of presigned URL operation 14 - type PresignedURLOperation string 15 - 16 - const ( 17 - OperationGet PresignedURLOperation = "GET" 18 - OperationHead PresignedURLOperation = "HEAD" 19 - OperationPut PresignedURLOperation = "PUT" 20 - ) 21 - 22 - // PresignedURLRequest represents a request for a presigned URL (GET, HEAD, or PUT) 23 - type PresignedURLRequest struct { 24 - Operation PresignedURLOperation `json:"operation"` 25 - DID string `json:"did"` 26 - Digest string `json:"digest"` 27 - Size int64 `json:"size,omitempty"` // Only required for PUT operations 28 - } 29 - 30 - // PresignedURLResponse contains the presigned URL 31 - type PresignedURLResponse struct { 32 - URL string `json:"url"` 33 - ExpiresAt time.Time `json:"expires_at"` 34 - } 35 - 36 - // HandlePresignedURL handles presigned URL requests (GET, HEAD, or PUT) 37 - // Operation type is specified in the request body 38 - func (s *HoldService) HandlePresignedURL(w http.ResponseWriter, r *http.Request) { 39 - if r.Method != http.MethodPost { 40 - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 41 - return 42 - } 43 - 44 - var req PresignedURLRequest 45 - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 46 - http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) 47 - return 48 - } 49 - 50 - // Validate DID authorization based on operation type 51 - var authorized bool 52 - switch req.Operation { 53 - case OperationGet, OperationHead: 54 - authorized = s.isAuthorizedRead(req.DID) 55 - case OperationPut: 56 - authorized = s.isAuthorizedWrite(req.DID) 57 - default: 58 - http.Error(w, "unsupported operation", http.StatusBadRequest) 59 - return 60 - } 61 - 62 - if !authorized { 63 - log.Printf("[HandlePresignedURL:%s] Authorization FAILED", req.Operation) 64 - if req.DID == "" { 65 - http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 66 - } else { 67 - http.Error(w, "forbidden: access denied", http.StatusForbidden) 68 - } 69 - return 70 - } 71 - 72 - // Generate presigned URL (15 minute expiry) 73 - ctx := context.Background() 74 - expiry := time.Now().Add(15 * time.Minute) 75 - 76 - url, err := s.getPresignedURL(ctx, req.Operation, req.Digest, req.DID) 77 - if err != nil { 78 - log.Printf("[HandlePresignedURL:%s] getPresignedURL failed: %v", req.Operation, err) 79 - http.Error(w, fmt.Sprintf("failed to generate URL: %v", err), http.StatusInternalServerError) 80 - return 81 - } 82 - 83 - log.Printf("[HandlePresignedURL:%s] Returning URL to client", req.Operation) 84 - 85 - resp := PresignedURLResponse{ 86 - URL: url, 87 - ExpiresAt: expiry, 88 - } 89 - 90 - w.Header().Set("Content-Type", "application/json") 91 - json.NewEncoder(w).Encode(resp) 92 - } 93 - 94 - // HandleProxyGet proxies a blob download through the service 95 - func (s *HoldService) HandleProxyGet(w http.ResponseWriter, r *http.Request) { 96 - if r.Method != http.MethodGet && r.Method != http.MethodHead { 97 - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 98 - return 99 - } 100 - 101 - // Extract digest from path (e.g., /blobs/sha256:abc123) 102 - digest := r.URL.Path[len("/blobs/"):] 103 - if digest == "" { 104 - http.Error(w, "missing digest", http.StatusBadRequest) 105 - return 106 - } 107 - 108 - // Get DID from query param or header 109 - did := r.URL.Query().Get("did") 110 - if did == "" { 111 - did = r.Header.Get("X-ATCR-DID") 112 - } 113 - log.Printf(" DID: %s", did) 114 - 115 - // Authorize READ access 116 - if !s.isAuthorizedRead(did) { 117 - log.Printf("[HandleProxyGet] Authorization FAILED") 118 - if did == "" { 119 - http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 120 - } else { 121 - http.Error(w, "forbidden: access denied", http.StatusForbidden) 122 - } 123 - return 124 - } 125 - 126 - ctx := r.Context() 127 - path := blobPath(digest) 128 - 129 - // For HEAD requests, just check if blob exists 130 - if r.Method == http.MethodHead { 131 - stat, err := s.driver.Stat(ctx, path) 132 - if err != nil { 133 - http.Error(w, "blob not found", http.StatusNotFound) 134 - return 135 - } 136 - w.Header().Set("Content-Type", "application/octet-stream") 137 - w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size())) 138 - w.WriteHeader(http.StatusOK) 139 - return 140 - } 141 - 142 - // For GET requests, read and return the blob 143 - content, err := s.driver.GetContent(ctx, path) 144 - if err != nil { 145 - http.Error(w, "blob not found", http.StatusNotFound) 146 - return 147 - } 148 - 149 - w.Header().Set("Content-Type", "application/octet-stream") 150 - w.Write(content) 151 - } 152 - 153 - // HandleMove moves a blob from one path to another 154 - // POST /move?from={path}&to={digest}&did={did} 155 - func (s *HoldService) HandleMove(w http.ResponseWriter, r *http.Request) { 156 - if r.Method != http.MethodPost { 157 - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 158 - return 159 - } 160 - 161 - fromPath := r.URL.Query().Get("from") 162 - toDigest := r.URL.Query().Get("to") 163 - did := r.URL.Query().Get("did") 164 - 165 - if fromPath == "" || toDigest == "" { 166 - http.Error(w, "missing from or to parameter", http.StatusBadRequest) 167 - return 168 - } 169 - 170 - // Authorize WRITE access 171 - if !s.isAuthorizedWrite(did) { 172 - if did == "" { 173 - http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 174 - } else { 175 - http.Error(w, "forbidden: write access denied", http.StatusForbidden) 176 - } 177 - return 178 - } 179 - 180 - ctx := r.Context() 181 - sourcePath := blobPath(fromPath) 182 - destPath := blobPath(toDigest) 183 - 184 - // Try to move using driver's Move operation 185 - if err := s.driver.Move(ctx, sourcePath, destPath); err != nil { 186 - log.Printf("HandleMove: failed to move blob: %v", err) 187 - http.Error(w, fmt.Sprintf("failed to move blob: %v", err), http.StatusInternalServerError) 188 - return 189 - } 190 - 191 - log.Printf("HandleMove: successfully moved blob from=%s to=%s", fromPath, toDigest) 192 - w.WriteHeader(http.StatusOK) 193 - } 194 - 195 - // HandleProxyPut proxies a blob upload through the service 196 - func (s *HoldService) HandleProxyPut(w http.ResponseWriter, r *http.Request) { 197 - if r.Method != http.MethodPut { 198 - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 199 - return 200 - } 201 - 202 - digest := r.URL.Path[len("/blobs/"):] 203 - if digest == "" { 204 - http.Error(w, "missing digest", http.StatusBadRequest) 205 - return 206 - } 207 - 208 - did := r.URL.Query().Get("did") 209 - if did == "" { 210 - did = r.Header.Get("X-ATCR-DID") 211 - } 212 - 213 - // Authorize WRITE access 214 - if !s.isAuthorizedWrite(did) { 215 - log.Printf("[HandleProxyPut] Authorization FAILED") 216 - if did == "" { 217 - http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 218 - } else { 219 - http.Error(w, "forbidden: write access denied", http.StatusForbidden) 220 - } 221 - return 222 - } 223 - 224 - // Stream blob to storage (no buffering) 225 - ctx := r.Context() 226 - path := blobPath(digest) 227 - 228 - // Create writer for streaming 229 - writer, err := s.driver.Writer(ctx, path, false) 230 - if err != nil { 231 - log.Printf("HandleProxyPut: failed to create writer: %v", err) 232 - http.Error(w, "failed to create writer", http.StatusInternalServerError) 233 - return 234 - } 235 - 236 - // Stream directly from request body to storage 237 - written, err := io.Copy(writer, r.Body) 238 - if err != nil { 239 - writer.Cancel(ctx) 240 - log.Printf("HandleProxyPut: failed to write blob: %v", err) 241 - http.Error(w, "failed to write blob", http.StatusInternalServerError) 242 - return 243 - } 244 - 245 - // Commit the write 246 - if err := writer.Commit(ctx); err != nil { 247 - log.Printf("HandleProxyPut: failed to commit blob: %v", err) 248 - http.Error(w, "failed to commit blob", http.StatusInternalServerError) 249 - return 250 - } 251 - 252 - log.Printf("HandleProxyPut: successfully stored blob path=%s, size=%d", digest, written) 253 - w.WriteHeader(http.StatusCreated) 254 - } 255 - 256 - // StartMultipartUploadRequest initiates a multipart upload 257 - type StartMultipartUploadRequest struct { 258 - DID string `json:"did"` 259 - Digest string `json:"digest"` 260 - } 261 - 262 - // StartMultipartUploadResponse contains the multipart upload ID 263 - type StartMultipartUploadResponse struct { 264 - UploadID string `json:"upload_id"` 265 - ExpiresAt time.Time `json:"expires_at"` 266 - } 267 - 268 - // HandleStartMultipart initiates a multipart upload 269 - func (s *HoldService) HandleStartMultipart(w http.ResponseWriter, r *http.Request) { 270 - if r.Method != http.MethodPost { 271 - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 272 - return 273 - } 274 - 275 - var req StartMultipartUploadRequest 276 - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 277 - http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) 278 - return 279 - } 280 - 281 - // Validate DID authorization for WRITE 282 - if !s.isAuthorizedWrite(req.DID) { 283 - if req.DID == "" { 284 - http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 285 - } else { 286 - http.Error(w, "forbidden: write access denied", http.StatusForbidden) 287 - } 288 - return 289 - } 290 - 291 - // Start multipart upload with manager (supports both S3Native and Buffered modes) 292 - ctx := r.Context() 293 - uploadID, mode, err := s.StartMultipartUploadWithManager(ctx, req.Digest, s.MultipartMgr) 294 - if err != nil { 295 - http.Error(w, fmt.Sprintf("failed to start multipart upload: %v", err), http.StatusInternalServerError) 296 - return 297 - } 298 - 299 - log.Printf("Started multipart upload: uploadID=%s, mode=%v, digest=%s", uploadID, mode, req.Digest) 300 - 301 - expiry := time.Now().Add(24 * time.Hour) // Multipart uploads can take longer 302 - 303 - resp := StartMultipartUploadResponse{ 304 - UploadID: uploadID, 305 - ExpiresAt: expiry, 306 - } 307 - 308 - w.Header().Set("Content-Type", "application/json") 309 - json.NewEncoder(w).Encode(resp) 310 - } 311 - 312 - // GetPartURLRequest requests a presigned URL for a specific part 313 - type GetPartURLRequest struct { 314 - DID string `json:"did"` 315 - Digest string `json:"digest"` 316 - UploadID string `json:"upload_id"` 317 - PartNumber int `json:"part_number"` 318 - } 319 - 320 - // GetPartURLResponse contains the presigned URL for a part 321 - type GetPartURLResponse struct { 322 - URL string `json:"url"` 323 - ExpiresAt time.Time `json:"expires_at"` 324 - } 325 - 326 - // HandleGetPartURL generates a presigned URL for uploading a specific part 327 - func (s *HoldService) HandleGetPartURL(w http.ResponseWriter, r *http.Request) { 328 - if r.Method != http.MethodPost { 329 - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 330 - return 331 - } 332 - 333 - var req GetPartURLRequest 334 - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 335 - http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) 336 - return 337 - } 338 - 339 - // Validate DID authorization for WRITE 340 - if !s.isAuthorizedWrite(req.DID) { 341 - if req.DID == "" { 342 - http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 343 - } else { 344 - http.Error(w, "forbidden: write access denied", http.StatusForbidden) 345 - } 346 - return 347 - } 348 - 349 - // Get multipart session 350 - session, err := s.MultipartMgr.GetSession(req.UploadID) 351 - if err != nil { 352 - http.Error(w, fmt.Sprintf("session not found: %v", err), http.StatusNotFound) 353 - return 354 - } 355 - 356 - // Get part upload URL (presigned for S3Native, proxy for Buffered) 357 - ctx := r.Context() 358 - url, err := s.GetPartUploadURL(ctx, session, req.PartNumber, req.DID) 359 - if err != nil { 360 - http.Error(w, fmt.Sprintf("failed to generate part URL: %v", err), http.StatusInternalServerError) 361 - return 362 - } 363 - 364 - expiry := time.Now().Add(15 * time.Minute) 365 - 366 - resp := GetPartURLResponse{ 367 - URL: url, 368 - ExpiresAt: expiry, 369 - } 370 - 371 - w.Header().Set("Content-Type", "application/json") 372 - json.NewEncoder(w).Encode(resp) 373 - } 374 - 375 - // CompleteMultipartRequest completes a multipart upload 376 - type CompleteMultipartRequest struct { 377 - DID string `json:"did"` 378 - Digest string `json:"digest"` 379 - UploadID string `json:"upload_id"` 380 - Parts []CompletedPart `json:"parts"` 381 - } 382 - 383 - // CompletedPart represents an uploaded part with its ETag 384 - type CompletedPart struct { 385 - PartNumber int `json:"part_number"` 386 - ETag string `json:"etag"` 387 - } 388 - 389 - // HandleCompleteMultipart completes a multipart upload 390 - func (s *HoldService) HandleCompleteMultipart(w http.ResponseWriter, r *http.Request) { 391 - if r.Method != http.MethodPost { 392 - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 393 - return 394 - } 395 - 396 - var req CompleteMultipartRequest 397 - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 398 - http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) 399 - return 400 - } 401 - 402 - // Validate DID authorization for WRITE 403 - if !s.isAuthorizedWrite(req.DID) { 404 - if req.DID == "" { 405 - http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 406 - } else { 407 - http.Error(w, "forbidden: write access denied", http.StatusForbidden) 408 - } 409 - return 410 - } 411 - 412 - // Get multipart session 413 - session, err := s.MultipartMgr.GetSession(req.UploadID) 414 - if err != nil { 415 - http.Error(w, fmt.Sprintf("session not found: %v", err), http.StatusNotFound) 416 - return 417 - } 418 - 419 - // For S3Native mode, use parts from request (uploaded directly to S3) 420 - // For Buffered mode, parts are in the session 421 - if session.Mode == S3Native { 422 - // Record parts from AppView's request (they have ETags from S3) 423 - for _, p := range req.Parts { 424 - session.RecordS3Part(p.PartNumber, p.ETag, 0) 425 - } 426 - log.Printf("Recorded %d S3 parts from request for uploadID=%s", len(req.Parts), req.UploadID) 427 - } 428 - 429 - // Complete multipart upload (handles both S3Native and Buffered modes) 430 - ctx := r.Context() 431 - if err := s.CompleteMultipartUploadWithManager(ctx, session, s.MultipartMgr); err != nil { 432 - http.Error(w, fmt.Sprintf("failed to complete multipart upload: %v", err), http.StatusInternalServerError) 433 - return 434 - } 435 - 436 - log.Printf("Completed multipart upload: uploadID=%s, mode=%v", req.UploadID, session.Mode) 437 - 438 - w.WriteHeader(http.StatusOK) 439 - w.Header().Set("Content-Type", "application/json") 440 - json.NewEncoder(w).Encode(map[string]string{ 441 - "status": "completed", 442 - }) 443 - } 444 - 445 - // AbortMultipartRequest aborts an in-progress upload 446 - type AbortMultipartRequest struct { 447 - DID string `json:"did"` 448 - Digest string `json:"digest"` 449 - UploadID string `json:"upload_id"` 450 - } 451 - 452 - // HandleAbortMultipart aborts an in-progress multipart upload 453 - func (s *HoldService) HandleAbortMultipart(w http.ResponseWriter, r *http.Request) { 454 - if r.Method != http.MethodPost { 455 - http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 456 - return 457 - } 458 - 459 - var req AbortMultipartRequest 460 - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 461 - http.Error(w, fmt.Sprintf("invalid request: %v", err), http.StatusBadRequest) 462 - return 463 - } 464 - 465 - // Validate DID authorization for WRITE 466 - if !s.isAuthorizedWrite(req.DID) { 467 - if req.DID == "" { 468 - http.Error(w, "unauthorized: authentication required", http.StatusUnauthorized) 469 - } else { 470 - http.Error(w, "forbidden: write access denied", http.StatusForbidden) 471 - } 472 - return 473 - } 474 - 475 - // Get multipart session 476 - session, err := s.MultipartMgr.GetSession(req.UploadID) 477 - if err != nil { 478 - http.Error(w, fmt.Sprintf("session not found: %v", err), http.StatusNotFound) 479 - return 480 - } 481 - 482 - // Abort multipart upload (handles both S3Native and Buffered modes) 483 - ctx := r.Context() 484 - if err := s.AbortMultipartUploadWithManager(ctx, session, s.MultipartMgr); err != nil { 485 - http.Error(w, fmt.Sprintf("failed to abort multipart upload: %v", err), http.StatusInternalServerError) 486 - return 487 - } 488 - 489 - log.Printf("Aborted multipart upload: uploadID=%s, mode=%v", req.UploadID, session.Mode) 490 - 491 - w.WriteHeader(http.StatusOK) 492 - w.Header().Set("Content-Type", "application/json") 493 - json.NewEncoder(w).Encode(map[string]string{ 494 - "status": "aborted", 495 - }) 496 - } 3 + // This file previously contained legacy HTTP handlers that have been replaced by XRPC endpoints. 4 + // The handlers (HandleProxyGet, HandleProxyPut, HandleMultipartPartUpload) are no longer needed 5 + // as all blob operations now go through the XRPC com.atproto.repo.uploadBlob and 6 + // com.atproto.sync.getBlob endpoints.
+8
pkg/hold/multipart.go
··· 24 24 Buffered 25 25 ) 26 26 27 + // CompletedPart represents an uploaded part with its ETag 28 + type CompletedPart struct { 29 + PartNumber int `json:"part_number"` 30 + ETag string `json:"etag"` 31 + } 32 + 27 33 // MultipartSession tracks an in-progress multipart upload 28 34 type MultipartSession struct { 29 35 UploadID string // Unique upload ID ··· 270 276 } 271 277 272 278 // Buffered mode: return proxy endpoint 279 + // url := fmt.Sprintf("%s/xrpc/com.atproto.repo.uploadBlob", s.config.Server.PublicURL) 280 + 273 281 url := fmt.Sprintf("%s/multipart-parts/%s/%d?did=%s", 274 282 s.config.Server.PublicURL, session.UploadID, partNumber, did) 275 283 return url, nil
+116 -11
pkg/hold/pds/auth.go
··· 7 7 "fmt" 8 8 "io" 9 9 "net/http" 10 + "slices" 10 11 "strings" 11 12 12 13 "github.com/bluesky-social/indigo/atproto/identity" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 14 15 ) 16 + 17 + // HTTPClient interface allows injecting a custom HTTP client for testing 18 + type HTTPClient interface { 19 + Do(*http.Request) (*http.Response, error) 20 + } 15 21 16 22 // ValidatedUser represents a successfully validated user from DPoP + OAuth 17 23 type ValidatedUser struct { ··· 27 33 // 2. Extract DPoP header (proof JWT) 28 34 // 3. Call user's PDS to validate token via com.atproto.server.getSession 29 35 // 4. Return validated user DID 30 - func ValidateDPoPRequest(r *http.Request) (*ValidatedUser, error) { 36 + // 37 + // The httpClient parameter is optional and defaults to http.DefaultClient if nil. 38 + // This allows tests to inject a mock HTTP client. 39 + func ValidateDPoPRequest(r *http.Request, httpClient HTTPClient) (*ValidatedUser, error) { 31 40 // Extract Authorization header 32 41 authHeader := r.Header.Get("Authorization") 33 42 if authHeader == "" { ··· 72 81 } 73 82 74 83 // Validate token with the user's PDS 75 - session, err := validateTokenWithPDS(r.Context(), pds, accessToken, dpopProof) 84 + session, err := validateTokenWithPDS(r.Context(), pds, accessToken, dpopProof, httpClient) 76 85 if err != nil { 77 86 return nil, fmt.Errorf("token validation failed: %w", err) 78 87 } ··· 139 148 } 140 149 141 150 // validateTokenWithPDS calls the user's PDS to validate the token 142 - func validateTokenWithPDS(ctx context.Context, pdsURL, accessToken, dpopProof string) (*SessionResponse, error) { 151 + // The httpClient parameter is optional and defaults to http.DefaultClient if nil. 152 + func validateTokenWithPDS(ctx context.Context, pdsURL, accessToken, dpopProof string, httpClient HTTPClient) (*SessionResponse, error) { 143 153 // Call com.atproto.server.getSession with DPoP headers 144 154 url := fmt.Sprintf("%s/xrpc/com.atproto.server.getSession", strings.TrimSuffix(pdsURL, "/")) 145 155 ··· 152 162 req.Header.Set("Authorization", "DPoP "+accessToken) 153 163 req.Header.Set("DPoP", dpopProof) 154 164 155 - resp, err := http.DefaultClient.Do(req) 165 + // Use provided client or default to http.DefaultClient 166 + client := httpClient 167 + if client == nil { 168 + client = http.DefaultClient 169 + } 170 + 171 + resp, err := client.Do(req) 156 172 if err != nil { 157 173 return nil, fmt.Errorf("failed to call PDS: %w", err) 158 174 } ··· 194 210 } 195 211 196 212 // ValidateOwnerOrCrewAdmin validates that the request has valid DPoP + OAuth tokens 197 - // and that the authenticated user is either the hold owner or a crew member with crew:admin permission 198 - func ValidateOwnerOrCrewAdmin(r *http.Request, pds *HoldPDS) (*ValidatedUser, error) { 213 + // and that the authenticated user is either the hold owner or a crew member with crew:admin permission. 214 + // The httpClient parameter is optional and defaults to http.DefaultClient if nil. 215 + func ValidateOwnerOrCrewAdmin(r *http.Request, pds *HoldPDS, httpClient HTTPClient) (*ValidatedUser, error) { 199 216 // Validate DPoP + OAuth token 200 - user, err := ValidateDPoPRequest(r) 217 + user, err := ValidateDPoPRequest(r, httpClient) 201 218 if err != nil { 202 219 return nil, fmt.Errorf("authentication failed: %w", err) 203 220 } ··· 222 239 for _, member := range crew { 223 240 if member.Record.Member == user.DID { 224 241 // Check if this crew member has crew:admin permission 225 - for _, perm := range member.Record.Permissions { 226 - if perm == "crew:admin" { 227 - return user, nil 228 - } 242 + if slices.Contains(member.Record.Permissions, "crew:admin") { 243 + return user, nil 229 244 } 230 245 // User is crew but doesn't have admin permission 231 246 return nil, fmt.Errorf("crew member lacks required 'crew:admin' permission") ··· 235 250 // User is neither owner nor authorized crew 236 251 return nil, fmt.Errorf("user is not authorized (must be hold owner or crew admin)") 237 252 } 253 + 254 + // ValidateBlobWriteAccess validates that the request has valid DPoP + OAuth tokens 255 + // and that the authenticated user is either the hold owner or a crew member with blob:write permission. 256 + // The httpClient parameter is optional and defaults to http.DefaultClient if nil. 257 + func ValidateBlobWriteAccess(r *http.Request, pds *HoldPDS, httpClient HTTPClient) (*ValidatedUser, error) { 258 + // Validate DPoP + OAuth token 259 + user, err := ValidateDPoPRequest(r, httpClient) 260 + if err != nil { 261 + return nil, fmt.Errorf("authentication failed: %w", err) 262 + } 263 + 264 + // Get captain record to check owner and public settings 265 + _, captain, err := pds.GetCaptainRecord(r.Context()) 266 + if err != nil { 267 + return nil, fmt.Errorf("failed to get captain record: %w", err) 268 + } 269 + 270 + // Check if user is the owner (always has write access) 271 + if user.DID == captain.Owner { 272 + return user, nil 273 + } 274 + 275 + // Check if user is crew with blob:write permission 276 + crew, err := pds.ListCrewMembers(r.Context()) 277 + if err != nil { 278 + return nil, fmt.Errorf("failed to check crew membership: %w", err) 279 + } 280 + 281 + for _, member := range crew { 282 + if member.Record.Member == user.DID { 283 + // Check if this crew member has blob:write permission 284 + if slices.Contains(member.Record.Permissions, "blob:write") { 285 + return user, nil 286 + } 287 + // User is crew but doesn't have write permission 288 + return nil, fmt.Errorf("crew member lacks required 'blob:write' permission") 289 + } 290 + } 291 + 292 + // User is neither owner nor authorized crew 293 + return nil, fmt.Errorf("user is not authorized for blob write (must be hold owner or crew with blob:write permission)") 294 + } 295 + 296 + // ValidateBlobReadAccess validates that the request has read access to blobs 297 + // If captain.public = true: No auth required (returns nil user to indicate public access) 298 + // If captain.public = false: Requires valid DPoP + OAuth and (captain OR crew with blob:read permission). 299 + // The httpClient parameter is optional and defaults to http.DefaultClient if nil. 300 + func ValidateBlobReadAccess(r *http.Request, pds *HoldPDS, httpClient HTTPClient) (*ValidatedUser, error) { 301 + // Get captain record to check public setting 302 + _, captain, err := pds.GetCaptainRecord(r.Context()) 303 + if err != nil { 304 + return nil, fmt.Errorf("failed to get captain record: %w", err) 305 + } 306 + 307 + // If hold is public, allow access without authentication 308 + if captain.Public { 309 + return nil, nil // nil user indicates public access 310 + } 311 + 312 + // Private hold - require authentication 313 + user, err := ValidateDPoPRequest(r, httpClient) 314 + if err != nil { 315 + return nil, fmt.Errorf("authentication required for private hold: %w", err) 316 + } 317 + 318 + // Check if user is the owner (always has read access) 319 + if user.DID == captain.Owner { 320 + return user, nil 321 + } 322 + 323 + // Check if user is crew with blob:read permission 324 + crew, err := pds.ListCrewMembers(r.Context()) 325 + if err != nil { 326 + return nil, fmt.Errorf("failed to check crew membership: %w", err) 327 + } 328 + 329 + for _, member := range crew { 330 + if member.Record.Member == user.DID { 331 + // Check if this crew member has blob:read permission 332 + if slices.Contains(member.Record.Permissions, "blob:read") { 333 + return user, nil 334 + } 335 + // User is crew but doesn't have read permission 336 + return nil, fmt.Errorf("crew member lacks required 'blob:read' permission") 337 + } 338 + } 339 + 340 + // User is neither owner nor authorized crew 341 + return nil, fmt.Errorf("user is not authorized for blob read (must be hold owner or crew with blob:read permission)") 342 + }
+587
pkg/hold/pds/auth_test.go
··· 1 + package pds 2 + 3 + import ( 4 + "encoding/base64" 5 + "encoding/json" 6 + "fmt" 7 + "io" 8 + "net/http" 9 + "net/http/httptest" 10 + "slices" 11 + "strings" 12 + "testing" 13 + "time" 14 + 15 + "github.com/bluesky-social/indigo/atproto/atcrypto" 16 + "github.com/bluesky-social/indigo/atproto/auth/oauth" 17 + ) 18 + 19 + // Tests for authorization functions in auth.go 20 + 21 + // mockPDSClient is a mock HTTP client that simulates a PDS server 22 + // It validates DPoP tokens and returns session information 23 + type mockPDSClient struct{} 24 + 25 + func (m *mockPDSClient) Do(req *http.Request) (*http.Response, error) { 26 + // Verify request is for getSession endpoint 27 + if !strings.Contains(req.URL.Path, "/xrpc/com.atproto.server.getSession") { 28 + return &http.Response{ 29 + StatusCode: http.StatusNotFound, 30 + Body: http.NoBody, 31 + }, nil 32 + } 33 + 34 + // Verify DPoP headers are present 35 + authHeader := req.Header.Get("Authorization") 36 + dpopHeader := req.Header.Get("DPoP") 37 + 38 + if authHeader == "" || dpopHeader == "" { 39 + return &http.Response{ 40 + StatusCode: http.StatusUnauthorized, 41 + Body: http.NoBody, 42 + }, nil 43 + } 44 + 45 + // Extract access token from Authorization header 46 + parts := strings.SplitN(authHeader, " ", 2) 47 + if len(parts) != 2 || parts[0] != "DPoP" { 48 + return &http.Response{ 49 + StatusCode: http.StatusUnauthorized, 50 + Body: http.NoBody, 51 + }, nil 52 + } 53 + 54 + accessToken := parts[1] 55 + 56 + // Parse token to extract DID 57 + did, _, err := extractDIDFromToken(accessToken) 58 + if err != nil { 59 + return &http.Response{ 60 + StatusCode: http.StatusBadRequest, 61 + Body: http.NoBody, 62 + }, nil 63 + } 64 + 65 + // Return session response 66 + session := SessionResponse{ 67 + DID: did, 68 + Handle: strings.Replace(did, "did:plc:", "", 1) + ".test", 69 + } 70 + 71 + body, _ := json.Marshal(session) 72 + 73 + return &http.Response{ 74 + StatusCode: http.StatusOK, 75 + Body: io.NopCloser(strings.NewReader(string(body))), 76 + Header: http.Header{"Content-Type": []string{"application/json"}}, 77 + }, nil 78 + } 79 + 80 + // DPoPTestHelper provides utilities for creating valid DPoP requests in tests 81 + type DPoPTestHelper struct { 82 + privKey atcrypto.PrivateKey 83 + did string 84 + pdsURL string 85 + } 86 + 87 + // NewDPoPTestHelper creates a new test helper for the given DID and PDS 88 + func NewDPoPTestHelper(did, pdsURL string) (*DPoPTestHelper, error) { 89 + // Generate a test P-256 key (required for OAuth DPoP) 90 + // Note: ATProto uses K-256 for DID keys, but OAuth DPoP requires P-256 91 + privKey, err := atcrypto.GeneratePrivateKeyP256() 92 + if err != nil { 93 + return nil, fmt.Errorf("failed to generate key: %w", err) 94 + } 95 + 96 + return &DPoPTestHelper{ 97 + privKey: privKey, 98 + did: did, 99 + pdsURL: pdsURL, 100 + }, nil 101 + } 102 + 103 + // CreateAccessToken creates a mock OAuth access token for testing 104 + // This mimics what a real PDS would issue 105 + func (h *DPoPTestHelper) CreateAccessToken() (string, error) { 106 + // Create access token claims 107 + claims := map[string]any{ 108 + "sub": h.did, // Subject (DID) 109 + "iss": h.pdsURL, // Issuer (PDS URL) 110 + "aud": "atcr", // Audience 111 + "iat": time.Now().Unix(), // Issued at 112 + "exp": time.Now().Add(1 * time.Hour).Unix(), // Expires in 1 hour 113 + } 114 + 115 + // For testing, we create a valid JWT structure without actually validating the signature 116 + // The ValidateDPoPRequest in real use would validate this by calling the PDS 117 + header := base64.RawURLEncoding.EncodeToString([]byte(`{"alg":"ES256K","typ":"JWT"}`)) 118 + payload, err := json.Marshal(claims) 119 + if err != nil { 120 + return "", fmt.Errorf("failed to marshal claims: %w", err) 121 + } 122 + encodedPayload := base64.RawURLEncoding.EncodeToString(payload) 123 + 124 + // Create a mock signature (in real use, the PDS validates this) 125 + signature := base64.RawURLEncoding.EncodeToString([]byte("mock-signature-for-testing")) 126 + 127 + tokenString := fmt.Sprintf("%s.%s.%s", header, encodedPayload, signature) 128 + return tokenString, nil 129 + } 130 + 131 + // CreateDPoPProof creates a DPoP proof JWT for the given HTTP request 132 + func (h *DPoPTestHelper) CreateDPoPProof(method, url string) (string, error) { 133 + return oauth.NewAuthDPoP(method, url, "", h.privKey) 134 + } 135 + 136 + // AddDPoPToRequest adds proper DPoP headers to an HTTP request 137 + func (h *DPoPTestHelper) AddDPoPToRequest(req *http.Request) error { 138 + // Create access token 139 + accessToken, err := h.CreateAccessToken() 140 + if err != nil { 141 + return fmt.Errorf("failed to create access token: %w", err) 142 + } 143 + 144 + // Create DPoP proof for this specific request 145 + dpopProof, err := h.CreateDPoPProof(req.Method, req.URL.String()) 146 + if err != nil { 147 + return fmt.Errorf("failed to create DPoP proof: %w", err) 148 + } 149 + 150 + // Add headers 151 + req.Header.Set("Authorization", "DPoP "+accessToken) 152 + req.Header.Set("DPoP", dpopProof) 153 + 154 + return nil 155 + } 156 + 157 + // AddTestDPoP is a quick helper for common test case: owner with standard PDS 158 + func AddTestDPoP(req *http.Request, did, pdsURL string) error { 159 + helper, err := NewDPoPTestHelper(did, pdsURL) 160 + if err != nil { 161 + return err 162 + } 163 + return helper.AddDPoPToRequest(req) 164 + } 165 + 166 + // TestValidateBlobWriteAccess_Owner tests that the hold owner has write access 167 + func TestValidateBlobWriteAccess_Owner(t *testing.T) { 168 + pds, ctx := setupTestPDS(t) 169 + 170 + ownerDID := "did:plc:owner123" 171 + 172 + // Bootstrap with owner 173 + err := pds.Bootstrap(ctx, ownerDID, true, false) 174 + if err != nil { 175 + t.Fatalf("Failed to bootstrap PDS: %v", err) 176 + } 177 + 178 + // Create DPoP helper for owner 179 + dpopHelper, err := NewDPoPTestHelper(ownerDID, "https://test-pds.example.com") 180 + if err != nil { 181 + t.Fatalf("Failed to create DPoP helper: %v", err) 182 + } 183 + 184 + // Create request with proper DPoP tokens 185 + req := httptest.NewRequest(http.MethodPost, "/test", nil) 186 + if err := dpopHelper.AddDPoPToRequest(req); err != nil { 187 + t.Fatalf("Failed to add DPoP to request: %v", err) 188 + } 189 + 190 + // Use mock PDS client 191 + mockClient := &mockPDSClient{} 192 + 193 + // Test owner has write access 194 + user, err := ValidateBlobWriteAccess(req, pds, mockClient) 195 + if err != nil { 196 + t.Errorf("Expected owner to have write access, got error: %v", err) 197 + } 198 + 199 + if user == nil { 200 + t.Fatal("Expected non-nil user") 201 + } 202 + 203 + if user.DID != ownerDID { 204 + t.Errorf("Expected DID %s, got %s", ownerDID, user.DID) 205 + } 206 + 207 + if !user.Authorized { 208 + t.Error("Expected user to be authorized") 209 + } 210 + } 211 + 212 + // TestValidateBlobWriteAccess_CrewPermissions tests crew permission checking 213 + func TestValidateBlobWriteAccess_CrewPermissions(t *testing.T) { 214 + pds, ctx := setupTestPDS(t) 215 + 216 + ownerDID := "did:plc:owner123" 217 + 218 + // Bootstrap 219 + err := pds.Bootstrap(ctx, ownerDID, true, false) 220 + if err != nil { 221 + t.Fatalf("Failed to bootstrap PDS: %v", err) 222 + } 223 + 224 + // Add crew member with blob:write permission 225 + writerDID := "did:plc:writer123" 226 + _, err = pds.AddCrewMember(ctx, writerDID, "writer", []string{"blob:write"}) 227 + if err != nil { 228 + t.Fatalf("Failed to add crew member: %v", err) 229 + } 230 + 231 + // Add crew member without blob:write permission 232 + readerDID := "did:plc:reader123" 233 + _, err = pds.AddCrewMember(ctx, readerDID, "reader", []string{"blob:read"}) 234 + if err != nil { 235 + t.Fatalf("Failed to add crew member: %v", err) 236 + } 237 + 238 + mockClient := &mockPDSClient{} 239 + 240 + // Test writer (has blob:write permission) can write 241 + t.Run("crew with blob:write can write", func(t *testing.T) { 242 + dpopHelper, err := NewDPoPTestHelper(writerDID, "https://test-pds.example.com") 243 + if err != nil { 244 + t.Fatalf("Failed to create DPoP helper: %v", err) 245 + } 246 + 247 + req := httptest.NewRequest(http.MethodPost, "/test", nil) 248 + if err := dpopHelper.AddDPoPToRequest(req); err != nil { 249 + t.Fatalf("Failed to add DPoP to request: %v", err) 250 + } 251 + 252 + user, err := ValidateBlobWriteAccess(req, pds, mockClient) 253 + if err != nil { 254 + t.Errorf("Expected writer to have write access, got error: %v", err) 255 + } 256 + 257 + if user == nil || user.DID != writerDID { 258 + t.Errorf("Expected user DID %s, got %v", writerDID, user) 259 + } 260 + }) 261 + 262 + // Test reader (no blob:write permission) cannot write 263 + t.Run("crew without blob:write cannot write", func(t *testing.T) { 264 + dpopHelper, err := NewDPoPTestHelper(readerDID, "https://test-pds.example.com") 265 + if err != nil { 266 + t.Fatalf("Failed to create DPoP helper: %v", err) 267 + } 268 + 269 + req := httptest.NewRequest(http.MethodPost, "/test", nil) 270 + if err := dpopHelper.AddDPoPToRequest(req); err != nil { 271 + t.Fatalf("Failed to add DPoP to request: %v", err) 272 + } 273 + 274 + _, err = ValidateBlobWriteAccess(req, pds, mockClient) 275 + if err == nil { 276 + t.Error("Expected reader without blob:write permission to be denied") 277 + } 278 + 279 + if !strings.Contains(err.Error(), "blob:write") { 280 + t.Errorf("Expected error about blob:write permission, got: %v", err) 281 + } 282 + }) 283 + } 284 + 285 + // TestValidateBlobReadAccess_PublicHold tests public hold access 286 + func TestValidateBlobReadAccess_PublicHold(t *testing.T) { 287 + pds, ctx := setupTestPDS(t) 288 + 289 + ownerDID := "did:plc:owner123" 290 + 291 + // Bootstrap with public=true 292 + err := pds.Bootstrap(ctx, ownerDID, true, false) 293 + if err != nil { 294 + t.Fatalf("Failed to bootstrap PDS: %v", err) 295 + } 296 + 297 + // Verify captain record has public=true 298 + _, captain, err := pds.GetCaptainRecord(ctx) 299 + if err != nil { 300 + t.Fatalf("Failed to get captain record: %v", err) 301 + } 302 + 303 + if !captain.Public { 304 + t.Error("Expected public=true for captain record") 305 + } 306 + 307 + // Create request without auth headers (anonymous user) 308 + req := httptest.NewRequest(http.MethodGet, "/test", nil) 309 + 310 + // This should return nil (public access allowed) for public holds 311 + user, err := ValidateBlobReadAccess(req, pds, nil) 312 + if err != nil { 313 + t.Errorf("Expected public access for public hold, got error: %v", err) 314 + } 315 + 316 + // nil user indicates public access 317 + if user != nil { 318 + t.Error("Expected nil user for public access") 319 + } 320 + } 321 + 322 + // TestValidateBlobReadAccess_PrivateHold tests private hold access 323 + func TestValidateBlobReadAccess_PrivateHold(t *testing.T) { 324 + pds, ctx := setupTestPDS(t) 325 + 326 + ownerDID := "did:plc:owner123" 327 + 328 + // Bootstrap with public=false 329 + err := pds.Bootstrap(ctx, ownerDID, false, false) 330 + if err != nil { 331 + t.Fatalf("Failed to bootstrap PDS: %v", err) 332 + } 333 + 334 + // Update captain to be private 335 + _, err = pds.UpdateCaptainRecord(ctx, false, false) 336 + if err != nil { 337 + t.Fatalf("Failed to update captain record: %v", err) 338 + } 339 + 340 + // Verify captain record has public=false 341 + _, captain, err := pds.GetCaptainRecord(ctx) 342 + if err != nil { 343 + t.Fatalf("Failed to get captain record: %v", err) 344 + } 345 + 346 + if captain.Public { 347 + t.Error("Expected public=false for captain record") 348 + } 349 + 350 + // Create request without auth headers (anonymous user) 351 + req := httptest.NewRequest(http.MethodGet, "/test", nil) 352 + 353 + // This should return error (auth required) for private holds 354 + user, err := ValidateBlobReadAccess(req, pds, nil) 355 + if err == nil { 356 + t.Error("Expected error for private hold without auth") 357 + } 358 + 359 + if user != nil { 360 + t.Error("Expected nil user when auth fails") 361 + } 362 + } 363 + 364 + // TestValidateOwnerOrCrewAdmin tests admin permission checking 365 + func TestValidateOwnerOrCrewAdmin(t *testing.T) { 366 + pds, ctx := setupTestPDS(t) 367 + 368 + ownerDID := "did:plc:owner123" 369 + 370 + // Bootstrap 371 + err := pds.Bootstrap(ctx, ownerDID, true, false) 372 + if err != nil { 373 + t.Fatalf("Failed to bootstrap PDS: %v", err) 374 + } 375 + 376 + // Add crew member with crew:admin permission 377 + adminDID := "did:plc:admin123" 378 + _, err = pds.AddCrewMember(ctx, adminDID, "admin", []string{"crew:admin", "blob:write", "blob:read"}) 379 + if err != nil { 380 + t.Fatalf("Failed to add crew admin: %v", err) 381 + } 382 + 383 + // Add crew member without crew:admin permission 384 + writerDID := "did:plc:writer123" 385 + _, err = pds.AddCrewMember(ctx, writerDID, "writer", []string{"blob:write"}) 386 + if err != nil { 387 + t.Fatalf("Failed to add crew writer: %v", err) 388 + } 389 + 390 + // Verify crew records were created 391 + crew, err := pds.ListCrewMembers(ctx) 392 + if err != nil { 393 + t.Fatalf("Failed to list crew members: %v", err) 394 + } 395 + 396 + // Verify admin has crew:admin permission 397 + hasAdminPermission := false 398 + for _, member := range crew { 399 + if member.Record.Member == adminDID { 400 + if slices.Contains(member.Record.Permissions, "crew:admin") { 401 + hasAdminPermission = true 402 + } 403 + } 404 + } 405 + 406 + if !hasAdminPermission { 407 + t.Error("Admin crew member should have crew:admin permission") 408 + } 409 + 410 + // Verify writer does NOT have crew:admin permission 411 + writerHasAdminPermission := false 412 + for _, member := range crew { 413 + if member.Record.Member == writerDID { 414 + if slices.Contains(member.Record.Permissions, "crew:admin") { 415 + writerHasAdminPermission = true 416 + } 417 + } 418 + } 419 + 420 + if writerHasAdminPermission { 421 + t.Error("Writer crew member should NOT have crew:admin permission") 422 + } 423 + 424 + // Test that function requires auth (will fail without DPoP tokens) 425 + req := httptest.NewRequest(http.MethodPost, "/test", nil) 426 + _, err = ValidateOwnerOrCrewAdmin(req, pds, nil) 427 + if err == nil { 428 + t.Error("Expected error for missing auth headers") 429 + } 430 + } 431 + 432 + // TestCrewPermissions tests various permission combinations 433 + func TestCrewPermissions(t *testing.T) { 434 + pds, ctx := setupTestPDS(t) 435 + 436 + ownerDID := "did:plc:owner123" 437 + 438 + // Bootstrap 439 + err := pds.Bootstrap(ctx, ownerDID, true, false) 440 + if err != nil { 441 + t.Fatalf("Failed to bootstrap PDS: %v", err) 442 + } 443 + 444 + tests := []struct { 445 + name string 446 + did string 447 + role string 448 + permissions []string 449 + }{ 450 + { 451 + name: "full admin", 452 + did: "did:plc:fulladmin", 453 + role: "admin", 454 + permissions: []string{"crew:admin", "blob:write", "blob:read"}, 455 + }, 456 + { 457 + name: "writer only", 458 + did: "did:plc:writer", 459 + role: "writer", 460 + permissions: []string{"blob:write"}, 461 + }, 462 + { 463 + name: "reader only", 464 + did: "did:plc:reader", 465 + role: "reader", 466 + permissions: []string{"blob:read"}, 467 + }, 468 + { 469 + name: "read-write", 470 + did: "did:plc:readwrite", 471 + role: "editor", 472 + permissions: []string{"blob:read", "blob:write"}, 473 + }, 474 + } 475 + 476 + // Add all crew members 477 + for _, tt := range tests { 478 + _, err := pds.AddCrewMember(ctx, tt.did, tt.role, tt.permissions) 479 + if err != nil { 480 + t.Fatalf("Failed to add crew member %s: %v", tt.name, err) 481 + } 482 + } 483 + 484 + // Verify all crew members were created 485 + crew, err := pds.ListCrewMembers(ctx) 486 + if err != nil { 487 + t.Fatalf("Failed to list crew members: %v", err) 488 + } 489 + 490 + // Should have: 1 owner (from bootstrap) + 4 test crew members 491 + expectedCount := len(tests) + 1 492 + if len(crew) != expectedCount { 493 + t.Errorf("Expected %d crew members (owner + %d test members), got %d", 494 + expectedCount, len(tests), len(crew)) 495 + } 496 + 497 + // Verify each crew member has the expected permissions 498 + for _, tt := range tests { 499 + found := false 500 + for _, member := range crew { 501 + if member.Record.Member == tt.did { 502 + found = true 503 + 504 + // Check that all expected permissions are present 505 + for _, expectedPerm := range tt.permissions { 506 + hasPerm := slices.Contains(member.Record.Permissions, expectedPerm) 507 + if !hasPerm { 508 + t.Errorf("Crew member %s missing expected permission %s", 509 + tt.name, expectedPerm) 510 + } 511 + } 512 + 513 + // Verify role 514 + if member.Record.Role != tt.role { 515 + t.Errorf("Crew member %s has role %s, expected %s", 516 + tt.name, member.Record.Role, tt.role) 517 + } 518 + } 519 + } 520 + 521 + if !found { 522 + t.Errorf("Crew member %s not found in list", tt.name) 523 + } 524 + } 525 + } 526 + 527 + // TestCaptainRecordSettings tests captain record public/allowAllCrew settings 528 + func TestCaptainRecordSettings(t *testing.T) { 529 + tests := []struct { 530 + name string 531 + public bool 532 + allowAllCrew bool 533 + }{ 534 + { 535 + name: "public hold, crew approval required", 536 + public: true, 537 + allowAllCrew: false, 538 + }, 539 + { 540 + name: "public hold, open crew", 541 + public: true, 542 + allowAllCrew: true, 543 + }, 544 + { 545 + name: "private hold, crew approval required", 546 + public: false, 547 + allowAllCrew: false, 548 + }, 549 + { 550 + name: "private hold, open crew", 551 + public: false, 552 + allowAllCrew: true, 553 + }, 554 + } 555 + 556 + for _, tt := range tests { 557 + t.Run(tt.name, func(t *testing.T) { 558 + pds, ctx := setupTestPDS(t) 559 + 560 + ownerDID := "did:plc:owner123" 561 + 562 + // Bootstrap with specified settings 563 + err := pds.Bootstrap(ctx, ownerDID, tt.public, tt.allowAllCrew) 564 + if err != nil { 565 + t.Fatalf("Failed to bootstrap PDS: %v", err) 566 + } 567 + 568 + // Verify captain record has expected settings 569 + _, captain, err := pds.GetCaptainRecord(ctx) 570 + if err != nil { 571 + t.Fatalf("Failed to get captain record: %v", err) 572 + } 573 + 574 + if captain.Public != tt.public { 575 + t.Errorf("Expected public=%v, got %v", tt.public, captain.Public) 576 + } 577 + 578 + if captain.AllowAllCrew != tt.allowAllCrew { 579 + t.Errorf("Expected allowAllCrew=%v, got %v", tt.allowAllCrew, captain.AllowAllCrew) 580 + } 581 + 582 + if captain.Owner != ownerDID { 583 + t.Errorf("Expected owner %s, got %s", ownerDID, captain.Owner) 584 + } 585 + }) 586 + } 587 + }
+274
pkg/hold/pds/did_test.go
··· 1 + package pds 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "path/filepath" 7 + "testing" 8 + ) 9 + 10 + // TestGenerateDIDFromURL tests DID generation from various URL formats 11 + func TestGenerateDIDFromURL(t *testing.T) { 12 + tests := []struct { 13 + name string 14 + publicURL string 15 + expectedDID string 16 + }{ 17 + { 18 + name: "standard HTTP with standard port", 19 + publicURL: "http://hold.example.com", 20 + expectedDID: "did:web:hold.example.com", 21 + }, 22 + { 23 + name: "standard HTTPS with standard port", 24 + publicURL: "https://hold.example.com", 25 + expectedDID: "did:web:hold.example.com", 26 + }, 27 + { 28 + name: "HTTP with non-standard port", 29 + publicURL: "http://hold.example.com:8080", 30 + expectedDID: "did:web:hold.example.com:8080", 31 + }, 32 + { 33 + name: "HTTPS with non-standard port", 34 + publicURL: "https://hold.example.com:8443", 35 + expectedDID: "did:web:hold.example.com:8443", 36 + }, 37 + { 38 + name: "localhost with port", 39 + publicURL: "http://localhost:8080", 40 + expectedDID: "did:web:localhost:8080", 41 + }, 42 + { 43 + name: "HTTP with explicit port 80", 44 + publicURL: "http://hold.example.com:80", 45 + expectedDID: "did:web:hold.example.com", 46 + }, 47 + { 48 + name: "HTTPS with explicit port 443", 49 + publicURL: "https://hold.example.com:443", 50 + expectedDID: "did:web:hold.example.com", 51 + }, 52 + { 53 + name: "subdomain", 54 + publicURL: "https://hold1.atcr.io", 55 + expectedDID: "did:web:hold1.atcr.io", 56 + }, 57 + } 58 + 59 + for _, tt := range tests { 60 + t.Run(tt.name, func(t *testing.T) { 61 + did := GenerateDIDFromURL(tt.publicURL) 62 + if did != tt.expectedDID { 63 + t.Errorf("Expected DID %s, got %s", tt.expectedDID, did) 64 + } 65 + }) 66 + } 67 + } 68 + 69 + // TestGenerateDIDFromURL_InvalidURL tests handling of invalid URLs 70 + func TestGenerateDIDFromURL_InvalidURL(t *testing.T) { 71 + // Invalid URLs get parsed with empty hostname, which defaults to localhost 72 + did := GenerateDIDFromURL("not a url") 73 + if did != "did:web:localhost" { 74 + t.Errorf("Expected did:web:localhost for invalid URL, got %s", did) 75 + } 76 + } 77 + 78 + // TestGenerateDIDDocument tests DID document generation 79 + func TestGenerateDIDDocument(t *testing.T) { 80 + ctx := context.Background() 81 + tmpDir := t.TempDir() 82 + 83 + dbPath := filepath.Join(tmpDir, "pds.db") 84 + keyPath := filepath.Join(tmpDir, "signing-key") 85 + publicURL := "https://hold.example.com" 86 + 87 + pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", publicURL, dbPath, keyPath) 88 + if err != nil { 89 + t.Fatalf("Failed to create PDS: %v", err) 90 + } 91 + 92 + doc, err := pds.GenerateDIDDocument(publicURL) 93 + if err != nil { 94 + t.Fatalf("Failed to generate DID document: %v", err) 95 + } 96 + 97 + // Verify required fields 98 + if doc.ID != "did:web:hold.example.com" { 99 + t.Errorf("Expected DID did:web:hold.example.com, got %s", doc.ID) 100 + } 101 + 102 + // Verify context 103 + if len(doc.Context) != 3 { 104 + t.Errorf("Expected 3 context entries, got %d", len(doc.Context)) 105 + } 106 + 107 + expectedContexts := []string{ 108 + "https://www.w3.org/ns/did/v1", 109 + "https://w3id.org/security/multikey/v1", 110 + "https://w3id.org/security/suites/secp256k1-2019/v1", 111 + } 112 + for i, expected := range expectedContexts { 113 + if doc.Context[i] != expected { 114 + t.Errorf("Expected context[%d] = %s, got %s", i, expected, doc.Context[i]) 115 + } 116 + } 117 + 118 + // Verify alsoKnownAs 119 + if len(doc.AlsoKnownAs) != 1 || doc.AlsoKnownAs[0] != "at://hold.example.com" { 120 + t.Errorf("Expected alsoKnownAs=['at://hold.example.com'], got %v", doc.AlsoKnownAs) 121 + } 122 + 123 + // Verify verification method 124 + if len(doc.VerificationMethod) != 1 { 125 + t.Fatalf("Expected 1 verification method, got %d", len(doc.VerificationMethod)) 126 + } 127 + 128 + vm := doc.VerificationMethod[0] 129 + if vm.ID != "did:web:hold.example.com#atproto" { 130 + t.Errorf("Expected verification method ID did:web:hold.example.com#atproto, got %s", vm.ID) 131 + } 132 + if vm.Type != "Multikey" { 133 + t.Errorf("Expected type Multikey, got %s", vm.Type) 134 + } 135 + if vm.Controller != "did:web:hold.example.com" { 136 + t.Errorf("Expected controller did:web:hold.example.com, got %s", vm.Controller) 137 + } 138 + if vm.PublicKeyMultibase == "" { 139 + t.Error("Expected non-empty publicKeyMultibase") 140 + } 141 + 142 + // Verify authentication 143 + if len(doc.Authentication) != 1 || doc.Authentication[0] != "did:web:hold.example.com#atproto" { 144 + t.Errorf("Expected authentication=['did:web:hold.example.com#atproto'], got %v", doc.Authentication) 145 + } 146 + 147 + // Verify services 148 + if len(doc.Service) != 2 { 149 + t.Fatalf("Expected 2 services, got %d", len(doc.Service)) 150 + } 151 + 152 + // Check PDS service 153 + pdsService := doc.Service[0] 154 + if pdsService.ID != "#atproto_pds" { 155 + t.Errorf("Expected service ID #atproto_pds, got %s", pdsService.ID) 156 + } 157 + if pdsService.Type != "AtprotoPersonalDataServer" { 158 + t.Errorf("Expected service type AtprotoPersonalDataServer, got %s", pdsService.Type) 159 + } 160 + if pdsService.ServiceEndpoint != publicURL { 161 + t.Errorf("Expected service endpoint %s, got %s", publicURL, pdsService.ServiceEndpoint) 162 + } 163 + 164 + // Check hold service 165 + holdService := doc.Service[1] 166 + if holdService.ID != "#atcr_hold" { 167 + t.Errorf("Expected service ID #atcr_hold, got %s", holdService.ID) 168 + } 169 + if holdService.Type != "AtcrHoldService" { 170 + t.Errorf("Expected service type AtcrHoldService, got %s", holdService.Type) 171 + } 172 + if holdService.ServiceEndpoint != publicURL { 173 + t.Errorf("Expected service endpoint %s, got %s", publicURL, holdService.ServiceEndpoint) 174 + } 175 + } 176 + 177 + // TestGenerateDIDDocument_WithPort tests DID document with non-standard port 178 + func TestGenerateDIDDocument_WithPort(t *testing.T) { 179 + ctx := context.Background() 180 + tmpDir := t.TempDir() 181 + 182 + dbPath := filepath.Join(tmpDir, "pds.db") 183 + keyPath := filepath.Join(tmpDir, "signing-key") 184 + publicURL := "https://hold.example.com:8443" 185 + 186 + pds, err := NewHoldPDS(ctx, "did:web:hold.example.com:8443", publicURL, dbPath, keyPath) 187 + if err != nil { 188 + t.Fatalf("Failed to create PDS: %v", err) 189 + } 190 + 191 + doc, err := pds.GenerateDIDDocument(publicURL) 192 + if err != nil { 193 + t.Fatalf("Failed to generate DID document: %v", err) 194 + } 195 + 196 + // Verify DID includes port 197 + if doc.ID != "did:web:hold.example.com:8443" { 198 + t.Errorf("Expected DID did:web:hold.example.com:8443, got %s", doc.ID) 199 + } 200 + 201 + // Verify alsoKnownAs includes port 202 + if doc.AlsoKnownAs[0] != "at://hold.example.com:8443" { 203 + t.Errorf("Expected alsoKnownAs with port, got %s", doc.AlsoKnownAs[0]) 204 + } 205 + } 206 + 207 + // TestMarshalDIDDocument tests DID document JSON marshaling 208 + func TestMarshalDIDDocument(t *testing.T) { 209 + ctx := context.Background() 210 + tmpDir := t.TempDir() 211 + 212 + dbPath := filepath.Join(tmpDir, "pds.db") 213 + keyPath := filepath.Join(tmpDir, "signing-key") 214 + publicURL := "https://hold.example.com" 215 + 216 + pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", publicURL, dbPath, keyPath) 217 + if err != nil { 218 + t.Fatalf("Failed to create PDS: %v", err) 219 + } 220 + 221 + jsonBytes, err := pds.MarshalDIDDocument() 222 + if err != nil { 223 + t.Fatalf("Failed to marshal DID document: %v", err) 224 + } 225 + 226 + // Verify it's valid JSON 227 + var doc map[string]any 228 + if err := json.Unmarshal(jsonBytes, &doc); err != nil { 229 + t.Fatalf("Failed to unmarshal DID document JSON: %v", err) 230 + } 231 + 232 + // Verify required fields 233 + if id, ok := doc["id"].(string); !ok || id != "did:web:hold.example.com" { 234 + t.Errorf("Expected id='did:web:hold.example.com', got %v", doc["id"]) 235 + } 236 + 237 + if _, ok := doc["@context"]; !ok { 238 + t.Error("Expected @context field in JSON") 239 + } 240 + 241 + if _, ok := doc["verificationMethod"]; !ok { 242 + t.Error("Expected verificationMethod field in JSON") 243 + } 244 + 245 + if _, ok := doc["service"]; !ok { 246 + t.Error("Expected service field in JSON") 247 + } 248 + 249 + // Verify pretty-printed (has indentation) 250 + if len(jsonBytes) < 100 { 251 + t.Error("Expected pretty-printed JSON to be reasonably sized") 252 + } 253 + } 254 + 255 + // TestGenerateDIDDocument_InvalidURL tests error handling 256 + func TestGenerateDIDDocument_InvalidURL(t *testing.T) { 257 + ctx := context.Background() 258 + tmpDir := t.TempDir() 259 + 260 + dbPath := filepath.Join(tmpDir, "pds.db") 261 + keyPath := filepath.Join(tmpDir, "signing-key") 262 + publicURL := "https://hold.example.com" 263 + 264 + pds, err := NewHoldPDS(ctx, "did:web:hold.example.com", publicURL, dbPath, keyPath) 265 + if err != nil { 266 + t.Fatalf("Failed to create PDS: %v", err) 267 + } 268 + 269 + // Try to generate DID document with invalid URL 270 + _, err = pds.GenerateDIDDocument("ht!tp://invalid url") 271 + if err == nil { 272 + t.Error("Expected error for invalid URL, got nil") 273 + } 274 + }
+10 -10
pkg/hold/pds/events.go
··· 19 19 eventSeq int64 20 20 eventHistory []HistoricalEvent // Ring buffer for cursor backfill 21 21 maxHistory int 22 - holdDID string // DID of the hold for setting repo field 22 + holdDID string // DID of the hold for setting repo field 23 23 } 24 24 25 25 // Subscriber represents a WebSocket client subscribed to the firehose ··· 37 37 38 38 // RepoCommitEvent represents a #commit event in subscribeRepos 39 39 type RepoCommitEvent struct { 40 - Seq int64 `json:"seq" cborgen:"seq"` 41 - Repo string `json:"repo" cborgen:"repo"` 42 - Commit string `json:"commit" cborgen:"commit"` // CID string 43 - Rev string `json:"rev" cborgen:"rev"` 44 - Since *string `json:"since,omitempty" cborgen:"since,omitempty"` 45 - Blocks []byte `json:"blocks" cborgen:"blocks"` // CAR slice bytes 46 - Ops []*atproto.SyncSubscribeRepos_RepoOp `json:"ops" cborgen:"ops"` 47 - Time string `json:"time" cborgen:"time"` 48 - Type string `json:"$type" cborgen:"$type"` // Always "#commit" 40 + Seq int64 `json:"seq" cborgen:"seq"` 41 + Repo string `json:"repo" cborgen:"repo"` 42 + Commit string `json:"commit" cborgen:"commit"` // CID string 43 + Rev string `json:"rev" cborgen:"rev"` 44 + Since *string `json:"since,omitempty" cborgen:"since,omitempty"` 45 + Blocks []byte `json:"blocks" cborgen:"blocks"` // CAR slice bytes 46 + Ops []*atproto.SyncSubscribeRepos_RepoOp `json:"ops" cborgen:"ops"` 47 + Time string `json:"time" cborgen:"time"` 48 + Type string `json:"$type" cborgen:"$type"` // Always "#commit" 49 49 } 50 50 51 51 // NewEventBroadcaster creates a new event broadcaster
+384
pkg/hold/pds/events_test.go
··· 1 + package pds 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "testing" 7 + "time" 8 + 9 + atproto "github.com/bluesky-social/indigo/api/atproto" 10 + "github.com/ipfs/go-cid" 11 + ) 12 + 13 + // TestNewEventBroadcaster tests event broadcaster creation 14 + func TestNewEventBroadcaster(t *testing.T) { 15 + holdDID := "did:web:hold.example.com" 16 + broadcaster := NewEventBroadcaster(holdDID, 100) 17 + 18 + if broadcaster.holdDID != holdDID { 19 + t.Errorf("Expected holdDID=%s, got %s", holdDID, broadcaster.holdDID) 20 + } 21 + 22 + if broadcaster.eventSeq != 0 { 23 + t.Errorf("Expected initial eventSeq=0, got %d", broadcaster.eventSeq) 24 + } 25 + 26 + if broadcaster.maxHistory != 100 { 27 + t.Errorf("Expected maxHistory=100, got %d", broadcaster.maxHistory) 28 + } 29 + 30 + if len(broadcaster.subscribers) != 0 { 31 + t.Errorf("Expected 0 subscribers initially, got %d", len(broadcaster.subscribers)) 32 + } 33 + } 34 + 35 + // TestNewEventBroadcaster_DefaultHistory tests default history size 36 + func TestNewEventBroadcaster_DefaultHistory(t *testing.T) { 37 + // Zero or negative maxHistory should default to 100 38 + broadcaster := NewEventBroadcaster("did:web:test", 0) 39 + if broadcaster.maxHistory != 100 { 40 + t.Errorf("Expected default maxHistory=100 for input 0, got %d", broadcaster.maxHistory) 41 + } 42 + 43 + broadcaster2 := NewEventBroadcaster("did:web:test", -5) 44 + if broadcaster2.maxHistory != 100 { 45 + t.Errorf("Expected default maxHistory=100 for negative input, got %d", broadcaster2.maxHistory) 46 + } 47 + } 48 + 49 + // TestGetCurrentSeq tests sequence number tracking 50 + func TestGetCurrentSeq(t *testing.T) { 51 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10) 52 + 53 + // Initial seq should be 0 54 + seq := broadcaster.GetCurrentSeq() 55 + if seq != 0 { 56 + t.Errorf("Expected initial seq=0, got %d", seq) 57 + } 58 + 59 + // After broadcasting, seq should increment 60 + ctx := context.Background() 61 + testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") 62 + 63 + event := &RepoEvent{ 64 + NewRoot: testCID, 65 + Rev: "test-rev-1", 66 + RepoSlice: []byte("test CAR data"), 67 + Ops: []RepoOp{ 68 + { 69 + Kind: EvtKindCreateRecord, 70 + Collection: "io.atcr.hold.crew", 71 + Rkey: "test123", 72 + }, 73 + }, 74 + } 75 + 76 + broadcaster.Broadcast(ctx, event) 77 + 78 + seq = broadcaster.GetCurrentSeq() 79 + if seq != 1 { 80 + t.Errorf("Expected seq=1 after one broadcast, got %d", seq) 81 + } 82 + 83 + // Broadcast again 84 + broadcaster.Broadcast(ctx, event) 85 + 86 + seq = broadcaster.GetCurrentSeq() 87 + if seq != 2 { 88 + t.Errorf("Expected seq=2 after two broadcasts, got %d", seq) 89 + } 90 + } 91 + 92 + // TestBroadcast tests event broadcasting 93 + func TestBroadcast(t *testing.T) { 94 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10) 95 + ctx := context.Background() 96 + 97 + testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") 98 + 99 + event := &RepoEvent{ 100 + NewRoot: testCID, 101 + Rev: "test-rev-1", 102 + RepoSlice: []byte("test CAR data"), 103 + Ops: []RepoOp{ 104 + { 105 + Kind: EvtKindCreateRecord, 106 + Collection: "io.atcr.hold.crew", 107 + Rkey: "test123", 108 + RecCid: &testCID, 109 + }, 110 + }, 111 + } 112 + 113 + // Broadcast should not panic without subscribers 114 + broadcaster.Broadcast(ctx, event) 115 + 116 + // Verify sequence incremented 117 + if broadcaster.eventSeq != 1 { 118 + t.Errorf("Expected eventSeq=1, got %d", broadcaster.eventSeq) 119 + } 120 + 121 + // Verify event added to history 122 + if len(broadcaster.eventHistory) != 1 { 123 + t.Errorf("Expected 1 event in history, got %d", len(broadcaster.eventHistory)) 124 + } 125 + 126 + he := broadcaster.eventHistory[0] 127 + if he.Seq != 1 { 128 + t.Errorf("Expected history seq=1, got %d", he.Seq) 129 + } 130 + 131 + if he.Event.Repo != "did:web:hold.example.com" { 132 + t.Errorf("Expected repo=did:web:hold.example.com, got %s", he.Event.Repo) 133 + } 134 + 135 + if he.Event.Type != "#commit" { 136 + t.Errorf("Expected type=#commit, got %s", he.Event.Type) 137 + } 138 + 139 + if len(he.Event.Ops) != 1 { 140 + t.Errorf("Expected 1 op, got %d", len(he.Event.Ops)) 141 + } 142 + } 143 + 144 + // TestAddToHistory_RingBuffer tests ring buffer behavior 145 + func TestAddToHistory_RingBuffer(t *testing.T) { 146 + // Create broadcaster with small history 147 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 3) 148 + ctx := context.Background() 149 + 150 + testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") 151 + 152 + // Broadcast 5 events (exceeds maxHistory of 3) 153 + for i := 0; i < 5; i++ { 154 + event := &RepoEvent{ 155 + NewRoot: testCID, 156 + Rev: "test-rev", 157 + RepoSlice: []byte("test CAR data"), 158 + Ops: []RepoOp{}, 159 + } 160 + broadcaster.Broadcast(ctx, event) 161 + } 162 + 163 + // Should only keep last 3 events 164 + if len(broadcaster.eventHistory) != 3 { 165 + t.Errorf("Expected 3 events in history (ring buffer), got %d", len(broadcaster.eventHistory)) 166 + } 167 + 168 + // Verify we kept the most recent events (seq 3, 4, 5) 169 + expectedSeqs := []int64{3, 4, 5} 170 + for i, expected := range expectedSeqs { 171 + if broadcaster.eventHistory[i].Seq != expected { 172 + t.Errorf("Expected history[%d].Seq=%d, got %d", i, expected, broadcaster.eventHistory[i].Seq) 173 + } 174 + } 175 + 176 + // Final sequence should be 5 177 + if broadcaster.eventSeq != 5 { 178 + t.Errorf("Expected eventSeq=5, got %d", broadcaster.eventSeq) 179 + } 180 + } 181 + 182 + // TestConvertToCommitEvent tests event conversion 183 + func TestConvertToCommitEvent(t *testing.T) { 184 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10) 185 + 186 + testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") 187 + since := "prev-rev" 188 + 189 + event := &RepoEvent{ 190 + NewRoot: testCID, 191 + Rev: "test-rev-123", 192 + Since: &since, 193 + RepoSlice: []byte("test CAR data"), 194 + Ops: []RepoOp{ 195 + { 196 + Kind: EvtKindCreateRecord, 197 + Collection: "io.atcr.hold.crew", 198 + Rkey: "member1", 199 + RecCid: &testCID, 200 + }, 201 + { 202 + Kind: EvtKindUpdateRecord, 203 + Collection: "io.atcr.hold.captain", 204 + Rkey: "self", 205 + RecCid: &testCID, 206 + }, 207 + { 208 + Kind: EvtKindDeleteRecord, 209 + Collection: "io.atcr.hold.crew", 210 + Rkey: "oldmember", 211 + RecCid: nil, // Deletes don't have CIDs 212 + }, 213 + }, 214 + } 215 + 216 + commitEvent := broadcaster.convertToCommitEvent(event, 42) 217 + 218 + // Verify basic fields 219 + if commitEvent.Seq != 42 { 220 + t.Errorf("Expected seq=42, got %d", commitEvent.Seq) 221 + } 222 + 223 + if commitEvent.Repo != "did:web:hold.example.com" { 224 + t.Errorf("Expected repo=did:web:hold.example.com, got %s", commitEvent.Repo) 225 + } 226 + 227 + if commitEvent.Commit != testCID.String() { 228 + t.Errorf("Expected commit=%s, got %s", testCID.String(), commitEvent.Commit) 229 + } 230 + 231 + if commitEvent.Rev != "test-rev-123" { 232 + t.Errorf("Expected rev=test-rev-123, got %s", commitEvent.Rev) 233 + } 234 + 235 + if commitEvent.Since == nil || *commitEvent.Since != since { 236 + t.Errorf("Expected since=%s, got %v", since, commitEvent.Since) 237 + } 238 + 239 + if string(commitEvent.Blocks) != "test CAR data" { 240 + t.Errorf("Expected blocks='test CAR data', got %s", string(commitEvent.Blocks)) 241 + } 242 + 243 + if commitEvent.Type != "#commit" { 244 + t.Errorf("Expected type=#commit, got %s", commitEvent.Type) 245 + } 246 + 247 + // Verify time is set 248 + if commitEvent.Time == "" { 249 + t.Error("Expected non-empty time") 250 + } 251 + 252 + // Parse time to verify it's valid RFC3339 253 + _, err := time.Parse(time.RFC3339, commitEvent.Time) 254 + if err != nil { 255 + t.Errorf("Expected valid RFC3339 time, got error: %v", err) 256 + } 257 + 258 + // Verify ops conversion 259 + if len(commitEvent.Ops) != 3 { 260 + t.Fatalf("Expected 3 ops, got %d", len(commitEvent.Ops)) 261 + } 262 + 263 + // Check create op 264 + createOp := commitEvent.Ops[0] 265 + if createOp.Action != "create" { 266 + t.Errorf("Expected action=create, got %s", createOp.Action) 267 + } 268 + if createOp.Path != "io.atcr.hold.crew/member1" { 269 + t.Errorf("Expected path=io.atcr.hold.crew/member1, got %s", createOp.Path) 270 + } 271 + if createOp.Cid == nil { 272 + t.Error("Expected non-nil CID for create op") 273 + } 274 + 275 + // Check update op 276 + updateOp := commitEvent.Ops[1] 277 + if updateOp.Action != "update" { 278 + t.Errorf("Expected action=update, got %s", updateOp.Action) 279 + } 280 + if updateOp.Path != "io.atcr.hold.captain/self" { 281 + t.Errorf("Expected path=io.atcr.hold.captain/self, got %s", updateOp.Path) 282 + } 283 + 284 + // Check delete op 285 + deleteOp := commitEvent.Ops[2] 286 + if deleteOp.Action != "delete" { 287 + t.Errorf("Expected action=delete, got %s", deleteOp.Action) 288 + } 289 + if deleteOp.Path != "io.atcr.hold.crew/oldmember" { 290 + t.Errorf("Expected path=io.atcr.hold.crew/oldmember, got %s", deleteOp.Path) 291 + } 292 + if deleteOp.Cid != nil { 293 + t.Error("Expected nil CID for delete op") 294 + } 295 + } 296 + 297 + // TestConvertToCommitEvent_NoSince tests event without since field 298 + func TestConvertToCommitEvent_NoSince(t *testing.T) { 299 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10) 300 + 301 + testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") 302 + 303 + event := &RepoEvent{ 304 + NewRoot: testCID, 305 + Rev: "test-rev-123", 306 + Since: nil, // No since 307 + RepoSlice: []byte("test CAR data"), 308 + Ops: []RepoOp{}, 309 + } 310 + 311 + commitEvent := broadcaster.convertToCommitEvent(event, 1) 312 + 313 + if commitEvent.Since != nil { 314 + t.Errorf("Expected nil since, got %v", commitEvent.Since) 315 + } 316 + } 317 + 318 + // TestSetRepoEventHandler tests handler registration 319 + func TestSetRepoEventHandler(t *testing.T) { 320 + broadcaster := NewEventBroadcaster("did:web:hold.example.com", 10) 321 + 322 + handler := broadcaster.SetRepoEventHandler() 323 + if handler == nil { 324 + t.Fatal("Expected non-nil handler") 325 + } 326 + 327 + // Call handler 328 + ctx := context.Background() 329 + testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") 330 + 331 + event := &RepoEvent{ 332 + NewRoot: testCID, 333 + Rev: "test-rev", 334 + RepoSlice: []byte("test CAR data"), 335 + Ops: []RepoOp{}, 336 + } 337 + 338 + handler(ctx, event) 339 + 340 + // Verify event was broadcast 341 + if broadcaster.eventSeq != 1 { 342 + t.Errorf("Expected eventSeq=1 after handler call, got %d", broadcaster.eventSeq) 343 + } 344 + 345 + if len(broadcaster.eventHistory) != 1 { 346 + t.Errorf("Expected 1 event in history after handler call, got %d", len(broadcaster.eventHistory)) 347 + } 348 + } 349 + 350 + // TestEncodeCBOR tests CBOR encoding (currently JSON) 351 + func TestEncodeCBOR(t *testing.T) { 352 + testCID, _ := cid.Decode("bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke") 353 + 354 + event := &RepoCommitEvent{ 355 + Seq: 1, 356 + Repo: "did:web:hold.example.com", 357 + Commit: testCID.String(), 358 + Rev: "test-rev", 359 + Blocks: []byte("test data"), 360 + Ops: []*atproto.SyncSubscribeRepos_RepoOp{}, 361 + Time: time.Now().Format(time.RFC3339), 362 + Type: "#commit", 363 + } 364 + 365 + encoded, err := encodeCBOR(event) 366 + if err != nil { 367 + t.Fatalf("Failed to encode CBOR: %v", err) 368 + } 369 + 370 + if len(encoded) == 0 { 371 + t.Error("Expected non-empty encoded data") 372 + } 373 + 374 + // Current implementation uses JSON, so verify it's valid JSON 375 + // In future, this would be proper CBOR validation 376 + var decoded RepoCommitEvent 377 + if err := json.Unmarshal(encoded, &decoded); err != nil { 378 + t.Errorf("Failed to decode JSON: %v", err) 379 + } 380 + 381 + if decoded.Seq != 1 { 382 + t.Errorf("Expected decoded seq=1, got %d", decoded.Seq) 383 + } 384 + }
+8 -8
pkg/hold/pds/repomgr.go
··· 5 5 // Reason: The indigo library is unmaintained and contains a critical bug in UpdateRecord 6 6 // 7 7 // Modifications from original: 8 - // - Changed package from 'repomgr' to 'pds' for integration with hold service 9 - // - Fixed UpdateRecord bug (line 263): Changed r.PutRecord to r.UpdateRecord 10 - // (UpdateRecord was incorrectly calling PutRecord, causing incorrect MST operations) 11 - // - Removed 5 Prometheus metrics calls (openAndSigCheckDuration, calcDiffDuration, 12 - // writeCarSliceDuration, repoOpsImported) as metrics are not used in this project 13 - // - Added PutRecord method (lines 309-381) for creating records with explicit rkeys 14 - // (like CreateRecord but with specified rkey instead of auto-generated TID) 15 - // Based on streamplace/indigo implementation 8 + // - Changed package from 'repomgr' to 'pds' for integration with hold service 9 + // - Fixed UpdateRecord bug (line 263): Changed r.PutRecord to r.UpdateRecord 10 + // (UpdateRecord was incorrectly calling PutRecord, causing incorrect MST operations) 11 + // - Removed 5 Prometheus metrics calls (openAndSigCheckDuration, calcDiffDuration, 12 + // writeCarSliceDuration, repoOpsImported) as metrics are not used in this project 13 + // - Added PutRecord method (lines 309-381) for creating records with explicit rkeys 14 + // (like CreateRecord but with specified rkey instead of auto-generated TID) 15 + // Based on streamplace/indigo implementation 16 16 package pds 17 17 18 18 import (
+53 -25
pkg/hold/pds/xrpc.go
··· 27 27 publicURL string 28 28 blobStore BlobStore 29 29 broadcaster *EventBroadcaster 30 + httpClient HTTPClient // For testing - allows injecting mock HTTP client 30 31 } 31 32 32 33 // BlobStore interface wraps the existing hold service storage operations ··· 48 49 // Multipart upload operations (used for OCI container layers only) 49 50 // StartMultipartUpload initiates a multipart upload, returns uploadID and mode 50 51 StartMultipartUpload(ctx context.Context, digest string) (uploadID string, mode string, err error) 51 - // GetPartUploadURL returns a presigned URL for uploading a specific part 52 - GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (url string, err error) 52 + // GetPartUploadURL returns structured upload info (URL + optional headers) for a specific part 53 + GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (*PartUploadInfo, error) 53 54 // CompleteMultipartUpload finalizes a multipart upload 54 55 CompleteMultipartUpload(ctx context.Context, uploadID string, parts []PartInfo) error 55 56 // AbortMultipartUpload cancels a multipart upload ··· 64 65 ETag string `json:"etag"` 65 66 } 66 67 68 + // PartUploadInfo contains structured information for uploading a part 69 + // Used for both S3 presigned URLs and buffered mode with headers 70 + type PartUploadInfo struct { 71 + URL string `json:"url"` // URL to PUT the part to 72 + Method string `json:"method,omitempty"` // HTTP method (usually "PUT") 73 + Headers map[string]string `json:"headers,omitempty"` // Additional headers required for the request 74 + } 75 + 67 76 // NewXRPCHandler creates a new XRPC handler 68 - func NewXRPCHandler(pds *HoldPDS, publicURL string, blobStore BlobStore, broadcaster *EventBroadcaster) *XRPCHandler { 77 + func NewXRPCHandler(pds *HoldPDS, publicURL string, blobStore BlobStore, broadcaster *EventBroadcaster, httpClient HTTPClient) *XRPCHandler { 69 78 return &XRPCHandler{ 70 79 pds: pds, 71 80 publicURL: publicURL, 72 81 blobStore: blobStore, 73 82 broadcaster: broadcaster, 83 + httpClient: httpClient, 74 84 } 75 85 } 76 86 ··· 78 88 func corsMiddleware(next http.HandlerFunc) http.HandlerFunc { 79 89 return func(w http.ResponseWriter, r *http.Request) { 80 90 w.Header().Set("Access-Control-Allow-Origin", "*") 81 - w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS") 82 - w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") 91 + w.Header().Set("Access-Control-Allow-Methods", "GET, HEAD, POST, PUT, OPTIONS") 92 + w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization, DPoP, X-Upload-Id, X-Part-Number, X-ATCR-DID") 83 93 84 94 // Handle preflight OPTIONS requests 85 95 if r.Method == http.MethodOptions { ··· 444 454 } 445 455 446 456 // Validate DPoP + OAuth and check authorization 447 - _, err := ValidateOwnerOrCrewAdmin(r, h.pds) 457 + _, err := ValidateOwnerOrCrewAdmin(r, h.pds, h.httpClient) 448 458 if err != nil { 449 459 http.Error(w, fmt.Sprintf("unauthorized: %v", err), http.StatusForbidden) 450 460 return ··· 733 743 734 744 // Mode 3: Direct blob upload (ATProto-compliant) 735 745 // Receives raw bytes, computes CID, stores via distribution driver 736 - // TODO: Authentication check 737 - 738 - // Extract DID for ATProto blob storage (per-DID paths) 739 - did := r.URL.Query().Get("did") 740 - if did == "" { 741 - // TODO: Extract from auth context when authentication is implemented 742 - // For now, use hold's DID as fallback 743 - did = h.pds.DID() 746 + // Requires admin-level access (captain or crew admin) 747 + user, err := ValidateOwnerOrCrewAdmin(r, h.pds, h.httpClient) 748 + if err != nil { 749 + http.Error(w, fmt.Sprintf("authorization failed: %v", err), http.StatusForbidden) 750 + return 744 751 } 752 + 753 + // Use authenticated user's DID for ATProto blob storage (per-DID paths) 754 + did := user.DID 745 755 746 756 // Upload blob directly - blobStore will compute CID and store 747 757 blobCID, size, err := h.blobStore.UploadBlob(r.Context(), did, r.Body) ··· 770 780 func (h *XRPCHandler) handleBufferedPartUpload(w http.ResponseWriter, r *http.Request, uploadID, partNumberStr string) { 771 781 ctx := r.Context() 772 782 783 + // Validate blob write access 784 + // This checks DPoP + OAuth tokens and verifies user is captain or crew with blob:write permission 785 + _, err := ValidateBlobWriteAccess(r, h.pds, h.httpClient) 786 + if err != nil { 787 + http.Error(w, fmt.Sprintf("authorization failed: %v", err), http.StatusForbidden) 788 + return 789 + } 790 + 773 791 // Parse part number 774 792 partNumber, err := strconv.Atoi(partNumberStr) 775 793 if err != nil { ··· 816 834 return 817 835 } 818 836 837 + // Validate blob write access for all multipart operations 838 + // This checks DPoP + OAuth tokens and verifies user is captain or crew with blob:write permission 839 + user, err := ValidateBlobWriteAccess(r, h.pds, h.httpClient) 840 + if err != nil { 841 + http.Error(w, fmt.Sprintf("authorization failed: %v", err), http.StatusForbidden) 842 + return 843 + } 844 + 819 845 // Route based on action 820 846 switch req.Action { 821 847 case "start": ··· 844 870 return 845 871 } 846 872 847 - // Extract DID from query or header (for authorization) 848 - did := r.URL.Query().Get("did") 849 - if did == "" { 850 - did = r.Header.Get("X-ATCR-DID") 851 - } 852 - 853 - url, err := h.blobStore.GetPartUploadURL(ctx, req.UploadID, req.PartNumber, did) 873 + uploadInfo, err := h.blobStore.GetPartUploadURL(ctx, req.UploadID, req.PartNumber, user.DID) 854 874 if err != nil { 855 875 http.Error(w, fmt.Sprintf("failed to get part URL: %v", err), http.StatusInternalServerError) 856 876 return 857 877 } 858 878 859 879 w.Header().Set("Content-Type", "application/json") 860 - json.NewEncoder(w).Encode(map[string]any{ 861 - "url": url, 862 - }) 880 + json.NewEncoder(w).Encode(uploadInfo) 863 881 864 882 case "complete": 865 883 // Complete multipart upload ··· 902 920 903 921 // HandleGetBlob wraps existing presigned download URL logic 904 922 // Supports both ATProto CIDs and OCI sha256 digests 923 + // Authorization: If captain.public = true, open to all. If false, requires crew with blob:read permission. 905 924 func (h *XRPCHandler) HandleGetBlob(w http.ResponseWriter, r *http.Request) { 906 925 if r.Method != http.MethodGet && r.Method != http.MethodHead { 907 926 http.Error(w, "method not allowed", http.StatusMethodNotAllowed) ··· 918 937 919 938 if did != h.pds.DID() { 920 939 http.Error(w, "invalid did", http.StatusBadRequest) 940 + return 941 + } 942 + 943 + // Validate blob read access 944 + // If captain.public = true, returns nil (public access allowed) 945 + // If captain.public = false, validates auth and checks for blob:read permission 946 + _, err := ValidateBlobReadAccess(r, h.pds, h.httpClient) 947 + if err != nil { 948 + http.Error(w, fmt.Sprintf("authorization failed: %v", err), http.StatusForbidden) 921 949 return 922 950 } 923 951 ··· 1035 1063 } 1036 1064 1037 1065 // Validate DPoP + OAuth token from Authorization and DPoP headers 1038 - user, err := ValidateDPoPRequest(r) 1066 + user, err := ValidateDPoPRequest(r, h.httpClient) 1039 1067 if err != nil { 1040 1068 http.Error(w, fmt.Sprintf("authentication failed: %v", err), http.StatusUnauthorized) 1041 1069 return
+45 -20
pkg/hold/pds/xrpc_multipart_test.go
··· 7 7 "testing" 8 8 ) 9 9 10 + // addTestDPoPAuth adds DPoP authentication headers to a request for testing 11 + func addTestDPoPAuth(t *testing.T, req *http.Request, did string) { 12 + t.Helper() 13 + dpopHelper, err := NewDPoPTestHelper(did, "https://test-pds.example.com") 14 + if err != nil { 15 + t.Fatalf("Failed to create DPoP helper: %v", err) 16 + } 17 + if err := dpopHelper.AddDPoPToRequest(req); err != nil { 18 + t.Fatalf("Failed to add DPoP to request: %v", err) 19 + } 20 + } 21 + 10 22 // ATCR-Specific Tests: Non-standard multipart upload extensions 11 23 // 12 24 // This file contains tests for ATCR's custom multipart upload extensions ··· 28 40 } 29 41 30 42 req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 43 + // Add DPoP authentication - owner has blob:write permission 44 + addTestDPoPAuth(t, req, "did:plc:testowner123") 31 45 w := httptest.NewRecorder() 32 46 33 47 handler.HandleUploadBlob(w, req) 34 48 49 + // Should return 200 OK with upload metadata 35 50 if w.Code != http.StatusOK { 36 - t.Errorf("Expected status 200, got %d", w.Code) 51 + t.Errorf("Expected status 200 OK, got %d", w.Code) 37 52 } 38 53 39 - // Verify response contains uploadId and mode 40 54 result := assertJSONResponse(t, w, http.StatusOK) 41 55 42 56 if uploadID, ok := result["uploadId"].(string); !ok || uploadID == "" { ··· 62 76 } 63 77 64 78 req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 79 + addTestDPoPAuth(t, req, "did:plc:testowner123") 65 80 w := httptest.NewRecorder() 66 81 67 82 handler.HandleUploadBlob(w, req) ··· 80 95 81 96 uploadID := "test-upload-123" 82 97 partNumber := 1 83 - did := "did:plc:testuser" 98 + expectedDID := "did:plc:testowner123" // DID from authenticated user 84 99 85 100 body := map[string]any{ 86 101 "action": "part", ··· 88 103 "partNumber": partNumber, 89 104 } 90 105 91 - req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob?did="+did, body) 106 + req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 107 + addTestDPoPAuth(t, req, expectedDID) 92 108 w := httptest.NewRecorder() 93 109 94 110 handler.HandleUploadBlob(w, req) 95 111 112 + // Should return 200 OK with presigned URL 96 113 if w.Code != http.StatusOK { 97 - t.Errorf("Expected status 200, got %d", w.Code) 114 + t.Errorf("Expected status 200 OK, got %d", w.Code) 98 115 } 99 116 100 - // Verify response contains URL 101 117 result := assertJSONResponse(t, w, http.StatusOK) 102 118 103 119 if url, ok := result["url"].(string); !ok || url == "" { 104 120 t.Error("Expected url string in response") 105 121 } 106 122 107 - // Verify blob store was called 123 + // Verify blob store was called with authenticated user's DID 108 124 if len(blobStore.partURLCalls) != 1 { 109 125 t.Fatalf("Expected GetPartUploadURL to be called once") 110 126 } 111 127 call := blobStore.partURLCalls[0] 112 - if call.uploadID != uploadID || call.partNumber != partNumber || call.did != did { 128 + if call.uploadID != uploadID || call.partNumber != partNumber || call.did != expectedDID { 113 129 t.Errorf("Expected GetPartUploadURL(%s, %d, %s), got (%s, %d, %s)", 114 - uploadID, partNumber, did, call.uploadID, call.partNumber, call.did) 130 + uploadID, partNumber, expectedDID, call.uploadID, call.partNumber, call.did) 115 131 } 116 132 } 117 133 ··· 150 166 for _, tt := range tests { 151 167 t.Run(tt.name, func(t *testing.T) { 152 168 req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", tt.body) 169 + addTestDPoPAuth(t, req, "did:plc:testowner123") 153 170 w := httptest.NewRecorder() 154 171 155 172 handler.HandleUploadBlob(w, req) ··· 181 198 } 182 199 183 200 req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 201 + addTestDPoPAuth(t, req, "did:plc:testowner123") 184 202 w := httptest.NewRecorder() 185 203 186 204 handler.HandleUploadBlob(w, req) 187 205 206 + // Should return 200 OK with completion status 188 207 if w.Code != http.StatusOK { 189 - t.Errorf("Expected status 200, got %d", w.Code) 208 + t.Errorf("Expected status 200 OK, got %d", w.Code) 190 209 } 191 210 192 - // Verify response 193 211 result := assertJSONResponse(t, w, http.StatusOK) 194 212 195 213 if status, ok := result["status"].(string); !ok || status != "completed" { ··· 237 255 for _, tt := range tests { 238 256 t.Run(tt.name, func(t *testing.T) { 239 257 req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", tt.body) 258 + addTestDPoPAuth(t, req, "did:plc:testowner123") 240 259 w := httptest.NewRecorder() 241 260 242 261 handler.HandleUploadBlob(w, req) ··· 263 282 } 264 283 265 284 req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 285 + addTestDPoPAuth(t, req, "did:plc:testowner123") 266 286 w := httptest.NewRecorder() 267 287 268 288 handler.HandleUploadBlob(w, req) 269 289 290 + // Should return 200 OK with abort status 270 291 if w.Code != http.StatusOK { 271 - t.Errorf("Expected status 200, got %d", w.Code) 292 + t.Errorf("Expected status 200 OK, got %d", w.Code) 272 293 } 273 294 274 - // Verify response 275 295 result := assertJSONResponse(t, w, http.StatusOK) 276 296 277 297 if status, ok := result["status"].(string); !ok || status != "aborted" { ··· 293 313 } 294 314 295 315 req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 316 + addTestDPoPAuth(t, req, "did:plc:testowner123") 296 317 w := httptest.NewRecorder() 297 318 298 319 handler.HandleUploadBlob(w, req) ··· 316 337 req := httptest.NewRequest(http.MethodPut, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader(data)) 317 338 req.Header.Set("X-Upload-Id", uploadID) 318 339 req.Header.Set("X-Part-Number", partNumber) 340 + addTestDPoPAuth(t, req, "did:plc:testowner123") 319 341 w := httptest.NewRecorder() 320 342 321 343 handler.HandleUploadBlob(w, req) 322 344 345 + // Should return 200 OK with ETag 323 346 if w.Code != http.StatusOK { 324 - t.Errorf("Expected status 200, got %d", w.Code) 347 + t.Errorf("Expected status 200 OK, got %d", w.Code) 325 348 } 326 349 327 - // Verify response contains ETag 328 350 result := assertJSONResponse(t, w, http.StatusOK) 329 351 330 352 if etag, ok := result["etag"].(string); !ok || etag == "" { ··· 347 369 handler, _, _ := setupTestXRPCHandlerWithBlobs(t) 348 370 349 371 tests := []struct { 350 - name string 351 - uploadID string 352 - partNumber string 353 - setUploadID bool 354 - setPartNumber bool 372 + name string 373 + uploadID string 374 + partNumber string 375 + setUploadID bool 376 + setPartNumber bool 355 377 }{ 356 378 { 357 379 name: "missing both headers", ··· 381 403 if tt.setPartNumber { 382 404 req.Header.Set("X-Part-Number", tt.partNumber) 383 405 } 406 + addTestDPoPAuth(t, req, "did:plc:testowner123") 384 407 w := httptest.NewRecorder() 385 408 386 409 handler.HandleUploadBlob(w, req) ··· 399 422 req := httptest.NewRequest(http.MethodPut, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte("data"))) 400 423 req.Header.Set("X-Upload-Id", "test-123") 401 424 req.Header.Set("X-Part-Number", "not-a-number") 425 + addTestDPoPAuth(t, req, "did:plc:testowner123") 402 426 w := httptest.NewRecorder() 403 427 404 428 handler.HandleUploadBlob(w, req) ··· 417 441 } 418 442 419 443 req := makeXRPCPostRequest("/xrpc/com.atproto.repo.uploadBlob", body) 444 + addTestDPoPAuth(t, req, "did:plc:testowner123") 420 445 w := httptest.NewRecorder() 421 446 422 447 handler.HandleUploadBlob(w, req)
+138 -41
pkg/hold/pds/xrpc_test.go
··· 54 54 t.Fatalf("Failed to bootstrap PDS: %v", err) 55 55 } 56 56 57 - // Create XRPC handler 58 - handler := NewXRPCHandler(pds, "https://hold.example.com", nil, nil) 57 + // Create mock PDS client for DPoP validation 58 + mockClient := &mockPDSClient{} 59 + 60 + // Create XRPC handler with mock HTTP client 61 + handler := NewXRPCHandler(pds, "https://hold.example.com", nil, nil, mockClient) 59 62 60 63 return handler, ctx 61 64 } ··· 652 655 // TestHandleListRecords_EmptyCollection tests listing empty collection 653 656 func TestHandleListRecords_EmptyCollection(t *testing.T) { 654 657 pds, ctx := setupTestPDS(t) // Don't bootstrap - no records created yet 655 - handler := NewXRPCHandler(pds, "https://hold.example.com", nil, nil) 658 + mockClient := &mockPDSClient{} 659 + handler := NewXRPCHandler(pds, "https://hold.example.com", nil, nil, mockClient) 656 660 657 661 // Initialize repo manually (setupTestPDS doesn't call Bootstrap, so no crew members) 658 662 err := pds.repomgr.InitNewActor(ctx, pds.uid, "", pds.did, "", "", "") ··· 744 748 } 745 749 746 750 req := makeXRPCPostRequest("/xrpc/com.atproto.repo.deleteRecord", body) 751 + 752 + // Add DPoP authentication - owner has admin permission to delete crew 753 + ownerDID := "did:plc:testowner123" 754 + dpopHelper, err := NewDPoPTestHelper(ownerDID, "https://test-pds.example.com") 755 + if err != nil { 756 + t.Fatalf("Failed to create DPoP helper: %v", err) 757 + } 758 + if err := dpopHelper.AddDPoPToRequest(req); err != nil { 759 + t.Fatalf("Failed to add DPoP to request: %v", err) 760 + } 761 + 747 762 w := httptest.NewRecorder() 748 763 749 - // Note: This test will fail auth check since we're not providing DPoP tokens 750 - // For now, we're testing the request parsing and response structure 751 - // A real implementation would need proper auth mocking 752 764 handler.HandleDeleteRecord(w, req) 753 765 754 - // We expect 403 Forbidden due to missing auth 755 - // This tests that the endpoint is parsing JSON body correctly 756 - if w.Code != http.StatusForbidden { 757 - // If somehow auth passes (shouldn't in this test), verify response structure 758 - if w.Code == http.StatusOK { 759 - result := assertJSONResponse(t, w, http.StatusOK) 766 + // Should return 200 OK with commit metadata 767 + if w.Code != http.StatusOK { 768 + t.Errorf("Expected status 200 OK, got %d", w.Code) 769 + } 760 770 761 - // Per spec, response should have commit object 762 - if commit, ok := result["commit"].(map[string]any); !ok { 763 - t.Error("Expected commit object in response") 764 - } else { 765 - if cid, ok := commit["cid"].(string); !ok || cid == "" { 766 - t.Error("Expected cid in commit object") 767 - } 768 - if rev, ok := commit["rev"].(string); !ok || rev == "" { 769 - t.Error("Expected rev in commit object") 770 - } 771 - } 771 + result := assertJSONResponse(t, w, http.StatusOK) 772 + 773 + // Per spec, response should have commit object 774 + if commit, ok := result["commit"].(map[string]any); !ok { 775 + t.Error("Expected commit object in response") 776 + } else { 777 + if cid, ok := commit["cid"].(string); !ok || cid == "" { 778 + t.Error("Expected cid in commit object") 779 + } 780 + if rev, ok := commit["rev"].(string); !ok || rev == "" { 781 + t.Error("Expected rev in commit object") 772 782 } 773 783 } 774 784 } ··· 902 912 // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-list-repos 903 913 func TestHandleListRepos_EmptyRepo(t *testing.T) { 904 914 pds, ctx := setupTestPDS(t) // Don't bootstrap 905 - handler := NewXRPCHandler(pds, "https://hold.example.com", nil, nil) 915 + mockClient := &mockPDSClient{} 916 + handler := NewXRPCHandler(pds, "https://hold.example.com", nil, nil, mockClient) 906 917 907 918 // setupTestPDS creates the PDS/database but doesn't initialize the repo 908 919 // Check if implementation returns repos before initialization ··· 1334 1345 partUploadError error 1335 1346 1336 1347 // Track calls 1337 - downloadCalls []string // Track digests requested for download 1338 - uploadCalls []string // Track digests requested for upload 1339 - uploadBlobCalls []uploadBlobCall // Track direct blob uploads 1340 - startCalls []string // Track digests for multipart start 1341 - partURLCalls []partURLCall 1342 - completeCalls []string 1343 - abortCalls []string 1344 - partUploadCalls []partUploadCall 1348 + downloadCalls []string // Track digests requested for download 1349 + uploadCalls []string // Track digests requested for upload 1350 + uploadBlobCalls []uploadBlobCall // Track direct blob uploads 1351 + startCalls []string // Track digests for multipart start 1352 + partURLCalls []partURLCall 1353 + completeCalls []string 1354 + abortCalls []string 1355 + partUploadCalls []partUploadCall 1345 1356 } 1346 1357 1347 1358 type uploadBlobCall struct { ··· 1419 1430 return "test-upload-id", "s3native", nil 1420 1431 } 1421 1432 1422 - func (m *mockBlobStore) GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (string, error) { 1433 + func (m *mockBlobStore) GetPartUploadURL(ctx context.Context, uploadID string, partNumber int, did string) (*PartUploadInfo, error) { 1423 1434 m.partURLCalls = append(m.partURLCalls, partURLCall{uploadID, partNumber, did}) 1424 1435 if m.partURLError != nil { 1425 - return "", m.partURLError 1436 + return nil, m.partURLError 1426 1437 } 1427 - return "https://s3.example.com/part/" + uploadID, nil 1438 + return &PartUploadInfo{ 1439 + URL: "https://s3.example.com/part/" + uploadID, 1440 + Method: "PUT", 1441 + }, nil 1428 1442 } 1429 1443 1430 1444 func (m *mockBlobStore) CompleteMultipartUpload(ctx context.Context, uploadID string, parts []PartInfo) error { ··· 1451 1465 return "test-etag-" + uploadID, nil 1452 1466 } 1453 1467 1454 - // setupTestXRPCHandlerWithBlobs creates handler with mock blob store 1468 + // setupTestXRPCHandlerWithBlobs creates handler with mock blob store and mock PDS client 1455 1469 func setupTestXRPCHandlerWithBlobs(t *testing.T) (*XRPCHandler, *mockBlobStore, context.Context) { 1456 1470 t.Helper() 1457 1471 ··· 1488 1502 // Create mock blob store 1489 1503 blobStore := newMockBlobStore() 1490 1504 1491 - // Create XRPC handler with mock blob store 1492 - handler := NewXRPCHandler(pds, "https://hold.example.com", blobStore, nil) 1505 + // Create mock PDS client for DPoP validation 1506 + mockClient := &mockPDSClient{} 1507 + 1508 + // Create XRPC handler with mock blob store and mock HTTP client 1509 + handler := NewXRPCHandler(pds, "https://hold.example.com", blobStore, nil, mockClient) 1493 1510 1494 1511 return handler, blobStore, ctx 1495 1512 } ··· 1507 1524 // Test standard single blob upload (POST with raw bytes) 1508 1525 req := httptest.NewRequest(http.MethodPost, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader(blobData)) 1509 1526 req.Header.Set("Content-Type", "application/octet-stream") 1527 + 1528 + // Add DPoP authentication - owner has admin permission for blob upload 1529 + ownerDID := "did:plc:testowner123" 1530 + dpopHelper, err := NewDPoPTestHelper(ownerDID, "https://test-pds.example.com") 1531 + if err != nil { 1532 + t.Fatalf("Failed to create DPoP helper: %v", err) 1533 + } 1534 + if err := dpopHelper.AddDPoPToRequest(req); err != nil { 1535 + t.Fatalf("Failed to add DPoP to request: %v", err) 1536 + } 1537 + 1510 1538 w := httptest.NewRecorder() 1511 1539 1512 1540 handler.HandleUploadBlob(w, req) ··· 1559 1587 // Empty blob should succeed (edge case) 1560 1588 req := httptest.NewRequest(http.MethodPost, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte{})) 1561 1589 req.Header.Set("Content-Type", "application/octet-stream") 1590 + 1591 + // Add DPoP authentication 1592 + ownerDID := "did:plc:testowner123" 1593 + dpopHelper, err := NewDPoPTestHelper(ownerDID, "https://test-pds.example.com") 1594 + if err != nil { 1595 + t.Fatalf("Failed to create DPoP helper: %v", err) 1596 + } 1597 + if err := dpopHelper.AddDPoPToRequest(req); err != nil { 1598 + t.Fatalf("Failed to add DPoP to request: %v", err) 1599 + } 1600 + 1562 1601 w := httptest.NewRecorder() 1563 1602 1564 1603 handler.HandleUploadBlob(w, req) 1565 1604 1566 - // Should succeed with empty blob 1605 + // Should return 200 OK for empty blob (edge case) 1567 1606 if w.Code != http.StatusOK { 1568 - t.Errorf("Expected status 200, got %d", w.Code) 1607 + t.Errorf("Expected status 200 OK for empty blob, got %d", w.Code) 1569 1608 } 1570 1609 1571 1610 // Verify blob store was called with 0 bytes ··· 1600 1639 1601 1640 req := httptest.NewRequest(http.MethodPost, "/xrpc/com.atproto.repo.uploadBlob", bytes.NewReader([]byte("test data"))) 1602 1641 req.Header.Set("Content-Type", "application/octet-stream") 1642 + 1643 + // Add DPoP authentication 1644 + ownerDID := "did:plc:testowner123" 1645 + dpopHelper, err := NewDPoPTestHelper(ownerDID, "https://test-pds.example.com") 1646 + if err != nil { 1647 + t.Fatalf("Failed to create DPoP helper: %v", err) 1648 + } 1649 + if err := dpopHelper.AddDPoPToRequest(req); err != nil { 1650 + t.Fatalf("Failed to add DPoP to request: %v", err) 1651 + } 1652 + 1603 1653 w := httptest.NewRecorder() 1604 1654 1605 1655 handler.HandleUploadBlob(w, req) 1606 1656 1657 + // Should get 500 Internal Server Error for blob store error 1607 1658 if w.Code != http.StatusInternalServerError { 1608 - t.Errorf("Expected status 500, got %d", w.Code) 1659 + t.Errorf("Expected status 500 for blob store error, got %d", w.Code) 1609 1660 } 1610 1661 } 1611 1662 ··· 1775 1826 t.Errorf("Expected status 500, got %d", w.Code) 1776 1827 } 1777 1828 } 1829 + 1830 + // TestHandleGetBlobCORSHeaders tests that CORS headers are set for blob downloads 1831 + // // Spec: https://docs.bsky.app/docs/api/com-atproto-sync-get-blob 1832 + func TestHandleGetBlob_CORSHeaders(t *testing.T) { 1833 + handler, _, ctx := setupTestXRPCHandlerWithBlobs(t) 1834 + 1835 + // Make hold public 1836 + _, err := handler.pds.UpdateCaptainRecord(ctx, true, false) 1837 + if err != nil { 1838 + t.Fatalf("Failed to update captain: %v", err) 1839 + } 1840 + 1841 + holdDID := "did:web:hold.example.com" 1842 + cid := "bafyreib2rxk3rkhh5ylyxj3x3gathxt3s32qvwj2lf3qg4kmzr6b7teqke" 1843 + url := fmt.Sprintf("/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", holdDID, cid) 1844 + 1845 + // Test GET request 1846 + req := httptest.NewRequest(http.MethodGet, url, nil) 1847 + w := httptest.NewRecorder() 1848 + 1849 + // Wrap with CORS middleware 1850 + corsHandler := corsMiddleware(handler.HandleGetBlob) 1851 + corsHandler(w, req) 1852 + 1853 + // Verify CORS headers are present 1854 + if origin := w.Header().Get("Access-Control-Allow-Origin"); origin != "*" { 1855 + t.Errorf("Expected Access-Control-Allow-Origin: *, got %s", origin) 1856 + } 1857 + 1858 + // Test OPTIONS preflight 1859 + req2 := httptest.NewRequest(http.MethodOptions, url, nil) 1860 + w2 := httptest.NewRecorder() 1861 + 1862 + corsHandler(w2, req2) 1863 + 1864 + if w2.Code != http.StatusOK { 1865 + t.Errorf("Expected OPTIONS to return 200, got %d", w2.Code) 1866 + } 1867 + 1868 + methods := w2.Header().Get("Access-Control-Allow-Methods") 1869 + if !strings.Contains(methods, "GET") || !strings.Contains(methods, "HEAD") { 1870 + t.Errorf("Expected Access-Control-Allow-Methods to include GET and HEAD, got %s", methods) 1871 + } 1872 + 1873 + t.Logf("✓ CORS headers correctly set for blob downloads") 1874 + }
+15 -6
pkg/hold/service.go
··· 22 22 type HoldService struct { 23 23 driver storagedriver.StorageDriver 24 24 config *Config 25 - s3Client *s3.S3 // S3 client for presigned URLs (nil if not S3 storage) 26 - bucket string // S3 bucket name 27 - s3PathPrefix string // S3 path prefix (if any) 28 - MultipartMgr *MultipartManager // Exported for access in route handlers 29 - pds HoldPDSInterface // Embedded PDS for captain/crew records 30 - authorizer auth.HoldAuthorizer // Authorizer for access control 25 + s3Client *s3.S3 // S3 client for presigned URLs (nil if not S3 storage) 26 + bucket string // S3 bucket name 27 + s3PathPrefix string // S3 path prefix (if any) 28 + MultipartMgr *MultipartManager // Exported for access in route handlers 29 + pds HoldPDSInterface // Embedded PDS for captain/crew records 30 + authorizer auth.HoldAuthorizer // Authorizer for access control 31 31 } 32 + 33 + // PresignedURLOperation defines the type of presigned URL operation 34 + type PresignedURLOperation string 35 + 36 + const ( 37 + OperationGet PresignedURLOperation = "GET" 38 + OperationHead PresignedURLOperation = "HEAD" 39 + OperationPut PresignedURLOperation = "PUT" 40 + ) 32 41 33 42 // NewHoldService creates a new hold service 34 43 // holdPDS must be a *pds.HoldPDS but we use any to avoid import cycle
+45 -10
pkg/hold/storage.go
··· 77 77 78 78 // Check if presigned URLs are disabled 79 79 if s.config.Server.DisablePresignedURLs { 80 - log.Printf("Presigned URLs disabled, using proxy URL") 81 - return s.getProxyURL(digest, did), nil 80 + log.Printf("Presigned URLs disabled, using XRPC endpoint") 81 + url := s.getProxyURL(digest, did, operation) 82 + if url == "" { 83 + return "", fmt.Errorf("XRPC proxy not supported for PUT operations - use multipart upload") 84 + } 85 + return url, nil 82 86 } 83 87 84 88 // Generate presigned URL if S3 client is available ··· 122 126 url, err := req.Presign(15 * time.Minute) 123 127 if err != nil { 124 128 log.Printf("[getPresignedURL] Presign FAILED for %s: %v", operation, err) 125 - log.Printf(" Falling back to proxy URL") 126 - return s.getProxyURL(digest, did), nil 129 + log.Printf(" Falling back to XRPC endpoint") 130 + proxyURL := s.getProxyURL(digest, did, operation) 131 + if proxyURL == "" { 132 + return "", fmt.Errorf("presign failed and XRPC proxy not supported for PUT operations") 133 + } 134 + return proxyURL, nil 127 135 } 128 136 129 137 return url, nil 130 138 } 131 139 132 - // Fallback: return proxy URL through this service 133 - return s.getProxyURL(digest, did), nil 140 + // Fallback: return XRPC endpoint through this service 141 + proxyURL := s.getProxyURL(digest, did, operation) 142 + if proxyURL == "" { 143 + return "", fmt.Errorf("S3 client not available and XRPC proxy not supported for PUT operations") 144 + } 145 + return proxyURL, nil 146 + } 147 + 148 + // getProxyURL returns XRPC endpoint for blob operations (fallback when presigned URLs unavailable) 149 + // For GET/HEAD operations, returns the XRPC getBlob endpoint 150 + // For PUT operations, this fallback is no longer supported - use multipart upload instead 151 + func (s *HoldService) getProxyURL(digest, did string, operation PresignedURLOperation) string { 152 + // For read operations, use XRPC getBlob endpoint 153 + if operation == OperationGet || operation == OperationHead { 154 + // Generate hold DID from public URL 155 + holdDID := s.getHoldDID() 156 + return fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", 157 + s.config.Server.PublicURL, holdDID, digest) 158 + } 159 + 160 + // For PUT operations, proxy fallback is not supported with XRPC 161 + // Clients should use multipart upload flow via com.atproto.repo.uploadBlob 162 + return "" 134 163 } 135 164 136 - // getProxyURL returns a proxy URL for blob operations (fallback when presigned URLs unavailable) 137 - func (s *HoldService) getProxyURL(digest, did string) string { 138 - // All operations use the same proxy endpoint 139 - return fmt.Sprintf("%s/blobs/%s?did=%s", s.config.Server.PublicURL, digest, did) 165 + // getHoldDID generates a did:web from the hold's public URL 166 + func (s *HoldService) getHoldDID() string { 167 + // Convert URL to did:web format 168 + // https://hold01.atcr.io → did:web:hold01.atcr.io 169 + url := s.config.Server.PublicURL 170 + url = strings.TrimPrefix(url, "https://") 171 + url = strings.TrimPrefix(url, "http://") 172 + url = strings.Split(url, "/")[0] // Remove path 173 + url = strings.Split(url, ":")[0] // Remove port 174 + return fmt.Sprintf("did:web:%s", url) 140 175 }