A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
1package auth
2
3import (
4 "context"
5 "database/sql"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log/slog"
10 "net/http"
11 "net/url"
12 "sync"
13 "time"
14
15 "atcr.io/pkg/atproto"
16)
17
// RemoteHoldAuthorizer answers authorization questions about remote holds by
// querying each hold's PDS through its XRPC endpoints.
// AppView uses it to authorize access to remote holds.
// Captain records and crew decisions are cached to cut down on XRPC calls.
type RemoteHoldAuthorizer struct {
	// db is the database behind the persistent caches.
	db *sql.DB
	// httpClient issues the XRPC requests.
	httpClient *http.Client
	// cacheTTL is the lifetime of a cached captain record.
	cacheTTL time.Duration
	// recentDenials is the in-memory cache holding first denials.
	recentDenials sync.Map
	// stopCleanup signals the cleanup goroutine to stop.
	stopCleanup chan struct{}
	// testMode, if true, means HTTP is used for local DIDs.
	testMode bool
	// firstDenialBackoff is the backoff applied after a first denial (default: 10s).
	firstDenialBackoff time.Duration
	// cleanupInterval is how often the cleanup goroutine runs (default: 10s).
	cleanupInterval time.Duration
	// cleanupGracePeriod is the extra time an entry is kept before cleanup (default: 5s).
	cleanupGracePeriod time.Duration
	// dbBackoffDurations lists the backoffs for DB-tracked denials (default: [1m, 5m, 15m, 1h]).
	dbBackoffDurations []time.Duration
}
33
// denialEntry is the value kept in the in-memory first-denial cache.
// It records nothing but the moment the denial happened.
type denialEntry struct {
	timestamp time.Time
}
38
39// NewRemoteHoldAuthorizer creates a new remote authorizer for AppView with production defaults
40func NewRemoteHoldAuthorizer(db *sql.DB, testMode bool) HoldAuthorizer {
41 return NewRemoteHoldAuthorizerWithBackoffs(db, testMode,
42 10*time.Second, // firstDenialBackoff
43 10*time.Second, // cleanupInterval
44 5*time.Second, // cleanupGracePeriod
45 []time.Duration{ // dbBackoffDurations
46 1 * time.Minute,
47 5 * time.Minute,
48 15 * time.Minute,
49 60 * time.Minute,
50 },
51 )
52}
53
54// NewRemoteHoldAuthorizerWithBackoffs creates a new remote authorizer with custom backoff durations
55// Used for testing to avoid long sleeps
56func NewRemoteHoldAuthorizerWithBackoffs(db *sql.DB, testMode bool, firstDenialBackoff, cleanupInterval, cleanupGracePeriod time.Duration, dbBackoffDurations []time.Duration) HoldAuthorizer {
57 a := &RemoteHoldAuthorizer{
58 db: db,
59 httpClient: &http.Client{
60 Timeout: 10 * time.Second,
61 },
62 cacheTTL: 1 * time.Hour, // 1 hour cache TTL
63 stopCleanup: make(chan struct{}),
64 testMode: testMode,
65 firstDenialBackoff: firstDenialBackoff,
66 cleanupInterval: cleanupInterval,
67 cleanupGracePeriod: cleanupGracePeriod,
68 dbBackoffDurations: dbBackoffDurations,
69 }
70
71 // Start cleanup goroutine for in-memory denials
72 go a.cleanupRecentDenials()
73
74 return a
75}
76
77// cleanupRecentDenials runs periodically to remove expired first-denial entries
78func (a *RemoteHoldAuthorizer) cleanupRecentDenials() {
79 ticker := time.NewTicker(a.cleanupInterval)
80 defer ticker.Stop()
81
82 for {
83 select {
84 case <-ticker.C:
85 now := time.Now()
86 a.recentDenials.Range(func(key, value any) bool {
87 entry := value.(denialEntry)
88 // Remove entries older than backoff + grace period
89 if now.Sub(entry.timestamp) > a.firstDenialBackoff+a.cleanupGracePeriod {
90 a.recentDenials.Delete(key)
91 }
92 return true
93 })
94 case <-a.stopCleanup:
95 return
96 }
97 }
98}
99
100// GetCaptainRecord retrieves a captain record with caching
101// 1. Check database cache
102// 2. If cache miss or expired, query hold's XRPC endpoint
103// 3. Update cache
104func (a *RemoteHoldAuthorizer) GetCaptainRecord(ctx context.Context, holdDID string) (*atproto.CaptainRecord, error) {
105 // Try cache first
106 if a.db != nil {
107 cached, err := a.getCachedCaptainRecord(holdDID)
108 if err == nil && cached != nil {
109 // Cache hit - check if still valid
110 if time.Since(cached.UpdatedAt) < a.cacheTTL {
111 return cached.CaptainRecord, nil
112 }
113 // Cache expired - continue to fetch fresh data
114 }
115 }
116
117 // Cache miss or expired - query XRPC endpoint
118 record, err := a.fetchCaptainRecordFromXRPC(ctx, holdDID)
119 if err != nil {
120 slog.Error("Captain record fetch failed",
121 "holdDID", holdDID,
122 "denial_reason", "captain_record_fetch_failed",
123 "error", err)
124 return nil, fmt.Errorf("failed to get captain record for %s: %w", holdDID, err)
125 }
126
127 // Update cache
128 if a.db != nil {
129 if err := a.setCachedCaptainRecord(holdDID, record); err != nil {
130 // Log error but don't fail - caching is best-effort
131 slog.Warn("Failed to cache captain record", "error", err, "holdDID", holdDID)
132 }
133 }
134
135 return record, nil
136}
137
138// captainRecordWithMeta includes UpdatedAt for cache management
139type captainRecordWithMeta struct {
140 *atproto.CaptainRecord
141 UpdatedAt time.Time
142}
143
144// getCachedCaptainRecord retrieves a captain record from database cache
145func (a *RemoteHoldAuthorizer) getCachedCaptainRecord(holdDID string) (*captainRecordWithMeta, error) {
146 query := `
147 SELECT owner_did, public, allow_all_crew, deployed_at, region, provider, updated_at
148 FROM hold_captain_records
149 WHERE hold_did = ?
150 `
151
152 var record atproto.CaptainRecord
153 var deployedAt, region, provider sql.NullString
154 var updatedAt time.Time
155
156 err := a.db.QueryRow(query, holdDID).Scan(
157 &record.Owner,
158 &record.Public,
159 &record.AllowAllCrew,
160 &deployedAt,
161 ®ion,
162 &provider,
163 &updatedAt,
164 )
165
166 if err == sql.ErrNoRows {
167 return nil, nil // Cache miss
168 }
169
170 if err != nil {
171 return nil, fmt.Errorf("cache query failed: %w", err)
172 }
173
174 // Handle nullable fields
175 if deployedAt.Valid {
176 record.DeployedAt = deployedAt.String
177 }
178 if region.Valid {
179 record.Region = region.String
180 }
181 if provider.Valid {
182 record.Provider = provider.String
183 }
184
185 return &captainRecordWithMeta{
186 CaptainRecord: &record,
187 UpdatedAt: updatedAt,
188 }, nil
189}
190
191// setCachedCaptainRecord stores a captain record in database cache
192func (a *RemoteHoldAuthorizer) setCachedCaptainRecord(holdDID string, record *atproto.CaptainRecord) error {
193 query := `
194 INSERT INTO hold_captain_records (
195 hold_did, owner_did, public, allow_all_crew,
196 deployed_at, region, provider, updated_at
197 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
198 ON CONFLICT(hold_did) DO UPDATE SET
199 owner_did = excluded.owner_did,
200 public = excluded.public,
201 allow_all_crew = excluded.allow_all_crew,
202 deployed_at = excluded.deployed_at,
203 region = excluded.region,
204 provider = excluded.provider,
205 updated_at = excluded.updated_at
206 `
207
208 _, err := a.db.Exec(query,
209 holdDID,
210 record.Owner,
211 record.Public,
212 record.AllowAllCrew,
213 nullString(record.DeployedAt),
214 nullString(record.Region),
215 nullString(record.Provider),
216 time.Now(),
217 )
218
219 return err
220}
221
222// fetchCaptainRecordFromXRPC queries the hold's XRPC endpoint for captain record
223func (a *RemoteHoldAuthorizer) fetchCaptainRecordFromXRPC(ctx context.Context, holdDID string) (*atproto.CaptainRecord, error) {
224 // Resolve DID to URL
225 holdURL := atproto.ResolveHoldURL(holdDID)
226
227 // Build XRPC request URL
228 // GET /xrpc/com.atproto.repo.getRecord?repo={did}&collection=io.atcr.hold.captain&rkey=self
229 xrpcURL := fmt.Sprintf("%s%s?repo=%s&collection=%s&rkey=self",
230 holdURL, atproto.RepoGetRecord, url.QueryEscape(holdDID), url.QueryEscape(atproto.CaptainCollection))
231
232 req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil)
233 if err != nil {
234 return nil, err
235 }
236
237 resp, err := a.httpClient.Do(req)
238 if err != nil {
239 return nil, fmt.Errorf("XRPC request failed: %w", err)
240 }
241 defer resp.Body.Close()
242
243 if resp.StatusCode != http.StatusOK {
244 body, _ := io.ReadAll(resp.Body)
245 return nil, fmt.Errorf("XRPC request failed: status %d: %s", resp.StatusCode, string(body))
246 }
247
248 // Parse response
249 var xrpcResp struct {
250 URI string `json:"uri"`
251 CID string `json:"cid"`
252 Value struct {
253 Type string `json:"$type"`
254 Owner string `json:"owner"`
255 Public bool `json:"public"`
256 AllowAllCrew bool `json:"allowAllCrew"`
257 DeployedAt string `json:"deployedAt"`
258 Region string `json:"region,omitempty"`
259 Provider string `json:"provider,omitempty"`
260 } `json:"value"`
261 }
262
263 if err := json.NewDecoder(resp.Body).Decode(&xrpcResp); err != nil {
264 return nil, fmt.Errorf("failed to decode XRPC response: %w", err)
265 }
266
267 // Convert to our type
268 record := &atproto.CaptainRecord{
269 Type: atproto.CaptainCollection,
270 Owner: xrpcResp.Value.Owner,
271 Public: xrpcResp.Value.Public,
272 AllowAllCrew: xrpcResp.Value.AllowAllCrew,
273 DeployedAt: xrpcResp.Value.DeployedAt,
274 Region: xrpcResp.Value.Region,
275 Provider: xrpcResp.Value.Provider,
276 }
277
278 return record, nil
279}
280
281// IsCrewMember checks if userDID is a crew member with caching
282// 1. Check approval cache (15min TTL)
283// 2. Check denial cache with exponential backoff
284// 3. If cache miss, query XRPC endpoint and update cache
285func (a *RemoteHoldAuthorizer) IsCrewMember(ctx context.Context, holdDID, userDID string) (bool, error) {
286 // Skip caching if no database
287 if a.db == nil {
288 return a.isCrewMemberNoCache(ctx, holdDID, userDID)
289 }
290
291 // Check approval cache first (15min TTL)
292 if approved, err := a.getCachedApproval(holdDID, userDID); err == nil && approved {
293 slog.Debug("Using cached crew approval", "holdDID", holdDID, "userDID", userDID)
294 return true, nil
295 }
296
297 // Check denial cache with backoff
298 if blocked, err := a.isBlockedByDenialBackoff(holdDID, userDID); err == nil && blocked {
299 // Still in backoff period - don't query again
300 // Detailed logging already emitted by isBlockedByDenialBackoff
301 return false, nil
302 }
303
304 // Cache miss or expired - query XRPC endpoint
305 slog.Debug("Crew membership cache miss, querying hold", "holdDID", holdDID, "userDID", userDID)
306 isCrew, err := a.isCrewMemberNoCache(ctx, holdDID, userDID)
307 if err != nil {
308 slog.Warn("Crew membership query error", "error", err, "holdDID", holdDID, "userDID", userDID)
309 return false, err
310 }
311
312 // Update cache based on result
313 if isCrew {
314 // Cache approval for 15 minutes
315 slog.Debug("Crew membership approved, caching for 15min", "holdDID", holdDID, "userDID", userDID)
316 _ = a.cacheApproval(holdDID, userDID, 15*time.Minute)
317 } else {
318 // Cache denial with exponential backoff
319 slog.Debug("Crew membership denied, caching with backoff", "holdDID", holdDID, "userDID", userDID)
320 _ = a.cacheDenial(holdDID, userDID)
321 }
322
323 return isCrew, nil
324}
325
326// isCrewMemberNoCache queries XRPC without caching (internal helper)
327func (a *RemoteHoldAuthorizer) isCrewMemberNoCache(ctx context.Context, holdDID, userDID string) (bool, error) {
328 // Resolve DID to URL
329 holdURL := atproto.ResolveHoldURL(holdDID)
330
331 // Build XRPC request URL
332 // GET /xrpc/com.atproto.repo.listRecords?repo={did}&collection=io.atcr.hold.crew
333 xrpcURL := fmt.Sprintf("%s%s?repo=%s&collection=%s",
334 holdURL, atproto.RepoListRecords, url.QueryEscape(holdDID), url.QueryEscape(atproto.CrewCollection))
335
336 req, err := http.NewRequestWithContext(ctx, "GET", xrpcURL, nil)
337 if err != nil {
338 return false, err
339 }
340
341 resp, err := a.httpClient.Do(req)
342 if err != nil {
343 return false, fmt.Errorf("XRPC request failed: %w", err)
344 }
345 defer resp.Body.Close()
346
347 if resp.StatusCode != http.StatusOK {
348 body, _ := io.ReadAll(resp.Body)
349 return false, fmt.Errorf("XRPC request failed: status %d: %s", resp.StatusCode, string(body))
350 }
351
352 // Parse response
353 var xrpcResp struct {
354 Records []struct {
355 URI string `json:"uri"`
356 CID string `json:"cid"`
357 Value struct {
358 Type string `json:"$type"`
359 Member string `json:"member"`
360 Role string `json:"role"`
361 Permissions []string `json:"permissions"`
362 AddedAt string `json:"addedAt"`
363 } `json:"value"`
364 } `json:"records"`
365 }
366
367 if err := json.NewDecoder(resp.Body).Decode(&xrpcResp); err != nil {
368 return false, fmt.Errorf("failed to decode XRPC response: %w", err)
369 }
370
371 // Check if userDID is in the crew list
372 for _, record := range xrpcResp.Records {
373 if record.Value.Member == userDID {
374 // TODO: Check expiration if set
375 return true, nil
376 }
377 }
378
379 return false, nil
380}
381
382// CheckReadAccess implements read authorization using shared logic
383func (a *RemoteHoldAuthorizer) CheckReadAccess(ctx context.Context, holdDID, userDID string) (bool, error) {
384 captain, err := a.GetCaptainRecord(ctx, holdDID)
385 if err != nil {
386 return false, err
387 }
388
389 return CheckReadAccessWithCaptain(captain, userDID), nil
390}
391
392// CheckWriteAccess implements write authorization using shared logic
393func (a *RemoteHoldAuthorizer) CheckWriteAccess(ctx context.Context, holdDID, userDID string) (bool, error) {
394 captain, err := a.GetCaptainRecord(ctx, holdDID)
395 if err != nil {
396 return false, err
397 }
398
399 isCrew, err := a.IsCrewMember(ctx, holdDID, userDID)
400 if err != nil {
401 return false, err
402 }
403
404 return CheckWriteAccessWithCaptain(captain, userDID, isCrew), nil
405}
406
// nullString wraps s in a sql.NullString. The empty string becomes an invalid
// (NULL) value; any other string becomes a valid value carrying s.
func nullString(s string) sql.NullString {
	return sql.NullString{String: s, Valid: s != ""}
}
414
415// getCachedApproval checks if user has a cached crew approval
416func (a *RemoteHoldAuthorizer) getCachedApproval(holdDID, userDID string) (bool, error) {
417 query := `
418 SELECT expires_at
419 FROM hold_crew_approvals
420 WHERE hold_did = ? AND user_did = ?
421 `
422
423 var expiresAt time.Time
424 err := a.db.QueryRow(query, holdDID, userDID).Scan(&expiresAt)
425
426 if err == sql.ErrNoRows {
427 return false, nil // Cache miss
428 }
429
430 if err != nil {
431 return false, err
432 }
433
434 // Check if approval has expired
435 if time.Now().After(expiresAt) {
436 // Expired - clean up
437 _ = a.deleteCachedApproval(holdDID, userDID)
438 return false, nil
439 }
440
441 return true, nil
442}
443
444// cacheApproval stores a crew approval with TTL
445func (a *RemoteHoldAuthorizer) cacheApproval(holdDID, userDID string, ttl time.Duration) error {
446 query := `
447 INSERT INTO hold_crew_approvals (hold_did, user_did, approved_at, expires_at)
448 VALUES (?, ?, ?, ?)
449 ON CONFLICT(hold_did, user_did) DO UPDATE SET
450 approved_at = excluded.approved_at,
451 expires_at = excluded.expires_at
452 `
453
454 now := time.Now()
455 expiresAt := now.Add(ttl)
456
457 _, err := a.db.Exec(query, holdDID, userDID, now, expiresAt)
458 return err
459}
460
461// deleteCachedApproval removes an expired approval
462func (a *RemoteHoldAuthorizer) deleteCachedApproval(holdDID, userDID string) error {
463 query := `DELETE FROM hold_crew_approvals WHERE hold_did = ? AND user_did = ?`
464 _, err := a.db.Exec(query, holdDID, userDID)
465 return err
466}
467
468// isBlockedByDenialBackoff checks if user is in denial backoff period
469// Checks in-memory cache first (for 10s first denials), then DB (for longer backoffs)
470func (a *RemoteHoldAuthorizer) isBlockedByDenialBackoff(holdDID, userDID string) (bool, error) {
471 // Check in-memory cache first (first denials with configurable backoff)
472 key := fmt.Sprintf("%s:%s", holdDID, userDID)
473 if val, ok := a.recentDenials.Load(key); ok {
474 entry := val.(denialEntry)
475 // Check if still within first denial backoff period
476 if time.Since(entry.timestamp) < a.firstDenialBackoff {
477 slog.Debug("Write access blocked by in-memory denial cache",
478 "holdDID", holdDID,
479 "userDID", userDID,
480 "denial_reason", "first_denial_backoff",
481 "backoff_type", "in_memory",
482 "backoff_duration", a.firstDenialBackoff,
483 "denied_at", entry.timestamp,
484 "retry_after", entry.timestamp.Add(a.firstDenialBackoff))
485 return true, nil // Still blocked by in-memory first denial
486 }
487 }
488
489 // Check database for longer backoffs (second+ denials)
490 query := `
491 SELECT next_retry_at
492 FROM hold_crew_denials
493 WHERE hold_did = ? AND user_did = ?
494 `
495
496 var nextRetryAt time.Time
497 err := a.db.QueryRow(query, holdDID, userDID).Scan(&nextRetryAt)
498
499 if err == sql.ErrNoRows {
500 return false, nil // No denial record
501 }
502
503 if err != nil {
504 return false, err
505 }
506
507 // Check if still in backoff period
508 if time.Now().Before(nextRetryAt) {
509 slog.Debug("Write access blocked by database denial cache",
510 "holdDID", holdDID,
511 "userDID", userDID,
512 "denial_reason", "exponential_backoff",
513 "backoff_type", "database",
514 "next_retry_at", nextRetryAt,
515 "retry_in", time.Until(nextRetryAt).Round(time.Second))
516 return true, nil // Still blocked
517 }
518
519 // Backoff period expired - can retry
520 return false, nil
521}
522
523// cacheDenial stores or updates a denial with exponential backoff
524// First denial: in-memory only (configurable backoff, default 10s)
525// Second+ denial: database with exponential backoff (configurable, default 1m/5m/15m/1h)
526func (a *RemoteHoldAuthorizer) cacheDenial(holdDID, userDID string) error {
527 key := fmt.Sprintf("%s:%s", holdDID, userDID)
528
529 // Check if this is a first denial (not in memory, not in DB)
530 _, inMemory := a.recentDenials.Load(key)
531
532 var denialCount int
533 query := `SELECT denial_count FROM hold_crew_denials WHERE hold_did = ? AND user_did = ?`
534 err := a.db.QueryRow(query, holdDID, userDID).Scan(&denialCount)
535
536 inDB := err != sql.ErrNoRows
537 if err != nil && err != sql.ErrNoRows {
538 return err
539 }
540
541 // If not in memory and not in DB, this is the first denial
542 if !inMemory && !inDB {
543 // First denial: store only in memory with configurable backoff
544 now := time.Now()
545 a.recentDenials.Store(key, denialEntry{timestamp: now})
546 slog.Info("Cached first crew denial (in-memory)",
547 "holdDID", holdDID,
548 "userDID", userDID,
549 "denial_count", 1,
550 "backoff_type", "in_memory",
551 "backoff_duration", a.firstDenialBackoff,
552 "retry_after", now.Add(a.firstDenialBackoff))
553 return nil
554 }
555
556 // Second+ denial: persist to database with exponential backoff
557 denialCount++
558 backoff := a.getBackoffDuration(denialCount)
559 now := time.Now()
560 nextRetry := now.Add(backoff)
561
562 // Upsert denial record
563 upsertQuery := `
564 INSERT INTO hold_crew_denials (hold_did, user_did, denial_count, next_retry_at, last_denied_at)
565 VALUES (?, ?, ?, ?, ?)
566 ON CONFLICT(hold_did, user_did) DO UPDATE SET
567 denial_count = excluded.denial_count,
568 next_retry_at = excluded.next_retry_at,
569 last_denied_at = excluded.last_denied_at
570 `
571
572 _, err = a.db.Exec(upsertQuery, holdDID, userDID, denialCount, nextRetry, now)
573 if err != nil {
574 return err
575 }
576
577 // Remove from in-memory cache since we're now tracking in DB
578 a.recentDenials.Delete(key)
579
580 slog.Info("Cached crew denial with exponential backoff",
581 "holdDID", holdDID,
582 "userDID", userDID,
583 "denial_count", denialCount,
584 "backoff_type", "database",
585 "backoff_duration", backoff,
586 "next_retry_at", nextRetry)
587
588 return nil
589}
590
591// getBackoffDuration returns the backoff duration based on denial count
592// Note: First denial is in-memory only and not tracked by this function
593// This function handles second+ denials using configurable durations
594func (a *RemoteHoldAuthorizer) getBackoffDuration(denialCount int) time.Duration {
595 backoffs := a.dbBackoffDurations
596
597 idx := denialCount - 1
598 if idx >= len(backoffs) {
599 idx = len(backoffs) - 1
600 }
601
602 return backoffs[idx]
603}
604
605// ClearCrewDenial removes crew denial from both in-memory and database caches
606// This allows immediate access after a user becomes a crew member
607func (a *RemoteHoldAuthorizer) ClearCrewDenial(ctx context.Context, holdDID, userDID string) error {
608 // Clear in-memory cache
609 key := fmt.Sprintf("%s:%s", holdDID, userDID)
610 a.recentDenials.Delete(key)
611
612 // Clear database cache
613 if a.db != nil {
614 query := `DELETE FROM hold_crew_denials WHERE hold_did = ? AND user_did = ?`
615 _, err := a.db.ExecContext(ctx, query, holdDID, userDID)
616 if err != nil {
617 return fmt.Errorf("failed to clear denial cache: %w", err)
618 }
619 }
620
621 slog.Debug("Cleared crew denial cache", "holdDID", holdDID, "userDID", userDID)
622 return nil
623}
624
625// ClearAllDenials removes all crew denials from both in-memory and database caches
626// Called on startup to ensure a clean slate
627func (a *RemoteHoldAuthorizer) ClearAllDenials() error {
628 // Clear all in-memory denials
629 a.recentDenials.Range(func(key, value any) bool {
630 a.recentDenials.Delete(key)
631 return true
632 })
633
634 // Clear all database denials
635 if a.db != nil {
636 _, err := a.db.Exec("DELETE FROM hold_crew_denials")
637 if err != nil {
638 return fmt.Errorf("failed to clear all denial caches: %w", err)
639 }
640 }
641
642 slog.Info("Cleared all crew denial caches on startup")
643 return nil
644}