···8282 key_path: /var/lib/atcr/auth/private-key.pem
8383 # X.509 certificate matching the JWT signing key.
8484 cert_path: /var/lib/atcr/auth/private-key.crt
8585+# Credential helper download settings.
8686+credential_helper:
8787+ # Tangled repository URL for credential helper downloads.
8888+ tangled_repo: ""
8589# Legal page customization for self-hosted instances.
8690legal:
8791 # Organization name for Terms of Service and Privacy Policy. Defaults to server.client_name.
···9296ai:
9397 # Anthropic API key for AI Image Advisor. Also reads CLAUDE_API_KEY env var as fallback.
9498 api_key: ""
9999+# ATProto labeler for content moderation (DMCA takedowns).
100100+labeler:
101101+ # DID or URL of the ATProto labeler (e.g., did:web:labeler.atcr.io). Empty disables label filtering.
102102+ did: ""
95103# Stripe billing integration (requires -tags billing build).
96104billing:
97105 # Stripe secret key. Can also be set via STRIPE_SECRET_KEY env var (takes precedence). Billing is enabled automatically when set.
+6
config-hold.example.yaml
···135135 secret: ""
136136 # Minimum interval between re-scans of the same manifest. When set, the hold proactively scans manifests when the scanner is idle. Default: 168h (7 days). Set to 0 to disable.
137137 rescan_interval: 168h0m0s
138138+# Labeler subscription settings. When configured, the hold consumes takedown labels from the named labeler and purges affected records on receipt; GC consults the cache to gate blob cleanup. Empty subscribe_url disables.
139139+labeler:
140140+ # DID or URL of the ATProto labeler (e.g., did:web:labeler.atcr.io). Empty disables labeler integration.
141141+ did: ""
142142+ # Reversibility window for takedowns. Blobs survive this long after a takedown so the action can be reversed. After this window the GC reclaims them. Default: 720h (30 days).
143143+ grace_window: 720h0m0s
+5
deploy/upcloud/configs/hold.yaml.tmpl
···6161scanner:
6262 secret: "{{.ScannerSecret}}"
6363 rescan_interval: 168h0m0s
6464+labeler:
6565+ # Subscribe to the appview's labeler so takedowns purge records on this
6666+ # hold and the GC honors the reversibility window. Empty disables.
6767+ did: "did:web:seamark.dev"
6868+ grace_window: 720h0m0s
6469
+7
docker-compose.yml
···6565 HOLD_REGISTRATION_ALLOW_ALL_CREW: true
6666 HOLD_SERVER_TEST_MODE: true
6767 HOLD_LOG_LEVEL: debug
6868+ # Subscribe to the dev labeler so takedowns purge records on this hold and
6969+ # GC honors the reversibility window. Same value the appview uses for
7070+ # ATCR_LABELER_DID — accepts a did:web identifier or a raw URL.
7171+ HOLD_LABELER_DID: http://172.28.0.4:5002
7272+ # Short grace window for dev so the takedown→GC path is exercisable without
7373+ # waiting weeks. Production default is 720h (30 days).
7474+ HOLD_LABELER_GRACE_WINDOW: 1h
6875 LOG_SHIPPER_BACKEND: victoria
6976 LOG_SHIPPER_URL: http://172.28.0.10:9428
7077 # S3 storage config comes from env_file (AWS_*, S3_*)
+2
docs/HOLD_XRPC_ENDPOINTS.md
···3737|----------|--------|-------------|
3838| `/xrpc/com.atproto.repo.deleteRecord` | POST | Delete a record |
3939| `/xrpc/com.atproto.repo.uploadBlob` | POST | Upload ATProto blob |
4040+| `/xrpc/io.atcr.hold.purgeManifest` | POST | Purge layer/scan/image-config records for a manifest (eager delete + takedown). Idempotent. |
40414142### Auth Required (Service Token or DPoP)
4243···8283| `/xrpc/io.atcr.hold.getQuota` | GET | none | Get user quota info |
8384| `/xrpc/io.atcr.hold.getLayersForManifest` | GET | none | Get layer records for a manifest AT-URI |
8485| `/xrpc/io.atcr.hold.image.getConfig` | GET | none | Get OCI image config record for a manifest digest |
8686+| `/xrpc/io.atcr.hold.purgeManifest` | POST | owner/crew admin | Purge layer/scan/image-config records for a single manifest URI. Called by appview on UI delete; called internally on takedown receipt. Does not delete S3 blobs (GC handles those). |
8587| `/xrpc/io.atcr.hold.listTiers` | GET | none | List hold's available tiers with quotas and features (scanOnPush) |
8688| `/xrpc/io.atcr.hold.updateCrewTier` | POST | appview token | Update crew member's tier |
8789
+54
lexicons/io/atcr/hold/purgeManifest.json
···11+{
22+ "lexicon": 1,
33+ "id": "io.atcr.hold.purgeManifest",
44+ "defs": {
55+ "main": {
66+ "type": "procedure",
77+ "description": "Purge layer, scan, and image-config records associated with a manifest. Used by the appview when a user deletes a manifest, and by the hold's own labeler subscriber on takedown receipt. Idempotent: missing records are not errors. Does not delete S3 blobs (GC handles that based on remaining references).",
88+ "input": {
99+ "encoding": "application/json",
1010+ "schema": {
1111+ "type": "object",
1212+ "required": ["manifestUri"],
1313+ "properties": {
1414+ "manifestUri": {
1515+ "type": "string",
1616+ "format": "at-uri",
1717+ "description": "AT-URI of the manifest record, e.g. at://did:plc:xyz/io.atcr.manifest/<digest>"
1818+ }
1919+ }
2020+ }
2121+ },
2222+ "output": {
2323+ "encoding": "application/json",
2424+ "schema": {
2525+ "type": "object",
2626+ "required": ["success", "layersDeleted", "scanDeleted", "imageConfigDeleted"],
2727+ "properties": {
2828+ "success": {
2929+ "type": "boolean",
3030+ "description": "Whether the purge completed successfully"
3131+ },
3232+ "layersDeleted": {
3333+ "type": "integer",
3434+ "description": "Number of layer records deleted"
3535+ },
3636+ "scanDeleted": {
3737+ "type": "boolean",
3838+ "description": "Whether a scan record was deleted"
3939+ },
4040+ "imageConfigDeleted": {
4141+ "type": "boolean",
4242+ "description": "Whether an image config record was deleted"
4343+ }
4444+ }
4545+ }
4646+ },
4747+ "errors": [
4848+ { "name": "AuthRequired" },
4949+ { "name": "InvalidRequest" },
5050+ { "name": "PurgeFailed" }
5151+ ]
5252+ }
5353+ }
5454+}
+12-5
pkg/appview/db/labels.go
···2222}
23232424// Label represents an ATProto label mirrored from a labeler service.
2525+// Exp is the optional expiration timestamp from the ATProto label spec;
2626+// nil means the label does not expire.
2527type Label struct {
2628 ID int64
2729 Src string
···2931 Val string
3032 Neg bool
3133 Cts time.Time
3434+ Exp *time.Time
3235 SubjectDID string
3336 SubjectRepo string
3437 Seq int64
···4952 WHERE l2.src = l1.src AND l2.uri = l1.uri AND l2.val = l1.val
5053 AND l2.neg = 1 AND l2.id > l1.id
5154 )
5252- AND (l1.exp IS NULL OR l1.exp > CURRENT_TIMESTAMP)
5555+ AND (l1.exp IS NULL OR datetime(l1.exp) > CURRENT_TIMESTAMP)
5356 )`,
5457 did, repository,
5558 ).Scan(&exists)
···58615962// UpsertLabel inserts or updates a label from a labeler subscription.
6063func UpsertLabel(db DBTX, l *Label) error {
6464+ var exp any
6565+ if l.Exp != nil {
6666+ exp = l.Exp.UTC().Format(time.RFC3339)
6767+ }
6168 _, err := db.Exec(
6262- `INSERT INTO labels (src, uri, val, neg, cts, subject_did, subject_repo, seq)
6363- VALUES (?, ?, ?, ?, ?, ?, ?, ?)
6464- ON CONFLICT(src, uri, val, neg) DO UPDATE SET cts = excluded.cts, seq = excluded.seq`,
6565- l.Src, l.URI, l.Val, l.Neg, l.Cts.UTC().Format(time.RFC3339),
6969+ `INSERT INTO labels (src, uri, val, neg, cts, exp, subject_did, subject_repo, seq)
7070+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
7171+ ON CONFLICT(src, uri, val, neg) DO UPDATE SET cts = excluded.cts, exp = excluded.exp, seq = excluded.seq`,
7272+ l.Src, l.URI, l.Val, l.Neg, l.Cts.UTC().Format(time.RFC3339), exp,
6673 l.SubjectDID, l.SubjectRepo, l.Seq,
6774 )
6875 return err
+138
pkg/appview/db/labels_test.go
···11+package db
22+33+import (
44+ "testing"
55+ "time"
66+)
77+88+// TestIsTakenDown_ExpRespected verifies that the takedown check honors the
99+// optional ATProto label expiration: NULL and future-dated exp values count
1010+// the label as active, while past-dated exp values exclude it.
1111+func TestIsTakenDown_ExpRespected(t *testing.T) {
1212+ db, err := InitDB("file:TestIsTakenDown_ExpRespected?mode=memory&cache=shared", LibsqlConfig{})
1313+ if err != nil {
1414+ t.Fatalf("init db: %v", err)
1515+ }
1616+ defer db.Close()
1717+1818+ now := time.Now().UTC()
1919+ past := now.Add(-1 * time.Hour)
2020+ future := now.Add(1 * time.Hour)
2121+2222+ cases := []struct {
2323+ name string
2424+ did string
2525+ exp *time.Time
2626+ wantHit bool
2727+ }{
2828+ {"no_exp_active", "did:plc:noexp", nil, true},
2929+ {"future_exp_active", "did:plc:future", &future, true},
3030+ {"past_exp_inactive", "did:plc:past", &past, false},
3131+ }
3232+3333+ for _, tc := range cases {
3434+ label := &Label{
3535+ Src: "did:plc:labeler",
3636+ URI: "at://" + tc.did + "/io.atcr.repo.page/foo",
3737+ Val: "!takedown",
3838+ Neg: false,
3939+ Cts: now,
4040+ Exp: tc.exp,
4141+ SubjectDID: tc.did,
4242+ Seq: 1,
4343+ }
4444+ if err := UpsertLabel(db, label); err != nil {
4545+ t.Fatalf("%s: upsert label: %v", tc.name, err)
4646+ }
4747+4848+ got, err := IsTakenDown(db, tc.did, "foo")
4949+ if err != nil {
5050+ t.Fatalf("%s: IsTakenDown: %v", tc.name, err)
5151+ }
5252+ if got != tc.wantHit {
5353+ t.Errorf("%s: IsTakenDown = %v, want %v", tc.name, got, tc.wantHit)
5454+ }
5555+ }
5656+}
5757+5858+// TestIsTakenDown_NegationWinsOverExp verifies that a later negation row
5959+// suppresses an earlier non-expired takedown — exp doesn't shield it from
6060+// being reversed by a !takedown neg=1 with a higher id.
6161+func TestIsTakenDown_NegationWinsOverExp(t *testing.T) {
6262+ db, err := InitDB("file:TestIsTakenDown_NegationWinsOverExp?mode=memory&cache=shared", LibsqlConfig{})
6363+ if err != nil {
6464+ t.Fatalf("init db: %v", err)
6565+ }
6666+ defer db.Close()
6767+6868+ did := "did:plc:reversed"
6969+ uri := "at://" + did + "/io.atcr.repo.page/bar"
7070+ src := "did:plc:labeler"
7171+ now := time.Now().UTC()
7272+ future := now.Add(24 * time.Hour)
7373+7474+ if err := UpsertLabel(db, &Label{
7575+ Src: src, URI: uri, Val: "!takedown", Neg: false,
7676+ Cts: now, Exp: &future, SubjectDID: did, Seq: 1,
7777+ }); err != nil {
7878+ t.Fatalf("seed takedown: %v", err)
7979+ }
8080+ if err := UpsertLabel(db, &Label{
8181+ Src: src, URI: uri, Val: "!takedown", Neg: true,
8282+ Cts: now.Add(time.Minute), SubjectDID: did, Seq: 2,
8383+ }); err != nil {
8484+ t.Fatalf("seed reversal: %v", err)
8585+ }
8686+8787+ got, err := IsTakenDown(db, did, "bar")
8888+ if err != nil {
8989+ t.Fatalf("IsTakenDown: %v", err)
9090+ }
9191+ if got {
9292+ t.Errorf("IsTakenDown = true, want false (reversal should suppress non-expired takedown)")
9393+ }
9494+}
9595+9696+// TestUpsertLabel_ExpUpdatedOnConflict verifies that upserting an existing
9797+// label row updates the exp column (not just cts/seq) — so a labeler that
9898+// extends or removes an expiration is reflected.
9999+func TestUpsertLabel_ExpUpdatedOnConflict(t *testing.T) {
100100+ db, err := InitDB("file:TestUpsertLabel_ExpUpdatedOnConflict?mode=memory&cache=shared", LibsqlConfig{})
101101+ if err != nil {
102102+ t.Fatalf("init db: %v", err)
103103+ }
104104+ defer db.Close()
105105+106106+ did := "did:plc:updated"
107107+ src := "did:plc:labeler"
108108+ uri := "at://" + did + "/io.atcr.repo.page/baz"
109109+ now := time.Now().UTC()
110110+ past := now.Add(-1 * time.Hour)
111111+ future := now.Add(1 * time.Hour)
112112+113113+ // Seed with an already-expired label — IsTakenDown should be false.
114114+ if err := UpsertLabel(db, &Label{
115115+ Src: src, URI: uri, Val: "!takedown", Neg: false,
116116+ Cts: now, Exp: &past, SubjectDID: did, Seq: 1,
117117+ }); err != nil {
118118+ t.Fatalf("seed: %v", err)
119119+ }
120120+ if got, _ := IsTakenDown(db, did, "baz"); got {
121121+ t.Fatalf("expected expired label to be inactive before update")
122122+ }
123123+124124+ // Re-upsert with a future exp — same UNIQUE key, should update exp.
125125+ if err := UpsertLabel(db, &Label{
126126+ Src: src, URI: uri, Val: "!takedown", Neg: false,
127127+ Cts: now.Add(time.Minute), Exp: &future, SubjectDID: did, Seq: 2,
128128+ }); err != nil {
129129+ t.Fatalf("re-upsert: %v", err)
130130+ }
131131+ got, err := IsTakenDown(db, did, "baz")
132132+ if err != nil {
133133+ t.Fatalf("IsTakenDown after update: %v", err)
134134+ }
135135+ if !got {
136136+ t.Errorf("IsTakenDown = false, want true after extending exp into the future")
137137+ }
138138+}
···11+description: Add optional exp (expiration) column to labels for ATProto label spec
22+query: |
33+ ALTER TABLE labels ADD COLUMN exp TIMESTAMP;
+1-1
pkg/appview/db/queries.go
···3737 WHERE l2.src = l1.src AND l2.uri = l1.uri AND l2.val = l1.val
3838 AND l2.neg = 1 AND l2.id > l1.id
3939 )
4040- AND (l1.exp IS NULL OR l1.exp > CURRENT_TIMESTAMP)
4040+ AND (l1.exp IS NULL OR datetime(l1.exp) > CURRENT_TIMESTAMP)
4141 )`
4242}
4343
+1
pkg/appview/db/schema.sql
···306306 val TEXT NOT NULL,
307307 neg BOOLEAN NOT NULL DEFAULT 0,
308308 cts TIMESTAMP NOT NULL,
309309+ exp TIMESTAMP,
309310 subject_did TEXT NOT NULL,
310311 subject_repo TEXT NOT NULL DEFAULT '',
311312 seq INTEGER NOT NULL DEFAULT 0,
+118
pkg/appview/handlers/hold_purge.go
···11+package handlers
22+33+import (
44+ "bytes"
55+ "context"
66+ "encoding/json"
77+ "io"
88+ "log/slog"
99+ "net/http"
1010+ "time"
1111+1212+ "atcr.io/pkg/atproto"
1313+ "atcr.io/pkg/auth"
1414+ "atcr.io/pkg/auth/oauth"
1515+)
1616+1717+// purgeManifestRequest is the JSON body sent to io.atcr.hold.purgeManifest.
1818+type purgeManifestRequest struct {
1919+ ManifestURI string `json:"manifestUri"`
2020+}
2121+2222+// purgeOnHold tells the hold to delete the layer, scan, and image-config
2323+// records associated with a single manifest. This is best-effort: callers
2424+// should treat all errors as "log and continue" because lazy GC on the hold
2525+// will catch up either way (and on third-party holds the user may not even
2626+// have the captain/crew-admin permission needed for the call to succeed).
2727+//
2828+// holdDID identifies which hold owns the manifest's blobs (typically the
2929+// `hold_endpoint` column on the manifests row, or a freshly-resolved value
3030+// from the manifest record). userDID + pdsEndpoint are the OAuth-acting
3131+// user — the service token is minted from their PDS with audience = holdDID.
3232+func purgeOnHold(ctx context.Context, refresher *oauth.Refresher, userDID, pdsEndpoint, holdDID, manifestURI string) {
3333+ if holdDID == "" || manifestURI == "" {
3434+ return
3535+ }
3636+ if refresher == nil {
3737+ slog.Debug("purgeOnHold: OAuth refresher unavailable; skipping",
3838+ "hold_did", holdDID, "manifest", manifestURI)
3939+ return
4040+ }
4141+4242+ timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
4343+ defer cancel()
4444+4545+ holdURL, err := atproto.ResolveHoldURL(timeoutCtx, holdDID)
4646+ if err != nil {
4747+ slog.Warn("purgeOnHold: failed to resolve hold URL",
4848+ "hold_did", holdDID, "error", err)
4949+ return
5050+ }
5151+5252+ serviceToken, err := auth.GetOrFetchServiceToken(timeoutCtx, refresher, userDID, holdDID, pdsEndpoint)
5353+ if err != nil {
5454+ slog.Warn("purgeOnHold: failed to mint service token",
5555+ "hold_did", holdDID, "user_did", userDID, "error", err)
5656+ return
5757+ }
5858+5959+ body, err := json.Marshal(purgeManifestRequest{ManifestURI: manifestURI})
6060+ if err != nil {
6161+ slog.Warn("purgeOnHold: failed to marshal request",
6262+ "hold_did", holdDID, "error", err)
6363+ return
6464+ }
6565+6666+ req, err := http.NewRequestWithContext(timeoutCtx, http.MethodPost,
6767+ holdURL+atproto.HoldPurgeManifest, bytes.NewReader(body))
6868+ if err != nil {
6969+ slog.Warn("purgeOnHold: failed to create request",
7070+ "hold_did", holdDID, "error", err)
7171+ return
7272+ }
7373+ req.Header.Set("Authorization", "Bearer "+serviceToken)
7474+ req.Header.Set("Content-Type", "application/json")
7575+7676+ resp, err := http.DefaultClient.Do(req)
7777+ if err != nil {
7878+ slog.Warn("purgeOnHold: request failed",
7979+ "hold_did", holdDID, "manifest", manifestURI, "error", err)
8080+ return
8181+ }
8282+ defer resp.Body.Close()
8383+8484+ if resp.StatusCode == http.StatusForbidden || resp.StatusCode == http.StatusUnauthorized {
8585+ // Sailor pushing to a third-party hold won't have captain/crew-admin
8686+ // rights; that's expected. Lazy GC on that hold will reclaim later.
8787+ slog.Debug("purgeOnHold: not authorized on hold (lazy GC will handle)",
8888+ "hold_did", holdDID, "manifest", manifestURI, "status", resp.StatusCode)
8989+ return
9090+ }
9191+ if resp.StatusCode != http.StatusOK {
9292+ body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
9393+ slog.Warn("purgeOnHold: hold returned non-OK status",
9494+ "hold_did", holdDID, "manifest", manifestURI,
9595+ "status", resp.StatusCode, "body", string(body))
9696+ return
9797+ }
9898+9999+ var out struct {
100100+ Success bool `json:"success"`
101101+ LayersDeleted int `json:"layersDeleted"`
102102+ ScanDeleted bool `json:"scanDeleted"`
103103+ ImageConfigDeleted bool `json:"imageConfigDeleted"`
104104+ }
105105+ if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
106106+ slog.Warn("purgeOnHold: failed to parse response",
107107+ "hold_did", holdDID, "manifest", manifestURI, "error", err)
108108+ return
109109+ }
110110+111111+ slog.Info("purgeOnHold: purge succeeded",
112112+ "hold_did", holdDID,
113113+ "manifest", manifestURI,
114114+ "layers_deleted", out.LayersDeleted,
115115+ "scan_deleted", out.ScanDeleted,
116116+ "image_config_deleted", out.ImageConfigDeleted,
117117+ )
118118+}
+25
pkg/appview/handlers/images.go
···156156 }
157157 }
158158159159+ // Read the appview's cached manifest row before deleting it so we know
160160+ // which hold owned the blobs. Best-effort — if not cached, the manifest
161161+ // record from the PDS would also have it but we don't pre-fetch the PDS
162162+ // record just for this. Without a hold DID we just skip the eager purge
163163+ // and fall back to lazy GC on the hold.
164164+ var holdDID string
165165+ if cached, err := db.GetManifestDetail(h.ReadOnlyDB, user.DID, repo, digest); err == nil && cached != nil {
166166+ holdDID = cached.HoldEndpoint
167167+ }
168168+159169 // Compute rkey for manifest record (digest without "sha256:" prefix)
160170 rkey := strings.TrimPrefix(digest, "sha256:")
161171···176186 return
177187 }
178188189189+ // Tell the hold to drop its layer/scan/image-config records for this
190190+ // manifest. Best-effort — failures here only mean the hold's lazy GC
191191+ // will clean up later, so we don't reflect the failure to the user.
192192+ manifestURI := atproto.BuildManifestURI(user.DID, digest)
193193+ purgeOnHold(r.Context(), h.Refresher, user.DID, user.PDSEndpoint, holdDID, manifestURI)
194194+179195 w.WriteHeader(http.StatusOK)
180196}
181197···233249 for _, digest := range digests {
234250 rkey := strings.TrimPrefix(digest, "sha256:")
235251252252+ // Snapshot hold ownership before the delete so we can purge after.
253253+ var holdDID string
254254+ if cached, err := db.GetManifestDetail(h.ReadOnlyDB, user.DID, req.Repo, digest); err == nil && cached != nil {
255255+ holdDID = cached.HoldEndpoint
256256+ }
257257+236258 if err := pdsClient.DeleteRecord(r.Context(), atproto.ManifestCollection, rkey); err != nil {
237259 if handleOAuthError(r.Context(), h.Refresher, user.DID, err) {
238260 render.Status(r, http.StatusUnauthorized)
···252274 failures = append(failures, failure{Digest: digest, Error: fmt.Sprintf("cache: %v", err)})
253275 continue
254276 }
277277+278278+ manifestURI := atproto.BuildManifestURI(user.DID, digest)
279279+ purgeOnHold(r.Context(), h.Refresher, user.DID, user.PDSEndpoint, holdDID, manifestURI)
255280256281 deleted++
257282 }
+10
pkg/appview/labeler/subscriber.go
···129129 cts, _ := time.Parse(time.RFC3339, le.Cts)
130130 did, repo := extractSubjectFromURI(le.Uri)
131131132132+ // Exp is optional in the ATProto label spec — treat unparseable
133133+ // values as "no expiration" rather than dropping the label.
134134+ var exp *time.Time
135135+ if le.Exp != nil {
136136+ if t, err := time.Parse(time.RFC3339, *le.Exp); err == nil {
137137+ exp = &t
138138+ }
139139+ }
140140+132141 label := &db.Label{
133142 Src: le.Src,
134143 URI: le.Uri,
135144 Val: le.Val,
136145 Neg: le.Neg != nil && *le.Neg,
137146 Cts: cts,
147147+ Exp: exp,
138148 SubjectDID: did,
139149 SubjectRepo: repo,
140150 Seq: seq,
+8
pkg/atproto/endpoints.go
···7373 // Method: DELETE
7474 // Response: {"success": true, "crew_deleted": bool, "layers_deleted": int, "stats_deleted": int}
7575 HoldDeleteUserData = "/xrpc/io.atcr.hold.deleteUserData"
7676+7777+ // HoldPurgeManifest purges layer, scan, and image-config records for a single
7878+ // manifest. Called by the appview on UI manifest delete and by the hold's own
7979+ // labeler subscriber on takedown. Idempotent.
8080+ // Method: POST
8181+ // Request: {"manifestUri": "at://did:.../io.atcr.manifest/<digest>"}
8282+ // Response: {"success": true, "layersDeleted": N, "scanDeleted": bool, "imageConfigDeleted": bool}
8383+ HoldPurgeManifest = "/xrpc/io.atcr.hold.purgeManifest"
7684)
77857886// Hold service crew management endpoints (io.atcr.hold.*)
+28
pkg/hold/config.go
···5252 GC gc.Config `yaml:"gc" comment:"Garbage collection settings."`
5353 Quota quota.Config `yaml:"quota" comment:"Storage quota tiers. Empty disables quota enforcement."`
5454 Scanner ScannerConfig `yaml:"scanner" comment:"Vulnerability scanner settings. Empty disables scanning."`
5555+ Labeler LabelerConfig `yaml:"labeler" comment:"Labeler subscription settings. When configured, the hold consumes takedown labels from the named labeler and purges affected records on receipt; GC consults the cache to gate blob cleanup. Empty subscribe_url disables."`
5556 configPath string `yaml:"-"` // internal: path to YAML file for subsystem config loading
5657}
5758···180181 return URLFromDIDWeb(s.AppviewDID)
181182}
182183184184+// LabelerConfig defines labeler subscription settings.
185185+//
186186+// When DID is set, the hold opens a websocket to the labeler's
187187+// com.atproto.label.subscribeLabels endpoint and only honors labels whose
188188+// Src matches the same DID. Active takedowns are cached locally with their
189189+// Cts timestamp; on receipt of a !takedown label the hold immediately purges
190190+// layer/scan/image-config records for the labeled manifest (or all manifests
191191+// by the labeled DID for user-level takedowns). Negations drop the cache
192192+// entry. The GC consults the cache when computing referenced sets so blobs
193193+// survive a configurable grace window before being collected, preserving
194194+// reversibility.
195195+type LabelerConfig struct {
196196+ // DID or URL of the labeler service. Accepts did:web:... (resolved to
197197+ // the corresponding HTTPS host) or a raw http/https URL. Empty disables
198198+ // labeler integration.
199199+ DID string `yaml:"did" comment:"DID or URL of the ATProto labeler (e.g., did:web:labeler.atcr.io). Empty disables labeler integration."`
200200+201201+ // Grace window for reversibility. Until a takedown is older than this,
202202+ // the GC keeps blobs referenced even though their layer records were
203203+ // purged. After this window blobs become eligible for collection.
204204+ GraceWindow time.Duration `yaml:"grace_window" comment:"Reversibility window for takedowns. Blobs survive this long after a takedown so the action can be reversed. After this window the GC reclaims them. Default: 720h (30 days)."`
205205+}
206206+183207// ScannerConfig defines vulnerability scanner settings
184208type ScannerConfig struct {
185209 // Shared secret for scanner WebSocket authentication. Empty disables scanning.
···270294 // Scanner defaults
271295 v.SetDefault("scanner.secret", "")
272296 v.SetDefault("scanner.rescan_interval", "168h") // 7 days
297297+298298+ // Labeler defaults
299299+ v.SetDefault("labeler.did", "")
300300+ v.SetDefault("labeler.grace_window", "720h") // 30 days
273301274302 // Log shipper defaults
275303 v.SetDefault("log_shipper.batch_size", 100)
+85-3
pkg/hold/gc/gc.go
···8383 Duration time.Duration `json:"duration"`
8484}
85858686+// TakedownGate is the interface the GC uses to consult the labeler cache when
8787+// computing reachability. Defined here (rather than imported) to keep the GC
8888+// package free of any direct dependency on the labeler package.
8989+//
9090+// IsTakenDown returns the takedown's creation timestamp and a boolean
9191+// indicating whether the manifest URI is currently under takedown (either by
9292+// per-manifest label or via a user-level label on its DID).
9393+type TakedownGate interface {
9494+ IsTakenDown(manifestURI string) (cts time.Time, ok bool)
9595+}
9696+9797+// Option configures optional GC behavior.
9898+type Option func(*GarbageCollector)
9999+100100+// WithTakedownCache wires a takedown gate (typically the hold's labeler cache)
101101+// and a grace window. When set, analyzeRecords protects blobs of taken-down
102102+// manifests from collection until grace expires, and skips reconciliation of
103103+// their layer records so the labeler-driven purge isn't undone.
104104+//
105105+// Passing a nil gate or a non-positive window leaves the GC behaving as before.
106106+func WithTakedownCache(gate TakedownGate, graceWindow time.Duration) Option {
107107+ return func(gc *GarbageCollector) {
108108+ gc.takedownGate = gate
109109+ gc.takedownGrace = graceWindow
110110+ }
111111+}
112112+86113// GarbageCollector handles cleanup of orphaned blobs from storage
87114type GarbageCollector struct {
88115 pds *pds.HoldPDS
89116 s3 *s3.S3Service
90117 cfg Config
91118 logger *slog.Logger
119119+120120+ // takedownGate, if non-nil, is consulted in analyzeRecords to gate blob
121121+ // reachability and skip reconcile for taken-down manifests.
122122+ takedownGate TakedownGate
123123+ takedownGrace time.Duration
9212493125 // stopCh signals the background goroutine to stop
94126 stopCh chan struct{}
···151183 totalRecords int
152184}
153185154154-// NewGarbageCollector creates a new GC instance
155155-func NewGarbageCollector(holdPDS *pds.HoldPDS, s3svc *s3.S3Service, cfg Config) *GarbageCollector {
156156- return &GarbageCollector{
186186+// NewGarbageCollector creates a new GC instance. Optional behavior (such as
187187+// the labeler-aware takedown gate) is configured via Option arguments.
188188+func NewGarbageCollector(holdPDS *pds.HoldPDS, s3svc *s3.S3Service, cfg Config, opts ...Option) *GarbageCollector {
189189+ gc := &GarbageCollector{
157190 pds: holdPDS,
158191 s3: s3svc,
159192 cfg: cfg,
···161194 stopCh: make(chan struct{}),
162195 predecessorCache: make(map[string]bool),
163196 }
197197+ for _, opt := range opts {
198198+ opt(gc)
199199+ }
200200+ return gc
201201+}
202202+203203+// isManifestTakenDown reports whether the labeler cache (if any) currently
204204+// holds a takedown for this manifest URI, returning the takedown's cts so the
205205+// caller can decide whether the grace window has elapsed.
206206+func (gc *GarbageCollector) isManifestTakenDown(manifestURI string) (time.Time, bool) {
207207+ if gc.takedownGate == nil {
208208+ return time.Time{}, false
209209+ }
210210+ return gc.takedownGate.IsTakenDown(manifestURI)
211211+}
212212+213213+// takedownExpired reports whether a takedown's cts is older than the
214214+// configured grace window. With a non-positive window every takedown is
215215+// considered expired immediately (i.e. blobs are never protected).
216216+func (gc *GarbageCollector) takedownExpired(cts time.Time) bool {
217217+ if gc.takedownGrace <= 0 {
218218+ return true
219219+ }
220220+ return time.Since(cts) > gc.takedownGrace
164221}
165222166223// tryStart attempts to mark GC as running. Returns false if already running.
···636693 fetchedUsers[did] = true
637694 for _, m := range manifests {
638695 result.manifestsChecked++
696696+697697+ // Labeler-aware reachability:
698698+ // - in-grace takedown: blobs stay referenced (so a reversal can
699699+ // restore content), but the manifest is NOT added to
700700+ // knownManifests so reconcileMissingRecords won't recreate the
701701+ // layer records the labeler subscriber just purged.
702702+ // - past-grace takedown: skip entirely — digests fall out of
703703+ // the referenced set and become eligible for blob GC.
704704+ if cts, taken := gc.isManifestTakenDown(m.URI); taken {
705705+ if !gc.takedownExpired(cts) {
706706+ for _, layer := range m.Record.Layers {
707707+ result.referenced[layer.Digest] = true
708708+ }
709709+ if m.Record.Config != nil && m.Record.Config.Digest != "" {
710710+ result.referenced[m.Record.Config.Digest] = true
711711+ }
712712+ gc.logger.Debug("Manifest under in-grace takedown: blobs protected, reconcile skipped",
713713+ "manifest", m.URI, "cts", cts)
714714+ } else {
715715+ gc.logger.Debug("Manifest takedown past grace: orphaning blobs",
716716+ "manifest", m.URI, "cts", cts)
717717+ }
718718+ continue
719719+ }
720720+639721 knownManifests[m.URI] = m
640722641723 // Add all layer digests to referenced set
+68
pkg/hold/gc/takedown_gate_test.go
···11+package gc
22+33+import (
44+ "testing"
55+ "time"
66+)
77+88+// stubGate satisfies TakedownGate for tests so we can exercise the GC's
99+// reachability decisions without standing up a full labeler cache or PDS.
1010+type stubGate struct {
1111+ uri string
1212+ cts time.Time
1313+}
1414+1515+func (s stubGate) IsTakenDown(uri string) (time.Time, bool) {
1616+ if uri == s.uri {
1717+ return s.cts, true
1818+ }
1919+ return time.Time{}, false
2020+}
2121+2222+func TestIsManifestTakenDown(t *testing.T) {
2323+ t.Run("nil gate", func(t *testing.T) {
2424+ gc := &GarbageCollector{}
2525+ if _, ok := gc.isManifestTakenDown("at://x"); ok {
2626+ t.Fatalf("nil gate should report no takedowns")
2727+ }
2828+ })
2929+ t.Run("matching uri", func(t *testing.T) {
3030+ cts := time.Now()
3131+ gc := &GarbageCollector{takedownGate: stubGate{uri: "at://x", cts: cts}}
3232+ got, ok := gc.isManifestTakenDown("at://x")
3333+ if !ok {
3434+ t.Fatalf("gate should report takedown for matching URI")
3535+ }
3636+ if !got.Equal(cts) {
3737+ t.Fatalf("cts = %v, want %v", got, cts)
3838+ }
3939+ })
4040+ t.Run("non-matching uri", func(t *testing.T) {
4141+ gc := &GarbageCollector{takedownGate: stubGate{uri: "at://x", cts: time.Now()}}
4242+ if _, ok := gc.isManifestTakenDown("at://y"); ok {
4343+ t.Fatalf("non-matching URI should not report takedown")
4444+ }
4545+ })
4646+}
4747+4848+func TestTakedownExpired(t *testing.T) {
4949+ tests := []struct {
5050+ name string
5151+ cts time.Time
5252+ grace time.Duration
5353+ expired bool
5454+ }{
5555+ {"in-window", time.Now().Add(-time.Hour), 24 * time.Hour, false},
5656+ {"past-window", time.Now().Add(-48 * time.Hour), 24 * time.Hour, true},
5757+ {"zero grace expires immediately", time.Now(), 0, true},
5858+ {"negative grace expires immediately", time.Now(), -time.Hour, true},
5959+ }
6060+ for _, tt := range tests {
6161+ t.Run(tt.name, func(t *testing.T) {
6262+ gc := &GarbageCollector{takedownGrace: tt.grace}
6363+ if got := gc.takedownExpired(tt.cts); got != tt.expired {
6464+ t.Fatalf("takedownExpired = %v, want %v", got, tt.expired)
6565+ }
6666+ })
6767+ }
6868+}
+177
pkg/hold/labeler/cache.go
···11+// Package labeler provides a labeler subscription client for the hold service.
22+//
33+// The hold subscribes to one labeler and mirrors active takedowns into a local
44+// cache (in-memory + SQLite). On takedown receipt the hold purges layer, scan,
55+// and image-config records for the affected manifest. The cache is consulted
66+// by the GC to gate blob deletion: while a takedown is within its grace window,
77+// the manifest's blob digests stay in the GC's referenced set so reversal can
88+// still restore them.
99+package labeler
1010+1111+import (
1212+ "database/sql"
1313+ "fmt"
1414+ "strings"
1515+ "sync"
1616+ "time"
1717+)
1818+1919+// Cache holds active takedown URIs and their creation timestamps. It is
2020+// thread-safe and persists state to SQLite so the hold can answer takedown
2121+// queries before the labeler subscription has caught up after a restart.
2222+type Cache struct {
2323+ mu sync.RWMutex
2424+ // manifest URI → cts. Includes both per-record URIs (at://did/coll/rkey)
2525+ // and per-DID URIs (at://did) for user-level takedowns.
2626+ entries map[string]time.Time
2727+2828+ db *sql.DB
2929+}
3030+3131+// NewCache opens (or creates) the takedown_cache and labeler_cursor tables on
3232+// the given DB and loads any existing entries into memory.
3333+func NewCache(db *sql.DB) (*Cache, error) {
3434+ c := &Cache{
3535+ entries: make(map[string]time.Time),
3636+ db: db,
3737+ }
3838+3939+ stmts := []string{
4040+ `CREATE TABLE IF NOT EXISTS takedown_cache (
4141+ uri TEXT PRIMARY KEY,
4242+ src TEXT NOT NULL,
4343+ cts TIMESTAMP NOT NULL
4444+ )`,
4545+ `CREATE INDEX IF NOT EXISTS idx_takedown_cache_cts ON takedown_cache(cts)`,
4646+ `CREATE TABLE IF NOT EXISTS labeler_cursor (
4747+ labeler_did TEXT PRIMARY KEY,
4848+ cursor INTEGER NOT NULL
4949+ )`,
5050+ }
5151+ for _, s := range stmts {
5252+ if _, err := db.Exec(s); err != nil {
5353+ return nil, fmt.Errorf("init labeler schema: %w", err)
5454+ }
5555+ }
5656+5757+ rows, err := db.Query(`SELECT uri, cts FROM takedown_cache`)
5858+ if err != nil {
5959+ return nil, fmt.Errorf("load takedown cache: %w", err)
6060+ }
6161+ defer rows.Close()
6262+ for rows.Next() {
6363+ var uri string
6464+ var cts time.Time
6565+ if err := rows.Scan(&uri, &cts); err != nil {
6666+ return nil, fmt.Errorf("scan takedown_cache row: %w", err)
6767+ }
6868+ c.entries[uri] = cts
6969+ }
7070+ return c, nil
7171+}
7272+7373+// Set records a positive takedown for uri at cts. Idempotent: re-applying
7474+// updates the timestamp (newer takedowns win).
7575+func (c *Cache) Set(uri, src string, cts time.Time) error {
7676+ c.mu.Lock()
7777+ c.entries[uri] = cts
7878+ c.mu.Unlock()
7979+8080+ _, err := c.db.Exec(
8181+ `INSERT INTO takedown_cache (uri, src, cts) VALUES (?, ?, ?)
8282+ ON CONFLICT(uri) DO UPDATE SET src = excluded.src, cts = excluded.cts`,
8383+ uri, src, cts,
8484+ )
8585+ if err != nil {
8686+ return fmt.Errorf("persist takedown: %w", err)
8787+ }
8888+ return nil
8989+}
9090+9191+// Negate removes a takedown entry. Idempotent.
9292+func (c *Cache) Negate(uri string) error {
9393+ c.mu.Lock()
9494+ delete(c.entries, uri)
9595+ c.mu.Unlock()
9696+9797+ _, err := c.db.Exec(`DELETE FROM takedown_cache WHERE uri = ?`, uri)
9898+ if err != nil {
9999+ return fmt.Errorf("delete takedown: %w", err)
100100+ }
101101+ return nil
102102+}
103103+104104+// IsTakenDown reports whether a manifest URI is taken down, either directly
105105+// (per-manifest takedown) or via a user-level takedown on its DID. The returned
106106+// timestamp is the earliest cts that applies (i.e. the longest-standing
107107+// takedown), which is what the grace check should compare against.
108108+func (c *Cache) IsTakenDown(manifestURI string) (cts time.Time, ok bool) {
109109+ c.mu.RLock()
110110+ defer c.mu.RUnlock()
111111+112112+ if t, has := c.entries[manifestURI]; has {
113113+ cts = t
114114+ ok = true
115115+ }
116116+117117+ // User-level: at://<did> with no path, applies to every record by that DID.
118118+ did := didFromManifestURI(manifestURI)
119119+ if did != "" {
120120+ userURI := "at://" + did
121121+ if t, has := c.entries[userURI]; has {
122122+ if !ok || t.Before(cts) {
123123+ cts = t
124124+ ok = true
125125+ }
126126+ }
127127+ }
128128+ return cts, ok
129129+}
130130+131131+// IsExpired returns true if cts is older than the grace window.
132132+func IsExpired(cts time.Time, graceWindow time.Duration) bool {
133133+ if graceWindow <= 0 {
134134+ return true
135135+ }
136136+ return time.Since(cts) > graceWindow
137137+}
138138+139139+// GetCursor returns the last persisted cursor for a labeler DID (0 if none).
140140+func (c *Cache) GetCursor(labelerDID string) (int64, error) {
141141+ var cursor int64
142142+ err := c.db.QueryRow(`SELECT cursor FROM labeler_cursor WHERE labeler_did = ?`, labelerDID).Scan(&cursor)
143143+ if err == sql.ErrNoRows {
144144+ return 0, nil
145145+ }
146146+ if err != nil {
147147+ return 0, fmt.Errorf("read labeler cursor: %w", err)
148148+ }
149149+ return cursor, nil
150150+}
151151+152152+// SetCursor persists the cursor for a labeler DID.
153153+func (c *Cache) SetCursor(labelerDID string, cursor int64) error {
154154+ _, err := c.db.Exec(
155155+ `INSERT INTO labeler_cursor (labeler_did, cursor) VALUES (?, ?)
156156+ ON CONFLICT(labeler_did) DO UPDATE SET cursor = excluded.cursor`,
157157+ labelerDID, cursor,
158158+ )
159159+ if err != nil {
160160+ return fmt.Errorf("persist labeler cursor: %w", err)
161161+ }
162162+ return nil
163163+}
164164+165165+// didFromManifestURI extracts the authority (DID) from an at:// URI.
166166+// Returns "" for malformed input.
167167+func didFromManifestURI(uri string) string {
168168+ const prefix = "at://"
169169+ if !strings.HasPrefix(uri, prefix) {
170170+ return ""
171171+ }
172172+ rest := uri[len(prefix):]
173173+ if i := strings.IndexByte(rest, '/'); i >= 0 {
174174+ return rest[:i]
175175+ }
176176+ return rest
177177+}
+191
pkg/hold/labeler/cache_test.go
···11+package labeler
22+33+import (
44+ "database/sql"
55+ "testing"
66+ "time"
77+88+ _ "github.com/tursodatabase/go-libsql"
99+)
1010+1111+func newTestCache(t *testing.T) *Cache {
1212+ t.Helper()
1313+ db, err := sql.Open("libsql", ":memory:")
1414+ if err != nil {
1515+ t.Fatalf("open in-memory db: %v", err)
1616+ }
1717+ t.Cleanup(func() { _ = db.Close() })
1818+1919+ c, err := NewCache(db)
2020+ if err != nil {
2121+ t.Fatalf("NewCache: %v", err)
2222+ }
2323+ return c
2424+}
2525+2626+func TestCacheSetAndIsTakenDown(t *testing.T) {
2727+ c := newTestCache(t)
2828+ uri := "at://did:plc:alice/io.atcr.manifest/abc"
2929+ cts := time.Now().UTC().Add(-time.Hour)
3030+3131+ if _, ok := c.IsTakenDown(uri); ok {
3232+ t.Fatalf("expected not taken down before Set")
3333+ }
3434+ if err := c.Set(uri, "did:web:labeler", cts); err != nil {
3535+ t.Fatalf("Set: %v", err)
3636+ }
3737+3838+ got, ok := c.IsTakenDown(uri)
3939+ if !ok {
4040+ t.Fatalf("expected taken down after Set")
4141+ }
4242+ if !got.Equal(cts) {
4343+ t.Fatalf("cts = %v, want %v", got, cts)
4444+ }
4545+}
4646+4747+func TestCacheNegateRemovesEntry(t *testing.T) {
4848+ c := newTestCache(t)
4949+ uri := "at://did:plc:alice/io.atcr.manifest/abc"
5050+ if err := c.Set(uri, "did:web:labeler", time.Now()); err != nil {
5151+ t.Fatal(err)
5252+ }
5353+ if err := c.Negate(uri); err != nil {
5454+ t.Fatalf("Negate: %v", err)
5555+ }
5656+ if _, ok := c.IsTakenDown(uri); ok {
5757+ t.Fatalf("expected not taken down after Negate")
5858+ }
5959+}
6060+6161+func TestCacheUserLevelTakedownAppliesToAllManifests(t *testing.T) {
6262+ c := newTestCache(t)
6363+ userURI := "at://did:plc:alice"
6464+ cts := time.Now().UTC()
6565+6666+ if err := c.Set(userURI, "did:web:labeler", cts); err != nil {
6767+ t.Fatal(err)
6868+ }
6969+7070+ manifestURI := "at://did:plc:alice/io.atcr.manifest/anything"
7171+ got, ok := c.IsTakenDown(manifestURI)
7272+ if !ok {
7373+ t.Fatalf("user-level takedown should apply to manifest URI")
7474+ }
7575+ if !got.Equal(cts) {
7676+ t.Fatalf("cts = %v, want %v", got, cts)
7777+ }
7878+7979+ otherURI := "at://did:plc:bob/io.atcr.manifest/x"
8080+ if _, ok := c.IsTakenDown(otherURI); ok {
8181+ t.Fatalf("user-level takedown for alice must not affect bob")
8282+ }
8383+}
8484+8585+func TestCacheChoosesEarliestCtsAcrossSources(t *testing.T) {
8686+ c := newTestCache(t)
8787+ earlier := time.Now().UTC().Add(-2 * time.Hour)
8888+ later := time.Now().UTC()
8989+9090+ manifestURI := "at://did:plc:alice/io.atcr.manifest/x"
9191+ userURI := "at://did:plc:alice"
9292+9393+ // Per-manifest later, user-level earlier — IsTakenDown should report the earlier one.
9494+ if err := c.Set(manifestURI, "did:web:labeler", later); err != nil {
9595+ t.Fatal(err)
9696+ }
9797+ if err := c.Set(userURI, "did:web:labeler", earlier); err != nil {
9898+ t.Fatal(err)
9999+ }
100100+101101+ got, ok := c.IsTakenDown(manifestURI)
102102+ if !ok {
103103+ t.Fatalf("expected taken down")
104104+ }
105105+ if !got.Equal(earlier) {
106106+ t.Fatalf("cts = %v, want earliest (%v)", got, earlier)
107107+ }
108108+}
109109+110110+func TestCachePersistsAcrossInstances(t *testing.T) {
111111+ db, err := sql.Open("libsql", ":memory:")
112112+ if err != nil {
113113+ t.Fatalf("open: %v", err)
114114+ }
115115+ t.Cleanup(func() { _ = db.Close() })
116116+117117+ first, err := NewCache(db)
118118+ if err != nil {
119119+ t.Fatalf("first NewCache: %v", err)
120120+ }
121121+ uri := "at://did:plc:alice/io.atcr.manifest/abc"
122122+ cts := time.Now().UTC()
123123+ if err := first.Set(uri, "did:web:labeler", cts); err != nil {
124124+ t.Fatal(err)
125125+ }
126126+127127+ // New cache instance, same DB — should warm-load entry.
128128+ second, err := NewCache(db)
129129+ if err != nil {
130130+ t.Fatalf("second NewCache: %v", err)
131131+ }
132132+ if _, ok := second.IsTakenDown(uri); !ok {
133133+ t.Fatalf("second cache should see persisted takedown")
134134+ }
135135+}
136136+137137+func TestCacheCursorRoundTrip(t *testing.T) {
138138+ c := newTestCache(t)
139139+ const labeler = "did:web:labeler"
140140+141141+ got, err := c.GetCursor(labeler)
142142+ if err != nil {
143143+ t.Fatal(err)
144144+ }
145145+ if got != 0 {
146146+ t.Fatalf("initial cursor = %d, want 0", got)
147147+ }
148148+149149+ if err := c.SetCursor(labeler, 42); err != nil {
150150+ t.Fatal(err)
151151+ }
152152+ got, err = c.GetCursor(labeler)
153153+ if err != nil {
154154+ t.Fatal(err)
155155+ }
156156+ if got != 42 {
157157+ t.Fatalf("cursor = %d, want 42", got)
158158+ }
159159+160160+ if err := c.SetCursor(labeler, 100); err != nil {
161161+ t.Fatal(err)
162162+ }
163163+ got, err = c.GetCursor(labeler)
164164+ if err != nil {
165165+ t.Fatal(err)
166166+ }
167167+ if got != 100 {
168168+ t.Fatalf("cursor after upsert = %d, want 100", got)
169169+ }
170170+}
171171+172172+func TestIsExpired(t *testing.T) {
173173+ tests := []struct {
174174+ name string
175175+ cts time.Time
176176+ grace time.Duration
177177+ expired bool
178178+ }{
179179+ {"in-window", time.Now().Add(-time.Hour), 24 * time.Hour, false},
180180+ {"past-window", time.Now().Add(-48 * time.Hour), 24 * time.Hour, true},
181181+ {"zero grace expires immediately", time.Now(), 0, true},
182182+ {"negative grace expires immediately", time.Now(), -time.Hour, true},
183183+ }
184184+ for _, tt := range tests {
185185+ t.Run(tt.name, func(t *testing.T) {
186186+ if got := IsExpired(tt.cts, tt.grace); got != tt.expired {
187187+ t.Fatalf("IsExpired = %v, want %v", got, tt.expired)
188188+ }
189189+ })
190190+ }
191191+}
+374
pkg/hold/labeler/subscriber.go
···11+package labeler
22+33+import (
44+ "bytes"
55+ "context"
66+ "errors"
77+ "fmt"
88+ "log/slog"
99+ "net/url"
1010+ "strings"
1111+ "time"
1212+1313+ comatproto "github.com/bluesky-social/indigo/api/atproto"
1414+ "github.com/bluesky-social/indigo/events"
1515+ "github.com/gorilla/websocket"
1616+)
1717+1818+// TakedownLabelValue is the label value the hold treats as a takedown trigger.
1919+// Mirrors what pkg/labeler/takedown.go emits.
2020+const TakedownLabelValue = "!takedown"
2121+2222+// Purger is the subset of HoldPDS the subscriber needs to act on takedowns.
2323+// Defined as an interface so tests can substitute a stub without standing up
2424+// a full PDS, and to avoid an import cycle with pkg/hold/pds.
2525+type Purger interface {
2626+ PurgeManifestRecords(ctx context.Context, manifestURI string) (PurgeOutcome, error)
2727+ PurgeUserManifests(ctx context.Context, userDID string) (PurgeOutcome, error)
2828+}
2929+3030+// PurgeOutcome mirrors pds.PurgeResult without creating an import cycle.
3131+type PurgeOutcome struct {
3232+ LayersDeleted int
3333+ ScanDeleted bool
3434+ ImageConfigDeleted bool
3535+}
3636+3737+// Subscriber connects to a labeler's subscribeLabels endpoint, mirrors
3838+// takedowns into the local cache, and triggers record purges on the hold.
3939+type Subscriber struct {
4040+ labelerURL string
4141+ labelerDID string
4242+ cache *Cache
4343+ purger Purger
4444+ stopCh chan struct{}
4545+}
4646+4747+// NewSubscriber builds a subscriber for the given labeler. labelerDIDOrURL
4848+// may be either:
4949+//
5050+// - a did:web identifier (e.g. did:web:labeler.atcr.io) → resolved to https://labeler.atcr.io
5151+// - a raw http/https URL (e.g. http://172.28.0.4:5002 for dev)
5252+//
5353+// The websocket URL is derived from the resolved HTTPS endpoint; the
5454+// labeler's DID (used to filter the Src field on incoming labels) is derived
5555+// the same way the appview's labeler subscriber derives it, so a single
5656+// config field suffices.
5757+func NewSubscriber(labelerDIDOrURL string, cache *Cache, purger Purger) *Subscriber {
5858+ httpURL := parseLabelerURL(labelerDIDOrURL)
5959+ return &Subscriber{
6060+ labelerURL: httpURL,
6161+ labelerDID: deriveLabelerDID(labelerDIDOrURL, httpURL),
6262+ cache: cache,
6363+ purger: purger,
6464+ stopCh: make(chan struct{}),
6565+ }
6666+}
6767+6868+// LabelerDID returns the DID derived from the labeler URL. Useful for the
6969+// caller to log the trusted source.
7070+func (s *Subscriber) LabelerDID() string { return s.labelerDID }
7171+7272+// Start runs the subscription loop in a goroutine.
7373+func (s *Subscriber) Start() {
7474+ go s.run()
7575+}
7676+7777+// Stop signals the subscriber to shut down. Safe to call once.
7878+func (s *Subscriber) Stop() {
7979+ close(s.stopCh)
8080+}
8181+8282+func (s *Subscriber) run() {
8383+ backoff := time.Second
8484+ for {
8585+ select {
8686+ case <-s.stopCh:
8787+ return
8888+ default:
8989+ }
9090+9191+ if err := s.connect(); err != nil {
9292+ slog.Warn("Hold labeler subscription error, reconnecting",
9393+ "labeler", s.labelerURL,
9494+ "error", err,
9595+ "backoff", backoff,
9696+ )
9797+ select {
9898+ case <-s.stopCh:
9999+ return
100100+ case <-time.After(backoff):
101101+ }
102102+ if backoff < 30*time.Second {
103103+ backoff *= 2
104104+ }
105105+ } else {
106106+ backoff = time.Second
107107+ }
108108+ }
109109+}
110110+111111+func (s *Subscriber) connect() error {
112112+ cursor, err := s.cache.GetCursor(s.labelerDID)
113113+ if err != nil {
114114+ return fmt.Errorf("get cursor: %w", err)
115115+ }
116116+117117+ wsURL := toWebSocketURL(s.labelerURL) + "/xrpc/com.atproto.label.subscribeLabels"
118118+ if cursor > 0 {
119119+ wsURL += fmt.Sprintf("?cursor=%d", cursor)
120120+ }
121121+122122+ slog.Info("Hold connecting to labeler", "url", wsURL, "cursor", cursor)
123123+ conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
124124+ if err != nil {
125125+ return fmt.Errorf("websocket dial: %w", err)
126126+ }
127127+ defer conn.Close()
128128+ slog.Info("Hold connected to labeler", "url", s.labelerURL)
129129+130130+ for {
131131+ select {
132132+ case <-s.stopCh:
133133+ return nil
134134+ default:
135135+ }
136136+137137+ mt, payload, err := conn.ReadMessage()
138138+ if err != nil {
139139+ return fmt.Errorf("read: %w", err)
140140+ }
141141+ if mt != websocket.BinaryMessage {
142142+ slog.Warn("Hold labeler: ignoring non-binary frame", "type", mt)
143143+ continue
144144+ }
145145+146146+ seq, labels, err := decodeFrame(payload)
147147+ if err != nil {
148148+ if errors.Is(err, errInfoFrame) {
149149+ continue
150150+ }
151151+ return fmt.Errorf("decode frame: %w", err)
152152+ }
153153+154154+ for _, lbl := range labels {
155155+ s.applyLabel(seq, lbl)
156156+ }
157157+158158+ if err := s.cache.SetCursor(s.labelerDID, seq); err != nil {
159159+ slog.Warn("Hold labeler: failed to persist cursor", "seq", seq, "error", err)
160160+ }
161161+ }
162162+}
163163+164164+// applyLabel processes one label. Only takedown labels from a trusted source
165165+// trigger cache mutations and record purges; everything else is ignored.
166166+func (s *Subscriber) applyLabel(seq int64, lbl *comatproto.LabelDefs_Label) {
167167+ if lbl == nil {
168168+ return
169169+ }
170170+ if lbl.Val != TakedownLabelValue {
171171+ return
172172+ }
173173+ if !s.trustsSource(lbl.Src) {
174174+ slog.Debug("Hold labeler: ignoring untrusted source",
175175+ "src", lbl.Src, "uri", lbl.Uri, "seq", seq)
176176+ return
177177+ }
178178+179179+ cts, _ := time.Parse(time.RFC3339, lbl.Cts)
180180+ negated := lbl.Neg != nil && *lbl.Neg
181181+182182+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
183183+ defer cancel()
184184+185185+ if negated {
186186+ if err := s.cache.Negate(lbl.Uri); err != nil {
187187+ slog.Warn("Hold labeler: failed to drop takedown from cache",
188188+ "uri", lbl.Uri, "error", err)
189189+ return
190190+ }
191191+ slog.Info("Hold labeler: takedown reversed", "uri", lbl.Uri, "seq", seq)
192192+ return
193193+ }
194194+195195+ if err := s.cache.Set(lbl.Uri, lbl.Src, cts); err != nil {
196196+ slog.Warn("Hold labeler: failed to record takedown",
197197+ "uri", lbl.Uri, "error", err)
198198+ return
199199+ }
200200+201201+ // User-level vs per-record. The labeler emits per-record labels for every
202202+ // individual manifest in a repo-level takedown plus a summary; for a
203203+ // user-level takedown only at://<did> is emitted. We dispatch on shape so
204204+ // we don't try to PurgeManifestRecords on a non-manifest URI.
205205+ switch shape := classifyURI(lbl.Uri); shape.Kind {
206206+ case uriKindManifest:
207207+ out, err := s.purger.PurgeManifestRecords(ctx, lbl.Uri)
208208+ if err != nil {
209209+ slog.Warn("Hold labeler: purge failed", "uri", lbl.Uri, "error", err)
210210+ return
211211+ }
212212+ slog.Info("Hold labeler: purged manifest on takedown",
213213+ "uri", lbl.Uri, "layers", out.LayersDeleted,
214214+ "scan", out.ScanDeleted, "config", out.ImageConfigDeleted)
215215+ case uriKindUser:
216216+ out, err := s.purger.PurgeUserManifests(ctx, shape.DID)
217217+ if err != nil {
218218+ slog.Warn("Hold labeler: user-level purge failed",
219219+ "did", shape.DID, "error", err)
220220+ return
221221+ }
222222+ slog.Info("Hold labeler: purged all manifests for user on takedown",
223223+ "did", shape.DID, "layers", out.LayersDeleted)
224224+ default:
225225+ // Repo-level summary URI (at://did/io.atcr.repo/<repo>) and other
226226+ // non-record subjects: cached for GC's reachability check, but no
227227+ // records to purge directly. The per-manifest labels in the same
228228+ // stream do the actual record removal.
229229+ slog.Debug("Hold labeler: cached non-record takedown",
230230+ "uri", lbl.Uri, "kind", string(shape.Kind))
231231+ }
232232+}
233233+234234+// uriKind classifies the shape of a label subject URI.
235235+type uriKind string
236236+237237+const (
238238+ uriKindManifest uriKind = "manifest"
239239+ uriKindUser uriKind = "user"
240240+ uriKindOther uriKind = "other"
241241+)
242242+243243+type uriShape struct {
244244+ Kind uriKind
245245+ DID string
246246+}
247247+248248+// classifyURI inspects an at:// URI and reports whether it points at a single
249249+// manifest record, an entire user, or something else (repo summary etc.).
250250+func classifyURI(uri string) uriShape {
251251+ const prefix = "at://"
252252+ if !strings.HasPrefix(uri, prefix) {
253253+ return uriShape{Kind: uriKindOther}
254254+ }
255255+ rest := uri[len(prefix):]
256256+ parts := strings.SplitN(rest, "/", 3)
257257+ if len(parts) == 0 || parts[0] == "" {
258258+ return uriShape{Kind: uriKindOther}
259259+ }
260260+ did := parts[0]
261261+ if len(parts) == 1 {
262262+ return uriShape{Kind: uriKindUser, DID: did}
263263+ }
264264+ if len(parts) == 3 && parts[1] == "io.atcr.manifest" && parts[2] != "" {
265265+ return uriShape{Kind: uriKindManifest, DID: did}
266266+ }
267267+ return uriShape{Kind: uriKindOther, DID: did}
268268+}
269269+270270+// errInfoFrame signals that the frame was an info frame and the caller should
271271+// continue without treating it as a label or an error.
272272+var errInfoFrame = errors.New("hold labeler: info frame")
273273+274274+// decodeFrame parses a single subscribeLabels binary frame.
275275+//
276276+// ATProto event-stream framing is two concatenated CBOR objects: a {op,t}
277277+// header and a body. We dispatch on op/t; for op=1, t="#labels" we return the
278278+// labels body. #info frames are logged and signal errInfoFrame so the caller
279279+// loops; error frames become Go errors so the run loop reconnects.
280280+func decodeFrame(payload []byte) (int64, []*comatproto.LabelDefs_Label, error) {
281281+ r := bytes.NewReader(payload)
282282+ var header events.EventHeader
283283+ if err := header.UnmarshalCBOR(r); err != nil {
284284+ return 0, nil, fmt.Errorf("unmarshal header: %w", err)
285285+ }
286286+287287+ switch {
288288+ case header.Op == events.EvtKindErrorFrame:
289289+ var ef events.ErrorFrame
290290+ if err := ef.UnmarshalCBOR(r); err != nil {
291291+ return 0, nil, fmt.Errorf("unmarshal error frame: %w", err)
292292+ }
293293+ return 0, nil, fmt.Errorf("labeler error frame: %s — %s", ef.Error, ef.Message)
294294+ case header.Op == events.EvtKindMessage && header.MsgType == "#labels":
295295+ var body comatproto.LabelSubscribeLabels_Labels
296296+ if err := body.UnmarshalCBOR(r); err != nil {
297297+ return 0, nil, fmt.Errorf("unmarshal labels body: %w", err)
298298+ }
299299+ return body.Seq, body.Labels, nil
300300+ case header.Op == events.EvtKindMessage && header.MsgType == "#info":
301301+ var info comatproto.LabelSubscribeLabels_Info
302302+ if err := info.UnmarshalCBOR(r); err != nil {
303303+ return 0, nil, fmt.Errorf("unmarshal info body: %w", err)
304304+ }
305305+ message := ""
306306+ if info.Message != nil {
307307+ message = *info.Message
308308+ }
309309+ slog.Info("Hold labeler: info frame", "name", info.Name, "message", message)
310310+ return 0, nil, errInfoFrame
311311+ default:
312312+ return 0, nil, fmt.Errorf("unexpected frame op=%d t=%q", header.Op, header.MsgType)
313313+ }
314314+}
315315+316316+// trustsSource reports whether labels with the given Src DID should be
317317+// honored. Today this is "matches the configured labeler DID" — a single
318318+// trusted source. If we ever want a list, this becomes a set membership
319319+// check without touching callers.
320320+func (s *Subscriber) trustsSource(src string) bool {
321321+ return src == s.labelerDID
322322+}
323323+324324+// parseLabelerURL accepts either a did:web:... identifier or a raw http/https
325325+// URL and returns the HTTPS (or HTTP for did:web pointing at a hostname with
326326+// %3A-encoded port in test mode) endpoint to talk to. did:web hosts with
327327+// %3A-encoded ports are decoded back to colons. Mirrors the appview's
328328+// ParseLabelerURL so a single config field works in both places.
329329+func parseLabelerURL(labelerDIDOrURL string) string {
330330+ if strings.HasPrefix(labelerDIDOrURL, "http://") || strings.HasPrefix(labelerDIDOrURL, "https://") {
331331+ return labelerDIDOrURL
332332+ }
333333+ if strings.HasPrefix(labelerDIDOrURL, "did:web:") {
334334+ host := strings.TrimPrefix(labelerDIDOrURL, "did:web:")
335335+ host = strings.ReplaceAll(host, "%3A", ":")
336336+ return "https://" + host
337337+ }
338338+ return labelerDIDOrURL
339339+}
340340+341341+// deriveLabelerDID returns the canonical labeler DID for source filtering.
342342+// When the operator gave us a did:web identifier directly, we use it as-is.
343343+// When they gave us a URL, we derive a did:web from its host (so dev URLs
344344+// like http://172.28.0.4:5002 yield did:web:172.28.0.4%3A5002, matching the
345345+// labeler's own self-served identity).
346346+func deriveLabelerDID(labelerDIDOrURL, httpURL string) string {
347347+ if strings.HasPrefix(labelerDIDOrURL, "did:") {
348348+ return labelerDIDOrURL
349349+ }
350350+ u, err := url.Parse(httpURL)
351351+ if err != nil {
352352+ return labelerDIDOrURL
353353+ }
354354+ host := u.Hostname()
355355+ if port := u.Port(); port != "" {
356356+ host += "%3A" + port
357357+ }
358358+ return "did:web:" + host
359359+}
360360+361361+// toWebSocketURL converts an HTTP URL to a WebSocket URL. http→ws, https→wss.
362362+func toWebSocketURL(httpURL string) string {
363363+ u, err := url.Parse(httpURL)
364364+ if err != nil {
365365+ return httpURL
366366+ }
367367+ switch u.Scheme {
368368+ case "https":
369369+ u.Scheme = "wss"
370370+ default:
371371+ u.Scheme = "ws"
372372+ }
373373+ return u.String()
374374+}
+210
pkg/hold/labeler/subscriber_test.go
···11+package labeler
22+33+import (
44+ "context"
55+ "sync"
66+ "testing"
77+ "time"
88+99+ comatproto "github.com/bluesky-social/indigo/api/atproto"
1010+)
1111+1212+// stubPurger captures purge calls so we can assert what the subscriber routed.
1313+type stubPurger struct {
1414+ mu sync.Mutex
1515+ manifestCalls []string
1616+ userLevelCalls []string
1717+ purgeManifestErr error
1818+ purgeUserError error
1919+ manifestOutcome PurgeOutcome
2020+ userLevelOutcome PurgeOutcome
2121+}
2222+2323+func (s *stubPurger) PurgeManifestRecords(_ context.Context, uri string) (PurgeOutcome, error) {
2424+ s.mu.Lock()
2525+ s.manifestCalls = append(s.manifestCalls, uri)
2626+ s.mu.Unlock()
2727+ return s.manifestOutcome, s.purgeManifestErr
2828+}
2929+3030+func (s *stubPurger) PurgeUserManifests(_ context.Context, did string) (PurgeOutcome, error) {
3131+ s.mu.Lock()
3232+ s.userLevelCalls = append(s.userLevelCalls, did)
3333+ s.mu.Unlock()
3434+ return s.userLevelOutcome, s.purgeUserError
3535+}
3636+3737+func (s *stubPurger) snapshot() (manifests, users []string) {
3838+ s.mu.Lock()
3939+ defer s.mu.Unlock()
4040+ return append([]string(nil), s.manifestCalls...), append([]string(nil), s.userLevelCalls...)
4141+}
4242+4343+func ptrBool(b bool) *bool { return &b }
4444+4545+func TestApplyLabelManifestTakedownPurges(t *testing.T) {
4646+ cache := newTestCache(t)
4747+ purger := &stubPurger{}
4848+ // Use a did:web identifier directly so we control exactly what Src must
4949+ // match — derived URL is https://labeler.example.com.
5050+ sub := NewSubscriber("did:web:labeler.example.com", cache, purger)
5151+5252+ uri := "at://did:plc:alice/io.atcr.manifest/abc"
5353+ cts := time.Now().UTC().Format(time.RFC3339)
5454+5555+ sub.applyLabel(1, &comatproto.LabelDefs_Label{
5656+ Src: "did:web:labeler.example.com",
5757+ Uri: uri,
5858+ Val: TakedownLabelValue,
5959+ Cts: cts,
6060+ })
6161+6262+ manifests, users := purger.snapshot()
6363+ if len(manifests) != 1 || manifests[0] != uri {
6464+ t.Fatalf("manifest purges = %v, want [%q]", manifests, uri)
6565+ }
6666+ if len(users) != 0 {
6767+ t.Fatalf("expected no user-level purges, got %v", users)
6868+ }
6969+ if _, ok := cache.IsTakenDown(uri); !ok {
7070+ t.Fatalf("cache should record the takedown")
7171+ }
7272+}
7373+7474+func TestApplyLabelUserLevelTakedownPurgesAllManifests(t *testing.T) {
7575+ cache := newTestCache(t)
7676+ purger := &stubPurger{}
7777+ sub := NewSubscriber("did:web:labeler.example.com", cache, purger)
7878+7979+ uri := "at://did:plc:alice"
8080+ sub.applyLabel(1, &comatproto.LabelDefs_Label{
8181+ Src: "did:web:labeler.example.com",
8282+ Uri: uri,
8383+ Val: TakedownLabelValue,
8484+ Cts: time.Now().UTC().Format(time.RFC3339),
8585+ })
8686+8787+ manifests, users := purger.snapshot()
8888+ if len(users) != 1 || users[0] != "did:plc:alice" {
8989+ t.Fatalf("user-level purges = %v, want [did:plc:alice]", users)
9090+ }
9191+ if len(manifests) != 0 {
9292+ t.Fatalf("expected no per-manifest purges, got %v", manifests)
9393+ }
9494+ if _, ok := cache.IsTakenDown("at://did:plc:alice/io.atcr.manifest/anything"); !ok {
9595+ t.Fatalf("user-level entry should mask any manifest URI for that DID")
9696+ }
9797+}
9898+9999+func TestApplyLabelNegationDropsCacheNoPurge(t *testing.T) {
100100+ cache := newTestCache(t)
101101+ uri := "at://did:plc:alice/io.atcr.manifest/abc"
102102+ if err := cache.Set(uri, "did:web:labeler.example.com", time.Now()); err != nil {
103103+ t.Fatal(err)
104104+ }
105105+ purger := &stubPurger{}
106106+ sub := NewSubscriber("did:web:labeler.example.com", cache, purger)
107107+108108+ sub.applyLabel(2, &comatproto.LabelDefs_Label{
109109+ Src: "did:web:labeler.example.com",
110110+ Uri: uri,
111111+ Val: TakedownLabelValue,
112112+ Neg: ptrBool(true),
113113+ Cts: time.Now().UTC().Format(time.RFC3339),
114114+ })
115115+116116+ if _, ok := cache.IsTakenDown(uri); ok {
117117+ t.Fatalf("negation should drop the takedown from cache")
118118+ }
119119+ manifests, _ := purger.snapshot()
120120+ if len(manifests) != 0 {
121121+ t.Fatalf("negation must not trigger purge, got %v", manifests)
122122+ }
123123+}
124124+125125+func TestApplyLabelIgnoresUntrustedSource(t *testing.T) {
126126+ cache := newTestCache(t)
127127+ purger := &stubPurger{}
128128+ // Subscriber is configured for did:web:operator; a label whose Src is a
129129+ // different DID must be ignored (today's single-DID trust model).
130130+ sub := NewSubscriber("did:web:operator", cache, purger)
131131+132132+ sub.applyLabel(1, &comatproto.LabelDefs_Label{
133133+ Src: "did:web:rogue",
134134+ Uri: "at://did:plc:alice/io.atcr.manifest/abc",
135135+ Val: TakedownLabelValue,
136136+ Cts: time.Now().UTC().Format(time.RFC3339),
137137+ })
138138+139139+ manifests, users := purger.snapshot()
140140+ if len(manifests) != 0 || len(users) != 0 {
141141+ t.Fatalf("untrusted source must not trigger purge: manifests=%v users=%v", manifests, users)
142142+ }
143143+}
144144+145145+func TestSubscriberDerivesDIDFromURL(t *testing.T) {
146146+ tests := []struct {
147147+ input string
148148+ wantURL string
149149+ wantDID string
150150+ }{
151151+ {"did:web:labeler.atcr.io", "https://labeler.atcr.io", "did:web:labeler.atcr.io"},
152152+ {"did:web:172.28.0.4%3A5002", "https://172.28.0.4:5002", "did:web:172.28.0.4%3A5002"},
153153+ {"http://172.28.0.4:5002", "http://172.28.0.4:5002", "did:web:172.28.0.4%3A5002"},
154154+ {"https://labeler.atcr.io", "https://labeler.atcr.io", "did:web:labeler.atcr.io"},
155155+ }
156156+ for _, tt := range tests {
157157+ t.Run(tt.input, func(t *testing.T) {
158158+ sub := NewSubscriber(tt.input, nil, nil)
159159+ if sub.labelerURL != tt.wantURL {
160160+ t.Errorf("labelerURL = %q, want %q", sub.labelerURL, tt.wantURL)
161161+ }
162162+ if sub.labelerDID != tt.wantDID {
163163+ t.Errorf("labelerDID = %q, want %q", sub.labelerDID, tt.wantDID)
164164+ }
165165+ })
166166+ }
167167+}
168168+169169+func TestApplyLabelIgnoresNonTakedownValues(t *testing.T) {
170170+ cache := newTestCache(t)
171171+ purger := &stubPurger{}
172172+ sub := NewSubscriber("did:web:labeler.example.com", cache, purger)
173173+174174+ sub.applyLabel(1, &comatproto.LabelDefs_Label{
175175+ Src: "did:web:labeler.example.com",
176176+ Uri: "at://did:plc:alice/io.atcr.manifest/abc",
177177+ Val: "spam",
178178+ Cts: time.Now().UTC().Format(time.RFC3339),
179179+ })
180180+181181+ manifests, users := purger.snapshot()
182182+ if len(manifests) != 0 || len(users) != 0 {
183183+ t.Fatalf("non-takedown labels must not trigger purge: manifests=%v users=%v", manifests, users)
184184+ }
185185+}
186186+187187+func TestClassifyURI(t *testing.T) {
188188+ tests := []struct {
189189+ uri string
190190+ kind uriKind
191191+ did string
192192+ }{
193193+ {"at://did:plc:alice/io.atcr.manifest/abc", uriKindManifest, "did:plc:alice"},
194194+ {"at://did:plc:alice", uriKindUser, "did:plc:alice"},
195195+ {"at://did:plc:alice/io.atcr.repo/myimage", uriKindOther, "did:plc:alice"},
196196+ {"https://example.com", uriKindOther, ""},
197197+ {"", uriKindOther, ""},
198198+ }
199199+ for _, tt := range tests {
200200+ t.Run(tt.uri, func(t *testing.T) {
201201+ got := classifyURI(tt.uri)
202202+ if got.Kind != tt.kind {
203203+ t.Fatalf("kind = %s, want %s", got.Kind, tt.kind)
204204+ }
205205+ if got.DID != tt.did {
206206+ t.Fatalf("did = %s, want %s", got.DID, tt.did)
207207+ }
208208+ })
209209+ }
210210+}
+254
pkg/hold/pds/purge.go
···11+package pds
22+33+import (
44+ "context"
55+ "fmt"
66+ "log/slog"
77+ "strings"
88+99+ "atcr.io/pkg/atproto"
1010+ lexutil "github.com/bluesky-social/indigo/lex/util"
1111+ "github.com/bluesky-social/indigo/repo"
1212+ "github.com/ipfs/go-cid"
1313+)
1414+1515+// PurgeResult summarizes a PurgeManifestRecords call.
1616+type PurgeResult struct {
1717+ LayersDeleted int
1818+ ScanDeleted bool
1919+ ImageConfigDeleted bool
2020+}
2121+2222+// PurgeManifestRecords removes layer, scan, and image-config records associated
2323+// with a single manifest AT-URI. The manifest record itself lives in the user's
2424+// PDS and is not touched. S3 blobs are not removed; the GC handles them based
2525+// on remaining references and the labeler grace window.
2626+//
2727+// Idempotent: missing records are not errors. Used by both the
2828+// io.atcr.hold.purgeManifest XRPC handler (called by the appview on UI delete)
2929+// and the hold's labeler subscriber (called on takedown receipt).
3030+func (p *HoldPDS) PurgeManifestRecords(ctx context.Context, manifestURI string) (*PurgeResult, error) {
3131+ if manifestURI == "" {
3232+ return nil, fmt.Errorf("manifest URI is required")
3333+ }
3434+3535+ res := &PurgeResult{}
3636+3737+ // Layer records: TID rkeys, multiple per manifest. Look up the rkeys by
3838+ // scanning the layer index and matching the manifest field.
3939+ rkeys, err := p.listLayerRkeysForManifest(ctx, manifestURI)
4040+ if err != nil {
4141+ return res, fmt.Errorf("list layer rkeys: %w", err)
4242+ }
4343+ for _, rkey := range rkeys {
4444+ if err := p.DeleteLayerRecord(ctx, rkey); err != nil {
4545+ slog.Warn("Failed to delete layer record during purge",
4646+ "rkey", rkey, "manifest", manifestURI, "error", err)
4747+ continue
4848+ }
4949+ res.LayersDeleted++
5050+ }
5151+5252+ // Scan + image-config records share a deterministic rkey scheme based on
5353+ // the manifest digest, so we can address them directly.
5454+ manifestDigest, err := atproto.ParseManifestURI(manifestURI)
5555+ if err != nil {
5656+ // Without a parseable digest we can't compute the deterministic rkey.
5757+ // Layer records may still have been deleted above; return what we did.
5858+ slog.Warn("Cannot parse manifest URI for scan/config purge",
5959+ "manifest", manifestURI, "error", err)
6060+ return res, nil
6161+ }
6262+ rkey := atproto.ScanRecordKey(manifestDigest)
6363+ res.ScanDeleted = p.tryDeleteRecord(ctx, atproto.ScanCollection, rkey)
6464+ res.ImageConfigDeleted = p.tryDeleteRecord(ctx, atproto.ImageConfigCollection, rkey)
6565+6666+ slog.Info("Purged manifest records",
6767+ "manifest", manifestURI,
6868+ "layers", res.LayersDeleted,
6969+ "scan", res.ScanDeleted,
7070+ "imageConfig", res.ImageConfigDeleted,
7171+ )
7272+ return res, nil
7373+}
7474+7575+// PurgeUserManifests purges every manifest's records for a given DID. Used by
7676+// user-level takedowns (URI = at://<did>) where the labeler has not enumerated
7777+// individual manifest URIs.
7878+//
7979+// Discovers the set of manifest URIs from the layer index (every layer record
8080+// names its manifest) so we can reuse PurgeManifestRecords for each.
8181+func (p *HoldPDS) PurgeUserManifests(ctx context.Context, userDID string) (*PurgeResult, error) {
8282+ if userDID == "" {
8383+ return nil, fmt.Errorf("user DID is required")
8484+ }
8585+ if p.recordsIndex == nil {
8686+ return nil, fmt.Errorf("records index not available")
8787+ }
8888+8989+ manifestURIs, err := p.collectUserManifestURIs(ctx, userDID)
9090+ if err != nil {
9191+ return nil, fmt.Errorf("collect manifest URIs: %w", err)
9292+ }
9393+9494+ combined := &PurgeResult{}
9595+ for uri := range manifestURIs {
9696+ r, err := p.PurgeManifestRecords(ctx, uri)
9797+ if err != nil {
9898+ slog.Warn("Failed to purge manifest in user-level purge",
9999+ "manifest", uri, "user", userDID, "error", err)
100100+ continue
101101+ }
102102+ combined.LayersDeleted += r.LayersDeleted
103103+ if r.ScanDeleted {
104104+ combined.ScanDeleted = true
105105+ }
106106+ if r.ImageConfigDeleted {
107107+ combined.ImageConfigDeleted = true
108108+ }
109109+ }
110110+111111+ slog.Info("Purged all manifests for user",
112112+ "user", userDID,
113113+ "manifestCount", len(manifestURIs),
114114+ "layersDeleted", combined.LayersDeleted,
115115+ )
116116+ return combined, nil
117117+}
118118+119119+// listLayerRkeysForManifest scans the layer record index, decodes each record
120120+// from the carstore, and returns the rkeys whose `manifest` field matches.
121121+//
122122+// Mirrors ListLayerRecordsForManifest but returns rkeys (which the caller
123123+// needs for deletion) instead of decoded record values.
124124+func (p *HoldPDS) listLayerRkeysForManifest(ctx context.Context, manifestURI string) ([]string, error) {
125125+ if p.recordsIndex == nil {
126126+ return nil, fmt.Errorf("records index not available")
127127+ }
128128+129129+ session, err := p.carstore.ReadOnlySession(p.uid)
130130+ if err != nil {
131131+ return nil, fmt.Errorf("create session: %w", err)
132132+ }
133133+ head, err := p.carstore.GetUserRepoHead(ctx, p.uid)
134134+ if err != nil {
135135+ return nil, fmt.Errorf("get repo head: %w", err)
136136+ }
137137+ if !head.Defined() {
138138+ return nil, nil
139139+ }
140140+ repoHandle, err := repo.OpenRepo(ctx, session, head)
141141+ if err != nil {
142142+ return nil, fmt.Errorf("open repo: %w", err)
143143+ }
144144+145145+ var rkeys []string
146146+ cursor := ""
147147+ const batch = 1000
148148+ for {
149149+ indexRecords, nextCursor, err := p.recordsIndex.ListRecords(atproto.LayerCollection, batch, cursor, false)
150150+ if err != nil {
151151+ return nil, fmt.Errorf("list layer records: %w", err)
152152+ }
153153+ for _, rec := range indexRecords {
154154+ path := rec.Collection + "/" + rec.Rkey
155155+ _, recBytes, err := repoHandle.GetRecordBytes(ctx, path)
156156+ if err != nil {
157157+ continue
158158+ }
159159+ val, err := lexutil.CborDecodeValue(*recBytes)
160160+ if err != nil {
161161+ continue
162162+ }
163163+ layer, ok := val.(*atproto.LayerRecord)
164164+ if !ok {
165165+ continue
166166+ }
167167+ if layer.Manifest == manifestURI {
168168+ rkeys = append(rkeys, rec.Rkey)
169169+ }
170170+ }
171171+ if nextCursor == "" {
172172+ break
173173+ }
174174+ cursor = nextCursor
175175+ }
176176+ return rkeys, nil
177177+}
178178+179179+// collectUserManifestURIs walks the user's layer records and returns the set
180180+// of unique manifest AT-URIs they reference.
181181+func (p *HoldPDS) collectUserManifestURIs(ctx context.Context, userDID string) (map[string]struct{}, error) {
182182+ uris := make(map[string]struct{})
183183+184184+ session, err := p.carstore.ReadOnlySession(p.uid)
185185+ if err != nil {
186186+ return nil, fmt.Errorf("create session: %w", err)
187187+ }
188188+ head, err := p.carstore.GetUserRepoHead(ctx, p.uid)
189189+ if err != nil {
190190+ return nil, fmt.Errorf("get repo head: %w", err)
191191+ }
192192+ if !head.Defined() {
193193+ return uris, nil
194194+ }
195195+ repoHandle, err := repo.OpenRepo(ctx, session, head)
196196+ if err != nil {
197197+ return nil, fmt.Errorf("open repo: %w", err)
198198+ }
199199+200200+ cursor := ""
201201+ const batch = 200
202202+ for {
203203+ records, nextCursor, err := p.recordsIndex.ListRecordsByDID(atproto.LayerCollection, userDID, batch, cursor)
204204+ if err != nil {
205205+ return nil, fmt.Errorf("list layer records by DID: %w", err)
206206+ }
207207+ for _, rec := range records {
208208+ path := rec.Collection + "/" + rec.Rkey
209209+ _, recBytes, err := repoHandle.GetRecordBytes(ctx, path)
210210+ if err != nil {
211211+ continue
212212+ }
213213+ val, err := lexutil.CborDecodeValue(*recBytes)
214214+ if err != nil {
215215+ continue
216216+ }
217217+ layer, ok := val.(*atproto.LayerRecord)
218218+ if !ok {
219219+ continue
220220+ }
221221+ if layer.Manifest != "" && strings.HasPrefix(layer.Manifest, "at://"+userDID+"/") {
222222+ uris[layer.Manifest] = struct{}{}
223223+ }
224224+ }
225225+ if nextCursor == "" {
226226+ break
227227+ }
228228+ cursor = nextCursor
229229+ }
230230+ return uris, nil
231231+}
232232+233233+// tryDeleteRecord deletes a record at the given collection/rkey from both the
234234+// repo and the records index. Returns true if a record was actually deleted,
235235+// false if it didn't exist or the delete failed (failures are logged).
236236+func (p *HoldPDS) tryDeleteRecord(ctx context.Context, collection, rkey string) bool {
237237+ // Probe via GetRecord — if the record doesn't exist we skip silently.
238238+ if _, _, err := p.repomgr.GetRecord(ctx, p.uid, collection, rkey, cid.Undef); err != nil {
239239+ return false
240240+ }
241241+242242+ if err := p.repomgr.DeleteRecord(ctx, p.uid, collection, rkey); err != nil {
243243+ slog.Warn("Failed to delete record from repo",
244244+ "collection", collection, "rkey", rkey, "error", err)
245245+ return false
246246+ }
247247+ if p.recordsIndex != nil {
248248+ if err := p.recordsIndex.DeleteRecord(collection, rkey); err != nil {
249249+ slog.Warn("Failed to delete record from index",
250250+ "collection", collection, "rkey", rkey, "error", err)
251251+ }
252252+ }
253253+ return true
254254+}
+150
pkg/hold/pds/purge_test.go
···11+package pds
22+33+import (
44+ "strings"
55+ "testing"
66+77+ "atcr.io/pkg/atproto"
88+)
99+1010+func TestPurgeManifestRecordsRemovesAll(t *testing.T) {
1111+ pds := setupTestPDSWithIndex(t, "did:plc:owner")
1212+ ctx := sharedCtx
1313+1414+ const userDID = "did:plc:alice"
1515+ const manifestDigest = "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f"
1616+ manifestURI := atproto.BuildManifestURI(userDID, manifestDigest)
1717+1818+ // Two layer records for the same manifest plus one for a different one
1919+ mustCreateLayer(t, pds, manifestURI, "sha256:layer-a", 1024)
2020+ mustCreateLayer(t, pds, manifestURI, "sha256:layer-b", 2048)
2121+ otherURI := atproto.BuildManifestURI(userDID, "sha256:0123456789abcdef")
2222+ mustCreateLayer(t, pds, otherURI, "sha256:layer-c", 4096)
2323+2424+ // Scan record at the deterministic rkey
2525+ scanRkey := atproto.ScanRecordKey(manifestDigest)
2626+ scanRec := &atproto.ScanRecord{
2727+ Type: atproto.ScanCollection,
2828+ Manifest: manifestURI,
2929+ ScannedAt: "2026-05-02T00:00:00Z",
3030+ }
3131+ if _, _, _, err := pds.repomgr.UpsertRecord(ctx, pds.uid, atproto.ScanCollection, scanRkey, scanRec); err != nil {
3232+ t.Fatalf("create scan record: %v", err)
3333+ }
3434+3535+ // Image config record at the same deterministic rkey scheme
3636+ cfgRec := &atproto.ImageConfigRecord{
3737+ Type: atproto.ImageConfigCollection,
3838+ Manifest: manifestURI,
3939+ }
4040+ if _, _, _, err := pds.repomgr.UpsertRecord(ctx, pds.uid, atproto.ImageConfigCollection, scanRkey, cfgRec); err != nil {
4141+ t.Fatalf("create image config record: %v", err)
4242+ }
4343+4444+ res, err := pds.PurgeManifestRecords(ctx, manifestURI)
4545+ if err != nil {
4646+ t.Fatalf("PurgeManifestRecords: %v", err)
4747+ }
4848+ if res.LayersDeleted != 2 {
4949+ t.Errorf("LayersDeleted = %d, want 2", res.LayersDeleted)
5050+ }
5151+ if !res.ScanDeleted {
5252+ t.Errorf("ScanDeleted = false, want true")
5353+ }
5454+ if !res.ImageConfigDeleted {
5555+ t.Errorf("ImageConfigDeleted = false, want true")
5656+ }
5757+5858+ // Layer records for OTHER manifest must remain.
5959+ rkeys, err := pds.listLayerRkeysForManifest(ctx, otherURI)
6060+ if err != nil {
6161+ t.Fatalf("list other layers: %v", err)
6262+ }
6363+ if len(rkeys) != 1 {
6464+ t.Fatalf("other manifest layer rkeys = %d, want 1", len(rkeys))
6565+ }
6666+}
6767+6868+func TestPurgeManifestRecordsIdempotent(t *testing.T) {
6969+ pds := setupTestPDSWithIndex(t, "did:plc:owner")
7070+ ctx := sharedCtx
7171+7272+ manifestURI := atproto.BuildManifestURI("did:plc:alice", "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f")
7373+7474+ res, err := pds.PurgeManifestRecords(ctx, manifestURI)
7575+ if err != nil {
7676+ t.Fatalf("first PurgeManifestRecords: %v", err)
7777+ }
7878+ if res.LayersDeleted != 0 || res.ScanDeleted || res.ImageConfigDeleted {
7979+ t.Fatalf("expected zero-result purge on empty hold, got %+v", res)
8080+ }
8181+8282+ // Second call must also succeed (idempotent).
8383+ if _, err := pds.PurgeManifestRecords(ctx, manifestURI); err != nil {
8484+ t.Fatalf("second PurgeManifestRecords: %v", err)
8585+ }
8686+}
8787+8888+func TestPurgeManifestRecordsRequiresURI(t *testing.T) {
8989+ pds := setupTestPDSWithIndex(t, "did:plc:owner")
9090+ if _, err := pds.PurgeManifestRecords(sharedCtx, ""); err == nil || !strings.Contains(err.Error(), "manifest URI is required") {
9191+ t.Fatalf("expected manifest URI error, got %v", err)
9292+ }
9393+}
9494+9595+func TestPurgeUserManifestsCollectsAcrossManifests(t *testing.T) {
9696+ pds := setupTestPDSWithIndex(t, "did:plc:owner")
9797+ ctx := sharedCtx
9898+9999+ const userDID = "did:plc:alice"
100100+ manifestA := atproto.BuildManifestURI(userDID, "sha256:e692418e4cbaf90ca69d05a66403747baa33ee08806650b51fab815ad7fc331f")
101101+ manifestB := atproto.BuildManifestURI(userDID, "sha256:0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef")
102102+103103+ mustCreateLayer(t, pds, manifestA, "sha256:layer-a", 100)
104104+ mustCreateLayer(t, pds, manifestA, "sha256:layer-b", 200)
105105+ mustCreateLayer(t, pds, manifestB, "sha256:layer-c", 300)
106106+107107+ // Different user's layer must survive
108108+ manifestOther := atproto.BuildManifestURI("did:plc:bob", "sha256:bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")
109109+ mustCreateLayer(t, pds, manifestOther, "sha256:layer-other", 400)
110110+111111+ res, err := pds.PurgeUserManifests(ctx, userDID)
112112+ if err != nil {
113113+ t.Fatalf("PurgeUserManifests: %v", err)
114114+ }
115115+ if res.LayersDeleted != 3 {
116116+ t.Errorf("LayersDeleted = %d, want 3", res.LayersDeleted)
117117+ }
118118+119119+ bobRkeys, err := pds.listLayerRkeysForManifest(ctx, manifestOther)
120120+ if err != nil {
121121+ t.Fatalf("list bob's layers: %v", err)
122122+ }
123123+ if len(bobRkeys) != 1 {
124124+ t.Fatalf("bob's layer count = %d, want 1 (purge crossed user boundary)", len(bobRkeys))
125125+ }
126126+}
127127+128128+// mustCreateLayer is a tiny helper for purge tests — keeps the table-driven
129129+// TestPurge body focused on the assertions.
130130+func mustCreateLayer(t *testing.T, pds *HoldPDS, manifestURI, digest string, size int64) {
131131+ t.Helper()
132132+ rec := atproto.NewLayerRecord(digest, size, "application/vnd.oci.image.layer.v1.tar+gzip", didFromManifest(manifestURI), manifestURI)
133133+ if _, _, err := pds.CreateLayerRecord(sharedCtx, rec); err != nil {
134134+ t.Fatalf("CreateLayerRecord(%s): %v", digest, err)
135135+ }
136136+}
137137+138138+// didFromManifest pulls the authority out of an at:// URI for layer record
139139+// construction in tests. Production code uses richer helpers.
140140+func didFromManifest(uri string) string {
141141+ const prefix = "at://"
142142+ if !strings.HasPrefix(uri, prefix) {
143143+ return ""
144144+ }
145145+ rest := uri[len(prefix):]
146146+ if i := strings.IndexByte(rest, '/'); i >= 0 {
147147+ return rest[:i]
148148+ }
149149+ return rest
150150+}
+37
pkg/hold/pds/xrpc.go
···203203 r.Use(h.requireOwnerOrCrewAdmin)
204204 r.Post(atproto.RepoDeleteRecord, h.HandleDeleteRecord)
205205 r.Post(atproto.RepoUploadBlob, h.HandleUploadBlob)
206206+ r.Post(atproto.HoldPurgeManifest, h.HandlePurgeManifest)
206207 })
207208208209 // Auth-only endpoints (DPoP auth)
···888889 "cid": head.String(),
889890 "rev": rev,
890891 },
892892+ })
893893+}
894894+895895+// HandlePurgeManifest deletes layer, scan, and image-config records associated
896896+// with a single manifest AT-URI. Idempotent. Auth: owner or crew admin
897897+// (enforced by middleware). The manifest record itself lives in the user's PDS
898898+// and is not affected. S3 blobs are not removed; the GC handles those based on
899899+// remaining references and the labeler grace window.
900900+func (h *XRPCHandler) HandlePurgeManifest(w http.ResponseWriter, r *http.Request) {
901901+ var input struct {
902902+ ManifestURI string `json:"manifestUri"`
903903+ }
904904+ if err := json.NewDecoder(r.Body).Decode(&input); err != nil {
905905+ http.Error(w, fmt.Sprintf("invalid JSON body: %v", err), http.StatusBadRequest)
906906+ return
907907+ }
908908+ if input.ManifestURI == "" {
909909+ http.Error(w, "manifestUri is required", http.StatusBadRequest)
910910+ return
911911+ }
912912+ if !strings.HasPrefix(input.ManifestURI, "at://") {
913913+ http.Error(w, "manifestUri must be an at:// URI", http.StatusBadRequest)
914914+ return
915915+ }
916916+917917+ res, err := h.pds.PurgeManifestRecords(r.Context(), input.ManifestURI)
918918+ if err != nil {
919919+ http.Error(w, fmt.Sprintf("purge failed: %v", err), http.StatusInternalServerError)
920920+ return
921921+ }
922922+923923+ render.JSON(w, r, map[string]any{
924924+ "success": true,
925925+ "layersDeleted": res.LayersDeleted,
926926+ "scanDeleted": res.ScanDeleted,
927927+ "imageConfigDeleted": res.ImageConfigDeleted,
891928 })
892929}
893930
+75-7
pkg/hold/server.go
···1515 "atcr.io/pkg/hold/admin"
1616 holddb "atcr.io/pkg/hold/db"
1717 "atcr.io/pkg/hold/gc"
1818+ holdlabeler "atcr.io/pkg/hold/labeler"
1819 "atcr.io/pkg/hold/oci"
1920 "atcr.io/pkg/hold/pds"
2021 "atcr.io/pkg/hold/quota"
···2526 "github.com/go-chi/chi/v5/middleware"
2627)
27282929+// purgerAdapter bridges *pds.HoldPDS to the holdlabeler.Purger interface, which
3030+// uses its own outcome type to avoid an import cycle with pkg/hold/pds.
3131+type purgerAdapter struct {
3232+ pds *pds.HoldPDS
3333+}
3434+3535+func (a purgerAdapter) PurgeManifestRecords(ctx context.Context, manifestURI string) (holdlabeler.PurgeOutcome, error) {
3636+ r, err := a.pds.PurgeManifestRecords(ctx, manifestURI)
3737+ if err != nil || r == nil {
3838+ return holdlabeler.PurgeOutcome{}, err
3939+ }
4040+ return holdlabeler.PurgeOutcome{
4141+ LayersDeleted: r.LayersDeleted,
4242+ ScanDeleted: r.ScanDeleted,
4343+ ImageConfigDeleted: r.ImageConfigDeleted,
4444+ }, nil
4545+}
4646+4747+func (a purgerAdapter) PurgeUserManifests(ctx context.Context, userDID string) (holdlabeler.PurgeOutcome, error) {
4848+ r, err := a.pds.PurgeUserManifests(ctx, userDID)
4949+ if err != nil || r == nil {
5050+ return holdlabeler.PurgeOutcome{}, err
5151+ }
5252+ return holdlabeler.PurgeOutcome{
5353+ LayersDeleted: r.LayersDeleted,
5454+ ScanDeleted: r.ScanDeleted,
5555+ ImageConfigDeleted: r.ImageConfigDeleted,
5656+ }, nil
5757+}
5858+2859// HoldServer is the hold service with an exposed router for extensibility.
2960// Consumers can add routes to Router before calling Serve().
3061type HoldServer struct {
···4172 Config *Config
42734374 // internal fields for shutdown
4444- httpServer *http.Server
4545- broadcaster *pds.EventBroadcaster
4646- scanBroadcaster *pds.ScanBroadcaster
4747- garbageCollector *gc.GarbageCollector
4848- adminUI *admin.AdminUI
4949- holdDB *holddb.HoldDB // shared database connection (nil for :memory:)
7575+ httpServer *http.Server
7676+ broadcaster *pds.EventBroadcaster
7777+ scanBroadcaster *pds.ScanBroadcaster
7878+ garbageCollector *gc.GarbageCollector
7979+ adminUI *admin.AdminUI
8080+ holdDB *holddb.HoldDB // shared database connection (nil for :memory:)
8181+ labelerSubscriber *holdlabeler.Subscriber
8282+ labelerCache *holdlabeler.Cache
5083}
51845285// NewHoldServer initializes PDS, storage, quota, XRPC handlers, and returns
···210243 "rescanInterval", rescanInterval)
211244 }
212245246246+ // Initialize labeler cache + subscriber if a labeler is configured.
247247+ // The cache is created either way so the GC can take a non-nil
248248+ // pointer; without a configured DID it just stays empty and exerts
249249+ // no effect.
250250+ if s.holdDB != nil {
251251+ cache, cacheErr := holdlabeler.NewCache(s.holdDB.DB)
252252+ if cacheErr != nil {
253253+ return nil, fmt.Errorf("failed to initialize labeler cache: %w", cacheErr)
254254+ }
255255+ s.labelerCache = cache
256256+ if cfg.Labeler.DID != "" {
257257+ s.labelerSubscriber = holdlabeler.NewSubscriber(
258258+ cfg.Labeler.DID,
259259+ s.labelerCache,
260260+ purgerAdapter{pds: s.PDS},
261261+ )
262262+ slog.Info("Hold labeler subscriber initialized",
263263+ "labeler", cfg.Labeler.DID,
264264+ "grace_window", cfg.Labeler.GraceWindow)
265265+ }
266266+ }
267267+213268 // Initialize garbage collector
214214- s.garbageCollector = gc.NewGarbageCollector(s.PDS, s3Service, cfg.GC)
269269+ s.garbageCollector = gc.NewGarbageCollector(s.PDS, s3Service, cfg.GC,
270270+ gc.WithTakedownCache(s.labelerCache, cfg.Labeler.GraceWindow))
215271 slog.Info("Garbage collector initialized",
216272 "enabled", cfg.GC.Enabled)
217273 }
···333389 s.garbageCollector.Start(context.Background())
334390 }
335391392392+ // Start labeler subscriber if configured.
393393+ if s.labelerSubscriber != nil {
394394+ s.labelerSubscriber.Start()
395395+ slog.Info("Hold labeler subscriber started", "labeler_did", s.labelerSubscriber.LabelerDID())
396396+ }
397397+336398 // Wait for signal or server error
337399 select {
338400 case err := <-serverErr:
···362424 if s.garbageCollector != nil {
363425 s.garbageCollector.Stop()
364426 slog.Info("Garbage collector stopped")
427427+ }
428428+429429+ // Stop labeler subscriber
430430+ if s.labelerSubscriber != nil {
431431+ s.labelerSubscriber.Stop()
432432+ slog.Info("Labeler subscriber stopped")
365433 }
366434367435 // Close scan broadcaster database connection