A container registry that uses the AT Protocol for manifest storage and S3 for blob storage.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

clean up unused locks

+12 -44
+2 -18
pkg/appview/storage/manifest_store.go
··· 10 10 "log/slog" 11 11 "net/http" 12 12 "strings" 13 - "sync" 14 13 "time" 15 14 16 15 "atcr.io/pkg/appview/readme" ··· 22 21 // ManifestStore implements distribution.ManifestService 23 22 // It stores manifests in ATProto as records 24 23 type ManifestStore struct { 25 - ctx *RegistryContext // Context with user/hold info 26 - mu sync.RWMutex // Protects lastFetchedHoldDID 27 - lastFetchedHoldDID string // Hold DID from most recently fetched manifest (for pull) 28 - blobStore distribution.BlobStore // Blob store for fetching config during push 24 + ctx *RegistryContext // Context with user/hold info 25 + blobStore distribution.BlobStore // Blob store for fetching config during push 29 26 } 30 27 31 28 // NewManifestStore creates a new ATProto-backed manifest store ··· 65 62 if err := json.Unmarshal(record.Value, &manifestRecord); err != nil { 66 63 return nil, fmt.Errorf("failed to unmarshal manifest record: %w", err) 67 64 } 68 - 69 - // Store the hold DID for subsequent blob requests during pull 70 - // Prefer HoldDID (new format) with fallback to HoldEndpoint (legacy URL format) 71 - // The routing repository will cache this for concurrent blob fetches 72 - s.mu.Lock() 73 - if manifestRecord.HoldDID != "" { 74 - // New format: DID reference (preferred) 75 - s.lastFetchedHoldDID = manifestRecord.HoldDID 76 - } else if manifestRecord.HoldEndpoint != "" { 77 - // Legacy format: URL reference - convert to DID 78 - s.lastFetchedHoldDID = atproto.ResolveHoldDIDFromURL(manifestRecord.HoldEndpoint) 79 - } 80 - s.mu.Unlock() 81 65 82 66 var ociManifest []byte 83 67
+10 -26
pkg/appview/storage/routing_repository.go
··· 7 7 import ( 8 8 "context" 9 9 "log/slog" 10 - "sync" 11 10 12 11 "github.com/distribution/distribution/v3" 13 12 ) 14 13 15 14 // RoutingRepository routes manifests to ATProto and blobs to external hold service 16 15 // The registry (AppView) is stateless and NEVER stores blobs locally 16 + // NOTE: A fresh instance is created per-request (see middleware/registry.go) 17 + // so no mutex is needed - each request has its own instance 17 18 type RoutingRepository struct { 18 19 distribution.Repository 19 20 Ctx *RegistryContext // All context and services (exported for token updates) 20 - mu sync.Mutex // Protects manifestStore and blobStore 21 - manifestStore *ManifestStore // Cached manifest store instance 22 - blobStore *ProxyBlobStore // Cached blob store instance 21 + manifestStore *ManifestStore // Manifest store instance (lazy-initialized) 22 + blobStore *ProxyBlobStore // Blob store instance (lazy-initialized) 23 23 } 24 24 25 25 // NewRoutingRepository creates a new routing repository ··· 32 32 33 33 // Manifests returns the ATProto-backed manifest service 34 34 func (r *RoutingRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { 35 - r.mu.Lock() 36 - // Create or return cached manifest store 35 + // Lazy-initialize manifest store (no mutex needed - one instance per request) 37 36 if r.manifestStore == nil { 38 37 // Ensure blob store is created first (needed for label extraction during push) 39 - // Release lock while calling Blobs to avoid deadlock 40 - r.mu.Unlock() 41 38 blobStore := r.Blobs(ctx) 42 - r.mu.Lock() 43 - 44 - // Double-check after reacquiring lock (another goroutine might have set it) 45 - if r.manifestStore == nil { 46 - r.manifestStore = NewManifestStore(r.Ctx, blobStore) 47 - } 39 + r.manifestStore = NewManifestStore(r.Ctx, blobStore) 48 40 } 49 - manifestStore := r.manifestStore 50 - r.mu.Unlock() 51 - 52 - return manifestStore, nil 41 + return r.manifestStore, nil 53 42 } 54 43 55 44 // Blobs returns a proxy blob store that routes to external hold service 56 45 // The registry (AppView) NEVER stores blobs locally - all blobs go through hold service 57 46 func (r *RoutingRepository) Blobs(ctx context.Context) distribution.BlobStore { 58 - r.mu.Lock() 59 - // Return cached blob store if available 47 + // Return cached blob store if available (no mutex needed - one instance per request) 60 48 if r.blobStore != nil { 61 - blobStore := r.blobStore 62 - r.mu.Unlock() 63 49 slog.Debug("Returning cached blob store", "component", "storage/blobs", "did", r.Ctx.DID, "repo", r.Ctx.Repository) 64 - return blobStore 50 + return r.blobStore 65 51 } 66 52 67 53 // Determine if this is a pull (GET/HEAD) or push (PUT/POST/etc) operation ··· 103 89 104 90 // Create and cache proxy blob store 105 91 r.blobStore = NewProxyBlobStore(r.Ctx) 106 - blobStore := r.blobStore 107 - r.mu.Unlock() 108 - return blobStore 92 + return r.blobStore 109 93 } 110 94 111 95 // Tags returns the tag service