···3232 Auth AuthConfig `yaml:"auth" comment:"JWT authentication settings."`
3333 CredentialHelper CredentialHelperConfig `yaml:"credential_helper" comment:"Credential helper download settings."`
3434 Legal LegalConfig `yaml:"legal" comment:"Legal page customization for self-hosted instances."`
3535+ Labeler LabelerRefConfig `yaml:"labeler" comment:"ATProto labeler for content moderation (DMCA takedowns)."`
3536 Billing billing.Config `yaml:"billing" comment:"Stripe billing integration (requires -tags billing build)."`
3637 Distribution *configuration.Configuration `yaml:"-"` // Wrapped distribution config for compatibility
3738}
···140141 Jurisdiction string `yaml:"jurisdiction" comment:"Governing law jurisdiction for legal terms."`
141142}
142143144144+// LabelerRefConfig defines the connection to an ATProto labeler service.
145145+type LabelerRefConfig struct {
146146+ // DID or URL of the labeler service for content moderation.
147147+ DID string `yaml:"did" comment:"DID or URL of the ATProto labeler (e.g., did:web:labeler.atcr.io). Empty disables label filtering."`
148148+}
149149+143150// setDefaults registers all default values on the given Viper instance.
144151func setDefaults(v *viper.Viper) {
145152 v.SetDefault("version", "0.1")
···192199 // Legal defaults
193200 v.SetDefault("legal.company_name", "")
194201 v.SetDefault("legal.jurisdiction", "")
202202+203203+ // Labeler defaults
204204+ v.SetDefault("labeler.did", "")
195205196206 // Log formatter (used by distribution config, not in Config struct)
197207 v.SetDefault("log_formatter", "text")
+79
pkg/appview/db/labels.go
···11+package db
22+33+import (
44+ "database/sql"
55+ "time"
66+)
77+88+// LabelChecker wraps a database connection to check takedown labels.
99+// Implements middleware.LabelChecker interface.
1010+type LabelChecker struct {
1111+ db *sql.DB
1212+}
1313+1414+// NewLabelChecker creates a new LabelChecker.
1515+func NewLabelChecker(database *sql.DB) *LabelChecker {
1616+ return &LabelChecker{db: database}
1717+}
1818+1919+// IsTakenDown checks if a (DID, repository) pair has an active takedown label.
2020+func (lc *LabelChecker) IsTakenDown(did, repository string) (bool, error) {
2121+ return IsTakenDown(lc.db, did, repository)
2222+}
2323+2424+// Label represents an ATProto label mirrored from a labeler service.
2525+type Label struct {
2626+ ID int64
2727+ Src string
2828+ URI string
2929+ Val string
3030+ Neg bool
3131+ Cts time.Time
3232+ SubjectDID string
3333+ SubjectRepo string
3434+ Seq int64
3535+}
3636+3737+// IsTakenDown checks if a (DID, repository) pair has an active !takedown label.
3838+// Also matches user-level labels (subject_repo = ”).
3939+func IsTakenDown(db DBTX, did, repository string) (bool, error) {
4040+ var exists bool
4141+ err := db.QueryRow(
4242+ `SELECT EXISTS(
4343+ SELECT 1 FROM labels l1
4444+ WHERE l1.subject_did = ?
4545+ AND (l1.subject_repo = ? OR l1.subject_repo = '')
4646+ AND l1.val = '!takedown' AND l1.neg = 0
4747+ AND NOT EXISTS (
4848+ SELECT 1 FROM labels l2
4949+ WHERE l2.src = l1.src AND l2.uri = l1.uri AND l2.val = l1.val
5050+ AND l2.neg = 1 AND l2.id > l1.id
5151+ )
5252+ AND (l1.exp IS NULL OR l1.exp > CURRENT_TIMESTAMP)
5353+ )`,
5454+ did, repository,
5555+ ).Scan(&exists)
5656+ return exists, err
5757+}
5858+5959+// UpsertLabel inserts or updates a label from a labeler subscription.
6060+func UpsertLabel(db DBTX, l *Label) error {
6161+ _, err := db.Exec(
6262+ `INSERT INTO labels (src, uri, val, neg, cts, subject_did, subject_repo, seq)
6363+ VALUES (?, ?, ?, ?, ?, ?, ?, ?)
6464+ ON CONFLICT(src, uri, val, neg) DO UPDATE SET cts = excluded.cts, seq = excluded.seq`,
6565+ l.Src, l.URI, l.Val, l.Neg, l.Cts.UTC().Format(time.RFC3339),
6666+ l.SubjectDID, l.SubjectRepo, l.Seq,
6767+ )
6868+ return err
6969+}
7070+7171+// GetLabelCursor returns the latest sequence number for a given labeler source.
7272+func GetLabelCursor(db DBTX, src string) (int64, error) {
7373+ var cursor int64
7474+ err := db.QueryRow(
7575+ `SELECT COALESCE(MAX(seq), 0) FROM labels WHERE src = ?`,
7676+ src,
7777+ ).Scan(&cursor)
7878+ return cursor, err
7979+}
+16
pkg/appview/db/migrations/0017_create_labels.yaml
···11+description: Create labels table for ATProto content moderation (takedowns)
22+query: |
33+ CREATE TABLE IF NOT EXISTS labels (
44+ id INTEGER PRIMARY KEY AUTOINCREMENT,
55+ src TEXT NOT NULL,
66+ uri TEXT NOT NULL,
77+ val TEXT NOT NULL,
88+ neg BOOLEAN NOT NULL DEFAULT 0,
99+ cts TIMESTAMP NOT NULL,
1010+ subject_did TEXT NOT NULL,
1111+ subject_repo TEXT NOT NULL DEFAULT '',
1212+ seq INTEGER NOT NULL DEFAULT 0,
1313+ UNIQUE(src, uri, val, neg)
1414+ );
1515+ CREATE INDEX IF NOT EXISTS idx_labels_subject ON labels(subject_did, subject_repo);
1616+ CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val);
+16-1
pkg/appview/db/queries.go
···7474 SELECT DISTINCT lm.did, lm.repository, lm.latest_id
7575 FROM latest_manifests lm
7676 JOIN users u ON lm.did = u.did
7777- WHERE u.handle LIKE ? ESCAPE '\'
7777+ WHERE (u.handle LIKE ? ESCAPE '\'
7878 OR u.did = ?
7979 OR lm.repository LIKE ? ESCAPE '\'
8080 OR EXISTS (
8181 SELECT 1 FROM repository_annotations ra
8282 WHERE ra.did = lm.did AND ra.repository = lm.repository
8383 AND ra.value LIKE ? ESCAPE '\'
8484+ ))
8585+ AND NOT EXISTS (
8686+ SELECT 1 FROM labels
8787+ WHERE (subject_did = lm.did AND (subject_repo = lm.repository OR subject_repo = ''))
8888+ AND val = '!takedown' AND neg = 0
8489 )
8590 ),
8691 repo_stats AS (
···19531958 JOIN users u ON m.did = u.did
19541959 LEFT JOIN repository_stats rs ON m.did = rs.did AND m.repository = rs.repository
19551960 LEFT JOIN repo_pages rp ON m.did = rp.did AND m.repository = rp.repository
19611961+ WHERE NOT EXISTS (
19621962+ SELECT 1 FROM labels
19631963+ WHERE (subject_did = m.did AND (subject_repo = m.repository OR subject_repo = ''))
19641964+ AND val = '!takedown' AND neg = 0
19651965+ )
19561966 ORDER BY ` + orderBy + `
19571967 LIMIT ?
19581968 `
···20262036 JOIN users u ON m.did = u.did
20272037 LEFT JOIN repository_stats rs ON m.did = rs.did AND m.repository = rs.repository
20282038 LEFT JOIN repo_pages rp ON m.did = rp.did AND m.repository = rp.repository
20392039+ WHERE NOT EXISTS (
20402040+ SELECT 1 FROM labels
20412041+ WHERE (subject_did = m.did AND (subject_repo = m.repository OR subject_repo = ''))
20422042+ AND val = '!takedown' AND neg = 0
20432043+ )
20292044 ORDER BY MAX(rs.last_push, m.created_at) DESC
20302045 `
20312046
+15
pkg/appview/db/schema.sql
···271271 PRIMARY KEY(hold_did, manifest_digest)
272272);
273273CREATE INDEX IF NOT EXISTS idx_scans_user ON scans(user_did);
274274+275275+CREATE TABLE IF NOT EXISTS labels (
276276+ id INTEGER PRIMARY KEY AUTOINCREMENT,
277277+ src TEXT NOT NULL,
278278+ uri TEXT NOT NULL,
279279+ val TEXT NOT NULL,
280280+ neg BOOLEAN NOT NULL DEFAULT 0,
281281+ cts TIMESTAMP NOT NULL,
282282+ subject_did TEXT NOT NULL,
283283+ subject_repo TEXT NOT NULL DEFAULT '',
284284+ seq INTEGER NOT NULL DEFAULT 0,
285285+ UNIQUE(src, uri, val, neg)
286286+);
287287+CREATE INDEX IF NOT EXISTS idx_labels_subject ON labels(subject_did, subject_repo);
288288+CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val);
···3434 return
3535 }
36363737+ // Check for takedown labels
3838+ if taken, _ := db.IsTakenDown(h.ReadOnlyDB, did, repository); taken {
3939+ RenderNotFound(w, r, &h.BaseUIHandler)
4040+ return
4141+ }
4242+3743 // Look up user by DID
3844 owner, err := db.GetUserByDID(h.ReadOnlyDB, did)
3945 if err != nil {
+34
pkg/appview/jetstream/processor.go
···229229 }
230230 }
231231232232+ // Skip ingestion for taken-down content
233233+ if !isDelete && data != nil {
234234+ if repo := extractRepoFromRecord(collection, data); repo != "" {
235235+ if taken, _ := db.IsTakenDown(p.db, did, repo); taken {
236236+ slog.Debug("Skipping taken-down content",
237237+ "component", "processor",
238238+ "did", did,
239239+ "collection", collection,
240240+ "repository", repo)
241241+ return nil
242242+ }
243243+ }
244244+ }
245245+232246 // User-activity collections create/update user entries
233247 // Skip for deletes - user should already exist, and we don't need to resolve identity
234248 if !isDelete {
···971985972986 return nil
973987}
988988+989989+// extractRepoFromRecord extracts the repository field from a record's JSON data.
990990+// Returns empty string for collections that don't have a repository field
991991+// (e.g., sailor profile, captain, crew).
992992+func extractRepoFromRecord(collection string, data []byte) string {
993993+ switch collection {
994994+ case atproto.ManifestCollection,
995995+ atproto.TagCollection,
996996+ atproto.RepoPageCollection,
997997+ atproto.StatsCollection,
998998+ atproto.ScanCollection:
999999+ var rec struct {
10001000+ Repository string `json:"repository"`
10011001+ }
10021002+ if err := json.Unmarshal(data, &rec); err == nil {
10031003+ return rec.Repository
10041004+ }
10051005+ }
10061006+ return ""
10071007+}
+239
pkg/appview/labeler/subscriber.go
···11+// Package labeler provides a subscription client for consuming labels
22+// from an ATProto labeler service.
33+package labeler
44+55+import (
66+ "database/sql"
77+ "encoding/json"
88+ "fmt"
99+ "log/slog"
1010+ "net/url"
1111+ "strings"
1212+ "time"
1313+1414+ "atcr.io/pkg/appview/db"
1515+1616+ "github.com/gorilla/websocket"
1717+)
1818+1919+// LabelsMessage is the wire format for subscribeLabels events.
2020+type LabelsMessage struct {
2121+ Seq int64 `json:"seq"`
2222+ Labels []LabelEvent `json:"labels"`
2323+}
2424+2525+// LabelEvent is a single label from the labeler.
2626+type LabelEvent struct {
2727+ Src string `json:"src"`
2828+ URI string `json:"uri"`
2929+ CID string `json:"cid,omitempty"`
3030+ Val string `json:"val"`
3131+ Neg bool `json:"neg"`
3232+ Cts string `json:"cts"`
3333+ Exp string `json:"exp,omitempty"`
3434+}
3535+3636+// Subscriber connects to a labeler's subscribeLabels endpoint
3737+// and mirrors labels into the appview database.
3838+type Subscriber struct {
3939+ labelerURL string
4040+ database *sql.DB
4141+ stopCh chan struct{}
4242+}
4343+4444+// NewSubscriber creates a new labeler subscriber.
4545+func NewSubscriber(labelerURL string, database *sql.DB) *Subscriber {
4646+ return &Subscriber{
4747+ labelerURL: labelerURL,
4848+ database: database,
4949+ stopCh: make(chan struct{}),
5050+ }
5151+}
5252+5353+// Start begins the subscription loop in a goroutine.
5454+func (s *Subscriber) Start() {
5555+ go s.run()
5656+}
5757+5858+// Stop signals the subscriber to shut down.
5959+func (s *Subscriber) Stop() {
6060+ close(s.stopCh)
6161+}
6262+6363+func (s *Subscriber) run() {
6464+ backoff := time.Second
6565+6666+ for {
6767+ select {
6868+ case <-s.stopCh:
6969+ return
7070+ default:
7171+ }
7272+7373+ if err := s.connect(); err != nil {
7474+ slog.Warn("Labeler subscription error, reconnecting",
7575+ "error", err,
7676+ "backoff", backoff,
7777+ )
7878+ select {
7979+ case <-s.stopCh:
8080+ return
8181+ case <-time.After(backoff):
8282+ }
8383+ if backoff < 30*time.Second {
8484+ backoff *= 2
8585+ }
8686+ } else {
8787+ backoff = time.Second
8888+ }
8989+ }
9090+}
9191+9292+func (s *Subscriber) connect() error {
9393+ // Get cursor from DB
9494+ // Use the labeler URL as src identifier
9595+ labelerDID := extractDIDFromURL(s.labelerURL)
9696+ cursor, err := db.GetLabelCursor(s.database, labelerDID)
9797+ if err != nil {
9898+ return fmt.Errorf("failed to get cursor: %w", err)
9999+ }
100100+101101+ // Build WebSocket URL
102102+ wsURL := toWebSocketURL(s.labelerURL) + "/xrpc/com.atproto.label.subscribeLabels"
103103+ if cursor > 0 {
104104+ wsURL += fmt.Sprintf("?cursor=%d", cursor)
105105+ }
106106+107107+ slog.Info("Connecting to labeler", "url", wsURL, "cursor", cursor)
108108+109109+ conn, _, err := websocket.DefaultDialer.Dial(wsURL, nil)
110110+ if err != nil {
111111+ return fmt.Errorf("websocket dial failed: %w", err)
112112+ }
113113+ defer conn.Close()
114114+115115+ slog.Info("Connected to labeler", "url", s.labelerURL)
116116+117117+ for {
118118+ select {
119119+ case <-s.stopCh:
120120+ return nil
121121+ default:
122122+ }
123123+124124+ var msg LabelsMessage
125125+ if err := conn.ReadJSON(&msg); err != nil {
126126+ return fmt.Errorf("read error: %w", err)
127127+ }
128128+129129+ for _, le := range msg.Labels {
130130+ cts, _ := time.Parse(time.RFC3339, le.Cts)
131131+ did, repo := extractSubjectFromURI(le.URI)
132132+133133+ label := &db.Label{
134134+ Src: le.Src,
135135+ URI: le.URI,
136136+ Val: le.Val,
137137+ Neg: le.Neg,
138138+ Cts: cts,
139139+ SubjectDID: did,
140140+ SubjectRepo: repo,
141141+ Seq: msg.Seq,
142142+ }
143143+144144+ if err := db.UpsertLabel(s.database, label); err != nil {
145145+ slog.Warn("Failed to upsert label", "uri", le.URI, "error", err)
146146+ continue
147147+ }
148148+149149+ slog.Info("Mirrored label",
150150+ "uri", le.URI,
151151+ "val", le.Val,
152152+ "neg", le.Neg,
153153+ "subject_did", did,
154154+ "subject_repo", repo,
155155+ )
156156+ }
157157+ }
158158+}
159159+160160+// extractSubjectFromURI extracts the DID and repository from an AT URI.
161161+// Examples:
162162+//
163163+// at://did:plc:xyz → (did:plc:xyz, "")
164164+// at://did:plc:xyz/io.atcr.manifest/abc → (did:plc:xyz, "") - repo extracted from record
165165+// at://did:plc:xyz/io.atcr.repo/myimage → (did:plc:xyz, "myimage")
166166+func extractSubjectFromURI(uri string) (did, repo string) {
167167+ trimmed := strings.TrimPrefix(uri, "at://")
168168+ parts := strings.SplitN(trimmed, "/", 3)
169169+ if len(parts) == 0 {
170170+ return "", ""
171171+ }
172172+ did = parts[0]
173173+174174+ // For repo-level summary labels: at://did/io.atcr.repo/reponame
175175+ if len(parts) >= 3 && parts[1] == "io.atcr.repo" {
176176+ repo = parts[2]
177177+ }
178178+ return did, repo
179179+}
180180+181181+// extractDIDFromURL derives a did:web from a labeler URL.
182182+func extractDIDFromURL(labelerURL string) string {
183183+ u, err := url.Parse(labelerURL)
184184+ if err != nil {
185185+ return labelerURL
186186+ }
187187+ host := u.Hostname()
188188+ if port := u.Port(); port != "" {
189189+ host += "%3A" + port
190190+ }
191191+ return "did:web:" + host
192192+}
193193+194194+// toWebSocketURL converts an HTTP URL to a WebSocket URL.
195195+func toWebSocketURL(httpURL string) string {
196196+ u, err := url.Parse(httpURL)
197197+ if err != nil {
198198+ return httpURL
199199+ }
200200+ switch u.Scheme {
201201+ case "https":
202202+ u.Scheme = "wss"
203203+ default:
204204+ u.Scheme = "ws"
205205+ }
206206+ return u.String()
207207+}
208208+209209+// ParseLabelerURL parses a labeler DID or URL into an HTTP URL.
210210+func ParseLabelerURL(labelerDIDOrURL string) string {
211211+ if strings.HasPrefix(labelerDIDOrURL, "http://") || strings.HasPrefix(labelerDIDOrURL, "https://") {
212212+ return labelerDIDOrURL
213213+ }
214214+ if strings.HasPrefix(labelerDIDOrURL, "did:web:") {
215215+ host := strings.TrimPrefix(labelerDIDOrURL, "did:web:")
216216+ host = strings.ReplaceAll(host, "%3A", ":")
217217+ return "https://" + host
218218+ }
219219+ return labelerDIDOrURL
220220+}
221221+222222+// SubscriberFromConfig creates a Subscriber from a labeler DID/URL config value.
223223+// Returns nil if labelerDIDOrURL is empty.
224224+func SubscriberFromConfig(labelerDIDOrURL string, database *sql.DB) *Subscriber {
225225+ if labelerDIDOrURL == "" {
226226+ return nil
227227+ }
228228+ labelerURL := ParseLabelerURL(labelerDIDOrURL)
229229+ return NewSubscriber(labelerURL, database)
230230+}
231231+232232+// DecodeLabelsFromJSON decodes a JSON-encoded labels message.
233233+func DecodeLabelsFromJSON(data []byte) (*LabelsMessage, error) {
234234+ var msg LabelsMessage
235235+ if err := json.Unmarshal(data, &msg); err != nil {
236236+ return nil, err
237237+ }
238238+ return &msg, nil
239239+}
+21
pkg/appview/middleware/registry.go
···166166 return serviceToken, err
167167}
168168169169+// LabelChecker checks whether content has been taken down via ATProto labels.
170170+type LabelChecker interface {
171171+ IsTakenDown(did, repository string) (bool, error)
172172+}
173173+169174// Global variables for initialization only
170175// These are set by main.go during startup and copied into NamespaceResolver instances.
171176// After initialization, request handling uses the NamespaceResolver's instance fields.
···175180 globalAuthorizer auth.HoldAuthorizer
176181 globalWebhookDispatcher storage.PushWebhookDispatcher
177182 globalManifestRefChecker storage.ManifestReferenceChecker
183183+ globalLabelChecker LabelChecker
178184)
179185180186// SetGlobalRefresher sets the OAuth refresher instance during initialization
···192198// SetGlobalManifestRefChecker sets the manifest reference checker during initialization
193199func SetGlobalManifestRefChecker(checker storage.ManifestReferenceChecker) {
194200 globalManifestRefChecker = checker
201201+}
202202+203203+// SetGlobalLabelChecker sets the label checker instance during initialization
204204+func SetGlobalLabelChecker(checker LabelChecker) {
205205+ globalLabelChecker = checker
195206}
196207197208// SetGlobalAuthorizer sets the authorizer instance during initialization
···303314 }
304315305316 slog.Debug("Resolved identity", "component", "registry/middleware", "did", did, "pds", pdsEndpoint, "handle", handle)
317317+318318+ // Check for takedown labels before proceeding
319319+ if globalLabelChecker != nil {
320320+ if taken, _ := globalLabelChecker.IsTakenDown(did, imageName); taken {
321321+ return nil, errcode.Error{
322322+ Code: errcode.ErrorCodeDenied,
323323+ Message: "this repository has been removed due to a policy violation",
324324+ }
325325+ }
326326+ }
306327307328 // Query for hold DID - either user's hold or default hold service
308329 // Also returns the sailor profile so we can read preferences (e.g. AutoRemoveUntagged)
+13
pkg/appview/server.go
···2424 "atcr.io/pkg/appview/db"
2525 "atcr.io/pkg/appview/holdhealth"
2626 "atcr.io/pkg/appview/jetstream"
2727+ appviewlabeler "atcr.io/pkg/appview/labeler"
2728 "atcr.io/pkg/appview/middleware"
2829 "atcr.io/pkg/appview/readme"
2930 "atcr.io/pkg/appview/routes"
···236237 middleware.SetGlobalDatabase(holdDIDDB)
237238 middleware.SetGlobalManifestRefChecker(holdDIDDB)
238239240240+ // Set label checker for takedown filtering
241241+ middleware.SetGlobalLabelChecker(db.NewLabelChecker(s.Database))
242242+239243 // Create RemoteHoldAuthorizer for hold authorization with caching
240244 s.HoldAuthorizer = auth.NewRemoteHoldAuthorizer(s.Database, testMode)
241245 middleware.SetGlobalAuthorizer(s.HoldAuthorizer)
···286290287291 // Initialize Jetstream workers
288292 s.initializeJetstream()
293293+294294+ // Initialize labeler subscriber
295295+ if cfg.Labeler.DID != "" {
296296+ sub := appviewlabeler.SubscriberFromConfig(cfg.Labeler.DID, s.Database)
297297+ if sub != nil {
298298+ sub.Start()
299299+ slog.Info("Labeler subscriber started", "labeler", cfg.Labeler.DID)
300300+ }
301301+ }
289302290303 // Create main chi router
291304 mainRouter := chi.NewRouter()