A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at codeberg-source 644 lines 20 kB view raw
1package auth 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log/slog" 10 "net/http" 11 "net/url" 12 "sync" 13 "time" 14 15 "atcr.io/pkg/atproto" 16) 17 18// RemoteHoldAuthorizer queries a hold's PDS via XRPC endpoints 19// Used by AppView to authorize access to remote holds 20// Implements caching for captain records to reduce XRPC calls 21type RemoteHoldAuthorizer struct { 22 db *sql.DB 23 httpClient *http.Client 24 cacheTTL time.Duration // TTL for captain record cache 25 recentDenials sync.Map // In-memory cache for first denials 26 stopCleanup chan struct{} // Signal to stop cleanup goroutine 27 testMode bool // If true, use HTTP for local DIDs 28 firstDenialBackoff time.Duration // Backoff duration for first denial (default: 10s) 29 cleanupInterval time.Duration // Cleanup goroutine interval (default: 10s) 30 cleanupGracePeriod time.Duration // Grace period before cleanup (default: 5s) 31 dbBackoffDurations []time.Duration // Backoff durations for DB denials (default: [1m, 5m, 15m, 1h]) 32} 33 34// denialEntry stores timestamp for in-memory first denials 35type denialEntry struct { 36 timestamp time.Time 37} 38 39// NewRemoteHoldAuthorizer creates a new remote authorizer for AppView with production defaults 40func NewRemoteHoldAuthorizer(db *sql.DB, testMode bool) HoldAuthorizer { 41 return NewRemoteHoldAuthorizerWithBackoffs(db, testMode, 42 10*time.Second, // firstDenialBackoff 43 10*time.Second, // cleanupInterval 44 5*time.Second, // cleanupGracePeriod 45 []time.Duration{ // dbBackoffDurations 46 1 * time.Minute, 47 5 * time.Minute, 48 15 * time.Minute, 49 60 * time.Minute, 50 }, 51 ) 52} 53 54// NewRemoteHoldAuthorizerWithBackoffs creates a new remote authorizer with custom backoff durations 55// Used for testing to avoid long sleeps 56func NewRemoteHoldAuthorizerWithBackoffs(db *sql.DB, testMode bool, firstDenialBackoff, cleanupInterval, cleanupGracePeriod time.Duration, dbBackoffDurations []time.Duration) HoldAuthorizer { 57 a := &RemoteHoldAuthorizer{ 58 db: db, 59 httpClient: &http.Client{ 60 Timeout: 10 * time.Second, 61 }, 62 cacheTTL: 1 * time.Hour, // 1 hour cache TTL 63 stopCleanup: make(chan struct{}), 64 testMode: testMode, 65 firstDenialBackoff: firstDenialBackoff, 66 cleanupInterval: cleanupInterval, 67 cleanupGracePeriod: cleanupGracePeriod, 68 dbBackoffDurations: dbBackoffDurations, 69 } 70 71 // Start cleanup goroutine for in-memory denials 72 go a.cleanupRecentDenials() 73 74 return a 75} 76 77// cleanupRecentDenials runs periodically to remove expired first-denial entries 78func (a *RemoteHoldAuthorizer) cleanupRecentDenials() { 79 ticker := time.NewTicker(a.cleanupInterval) 80 defer ticker.Stop() 81 82 for { 83 select { 84 case <-ticker.C: 85 now := time.Now() 86 a.recentDenials.Range(func(key, value any) bool { 87 entry := value.(denialEntry) 88 // Remove entries older than backoff + grace period 89 if now.Sub(entry.timestamp) > a.firstDenialBackoff+a.cleanupGracePeriod { 90 a.recentDenials.Delete(key) 91 } 92 return true 93 }) 94 case <-a.stopCleanup: 95 return 96 } 97 } 98} 99 100// GetCaptainRecord retrieves a captain record with caching 101// 1. Check database cache 102// 2. If cache miss or expired, query hold's XRPC endpoint 103// 3. Update cache 104func (a *RemoteHoldAuthorizer) GetCaptainRecord(ctx context.Context, holdDID string) (*atproto.CaptainRecord, error) { 105 // Try cache first 106 if a.db != nil { 107 cached, err := a.getCachedCaptainRecord(holdDID) 108 if err == nil && cached != nil { 109 // Cache hit - check if still valid 110 if time.Since(cached.UpdatedAt) < a.cacheTTL { 111 return cached.CaptainRecord, nil 112 } 113 // Cache expired - continue to fetch fresh data 114 } 115 } 116 117 // Cache miss or expired - query XRPC endpoint 118 record, err := a.fetchCaptainRecordFromXRPC(ctx, holdDID) 119 if err != nil { 120 slog.Error("Captain record fetch failed", 121 "holdDID", holdDID, 122 "denial_reason", "captain_record_fetch_failed", 123 "error", err) 124 return nil, fmt.Errorf("failed to get captain record for %s: %w", holdDID, err) 125 } 126 127 // Update cache 128 if a.db != nil { 129 if err := a.setCachedCaptainRecord(holdDID, record); err != nil { 130 // Log error but don't fail - caching is best-effort 131 slog.Warn("Failed to cache captain record", "error", err, "holdDID", holdDID) 132 } 133 } 134 135 return record, nil 136} 137 138// captainRecordWithMeta includes UpdatedAt for cache management 139type captainRecordWithMeta struct { 140 *atproto.CaptainRecord 141 UpdatedAt time.Time 142} 143 144// getCachedCaptainRecord retrieves a captain record from database cache 145func (a *RemoteHoldAuthorizer) getCachedCaptainRecord(holdDID string) (*captainRecordWithMeta, error) { 146 query := ` 147 SELECT owner_did, public, allow_all_crew, deployed_at, region, provider, updated_at 148 FROM hold_captain_records 149 WHERE hold_did = ? 150 ` 151 152 var record atproto.CaptainRecord 153 var deployedAt, region, provider sql.NullString 154 var updatedAt time.Time 155 156 err := a.db.QueryRow(query, holdDID).Scan( 157 &record.Owner, 158 &record.Public, 159 &record.AllowAllCrew, 160 &deployedAt, 161 &region, 162 &provider, 163 &updatedAt, 164 ) 165 166 if err == sql.ErrNoRows { 167 return nil, nil // Cache miss 168 } 169 170 if err != nil { 171 return nil, fmt.Errorf("cache query failed: %w", err) 172 } 173 174 // Handle nullable fields 175 if deployedAt.Valid { 176 record.DeployedAt = deployedAt.String 177 } 178 if region.Valid { 179 record.Region = region.String 180 } 181 if provider.Valid { 182 record.Provider = provider.String 183 } 184 185 return &captainRecordWithMeta{ 186 CaptainRecord: &record, 187 UpdatedAt: updatedAt, 188 }, nil 189} 190 191// setCachedCaptainRecord stores a captain record in database cache 192func (a *RemoteHoldAuthorizer) setCachedCaptainRecord(holdDID string, record *atproto.CaptainRecord) error { 193 query := ` 194 INSERT INTO hold_captain_records ( 195 hold_did, owner_did, public, allow_all_crew, 196 deployed_at, region, provider, updated_at 197 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 198 ON CONFLICT(hold_did) DO UPDATE SET 199 owner_did = excluded.owner_did, 200 public = excluded.public, 201 allow_all_crew = excluded.allow_all_crew, 202 deployed_at = excluded.deployed_at, 203 region = excluded.region, 204 provider = excluded.provider, 205 updated_at = excluded.updated_at 206 ` 207 208 _, err := a.db.Exec(query, 209 holdDID, 210 record.Owner, 211 record.Public, 212 record.AllowAllCrew, 213 nullString(record.DeployedAt), 214 nullString(record.Region), 215 nullString(record.Provider), 216 time.Now(), 217 ) 218 219 return err 220} 221 222// fetchCaptainRecordFromXRPC queries the hold's XRPC endpoint for captain record 223func (a *RemoteHoldAuthorizer) fetchCaptainRecordFromXRPC(ctx context.Context, holdDID string) (*atproto.CaptainRecord, error) { 224 // Resolve DID to URL 225 holdURL := atproto.ResolveHoldURL(holdDID) 226 227 // Build XRPC request URL 228 // GET /xrpc/com.atproto.repo.getRecord?repo={did}&collection=io.atcr.hold.captain&rkey=self 229 xrpcURL := fmt.Sprintf("%s%s?repo=%s&collection=%s&rkey=self", 230 holdURL, atproto.RepoGetRecord, url.QueryEscape(holdDID), url.QueryEscape(atproto.CaptainCollection)) 231 232 req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil) 233 if err != nil { 234 return nil, err 235 } 236 237 resp, err := a.httpClient.Do(req) 238 if err != nil { 239 return nil, fmt.Errorf("XRPC request failed: %w", err) 240 } 241 defer resp.Body.Close() 242 243 if resp.StatusCode != http.StatusOK { 244 body, _ := io.ReadAll(resp.Body) 245 return nil, fmt.Errorf("XRPC request failed: status %d: %s", resp.StatusCode, string(body)) 246 } 247 248 // Parse response 249 var xrpcResp struct { 250 URI string `json:"uri"` 251 CID string `json:"cid"` 252 Value struct { 253 Type string `json:"$type"` 254 Owner string `json:"owner"` 255 Public bool `json:"public"` 256 AllowAllCrew bool `json:"allowAllCrew"` 257 DeployedAt string `json:"deployedAt"` 258 Region string `json:"region,omitempty"` 259 Provider string `json:"provider,omitempty"` 260 } `json:"value"` 261 } 262 263 if err := json.NewDecoder(resp.Body).Decode(&xrpcResp); err != nil { 264 return nil, fmt.Errorf("failed to decode XRPC response: %w", err) 265 } 266 267 // Convert to our type 268 record := &atproto.CaptainRecord{ 269 Type: atproto.CaptainCollection, 270 Owner: xrpcResp.Value.Owner, 271 Public: xrpcResp.Value.Public, 272 AllowAllCrew: xrpcResp.Value.AllowAllCrew, 273 DeployedAt: xrpcResp.Value.DeployedAt, 274 Region: xrpcResp.Value.Region, 275 Provider: xrpcResp.Value.Provider, 276 } 277 278 return record, nil 279} 280 281// IsCrewMember checks if userDID is a crew member with caching 282// 1. Check approval cache (15min TTL) 283// 2. Check denial cache with exponential backoff 284// 3. If cache miss, query XRPC endpoint and update cache 285func (a *RemoteHoldAuthorizer) IsCrewMember(ctx context.Context, holdDID, userDID string) (bool, error) { 286 // Skip caching if no database 287 if a.db == nil { 288 return a.isCrewMemberNoCache(ctx, holdDID, userDID) 289 } 290 291 // Check approval cache first (15min TTL) 292 if approved, err := a.getCachedApproval(holdDID, userDID); err == nil && approved { 293 slog.Debug("Using cached crew approval", "holdDID", holdDID, "userDID", userDID) 294 return true, nil 295 } 296 297 // Check denial cache with backoff 298 if blocked, err := a.isBlockedByDenialBackoff(holdDID, userDID); err == nil && blocked { 299 // Still in backoff period - don't query again 300 // Detailed logging already emitted by isBlockedByDenialBackoff 301 return false, nil 302 } 303 304 // Cache miss or expired - query XRPC endpoint 305 slog.Debug("Crew membership cache miss, querying hold", "holdDID", holdDID, "userDID", userDID) 306 isCrew, err := a.isCrewMemberNoCache(ctx, holdDID, userDID) 307 if err != nil { 308 slog.Warn("Crew membership query error", "error", err, "holdDID", holdDID, "userDID", userDID) 309 return false, err 310 } 311 312 // Update cache based on result 313 if isCrew { 314 // Cache approval for 15 minutes 315 slog.Debug("Crew membership approved, caching for 15min", "holdDID", holdDID, "userDID", userDID) 316 _ = a.cacheApproval(holdDID, userDID, 15*time.Minute) 317 } else { 318 // Cache denial with exponential backoff 319 slog.Debug("Crew membership denied, caching with backoff", "holdDID", holdDID, "userDID", userDID) 320 _ = a.cacheDenial(holdDID, userDID) 321 } 322 323 return isCrew, nil 324} 325 326// isCrewMemberNoCache queries XRPC without caching (internal helper) 327func (a *RemoteHoldAuthorizer) isCrewMemberNoCache(ctx context.Context, holdDID, userDID string) (bool, error) { 328 // Resolve DID to URL 329 holdURL := atproto.ResolveHoldURL(holdDID) 330 331 // Build XRPC request URL 332 // GET /xrpc/com.atproto.repo.listRecords?repo={did}&collection=io.atcr.hold.crew 333 xrpcURL := fmt.Sprintf("%s%s?repo=%s&collection=%s", 334 holdURL, atproto.RepoListRecords, url.QueryEscape(holdDID), url.QueryEscape(atproto.CrewCollection)) 335 336 req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil) 337 if err != nil { 338 return false, err 339 } 340 341 resp, err := a.httpClient.Do(req) 342 if err != nil { 343 return false, fmt.Errorf("XRPC request failed: %w", err) 344 } 345 defer resp.Body.Close() 346 347 if resp.StatusCode != http.StatusOK { 348 body, _ := io.ReadAll(resp.Body) 349 return false, fmt.Errorf("XRPC request failed: status %d: %s", resp.StatusCode, string(body)) 350 } 351 352 // Parse response 353 var xrpcResp struct { 354 Records []struct { 355 URI string `json:"uri"` 356 CID string `json:"cid"` 357 Value struct { 358 Type string `json:"$type"` 359 Member string `json:"member"` 360 Role string `json:"role"` 361 Permissions []string `json:"permissions"` 362 AddedAt string `json:"addedAt"` 363 } `json:"value"` 364 } `json:"records"` 365 } 366 367 if err := json.NewDecoder(resp.Body).Decode(&xrpcResp); err != nil { 368 return false, fmt.Errorf("failed to decode XRPC response: %w", err) 369 } 370 371 // Check if userDID is in the crew list 372 for _, record := range xrpcResp.Records { 373 if record.Value.Member == userDID { 374 // TODO: Check expiration if set 375 return true, nil 376 } 377 } 378 379 return false, nil 380} 381 382// CheckReadAccess implements read authorization using shared logic 383func (a *RemoteHoldAuthorizer) CheckReadAccess(ctx context.Context, holdDID, userDID string) (bool, error) { 384 captain, err := a.GetCaptainRecord(ctx, holdDID) 385 if err != nil { 386 return false, err 387 } 388 389 return CheckReadAccessWithCaptain(captain, userDID), nil 390} 391 392// CheckWriteAccess implements write authorization using shared logic 393func (a *RemoteHoldAuthorizer) CheckWriteAccess(ctx context.Context, holdDID, userDID string) (bool, error) { 394 captain, err := a.GetCaptainRecord(ctx, holdDID) 395 if err != nil { 396 return false, err 397 } 398 399 isCrew, err := a.IsCrewMember(ctx, holdDID, userDID) 400 if err != nil { 401 return false, err 402 } 403 404 return CheckWriteAccessWithCaptain(captain, userDID, isCrew), nil 405} 406 407// nullString converts a string to sql.NullString 408func nullString(s string) sql.NullString { 409 if s == "" { 410 return sql.NullString{Valid: false} 411 } 412 return sql.NullString{String: s, Valid: true} 413} 414 415// getCachedApproval checks if user has a cached crew approval 416func (a *RemoteHoldAuthorizer) getCachedApproval(holdDID, userDID string) (bool, error) { 417 query := ` 418 SELECT expires_at 419 FROM hold_crew_approvals 420 WHERE hold_did = ? AND user_did = ? 421 ` 422 423 var expiresAt time.Time 424 err := a.db.QueryRow(query, holdDID, userDID).Scan(&expiresAt) 425 426 if err == sql.ErrNoRows { 427 return false, nil // Cache miss 428 } 429 430 if err != nil { 431 return false, err 432 } 433 434 // Check if approval has expired 435 if time.Now().After(expiresAt) { 436 // Expired - clean up 437 _ = a.deleteCachedApproval(holdDID, userDID) 438 return false, nil 439 } 440 441 return true, nil 442} 443 444// cacheApproval stores a crew approval with TTL 445func (a *RemoteHoldAuthorizer) cacheApproval(holdDID, userDID string, ttl time.Duration) error { 446 query := ` 447 INSERT INTO hold_crew_approvals (hold_did, user_did, approved_at, expires_at) 448 VALUES (?, ?, ?, ?) 449 ON CONFLICT(hold_did, user_did) DO UPDATE SET 450 approved_at = excluded.approved_at, 451 expires_at = excluded.expires_at 452 ` 453 454 now := time.Now() 455 expiresAt := now.Add(ttl) 456 457 _, err := a.db.Exec(query, holdDID, userDID, now, expiresAt) 458 return err 459} 460 461// deleteCachedApproval removes an expired approval 462func (a *RemoteHoldAuthorizer) deleteCachedApproval(holdDID, userDID string) error { 463 query := `DELETE FROM hold_crew_approvals WHERE hold_did = ? AND user_did = ?` 464 _, err := a.db.Exec(query, holdDID, userDID) 465 return err 466} 467 468// isBlockedByDenialBackoff checks if user is in denial backoff period 469// Checks in-memory cache first (for 10s first denials), then DB (for longer backoffs) 470func (a *RemoteHoldAuthorizer) isBlockedByDenialBackoff(holdDID, userDID string) (bool, error) { 471 // Check in-memory cache first (first denials with configurable backoff) 472 key := fmt.Sprintf("%s:%s", holdDID, userDID) 473 if val, ok := a.recentDenials.Load(key); ok { 474 entry := val.(denialEntry) 475 // Check if still within first denial backoff period 476 if time.Since(entry.timestamp) < a.firstDenialBackoff { 477 slog.Debug("Write access blocked by in-memory denial cache", 478 "holdDID", holdDID, 479 "userDID", userDID, 480 "denial_reason", "first_denial_backoff", 481 "backoff_type", "in_memory", 482 "backoff_duration", a.firstDenialBackoff, 483 "denied_at", entry.timestamp, 484 "retry_after", entry.timestamp.Add(a.firstDenialBackoff)) 485 return true, nil // Still blocked by in-memory first denial 486 } 487 } 488 489 // Check database for longer backoffs (second+ denials) 490 query := ` 491 SELECT next_retry_at 492 FROM hold_crew_denials 493 WHERE hold_did = ? AND user_did = ? 494 ` 495 496 var nextRetryAt time.Time 497 err := a.db.QueryRow(query, holdDID, userDID).Scan(&nextRetryAt) 498 499 if err == sql.ErrNoRows { 500 return false, nil // No denial record 501 } 502 503 if err != nil { 504 return false, err 505 } 506 507 // Check if still in backoff period 508 if time.Now().Before(nextRetryAt) { 509 slog.Debug("Write access blocked by database denial cache", 510 "holdDID", holdDID, 511 "userDID", userDID, 512 "denial_reason", "exponential_backoff", 513 "backoff_type", "database", 514 "next_retry_at", nextRetryAt, 515 "retry_in", time.Until(nextRetryAt).Round(time.Second)) 516 return true, nil // Still blocked 517 } 518 519 // Backoff period expired - can retry 520 return false, nil 521} 522 523// cacheDenial stores or updates a denial with exponential backoff 524// First denial: in-memory only (configurable backoff, default 10s) 525// Second+ denial: database with exponential backoff (configurable, default 1m/5m/15m/1h) 526func (a *RemoteHoldAuthorizer) cacheDenial(holdDID, userDID string) error { 527 key := fmt.Sprintf("%s:%s", holdDID, userDID) 528 529 // Check if this is a first denial (not in memory, not in DB) 530 _, inMemory := a.recentDenials.Load(key) 531 532 var denialCount int 533 query := `SELECT denial_count FROM hold_crew_denials WHERE hold_did = ? AND user_did = ?` 534 err := a.db.QueryRow(query, holdDID, userDID).Scan(&denialCount) 535 536 inDB := err != sql.ErrNoRows 537 if err != nil && err != sql.ErrNoRows { 538 return err 539 } 540 541 // If not in memory and not in DB, this is the first denial 542 if !inMemory && !inDB { 543 // First denial: store only in memory with configurable backoff 544 now := time.Now() 545 a.recentDenials.Store(key, denialEntry{timestamp: now}) 546 slog.Info("Cached first crew denial (in-memory)", 547 "holdDID", holdDID, 548 "userDID", userDID, 549 "denial_count", 1, 550 "backoff_type", "in_memory", 551 "backoff_duration", a.firstDenialBackoff, 552 "retry_after", now.Add(a.firstDenialBackoff)) 553 return nil 554 } 555 556 // Second+ denial: persist to database with exponential backoff 557 denialCount++ 558 backoff := a.getBackoffDuration(denialCount) 559 now := time.Now() 560 nextRetry := now.Add(backoff) 561 562 // Upsert denial record 563 upsertQuery := ` 564 INSERT INTO hold_crew_denials (hold_did, user_did, denial_count, next_retry_at, last_denied_at) 565 VALUES (?, ?, ?, ?, ?) 566 ON CONFLICT(hold_did, user_did) DO UPDATE SET 567 denial_count = excluded.denial_count, 568 next_retry_at = excluded.next_retry_at, 569 last_denied_at = excluded.last_denied_at 570 ` 571 572 _, err = a.db.Exec(upsertQuery, holdDID, userDID, denialCount, nextRetry, now) 573 if err != nil { 574 return err 575 } 576 577 // Remove from in-memory cache since we're now tracking in DB 578 a.recentDenials.Delete(key) 579 580 slog.Info("Cached crew denial with exponential backoff", 581 "holdDID", holdDID, 582 "userDID", userDID, 583 "denial_count", denialCount, 584 "backoff_type", "database", 585 "backoff_duration", backoff, 586 "next_retry_at", nextRetry) 587 588 return nil 589} 590 591// getBackoffDuration returns the backoff duration based on denial count 592// Note: First denial is in-memory only and not tracked by this function 593// This function handles second+ denials using configurable durations 594func (a *RemoteHoldAuthorizer) getBackoffDuration(denialCount int) time.Duration { 595 backoffs := a.dbBackoffDurations 596 597 idx := denialCount - 1 598 if idx >= len(backoffs) { 599 idx = len(backoffs) - 1 600 } 601 602 return backoffs[idx] 603} 604 605// ClearCrewDenial removes crew denial from both in-memory and database caches 606// This allows immediate access after a user becomes a crew member 607func (a *RemoteHoldAuthorizer) ClearCrewDenial(ctx context.Context, holdDID, userDID string) error { 608 // Clear in-memory cache 609 key := fmt.Sprintf("%s:%s", holdDID, userDID) 610 a.recentDenials.Delete(key) 611 612 // Clear database cache 613 if a.db != nil { 614 query := `DELETE FROM hold_crew_denials WHERE hold_did = ? AND user_did = ?` 615 _, err := a.db.ExecContext(ctx, query, holdDID, userDID) 616 if err != nil { 617 return fmt.Errorf("failed to clear denial cache: %w", err) 618 } 619 } 620 621 slog.Debug("Cleared crew denial cache", "holdDID", holdDID, "userDID", userDID) 622 return nil 623} 624 625// ClearAllDenials removes all crew denials from both in-memory and database caches 626// Called on startup to ensure a clean slate 627func (a *RemoteHoldAuthorizer) ClearAllDenials() error { 628 // Clear all in-memory denials 629 a.recentDenials.Range(func(key, value any) bool { 630 a.recentDenials.Delete(key) 631 return true 632 }) 633 634 // Clear all database denials 635 if a.db != nil { 636 _, err := a.db.Exec("DELETE FROM hold_crew_denials") 637 if err != nil { 638 return fmt.Errorf("failed to clear all denial caches: %w", err) 639 } 640 } 641 642 slog.Info("Cleared all crew denial caches on startup") 643 return nil 644}