A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

crazy refactor to start using holds embedded pds for crew/captain validation

+1529 -367
+21 -1
cmd/appview/config.go
··· 1 1 package main 2 2 3 3 import ( 4 + "crypto/rand" 5 + "encoding/hex" 4 6 "fmt" 5 7 "net/url" 6 8 "os" ··· 71 73 addr := getEnvOrDefault("ATCR_HTTP_ADDR", ":5000") 72 74 debugAddr := getEnvOrDefault("ATCR_DEBUG_ADDR", ":5001") 73 75 76 + // HTTP secret - only needed for multipart uploads in distribution's storage driver 77 + // Since AppView is stateless and routes all storage through middleware, this isn't 78 + // actually used, but we generate a random secret for defense in depth 79 + httpSecret := os.Getenv("REGISTRY_HTTP_SECRET") 80 + if httpSecret == "" { 81 + // Generate a random 32-byte secret 82 + randomBytes := make([]byte, 32) 83 + if _, err := rand.Read(randomBytes); err != nil { 84 + return configuration.HTTP{}, fmt.Errorf("failed to generate random secret: %w", err) 85 + } 86 + httpSecret = hex.EncodeToString(randomBytes) 87 + } 88 + 74 89 return configuration.HTTP{ 75 - Addr: addr, 90 + Addr: addr, 91 + Secret: httpSecret, 76 92 Headers: map[string][]string{ 77 93 "X-Content-Type-Options": {"nosniff"}, 78 94 }, ··· 108 124 109 125 // buildMiddlewareConfig creates middleware configuration 110 126 func buildMiddlewareConfig(defaultHold string) map[string][]configuration.Middleware { 127 + // Check test mode 128 + testMode := os.Getenv("TEST_MODE") == "true" 129 + 111 130 return map[string][]configuration.Middleware{ 112 131 "registry": { 113 132 { 114 133 Name: "atproto-resolver", 115 134 Options: configuration.Parameters{ 116 135 "default_storage_endpoint": defaultHold, 136 + "test_mode": testMode, 117 137 }, 118 138 }, 119 139 },
+41 -14
cmd/appview/serve.go
··· 20 20 "github.com/spf13/cobra" 21 21 22 22 "atcr.io/pkg/appview/middleware" 23 + "atcr.io/pkg/atproto" 24 + "atcr.io/pkg/auth" 23 25 "atcr.io/pkg/auth/oauth" 24 26 "atcr.io/pkg/auth/token" 25 27 ··· 147 149 metricsDB := db.NewMetricsDB(uiDatabase) 148 150 middleware.SetGlobalDatabase(metricsDB) 149 151 152 + // 6.6. Create RemoteHoldAuthorizer for hold authorization with caching 153 + holdAuthorizer := auth.NewRemoteHoldAuthorizer(uiDatabase) 154 + middleware.SetGlobalAuthorizer(holdAuthorizer) 155 + fmt.Println("Hold authorizer initialized with database caching") 156 + 157 + // 6.7. Extract default hold DID for OAuth server and backfill worker 158 + // This is used to create sailor profiles on first login and cache captain records 159 + // Expected format: "did:web:hold01.atcr.io" 160 + // To find a hold's DID, visit: https://hold01.atcr.io/.well-known/did.json 161 + // The extraction function normalizes URLs to DIDs for consistency 162 + defaultHoldDID := extractDefaultHoldDID(config) 163 + 150 164 // 7. Initialize UI routes with OAuth app, refresher, and device store 151 - uiTemplates, uiRouter := initializeUIRoutes(uiDatabase, uiReadOnlyDB, uiSessionStore, oauthApp, refresher, baseURL, deviceStore) 165 + uiTemplates, uiRouter := initializeUIRoutes(uiDatabase, uiReadOnlyDB, uiSessionStore, oauthApp, refresher, baseURL, deviceStore, defaultHoldDID) 152 166 153 167 // 8. Create OAuth server 154 168 oauthServer := oauth.NewServer(oauthApp) ··· 161 175 // Connect database for user avatar management 162 176 oauthServer.SetDatabase(uiDatabase) 163 177 164 - // 8.5. Extract default hold endpoint and set it on OAuth server 178 + // 8.5. Set default hold DID on OAuth server (extracted earlier) 165 179 // This is used to create sailor profiles on first login 166 - defaultHoldEndpoint := extractDefaultHoldEndpoint(config) 167 - if defaultHoldEndpoint != "" { 168 - oauthServer.SetDefaultHoldEndpoint(defaultHoldEndpoint) 169 - fmt.Printf("OAuth server will create profiles with default hold: %s\n", defaultHoldEndpoint) 180 + if defaultHoldDID != "" { 181 + oauthServer.SetDefaultHoldDID(defaultHoldDID) 182 + fmt.Printf("OAuth server will create profiles with default hold: %s\n", defaultHoldDID) 170 183 } 171 184 172 185 // 9. Initialize auth keys and create token issuer ··· 227 240 // Mount auth endpoints if enabled 228 241 if issuer != nil { 229 242 // Basic Auth token endpoint (supports device secrets and app passwords) 230 - // Reuse defaultHoldEndpoint extracted earlier 231 - tokenHandler := token.NewHandler(issuer, deviceStore, defaultHoldEndpoint) 243 + // Reuse defaultHoldDID extracted earlier 244 + tokenHandler := token.NewHandler(issuer, deviceStore, defaultHoldDID) 232 245 tokenHandler.RegisterRoutes(mux) 233 246 234 247 // Device authorization endpoints (public) ··· 351 364 return defaultValue 352 365 } 353 366 354 - // extractDefaultHoldEndpoint extracts the default hold endpoint from middleware config 355 - func extractDefaultHoldEndpoint(config *configuration.Configuration) string { 367 + // extractDefaultHoldDID extracts the default hold DID from middleware config 368 + // Returns a DID (e.g., "did:web:hold01.atcr.io") for consistency 369 + // Accepts both DIDs and URLs in config for backward compatibility 370 + // To find a hold's DID, visit: https://hold-url/.well-known/did.json 371 + func extractDefaultHoldDID(config *configuration.Configuration) string { 356 372 // Navigate through: middleware.registry[].options.default_storage_endpoint 357 373 registryMiddleware, ok := config.Middleware["registry"] 358 374 if !ok { ··· 369 385 // Extract options - options is configuration.Parameters which is map[string]any 370 386 if mw.Options != nil { 371 387 if endpoint, ok := mw.Options["default_storage_endpoint"].(string); ok { 372 - return endpoint 388 + // Normalize to DID (handles both URLs and DIDs) 389 + // This ensures we store DIDs consistently 390 + return atproto.ResolveHoldDIDFromURL(endpoint) 373 391 } 374 392 } 375 393 } ··· 447 465 // initializeUIRoutes initializes the web UI routes 448 466 // database: read-write connection for auth and writes 449 467 // readOnlyDB: read-only connection for public queries (search, user pages, etc.) 450 - func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.SessionStore, oauthApp *oauth.App, refresher *oauth.Refresher, baseURL string, deviceStore *db.DeviceStore) (*template.Template, *mux.Router) { 468 + // defaultHoldDID: DID of the default hold service (e.g., "did:web:hold01.atcr.io") 469 + func initializeUIRoutes(database *sql.DB, readOnlyDB *sql.DB, sessionStore *db.SessionStore, oauthApp *oauth.App, refresher *oauth.Refresher, baseURL string, deviceStore *db.DeviceStore, defaultHoldDID string) (*template.Template, *mux.Router) { 451 470 // Check if UI is enabled 452 471 uiEnabled := os.Getenv("ATCR_UI_ENABLED") 453 472 if uiEnabled == "false" { ··· 647 666 relayEndpoint = "https://relay1.us-east.bsky.network" 648 667 } 649 668 650 - backfillWorker, err := jetstream.NewBackfillWorker(database, relayEndpoint) 669 + // Check test mode 670 + testMode := os.Getenv("TEST_MODE") == "true" 671 + 672 + backfillWorker, err := jetstream.NewBackfillWorker(database, relayEndpoint, defaultHoldDID, testMode) 651 673 if err != nil { 652 674 fmt.Printf("Warning: Failed to create backfill worker: %v\n", err) 653 675 } else { 654 - // Run initial backfill 676 + // Run initial backfill with startup delay for Docker compose 655 677 go func() { 678 + // Wait for hold service to be ready (Docker startup race condition) 679 + startupDelay := 5 * time.Second 680 + fmt.Printf("Backfill: Waiting %s for services to be ready...\n", startupDelay) 681 + time.Sleep(startupDelay) 682 + 656 683 fmt.Printf("Backfill: Starting sync-based backfill from %s...\n", relayEndpoint) 657 684 if err := backfillWorker.Start(context.Background()); err != nil { 658 685 fmt.Printf("Backfill: Finished with error: %v\n", err)
+15 -11
cmd/hold/main.go
··· 23 23 log.Fatalf("Failed to load config: %v", err) 24 24 } 25 25 26 - // Create hold service 27 - service, err := hold.NewHoldService(cfg) 28 - if err != nil { 29 - log.Fatalf("Failed to create hold service: %v", err) 30 - } 31 - 32 26 // Initialize embedded PDS if database path is configured 27 + // This must happen before creating HoldService since service needs PDS for authorization 33 28 var holdPDS *pds.HoldPDS 34 29 var xrpcHandler *pds.XRPCHandler 35 30 if cfg.Database.Path != "" { ··· 49 44 log.Fatalf("Failed to bootstrap PDS: %v", err) 50 45 } 51 46 52 - // Create blob store adapter 53 - blobStore := pds.NewHoldServiceBlobStore(service, holdDID) 47 + log.Printf("Embedded PDS initialized successfully") 48 + } else { 49 + log.Fatalf("Database path is required for embedded PDS authorization") 50 + } 51 + 52 + // Create hold service with PDS 53 + service, err := hold.NewHoldService(cfg, holdPDS) 54 + if err != nil { 55 + log.Fatalf("Failed to create hold service: %v", err) 56 + } 54 57 55 - // Create XRPC handler 58 + // Create blob store adapter and XRPC handler 59 + if holdPDS != nil { 60 + holdDID := holdPDS.DID() 61 + blobStore := hold.NewHoldServiceBlobStore(service, holdDID) 56 62 xrpcHandler = pds.NewXRPCHandler(holdPDS, cfg.Server.PublicURL, blobStore) 57 - 58 - log.Printf("Embedded PDS initialized successfully") 59 63 } 60 64 61 65 // Setup HTTP routes
+3 -1
docker-compose.yml
··· 13 13 environment: 14 14 # Server configuration 15 15 ATCR_HTTP_ADDR: :5000 16 - ATCR_DEFAULT_HOLD: http://atcr-hold:8080 16 + ATCR_DEFAULT_HOLD: http://172.28.0.3:8080 17 17 # UI configuration 18 18 ATCR_UI_ENABLED: true 19 19 ATCR_BACKFILL_ENABLED: true 20 + # Test mode - fallback to default hold when user's hold is unreachable 21 + TEST_MODE: true 20 22 # Logging 21 23 ATCR_LOG_LEVEL: info 22 24 volumes:
+4 -4
docs/SAILOR.md
··· 31 31 4. Create Profile Management 32 32 33 33 File: pkg/atproto/profile.go (new file) 34 - - EnsureProfile(ctx, client, defaultHoldEndpoint) function 34 + - EnsureProfile(ctx, client, defaultHoldDID) function 35 35 - Logic: check if profile exists, create with default if not 36 36 37 37 5. Update Auth Handlers ··· 39 39 Files: pkg/auth/exchange/handler.go and pkg/auth/token/service.go 40 40 - Call EnsureProfile() after token validation 41 41 - Use authenticated client (has write access to user's PDS) 42 - - Pass AppView's default_hold_endpoint config 42 + - Pass AppView's default_hold_did config (format: "did:web:hold01.atcr.io") 43 43 44 44 6. Update Hold Resolution 45 45 ··· 89 89 5. Updated /auth/exchange handler to manage profile 90 90 91 91 ⏳ In Progress: 92 - - Need to update /auth/token handler similarly (add defaultHoldEndpoint parameter and profile management) 93 - - Fix compilation error in extractDefaultHoldEndpoint() - should use configuration.Middleware type not any 92 + - Need to update /auth/token handler similarly (add defaultHoldDID parameter and profile management) 93 + - Fix compilation error in extractDefaultHoldDID() - should use configuration.Middleware type not any 94 94 95 95 🔜 Remaining: 96 96 - Update findStorageEndpoint() for new priority logic (check profile → own hold → default)
+7 -7
gen/main.go
··· 7 7 // Usage: 8 8 // go run gen/main.go 9 9 // 10 - // This creates pkg/hold/pds/cbor_gen.go which should be committed to git. 11 - // Only re-run when you modify types in pkg/hold/pds/types.go 10 + // This creates pkg/atproto/cbor_gen.go which should be committed to git. 11 + // Only re-run when you modify types in pkg/atproto/types.go 12 12 13 13 import ( 14 14 "fmt" ··· 16 16 17 17 cbg "github.com/whyrusleeping/cbor-gen" 18 18 19 - "atcr.io/pkg/hold/pds" 19 + "atcr.io/pkg/atproto" 20 20 ) 21 21 22 22 func main() { 23 23 // Generate map-style encoders for CrewRecord and CaptainRecord 24 - if err := cbg.WriteMapEncodersToFile("pkg/hold/pds/cbor_gen.go", "pds", 25 - pds.CrewRecord{}, 26 - pds.CaptainRecord{}, 24 + if err := cbg.WriteMapEncodersToFile("pkg/atproto/cbor_gen.go", "atproto", 25 + atproto.CrewRecord{}, 26 + atproto.CaptainRecord{}, 27 27 ); err != nil { 28 28 fmt.Printf("Failed to generate CBOR encoders: %v\n", err) 29 29 os.Exit(1) 30 30 } 31 31 32 - fmt.Println("Generated CBOR encoders in pkg/hold/pds/cbor_gen.go") 32 + fmt.Println("Generated CBOR encoders in pkg/atproto/cbor_gen.go") 33 33 }
+20
pkg/appview/db/migrations/0003_add_crew_cache.yaml
··· 1 + description: Add crew cache tables for authorization with exponential backoff 2 + query: | 3 + CREATE TABLE IF NOT EXISTS hold_crew_approvals ( 4 + hold_did TEXT NOT NULL, 5 + user_did TEXT NOT NULL, 6 + approved_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 7 + expires_at TIMESTAMP NOT NULL, 8 + PRIMARY KEY(hold_did, user_did) 9 + ); 10 + CREATE INDEX IF NOT EXISTS idx_crew_approvals_expires ON hold_crew_approvals(expires_at); 11 + 12 + CREATE TABLE IF NOT EXISTS hold_crew_denials ( 13 + hold_did TEXT NOT NULL, 14 + user_did TEXT NOT NULL, 15 + denial_count INTEGER NOT NULL DEFAULT 1, 16 + next_retry_at TIMESTAMP NOT NULL, 17 + last_denied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 18 + PRIMARY KEY(hold_did, user_did) 19 + ); 20 + CREATE INDEX IF NOT EXISTS idx_crew_denials_retry ON hold_crew_denials(next_retry_at);
+19
pkg/appview/db/schema.go
··· 179 179 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 180 180 ); 181 181 CREATE INDEX IF NOT EXISTS idx_hold_captain_updated ON hold_captain_records(updated_at); 182 + 183 + CREATE TABLE IF NOT EXISTS hold_crew_approvals ( 184 + hold_did TEXT NOT NULL, 185 + user_did TEXT NOT NULL, 186 + approved_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 187 + expires_at TIMESTAMP NOT NULL, 188 + PRIMARY KEY(hold_did, user_did) 189 + ); 190 + CREATE INDEX IF NOT EXISTS idx_crew_approvals_expires ON hold_crew_approvals(expires_at); 191 + 192 + CREATE TABLE IF NOT EXISTS hold_crew_denials ( 193 + hold_did TEXT NOT NULL, 194 + user_did TEXT NOT NULL, 195 + denial_count INTEGER NOT NULL DEFAULT 1, 196 + next_retry_at TIMESTAMP NOT NULL, 197 + last_denied_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 198 + PRIMARY KEY(hold_did, user_did) 199 + ); 200 + CREATE INDEX IF NOT EXISTS idx_crew_denials_retry ON hold_crew_denials(next_retry_at); 182 201 ` 183 202 184 203 // InitDB initializes the SQLite database with the schema
+181 -10
pkg/appview/jetstream/backfill.go
··· 17 17 18 18 // BackfillWorker uses com.atproto.sync.listReposByCollection to backfill historical data 19 19 type BackfillWorker struct { 20 - db *sql.DB 21 - client *atproto.Client 22 - directory identity.Directory 20 + db *sql.DB 21 + client *atproto.Client 22 + directory identity.Directory 23 + defaultHoldDID string // Default hold DID from AppView config (e.g., "did:web:hold01.atcr.io") 24 + testMode bool // If true, suppress warnings for external holds 23 25 } 24 26 25 27 // BackfillState tracks backfill progress ··· 34 36 } 35 37 36 38 // NewBackfillWorker creates a backfill worker using sync API 37 - func NewBackfillWorker(database *sql.DB, relayEndpoint string) (*BackfillWorker, error) { 39 + // defaultHoldDID should be in format "did:web:hold01.atcr.io" 40 + // To find a hold's DID, visit: https://hold-url/.well-known/did.json 41 + func NewBackfillWorker(database *sql.DB, relayEndpoint, defaultHoldDID string, testMode bool) (*BackfillWorker, error) { 38 42 // Create client for relay - used only for listReposByCollection 39 43 client := atproto.NewClient(relayEndpoint, "", "") 40 44 41 45 return &BackfillWorker{ 42 - db: database, 43 - client: client, // This points to the relay 44 - directory: identity.DefaultDirectory(), 46 + db: database, 47 + client: client, // This points to the relay 48 + directory: identity.DefaultDirectory(), 49 + defaultHoldDID: defaultHoldDID, 50 + testMode: testMode, 45 51 }, nil 46 52 } 47 53 ··· 49 55 func (b *BackfillWorker) Start(ctx context.Context) error { 50 56 fmt.Println("Backfill: Starting sync-based backfill...") 51 57 58 + // First, query and cache the default hold's captain record 59 + if b.defaultHoldDID != "" { 60 + fmt.Printf("Backfill: Querying default hold captain record: %s\n", b.defaultHoldDID) 61 + if err := b.queryCaptainRecord(ctx, b.defaultHoldDID); err != nil { 62 + fmt.Printf("WARNING: Failed to query default hold captain record: %v\n", err) 63 + // Don't fail the whole backfill - just warn 64 + } 65 + } 66 + 52 67 collections := []string{ 53 - atproto.ManifestCollection, // io.atcr.manifest 54 - atproto.TagCollection, // io.atcr.tag 55 - atproto.StarCollection, // io.atcr.sailor.star 68 + atproto.ManifestCollection, // io.atcr.manifest 69 + atproto.TagCollection, // io.atcr.tag 70 + atproto.StarCollection, // io.atcr.sailor.star 71 + atproto.SailorProfileCollection, // io.atcr.sailor.profile 56 72 } 57 73 58 74 for _, collection := range collections { ··· 267 283 return b.processTagRecord(did, record) 268 284 case atproto.StarCollection: 269 285 return b.processStarRecord(did, record) 286 + case atproto.SailorProfileCollection: 287 + return b.processSailorProfileRecord(ctx, did, record) 270 288 default: 271 289 return fmt.Errorf("unsupported collection: %s", collection) 272 290 } ··· 362 380 // The subject contains the owner DID and repository 363 381 // Star count will be calculated on demand from the stars table 364 382 return db.UpsertStar(b.db, did, starRecord.Subject.DID, starRecord.Subject.Repository, starRecord.CreatedAt) 383 + } 384 + 385 + // processSailorProfileRecord processes a sailor profile record 386 + // Extracts defaultHold and queries the hold's captain record to cache it 387 + func (b *BackfillWorker) processSailorProfileRecord(ctx context.Context, did string, record *atproto.Record) error { 388 + var profileRecord atproto.SailorProfileRecord 389 + if err := json.Unmarshal(record.Value, &profileRecord); err != nil { 390 + return fmt.Errorf("failed to unmarshal sailor profile: %w", err) 391 + } 392 + 393 + // Skip if no default hold set 394 + if profileRecord.DefaultHold == "" { 395 + return nil 396 + } 397 + 398 + // Convert hold URL/DID to canonical DID 399 + holdDID := atproto.ResolveHoldDIDFromURL(profileRecord.DefaultHold) 400 + if holdDID == "" { 401 + fmt.Printf("WARNING [backfill]: Invalid hold reference in profile for %s: %s\n", did, profileRecord.DefaultHold) 402 + return nil 403 + } 404 + 405 + // Query and cache the captain record 406 + if err := b.queryCaptainRecord(ctx, holdDID); err != nil { 407 + // In test mode, only warn about default hold (local hold) 408 + // External/production holds may not have captain records yet (dev ahead of prod) 409 + if b.testMode && holdDID != b.defaultHoldDID { 410 + // Suppress warning for external holds in test mode 411 + return nil 412 + } 413 + fmt.Printf("WARNING [backfill]: Failed to query captain record for hold %s: %v\n", holdDID, err) 414 + // Don't fail the whole backfill - just skip this hold 415 + return nil 416 + } 417 + 418 + return nil 419 + } 420 + 421 + // queryCaptainRecord queries a hold's captain record and caches it in the database 422 + func (b *BackfillWorker) queryCaptainRecord(ctx context.Context, holdDID string) error { 423 + // Check if we already have it cached (skip if recently updated) 424 + existing, err := db.GetCaptainRecord(b.db, holdDID) 425 + if err == nil && existing != nil { 426 + // If cached within last hour, skip refresh 427 + if time.Since(existing.UpdatedAt) < 1*time.Hour { 428 + return nil 429 + } 430 + } 431 + 432 + // Resolve hold DID to URL 433 + // For did:web, we need to fetch .well-known/did.json 434 + holdURL, err := resolveHoldDIDToURL(ctx, holdDID) 435 + if err != nil { 436 + return fmt.Errorf("failed to resolve hold DID to URL: %w", err) 437 + } 438 + 439 + // Create client for hold's PDS 440 + holdClient := atproto.NewClient(holdURL, holdDID, "") 441 + 442 + // Query captain record with retries (for Docker startup timing) 443 + var record *atproto.Record 444 + maxRetries := 3 445 + for attempt := 1; attempt <= maxRetries; attempt++ { 446 + record, err = holdClient.GetRecord(ctx, "io.atcr.hold.captain", "self") 447 + if err == nil { 448 + break 449 + } 450 + 451 + // Retry on connection errors (hold service might still be starting) 452 + if attempt < maxRetries && strings.Contains(err.Error(), "connection refused") { 453 + fmt.Printf("Backfill: Hold not ready (attempt %d/%d), retrying in 2s...\n", attempt, maxRetries) 454 + time.Sleep(2 * time.Second) 455 + continue 456 + } 457 + 458 + return fmt.Errorf("failed to get captain record: %w", err) 459 + } 460 + 461 + // Parse captain record from the record's Value field 462 + var captainRecord struct { 463 + Owner string `json:"owner"` 464 + Public bool `json:"public"` 465 + AllowAllCrew bool `json:"allowAllCrew"` 466 + DeployedAt string `json:"deployedAt"` 467 + Region string `json:"region"` 468 + Provider string `json:"provider"` 469 + } 470 + 471 + if err := json.Unmarshal(record.Value, &captainRecord); err != nil { 472 + return fmt.Errorf("failed to parse captain record: %w", err) 473 + } 474 + 475 + // Cache in database 476 + dbRecord := &db.HoldCaptainRecord{ 477 + HoldDID: holdDID, 478 + OwnerDID: captainRecord.Owner, 479 + Public: captainRecord.Public, 480 + AllowAllCrew: captainRecord.AllowAllCrew, 481 + DeployedAt: captainRecord.DeployedAt, 482 + Region: captainRecord.Region, 483 + Provider: captainRecord.Provider, 484 + UpdatedAt: time.Now(), 485 + } 486 + 487 + if err := db.UpsertCaptainRecord(b.db, dbRecord); err != nil { 488 + return fmt.Errorf("failed to cache captain record: %w", err) 489 + } 490 + 491 + fmt.Printf("Backfill: Cached captain record for hold %s (owner: %s)\n", holdDID, captainRecord.Owner) 492 + return nil 493 + } 494 + 495 + // resolveHoldDIDToURL resolves a hold DID to its service endpoint URL 496 + // Fetches the DID document and returns both the canonical DID and service endpoint 497 + func resolveHoldDIDToURL(ctx context.Context, inputDID string) (string, error) { 498 + // For did:web, construct the .well-known URL 499 + if !strings.HasPrefix(inputDID, "did:web:") { 500 + return "", fmt.Errorf("only did:web is supported, got: %s", inputDID) 501 + } 502 + 503 + // Extract hostname from did:web:hostname[:port] 504 + hostname := strings.TrimPrefix(inputDID, "did:web:") 505 + 506 + // Try HTTP first (for local Docker), then HTTPS 507 + var serviceEndpoint string 508 + for _, scheme := range []string{"http", "https"} { 509 + testURL := fmt.Sprintf("%s://%s/.well-known/did.json", scheme, hostname) 510 + 511 + // Fetch DID document (use NewClient to initialize httpClient) 512 + client := atproto.NewClient("", "", "") 513 + didDoc, err := client.FetchDIDDocument(ctx, testURL) 514 + if err == nil && didDoc != nil { 515 + // Extract service endpoint from DID document 516 + for _, service := range didDoc.Service { 517 + if service.Type == "AtprotoPersonalDataServer" || service.Type == "AtcrHoldService" { 518 + serviceEndpoint = service.ServiceEndpoint 519 + break 520 + } 521 + } 522 + 523 + if serviceEndpoint != "" { 524 + fmt.Printf("DEBUG [backfill]: Resolved %s → canonical DID: %s, endpoint: %s\n", 525 + inputDID, didDoc.ID, serviceEndpoint) 526 + return serviceEndpoint, nil 527 + } 528 + } 529 + } 530 + 531 + // Fallback: assume the hold service is at the root of the hostname 532 + // Try HTTP first for local development 533 + url := fmt.Sprintf("http://%s", hostname) 534 + fmt.Printf("WARNING [backfill]: Failed to fetch DID document for %s, using fallback URL: %s\n", inputDID, url) 535 + return url, nil 365 536 } 366 537 367 538 // ensureUser resolves and upserts a user by DID
+51 -5
pkg/appview/middleware/registry.go
··· 29 29 IncrementPushCount(did, repository string) error 30 30 } 31 31 32 + // Global authorizer instance (set by main.go for hold authorization) 33 + var globalAuthorizer auth.HoldAuthorizer 34 + 32 35 // SetGlobalRefresher sets the global OAuth refresher instance 33 36 func SetGlobalRefresher(refresher *oauth.Refresher) { 34 37 globalRefresher = refresher ··· 42 45 globalDatabase = database 43 46 } 44 47 48 + // SetGlobalAuthorizer sets the global authorizer instance for hold access control 49 + func SetGlobalAuthorizer(authorizer auth.HoldAuthorizer) { 50 + globalAuthorizer = authorizer 51 + } 52 + 45 53 func init() { 46 54 // Register the name resolution middleware 47 55 registrymw.Register("atproto-resolver", initATProtoResolver) ··· 52 60 distribution.Namespace 53 61 directory identity.Directory 54 62 defaultStorageEndpoint string 63 + testMode bool // If true, fallback to default hold when user's hold is unreachable 55 64 repositories sync.Map // Cache of RoutingRepository instances by key (did:reponame) 56 65 } 57 66 ··· 61 70 directory := identity.DefaultDirectory() 62 71 63 72 // Get default storage endpoint from config (optional) 73 + // Normalize to DID format for consistency 64 74 defaultStorageEndpoint := "" 65 75 if endpoint, ok := options["default_storage_endpoint"].(string); ok { 66 - defaultStorageEndpoint = endpoint 76 + // Convert URL to DID if needed (or pass through if already a DID) 77 + defaultStorageEndpoint = atproto.ResolveHoldDIDFromURL(endpoint) 78 + } 79 + 80 + // Check test mode from options (passed via env var) 81 + testMode := false 82 + if tm, ok := options["test_mode"].(bool); ok { 83 + testMode = tm 67 84 } 68 85 69 86 return &NamespaceResolver{ 70 87 Namespace: ns, 71 88 directory: directory, 72 89 defaultStorageEndpoint: defaultStorageEndpoint, 90 + testMode: testMode, 73 91 }, nil 74 92 } 75 93 ··· 177 195 178 196 // Create routing repository - routes manifests to ATProto, blobs to hold service 179 197 // The registry is stateless - no local storage is used 180 - // Pass storage endpoint and DID as parameters (can't use context as it gets lost) 181 - routingRepo := storage.NewRoutingRepository(repo, atprotoClient, repositoryName, storageEndpoint, did, globalDatabase) 198 + // Pass storage endpoint, DID, and authorizer as parameters (can't use context as it gets lost) 199 + routingRepo := storage.NewRoutingRepository(repo, atprotoClient, repositoryName, storageEndpoint, did, globalDatabase, globalAuthorizer) 182 200 183 201 // Cache the repository 184 202 nr.repositories.Store(cacheKey, routingRepo) ··· 206 224 // 1. User's sailor profile defaultHold (if set) 207 225 // 2. User's own hold record (io.atcr.hold) 208 226 // 3. AppView's default hold endpoint 209 - // Returns the storage endpoint URL, or empty string if none configured 227 + // Returns a hold DID (e.g., "did:web:hold01.atcr.io"), or empty string if none configured 228 + // Note: Despite returning a DID, this is used as the "storage endpoint" throughout the code 210 229 func (nr *NamespaceResolver) findStorageEndpoint(ctx context.Context, did, pdsEndpoint string) string { 211 230 // Create ATProto client (without auth - reading public records) 212 231 client := atproto.NewClient(pdsEndpoint, did, "") ··· 219 238 } 220 239 221 240 if profile != nil && profile.DefaultHold != "" { 222 - // Profile exists with defaultHold set - use it 241 + // Profile exists with defaultHold set 242 + // In test mode, verify it's reachable before using it 243 + if nr.testMode { 244 + if nr.isHoldReachable(ctx, profile.DefaultHold) { 245 + return profile.DefaultHold 246 + } 247 + fmt.Printf("DEBUG [registry/middleware/testmode]: User's defaultHold %s unreachable, falling back to default\n", profile.DefaultHold) 248 + return nr.defaultStorageEndpoint 249 + } 223 250 return profile.DefaultHold 224 251 } 225 252 ··· 247 274 // 3. No profile defaultHold and no own hold records - use AppView default 248 275 return nr.defaultStorageEndpoint 249 276 } 277 + 278 + // isHoldReachable checks if a hold service is reachable 279 + // Used in test mode to fallback to default hold when user's hold is unavailable 280 + func (nr *NamespaceResolver) isHoldReachable(ctx context.Context, holdDID string) bool { 281 + // Try to fetch the DID document 282 + hostname := strings.TrimPrefix(holdDID, "did:web:") 283 + 284 + // Try HTTP first (local), then HTTPS 285 + for _, scheme := range []string{"http", "https"} { 286 + testURL := fmt.Sprintf("%s://%s/.well-known/did.json", scheme, hostname) 287 + client := atproto.NewClient("", "", "") 288 + _, err := client.FetchDIDDocument(ctx, testURL) 289 + if err == nil { 290 + return true 291 + } 292 + } 293 + 294 + return false 295 + }
+80 -2
pkg/appview/storage/proxy_blob_store.go
··· 10 10 "sync" 11 11 "time" 12 12 13 + "atcr.io/pkg/atproto" 14 + "atcr.io/pkg/auth" 13 15 "github.com/distribution/distribution/v3" 14 16 "github.com/opencontainers/go-digest" 15 17 ) ··· 34 36 did string 35 37 database DatabaseMetrics 36 38 repository string 39 + authorizer auth.HoldAuthorizer 40 + holdDID string 37 41 } 38 42 39 43 // NewProxyBlobStore creates a new proxy blob store 40 - func NewProxyBlobStore(storageEndpoint, did string, database DatabaseMetrics, repository string) *ProxyBlobStore { 41 - fmt.Printf("DEBUG [proxy_blob_store]: NewProxyBlobStore created with endpoint=%s, did=%s, repo=%s\n", storageEndpoint, did, repository) 44 + func NewProxyBlobStore(storageEndpoint, did string, database DatabaseMetrics, repository string, authorizer auth.HoldAuthorizer) *ProxyBlobStore { 45 + // Convert storage endpoint URL to did:web DID for authorization 46 + holdDID := atproto.ResolveHoldDIDFromURL(storageEndpoint) 47 + fmt.Printf("DEBUG [proxy_blob_store]: NewProxyBlobStore created with endpoint=%s, holdDID=%s, userDID=%s, repo=%s\n", 48 + storageEndpoint, holdDID, did, repository) 49 + 42 50 return &ProxyBlobStore{ 43 51 storageEndpoint: storageEndpoint, 44 52 httpClient: &http.Client{ ··· 54 62 did: did, 55 63 database: database, 56 64 repository: repository, 65 + authorizer: authorizer, 66 + holdDID: holdDID, 57 67 } 58 68 } 59 69 70 + // checkReadAccess verifies the user has read access to the hold 71 + func (p *ProxyBlobStore) checkReadAccess(ctx context.Context) error { 72 + if p.authorizer == nil { 73 + // No authorizer configured - allow access (backward compatibility) 74 + return nil 75 + } 76 + 77 + hasAccess, err := p.authorizer.CheckReadAccess(ctx, p.holdDID, p.did) 78 + if err != nil { 79 + return fmt.Errorf("authorization check failed: %w", err) 80 + } 81 + 82 + if !hasAccess { 83 + return distribution.ErrBlobUnknown // Return same error as missing blob for security 84 + } 85 + 86 + return nil 87 + } 88 + 89 + // checkWriteAccess verifies the user has write access to the hold 90 + func (p *ProxyBlobStore) checkWriteAccess(ctx context.Context) error { 91 + if p.authorizer == nil { 92 + // No authorizer configured - allow access (backward compatibility) 93 + return nil 94 + } 95 + 96 + hasAccess, err := p.authorizer.CheckWriteAccess(ctx, p.holdDID, p.did) 97 + if err != nil { 98 + return fmt.Errorf("authorization check failed: %w", err) 99 + } 100 + 101 + if !hasAccess { 102 + return fmt.Errorf("write access denied to hold %s", p.holdDID) 103 + } 104 + 105 + return nil 106 + } 107 + 60 108 // Stat returns the descriptor for a blob 61 109 func (p *ProxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { 110 + // Check read access 111 + if err := p.checkReadAccess(ctx); err != nil { 112 + return distribution.Descriptor{}, err 113 + } 114 + 62 115 // Get presigned HEAD URL 63 116 url, err := p.getHeadURL(ctx, dgst) 64 117 if err != nil { ··· 96 149 97 150 // Get retrieves a blob 98 151 func (p *ProxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { 152 + // Check read access 153 + if err := p.checkReadAccess(ctx); err != nil { 154 + return nil, err 155 + } 156 + 99 157 url, err := p.getDownloadURL(ctx, dgst) 100 158 if err != nil { 101 159 return nil, err ··· 117 175 118 176 // Open returns a reader for a blob 119 177 func (p *ProxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) { 178 + // Check read access 179 + if err := p.checkReadAccess(ctx); err != nil { 180 + return nil, err 181 + } 182 + 120 183 url, err := p.getDownloadURL(ctx, dgst) 121 184 if err != nil { 122 185 return nil, err ··· 141 204 142 205 // Put stores a blob 143 206 func (p *ProxyBlobStore) Put(ctx context.Context, mediaType string, content []byte) (distribution.Descriptor, error) { 207 + // Check write access 208 + if err := p.checkWriteAccess(ctx); err != nil { 209 + return distribution.Descriptor{}, err 210 + } 211 + 144 212 // Calculate digest 145 213 dgst := digest.FromBytes(content) 146 214 ··· 189 257 190 258 // ServeBlob serves a blob via HTTP redirect 191 259 func (p *ProxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { 260 + // Check read access 261 + if err := p.checkReadAccess(ctx); err != nil { 262 + return err 263 + } 264 + 192 265 // For HEAD requests, redirect to presigned HEAD URL 193 266 if r.Method == http.MethodHead { 194 267 url, err := p.getHeadURL(ctx, dgst) ··· 214 287 215 288 // Create returns a blob writer for uploading using multipart upload 216 289 func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { 290 + // Check write access 291 + if err := p.checkWriteAccess(ctx); err != nil { 292 + return nil, err 293 + } 294 + 217 295 // Parse options 218 296 var opts distribution.CreateOptions 219 297 for _, option := range options {
+6 -2
pkg/appview/storage/routing_repository.go
··· 6 6 "time" 7 7 8 8 "atcr.io/pkg/atproto" 9 + "atcr.io/pkg/auth" 9 10 "github.com/distribution/distribution/v3" 10 11 ) 11 12 ··· 26 27 manifestStore *atproto.ManifestStore // Cached manifest store instance 27 28 blobStore *ProxyBlobStore // Cached blob store instance 28 29 database DatabaseMetrics // Database for metrics tracking 30 + authorizer auth.HoldAuthorizer // Authorization for hold access 29 31 } 30 32 31 33 // NewRoutingRepository creates a new routing repository ··· 36 38 storageEndpoint string, 37 39 did string, 38 40 database DatabaseMetrics, 41 + authorizer auth.HoldAuthorizer, 39 42 ) *RoutingRepository { 40 43 return &RoutingRepository{ 41 44 Repository: baseRepo, ··· 44 47 storageEndpoint: storageEndpoint, 45 48 did: did, 46 49 database: database, 50 + authorizer: authorizer, 47 51 } 48 52 } 49 53 ··· 105 109 panic("storage endpoint not set in RoutingRepository - ensure default_storage_endpoint is configured in middleware") 106 110 } 107 111 108 - // Create and cache proxy blob store 109 - r.blobStore = NewProxyBlobStore(holdEndpoint, r.did, r.database, r.repositoryName) 112 + // Create and cache proxy blob store with authorization 113 + r.blobStore = NewProxyBlobStore(holdEndpoint, r.did, r.database, r.repositoryName, r.authorizer) 110 114 return r.blobStore 111 115 } 112 116
+36
pkg/atproto/client.go
··· 625 625 func BlobCDNURL(didOrHandle, cid string) string { 626 626 return fmt.Sprintf("https://imgs.blue/%s/%s", didOrHandle, cid) 627 627 } 628 + 629 + // DIDDocument represents a did:web document 630 + type DIDDocument struct { 631 + Context []string `json:"@context"` 632 + ID string `json:"id"` 633 + Service []struct { 634 + ID string `json:"id"` 635 + Type string `json:"type"` 636 + ServiceEndpoint string `json:"serviceEndpoint"` 637 + } `json:"service"` 638 + } 639 + 640 + // FetchDIDDocument fetches and parses a DID document from a URL 641 + func (c *Client) FetchDIDDocument(ctx context.Context, didDocURL string) (*DIDDocument, error) { 642 + req, err := http.NewRequestWithContext(ctx, "GET", didDocURL, nil) 643 + if err != nil { 644 + return nil, err 645 + } 646 + 647 + resp, err := c.httpClient.Do(req) 648 + if err != nil { 649 + return nil, fmt.Errorf("failed to fetch DID document: %w", err) 650 + } 651 + defer resp.Body.Close() 652 + 653 + if resp.StatusCode != http.StatusOK { 654 + return nil, fmt.Errorf("fetch DID document failed with status %d", resp.StatusCode) 655 + } 656 + 657 + var didDoc DIDDocument 658 + if err := json.NewDecoder(resp.Body).Decode(&didDoc); err != nil { 659 + return nil, fmt.Errorf("failed to decode DID document: %w", err) 660 + } 661 + 662 + return &didDoc, nil 663 + }
+44 -1
pkg/atproto/lexicon.go
··· 1 1 package atproto 2 2 3 + //go:generate go run github.com/whyrusleeping/cbor-gen --map-encoding CrewRecord CaptainRecord 4 + 3 5 import ( 4 6 "encoding/base64" 5 7 "encoding/json" ··· 19 21 // HoldCollection is the collection name for storage holds (BYOS) 20 22 HoldCollection = "io.atcr.hold" 21 23 22 - // HoldCrewCollection is the collection name for hold crew (membership) 24 + // HoldCrewCollection is the collection name for hold crew (membership) - LEGACY BYOS model 25 + // Stored in owner's PDS for BYOS holds 23 26 HoldCrewCollection = "io.atcr.hold.crew" 27 + 28 + // CaptainCollection is the collection name for captain records (hold ownership) - EMBEDDED PDS model 29 + // Stored in hold's embedded PDS (singleton record at rkey "self") 30 + CaptainCollection = "io.atcr.hold.captain" 31 + 32 + // CrewCollection is the collection name for crew records (access control) - EMBEDDED PDS model 33 + // Stored in hold's embedded PDS (one record per member) 34 + // Note: Uses same collection name as HoldCrewCollection but stored in different PDS (hold's PDS vs owner's PDS) 35 + CrewCollection = "io.atcr.hold.crew" 24 36 25 37 // SailorProfileCollection is the collection name for user profiles 26 38 SailorProfileCollection = "io.atcr.sailor.profile" ··· 371 383 // did:web uses hostname directly (port included if non-standard) 372 384 return "did:web:" + hostname 373 385 } 386 + 387 + // ============================================================================= 388 + // Embedded PDS Types (Hold Service) 389 + // ============================================================================= 390 + 391 + // CaptainRecord represents the hold's ownership and metadata 392 + // Collection: io.atcr.hold.captain (singleton record at rkey "self") 393 + // Stored in the hold's embedded PDS to identify the hold owner and settings 394 + // Uses CBOR encoding for efficient storage in hold's carstore 395 + type CaptainRecord struct { 396 + Type string `json:"$type" cborgen:"$type"` 397 + Owner string `json:"owner" cborgen:"owner"` // DID of hold owner 398 + Public bool `json:"public" cborgen:"public"` // Public read access 399 + AllowAllCrew bool `json:"allowAllCrew" cborgen:"allowAllCrew"` // Allow any authenticated user to register as crew 400 + DeployedAt string `json:"deployedAt" cborgen:"deployedAt"` // RFC3339 timestamp 401 + Region string `json:"region,omitempty" cborgen:"region,omitempty"` // S3 region (optional) 402 + Provider string `json:"provider,omitempty" cborgen:"provider,omitempty"` // Deployment provider (optional) 403 + } 404 + 405 + // CrewRecord represents a crew member in the hold 406 + // Collection: io.atcr.hold.crew (one record per member) 407 + // Stored in the hold's embedded PDS for access control 408 + // Uses CBOR encoding for efficient storage in hold's carstore 409 + // Note: Same collection name as HoldCrewRecord but stored in hold's PDS (not owner's PDS) 410 + type CrewRecord struct { 411 + Type string `json:"$type" cborgen:"$type"` 412 + Member string `json:"member" cborgen:"member"` 413 + Role string `json:"role" cborgen:"role"` 414 + Permissions []string `json:"permissions" cborgen:"permissions"` 415 + AddedAt string `json:"addedAt" cborgen:"addedAt"` // RFC3339 timestamp 416 + }
+35 -5
pkg/atproto/profile.go
··· 12 12 13 13 // EnsureProfile checks if a user's profile exists and creates it if needed 14 14 // This should be called during authentication (OAuth exchange or token service) 15 - // If defaultHoldEndpoint is provided, creates profile with that default (or empty if not provided) 16 - func EnsureProfile(ctx context.Context, client *Client, defaultHoldEndpoint string) error { 15 + // If defaultHoldDID is provided, creates profile with that default (or empty if not provided) 16 + // Expected format: "did:web:hold01.atcr.io" 17 + // Normalizes URLs to DIDs for consistency (for backward compatibility) 18 + func EnsureProfile(ctx context.Context, client *Client, defaultHoldDID string) error { 17 19 // Check if profile already exists 18 20 profile, err := client.GetRecord(ctx, SailorProfileCollection, ProfileRKey) 19 21 if err == nil && profile != nil { ··· 21 23 return nil 22 24 } 23 25 26 + // Normalize to DID if it's a URL (or pass through if already a DID) 27 + // This ensures we store DIDs consistently in new profiles 28 + normalizedDID := "" 29 + if defaultHoldDID != "" { 30 + normalizedDID = ResolveHoldDIDFromURL(defaultHoldDID) 31 + } 32 + 24 33 // Profile doesn't exist - create it 25 - // defaultHoldEndpoint can be empty string (user will need to configure it later) 26 - newProfile := NewSailorProfileRecord(defaultHoldEndpoint) 34 + newProfile := NewSailorProfileRecord(normalizedDID) 27 35 28 36 _, err = client.PutRecord(ctx, SailorProfileCollection, ProfileRKey, newProfile) 29 37 if err != nil { 30 38 return fmt.Errorf("failed to create sailor profile: %w", err) 31 39 } 32 40 33 - fmt.Printf("DEBUG [profile]: Created sailor profile with defaultHold=%s\n", defaultHoldEndpoint) 41 + fmt.Printf("DEBUG [profile]: Created sailor profile with defaultHold=%s\n", normalizedDID) 34 42 return nil 35 43 } 36 44 37 45 // GetProfile retrieves the user's profile from their PDS 38 46 // Returns nil if profile doesn't exist 47 + // Automatically migrates old URL-based defaultHold values to DIDs 39 48 func GetProfile(ctx context.Context, client *Client) (*SailorProfileRecord, error) { 40 49 record, err := client.GetRecord(ctx, SailorProfileCollection, ProfileRKey) 41 50 if err != nil { ··· 52 61 return nil, fmt.Errorf("failed to parse profile: %w", err) 53 62 } 54 63 64 + // Migrate old URL-based defaultHold to DID format 65 + // This ensures backward compatibility with profiles created before DID migration 66 + if profile.DefaultHold != "" && !isDID(profile.DefaultHold) { 67 + // Convert URL to DID transparently 68 + profile.DefaultHold = ResolveHoldDIDFromURL(profile.DefaultHold) 69 + fmt.Printf("DEBUG [profile]: Migrated defaultHold URL to DID: %s\n", profile.DefaultHold) 70 + } 71 + 55 72 return &profile, nil 56 73 } 57 74 75 + // isDID checks if a string is a DID (starts with "did:") 76 + func isDID(s string) bool { 77 + return len(s) > 4 && s[:4] == "did:" 78 + } 79 + 58 80 // UpdateProfile updates the user's profile 81 + // Normalizes defaultHold to DID format before saving 59 82 func UpdateProfile(ctx context.Context, client *Client, profile *SailorProfileRecord) error { 83 + // Normalize defaultHold to DID if it's a URL 84 + // This ensures we always store DIDs, even if user provides a URL 85 + if profile.DefaultHold != "" && !isDID(profile.DefaultHold) { 86 + profile.DefaultHold = ResolveHoldDIDFromURL(profile.DefaultHold) 87 + fmt.Printf("DEBUG [profile]: Normalized defaultHold to DID: %s\n", profile.DefaultHold) 88 + } 89 + 60 90 _, err := client.PutRecord(ctx, SailorProfileCollection, ProfileRKey, profile) 61 91 if err != nil { 62 92 return fmt.Errorf("failed to update profile: %w", err)
+77
pkg/auth/hold_authorizer.go
··· 1 + package auth 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + 7 + "atcr.io/pkg/atproto" 8 + ) 9 + 10 + // HoldAuthorizer checks if a DID has read/write access to a hold 11 + // Implementations can query local PDS (hold service) or remote XRPC (appview) 12 + type HoldAuthorizer interface { 13 + // CheckReadAccess checks if userDID can read from holdDID 14 + // Returns: (allowed bool, error) 15 + CheckReadAccess(ctx context.Context, holdDID, userDID string) (bool, error) 16 + 17 + // CheckWriteAccess checks if userDID can write to holdDID 18 + // Returns: (allowed bool, error) 19 + CheckWriteAccess(ctx context.Context, holdDID, userDID string) (bool, error) 20 + 21 + // GetCaptainRecord retrieves the captain record for a hold 22 + // Used to check public flag and allowAllCrew settings 23 + GetCaptainRecord(ctx context.Context, holdDID string) (*atproto.CaptainRecord, error) 24 + 25 + // IsCrewMember checks if userDID is a crew member of holdDID 26 + IsCrewMember(ctx context.Context, holdDID, userDID string) (bool, error) 27 + } 28 + 29 + // CheckReadAccessWithCaptain implements the standard read authorization logic 30 + // This is shared across all HoldAuthorizer implementations 31 + // Read access rules: 32 + // - Public hold: allow anyone (even anonymous) 33 + // - Private hold: require authentication (any authenticated user) 34 + func CheckReadAccessWithCaptain(captain *atproto.CaptainRecord, userDID string) bool { 35 + if captain.Public { 36 + // Public hold - allow anyone (even anonymous) 37 + return true 38 + } 39 + 40 + // Private hold - require authentication 41 + // Any authenticated user with a DID can read 42 + if userDID == "" { 43 + // Anonymous user trying to access private hold 44 + return false 45 + } 46 + 47 + // For MVP: assume DID presence means they have sailor.profile 48 + // Future: could query PDS to verify sailor.profile exists 49 + return true 50 + } 51 + 52 + // CheckWriteAccessWithCaptain implements the standard write authorization logic 53 + // This is shared across all HoldAuthorizer implementations 54 + // Write access rules: 55 + // - Must be authenticated 56 + // - Must be hold owner OR crew member 57 + func CheckWriteAccessWithCaptain(captain *atproto.CaptainRecord, userDID string, isCrew bool) bool { 58 + if userDID == "" { 59 + // Anonymous writes not allowed 60 + return false 61 + } 62 + 63 + // Check if DID is the hold owner 64 + if userDID == captain.Owner { 65 + // Owner always has write access 66 + return true 67 + } 68 + 69 + // Check if DID is a crew member 70 + return isCrew 71 + } 72 + 73 + // ErrHoldNotFound is returned when a hold's captain record cannot be found 74 + var ErrHoldNotFound = fmt.Errorf("hold not found") 75 + 76 + // ErrUnauthorized is returned when access is denied 77 + var ErrUnauthorized = fmt.Errorf("unauthorized")
+101
pkg/auth/hold_local.go
··· 1 + package auth 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + 7 + "atcr.io/pkg/atproto" 8 + "atcr.io/pkg/hold/pds" 9 + ) 10 + 11 + // LocalHoldAuthorizer queries the hold's own embedded PDS directly 12 + // Used by hold service to authorize access to its own storage 13 + type LocalHoldAuthorizer struct { 14 + pds *pds.HoldPDS 15 + } 16 + 17 + // NewLocalHoldAuthorizer creates a new local authorizer for hold service 18 + func NewLocalHoldAuthorizer(holdPDS *pds.HoldPDS) HoldAuthorizer { 19 + return &LocalHoldAuthorizer{ 20 + pds: holdPDS, 21 + } 22 + } 23 + 24 + // NewLocalHoldAuthorizerFromInterface creates a new local authorizer from an any 25 + // This is used to avoid import cycles - caller must pass a *pds.HoldPDS 26 + func NewLocalHoldAuthorizerFromInterface(holdPDS any) HoldAuthorizer { 27 + // Type assert to *pds.HoldPDS 28 + if pdsTyped, ok := holdPDS.(*pds.HoldPDS); ok { 29 + return &LocalHoldAuthorizer{ 30 + pds: pdsTyped, 31 + } 32 + } 33 + // Return nil if type assertion fails - caller should check 34 + return nil 35 + } 36 + 37 + // GetCaptainRecord retrieves the captain record from the hold's PDS 38 + func (a *LocalHoldAuthorizer) GetCaptainRecord(ctx context.Context, holdDID string) (*atproto.CaptainRecord, error) { 39 + // Verify that the requested holdDID matches this hold 40 + if holdDID != a.pds.DID() { 41 + return nil, fmt.Errorf("holdDID mismatch: requested %s, this hold is %s", holdDID, a.pds.DID()) 42 + } 43 + 44 + // Query the PDS for captain record 45 + _, pdsCaptain, err := a.pds.GetCaptainRecord(ctx) 46 + if err != nil { 47 + return nil, fmt.Errorf("failed to get captain record: %w", err) 48 + } 49 + 50 + // The PDS returns *atproto.CaptainRecord directly now (after we update pds to use atproto types) 51 + return pdsCaptain, nil 52 + } 53 + 54 + // IsCrewMember checks if userDID is a crew member 55 + func (a *LocalHoldAuthorizer) IsCrewMember(ctx context.Context, holdDID, userDID string) (bool, error) { 56 + // Verify that the requested holdDID matches this hold 57 + if holdDID != a.pds.DID() { 58 + return false, fmt.Errorf("holdDID mismatch: requested %s, this hold is %s", holdDID, a.pds.DID()) 59 + } 60 + 61 + // Query the PDS for crew list 62 + crewList, err := a.pds.ListCrewMembers(ctx) 63 + if err != nil { 64 + return false, fmt.Errorf("failed to list crew members: %w", err) 65 + } 66 + 67 + // Check if userDID is in the crew list 68 + for _, member := range crewList { 69 + if member.Record.Member == userDID { 70 + // TODO: Check expiration if set 71 + return true, nil 72 + } 73 + } 74 + 75 + return false, nil 76 + } 77 + 78 + // CheckReadAccess implements read authorization using shared logic 79 + func (a *LocalHoldAuthorizer) CheckReadAccess(ctx context.Context, holdDID, userDID string) (bool, error) { 80 + captain, err := a.GetCaptainRecord(ctx, holdDID) 81 + if err != nil { 82 + return false, err 83 + } 84 + 85 + return CheckReadAccessWithCaptain(captain, userDID), nil 86 + } 87 + 88 + // CheckWriteAccess implements write authorization using shared logic 89 + func (a *LocalHoldAuthorizer) CheckWriteAccess(ctx context.Context, holdDID, userDID string) (bool, error) { 90 + captain, err := a.GetCaptainRecord(ctx, holdDID) 91 + if err != nil { 92 + return false, err 93 + } 94 + 95 + isCrew, err := a.IsCrewMember(ctx, holdDID, userDID) 96 + if err != nil { 97 + return false, err 98 + } 99 + 100 + return CheckWriteAccessWithCaptain(captain, userDID, isCrew), nil 101 + }
+559
pkg/auth/hold_remote.go
··· 1 + package auth 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "net/http" 10 + "net/url" 11 + "strings" 12 + "sync" 13 + "time" 14 + 15 + "atcr.io/pkg/atproto" 16 + ) 17 + 18 + // RemoteHoldAuthorizer queries a hold's PDS via XRPC endpoints 19 + // Used by AppView to authorize access to remote holds 20 + // Implements caching for captain records to reduce XRPC calls 21 + type RemoteHoldAuthorizer struct { 22 + db *sql.DB 23 + httpClient *http.Client 24 + cacheTTL time.Duration // TTL for captain record cache 25 + recentDenials sync.Map // In-memory cache for first denials (10s backoff) 26 + stopCleanup chan struct{} // Signal to stop cleanup goroutine 27 + } 28 + 29 + // denialEntry stores timestamp for in-memory first denials 30 + type denialEntry struct { 31 + timestamp time.Time 32 + } 33 + 34 + // NewRemoteHoldAuthorizer creates a new remote authorizer for AppView 35 + func NewRemoteHoldAuthorizer(db *sql.DB) HoldAuthorizer { 36 + a := &RemoteHoldAuthorizer{ 37 + db: db, 38 + httpClient: &http.Client{ 39 + Timeout: 10 * time.Second, 40 + }, 41 + cacheTTL: 1 * time.Hour, // 1 hour cache TTL 42 + stopCleanup: make(chan struct{}), 43 + } 44 + 45 + // Start cleanup goroutine for in-memory denials 46 + go a.cleanupRecentDenials() 47 + 48 + return a 49 + } 50 + 51 + // cleanupRecentDenials runs every 10s to remove expired first-denial entries 52 + func (a *RemoteHoldAuthorizer) cleanupRecentDenials() { 53 + ticker := time.NewTicker(10 * time.Second) 54 + defer ticker.Stop() 55 + 56 + for { 57 + select { 58 + case <-ticker.C: 59 + now := time.Now() 60 + a.recentDenials.Range(func(key, value any) bool { 61 + entry := value.(denialEntry) 62 + // Remove entries older than 15 seconds (10s backoff + 5s grace) 63 + if now.Sub(entry.timestamp) > 15*time.Second { 64 + a.recentDenials.Delete(key) 65 + } 66 + return true 67 + }) 68 + case <-a.stopCleanup: 69 + return 70 + } 71 + } 72 + } 73 + 74 + // GetCaptainRecord retrieves a captain record with caching 75 + // 1. Check database cache 76 + // 2. If cache miss or expired, query hold's XRPC endpoint 77 + // 3. Update cache 78 + func (a *RemoteHoldAuthorizer) GetCaptainRecord(ctx context.Context, holdDID string) (*atproto.CaptainRecord, error) { 79 + // Try cache first 80 + if a.db != nil { 81 + cached, err := a.getCachedCaptainRecord(holdDID) 82 + if err == nil && cached != nil { 83 + // Cache hit - check if still valid 84 + if time.Since(cached.UpdatedAt) < a.cacheTTL { 85 + return cached.CaptainRecord, nil 86 + } 87 + // Cache expired - continue to fetch fresh data 88 + } 89 + } 90 + 91 + // Cache miss or expired - query XRPC endpoint 92 + record, err := a.fetchCaptainRecordFromXRPC(ctx, holdDID) 93 + if err != nil { 94 + return nil, err 95 + } 96 + 97 + // Update cache 98 + if a.db != nil { 99 + if err := a.setCachedCaptainRecord(holdDID, record); err != nil { 100 + // Log error but don't fail - caching is best-effort 101 + fmt.Printf("WARNING: Failed to cache captain record: %v\n", err) 102 + } 103 + } 104 + 105 + return record, nil 106 + } 107 + 108 + // captainRecordWithMeta includes UpdatedAt for cache management 109 + type captainRecordWithMeta struct { 110 + *atproto.CaptainRecord 111 + UpdatedAt time.Time 112 + } 113 + 114 + // getCachedCaptainRecord retrieves a captain record from database cache 115 + func (a *RemoteHoldAuthorizer) getCachedCaptainRecord(holdDID string) (*captainRecordWithMeta, error) { 116 + query := ` 117 + SELECT owner_did, public, allow_all_crew, deployed_at, region, provider, updated_at 118 + FROM hold_captain_records 119 + WHERE hold_did = ? 120 + ` 121 + 122 + var record atproto.CaptainRecord 123 + var deployedAt, region, provider sql.NullString 124 + var updatedAt time.Time 125 + 126 + err := a.db.QueryRow(query, holdDID).Scan( 127 + &record.Owner, 128 + &record.Public, 129 + &record.AllowAllCrew, 130 + &deployedAt, 131 + &region, 132 + &provider, 133 + &updatedAt, 134 + ) 135 + 136 + if err == sql.ErrNoRows { 137 + return nil, nil // Cache miss 138 + } 139 + 140 + if err != nil { 141 + return nil, fmt.Errorf("cache query failed: %w", err) 142 + } 143 + 144 + // Handle nullable fields 145 + if deployedAt.Valid { 146 + record.DeployedAt = deployedAt.String 147 + } 148 + if region.Valid { 149 + record.Region = region.String 150 + } 151 + if provider.Valid { 152 + record.Provider = provider.String 153 + } 154 + 155 + return &captainRecordWithMeta{ 156 + CaptainRecord: &record, 157 + UpdatedAt: updatedAt, 158 + }, nil 159 + } 160 + 161 + // setCachedCaptainRecord stores a captain record in database cache 162 + func (a *RemoteHoldAuthorizer) setCachedCaptainRecord(holdDID string, record *atproto.CaptainRecord) error { 163 + query := ` 164 + INSERT INTO hold_captain_records ( 165 + hold_did, owner_did, public, allow_all_crew, 166 + deployed_at, region, provider, updated_at 167 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 168 + ON CONFLICT(hold_did) DO UPDATE SET 169 + owner_did = excluded.owner_did, 170 + public = excluded.public, 171 + allow_all_crew = excluded.allow_all_crew, 172 + deployed_at = excluded.deployed_at, 173 + region = excluded.region, 174 + provider = excluded.provider, 175 + updated_at = excluded.updated_at 176 + ` 177 + 178 + _, err := a.db.Exec(query, 179 + holdDID, 180 + record.Owner, 181 + record.Public, 182 + record.AllowAllCrew, 183 + nullString(record.DeployedAt), 184 + nullString(record.Region), 185 + nullString(record.Provider), 186 + time.Now(), 187 + ) 188 + 189 + return err 190 + } 191 + 192 + // fetchCaptainRecordFromXRPC queries the hold's XRPC endpoint for captain record 193 + func (a *RemoteHoldAuthorizer) fetchCaptainRecordFromXRPC(ctx context.Context, holdDID string) (*atproto.CaptainRecord, error) { 194 + // Resolve DID to URL 195 + holdURL, err := resolveDIDToURL(holdDID) 196 + if err != nil { 197 + return nil, fmt.Errorf("failed to resolve hold DID: %w", err) 198 + } 199 + 200 + // Build XRPC request URL 201 + // GET /xrpc/com.atproto.repo.getRecord?repo={did}&collection=io.atcr.hold.captain&rkey=self 202 + xrpcURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=%s&rkey=self", 203 + holdURL, url.QueryEscape(holdDID), url.QueryEscape(atproto.CaptainCollection)) 204 + 205 + req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil) 206 + if err != nil { 207 + return nil, err 208 + } 209 + 210 + resp, err := a.httpClient.Do(req) 211 + if err != nil { 212 + return nil, fmt.Errorf("XRPC request failed: %w", err) 213 + } 214 + defer resp.Body.Close() 215 + 216 + if resp.StatusCode != http.StatusOK { 217 + body, _ := io.ReadAll(resp.Body) 218 + return nil, fmt.Errorf("XRPC request failed: status %d: %s", resp.StatusCode, string(body)) 219 + } 220 + 221 + // Parse response 222 + var xrpcResp struct { 223 + URI string `json:"uri"` 224 + CID string `json:"cid"` 225 + Value struct { 226 + Type string `json:"$type"` 227 + Owner string `json:"owner"` 228 + Public bool `json:"public"` 229 + AllowAllCrew bool `json:"allowAllCrew"` 230 + DeployedAt string `json:"deployedAt"` 231 + Region string `json:"region,omitempty"` 232 + Provider string `json:"provider,omitempty"` 233 + } `json:"value"` 234 + } 235 + 236 + if err := json.NewDecoder(resp.Body).Decode(&xrpcResp); err != nil { 237 + return nil, fmt.Errorf("failed to decode XRPC response: %w", err) 238 + } 239 + 240 + // Convert to our type 241 + record := &atproto.CaptainRecord{ 242 + Type: atproto.CaptainCollection, 243 + Owner: xrpcResp.Value.Owner, 244 + Public: xrpcResp.Value.Public, 245 + AllowAllCrew: xrpcResp.Value.AllowAllCrew, 246 + DeployedAt: xrpcResp.Value.DeployedAt, 247 + Region: xrpcResp.Value.Region, 248 + Provider: xrpcResp.Value.Provider, 249 + } 250 + 251 + return record, nil 252 + } 253 + 254 + // IsCrewMember checks if userDID is a crew member with caching 255 + // 1. Check approval cache (15min TTL) 256 + // 2. Check denial cache with exponential backoff 257 + // 3. If cache miss, query XRPC endpoint and update cache 258 + func (a *RemoteHoldAuthorizer) IsCrewMember(ctx context.Context, holdDID, userDID string) (bool, error) { 259 + // Skip caching if no database 260 + if a.db == nil { 261 + return a.isCrewMemberNoCache(ctx, holdDID, userDID) 262 + } 263 + 264 + // Check approval cache first (15min TTL) 265 + if approved, err := a.getCachedApproval(holdDID, userDID); err == nil && approved { 266 + return true, nil 267 + } 268 + 269 + // Check denial cache with backoff 270 + if blocked, err := a.isBlockedByDenialBackoff(holdDID, userDID); err == nil && blocked { 271 + // Still in backoff period - don't query again 272 + return false, nil 273 + } 274 + 275 + // Cache miss or expired - query XRPC endpoint 276 + isCrew, err := a.isCrewMemberNoCache(ctx, holdDID, userDID) 277 + if err != nil { 278 + return false, err 279 + } 280 + 281 + // Update cache based on result 282 + if isCrew { 283 + // Cache approval for 15 minutes 284 + _ = a.cacheApproval(holdDID, userDID, 15*time.Minute) 285 + } else { 286 + // Cache denial with exponential backoff 287 + _ = a.cacheDenial(holdDID, userDID) 288 + } 289 + 290 + return isCrew, nil 291 + } 292 + 293 + // isCrewMemberNoCache queries XRPC without caching (internal helper) 294 + func (a *RemoteHoldAuthorizer) isCrewMemberNoCache(ctx context.Context, holdDID, userDID string) (bool, error) { 295 + // Resolve DID to URL 296 + holdURL, err := resolveDIDToURL(holdDID) 297 + if err != nil { 298 + return false, fmt.Errorf("failed to resolve hold DID: %w", err) 299 + } 300 + 301 + // Build XRPC request URL 302 + // GET /xrpc/com.atproto.repo.listRecords?repo={did}&collection=io.atcr.hold.crew 303 + xrpcURL := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s", 304 + holdURL, url.QueryEscape(holdDID), url.QueryEscape(atproto.CrewCollection)) 305 + 306 + req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil) 307 + if err != nil { 308 + return false, err 309 + } 310 + 311 + resp, err := a.httpClient.Do(req) 312 + if err != nil { 313 + return false, fmt.Errorf("XRPC request failed: %w", err) 314 + } 315 + defer resp.Body.Close() 316 + 317 + if resp.StatusCode != http.StatusOK { 318 + body, _ := io.ReadAll(resp.Body) 319 + return false, fmt.Errorf("XRPC request failed: status %d: %s", resp.StatusCode, string(body)) 320 + } 321 + 322 + // Parse response 323 + var xrpcResp struct { 324 + Records []struct { 325 + URI string `json:"uri"` 326 + CID string `json:"cid"` 327 + Value struct { 328 + Type string `json:"$type"` 329 + Member string `json:"member"` 330 + Role string `json:"role"` 331 + Permissions []string `json:"permissions"` 332 + AddedAt string `json:"addedAt"` 333 + } `json:"value"` 334 + } `json:"records"` 335 + } 336 + 337 + if err := json.NewDecoder(resp.Body).Decode(&xrpcResp); err != nil { 338 + return false, fmt.Errorf("failed to decode XRPC response: %w", err) 339 + } 340 + 341 + // Check if userDID is in the crew list 342 + for _, record := range xrpcResp.Records { 343 + if record.Value.Member == userDID { 344 + // TODO: Check expiration if set 345 + return true, nil 346 + } 347 + } 348 + 349 + return false, nil 350 + } 351 + 352 + // CheckReadAccess implements read authorization using shared logic 353 + func (a *RemoteHoldAuthorizer) CheckReadAccess(ctx context.Context, holdDID, userDID string) (bool, error) { 354 + captain, err := a.GetCaptainRecord(ctx, holdDID) 355 + if err != nil { 356 + return false, err 357 + } 358 + 359 + return CheckReadAccessWithCaptain(captain, userDID), nil 360 + } 361 + 362 + // CheckWriteAccess implements write authorization using shared logic 363 + func (a *RemoteHoldAuthorizer) CheckWriteAccess(ctx context.Context, holdDID, userDID string) (bool, error) { 364 + captain, err := a.GetCaptainRecord(ctx, holdDID) 365 + if err != nil { 366 + return false, err 367 + } 368 + 369 + isCrew, err := a.IsCrewMember(ctx, holdDID, userDID) 370 + if err != nil { 371 + return false, err 372 + } 373 + 374 + return CheckWriteAccessWithCaptain(captain, userDID, isCrew), nil 375 + } 376 + 377 + // resolveDIDToURL converts a did:web DID to an HTTPS URL 378 + // Example: did:web:hold01.atcr.io → https://hold01.atcr.io 379 + func resolveDIDToURL(did string) (string, error) { 380 + // Handle did:web format 381 + if !strings.HasPrefix(did, "did:web:") { 382 + return "", fmt.Errorf("only did:web is supported, got: %s", did) 383 + } 384 + 385 + // Extract hostname from did:web:hostname 386 + hostname := strings.TrimPrefix(did, "did:web:") 387 + 388 + // Convert to HTTPS URL 389 + return "https://" + hostname, nil 390 + } 391 + 392 + // nullString converts a string to sql.NullString 393 + func nullString(s string) sql.NullString { 394 + if s == "" { 395 + return sql.NullString{Valid: false} 396 + } 397 + return sql.NullString{String: s, Valid: true} 398 + } 399 + 400 + // getCachedApproval checks if user has a cached crew approval 401 + func (a *RemoteHoldAuthorizer) getCachedApproval(holdDID, userDID string) (bool, error) { 402 + query := ` 403 + SELECT expires_at 404 + FROM hold_crew_approvals 405 + WHERE hold_did = ? AND user_did = ? 406 + ` 407 + 408 + var expiresAt time.Time 409 + err := a.db.QueryRow(query, holdDID, userDID).Scan(&expiresAt) 410 + 411 + if err == sql.ErrNoRows { 412 + return false, nil // Cache miss 413 + } 414 + 415 + if err != nil { 416 + return false, err 417 + } 418 + 419 + // Check if approval has expired 420 + if time.Now().After(expiresAt) { 421 + // Expired - clean up 422 + _ = a.deleteCachedApproval(holdDID, userDID) 423 + return false, nil 424 + } 425 + 426 + return true, nil 427 + } 428 + 429 + // cacheApproval stores a crew approval with TTL 430 + func (a *RemoteHoldAuthorizer) cacheApproval(holdDID, userDID string, ttl time.Duration) error { 431 + query := ` 432 + INSERT INTO hold_crew_approvals (hold_did, user_did, approved_at, expires_at) 433 + VALUES (?, ?, ?, ?) 434 + ON CONFLICT(hold_did, user_did) DO UPDATE SET 435 + approved_at = excluded.approved_at, 436 + expires_at = excluded.expires_at 437 + ` 438 + 439 + now := time.Now() 440 + expiresAt := now.Add(ttl) 441 + 442 + _, err := a.db.Exec(query, holdDID, userDID, now, expiresAt) 443 + return err 444 + } 445 + 446 + // deleteCachedApproval removes an expired approval 447 + func (a *RemoteHoldAuthorizer) deleteCachedApproval(holdDID, userDID string) error { 448 + query := `DELETE FROM hold_crew_approvals WHERE hold_did = ? AND user_did = ?` 449 + _, err := a.db.Exec(query, holdDID, userDID) 450 + return err 451 + } 452 + 453 + // isBlockedByDenialBackoff checks if user is in denial backoff period 454 + // Checks in-memory cache first (for 10s first denials), then DB (for longer backoffs) 455 + func (a *RemoteHoldAuthorizer) isBlockedByDenialBackoff(holdDID, userDID string) (bool, error) { 456 + // Check in-memory cache first (first denials with 10s backoff) 457 + key := fmt.Sprintf("%s:%s", holdDID, userDID) 458 + if val, ok := a.recentDenials.Load(key); ok { 459 + entry := val.(denialEntry) 460 + // Check if still within 10s backoff 461 + if time.Since(entry.timestamp) < 10*time.Second { 462 + return true, nil // Still blocked by in-memory first denial 463 + } 464 + } 465 + 466 + // Check database for longer backoffs (second+ denials) 467 + query := ` 468 + SELECT next_retry_at 469 + FROM hold_crew_denials 470 + WHERE hold_did = ? AND user_did = ? 471 + ` 472 + 473 + var nextRetryAt time.Time 474 + err := a.db.QueryRow(query, holdDID, userDID).Scan(&nextRetryAt) 475 + 476 + if err == sql.ErrNoRows { 477 + return false, nil // No denial record 478 + } 479 + 480 + if err != nil { 481 + return false, err 482 + } 483 + 484 + // Check if still in backoff period 485 + if time.Now().Before(nextRetryAt) { 486 + return true, nil // Still blocked 487 + } 488 + 489 + // Backoff period expired - can retry 490 + return false, nil 491 + } 492 + 493 + // cacheDenial stores or updates a denial with exponential backoff 494 + // First denial: in-memory only (10s backoff) 495 + // Second+ denial: database with exponential backoff (1m, 5m, 15m, 1h) 496 + func (a *RemoteHoldAuthorizer) cacheDenial(holdDID, userDID string) error { 497 + key := fmt.Sprintf("%s:%s", holdDID, userDID) 498 + 499 + // Check if this is a first denial (not in memory, not in DB) 500 + _, inMemory := a.recentDenials.Load(key) 501 + 502 + var denialCount int 503 + query := `SELECT denial_count FROM hold_crew_denials WHERE hold_did = ? AND user_did = ?` 504 + err := a.db.QueryRow(query, holdDID, userDID).Scan(&denialCount) 505 + 506 + inDB := err != sql.ErrNoRows 507 + if err != nil && err != sql.ErrNoRows { 508 + return err 509 + } 510 + 511 + // If not in memory and not in DB, this is the first denial 512 + if !inMemory && !inDB { 513 + // First denial: store only in memory with 10s backoff 514 + a.recentDenials.Store(key, denialEntry{timestamp: time.Now()}) 515 + return nil 516 + } 517 + 518 + // Second+ denial: persist to database with exponential backoff 519 + denialCount++ 520 + backoff := getBackoffDuration(denialCount) 521 + now := time.Now() 522 + nextRetry := now.Add(backoff) 523 + 524 + // Upsert denial record 525 + upsertQuery := ` 526 + INSERT INTO hold_crew_denials (hold_did, user_did, denial_count, next_retry_at, last_denied_at) 527 + VALUES (?, ?, ?, ?, ?) 528 + ON CONFLICT(hold_did, user_did) DO UPDATE SET 529 + denial_count = excluded.denial_count, 530 + next_retry_at = excluded.next_retry_at, 531 + last_denied_at = excluded.last_denied_at 532 + ` 533 + 534 + _, err = a.db.Exec(upsertQuery, holdDID, userDID, denialCount, nextRetry, now) 535 + 536 + // Remove from in-memory cache since we're now tracking in DB 537 + a.recentDenials.Delete(key) 538 + 539 + return err 540 + } 541 + 542 + // getBackoffDuration returns the backoff duration based on denial count 543 + // Note: First denial (10s) is in-memory only and not tracked by this function 544 + // This function handles second+ denials: 1m, 5m, 15m, 1h 545 + func getBackoffDuration(denialCount int) time.Duration { 546 + backoffs := []time.Duration{ 547 + 1 * time.Minute, // 1st DB denial (2nd overall) - being added soon 548 + 5 * time.Minute, // 2nd DB denial (3rd overall) - probably not happening 549 + 15 * time.Minute, // 3rd DB denial (4th overall) - definitely not soon 550 + 60 * time.Minute, // 4th+ DB denial (5th+ overall) - stop hammering 551 + } 552 + 553 + idx := denialCount - 1 554 + if idx >= len(backoffs) { 555 + idx = len(backoffs) - 1 556 + } 557 + 558 + return backoffs[idx] 559 + }
+12 -10
pkg/auth/oauth/server.go
··· 27 27 28 28 // Server handles OAuth authorization for the AppView 29 29 type Server struct { 30 - app *App 31 - refresher *Refresher 32 - uiSessionStore UISessionStore 33 - db *sql.DB 34 - defaultHoldEndpoint string 30 + app *App 31 + refresher *Refresher 32 + uiSessionStore UISessionStore 33 + db *sql.DB 34 + defaultHoldDID string // Default hold DID (e.g., "did:web:hold01.atcr.io") 35 35 } 36 36 37 37 // NewServer creates a new OAuth server ··· 41 41 } 42 42 } 43 43 44 - // SetDefaultHoldEndpoint sets the default hold endpoint for profile creation 45 - func (s *Server) SetDefaultHoldEndpoint(endpoint string) { 46 - s.defaultHoldEndpoint = endpoint 44 + // SetDefaultHoldDID sets the default hold DID for profile creation 45 + // Expected format: "did:web:hold01.atcr.io" 46 + // To find a hold's DID, visit: https://hold-url/.well-known/did.json 47 + func (s *Server) SetDefaultHoldDID(did string) { 48 + s.defaultHoldDID = did 47 49 } 48 50 49 51 // SetRefresher sets the refresher for invalidating session cache ··· 271 273 client := atproto.NewClientWithIndigoClient(pdsEndpoint, did, session.APIClient()) 272 274 273 275 // Ensure sailor profile exists (creates with default hold if configured, or empty profile if not) 274 - fmt.Printf("DEBUG [oauth/server]: Ensuring profile exists for %s (defaultHold=%s)\n", did, s.defaultHoldEndpoint) 275 - if err := atproto.EnsureProfile(ctx, client, s.defaultHoldEndpoint); err != nil { 276 + fmt.Printf("DEBUG [oauth/server]: Ensuring profile exists for %s (defaultHold=%s)\n", did, s.defaultHoldDID) 277 + if err := atproto.EnsureProfile(ctx, client, s.defaultHoldDID); err != nil { 276 278 fmt.Printf("WARNING [oauth/server]: Failed to ensure profile for %s: %v\n", did, err) 277 279 // Continue anyway - profile creation is not critical for avatar fetch 278 280 } else {
+12 -10
pkg/auth/token/handler.go
··· 18 18 19 19 // Handler handles /auth/token requests 20 20 type Handler struct { 21 - issuer *Issuer 22 - validator *atproto.SessionValidator 23 - deviceStore *db.DeviceStore // For validating device secrets 24 - defaultHoldEndpoint string 21 + issuer *Issuer 22 + validator *atproto.SessionValidator 23 + deviceStore *db.DeviceStore // For validating device secrets 24 + defaultHoldDID string 25 25 } 26 26 27 27 // NewHandler creates a new token handler 28 - func NewHandler(issuer *Issuer, deviceStore *db.DeviceStore, defaultHoldEndpoint string) *Handler { 28 + // defaultHoldDID should be in format "did:web:hold01.atcr.io" 29 + // To find a hold's DID, visit: https://hold-url/.well-known/did.json 30 + func NewHandler(issuer *Issuer, deviceStore *db.DeviceStore, defaultHoldDID string) *Handler { 29 31 return &Handler{ 30 - issuer: issuer, 31 - validator: atproto.NewSessionValidator(), 32 - deviceStore: deviceStore, 33 - defaultHoldEndpoint: defaultHoldEndpoint, 32 + issuer: issuer, 33 + validator: atproto.NewSessionValidator(), 34 + deviceStore: deviceStore, 35 + defaultHoldDID: defaultHoldDID, 34 36 } 35 37 } 36 38 ··· 157 159 atprotoClient := mainAtproto.NewClient(pdsEndpoint, did, accessToken) 158 160 159 161 // Ensure profile exists (will create with default hold if not exists and default is configured) 160 - if err := mainAtproto.EnsureProfile(r.Context(), atprotoClient, h.defaultHoldEndpoint); err != nil { 162 + if err := mainAtproto.EnsureProfile(r.Context(), atprotoClient, h.defaultHoldDID); err != nil { 161 163 // Log error but don't fail auth - profile management is not critical 162 164 fmt.Printf("WARNING: failed to ensure profile for %s: %v\n", did, err) 163 165 }
-181
pkg/hold/authorization.go
··· 1 - package hold 2 - 3 - import ( 4 - "context" 5 - "encoding/json" 6 - "fmt" 7 - "log" 8 - "time" 9 - 10 - "atcr.io/pkg/atproto" 11 - "github.com/bluesky-social/indigo/atproto/identity" 12 - "github.com/bluesky-social/indigo/atproto/syntax" 13 - ) 14 - 15 - // isAuthorizedRead checks if a DID can read from this hold 16 - // Authorization: 17 - // - Public hold: allow anonymous (empty DID) or any authenticated user 18 - // - Private hold: require authentication (any user with sailor.profile) 19 - func (s *HoldService) isAuthorizedRead(did string) bool { 20 - // Check hold public flag 21 - isPublic, err := s.isHoldPublic() 22 - if err != nil { 23 - log.Printf("ERROR: Failed to check hold public flag: %v", err) 24 - // Fail secure - deny access on error 25 - return false 26 - } 27 - 28 - if isPublic { 29 - // Public hold - allow anyone (even anonymous) 30 - return true 31 - } 32 - 33 - // Private hold - require authentication 34 - // Any authenticated user with sailor.profile can read 35 - if did == "" { 36 - // Anonymous user trying to access private hold 37 - return false 38 - } 39 - 40 - // For MVP: assume DID presence means they have sailor.profile 41 - // Future: could query PDS to verify sailor.profile exists 42 - return true 43 - } 44 - 45 - // isAuthorizedWrite checks if a DID can write to this hold 46 - // Authorization: must be hold owner OR crew member 47 - func (s *HoldService) isAuthorizedWrite(did string) bool { 48 - if did == "" { 49 - // Anonymous writes not allowed 50 - return false 51 - } 52 - 53 - // Check if DID is the hold owner 54 - ownerDID := s.config.Registration.OwnerDID 55 - if ownerDID == "" { 56 - log.Printf("ERROR: Hold owner DID not configured") 57 - return false 58 - } 59 - 60 - if did == ownerDID { 61 - // Owner always has write access 62 - return true 63 - } 64 - 65 - // Check if DID is a crew member 66 - isCrew, err := s.isCrewMember(did) 67 - if err != nil { 68 - log.Printf("ERROR: Failed to check crew membership: %v", err) 69 - return false 70 - } 71 - 72 - return isCrew 73 - } 74 - 75 - // isHoldPublic checks if this hold allows public (anonymous) reads 76 - func (s *HoldService) isHoldPublic() (bool, error) { 77 - // Use cached config value for now 78 - // Future: could query PDS for hold record to get live value 79 - return s.config.Server.Public, nil 80 - } 81 - 82 - // isCrewMember checks if a DID is a crew member of this hold 83 - // Supports both explicit DID matching and pattern-based matching (wildcards, handle globs) 84 - func (s *HoldService) isCrewMember(did string) (bool, error) { 85 - ownerDID := s.config.Registration.OwnerDID 86 - if ownerDID == "" { 87 - return false, fmt.Errorf("hold owner DID not configured") 88 - } 89 - 90 - ctx := context.Background() 91 - 92 - // Resolve owner's PDS endpoint using indigo 93 - directory := identity.DefaultDirectory() 94 - ownerDIDParsed, err := syntax.ParseDID(ownerDID) 95 - if err != nil { 96 - return false, fmt.Errorf("invalid owner DID: %w", err) 97 - } 98 - 99 - ident, err := directory.LookupDID(ctx, ownerDIDParsed) 100 - if err != nil { 101 - return false, fmt.Errorf("failed to resolve owner PDS: %w", err) 102 - } 103 - 104 - pdsEndpoint := ident.PDSEndpoint() 105 - if pdsEndpoint == "" { 106 - return false, fmt.Errorf("no PDS endpoint found for owner") 107 - } 108 - 109 - // Build this hold's URI for filtering 110 - publicURL := s.config.Server.PublicURL 111 - if publicURL == "" { 112 - return false, fmt.Errorf("hold public URL not configured") 113 - } 114 - holdName, err := extractHostname(publicURL) 115 - if err != nil { 116 - return false, fmt.Errorf("failed to extract hold name: %w", err) 117 - } 118 - holdURI := fmt.Sprintf("at://%s/%s/%s", ownerDID, atproto.HoldCollection, holdName) 119 - 120 - // Create unauthenticated client to read public records 121 - client := atproto.NewClient(pdsEndpoint, ownerDID, "") 122 - 123 - // List crew records for this hold 124 - // Crew records are public, so we can read them without auth 125 - records, err := client.ListRecords(ctx, atproto.HoldCrewCollection, 100) 126 - if err != nil { 127 - return false, fmt.Errorf("failed to list crew records: %w", err) 128 - } 129 - 130 - // Resolve handle once for pattern matching (lazily, only if needed) 131 - var handle string 132 - var handleResolved bool 133 - 134 - // Check crew records for both explicit DID and pattern matches 135 - for _, record := range records { 136 - var crewRecord atproto.HoldCrewRecord 137 - if err := json.Unmarshal(record.Value, &crewRecord); err != nil { 138 - continue 139 - } 140 - 141 - // Only check crew records for THIS hold (prevents cross-hold access) 142 - if crewRecord.Hold != holdURI { 143 - continue 144 - } 145 - 146 - // Check expiration (if set) 147 - if crewRecord.ExpiresAt != nil && time.Now().After(*crewRecord.ExpiresAt) { 148 - continue // Skip expired membership 149 - } 150 - 151 - // Check explicit DID match 152 - if crewRecord.Member != nil && *crewRecord.Member == did { 153 - // Found explicit crew membership 154 - return true, nil 155 - } 156 - 157 - // Check pattern match (if pattern is set) 158 - if crewRecord.MemberPattern != nil && *crewRecord.MemberPattern != "" { 159 - // Lazy handle resolution - only resolve if we encounter a pattern 160 - if !handleResolved { 161 - handle, err = resolveHandle(did) 162 - if err != nil { 163 - log.Printf("Warning: failed to resolve handle for DID %s: %v", did, err) 164 - // Continue checking explicit DIDs even if handle resolution fails 165 - handleResolved = true // Mark as attempted (don't retry) 166 - handle = "" // Empty handle won't match patterns 167 - } else { 168 - handleResolved = true 169 - } 170 - } 171 - 172 - // If we have a handle, check pattern match 173 - if handle != "" && matchPattern(*crewRecord.MemberPattern, handle) { 174 - // Found pattern-based crew membership 175 - return true, nil 176 - } 177 - } 178 - } 179 - 180 - return false, nil 181 - }
+7 -8
pkg/hold/pds/blobstore_adapter.go pkg/hold/blobstore_adapter.go
··· 1 - package pds 1 + package hold 2 2 3 3 import ( 4 4 "context" 5 5 6 - "atcr.io/pkg/hold" 6 + "atcr.io/pkg/hold/pds" 7 7 ) 8 8 9 - // HoldServiceBlobStore adapts the hold service to implement the BlobStore interface 9 + // HoldServiceBlobStore adapts the hold service to implement the pds.BlobStore interface 10 10 type HoldServiceBlobStore struct { 11 - service *hold.HoldService 11 + service *HoldService 12 12 holdDID string 13 13 } 14 14 15 15 // NewHoldServiceBlobStore creates a blob store adapter for the hold service 16 - func NewHoldServiceBlobStore(service *hold.HoldService, holdDID string) *HoldServiceBlobStore { 16 + func NewHoldServiceBlobStore(service *HoldService, holdDID string) pds.BlobStore { 17 17 return &HoldServiceBlobStore{ 18 18 service: service, 19 19 holdDID: holdDID, ··· 23 23 // GetPresignedDownloadURL returns a presigned URL for downloading a blob 24 24 func (b *HoldServiceBlobStore) GetPresignedDownloadURL(digest string) (string, error) { 25 25 // Use the hold service's existing presigned URL logic 26 - // We need to expose a wrapper method on HoldService 27 26 ctx := context.Background() 28 - url, err := b.service.GetPresignedURL(ctx, hold.OperationGet, digest, b.holdDID) 27 + url, err := b.service.GetPresignedURL(ctx, OperationGet, digest, b.holdDID) 29 28 if err != nil { 30 29 return "", err 31 30 } ··· 36 35 func (b *HoldServiceBlobStore) GetPresignedUploadURL(digest string) (string, error) { 37 36 // Use the hold service's existing presigned URL logic 38 37 ctx := context.Background() 39 - url, err := b.service.GetPresignedURL(ctx, hold.OperationPut, digest, b.holdDID) 38 + url, err := b.service.GetPresignedURL(ctx, OperationPut, digest, b.holdDID) 40 39 if err != nil { 41 40 return "", err 42 41 }
+12 -13
pkg/hold/pds/captain.go
··· 6 6 "fmt" 7 7 "time" 8 8 9 + "atcr.io/pkg/atproto" 9 10 "github.com/bluesky-social/indigo/repo" 10 11 "github.com/ipfs/go-cid" 11 12 ) ··· 17 18 18 19 // CreateCaptainRecord creates the captain record for the hold 19 20 func (p *HoldPDS) CreateCaptainRecord(ctx context.Context, ownerDID string, public bool, allowAllCrew bool) (cid.Cid, error) { 20 - captainRecord := &CaptainRecord{ 21 - Type: CaptainCollection, 21 + captainRecord := &atproto.CaptainRecord{ 22 + Type: atproto.CaptainCollection, 22 23 Owner: ownerDID, 23 24 Public: public, 24 25 AllowAllCrew: allowAllCrew, ··· 26 27 } 27 28 28 29 // Create record in repo with fixed rkey "self" 29 - recordCID, rkey, err := p.repo.CreateRecord(ctx, CaptainCollection, captainRecord) 30 + recordCID, rkey, err := p.repo.CreateRecord(ctx, atproto.CaptainCollection, captainRecord) 30 31 if err != nil { 31 32 return cid.Undef, fmt.Errorf("failed to create captain record: %w", err) 32 33 } ··· 48 49 return cid.Undef, fmt.Errorf("failed to persist commit: %w", err) 49 50 } 50 51 51 - // Create a new session for the next operation 52 - rootStr := root.String() 53 - newSession, err := p.carstore.NewDeltaSession(ctx, p.uid, &rootStr) 52 + // Create a new session for the next operation (use revision string, not CID) 53 + newSession, err := p.carstore.NewDeltaSession(ctx, p.uid, &rev) 54 54 if err != nil { 55 55 return cid.Undef, fmt.Errorf("failed to create new session: %w", err) 56 56 } ··· 71 71 } 72 72 73 73 // GetCaptainRecord retrieves the captain record 74 - func (p *HoldPDS) GetCaptainRecord(ctx context.Context) (cid.Cid, *CaptainRecord, error) { 75 - path := fmt.Sprintf("%s/%s", CaptainCollection, CaptainRkey) 74 + func (p *HoldPDS) GetCaptainRecord(ctx context.Context) (cid.Cid, *atproto.CaptainRecord, error) { 75 + path := fmt.Sprintf("%s/%s", atproto.CaptainCollection, CaptainRkey) 76 76 77 77 // Get the record bytes and decode manually 78 78 recordCID, recBytes, err := p.repo.GetRecordBytes(ctx, path) ··· 81 81 } 82 82 83 83 // Decode the CBOR bytes into our CaptainRecord type 84 - var captainRecord CaptainRecord 84 + var captainRecord atproto.CaptainRecord 85 85 if err := captainRecord.UnmarshalCBOR(bytes.NewReader(*recBytes)); err != nil { 86 86 return cid.Undef, nil, fmt.Errorf("failed to decode captain record: %w", err) 87 87 } ··· 102 102 existing.AllowAllCrew = allowAllCrew 103 103 104 104 // Update record in repo 105 - path := fmt.Sprintf("%s/%s", CaptainCollection, CaptainRkey) 105 + path := fmt.Sprintf("%s/%s", atproto.CaptainCollection, CaptainRkey) 106 106 recordCID, err := p.repo.UpdateRecord(ctx, path, existing) 107 107 if err != nil { 108 108 return cid.Undef, fmt.Errorf("failed to update captain record: %w", err) ··· 125 125 return cid.Undef, fmt.Errorf("failed to persist commit: %w", err) 126 126 } 127 127 128 - // Create a new session for the next operation 129 - rootStr := root.String() 130 - newSession, err := p.carstore.NewDeltaSession(ctx, p.uid, &rootStr) 128 + // Create a new session for the next operation (use revision string, not CID) 129 + newSession, err := p.carstore.NewDeltaSession(ctx, p.uid, &rev) 131 130 if err != nil { 132 131 return cid.Undef, fmt.Errorf("failed to create new session: %w", err) 133 132 }
+1 -1
pkg/hold/pds/cbor_gen.go pkg/atproto/cbor_gen.go
··· 1 1 // Code generated by github.com/whyrusleeping/cbor-gen. DO NOT EDIT. 2 2 3 - package pds 3 + package atproto 4 4 5 5 import ( 6 6 "fmt"
+12 -12
pkg/hold/pds/crew.go
··· 7 7 "strings" 8 8 "time" 9 9 10 + "atcr.io/pkg/atproto" 10 11 "github.com/bluesky-social/indigo/repo" 11 12 "github.com/ipfs/go-cid" 12 13 ) 13 14 14 15 // AddCrewMember adds a new crew member to the hold and commits to carstore 15 16 func (p *HoldPDS) AddCrewMember(ctx context.Context, memberDID, role string, permissions []string) (cid.Cid, error) { 16 - crewRecord := &CrewRecord{ 17 - Type: CrewCollection, 17 + crewRecord := &atproto.CrewRecord{ 18 + Type: atproto.CrewCollection, 18 19 Member: memberDID, 19 20 Role: role, 20 21 Permissions: permissions, ··· 22 23 } 23 24 24 25 // Create record in repo (using memberDID as rkey for easy lookup) 25 - recordCID, _, err := p.repo.CreateRecord(ctx, CrewCollection, crewRecord) 26 + recordCID, _, err := p.repo.CreateRecord(ctx, atproto.CrewCollection, crewRecord) 26 27 if err != nil { 27 28 return cid.Undef, fmt.Errorf("failed to create crew record: %w", err) 28 29 } ··· 44 45 return cid.Undef, fmt.Errorf("failed to persist commit: %w", err) 45 46 } 46 47 47 - // Create a new session for the next operation (old session is now closed) 48 - rootStr := root.String() 49 - newSession, err := p.carstore.NewDeltaSession(ctx, p.uid, &rootStr) 48 + // Create a new session for the next operation (use revision string, not CID) 49 + newSession, err := p.carstore.NewDeltaSession(ctx, p.uid, &rev) 50 50 if err != nil { 51 51 return cid.Undef, fmt.Errorf("failed to create new session: %w", err) 52 52 } ··· 65 65 } 66 66 67 67 // GetCrewMember retrieves a crew member by their record key 68 - func (p *HoldPDS) GetCrewMember(ctx context.Context, rkey string) (cid.Cid, *CrewRecord, error) { 69 - path := fmt.Sprintf("%s/%s", CrewCollection, rkey) 68 + func (p *HoldPDS) GetCrewMember(ctx context.Context, rkey string) (cid.Cid, *atproto.CrewRecord, error) { 69 + path := fmt.Sprintf("%s/%s", atproto.CrewCollection, rkey) 70 70 71 71 // Get the record bytes and decode manually (indigo doesn't know our custom type) 72 72 recordCID, recBytes, err := p.repo.GetRecordBytes(ctx, path) ··· 75 75 } 76 76 77 77 // Decode the CBOR bytes into our CrewRecord type 78 - var crewRecord CrewRecord 78 + var crewRecord atproto.CrewRecord 79 79 if err := crewRecord.UnmarshalCBOR(bytes.NewReader(*recBytes)); err != nil { 80 80 return cid.Undef, nil, fmt.Errorf("failed to decode crew record: %w", err) 81 81 } ··· 87 87 type CrewMemberWithKey struct { 88 88 Rkey string 89 89 Cid cid.Cid 90 - Record *CrewRecord 90 + Record *atproto.CrewRecord 91 91 } 92 92 93 93 // ListCrewMembers returns all crew members with their rkeys 94 94 func (p *HoldPDS) ListCrewMembers(ctx context.Context) ([]*CrewMemberWithKey, error) { 95 95 var crew []*CrewMemberWithKey 96 96 97 - err := p.repo.ForEach(ctx, CrewCollection, func(k string, v cid.Cid) error { 97 + err := p.repo.ForEach(ctx, atproto.CrewCollection, func(k string, v cid.Cid) error { 98 98 // Extract rkey from full path (k is like "io.atcr.hold.crew/3m37dr2ddit22") 99 99 parts := strings.Split(k, "/") 100 100 rkey := parts[len(parts)-1] ··· 127 127 128 128 // RemoveCrewMember removes a crew member 129 129 func (p *HoldPDS) RemoveCrewMember(ctx context.Context, rkey string) error { 130 - path := fmt.Sprintf("%s/%s", CrewCollection, rkey) 130 + path := fmt.Sprintf("%s/%s", atproto.CrewCollection, rkey) 131 131 132 132 err := p.repo.DeleteRecord(ctx, path) 133 133 if err != nil {
+28 -11
pkg/hold/pds/did.go
··· 4 4 "encoding/json" 5 5 "fmt" 6 6 "net/url" 7 - "strings" 8 7 ) 9 8 10 9 // DIDDocument represents a did:web document ··· 35 34 36 35 // GenerateDIDDocument creates a DID document for a did:web identity 37 36 func (p *HoldPDS) GenerateDIDDocument(publicURL string) (*DIDDocument, error) { 38 - // Extract hostname from public URL 39 - hostname := strings.TrimPrefix(publicURL, "http://") 40 - hostname = strings.TrimPrefix(hostname, "https://") 41 - hostname = strings.Split(hostname, "/")[0] // Remove any path 42 - hostname = strings.Split(hostname, ":")[0] // Remove port for DID 37 + // Parse URL to extract host and port 38 + u, err := url.Parse(publicURL) 39 + if err != nil { 40 + return nil, fmt.Errorf("failed to parse public URL: %w", err) 41 + } 42 + 43 + hostname := u.Hostname() 44 + port := u.Port() 45 + 46 + // Build host string (include non-standard ports per did:web spec) 47 + host := hostname 48 + if port != "" && port != "80" && port != "443" { 49 + host = fmt.Sprintf("%s:%s", hostname, port) 50 + } 43 51 44 - did := fmt.Sprintf("did:web:%s", hostname) 52 + did := fmt.Sprintf("did:web:%s", host) 45 53 46 54 // Get public key in multibase format using indigo's crypto 47 55 pubKey, err := p.signingKey.PublicKey() ··· 58 66 }, 59 67 ID: did, 60 68 AlsoKnownAs: []string{ 61 - fmt.Sprintf("at://%s", hostname), 69 + fmt.Sprintf("at://%s", host), 62 70 }, 63 71 VerificationMethod: []VerificationMethod{ 64 72 { ··· 99 107 } 100 108 101 109 // GenerateDIDFromURL creates a did:web identifier from a public URL 102 - // Example: "http://hold1.example.com:8080" -> "did:web:hold1.example.com" 110 + // Example: "http://hold1.example.com:8080" -> "did:web:hold1.example.com:8080" 111 + // Note: Per did:web spec, non-standard ports (not 80/443) are included in the DID 103 112 func GenerateDIDFromURL(publicURL string) string { 104 113 // Parse URL 105 114 u, err := url.Parse(publicURL) 106 115 if err != nil { 107 116 // Fallback: assume it's just a hostname 108 - return fmt.Sprintf("did:web:%s", strings.Split(publicURL, ":")[0]) 117 + return fmt.Sprintf("did:web:%s", publicURL) 109 118 } 110 119 111 - // Use hostname without port for DID 120 + // Get hostname 112 121 hostname := u.Hostname() 113 122 if hostname == "" { 114 123 hostname = "localhost" 124 + } 125 + 126 + // Get port 127 + port := u.Port() 128 + 129 + // Include port in DID if it's non-standard (not 80 for http, not 443 for https) 130 + if port != "" && port != "80" && port != "443" { 131 + return fmt.Sprintf("did:web:%s:%s", hostname, port) 115 132 } 116 133 117 134 return fmt.Sprintf("did:web:%s", hostname)
+88 -16
pkg/hold/pds/server.go
··· 5 5 "fmt" 6 6 "os" 7 7 "path/filepath" 8 + "time" 8 9 10 + "atcr.io/pkg/atproto" 9 11 "github.com/bluesky-social/indigo/atproto/atcrypto" 10 12 "github.com/bluesky-social/indigo/carstore" 11 13 "github.com/bluesky-social/indigo/models" ··· 59 61 var session *carstore.DeltaSession 60 62 var r *repo.Repo 61 63 62 - // Create a session connected to this user's data in carstore 63 - session, err = cs.NewDeltaSession(ctx, uid, nil) 64 - if err != nil { 65 - return nil, fmt.Errorf("failed to create delta session: %w", err) 66 - } 67 - 68 64 if !hasValidRepo { 69 - // No valid repo - create new empty repo 65 + // No valid repo - create new session with nil (new repo) 66 + session, err = cs.NewDeltaSession(ctx, uid, nil) 67 + if err != nil { 68 + return nil, fmt.Errorf("failed to create delta session: %w", err) 69 + } 70 + // Create new empty repo 70 71 r = repo.NewRepo(ctx, did, session) 71 72 } else { 72 - // Repo exists with valid head - load from existing head 73 + // Repo exists with valid head - create session pointing to current head 74 + headStr := head.String() 75 + session, err = cs.NewDeltaSession(ctx, uid, &headStr) 76 + if err != nil { 77 + return nil, fmt.Errorf("failed to create delta session: %w", err) 78 + } 79 + // Load from existing head 73 80 r, err = repo.OpenRepo(ctx, session, head) 74 81 if err != nil { 75 82 return nil, fmt.Errorf("failed to open existing repo: %w", err) ··· 104 111 return nil 105 112 } 106 113 107 - // Check if repo already has commits 108 - head, err := p.carstore.GetUserRepoHead(ctx, p.uid) 109 - if err != nil || !head.Defined() { 110 - // No repo exists yet, bootstrap 111 - fmt.Printf("🚀 Bootstrapping hold PDS with owner: %s\n", ownerDID) 112 - } else { 113 - // Repo exists and is valid 114 - fmt.Printf("⏭️ Skipping PDS bootstrap: repo already initialized (head: %s)\n", head.String()[:16]) 114 + // Check if captain record already exists (idempotent bootstrap) 115 + _, _, err := p.GetCaptainRecord(ctx) 116 + if err == nil { 117 + // Captain record exists, we're good 118 + fmt.Printf("✅ Captain record exists, skipping bootstrap\n") 115 119 return nil 116 120 } 117 121 122 + // No captain record - check if this is a new repo or existing repo 123 + head, err := p.carstore.GetUserRepoHead(ctx, p.uid) 124 + isNewRepo := (err != nil || !head.Defined()) 125 + 126 + if isNewRepo { 127 + fmt.Printf("🚀 Bootstrapping new hold PDS with owner: %s\n", ownerDID) 128 + // For new repo, create records inline to avoid session issues 129 + return p.bootstrapNewRepo(ctx, ownerDID, public, allowAllCrew) 130 + } 131 + 132 + // Existing repo - use normal record creation flow 133 + fmt.Printf("ℹ️ Repo already initialized (head: %s), creating captain record...\n", head.String()[:16]) 134 + 118 135 // Create captain record (hold ownership and settings) 119 136 _, err = p.CreateCaptainRecord(ctx, ownerDID, public, allowAllCrew) 120 137 if err != nil { ··· 130 147 } 131 148 132 149 fmt.Printf("✅ Added %s as hold admin\n", ownerDID) 150 + return nil 151 + } 152 + 153 + // bootstrapNewRepo handles bootstrapping a brand new repo (avoids session juggling issues) 154 + func (p *HoldPDS) bootstrapNewRepo(ctx context.Context, ownerDID string, public bool, allowAllCrew bool) error { 155 + // Create captain and crew records in a single commit 156 + captainRecord := &atproto.CaptainRecord{ 157 + Type: atproto.CaptainCollection, 158 + Owner: ownerDID, 159 + Public: public, 160 + AllowAllCrew: allowAllCrew, 161 + DeployedAt: time.Now().Format(time.RFC3339), 162 + } 163 + 164 + crewRecord := &atproto.CrewRecord{ 165 + Type: atproto.CrewCollection, 166 + Member: ownerDID, 167 + Role: "admin", 168 + Permissions: []string{"blob:read", "blob:write", "crew:admin"}, 169 + AddedAt: time.Now().Format(time.RFC3339), 170 + } 171 + 172 + // Create both records in the repo 173 + _, _, err := p.repo.CreateRecord(ctx, atproto.CaptainCollection, captainRecord) 174 + if err != nil { 175 + return fmt.Errorf("failed to create captain record: %w", err) 176 + } 177 + 178 + _, _, err = p.repo.CreateRecord(ctx, atproto.CrewCollection, crewRecord) 179 + if err != nil { 180 + return fmt.Errorf("failed to create crew record: %w", err) 181 + } 182 + 183 + // Commit everything in one go 184 + signer := func(ctx context.Context, did string, data []byte) ([]byte, error) { 185 + return p.signingKey.HashAndSign(data) 186 + } 187 + 188 + root, rev, err := p.repo.Commit(ctx, signer) 189 + if err != nil { 190 + return fmt.Errorf("failed to commit bootstrap records: %w", err) 191 + } 192 + 193 + // Close the session with the new root 194 + _, err = p.session.CloseWithRoot(ctx, root, rev) 195 + if err != nil { 196 + return fmt.Errorf("failed to persist bootstrap commit: %w", err) 197 + } 198 + 199 + fmt.Printf("✅ Created captain record (public=%v, allowAllCrew=%v)\n", public, allowAllCrew) 200 + fmt.Printf("✅ Added %s as hold admin\n", ownerDID) 201 + 202 + // DON'T create a new session here - let subsequent operations handle that 203 + // The PDS is now bootstrapped and will be reloaded properly on next restart 204 + 133 205 return nil 134 206 } 135 207
-32
pkg/hold/pds/types.go
··· 1 - package pds 2 - 3 - //go:generate go run github.com/whyrusleeping/cbor-gen --map-encoding CrewRecord CaptainRecord 4 - 5 - // ATProto record types for the hold service 6 - 7 - // CaptainRecord represents the hold's ownership and metadata 8 - // Collection: io.atcr.hold.captain (single record per hold) 9 - type CaptainRecord struct { 10 - Type string `json:"$type" cborgen:"$type"` 11 - Owner string `json:"owner" cborgen:"owner"` // DID of hold owner 12 - Public bool `json:"public" cborgen:"public"` // Public read access 13 - AllowAllCrew bool `json:"allowAllCrew" cborgen:"allowAllCrew"` // Allow any authenticated user to register as crew 14 - DeployedAt string `json:"deployedAt" cborgen:"deployedAt"` // RFC3339 timestamp 15 - Region string `json:"region,omitempty" cborgen:"region,omitempty"` // S3 region (optional) 16 - Provider string `json:"provider,omitempty" cborgen:"provider,omitempty"` // Deployment provider (optional) 17 - } 18 - 19 - // CrewRecord represents a crew member in the hold 20 - // Collection: io.atcr.hold.crew (one record per member) 21 - type CrewRecord struct { 22 - Type string `json:"$type" cborgen:"$type"` 23 - Member string `json:"member" cborgen:"member"` 24 - Role string `json:"role" cborgen:"role"` 25 - Permissions []string `json:"permissions" cborgen:"permissions"` 26 - AddedAt string `json:"addedAt" cborgen:"addedAt"` // RFC3339 timestamp 27 - } 28 - 29 - const ( 30 - CaptainCollection = "io.atcr.hold.captain" 31 - CrewCollection = "io.atcr.hold.crew" 32 - )
+6 -5
pkg/hold/pds/xrpc.go
··· 7 7 "net/http" 8 8 "strings" 9 9 10 + "atcr.io/pkg/atproto" 10 11 "github.com/bluesky-social/indigo/repo" 11 12 "github.com/bluesky-social/indigo/util" 12 13 "github.com/ipfs/go-cid" ··· 151 152 "did": h.pds.DID(), 152 153 "handle": h.pds.DID(), 153 154 "didDoc": didDoc, 154 - "collections": []string{CrewCollection}, 155 + "collections": []string{atproto.CrewCollection}, 155 156 "handleIsCorrect": true, 156 157 } 157 158 ··· 181 182 } 182 183 183 184 // Only support crew collection for now 184 - if collection != CrewCollection { 185 + if collection != atproto.CrewCollection { 185 186 http.Error(w, "collection not found", http.StatusNotFound) 186 187 return 187 188 } ··· 223 224 } 224 225 225 226 // Only support crew collection for now 226 - if collection != CrewCollection { 227 + if collection != atproto.CrewCollection { 227 228 http.Error(w, "collection not found", http.StatusNotFound) 228 229 return 229 230 } ··· 273 274 } 274 275 275 276 // Only support crew collection for now 276 - if collection != CrewCollection { 277 + if collection != atproto.CrewCollection { 277 278 http.Error(w, "collection not found", http.StatusNotFound) 278 279 return 279 280 } ··· 551 552 if member.Record.Member == user.DID { 552 553 // Already a crew member, return success with existing record 553 554 response := map[string]any{ 554 - "uri": fmt.Sprintf("at://%s/%s/%s", h.pds.DID(), CrewCollection, member.Rkey), 555 + "uri": fmt.Sprintf("at://%s/%s/%s", h.pds.DID(), atproto.CrewCollection, member.Rkey), 555 556 "cid": member.Cid.String(), 556 557 "status": "already_member", 557 558 "message": "User is already a crew member",
+51 -5
pkg/hold/service.go
··· 7 7 "net/http" 8 8 "net/url" 9 9 10 + "atcr.io/pkg/auth" 10 11 "github.com/aws/aws-sdk-go/service/s3" 11 12 storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" 12 13 "github.com/distribution/distribution/v3/registry/storage/driver/factory" 13 14 ) 15 + 16 + // HoldPDSInterface is the minimal interface needed from the embedded PDS 17 + // This avoids a circular import between pkg/hold and pkg/hold/pds 18 + type HoldPDSInterface interface { 19 + DID() string 20 + } 14 21 15 22 // HoldService provides presigned URLs for blob storage in a hold 16 23 type HoldService struct { 17 24 driver storagedriver.StorageDriver 18 25 config *Config 19 - s3Client *s3.S3 // S3 client for presigned URLs (nil if not S3 storage) 20 - bucket string // S3 bucket name 21 - s3PathPrefix string // S3 path prefix (if any) 22 - MultipartMgr *MultipartManager // Exported for access in route handlers 26 + s3Client *s3.S3 // S3 client for presigned URLs (nil if not S3 storage) 27 + bucket string // S3 bucket name 28 + s3PathPrefix string // S3 path prefix (if any) 29 + MultipartMgr *MultipartManager // Exported for access in route handlers 30 + pds HoldPDSInterface // Embedded PDS for captain/crew records 31 + authorizer auth.HoldAuthorizer // Authorizer for access control 23 32 } 24 33 25 34 // NewHoldService creates a new hold service 26 - func NewHoldService(cfg *Config) (*HoldService, error) { 35 + // holdPDS must be a *pds.HoldPDS but we use any to avoid import cycle 36 + func NewHoldService(cfg *Config, holdPDS any) (*HoldService, error) { 27 37 // Create storage driver from config 28 38 ctx := context.Background() 29 39 driver, err := factory.Create(ctx, cfg.Storage.Type(), cfg.Storage.Parameters()) ··· 31 41 return nil, fmt.Errorf("failed to create storage driver: %w", err) 32 42 } 33 43 44 + // Create local authorizer using the embedded PDS 45 + // This requires casting holdPDS to the concrete type expected by auth 46 + authorizer := auth.NewLocalHoldAuthorizerFromInterface(holdPDS) 47 + 48 + // Cast to our interface for storage 49 + pdsInterface, ok := holdPDS.(HoldPDSInterface) 50 + if !ok { 51 + return nil, fmt.Errorf("holdPDS must implement HoldPDSInterface") 52 + } 53 + 34 54 service := &HoldService{ 35 55 driver: driver, 36 56 config: cfg, 37 57 MultipartMgr: NewMultipartManager(), 58 + pds: pdsInterface, 59 + authorizer: authorizer, 38 60 } 39 61 40 62 // Initialize S3 client for presigned URLs (if using S3 storage) ··· 48 70 // GetPresignedURL is a public wrapper around getPresignedURL for use by PDS blob store 49 71 func (s *HoldService) GetPresignedURL(ctx context.Context, operation PresignedURLOperation, digest string, did string) (string, error) { 50 72 return s.getPresignedURL(ctx, operation, digest, did) 73 + } 74 + 75 + // isAuthorizedRead checks if the given DID has read access to this hold 76 + // This is a helper wrapper around the authorizer for internal use 77 + func (s *HoldService) isAuthorizedRead(did string) bool { 78 + ctx := context.Background() 79 + allowed, err := s.authorizer.CheckReadAccess(ctx, s.pds.DID(), did) 80 + if err != nil { 81 + log.Printf("Authorization check failed: %v", err) 82 + return false 83 + } 84 + return allowed 85 + } 86 + 87 + // isAuthorizedWrite checks if the given DID has write access to this hold 88 + // This is a helper wrapper around the authorizer for internal use 89 + func (s *HoldService) isAuthorizedWrite(did string) bool { 90 + ctx := context.Background() 91 + allowed, err := s.authorizer.CheckWriteAccess(ctx, s.pds.DID(), did) 92 + if err != nil { 93 + log.Printf("Authorization check failed: %v", err) 94 + return false 95 + } 96 + return allowed 51 97 } 52 98 53 99 // HealthHandler handles health check requests