A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
80
fork

Configure Feed

Select the types of activity you want to include in your feed.

pass through 429 retry-after from pds

+304 -3
+50
pkg/appview/middleware/registry.go
··· 660 660 next.ServeHTTP(w, r) 661 661 }) 662 662 } 663 + 664 + // retryAfterResponseWriter wraps http.ResponseWriter and, on the first 665 + // WriteHeader call, injects a Retry-After header if the status is 429 and 666 + // a retry-after duration was recorded in the request context. 667 + type retryAfterResponseWriter struct { 668 + http.ResponseWriter 669 + carrier *storage.RetryAfterCarrier 670 + wroteHeader bool 671 + } 672 + 673 + func (w *retryAfterResponseWriter) WriteHeader(code int) { 674 + if !w.wroteHeader { 675 + w.wroteHeader = true 676 + if code == http.StatusTooManyRequests { 677 + if d := w.carrier.Duration(); d > 0 { 678 + // Round up to whole seconds; minimum of 1 to avoid 0-second hints. 679 + secs := int64(d.Seconds()) 680 + if d%time.Second != 0 { 681 + secs++ 682 + } 683 + if secs < 1 { 684 + secs = 1 685 + } 686 + w.Header().Set("Retry-After", fmt.Sprintf("%d", secs)) 687 + } 688 + } 689 + } 690 + w.ResponseWriter.WriteHeader(code) 691 + } 692 + 693 + func (w *retryAfterResponseWriter) Write(b []byte) (int, error) { 694 + if !w.wroteHeader { 695 + // Implicit 200 — still fire WriteHeader so flag flips. 696 + w.WriteHeader(http.StatusOK) 697 + } 698 + return w.ResponseWriter.Write(b) 699 + } 700 + 701 + // RetryAfterMiddleware installs a per-request RetryAfterCarrier in the 702 + // request context and wraps the response writer so deeper handlers (e.g., 703 + // the manifest store, when an upstream PDS returns 429) can cause a 704 + // Retry-After header to be emitted on 429 responses. 705 + func RetryAfterMiddleware(next http.Handler) http.Handler { 706 + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 707 + carrier := storage.NewRetryAfterCarrier() 708 + ctx := context.WithValue(r.Context(), storage.RetryAfterContextKey, carrier) 709 + wrapped := &retryAfterResponseWriter{ResponseWriter: w, carrier: carrier} 710 + next.ServeHTTP(wrapped, r.WithContext(ctx)) 711 + }) 712 + }
+3 -2
pkg/appview/server.go
··· 484 484 ctx := context.Background() 485 485 app := handlers.NewApp(ctx, cfg.Distribution) 486 486 487 - // Wrap with auth method extraction middleware 488 - wrappedApp := middleware.ExtractAuthMethod(app) 487 + // Wrap with auth method extraction middleware, then with the Retry-After 488 + // emitter so it can read the carrier installed before deeper handlers run. 489 + wrappedApp := middleware.RetryAfterMiddleware(middleware.ExtractAuthMethod(app)) 489 490 490 491 // Mount registry at /v2/ 491 492 mainRouter.Handle("/v2/*", wrappedApp)
+33 -1
pkg/appview/storage/manifest_store.go
··· 16 16 "atcr.io/pkg/appview/readme" 17 17 "atcr.io/pkg/atproto" 18 18 "github.com/distribution/distribution/v3" 19 + "github.com/distribution/distribution/v3/registry/api/errcode" 19 20 "github.com/opencontainers/go-digest" 20 21 ) 22 + 23 + // rateLimitToErrcode converts an upstream PDS rate-limit error into a 24 + // distribution errcode.Error (HTTP 429). When ctx contains a 25 + // RetryAfterCarrier, also stashes the retry-after duration so HTTP 26 + // middleware can emit a Retry-After response header. Returns the original 27 + // error untouched when it isn't a rate-limit error. 28 + func rateLimitToErrcode(ctx context.Context, err error) error { 29 + var rl *atproto.RateLimitError 30 + if !errors.As(err, &rl) { 31 + return err 32 + } 33 + if rl.RetryAfter > 0 { 34 + SetRetryAfter(ctx, rl.RetryAfter) 35 + } 36 + return errcode.ErrorCodeTooManyRequests.WithMessage(rl.Error()) 37 + } 21 38 22 39 // pullDedup deduplicates pull notifications per puller+owner+repo within a 5-minute window. 23 40 // This prevents CI workflows (e.g., imagetools create --append) from inflating download counts ··· 182 199 // Upload manifest as blob to PDS 183 200 blobRef, err := s.ctx.ATProtoClient.UploadBlob(ctx, payload, mediaType) 184 201 if err != nil { 202 + if rl := rateLimitToErrcode(ctx, err); rl != err { 203 + return "", rl 204 + } 185 205 return "", fmt.Errorf("failed to upload manifest blob: %w", err) 186 206 } 187 207 ··· 266 286 rkey := digestToRKey(dgst) 267 287 _, err = s.ctx.ATProtoClient.PutRecord(ctx, atproto.ManifestCollection, rkey, manifestRecord) 268 288 if err != nil { 289 + if rl := rateLimitToErrcode(ctx, err); rl != err { 290 + return "", rl 291 + } 269 292 return "", fmt.Errorf("failed to store manifest record in ATProto: %w", err) 270 293 } 271 294 ··· 292 315 tagRecord := atproto.NewTagRecord(s.ctx.ATProtoClient.DID(), s.ctx.Repository, tag, dgst.String(), mediaType) 293 316 _, err = s.ctx.ATProtoClient.PutRecord(ctx, atproto.TagCollection, tagRKey, tagRecord) 294 317 if err != nil { 318 + if rl := rateLimitToErrcode(ctx, err); rl != err { 319 + return "", rl 320 + } 295 321 return "", fmt.Errorf("failed to store tag in ATProto: %w", err) 296 322 } 297 323 } ··· 389 415 // Delete removes a manifest 390 416 func (s *ManifestStore) Delete(ctx context.Context, dgst digest.Digest) error { 391 417 rkey := digestToRKey(dgst) 392 - return s.ctx.ATProtoClient.DeleteRecord(ctx, atproto.ManifestCollection, rkey) 418 + if err := s.ctx.ATProtoClient.DeleteRecord(ctx, atproto.ManifestCollection, rkey); err != nil { 419 + if rl := rateLimitToErrcode(ctx, err); rl != err { 420 + return rl 421 + } 422 + return err 423 + } 424 + return nil 393 425 } 394 426 395 427 // digestToRKey converts a digest to an ATProto record key
+59
pkg/appview/storage/manifest_store_test.go
··· 4 4 "context" 5 5 "encoding/json" 6 6 "errors" 7 + "fmt" 7 8 "io" 8 9 "net/http" 9 10 "net/http/httptest" 10 11 "testing" 12 + "time" 11 13 12 14 "atcr.io/pkg/atproto" 13 15 "github.com/distribution/distribution/v3" 16 + "github.com/distribution/distribution/v3/registry/api/errcode" 14 17 "github.com/opencontainers/go-digest" 15 18 ) 16 19 ··· 961 964 t.Errorf("Put() should succeed when all child manifests exist, got error: %v", err) 962 965 } 963 966 } 967 + 968 + // TestManifestStore_Put_RateLimitBecomesErrcode verifies that a 429 from the 969 + // upstream PDS surfaces as errcode.ErrorCodeTooManyRequests with a 970 + // Retry-After hint stashed on the carrier in context. 971 + func TestManifestStore_Put_RateLimitBecomesErrcode(t *testing.T) { 972 + ociManifest := []byte(`{ 973 + "schemaVersion":2, 974 + "mediaType":"application/vnd.oci.image.manifest.v1+json", 975 + "config":{"digest":"sha256:cfg","size":1}, 976 + "layers":[{"digest":"sha256:l1","size":1}] 977 + }`) 978 + 979 + resetAt := time.Now().Add(30 * time.Second).Unix() 980 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 981 + // Let the manifest blob upload succeed so we hit putRecord. 982 + if r.URL.Path == atproto.RepoUploadBlob { 983 + w.WriteHeader(http.StatusOK) 984 + w.Write([]byte(`{"blob":{"$type":"blob","ref":{"$link":"bafytest"},"mimeType":"application/json","size":1}}`)) 985 + return 986 + } 987 + // putRecord returns 429. 988 + w.Header().Set("ratelimit-limit", "100") 989 + w.Header().Set("ratelimit-remaining", "0") 990 + w.Header().Set("ratelimit-reset", fmt.Sprintf("%d", resetAt)) 991 + w.WriteHeader(http.StatusTooManyRequests) 992 + w.Write([]byte(`{"error":"RateLimitExceeded","message":"Rate Limit Exceeded"}`)) 993 + })) 994 + defer server.Close() 995 + 996 + client := atproto.NewClient(server.URL, "did:plc:test123", "token") 997 + db := &mockHoldDIDLookup{} 998 + rctx := mockRegistryContext(client, "myapp", "did:web:hold.example.com", "did:plc:test123", "test.handle", db) 999 + store := NewManifestStore(rctx, nil) 1000 + 1001 + carrier := NewRetryAfterCarrier() 1002 + ctx := context.WithValue(context.Background(), RetryAfterContextKey, carrier) 1003 + 1004 + _, err := store.Put(ctx, &rawManifest{ 1005 + mediaType: "application/vnd.oci.image.manifest.v1+json", 1006 + payload: ociManifest, 1007 + }) 1008 + if err == nil { 1009 + t.Fatal("expected error, got nil") 1010 + } 1011 + 1012 + var ec errcode.Error 1013 + if !errors.As(err, &ec) { 1014 + t.Fatalf("expected errcode.Error, got %T: %v", err, err) 1015 + } 1016 + if ec.Code != errcode.ErrorCodeTooManyRequests { 1017 + t.Errorf("Code = %v, want ErrorCodeTooManyRequests", ec.Code) 1018 + } 1019 + if got := carrier.Duration(); got <= 0 { 1020 + t.Errorf("expected carrier to have a Retry-After duration, got %v", got) 1021 + } 1022 + }
+57
pkg/appview/storage/retryafter.go
··· 1 + package storage 2 + 3 + import ( 4 + "context" 5 + "sync" 6 + "time" 7 + ) 8 + 9 + // RetryAfterCarrier is a request-scoped, mutable container for a Retry-After 10 + // hint emitted by storage handlers (e.g., when an upstream PDS returns 429). 11 + // HTTP middleware injects an empty carrier into the request context; deep 12 + // handlers populate it via SetRetryAfter when they convert a rate-limit error 13 + // into a 429 response. The middleware then reads it back to set the 14 + // Retry-After response header. 15 + type RetryAfterCarrier struct { 16 + mu sync.Mutex 17 + duration time.Duration 18 + } 19 + 20 + const RetryAfterContextKey contextKey = "atcr.retry-after" 21 + 22 + // NewRetryAfterCarrier returns an empty carrier ready to be stored in context. 23 + func NewRetryAfterCarrier() *RetryAfterCarrier { 24 + return &RetryAfterCarrier{} 25 + } 26 + 27 + // Set records a retry-after hint. Largest value wins (a later, longer 28 + // throttle window in a multi-write request shouldn't be clobbered by a 29 + // shorter one). 30 + func (c *RetryAfterCarrier) Set(d time.Duration) { 31 + if c == nil || d <= 0 { 32 + return 33 + } 34 + c.mu.Lock() 35 + if d > c.duration { 36 + c.duration = d 37 + } 38 + c.mu.Unlock() 39 + } 40 + 41 + // Duration returns the recorded retry-after value, or 0 if none was set. 42 + func (c *RetryAfterCarrier) Duration() time.Duration { 43 + if c == nil { 44 + return 0 45 + } 46 + c.mu.Lock() 47 + defer c.mu.Unlock() 48 + return c.duration 49 + } 50 + 51 + // SetRetryAfter is a convenience helper for handlers that have a context but 52 + // not a direct carrier reference. 53 + func SetRetryAfter(ctx context.Context, d time.Duration) { 54 + if c, ok := ctx.Value(RetryAfterContextKey).(*RetryAfterCarrier); ok { 55 + c.Set(d) 56 + } 57 + }
+49
pkg/atproto/client.go
··· 23 23 ErrRecordNotFound = errors.New("record not found") 24 24 ) 25 25 26 + // RateLimitError indicates that the upstream PDS returned 429 (RateLimitExceeded). 27 + // It carries an optional RetryAfter duration derived from PDS rate-limit headers 28 + // so callers can surface it to clients (e.g., as a Retry-After response header). 29 + type RateLimitError struct { 30 + Wrapped error 31 + RetryAfter time.Duration // 0 if unknown 32 + } 33 + 34 + func (e *RateLimitError) Error() string { 35 + if e.Wrapped != nil { 36 + return e.Wrapped.Error() 37 + } 38 + return "rate limit exceeded" 39 + } 40 + 41 + func (e *RateLimitError) Unwrap() error { return e.Wrapped } 42 + 43 + // asRateLimitError inspects err and, if it represents a 429 from the PDS, 44 + // returns a *RateLimitError wrapping it. Returns nil otherwise. 45 + func asRateLimitError(err error) *RateLimitError { 46 + if err == nil { 47 + return nil 48 + } 49 + var xrpcErr *xrpc.Error 50 + if errors.As(err, &xrpcErr) && xrpcErr.StatusCode == http.StatusTooManyRequests { 51 + var retryAfter time.Duration 52 + if xrpcErr.Ratelimit != nil && !xrpcErr.Ratelimit.Reset.IsZero() { 53 + if d := time.Until(xrpcErr.Ratelimit.Reset); d > 0 { 54 + retryAfter = d 55 + } 56 + } 57 + return &RateLimitError{Wrapped: err, RetryAfter: retryAfter} 58 + } 59 + var apiErr *atclient.APIError 60 + if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusTooManyRequests { 61 + return &RateLimitError{Wrapped: err} 62 + } 63 + return nil 64 + } 65 + 26 66 // ClientProvider abstracts OAuth vs Basic Auth client creation. 27 67 // This allows the same code path for all PDS operations regardless of auth type. 28 68 type ClientProvider interface { ··· 146 186 return client.LexDo(ctx, "POST", "application/json", "com.atproto.repo.putRecord", nil, payload, &result) 147 187 }) 148 188 if err != nil { 189 + if rl := asRateLimitError(err); rl != nil { 190 + return nil, rl 191 + } 149 192 return nil, fmt.Errorf("putRecord failed: %w", err) 150 193 } 151 194 return &result, nil ··· 198 241 return client.LexDo(ctx, "POST", "application/json", "com.atproto.repo.deleteRecord", nil, payload, &result) 199 242 }) 200 243 if err != nil { 244 + if rl := asRateLimitError(err); rl != nil { 245 + return rl 246 + } 201 247 return fmt.Errorf("deleteRecord failed: %w", err) 202 248 } 203 249 return nil ··· 250 296 return client.LexDo(ctx, "POST", mimeType, "com.atproto.repo.uploadBlob", nil, bytes.NewReader(data), &result) 251 297 }) 252 298 if err != nil { 299 + if rl := asRateLimitError(err); rl != nil { 300 + return nil, rl 301 + } 253 302 return nil, fmt.Errorf("uploadBlob failed: %w", err) 254 303 } 255 304 return &result.Blob, nil
+53
pkg/atproto/client_test.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 + "errors" 7 + "fmt" 6 8 "net/http" 7 9 "net/http/httptest" 8 10 "strings" ··· 1043 1045 t.Error("Expected error from GetBlob, got nil") 1044 1046 } 1045 1047 } 1048 + 1049 + // TestPutRecord_RateLimited verifies that a 429 from the PDS surfaces as a 1050 + // *RateLimitError carrying the Retry-After hint derived from ratelimit-reset. 1051 + func TestPutRecord_RateLimited(t *testing.T) { 1052 + resetAt := time.Now().Add(45 * time.Second).Unix() 1053 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1054 + w.Header().Set("ratelimit-limit", "100") 1055 + w.Header().Set("ratelimit-remaining", "0") 1056 + w.Header().Set("ratelimit-reset", fmt.Sprintf("%d", resetAt)) 1057 + w.WriteHeader(http.StatusTooManyRequests) 1058 + w.Write([]byte(`{"error":"RateLimitExceeded","message":"Rate Limit Exceeded"}`)) 1059 + })) 1060 + defer server.Close() 1061 + 1062 + client := NewClient(server.URL, "did:plc:test123", "test-token") 1063 + _, err := client.PutRecord(context.Background(), ManifestCollection, "abc", map[string]any{"k": "v"}) 1064 + if err == nil { 1065 + t.Fatal("expected error, got nil") 1066 + } 1067 + 1068 + var rl *RateLimitError 1069 + if !errors.As(err, &rl) { 1070 + t.Fatalf("expected *RateLimitError, got %T: %v", err, err) 1071 + } 1072 + if rl.RetryAfter <= 0 { 1073 + t.Errorf("expected non-zero RetryAfter, got %v", rl.RetryAfter) 1074 + } 1075 + if rl.RetryAfter > 60*time.Second { 1076 + t.Errorf("RetryAfter %v exceeds expected upper bound", rl.RetryAfter) 1077 + } 1078 + } 1079 + 1080 + // TestPutRecord_NonRateLimitErrorPassthrough verifies that non-429 errors 1081 + // are not coerced into RateLimitError. 1082 + func TestPutRecord_NonRateLimitErrorPassthrough(t *testing.T) { 1083 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 1084 + w.WriteHeader(http.StatusBadRequest) 1085 + w.Write([]byte(`{"error":"InvalidRequest","message":"bad"}`)) 1086 + })) 1087 + defer server.Close() 1088 + 1089 + client := NewClient(server.URL, "did:plc:test123", "test-token") 1090 + _, err := client.PutRecord(context.Background(), ManifestCollection, "abc", map[string]any{"k": "v"}) 1091 + if err == nil { 1092 + t.Fatal("expected error, got nil") 1093 + } 1094 + var rl *RateLimitError 1095 + if errors.As(err, &rl) { 1096 + t.Fatalf("did not expect *RateLimitError for 400, got %v", err) 1097 + } 1098 + }