A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
81
fork

Configure Feed

Select the types of activity you want to include in your feed.

don't use in-memory for holddid caching, just reference from db

+124 -304
+9 -7
CLAUDE.md
··· 349 349 - Implements `distribution.Repository` interface 350 350 - Uses RegistryContext to pass DID, PDS endpoint, hold DID, OAuth refresher, etc. 351 351 352 - **hold_cache.go**: In-memory hold DID cache 353 - - Caches `(DID, repository) → holdDid` for pull operations 354 - - TTL: 10 minutes (covers typical pull operations) 355 - - Cleanup: Background goroutine runs every 5 minutes 356 - - **NOTE:** Simple in-memory cache for MVP. For production: use Redis or similar 357 - - Prevents expensive PDS manifest lookups on every blob request during pull 352 + **Database-based hold DID lookups**: 353 + - Queries SQLite `manifests` table for hold DID (indexed, fast) 354 + - No in-memory caching needed - database IS the cache 355 + - Persistent across restarts, multi-instance safe 356 + - Pull operations use hold DID from latest manifest (historical reference) 357 + - Push operations use fresh discovery from profile/default 358 + - Function: `db.GetLatestHoldDIDForRepo(did, repository)` in `pkg/appview/db/queries.go` 358 359 359 360 **proxy_blob_store.go**: External storage proxy (routes to hold via XRPC) 360 361 - Resolves hold DID → HTTP URL for XRPC requests (did:web resolution) ··· 604 605 605 606 **General:** 606 607 - Middleware is in `pkg/appview/middleware/` (auth.go, registry.go) 607 - - Storage routing is in `pkg/appview/storage/` (routing_repository.go, proxy_blob_store.go, hold_cache.go) 608 + - Storage routing is in `pkg/appview/storage/` (routing_repository.go, proxy_blob_store.go) 609 + - Hold DID lookups use database queries (no in-memory caching) 608 610 - Storage drivers imported as `_ "github.com/distribution/distribution/v3/registry/storage/driver/s3-aws"` 609 611 - Hold service reuses distribution's driver factory for multi-backend support 610 612
+29
pkg/appview/db/queries.go
··· 724 724 return &m, nil 725 725 } 726 726 727 + // GetLatestHoldDIDForRepo returns the hold DID from the most recent manifest for a repository 728 + // Returns empty string if no manifests exist (e.g., first push) 729 + // This is used instead of the in-memory cache to determine which hold to use for blob operations 730 + func GetLatestHoldDIDForRepo(db *sql.DB, did, repository string) (string, error) { 731 + var holdDID string 732 + err := db.QueryRow(` 733 + SELECT hold_endpoint 734 + FROM manifests 735 + WHERE did = ? AND repository = ? 736 + ORDER BY created_at DESC 737 + LIMIT 1 738 + `, did, repository).Scan(&holdDID) 739 + 740 + if err == sql.ErrNoRows { 741 + // No manifests yet - return empty string (first push case) 742 + return "", nil 743 + } 744 + if err != nil { 745 + return "", err 746 + } 747 + 748 + return holdDID, nil 749 + } 750 + 727 751 // GetRepositoriesForDID returns all unique repository names for a DID 728 752 // Used by backfill to reconcile annotations for all repositories 729 753 func GetRepositoriesForDID(db *sql.DB, did string) ([]string, error) { ··· 1574 1598 // IncrementPushCount increments the push count for a repository 1575 1599 func (m *MetricsDB) IncrementPushCount(did, repository string) error { 1576 1600 return IncrementPushCount(m.db, did, repository) 1601 + } 1602 + 1603 + // GetLatestHoldDIDForRepo returns the hold DID from the most recent manifest for a repository 1604 + func (m *MetricsDB) GetLatestHoldDIDForRepo(did, repository string) (string, error) { 1605 + return GetLatestHoldDIDForRepo(m.db, did, repository) 1577 1606 } 1578 1607 1579 1608 // GetFeaturedRepositories fetches top repositories sorted by stars and pulls
+2 -1
pkg/appview/storage/context.go
··· 8 8 "atcr.io/pkg/auth/oauth" 9 9 ) 10 10 11 - // DatabaseMetrics interface for tracking pull/push counts 11 + // DatabaseMetrics interface for tracking pull/push counts and querying hold DIDs 12 12 type DatabaseMetrics interface { 13 13 IncrementPullCount(did, repository string) error 14 14 IncrementPushCount(did, repository string) error 15 + GetLatestHoldDIDForRepo(did, repository string) (string, error) 15 16 } 16 17 17 18 // ReadmeCache interface for README content caching
+5
pkg/appview/storage/context_test.go
··· 29 29 return nil 30 30 } 31 31 32 + func (m *mockDatabaseMetrics) GetLatestHoldDIDForRepo(did, repository string) (string, error) { 33 + // Return empty string for mock - tests can override if needed 34 + return "", nil 35 + } 36 + 32 37 func (m *mockDatabaseMetrics) getPullCount() int { 33 38 m.mu.Lock() 34 39 defer m.mu.Unlock()
-98
pkg/appview/storage/hold_cache.go
··· 1 - package storage 2 - 3 - import ( 4 - "sync" 5 - "time" 6 - ) 7 - 8 - // HoldCache caches hold DIDs for (DID, repository) pairs 9 - // This avoids expensive ATProto lookups on every blob request during pulls 10 - // 11 - // NOTE: This is a simple in-memory cache for MVP. For production deployments: 12 - // - Use Redis or similar for distributed caching 13 - // - Consider implementing cache size limits 14 - // - Monitor memory usage under high load 15 - type HoldCache struct { 16 - mu sync.RWMutex 17 - cache map[string]*holdCacheEntry 18 - } 19 - 20 - type holdCacheEntry struct { 21 - holdDID string 22 - expiresAt time.Time 23 - } 24 - 25 - var globalHoldCache = &HoldCache{ 26 - cache: make(map[string]*holdCacheEntry), 27 - } 28 - 29 - func init() { 30 - // Start background cleanup goroutine 31 - go func() { 32 - ticker := time.NewTicker(5 * time.Minute) 33 - defer ticker.Stop() 34 - for range ticker.C { 35 - globalHoldCache.Cleanup() 36 - } 37 - }() 38 - } 39 - 40 - // GetGlobalHoldCache returns the global hold cache instance 41 - func GetGlobalHoldCache() *HoldCache { 42 - return globalHoldCache 43 - } 44 - 45 - // Set stores a hold DID for a (DID, repository) pair with a TTL 46 - func (c *HoldCache) Set(did, repository, holdDID string, ttl time.Duration) { 47 - c.mu.Lock() 48 - defer c.mu.Unlock() 49 - 50 - key := did + ":" + repository 51 - c.cache[key] = &holdCacheEntry{ 52 - holdDID: holdDID, 53 - expiresAt: time.Now().Add(ttl), 54 - } 55 - } 56 - 57 - // Get retrieves a hold DID for a (DID, repository) pair 58 - // Returns empty string and false if not found or expired 59 - func (c *HoldCache) Get(did, repository string) (string, bool) { 60 - c.mu.RLock() 61 - defer c.mu.RUnlock() 62 - 63 - key := did + ":" + repository 64 - entry, ok := c.cache[key] 65 - if !ok { 66 - return "", false 67 - } 68 - 69 - // Check if expired 70 - if time.Now().After(entry.expiresAt) { 71 - // Don't delete here (would need write lock), let cleanup handle it 72 - return "", false 73 - } 74 - 75 - return entry.holdDID, true 76 - } 77 - 78 - // Cleanup removes expired entries (called automatically every 5 minutes) 79 - func (c *HoldCache) Cleanup() { 80 - c.mu.Lock() 81 - defer c.mu.Unlock() 82 - 83 - now := time.Now() 84 - removed := 0 85 - for key, entry := range c.cache { 86 - if now.After(entry.expiresAt) { 87 - delete(c.cache, key) 88 - removed++ 89 - } 90 - } 91 - 92 - // Log cleanup stats for monitoring 93 - if removed > 0 || len(c.cache) > 100 { 94 - // Log if we removed entries OR if cache is growing large 95 - // This helps identify if cache size is becoming a concern 96 - println("Hold cache cleanup: removed", removed, "entries, remaining", len(c.cache)) 97 - } 98 - }
-150
pkg/appview/storage/hold_cache_test.go
··· 1 - package storage 2 - 3 - import ( 4 - "testing" 5 - "time" 6 - ) 7 - 8 - func TestHoldCache_SetAndGet(t *testing.T) { 9 - cache := &HoldCache{ 10 - cache: make(map[string]*holdCacheEntry), 11 - } 12 - 13 - did := "did:plc:test123" 14 - repo := "myapp" 15 - holdDID := "did:web:hold01.atcr.io" 16 - ttl := 10 * time.Minute 17 - 18 - // Set a value 19 - cache.Set(did, repo, holdDID, ttl) 20 - 21 - // Get the value - should succeed 22 - gotHoldDID, ok := cache.Get(did, repo) 23 - if !ok { 24 - t.Fatal("Expected Get to return true, got false") 25 - } 26 - if gotHoldDID != holdDID { 27 - t.Errorf("Expected hold DID %q, got %q", holdDID, gotHoldDID) 28 - } 29 - } 30 - 31 - func TestHoldCache_GetNonExistent(t *testing.T) { 32 - cache := &HoldCache{ 33 - cache: make(map[string]*holdCacheEntry), 34 - } 35 - 36 - // Get non-existent value 37 - _, ok := cache.Get("did:plc:nonexistent", "repo") 38 - if ok { 39 - t.Error("Expected Get to return false for non-existent key") 40 - } 41 - } 42 - 43 - func TestHoldCache_ExpiredEntry(t *testing.T) { 44 - cache := &HoldCache{ 45 - cache: make(map[string]*holdCacheEntry), 46 - } 47 - 48 - did := "did:plc:test123" 49 - repo := "myapp" 50 - holdDID := "did:web:hold01.atcr.io" 51 - 52 - // Set with very short TTL 53 - cache.Set(did, repo, holdDID, 10*time.Millisecond) 54 - 55 - // Wait for expiration 56 - time.Sleep(20 * time.Millisecond) 57 - 58 - // Get should return false 59 - _, ok := cache.Get(did, repo) 60 - if ok { 61 - t.Error("Expected Get to return false for expired entry") 62 - } 63 - } 64 - 65 - func TestHoldCache_Cleanup(t *testing.T) { 66 - cache := &HoldCache{ 67 - cache: make(map[string]*holdCacheEntry), 68 - } 69 - 70 - // Add multiple entries with different TTLs 71 - cache.Set("did:plc:1", "repo1", "hold1", 10*time.Millisecond) 72 - cache.Set("did:plc:2", "repo2", "hold2", 1*time.Hour) 73 - cache.Set("did:plc:3", "repo3", "hold3", 10*time.Millisecond) 74 - 75 - // Wait for some to expire 76 - time.Sleep(20 * time.Millisecond) 77 - 78 - // Run cleanup 79 - cache.Cleanup() 80 - 81 - // Verify expired entries are removed 82 - if _, ok := cache.Get("did:plc:1", "repo1"); ok { 83 - t.Error("Expected expired entry 1 to be removed") 84 - } 85 - if _, ok := cache.Get("did:plc:3", "repo3"); ok { 86 - t.Error("Expected expired entry 3 to be removed") 87 - } 88 - 89 - // Verify non-expired entry remains 90 - if _, ok := cache.Get("did:plc:2", "repo2"); !ok { 91 - t.Error("Expected non-expired entry to remain") 92 - } 93 - } 94 - 95 - func TestHoldCache_ConcurrentAccess(t *testing.T) { 96 - cache := &HoldCache{ 97 - cache: make(map[string]*holdCacheEntry), 98 - } 99 - 100 - done := make(chan bool) 101 - 102 - // Concurrent writes 103 - for i := 0; i < 10; i++ { 104 - go func(id int) { 105 - did := "did:plc:concurrent" 106 - repo := "repo" + string(rune(id)) 107 - holdDID := "hold" + string(rune(id)) 108 - cache.Set(did, repo, holdDID, 1*time.Minute) 109 - done <- true 110 - }(i) 111 - } 112 - 113 - // Concurrent reads 114 - for i := 0; i < 10; i++ { 115 - go func(id int) { 116 - repo := "repo" + string(rune(id)) 117 - cache.Get("did:plc:concurrent", repo) 118 - done <- true 119 - }(i) 120 - } 121 - 122 - // Wait for all goroutines 123 - for i := 0; i < 20; i++ { 124 - <-done 125 - } 126 - } 127 - 128 - func TestHoldCache_KeyFormat(t *testing.T) { 129 - cache := &HoldCache{ 130 - cache: make(map[string]*holdCacheEntry), 131 - } 132 - 133 - did := "did:plc:test" 134 - repo := "myrepo" 135 - holdDID := "did:web:hold" 136 - 137 - cache.Set(did, repo, holdDID, 1*time.Minute) 138 - 139 - // Verify the key is stored correctly (did:repo) 140 - expectedKey := did + ":" + repo 141 - if _, exists := cache.cache[expectedKey]; !exists { 142 - t.Errorf("Expected key %q to exist in cache", expectedKey) 143 - } 144 - } 145 - 146 - // TODO: Add more comprehensive tests: 147 - // - Test GetGlobalHoldCache() 148 - // - Test cache size monitoring 149 - // - Benchmark cache performance under load 150 - // - Test cleanup goroutine timing
+18 -22
pkg/appview/storage/routing_repository.go
··· 1 1 // Package storage implements the storage routing layer for AppView. 2 2 // It routes manifests to ATProto PDS (as io.atcr.manifest records) and 3 - // blobs to hold services via XRPC, with hold DID caching for efficient pulls. 3 + // blobs to hold services via XRPC, with database-based hold DID lookups. 4 4 // All storage operations are proxied - AppView stores nothing locally. 5 5 package storage 6 6 ··· 8 8 "context" 9 9 "log/slog" 10 10 "sync" 11 - "time" 12 11 13 12 "github.com/distribution/distribution/v3" 14 13 ) ··· 50 49 manifestStore := r.manifestStore 51 50 r.mu.Unlock() 52 51 53 - // After any manifest operation, cache the hold DID for blob fetches 54 - // We use a goroutine to avoid blocking, and check after a short delay to allow the operation to complete 55 - go func() { 56 - time.Sleep(100 * time.Millisecond) // Brief delay to let manifest fetch complete 57 - if holdDID := manifestStore.GetLastFetchedHoldDID(); holdDID != "" { 58 - // Cache for 10 minutes - should cover typical pull operations 59 - GetGlobalHoldCache().Set(r.Ctx.DID, r.Ctx.Repository, holdDID, 10*time.Minute) 60 - slog.Debug("Cached hold DID", "component", "storage/routing", "did", r.Ctx.DID, "repo", r.Ctx.Repository, "hold", holdDID) 61 - } 62 - }() 63 - 64 52 return manifestStore, nil 65 53 } 66 54 ··· 76 64 return blobStore 77 65 } 78 66 79 - // For pull operations, check if we have a cached hold DID from a recent manifest fetch 67 + // For pull operations, check database for hold DID from the most recent manifest 80 68 // This ensures blobs are fetched from the hold recorded in the manifest, not re-discovered 81 69 holdDID := r.Ctx.HoldDID // Default to discovery-based DID 70 + holdSource := "discovery" 82 71 83 - if cachedHoldDID, ok := GetGlobalHoldCache().Get(r.Ctx.DID, r.Ctx.Repository); ok { 84 - // Use cached hold DID from manifest 85 - holdDID = cachedHoldDID 86 - slog.Debug("Using cached hold from manifest", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository, "hold", cachedHoldDID) 87 - } else { 88 - // No cached hold, use discovery-based DID (for push or first pull) 89 - slog.Debug("Using discovery-based hold", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository, "hold", holdDID) 72 + if r.Ctx.Database != nil { 73 + // Query database for the latest manifest's hold DID 74 + if dbHoldDID, err := r.Ctx.Database.GetLatestHoldDIDForRepo(r.Ctx.DID, r.Ctx.Repository); err == nil && dbHoldDID != "" { 75 + // Use hold DID from database (pull case - use historical reference) 76 + holdDID = dbHoldDID 77 + holdSource = "database" 78 + slog.Debug("Using hold from database manifest", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository, "hold", dbHoldDID) 79 + } else if err != nil { 80 + // Log error but don't fail - fall back to discovery-based DID 81 + slog.Warn("Failed to query database for hold DID", "component", "storage/blobs", "error", err) 82 + } 83 + // If dbHoldDID is empty (no manifests yet), fall through to use discovery-based DID 90 84 } 91 85 92 86 if holdDID == "" { ··· 94 88 panic("hold DID not set in RegistryContext - ensure default_hold_did is configured in middleware") 95 89 } 96 90 97 - // Update context with the correct hold DID (may be cached or discovered) 91 + slog.Debug("Using hold DID for blobs", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository, "hold", holdDID, "source", holdSource) 92 + 93 + // Update context with the correct hold DID (may be from database or discovered) 98 94 r.Ctx.HoldDID = holdDID 99 95 100 96 // Create and cache proxy blob store
+61 -26
pkg/appview/storage/routing_repository_test.go
··· 4 4 "context" 5 5 "sync" 6 6 "testing" 7 - "time" 8 7 9 8 "github.com/distribution/distribution/v3" 10 9 "github.com/stretchr/testify/assert" ··· 12 11 13 12 "atcr.io/pkg/atproto" 14 13 ) 14 + 15 + // mockDatabase is a simple mock for testing 16 + type mockDatabase struct { 17 + holdDID string 18 + err error 19 + } 20 + 21 + func (m *mockDatabase) IncrementPullCount(did, repository string) error { 22 + return nil 23 + } 24 + 25 + func (m *mockDatabase) IncrementPushCount(did, repository string) error { 26 + return nil 27 + } 28 + 29 + func (m *mockDatabase) GetLatestHoldDIDForRepo(did, repository string) (string, error) { 30 + if m.err != nil { 31 + return "", m.err 32 + } 33 + return m.holdDID, nil 34 + } 15 35 16 36 func TestNewRoutingRepository(t *testing.T) { 17 37 ctx := &RegistryContext{ ··· 89 109 assert.NotNil(t, repo.manifestStore) 90 110 } 91 111 92 - // TestRoutingRepository_Blobs_WithCache tests blob store with cached hold DID 93 - func TestRoutingRepository_Blobs_WithCache(t *testing.T) { 94 - // Pre-populate the hold cache 95 - cache := GetGlobalHoldCache() 96 - cachedHoldDID := "did:web:cached.hold.io" 97 - cache.Set("did:plc:test123", "myapp", cachedHoldDID, 10*time.Minute) 112 + // TestRoutingRepository_Blobs_WithDatabase tests blob store with database hold DID 113 + func TestRoutingRepository_Blobs_WithDatabase(t *testing.T) { 114 + dbHoldDID := "did:web:database.hold.io" 98 115 99 116 ctx := &RegistryContext{ 100 117 DID: "did:plc:test123", 101 118 Repository: "myapp", 102 119 HoldDID: "did:web:default.hold.io", // Discovery-based hold (should be overridden) 103 120 ATProtoClient: atproto.NewClient("https://pds.example.com", "did:plc:test123", ""), 121 + Database: &mockDatabase{holdDID: dbHoldDID}, 104 122 } 105 123 106 124 repo := NewRoutingRepository(nil, ctx) 107 125 blobStore := repo.Blobs(context.Background()) 108 126 109 127 assert.NotNil(t, blobStore) 110 - // Verify the hold DID was updated to use the cached value 111 - assert.Equal(t, cachedHoldDID, repo.Ctx.HoldDID, "should use cached hold DID") 128 + // Verify the hold DID was updated to use the database value 129 + assert.Equal(t, dbHoldDID, repo.Ctx.HoldDID, "should use database hold DID") 112 130 } 113 131 114 - // TestRoutingRepository_Blobs_WithoutCache tests blob store with discovery-based hold 115 - func TestRoutingRepository_Blobs_WithoutCache(t *testing.T) { 132 + // TestRoutingRepository_Blobs_WithoutDatabase tests blob store with discovery-based hold 133 + func TestRoutingRepository_Blobs_WithoutDatabase(t *testing.T) { 116 134 discoveryHoldDID := "did:web:discovery.hold.io" 117 135 118 - // Use a different DID/repo to avoid cache contamination from other tests 119 136 ctx := &RegistryContext{ 120 137 DID: "did:plc:nocache456", 121 138 Repository: "uncached-app", 122 139 HoldDID: discoveryHoldDID, 123 140 ATProtoClient: atproto.NewClient("https://pds.example.com", "did:plc:nocache456", ""), 141 + Database: nil, // No database 124 142 } 125 143 126 144 repo := NewRoutingRepository(nil, ctx) ··· 131 149 assert.Equal(t, discoveryHoldDID, repo.Ctx.HoldDID, "should use discovery-based hold DID") 132 150 } 133 151 152 + // TestRoutingRepository_Blobs_DatabaseEmptyFallback tests fallback when database returns empty hold DID 153 + func TestRoutingRepository_Blobs_DatabaseEmptyFallback(t *testing.T) { 154 + discoveryHoldDID := "did:web:discovery.hold.io" 155 + 156 + ctx := &RegistryContext{ 157 + DID: "did:plc:test123", 158 + Repository: "newapp", 159 + HoldDID: discoveryHoldDID, 160 + ATProtoClient: atproto.NewClient("https://pds.example.com", "did:plc:test123", ""), 161 + Database: &mockDatabase{holdDID: ""}, // Empty string (no manifests yet) 162 + } 163 + 164 + repo := NewRoutingRepository(nil, ctx) 165 + blobStore := repo.Blobs(context.Background()) 166 + 167 + assert.NotNil(t, blobStore) 168 + // Verify the hold DID falls back to discovery-based 169 + assert.Equal(t, discoveryHoldDID, repo.Ctx.HoldDID, "should fall back to discovery-based hold DID when database returns empty") 170 + } 171 + 134 172 // TestRoutingRepository_BlobStoreCaching tests that blob store is cached 135 173 func TestRoutingRepository_BlobStoreCaching(t *testing.T) { 136 174 ctx := &RegistryContext{ ··· 254 292 assert.NotNil(t, cachedBlobStore) 255 293 } 256 294 257 - // TestRoutingRepository_HoldCachePopulation tests that hold DID cache is populated after manifest fetch 258 - // Note: This test verifies the goroutine behavior with a delay 259 - func TestRoutingRepository_HoldCachePopulation(t *testing.T) { 295 + // TestRoutingRepository_Blobs_Priority tests that database hold DID takes priority over discovery 296 + func TestRoutingRepository_Blobs_Priority(t *testing.T) { 297 + dbHoldDID := "did:web:database.hold.io" 298 + discoveryHoldDID := "did:web:discovery.hold.io" 299 + 260 300 ctx := &RegistryContext{ 261 301 DID: "did:plc:test123", 262 302 Repository: "myapp", 263 - HoldDID: "did:web:hold01.atcr.io", 303 + HoldDID: discoveryHoldDID, // Discovery-based hold 264 304 ATProtoClient: atproto.NewClient("https://pds.example.com", "did:plc:test123", ""), 305 + Database: &mockDatabase{holdDID: dbHoldDID}, // Database has a different hold DID 265 306 } 266 307 267 308 repo := NewRoutingRepository(nil, ctx) 309 + blobStore := repo.Blobs(context.Background()) 268 310 269 - // Create manifest store (which triggers the cache population goroutine) 270 - _, err := repo.Manifests(context.Background()) 271 - require.NoError(t, err) 272 - 273 - // Wait for goroutine to complete (it has a 100ms sleep) 274 - time.Sleep(200 * time.Millisecond) 275 - 276 - // Note: We can't easily verify the cache was populated without a real manifest fetch 277 - // The actual caching happens in GetLastFetchedHoldDID() which requires manifest operations 278 - // This test primarily verifies the Manifests() call doesn't panic with the goroutine 311 + assert.NotNil(t, blobStore) 312 + // Database hold DID should take priority over discovery 313 + assert.Equal(t, dbHoldDID, repo.Ctx.HoldDID, "database hold DID should take priority over discovery") 279 314 }