A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
81
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove user oauth flow. hold now contains captain record indicating owner

+1087 -562
+2 -59
cmd/hold/main.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "encoding/json" 6 5 "fmt" 7 6 "log" 8 7 "net/http" 9 8 "strconv" 10 9 "strings" 11 - "time" 12 10 13 - "atcr.io/pkg/atproto" 14 11 "atcr.io/pkg/hold" 15 12 "atcr.io/pkg/hold/pds" 16 - indigooauth "github.com/bluesky-social/indigo/atproto/auth/oauth" 17 13 18 14 // Import storage drivers 19 15 _ "github.com/distribution/distribution/v3/registry/storage/driver/filesystem" ··· 48 44 log.Fatalf("Failed to initialize embedded PDS: %v", err) 49 45 } 50 46 51 - // Bootstrap PDS with hold owner as first crew member 52 - if err := holdPDS.Bootstrap(ctx, cfg.Registration.OwnerDID); err != nil { 47 + // Bootstrap PDS with captain record and hold owner as first crew member 48 + if err := holdPDS.Bootstrap(ctx, cfg.Registration.OwnerDID, cfg.Server.Public, cfg.Registration.AllowAllCrew); err != nil { 53 49 log.Fatalf("Failed to bootstrap PDS: %v", err) 54 50 } 55 51 ··· 114 110 service.HandleMultipartPartUpload(w, r, uploadID, partNumber, did, service.MultipartMgr) 115 111 }) 116 112 117 - // Pre-register OAuth callback route (will be populated by auto-registration) 118 - var oauthCallbackHandler http.HandlerFunc 119 - mux.HandleFunc("/auth/oauth/callback", func(w http.ResponseWriter, r *http.Request) { 120 - if oauthCallbackHandler != nil { 121 - oauthCallbackHandler(w, r) 122 - } else { 123 - http.Error(w, "OAuth callback not initialized", http.StatusServiceUnavailable) 124 - } 125 - }) 126 - 127 - // OAuth client metadata endpoint for ATProto OAuth 128 - // The hold service serves its metadata at /client-metadata.json 129 - // This is referenced by its client ID URL 130 - mux.HandleFunc("/client-metadata.json", func(w http.ResponseWriter, r *http.Request) { 131 - // Create a temporary config to generate metadata (indigo provides this) 132 - redirectURI := cfg.Server.PublicURL + "/auth/oauth/callback" 133 - clientID := cfg.Server.PublicURL + "/client-metadata.json" 134 - 135 - // Define scopes needed for hold registration and crew management 136 - // Omit action parameter to allow all actions (create, update, delete) 137 - scopes := []string{ 138 - "atproto", 139 - fmt.Sprintf("repo:%s", atproto.HoldCollection), 140 - fmt.Sprintf("repo:%s", atproto.HoldCrewCollection), 141 - fmt.Sprintf("repo:%s", atproto.SailorProfileCollection), 142 - } 143 - 144 - config := indigooauth.NewPublicConfig(clientID, redirectURI, scopes) 145 - metadata := config.ClientMetadata() 146 - 147 - // Serve as JSON 148 - w.Header().Set("Content-Type", "application/json") 149 - w.Header().Set("Access-Control-Allow-Origin", "*") 150 - json.NewEncoder(w).Encode(metadata) 151 - }) 152 113 mux.HandleFunc("/blobs/", func(w http.ResponseWriter, r *http.Request) { 153 114 switch r.Method { 154 115 case http.MethodGet, http.MethodHead: ··· 182 143 serverErr <- err 183 144 } 184 145 }() 185 - 186 - // Give server a moment to start 187 - time.Sleep(100 * time.Millisecond) 188 - 189 - // Auto-register if owner DID is set (now that server is running) 190 - if cfg.Registration.OwnerDID != "" { 191 - if err := service.AutoRegister(&oauthCallbackHandler); err != nil { 192 - log.Printf("WARNING: Auto-registration failed: %v", err) 193 - log.Printf("You can register manually later using the /register endpoint") 194 - } else { 195 - log.Printf("Successfully registered hold service in PDS") 196 - } 197 - 198 - // Reconcile allow-all crew state 199 - if err := service.ReconcileAllowAllCrew(&oauthCallbackHandler); err != nil { 200 - log.Printf("WARNING: Failed to reconcile allow-all crew state: %v", err) 201 - } 202 - } 203 146 204 147 // Wait for server error or shutdown 205 148 if err := <-serverErr; err != nil {
+267 -18
docs/EMBEDDED_PDS.md
··· 146 146 │ ├── io.atcr.hold.exportImage (data portability) 147 147 │ └── io.atcr.hold.getStats (metadata) 148 148 └── Records (hold's own PDS): 149 - ├── io.atcr.hold.crew (crew membership) 149 + ├── io.atcr.hold.captain (single record: ownership & metadata) 150 + ├── io.atcr.hold.crew/* (crew membership & permissions) 150 151 └── io.atcr.hold.config (hold configuration) 151 152 ``` 152 153 ··· 270 271 271 272 **This is standard ATProto federation** - services pass OAuth tokens with DPoP proofs between each other. Hold independently validates tokens against the user's PDS, so there's no trust relationship required. 272 273 273 - **Crew records stored in hold's PDS:** 274 + **Records stored in hold's PDS:** 275 + 274 276 ```json 277 + // io.atcr.hold.captain (single record - hold metadata) 278 + { 279 + "$type": "io.atcr.hold.captain", 280 + "owner": "did:plc:alice123", 281 + "public": false, 282 + "deployedAt": "2025-10-14T...", 283 + "region": "iad", 284 + "provider": "fly.io" 285 + } 286 + 287 + // io.atcr.hold.crew/* (access control records) 275 288 { 276 289 "$type": "io.atcr.hold.crew", 277 290 "member": "did:plc:alice123", ··· 280 293 "addedAt": "2025-10-14T..." 281 294 } 282 295 ``` 296 + 297 + **Semantic separation:** 298 + - **Captain record** = Hold ownership and metadata (who owns it, where it's deployed) 299 + - **Crew records** = Access control (who can use it, what permissions they have) 283 300 284 301 **Security considerations:** 285 302 - User's OAuth token is exposed to hold during delegation ··· 467 484 468 485 ### 6. Hold Discovery & Registration 469 486 470 - **Current:** Hold registers by creating records in owner's PDS 471 - **New:** Hold is its own identity - how does AppView discover available holds? 487 + **Decision: No registration records needed in owner's PDS.** 488 + 489 + Since holds are ATProto actors with did:web identity, they are self-describing: 490 + 491 + **Hold's PDS contains everything:** 492 + ``` 493 + did:web:hold01.atcr.io 494 + ├── io.atcr.hold.captain → { owner: "did:plc:alice123", ... } 495 + └── io.atcr.hold.crew/* → Access control records 496 + ``` 497 + 498 + **DID Document with Multiple Services:** 499 + 500 + Holds expose multiple service endpoints to distinguish themselves from generic PDSs: 501 + 502 + ```json 503 + { 504 + "@context": ["https://www.w3.org/ns/did/v1", ...], 505 + "id": "did:web:hold01.atcr.io", 506 + "service": [ 507 + { 508 + "id": "#atproto_pds", 509 + "type": "AtprotoPersonalDataServer", 510 + "serviceEndpoint": "https://hold01.atcr.io" 511 + }, 512 + { 513 + "id": "#atcr_hold", 514 + "type": "AtcrHoldService", 515 + "serviceEndpoint": "https://hold01.atcr.io" 516 + } 517 + ] 518 + } 519 + ``` 520 + 521 + **Service semantics:** 522 + - **`#atproto_pds`** - Standard ATProto PDS operations (crew queries, record sync) 523 + - **`#atcr_hold`** - ATCR-specific operations (blob storage, presigned URLs) 524 + 525 + **Discovery patterns:** 526 + 527 + 1. **Direct deployment** - Owner deploys hold, knows the DID 528 + 2. **Sailor profiles** - Users reference holds by DID in their profile 529 + 3. **DID resolution** - `did:web:hold01.atcr.io` → `https://hold01.atcr.io/.well-known/did.json` 530 + 4. **Service lookup** - Check for `#atcr_hold` service to identify ATCR holds 531 + 5. **Crew queries** - AppView queries hold's PDS directly via `#atproto_pds` endpoint 532 + 533 + **AppView resolution flow:** 534 + ```go 535 + // 1. Get hold DID from sailor profile 536 + holdDID := profile.DefaultHold // "did:web:hold01.atcr.io" 537 + 538 + // 2. Resolve DID document 539 + didDoc := resolveDidWeb(holdDID) 540 + 541 + // 3. Extract service endpoints 542 + pdsEndpoint := didDoc.GetService("#atproto_pds") // XRPC operations 543 + holdEndpoint := didDoc.GetService("#atcr_hold") // Blob operations 544 + 545 + // 4. Query crew list via PDS endpoint 546 + crew := xrpcClient.ListRecords(pdsEndpoint, "io.atcr.hold.crew") 472 547 473 - Possibilities: 474 - - Holds publish to feeds 475 - - AppView maintains directory 476 - - DIDs are manually configured 477 - - ATProto directory service 548 + // 5. Check if user has access 549 + hasAccess := crew.Contains(userDID) 550 + ``` 551 + 552 + **No need for reverse lookup** (owner → holds). Users know their holds because they deployed them. 553 + 554 + **Benefits:** 555 + - ✅ Single source of truth (hold's PDS) 556 + - ✅ No cross-PDS writes during registration 557 + - ✅ Self-describing ATProto actors 558 + - ✅ Standard DID resolution patterns 559 + - ✅ Clear service semantics (PDS vs ATCR-specific) 560 + - ✅ Discoverable via service type 561 + 562 + **OAuth implications:** 563 + - OAuth registration flow no longer needed (hold is self-describing) 564 + - OAuth code kept for backward compatibility with legacy registration records 565 + - Future: Remove OAuth after migration period 478 566 479 567 ### 7. Multi-Tenancy 480 568 ··· 603 691 604 692 **Key insight:** Other ATProto services will "just work" as long as they can retrieve records from the hold's PDS. We don't need to implement full social features for the hold to participate in the ecosystem. 605 693 606 - ### Crew Management: Individual Records 694 + ### Crew Management: Captain + Individual Records 607 695 608 - **Decision: Individual crew record per user (remove wildcard logic)** 696 + **Decision: Captain record (ownership) + Individual crew records (access control)** 609 697 610 698 ```json 611 - // io.atcr.hold.crew/{rkey} 699 + // io.atcr.hold.captain (single record - hold metadata) 700 + { 701 + "$type": "io.atcr.hold.captain", 702 + "owner": "did:plc:alice123", 703 + "public": false, 704 + "deployedAt": "2025-10-14T...", 705 + "region": "iad", 706 + "provider": "fly.io" 707 + } 708 + 709 + // io.atcr.hold.crew/{rkey} (access control) 612 710 { 613 711 "$type": "io.atcr.hold.crew", 614 712 "member": "did:plc:alice123", ··· 617 715 "addedAt": "2025-10-14T..." 618 716 } 619 717 620 - // io.atcr.hold.config/policy 718 + // io.atcr.hold.config/policy (optional) 621 719 { 622 720 "$type": "io.atcr.hold.config", 623 721 "access": "public", // or "allowlist" ··· 627 725 } 628 726 ``` 629 727 728 + **Semantic separation:** 729 + - **Captain record** = Who owns/deployed the hold (billing, deletion, migration rights) 730 + - **Crew records** = Who can use the hold (access control, permissions) 731 + - **Config record** = Hold-wide policies 732 + 630 733 **Authorization logic:** 631 734 ```go 632 735 func (p *HoldPDS) CheckAccess(ctx context.Context, userDID string) (bool, error) { ··· 661 764 - **Private team hold:** `access: "allowlist"` - explicit crew membership 662 765 - **Hybrid:** Public access + explicit admin crew records for elevated permissions 663 766 767 + ### Phase 2: XRPC Endpoints Implementation ✅ COMPLETED 768 + 769 + **Critical Implementation Lessons Learned:** 770 + 771 + #### 1. Custom Record Types Require Manual CBOR Decoding 772 + 773 + Indigo's `repo.GetRecord()` uses its lexicon decoder which only knows about built-in ATProto types. For custom types, you must use `GetRecordBytes()` and decode manually: 774 + 775 + ```go 776 + // ❌ WRONG - Fails with "unrecognized lexicon type" 777 + record, err := repo.GetRecord(ctx, path, &CrewRecord{}) 778 + 779 + // ✅ CORRECT - Manual CBOR decoding 780 + recordCID, recBytes, err := repo.GetRecordBytes(ctx, path) 781 + var crewRecord CrewRecord 782 + err = crewRecord.UnmarshalCBOR(bytes.NewReader(*recBytes)) 783 + ``` 784 + 785 + **Why:** Indigo's lexicon system doesn't know about `io.atcr.hold.crew` or other custom types. 786 + 787 + #### 2. JSON Struct Tags Must Match CBOR Tags Exactly 788 + 789 + For CID verification to work, JSON and CBOR encodings must produce identical bytes: 790 + 791 + ```go 792 + // ❌ WRONG - JSON uses capital field names (Member, Role) 793 + type CrewRecord struct { 794 + Type string `cborgen:"$type"` 795 + Member string `cborgen:"member"` 796 + Role string `cborgen:"role"` 797 + Permissions []string `cborgen:"permissions"` 798 + AddedAt string `cborgen:"addedAt"` 799 + } 800 + 801 + // ✅ CORRECT - JSON tags match CBOR tags 802 + type CrewRecord struct { 803 + Type string `json:"$type" cborgen:"$type"` 804 + Member string `json:"member" cborgen:"member"` 805 + Role string `json:"role" cborgen:"role"` 806 + Permissions []string `json:"permissions" cborgen:"permissions"` 807 + AddedAt string `json:"addedAt" cborgen:"addedAt"` 808 + } 809 + ``` 810 + 811 + **Why:** Verification code CBOR-encodes the JSON record and compares the CID. Mismatched field names produce different bytes and thus different CIDs. 812 + 813 + #### 3. MST ForEach Returns Full Paths 814 + 815 + The `repo.ForEach()` callback receives full collection paths, not just record keys: 816 + 817 + ```go 818 + // ❌ WRONG - Prepends collection prefix again 819 + err := repo.ForEach(ctx, "io.atcr.hold.crew", func(k string, v cid.Cid) error { 820 + // k is already "io.atcr.hold.crew/3m37dr2ddit22" 821 + path := fmt.Sprintf("%s/%s", collection, k) // Double path! 822 + return nil 823 + }) 824 + 825 + // ✅ CORRECT - Extract just the rkey 826 + err := repo.ForEach(ctx, "io.atcr.hold.crew", func(k string, v cid.Cid) error { 827 + // k = "io.atcr.hold.crew/3m37dr2ddit22" 828 + parts := strings.Split(k, "/") 829 + rkey := parts[len(parts)-1] // "3m37dr2ddit22" 830 + return nil 831 + }) 832 + ``` 833 + 834 + #### 4. All Record Endpoints Must Return CIDs 835 + 836 + Per ATProto spec, `com.atproto.repo.getRecord` and `listRecords` must include the record's CID: 837 + 838 + ```go 839 + // ✅ CORRECT - Include CID in response 840 + response := map[string]any{ 841 + "uri": fmt.Sprintf("at://%s/%s/%s", did, collection, rkey), 842 + "cid": recordCID.String(), // Required! 843 + "value": record, 844 + } 845 + ``` 846 + 847 + **Why:** Clients need the CID to verify record integrity via `com.atproto.sync.getRecord`. 848 + 849 + #### 5. sync.getRecord CAR Files Must Include Full MST Path 850 + 851 + The `com.atproto.sync.getRecord` endpoint must return a CAR file with ALL blocks needed to verify the record: 852 + 853 + ```go 854 + // ❌ WRONG - Only includes the record block 855 + blk, _ := repo.Blockstore().Get(ctx, recordCID) 856 + // Write single block to CAR 857 + 858 + // ✅ CORRECT - Capture all accessed blocks 859 + loggingBS := util.NewLoggingBstore(session) 860 + tempRepo, _ := repo.OpenRepo(ctx, loggingBS, repoHead) 861 + _, _, _ = tempRepo.GetRecordBytes(ctx, path) 862 + blocks := loggingBS.GetLoggedBlocks() // Commit + MST nodes + record 863 + // Write all blocks to CAR 864 + ``` 865 + 866 + **Components included:** 867 + 1. **Commit block** - Repo head with signature, data root, version 868 + 2. **MST tree nodes** - Path from root to record (log N depth) 869 + 3. **Record block** - The actual record data 870 + 871 + **Why:** Clients need the full Merkle path to cryptographically verify the record against the repo head. 872 + 873 + #### 6. CAR Root Must Be Repo Head, Not Record CID 874 + 875 + The CAR file's root CID must be the repo head (commit), not the record: 876 + 877 + ```go 878 + // ❌ WRONG - Uses record CID as root 879 + header := &car.CarHeader{ 880 + Roots: []cid.Cid{recordCID}, 881 + Version: 1, 882 + } 883 + 884 + // ✅ CORRECT - Uses repo head as root 885 + repoHead, _ := carstore.GetUserRepoHead(ctx, uid) 886 + header := &car.CarHeader{ 887 + Roots: []cid.Cid{repoHead}, // Commit CID 888 + Version: 1, 889 + } 890 + ``` 891 + 892 + **Why:** The CAR represents a slice of the repo from head to record, not just the record itself. 893 + 894 + #### 7. Empty Collections Should Return Empty Arrays 895 + 896 + Handle empty collections gracefully instead of returning errors: 897 + 898 + ```go 899 + // ✅ CORRECT - Return empty array for missing collection 900 + err := repo.ForEach(ctx, collection, func(k string, v cid.Cid) error { 901 + // ... 902 + }) 903 + if err != nil { 904 + if err.Error() == "mst: not found" { 905 + return []*CrewMemberWithKey{}, nil // Empty collection 906 + } 907 + return nil, err // Real error 908 + } 909 + ``` 910 + 911 + **Why:** ATProto expects empty arrays for non-existent collections, not 404 errors. 912 + 664 913 ### Next Steps 665 914 666 - 1. **Add indigo dependencies** - carstore, repo, MST 667 - 2. **Implement HoldPDS with carstore** - Create pkg/hold/pds 668 - 3. **Add crew management** - CRUD operations for crew records 669 - 4. **Implement standard PDS endpoints** - describeServer, describeRepo, getRecord, listRecords 670 - 5. **Add DID document** - did:web identity generation 915 + 1. ~~**Add indigo dependencies**~~ ✅ 916 + 2. ~~**Implement HoldPDS with carstore**~~ ✅ 917 + 3. ~~**Add crew management**~~ ✅ 918 + 4. ~~**Implement standard PDS endpoints**~~ ✅ 919 + 5. ~~**Add DID document**~~ ✅ 671 920 6. **Custom XRPC methods** - getUploadUrl, getDownloadUrl (presigned URLs) 672 921 7. **Wire up in cmd/hold** - Serve XRPC alongside existing HTTP 673 922 8. **Test basic operations** - Add/list crew, policy checks
+2 -1
gen/main.go
··· 20 20 ) 21 21 22 22 func main() { 23 - // Generate map-style encoders for CrewRecord 23 + // Generate map-style encoders for CrewRecord and CaptainRecord 24 24 if err := cbg.WriteMapEncodersToFile("pkg/hold/pds/cbor_gen.go", "pds", 25 25 pds.CrewRecord{}, 26 + pds.CaptainRecord{}, 26 27 ); err != nil { 27 28 fmt.Printf("Failed to generate CBOR encoders: %v\n", err) 28 29 os.Exit(1)
+194
pkg/hold/pds/auth.go
··· 1 + package pds 2 + 3 + import ( 4 + "context" 5 + "encoding/base64" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "net/http" 10 + "strings" 11 + 12 + "github.com/bluesky-social/indigo/atproto/identity" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + ) 15 + 16 + // ValidatedUser represents a successfully validated user from DPoP + OAuth 17 + type ValidatedUser struct { 18 + DID string 19 + Handle string 20 + PDS string 21 + Authorized bool 22 + } 23 + 24 + // ValidateDPoPRequest validates a request with DPoP + OAuth tokens 25 + // This implements the standard ATProto token validation flow: 26 + // 1. Extract Authorization header (DPoP <token>) 27 + // 2. Extract DPoP header (proof JWT) 28 + // 3. Call user's PDS to validate token via com.atproto.server.getSession 29 + // 4. Return validated user DID 30 + func ValidateDPoPRequest(r *http.Request) (*ValidatedUser, error) { 31 + // Extract Authorization header 32 + authHeader := r.Header.Get("Authorization") 33 + if authHeader == "" { 34 + return nil, fmt.Errorf("missing Authorization header") 35 + } 36 + 37 + // Check for DPoP authorization scheme 38 + parts := strings.SplitN(authHeader, " ", 2) 39 + if len(parts) != 2 { 40 + return nil, fmt.Errorf("invalid Authorization header format") 41 + } 42 + 43 + if parts[0] != "DPoP" { 44 + return nil, fmt.Errorf("expected DPoP authorization scheme, got: %s", parts[0]) 45 + } 46 + 47 + accessToken := parts[1] 48 + if accessToken == "" { 49 + return nil, fmt.Errorf("missing access token") 50 + } 51 + 52 + // Extract DPoP header 53 + dpopProof := r.Header.Get("DPoP") 54 + if dpopProof == "" { 55 + return nil, fmt.Errorf("missing DPoP header") 56 + } 57 + 58 + // TODO: We could verify the DPoP proof locally (signature, HTM, HTU, etc.) 59 + // For now, we'll rely on the PDS to validate everything 60 + 61 + // The token contains the user's DID in its claims, but we can't trust it without validation 62 + // We need to call the user's PDS to validate the token 63 + // Problem: We don't know which PDS to call yet! 64 + 65 + // For now, we'll parse the JWT to extract the DID/PDS hint (unverified) 66 + // Then validate against that PDS 67 + // This is safe because the PDS will verify the token is valid for that DID 68 + 69 + did, pds, err := extractDIDFromToken(accessToken) 70 + if err != nil { 71 + return nil, fmt.Errorf("failed to extract DID from token: %w", err) 72 + } 73 + 74 + // Validate token with the user's PDS 75 + session, err := validateTokenWithPDS(r.Context(), pds, accessToken, dpopProof) 76 + if err != nil { 77 + return nil, fmt.Errorf("token validation failed: %w", err) 78 + } 79 + 80 + // Verify the DID matches 81 + if session.DID != did { 82 + return nil, fmt.Errorf("token DID mismatch: expected %s, got %s", did, session.DID) 83 + } 84 + 85 + return &ValidatedUser{ 86 + DID: session.DID, 87 + Handle: session.Handle, 88 + PDS: pds, 89 + Authorized: true, 90 + }, nil 91 + } 92 + 93 + // extractDIDFromToken extracts the DID and PDS from an unverified JWT token 94 + // This is just for routing purposes - the token will be validated by the PDS 95 + func extractDIDFromToken(token string) (string, string, error) { 96 + // JWT format: header.payload.signature 97 + parts := strings.Split(token, ".") 98 + if len(parts) != 3 { 99 + return "", "", fmt.Errorf("invalid JWT format") 100 + } 101 + 102 + // Decode payload (base64url) 103 + payload, err := decodeBase64URL(parts[1]) 104 + if err != nil { 105 + return "", "", fmt.Errorf("failed to decode payload: %w", err) 106 + } 107 + 108 + // Parse JSON 109 + var claims struct { 110 + Sub string `json:"sub"` // DID 111 + Iss string `json:"iss"` // PDS URL (issuer) 112 + } 113 + 114 + if err := json.Unmarshal(payload, &claims); err != nil { 115 + return "", "", fmt.Errorf("failed to parse claims: %w", err) 116 + } 117 + 118 + if claims.Sub == "" { 119 + return "", "", fmt.Errorf("missing sub claim (DID)") 120 + } 121 + 122 + if claims.Iss == "" { 123 + return "", "", fmt.Errorf("missing iss claim (PDS)") 124 + } 125 + 126 + return claims.Sub, claims.Iss, nil 127 + } 128 + 129 + // decodeBase64URL decodes base64url (RFC 4648) 130 + func decodeBase64URL(s string) ([]byte, error) { 131 + // Use Go's RawURLEncoding (base64url without padding) 132 + return base64.RawURLEncoding.DecodeString(s) 133 + } 134 + 135 + // SessionResponse represents the response from com.atproto.server.getSession 136 + type SessionResponse struct { 137 + DID string `json:"did"` 138 + Handle string `json:"handle"` 139 + } 140 + 141 + // validateTokenWithPDS calls the user's PDS to validate the token 142 + func validateTokenWithPDS(ctx context.Context, pdsURL, accessToken, dpopProof string) (*SessionResponse, error) { 143 + // Call com.atproto.server.getSession with DPoP headers 144 + url := fmt.Sprintf("%s/xrpc/com.atproto.server.getSession", strings.TrimSuffix(pdsURL, "/")) 145 + 146 + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) 147 + if err != nil { 148 + return nil, fmt.Errorf("failed to create request: %w", err) 149 + } 150 + 151 + // Add DPoP authorization headers 152 + req.Header.Set("Authorization", "DPoP "+accessToken) 153 + req.Header.Set("DPoP", dpopProof) 154 + 155 + resp, err := http.DefaultClient.Do(req) 156 + if err != nil { 157 + return nil, fmt.Errorf("failed to call PDS: %w", err) 158 + } 159 + defer resp.Body.Close() 160 + 161 + if resp.StatusCode != http.StatusOK { 162 + body, _ := io.ReadAll(resp.Body) 163 + return nil, fmt.Errorf("PDS returned status %d: %s", resp.StatusCode, string(body)) 164 + } 165 + 166 + var session SessionResponse 167 + if err := json.NewDecoder(resp.Body).Decode(&session); err != nil { 168 + return nil, fmt.Errorf("failed to decode session: %w", err) 169 + } 170 + 171 + return &session, nil 172 + } 173 + 174 + // ResolveDIDToPDS resolves a DID to its PDS endpoint (for reference) 175 + // This is an alternative approach if we don't trust the token's issuer claim 176 + func ResolveDIDToPDS(ctx context.Context, did string) (string, error) { 177 + directory := identity.DefaultDirectory() 178 + didParsed, err := syntax.ParseDID(did) 179 + if err != nil { 180 + return "", fmt.Errorf("invalid DID: %w", err) 181 + } 182 + 183 + ident, err := directory.LookupDID(ctx, didParsed) 184 + if err != nil { 185 + return "", fmt.Errorf("failed to resolve DID: %w", err) 186 + } 187 + 188 + pdsEndpoint := ident.PDSEndpoint() 189 + if pdsEndpoint == "" { 190 + return "", fmt.Errorf("no PDS endpoint found for DID") 191 + } 192 + 193 + return pdsEndpoint, nil 194 + }
+146
pkg/hold/pds/captain.go
··· 1 + package pds 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/repo" 10 + "github.com/ipfs/go-cid" 11 + ) 12 + 13 + const ( 14 + // CaptainRkey is the fixed rkey for the captain record (singleton) 15 + CaptainRkey = "self" 16 + ) 17 + 18 + // CreateCaptainRecord creates the captain record for the hold 19 + func (p *HoldPDS) CreateCaptainRecord(ctx context.Context, ownerDID string, public bool, allowAllCrew bool) (cid.Cid, error) { 20 + captainRecord := &CaptainRecord{ 21 + Type: CaptainCollection, 22 + Owner: ownerDID, 23 + Public: public, 24 + AllowAllCrew: allowAllCrew, 25 + DeployedAt: time.Now().Format(time.RFC3339), 26 + } 27 + 28 + // Create record in repo with fixed rkey "self" 29 + recordCID, rkey, err := p.repo.CreateRecord(ctx, CaptainCollection, captainRecord) 30 + if err != nil { 31 + return cid.Undef, fmt.Errorf("failed to create captain record: %w", err) 32 + } 33 + 34 + // Create signer function from signing key 35 + signer := func(ctx context.Context, did string, data []byte) ([]byte, error) { 36 + return p.signingKey.HashAndSign(data) 37 + } 38 + 39 + // Commit the changes to get new root CID 40 + root, rev, err := p.repo.Commit(ctx, signer) 41 + if err != nil { 42 + return cid.Undef, fmt.Errorf("failed to commit captain record: %w", err) 43 + } 44 + 45 + // Close the delta session with the new root 46 + _, err = p.session.CloseWithRoot(ctx, root, rev) 47 + if err != nil { 48 + return cid.Undef, fmt.Errorf("failed to persist commit: %w", err) 49 + } 50 + 51 + // Create a new session for the next operation 52 + rootStr := root.String() 53 + newSession, err := p.carstore.NewDeltaSession(ctx, p.uid, &rootStr) 54 + if err != nil { 55 + return cid.Undef, fmt.Errorf("failed to create new session: %w", err) 56 + } 57 + 58 + // Load repo from the newly committed head 59 + newRepo, err := repo.OpenRepo(ctx, newSession, root) 60 + if err != nil { 61 + return cid.Undef, fmt.Errorf("failed to reload repo after commit: %w", err) 62 + } 63 + 64 + // Update the stored session and repo 65 + p.session = newSession 66 + p.repo = newRepo 67 + 68 + fmt.Printf("Created captain record with rkey: %s, cid: %s\n", rkey, recordCID) 69 + 70 + return recordCID, nil 71 + } 72 + 73 + // GetCaptainRecord retrieves the captain record 74 + func (p *HoldPDS) GetCaptainRecord(ctx context.Context) (cid.Cid, *CaptainRecord, error) { 75 + path := fmt.Sprintf("%s/%s", CaptainCollection, CaptainRkey) 76 + 77 + // Get the record bytes and decode manually 78 + recordCID, recBytes, err := p.repo.GetRecordBytes(ctx, path) 79 + if err != nil { 80 + return cid.Undef, nil, fmt.Errorf("failed to get captain record: %w", err) 81 + } 82 + 83 + // Decode the CBOR bytes into our CaptainRecord type 84 + var captainRecord CaptainRecord 85 + if err := captainRecord.UnmarshalCBOR(bytes.NewReader(*recBytes)); err != nil { 86 + return cid.Undef, nil, fmt.Errorf("failed to decode captain record: %w", err) 87 + } 88 + 89 + return recordCID, &captainRecord, nil 90 + } 91 + 92 + // UpdateCaptainRecord updates the captain record (e.g., to change public/allowAllCrew settings) 93 + func (p *HoldPDS) UpdateCaptainRecord(ctx context.Context, public bool, allowAllCrew bool) (cid.Cid, error) { 94 + // Get existing record to preserve other fields 95 + _, existing, err := p.GetCaptainRecord(ctx) 96 + if err != nil { 97 + return cid.Undef, fmt.Errorf("failed to get existing captain record: %w", err) 98 + } 99 + 100 + // Update the fields 101 + existing.Public = public 102 + existing.AllowAllCrew = allowAllCrew 103 + 104 + // Update record in repo 105 + path := fmt.Sprintf("%s/%s", CaptainCollection, CaptainRkey) 106 + recordCID, err := p.repo.UpdateRecord(ctx, path, existing) 107 + if err != nil { 108 + return cid.Undef, fmt.Errorf("failed to update captain record: %w", err) 109 + } 110 + 111 + // Create signer function from signing key 112 + signer := func(ctx context.Context, did string, data []byte) ([]byte, error) { 113 + return p.signingKey.HashAndSign(data) 114 + } 115 + 116 + // Commit the changes 117 + root, rev, err := p.repo.Commit(ctx, signer) 118 + if err != nil { 119 + return cid.Undef, fmt.Errorf("failed to commit captain record update: %w", err) 120 + } 121 + 122 + // Close the delta session with the new root 123 + _, err = p.session.CloseWithRoot(ctx, root, rev) 124 + if err != nil { 125 + return cid.Undef, fmt.Errorf("failed to persist commit: %w", err) 126 + } 127 + 128 + // Create a new session for the next operation 129 + rootStr := root.String() 130 + newSession, err := p.carstore.NewDeltaSession(ctx, p.uid, &rootStr) 131 + if err != nil { 132 + return cid.Undef, fmt.Errorf("failed to create new session: %w", err) 133 + } 134 + 135 + // Load repo from the newly committed head 136 + newRepo, err := repo.OpenRepo(ctx, newSession, root) 137 + if err != nil { 138 + return cid.Undef, fmt.Errorf("failed to reload repo after commit: %w", err) 139 + } 140 + 141 + // Update the stored session and repo 142 + p.session = newSession 143 + p.repo = newRepo 144 + 145 + return recordCID, nil 146 + }
+319
pkg/hold/pds/cbor_gen.go
··· 293 293 294 294 return nil 295 295 } 296 + func (t *CaptainRecord) MarshalCBOR(w io.Writer) error { 297 + if t == nil { 298 + _, err := w.Write(cbg.CborNull) 299 + return err 300 + } 301 + 302 + cw := cbg.NewCborWriter(w) 303 + fieldCount := 7 304 + 305 + if t.Region == "" { 306 + fieldCount-- 307 + } 308 + 309 + if t.Provider == "" { 310 + fieldCount-- 311 + } 312 + 313 + if _, err := cw.Write(cbg.CborEncodeMajorType(cbg.MajMap, uint64(fieldCount))); err != nil { 314 + return err 315 + } 316 + 317 + // t.Type (string) (string) 318 + if len("$type") > 8192 { 319 + return xerrors.Errorf("Value in field \"$type\" was too long") 320 + } 321 + 322 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("$type"))); err != nil { 323 + return err 324 + } 325 + if _, err := cw.WriteString(string("$type")); err != nil { 326 + return err 327 + } 328 + 329 + if len(t.Type) > 8192 { 330 + return xerrors.Errorf("Value in field t.Type was too long") 331 + } 332 + 333 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Type))); err != nil { 334 + return err 335 + } 336 + if _, err := cw.WriteString(string(t.Type)); err != nil { 337 + return err 338 + } 339 + 340 + // t.Owner (string) (string) 341 + if len("owner") > 8192 { 342 + return xerrors.Errorf("Value in field \"owner\" was too long") 343 + } 344 + 345 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("owner"))); err != nil { 346 + return err 347 + } 348 + if _, err := cw.WriteString(string("owner")); err != nil { 349 + return err 350 + } 351 + 352 + if len(t.Owner) > 8192 { 353 + return xerrors.Errorf("Value in field t.Owner was too long") 354 + } 355 + 356 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Owner))); err != nil { 357 + return err 358 + } 359 + if _, err := cw.WriteString(string(t.Owner)); err != nil { 360 + return err 361 + } 362 + 363 + // t.Public (bool) (bool) 364 + if len("public") > 8192 { 365 + return xerrors.Errorf("Value in field \"public\" was too long") 366 + } 367 + 368 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("public"))); err != nil { 369 + return err 370 + } 371 + if _, err := cw.WriteString(string("public")); err != nil { 372 + return err 373 + } 374 + 375 + if err := cbg.WriteBool(w, t.Public); err != nil { 376 + return err 377 + } 378 + 379 + // t.Region (string) (string) 380 + if t.Region != "" { 381 + 382 + if len("region") > 8192 { 383 + return xerrors.Errorf("Value in field \"region\" was too long") 384 + } 385 + 386 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("region"))); err != nil { 387 + return err 388 + } 389 + if _, err := cw.WriteString(string("region")); err != nil { 390 + return err 391 + } 392 + 393 + if len(t.Region) > 8192 { 394 + return xerrors.Errorf("Value in field t.Region was too long") 395 + } 396 + 397 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Region))); err != nil { 398 + return err 399 + } 400 + if _, err := cw.WriteString(string(t.Region)); err != nil { 401 + return err 402 + } 403 + } 404 + 405 + // t.Provider (string) (string) 406 + if t.Provider != "" { 407 + 408 + if len("provider") > 8192 { 409 + return xerrors.Errorf("Value in field \"provider\" was too long") 410 + } 411 + 412 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("provider"))); err != nil { 413 + return err 414 + } 415 + if _, err := cw.WriteString(string("provider")); err != nil { 416 + return err 417 + } 418 + 419 + if len(t.Provider) > 8192 { 420 + return xerrors.Errorf("Value in field t.Provider was too long") 421 + } 422 + 423 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Provider))); err != nil { 424 + return err 425 + } 426 + if _, err := cw.WriteString(string(t.Provider)); err != nil { 427 + return err 428 + } 429 + } 430 + 431 + // t.DeployedAt (string) (string) 432 + if len("deployedAt") > 8192 { 433 + return xerrors.Errorf("Value in field \"deployedAt\" was too long") 434 + } 435 + 436 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("deployedAt"))); err != nil { 437 + return err 438 + } 439 + if _, err := cw.WriteString(string("deployedAt")); err != nil { 440 + return err 441 + } 442 + 443 + if len(t.DeployedAt) > 8192 { 444 + return xerrors.Errorf("Value in field t.DeployedAt was too long") 445 + } 446 + 447 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.DeployedAt))); err != nil { 448 + return err 449 + } 450 + if _, err := cw.WriteString(string(t.DeployedAt)); err != nil { 451 + return err 452 + } 453 + 454 + // t.AllowAllCrew (bool) (bool) 455 + if len("allowAllCrew") > 8192 { 456 + return xerrors.Errorf("Value in field \"allowAllCrew\" was too long") 457 + } 458 + 459 + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("allowAllCrew"))); err != nil { 460 + return err 461 + } 462 + if _, err := cw.WriteString(string("allowAllCrew")); err != nil { 463 + return err 464 + } 465 + 466 + if err := cbg.WriteBool(w, t.AllowAllCrew); err != nil { 467 + return err 468 + } 469 + return nil 470 + } 471 + 472 + func (t *CaptainRecord) UnmarshalCBOR(r io.Reader) (err error) { 473 + *t = CaptainRecord{} 474 + 475 + cr := cbg.NewCborReader(r) 476 + 477 + maj, extra, err := cr.ReadHeader() 478 + if err != nil { 479 + return err 480 + } 481 + defer func() { 482 + if err == io.EOF { 483 + err = io.ErrUnexpectedEOF 484 + } 485 + }() 486 + 487 + if maj != cbg.MajMap { 488 + return fmt.Errorf("cbor input should be of type map") 489 + } 490 + 491 + if extra > cbg.MaxLength { 492 + return fmt.Errorf("CaptainRecord: map struct too large (%d)", extra) 493 + } 494 + 495 + n := extra 496 + 497 + nameBuf := make([]byte, 12) 498 + for i := uint64(0); i < n; i++ { 499 + nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 8192) 500 + if err != nil { 501 + return err 502 + } 503 + 504 + if !ok { 505 + // Field doesn't exist on this type, so ignore it 506 + if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil { 507 + return err 508 + } 509 + continue 510 + } 511 + 512 + switch string(nameBuf[:nameLen]) { 513 + // t.Type (string) (string) 514 + case "$type": 515 + 516 + { 517 + sval, err := cbg.ReadStringWithMax(cr, 8192) 518 + if err != nil { 519 + return err 520 + } 521 + 522 + t.Type = string(sval) 523 + } 524 + // t.Owner (string) (string) 525 + case "owner": 526 + 527 + { 528 + sval, err := cbg.ReadStringWithMax(cr, 8192) 529 + if err != nil { 530 + return err 531 + } 532 + 533 + t.Owner = string(sval) 534 + } 535 + // t.Public (bool) (bool) 536 + case "public": 537 + 538 + maj, extra, err = cr.ReadHeader() 539 + if err != nil { 540 + return err 541 + } 542 + if maj != cbg.MajOther { 543 + return fmt.Errorf("booleans must be major type 7") 544 + } 545 + switch extra { 546 + case 20: 547 + t.Public = false 548 + case 21: 549 + t.Public = true 550 + default: 551 + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) 552 + } 553 + // t.Region (string) (string) 554 + case "region": 555 + 556 + { 557 + sval, err := cbg.ReadStringWithMax(cr, 8192) 558 + if err != nil { 559 + return err 560 + } 561 + 562 + t.Region = string(sval) 563 + } 564 + // t.Provider (string) (string) 565 + case "provider": 566 + 567 + { 568 + sval, err := cbg.ReadStringWithMax(cr, 8192) 569 + if err != nil { 570 + return err 571 + } 572 + 573 + t.Provider = string(sval) 574 + } 575 + // t.DeployedAt (string) (string) 576 + case "deployedAt": 577 + 578 + { 579 + sval, err := cbg.ReadStringWithMax(cr, 8192) 580 + if err != nil { 581 + return err 582 + } 583 + 584 + t.DeployedAt = string(sval) 585 + } 586 + // t.AllowAllCrew (bool) (bool) 587 + case "allowAllCrew": 588 + 589 + maj, extra, err = cr.ReadHeader() 590 + if err != nil { 591 + return err 592 + } 593 + if maj != cbg.MajOther { 594 + return fmt.Errorf("booleans must be major type 7") 595 + } 596 + switch extra { 597 + case 20: 598 + t.AllowAllCrew = false 599 + case 21: 600 + t.AllowAllCrew = true 601 + default: 602 + return fmt.Errorf("booleans are either major type 7, value 20 or 21 (got %d)", extra) 603 + } 604 + 605 + default: 606 + // Field doesn't exist on this type, so ignore it 607 + if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil { 608 + return err 609 + } 610 + } 611 + } 612 + 613 + return nil 614 + }
+5
pkg/hold/pds/did.go
··· 77 77 Type: "AtprotoPersonalDataServer", 78 78 ServiceEndpoint: publicURL, 79 79 }, 80 + { 81 + ID: "#atcr_hold", 82 + Type: "AtcrHoldService", 83 + ServiceEndpoint: publicURL, 84 + }, 80 85 }, 81 86 } 82 87
+10 -2
pkg/hold/pds/server.go
··· 98 98 return p.signingKey 99 99 } 100 100 101 - // Bootstrap initializes the hold with the owner as the first crew member 102 - func (p *HoldPDS) Bootstrap(ctx context.Context, ownerDID string) error { 101 + // Bootstrap initializes the hold with the captain record and owner as first crew member 102 + func (p *HoldPDS) Bootstrap(ctx context.Context, ownerDID string, public bool, allowAllCrew bool) error { 103 103 if ownerDID == "" { 104 104 return nil 105 105 } ··· 114 114 fmt.Printf("⏭️ Skipping PDS bootstrap: repo already initialized (head: %s)\n", head.String()[:16]) 115 115 return nil 116 116 } 117 + 118 + // Create captain record (hold ownership and settings) 119 + _, err = p.CreateCaptainRecord(ctx, ownerDID, public, allowAllCrew) 120 + if err != nil { 121 + return fmt.Errorf("failed to create captain record: %w", err) 122 + } 123 + 124 + fmt.Printf("✅ Created captain record (public=%v, allowAllCrew=%v)\n", public, allowAllCrew) 117 125 118 126 // Add hold owner as first crew member with admin role 119 127 _, err = p.AddCrewMember(ctx, ownerDID, "admin", []string{"blob:read", "blob:write", "crew:admin"})
+17 -1
pkg/hold/pds/types.go
··· 1 1 package pds 2 2 3 + //go:generate go run github.com/whyrusleeping/cbor-gen --map-encoding CrewRecord CaptainRecord 4 + 3 5 // ATProto record types for the hold service 4 6 7 + // CaptainRecord represents the hold's ownership and metadata 8 + // Collection: io.atcr.hold.captain (single record per hold) 9 + type CaptainRecord struct { 10 + Type string `json:"$type" cborgen:"$type"` 11 + Owner string `json:"owner" cborgen:"owner"` // DID of hold owner 12 + Public bool `json:"public" cborgen:"public"` // Public read access 13 + AllowAllCrew bool `json:"allowAllCrew" cborgen:"allowAllCrew"` // Allow any authenticated user to register as crew 14 + DeployedAt string `json:"deployedAt" cborgen:"deployedAt"` // RFC3339 timestamp 15 + Region string `json:"region,omitempty" cborgen:"region,omitempty"` // S3 region (optional) 16 + Provider string `json:"provider,omitempty" cborgen:"provider,omitempty"` // Deployment provider (optional) 17 + } 18 + 5 19 // CrewRecord represents a crew member in the hold 20 + // Collection: io.atcr.hold.crew (one record per member) 6 21 type CrewRecord struct { 7 22 Type string `json:"$type" cborgen:"$type"` 8 23 Member string `json:"member" cborgen:"member"` ··· 12 27 } 13 28 14 29 const ( 15 - CrewCollection = "io.atcr.hold.crew" 30 + CaptainCollection = "io.atcr.hold.captain" 31 + CrewCollection = "io.atcr.hold.crew" 16 32 )
+103
pkg/hold/pds/xrpc.go
··· 79 79 // DID document and handle resolution 80 80 mux.HandleFunc("/.well-known/did.json", corsMiddleware(h.HandleDIDDocument)) 81 81 mux.HandleFunc("/.well-known/atproto-did", corsMiddleware(h.HandleAtprotoDID)) 82 + 83 + // Custom ATCR endpoints 84 + mux.HandleFunc("/xrpc/io.atcr.hold.requestCrew", corsMiddleware(h.HandleRequestCrew)) 82 85 } 83 86 84 87 // HandleHealth returns health check information ··· 480 483 w.Header().Set("Content-Type", "text/plain") 481 484 fmt.Fprint(w, h.pds.DID()) 482 485 } 486 + 487 + // HandleRequestCrew handles crew membership requests 488 + // This endpoint allows authenticated users to request crew membership 489 + // Authorization is checked against captain record settings 490 + func (h *XRPCHandler) HandleRequestCrew(w http.ResponseWriter, r *http.Request) { 491 + if r.Method != http.MethodPost { 492 + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) 493 + return 494 + } 495 + 496 + // Validate DPoP + OAuth token from Authorization and DPoP headers 497 + user, err := ValidateDPoPRequest(r) 498 + if err != nil { 499 + http.Error(w, fmt.Sprintf("authentication failed: %v", err), http.StatusUnauthorized) 500 + return 501 + } 502 + 503 + // Parse request body (optional parameters) 504 + var req struct { 505 + Role string `json:"role"` // Requested role (default: "member") 506 + Permissions []string `json:"permissions"` // Requested permissions 507 + } 508 + 509 + // Body is optional - if empty, just use defaults 510 + if r.Body != nil && r.ContentLength > 0 { 511 + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { 512 + http.Error(w, fmt.Sprintf("invalid request body: %v", err), http.StatusBadRequest) 513 + return 514 + } 515 + } 516 + 517 + // Get captain record to check authorization settings 518 + _, captain, err := h.pds.GetCaptainRecord(r.Context()) 519 + if err != nil { 520 + http.Error(w, fmt.Sprintf("failed to get captain record: %v", err), http.StatusInternalServerError) 521 + return 522 + } 523 + 524 + // Check authorization: 525 + // 1. If allowAllCrew is true, any authenticated user can join 526 + // 2. If user is the owner, they can always join (though they should already be crew) 527 + // 3. Otherwise, deny 528 + isOwner := user.DID == captain.Owner 529 + if !captain.AllowAllCrew && !isOwner { 530 + http.Error(w, "crew registration not allowed (HOLD_ALLOW_ALL_CREW=false)", http.StatusForbidden) 531 + return 532 + } 533 + 534 + // Set defaults if not provided 535 + if req.Role == "" { 536 + req.Role = "member" 537 + } 538 + if len(req.Permissions) == 0 { 539 + req.Permissions = []string{"blob:read", "blob:write"} 540 + } 541 + 542 + // Check if user is already a crew member 543 + // List all crew members and check if this DID is already present 544 + crew, err := h.pds.ListCrewMembers(r.Context()) 545 + if err != nil { 546 + http.Error(w, fmt.Sprintf("failed to list crew members: %v", err), http.StatusInternalServerError) 547 + return 548 + } 549 + 550 + for _, member := range crew { 551 + if member.Record.Member == user.DID { 552 + // Already a crew member, return success with existing record 553 + response := map[string]any{ 554 + "uri": fmt.Sprintf("at://%s/%s/%s", h.pds.DID(), CrewCollection, member.Rkey), 555 + "cid": member.Cid.String(), 556 + "status": "already_member", 557 + "message": "User is already a crew member", 558 + } 559 + w.Header().Set("Content-Type", "application/json") 560 + w.WriteHeader(http.StatusOK) 561 + json.NewEncoder(w).Encode(response) 562 + return 563 + } 564 + } 565 + 566 + // Create new crew record 567 + recordCID, err := h.pds.AddCrewMember(r.Context(), user.DID, req.Role, req.Permissions) 568 + if err != nil { 569 + http.Error(w, fmt.Sprintf("failed to create crew record: %v", err), http.StatusInternalServerError) 570 + return 571 + } 572 + 573 + // Return success response 574 + // Note: rkey is generated by AddCrewMember (TID), we don't have direct access to it 575 + // For now, return just the CID. In production, AddCrewMember should return both CID and rkey 576 + response := map[string]any{ 577 + "cid": recordCID.String(), 578 + "status": "created", 579 + "message": "Successfully added to crew", 580 + } 581 + 582 + w.Header().Set("Content-Type", "application/json") 583 + w.WriteHeader(http.StatusCreated) 584 + json.NewEncoder(w).Encode(response) 585 + }
-481
pkg/hold/registration.go
··· 1 - package hold 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "errors" 7 - "fmt" 8 - "log" 9 - "net/http" 10 - "net/url" 11 - "strings" 12 - "time" 13 - 14 - "atcr.io/pkg/atproto" 15 - "atcr.io/pkg/auth/oauth" 16 - "github.com/bluesky-social/indigo/atproto/identity" 17 - "github.com/bluesky-social/indigo/atproto/syntax" 18 - ) 19 - 20 - // HealthHandler handles health check requests 21 - func (s *HoldService) HealthHandler(w http.ResponseWriter, r *http.Request) { 22 - w.Header().Set("Content-Type", "application/json") 23 - w.Write([]byte(`{"status":"ok"}`)) 24 - } 25 - 26 - // isHoldRegistered checks if a hold with the given public URL is already registered in the PDS 27 - func (s *HoldService) isHoldRegistered(ctx context.Context, did, pdsEndpoint, publicURL string) (bool, error) { 28 - // We need to query the PDS without authentication to check public records 29 - // ATProto records are publicly readable, so we can use an unauthenticated client 30 - client := atproto.NewClient(pdsEndpoint, did, "") 31 - 32 - // List all hold records for this DID 33 - records, err := client.ListRecords(ctx, atproto.HoldCollection, 100) 34 - if err != nil { 35 - return false, fmt.Errorf("failed to list hold records: %w", err) 36 - } 37 - 38 - // Check if any hold record matches our public URL 39 - for _, record := range records { 40 - var holdRecord atproto.HoldRecord 41 - if err := json.Unmarshal(record.Value, &holdRecord); err != nil { 42 - continue 43 - } 44 - 45 - if holdRecord.Endpoint == publicURL { 46 - return true, nil 47 - } 48 - } 49 - 50 - return false, nil 51 - } 52 - 53 - // AutoRegister registers this hold service in the owner's PDS 54 - // Checks if already registered first, then does OAuth if needed 55 - func (s *HoldService) AutoRegister(callbackHandler *http.HandlerFunc) error { 56 - reg := &s.config.Registration 57 - publicURL := s.config.Server.PublicURL 58 - 59 - if publicURL == "" { 60 - return fmt.Errorf("HOLD_PUBLIC_URL not set") 61 - } 62 - 63 - if reg.OwnerDID == "" { 64 - return fmt.Errorf("HOLD_OWNER not set - required for registration") 65 - } 66 - 67 - ctx := context.Background() 68 - 69 - log.Printf("Checking registration status for DID: %s", reg.OwnerDID) 70 - 71 - // Resolve DID to PDS endpoint using indigo 72 - directory := identity.DefaultDirectory() 73 - didParsed, err := syntax.ParseDID(reg.OwnerDID) 74 - if err != nil { 75 - return fmt.Errorf("invalid owner DID: %w", err) 76 - } 77 - 78 - ident, err := directory.LookupDID(ctx, didParsed) 79 - if err != nil { 80 - return fmt.Errorf("failed to resolve PDS for DID: %w", err) 81 - } 82 - 83 - pdsEndpoint := ident.PDSEndpoint() 84 - if pdsEndpoint == "" { 85 - return fmt.Errorf("no PDS endpoint found for DID") 86 - } 87 - 88 - log.Printf("PDS endpoint: %s", pdsEndpoint) 89 - 90 - // Check if hold is already registered 91 - isRegistered, err := s.isHoldRegistered(ctx, reg.OwnerDID, pdsEndpoint, publicURL) 92 - if err != nil { 93 - log.Printf("Warning: failed to check registration status: %v", err) 94 - log.Printf("Proceeding with OAuth registration...") 95 - } else if isRegistered { 96 - log.Printf("✓ Hold service already registered in PDS") 97 - log.Printf("Public URL: %s", publicURL) 98 - return nil 99 - } 100 - 101 - // Not registered, need to do OAuth 102 - log.Printf("Hold not registered, starting OAuth flow...") 103 - 104 - // Get handle from DID document (already resolved above) 105 - handle := ident.Handle.String() 106 - if handle == "" || handle == "handle.invalid" { 107 - return fmt.Errorf("no valid handle found for DID") 108 - } 109 - 110 - log.Printf("Resolved handle: %s", handle) 111 - log.Printf("Starting OAuth registration for hold service") 112 - log.Printf("Public URL: %s", publicURL) 113 - 114 - return s.registerWithOAuth(publicURL, handle, reg.OwnerDID, pdsEndpoint, callbackHandler) 115 - } 116 - 117 - // registerWithOAuth performs OAuth flow and registers the hold 118 - func (s *HoldService) registerWithOAuth(publicURL, handle, did, pdsEndpoint string, callbackHandler *http.HandlerFunc) error { 119 - // Run OAuth flow to get authenticated client 120 - client, err := s.runOAuthFlow(callbackHandler, "Hold service registration") 121 - if err != nil { 122 - return err 123 - } 124 - 125 - log.Printf("Authorization received!") 126 - log.Printf("OAuth session obtained successfully") 127 - log.Printf("DID: %s", did) 128 - log.Printf("PDS: %s", pdsEndpoint) 129 - 130 - return s.registerWithClient(publicURL, did, client) 131 - } 132 - 133 - // registerWithClient registers the hold using an authenticated ATProto client 134 - func (s *HoldService) registerWithClient(publicURL, did string, client *atproto.Client) error { 135 - // Derive hold name from URL (hostname) 136 - holdName, err := extractHostname(publicURL) 137 - if err != nil { 138 - return fmt.Errorf("failed to extract hostname from URL: %w", err) 139 - } 140 - 141 - log.Printf("Registering hold service: url=%s, name=%s, owner=%s", publicURL, holdName, did) 142 - 143 - ctx := context.Background() 144 - 145 - // Create HoldRecord 146 - holdRecord := atproto.NewHoldRecord(publicURL, did, s.config.Server.Public) 147 - 148 - // Use hostname as record key 149 - holdResult, err := client.PutRecord(ctx, atproto.HoldCollection, holdName, holdRecord) 150 - if err != nil { 151 - return fmt.Errorf("failed to create hold record: %w", err) 152 - } 153 - 154 - log.Printf("✓ Created hold record: %s", holdResult.URI) 155 - 156 - // Create HoldCrewRecord for the owner 157 - crewRecord := atproto.NewHoldCrewRecord(holdResult.URI, did, "owner") 158 - 159 - crewRKey := fmt.Sprintf("%s-%s", holdName, did) 160 - crewResult, err := client.PutRecord(ctx, atproto.HoldCrewCollection, crewRKey, crewRecord) 161 - if err != nil { 162 - return fmt.Errorf("failed to create crew record: %w", err) 163 - } 164 - 165 - log.Printf("✓ Created crew record: %s", crewResult.URI) 166 - 167 - // Update sailor profile to set this as the default hold 168 - profile, err := atproto.GetProfile(ctx, client) 169 - if err != nil { 170 - log.Printf("Warning: failed to get sailor profile: %v", err) 171 - } else { 172 - if profile == nil { 173 - // Create new profile with this hold as default 174 - profile = atproto.NewSailorProfileRecord(publicURL) 175 - } else { 176 - // Update existing profile with new defaultHold 177 - profile.DefaultHold = publicURL 178 - profile.UpdatedAt = time.Now() 179 - } 180 - 181 - err = atproto.UpdateProfile(ctx, client, profile) 182 - if err != nil { 183 - log.Printf("Warning: failed to update sailor profile: %v", err) 184 - } else { 185 - log.Printf("✓ Updated sailor profile defaultHold: %s", publicURL) 186 - } 187 - } 188 - 189 - log.Print("\n" + strings.Repeat("=", 80)) 190 - log.Printf("REGISTRATION COMPLETE") 191 - log.Print(strings.Repeat("=", 80)) 192 - log.Printf("Hold service is now registered and ready to use!") 193 - log.Print(strings.Repeat("=", 80) + "\n") 194 - 195 - return nil 196 - } 197 - 198 - // extractHostname extracts the hostname from a URL to use as the hold name 199 - func extractHostname(urlStr string) (string, error) { 200 - u, err := url.Parse(urlStr) 201 - if err != nil { 202 - return "", err 203 - } 204 - // Remove port if present 205 - hostname := u.Hostname() 206 - if hostname == "" { 207 - return "", fmt.Errorf("no hostname in URL") 208 - } 209 - return hostname, nil 210 - } 211 - 212 - // ReconcileAllowAllCrew reconciles the allow-all crew record state with the environment variable 213 - // Called on every startup to ensure the PDS record matches the desired configuration 214 - func (s *HoldService) ReconcileAllowAllCrew(callbackHandler *http.HandlerFunc) error { 215 - ownerDID := s.config.Registration.OwnerDID 216 - if ownerDID == "" { 217 - // No owner DID configured, skip reconciliation 218 - return nil 219 - } 220 - 221 - desiredState := s.config.Registration.AllowAllCrew 222 - 223 - log.Printf("Checking allow-all crew state (desired: %v)", desiredState) 224 - 225 - // Query PDS for current state 226 - actualState, err := s.hasAllowAllCrewRecord() 227 - if err != nil { 228 - return fmt.Errorf("failed to check allow-all crew record: %w", err) 229 - } 230 - 231 - log.Printf("Allow-all crew record exists: %v", actualState) 232 - 233 - // States match - nothing to do 234 - if desiredState == actualState { 235 - if desiredState { 236 - log.Printf("✓ Allow-all crew enabled (all authenticated users can push)") 237 - } else { 238 - log.Printf("✓ Allow-all crew disabled (explicit crew membership required)") 239 - } 240 - return nil 241 - } 242 - 243 - // State mismatch - need to reconcile 244 - if desiredState && !actualState { 245 - // Need to create wildcard crew record 246 - log.Printf("Creating allow-all crew record (HOLD_ALLOW_ALL_CREW=true)") 247 - return s.createAllowAllCrewRecord(callbackHandler) 248 - } 249 - 250 - if !desiredState && actualState { 251 - // Need to delete wildcard crew record 252 - log.Printf("Deleting allow-all crew record (HOLD_ALLOW_ALL_CREW=false)") 253 - return s.deleteAllowAllCrewRecord(callbackHandler) 254 - } 255 - 256 - return nil 257 - } 258 - 259 - // hasAllowAllCrewRecord checks if the allow-all crew record exists in the PDS for THIS hold 260 - func (s *HoldService) hasAllowAllCrewRecord() (bool, error) { 261 - ownerDID := s.config.Registration.OwnerDID 262 - publicURL := s.config.Server.PublicURL 263 - if ownerDID == "" { 264 - return false, fmt.Errorf("hold owner DID not configured") 265 - } 266 - if publicURL == "" { 267 - return false, fmt.Errorf("hold public URL not configured") 268 - } 269 - 270 - ctx := context.Background() 271 - 272 - // Resolve owner's PDS endpoint 273 - directory := identity.DefaultDirectory() 274 - ownerDIDParsed, err := syntax.ParseDID(ownerDID) 275 - if err != nil { 276 - return false, fmt.Errorf("invalid owner DID: %w", err) 277 - } 278 - 279 - ident, err := directory.LookupDID(ctx, ownerDIDParsed) 280 - if err != nil { 281 - return false, fmt.Errorf("failed to resolve owner PDS: %w", err) 282 - } 283 - 284 - pdsEndpoint := ident.PDSEndpoint() 285 - if pdsEndpoint == "" { 286 - return false, fmt.Errorf("no PDS endpoint found for owner") 287 - } 288 - 289 - // Build hold-specific rkey 290 - holdName, err := extractHostname(publicURL) 291 - if err != nil { 292 - return false, fmt.Errorf("failed to extract hostname: %w", err) 293 - } 294 - crewRKey := fmt.Sprintf("allow-all-%s", holdName) 295 - 296 - // Create unauthenticated client to read public records 297 - client := atproto.NewClient(pdsEndpoint, ownerDID, "") 298 - 299 - // Query for hold-specific allow-all record 300 - record, err := client.GetRecord(ctx, atproto.HoldCrewCollection, crewRKey) 301 - if err != nil { 302 - // Record doesn't exist 303 - if errors.Is(err, atproto.ErrRecordNotFound) { 304 - return false, nil 305 - } 306 - return false, fmt.Errorf("failed to get crew record: %w", err) 307 - } 308 - 309 - // Verify it's the wildcard record (memberPattern: "*") 310 - var crewRecord atproto.HoldCrewRecord 311 - if err := json.Unmarshal(record.Value, &crewRecord); err != nil { 312 - return false, fmt.Errorf("failed to unmarshal crew record: %w", err) 313 - } 314 - 315 - // Check if it's the exact wildcard pattern 316 - if crewRecord.MemberPattern == nil || *crewRecord.MemberPattern != "*" { 317 - return false, nil 318 - } 319 - 320 - // Verify it's for this hold (defensive check) 321 - expectedHoldURI := fmt.Sprintf("at://%s/%s/%s", ownerDID, atproto.HoldCollection, holdName) 322 - return crewRecord.Hold == expectedHoldURI, nil 323 - } 324 - 325 - // createAllowAllCrewRecord creates a wildcard crew record allowing all authenticated users 326 - func (s *HoldService) createAllowAllCrewRecord(callbackHandler *http.HandlerFunc) error { 327 - ownerDID := s.config.Registration.OwnerDID 328 - publicURL := s.config.Server.PublicURL 329 - 330 - // Run OAuth flow to get authenticated client 331 - client, err := s.runOAuthFlow(callbackHandler, "Creating allow-all crew record") 332 - if err != nil { 333 - return err 334 - } 335 - 336 - ctx := context.Background() 337 - 338 - // Get hold URI 339 - holdName, err := extractHostname(publicURL) 340 - if err != nil { 341 - return fmt.Errorf("failed to extract hostname: %w", err) 342 - } 343 - 344 - holdURI := fmt.Sprintf("at://%s/%s/%s", ownerDID, atproto.HoldCollection, holdName) 345 - 346 - // Create wildcard crew record 347 - crewRecord := atproto.NewHoldCrewRecordWithPattern(holdURI, "*", "write") 348 - 349 - // Use hold-specific rkey to support multiple holds with different allow-all settings 350 - crewRKey := fmt.Sprintf("allow-all-%s", holdName) 351 - _, err = client.PutRecord(ctx, atproto.HoldCrewCollection, crewRKey, crewRecord) 352 - if err != nil { 353 - return fmt.Errorf("failed to create allow-all crew record: %w", err) 354 - } 355 - 356 - log.Printf("✓ Created allow-all crew record (allows all authenticated users)") 357 - return nil 358 - } 359 - 360 - // deleteAllowAllCrewRecord deletes the wildcard crew record for this hold 361 - func (s *HoldService) deleteAllowAllCrewRecord(callbackHandler *http.HandlerFunc) error { 362 - // Safety check: only delete if it's the exact wildcard pattern for THIS hold 363 - isWildcard, err := s.hasAllowAllCrewRecord() 364 - if err != nil { 365 - return fmt.Errorf("failed to check allow-all crew record: %w", err) 366 - } 367 - 368 - if !isWildcard { 369 - log.Printf("Note: 'allow-all' crew record not found for this hold (may exist for other holds)") 370 - return nil 371 - } 372 - 373 - // Get hold name for rkey 374 - holdName, err := extractHostname(s.config.Server.PublicURL) 375 - if err != nil { 376 - return fmt.Errorf("failed to extract hostname: %w", err) 377 - } 378 - crewRKey := fmt.Sprintf("allow-all-%s", holdName) 379 - 380 - // Run OAuth flow to get authenticated client 381 - client, err := s.runOAuthFlow(callbackHandler, "Deleting allow-all crew record") 382 - if err != nil { 383 - return err 384 - } 385 - 386 - ctx := context.Background() 387 - 388 - // Delete the hold-specific allow-all record 389 - err = client.DeleteRecord(ctx, atproto.HoldCrewCollection, crewRKey) 390 - if err != nil { 391 - return fmt.Errorf("failed to delete allow-all crew record: %w", err) 392 - } 393 - 394 - log.Printf("✓ Deleted allow-all crew record for this hold") 395 - return nil 396 - } 397 - 398 - // getHoldRegistrationScopes returns the OAuth scopes needed for hold registration and crew management 399 - func getHoldRegistrationScopes() []string { 400 - return []string{ 401 - "atproto", 402 - fmt.Sprintf("repo:%s", atproto.HoldCollection), 403 - fmt.Sprintf("repo:%s", atproto.HoldCrewCollection), 404 - fmt.Sprintf("repo:%s", atproto.SailorProfileCollection), 405 - } 406 - } 407 - 408 - // runOAuthFlow performs OAuth flow and returns an authenticated client 409 - // Reusable helper to avoid code duplication across registration and reconciliation 410 - func (s *HoldService) runOAuthFlow(callbackHandler *http.HandlerFunc, purpose string) (*atproto.Client, error) { 411 - ownerDID := s.config.Registration.OwnerDID 412 - publicURL := s.config.Server.PublicURL 413 - 414 - ctx := context.Background() 415 - 416 - // Resolve owner's PDS endpoint 417 - directory := identity.DefaultDirectory() 418 - ownerDIDParsed, err := syntax.ParseDID(ownerDID) 419 - if err != nil { 420 - return nil, fmt.Errorf("invalid owner DID: %w", err) 421 - } 422 - 423 - ident, err := directory.LookupDID(ctx, ownerDIDParsed) 424 - if err != nil { 425 - return nil, fmt.Errorf("failed to resolve owner PDS: %w", err) 426 - } 427 - 428 - pdsEndpoint := ident.PDSEndpoint() 429 - if pdsEndpoint == "" { 430 - return nil, fmt.Errorf("no PDS endpoint found for owner") 431 - } 432 - 433 - handle := ident.Handle.String() 434 - if handle == "" || handle == "handle.invalid" { 435 - return nil, fmt.Errorf("no valid handle found for DID") 436 - } 437 - 438 - // Determine base URL for OAuth 439 - var baseURL string 440 - if s.config.Server.TestMode { 441 - parsedURL, err := url.Parse(publicURL) 442 - if err != nil { 443 - return nil, fmt.Errorf("failed to parse public URL: %w", err) 444 - } 445 - port := parsedURL.Port() 446 - if port == "" { 447 - port = "8080" 448 - } 449 - baseURL = fmt.Sprintf("http://127.0.0.1:%s", port) 450 - } else { 451 - baseURL = publicURL 452 - } 453 - 454 - // Run OAuth flow 455 - result, err := oauth.InteractiveFlowWithCallback( 456 - ctx, 457 - baseURL, 458 - handle, 459 - getHoldRegistrationScopes(), 460 - func(handler http.HandlerFunc) error { 461 - *callbackHandler = handler 462 - return nil 463 - }, 464 - func(authURL string) error { 465 - log.Print("\n" + strings.Repeat("=", 80)) 466 - log.Printf("OAUTH REQUIRED: %s", purpose) 467 - log.Print(strings.Repeat("=", 80)) 468 - log.Printf("\nVisit: %s\n", authURL) 469 - log.Printf("Waiting for authorization...") 470 - log.Print(strings.Repeat("=", 80) + "\n") 471 - return nil 472 - }, 473 - ) 474 - if err != nil { 475 - return nil, fmt.Errorf("OAuth flow failed: %w", err) 476 - } 477 - 478 - // Create authenticated client 479 - apiClient := result.Session.APIClient() 480 - return atproto.NewClientWithIndigoClient(pdsEndpoint, ownerDID, apiClient), nil 481 - }
+22
pkg/hold/service.go
··· 4 4 "context" 5 5 "fmt" 6 6 "log" 7 + "net/http" 8 + "net/url" 7 9 8 10 "github.com/aws/aws-sdk-go/service/s3" 9 11 storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" ··· 47 49 func (s *HoldService) GetPresignedURL(ctx context.Context, operation PresignedURLOperation, digest string, did string) (string, error) { 48 50 return s.getPresignedURL(ctx, operation, digest, did) 49 51 } 52 + 53 + // HealthHandler handles health check requests 54 + func (s *HoldService) HealthHandler(w http.ResponseWriter, r *http.Request) { 55 + w.Header().Set("Content-Type", "application/json") 56 + w.Write([]byte(`{"status":"ok"}`)) 57 + } 58 + 59 + // extractHostname extracts the hostname from a URL 60 + func extractHostname(urlStr string) (string, error) { 61 + u, err := url.Parse(urlStr) 62 + if err != nil { 63 + return "", err 64 + } 65 + // Remove port if present 66 + hostname := u.Hostname() 67 + if hostname == "" { 68 + return "", fmt.Errorf("no hostname in URL") 69 + } 70 + return hostname, nil 71 + }