A container registry that uses the AT Protocol for manifest storage and S3 for blob storage. atcr.io
docker container atproto go
80
fork

Configure Feed

Select the types of activity you want to include in your feed.

simplify appview label subs

+338 -234
+36 -43
pkg/appview/db/labels.go
··· 16 16 return &LabelChecker{db: database} 17 17 } 18 18 19 - // IsTakenDown checks if a (DID, repository) pair has an active takedown label. 19 + // IsTakenDown checks if a (DID, repository) pair has an active takedown. 20 20 func (lc *LabelChecker) IsTakenDown(did, repository string) (bool, error) { 21 21 return IsTakenDown(lc.db, did, repository) 22 22 } 23 23 24 - // Label represents an ATProto label mirrored from a labeler service. 25 - // Exp is the optional expiration timestamp from the ATProto label spec; 26 - // nil means the label does not expire. 27 - type Label struct { 28 - ID int64 29 - Src string 30 - URI string 31 - Val string 32 - Neg bool 33 - Cts time.Time 34 - Exp *time.Time 35 - SubjectDID string 36 - SubjectRepo string 37 - Seq int64 38 - } 39 - 40 - // IsTakenDown checks if a (DID, repository) pair has an active !takedown label. 41 - // Also matches user-level labels (subject_repo = ”). 24 + // IsTakenDown reports whether the given (did, repo) pair is currently taken 25 + // down, either by an exact-repo row or by a user-level row (repo=”). 42 26 func IsTakenDown(db DBTX, did, repository string) (bool, error) { 43 27 var exists bool 44 28 err := db.QueryRow( 45 29 `SELECT EXISTS( 46 - SELECT 1 FROM labels l1 47 - WHERE l1.subject_did = ? 48 - AND (l1.subject_repo = ? OR l1.subject_repo = '') 49 - AND l1.val = '!takedown' AND l1.neg = 0 50 - AND NOT EXISTS ( 51 - SELECT 1 FROM labels l2 52 - WHERE l2.src = l1.src AND l2.uri = l1.uri AND l2.val = l1.val 53 - AND l2.neg = 1 AND l2.id > l1.id 54 - ) 55 - AND (l1.exp IS NULL OR datetime(l1.exp) > CURRENT_TIMESTAMP) 30 + SELECT 1 FROM taken_down_subjects 31 + WHERE did = ? AND (repo = ? OR repo = '') 56 32 )`, 57 33 did, repository, 58 34 ).Scan(&exists) 59 35 return exists, err 60 36 } 61 37 62 - // UpsertLabel inserts or updates a label from a labeler subscription. 63 - func UpsertLabel(db DBTX, l *Label) error { 64 - var exp any 65 - if l.Exp != nil { 66 - exp = l.Exp.UTC().Format(time.RFC3339) 67 - } 38 + // SetTakedown records a positive takedown for (src, did, repo). Idempotent: 39 + // re-applying updates the timestamp. 40 + func SetTakedown(db DBTX, src, did, repo string, cts time.Time) error { 41 + _, err := db.Exec( 42 + `INSERT INTO taken_down_subjects (src, did, repo, cts) VALUES (?, ?, ?, ?) 43 + ON CONFLICT(src, did, repo) DO UPDATE SET cts = excluded.cts`, 44 + src, did, repo, cts.UTC().Format(time.RFC3339), 45 + ) 46 + return err 47 + } 48 + 49 + // RemoveTakedown drops the takedown row for (src, did, repo). Idempotent. 50 + func RemoveTakedown(db DBTX, src, did, repo string) error { 68 51 _, err := db.Exec( 69 - `INSERT INTO labels (src, uri, val, neg, cts, exp, subject_did, subject_repo, seq) 70 - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 71 - ON CONFLICT(src, uri, val, neg) DO UPDATE SET cts = excluded.cts, exp = excluded.exp, seq = excluded.seq`, 72 - l.Src, l.URI, l.Val, l.Neg, l.Cts.UTC().Format(time.RFC3339), exp, 73 - l.SubjectDID, l.SubjectRepo, l.Seq, 52 + `DELETE FROM taken_down_subjects WHERE src = ? AND did = ? AND repo = ?`, 53 + src, did, repo, 74 54 ) 75 55 return err 76 56 } 77 57 78 - // GetLabelCursor returns the latest sequence number for a given labeler source. 79 - func GetLabelCursor(db DBTX, src string) (int64, error) { 58 + // GetCursor returns the last persisted cursor for a labeler src (0 if none). 59 + func GetCursor(db DBTX, src string) (int64, error) { 80 60 var cursor int64 81 61 err := db.QueryRow( 82 - `SELECT COALESCE(MAX(seq), 0) FROM labels WHERE src = ?`, 62 + `SELECT cursor FROM labeler_cursor WHERE src = ?`, 83 63 src, 84 64 ).Scan(&cursor) 65 + if err == sql.ErrNoRows { 66 + return 0, nil 67 + } 85 68 return cursor, err 86 69 } 70 + 71 + // SetCursor persists the cursor for a labeler src. 72 + func SetCursor(db DBTX, src string, cursor int64) error { 73 + _, err := db.Exec( 74 + `INSERT INTO labeler_cursor (src, cursor) VALUES (?, ?) 75 + ON CONFLICT(src) DO UPDATE SET cursor = excluded.cursor`, 76 + src, cursor, 77 + ) 78 + return err 79 + }
+164 -88
pkg/appview/db/labels_test.go
··· 5 5 "time" 6 6 ) 7 7 8 - // TestIsTakenDown_ExpRespected verifies that the takedown check honors the 9 - // optional ATProto label expiration: NULL and future-dated exp values count 10 - // the label as active, while past-dated exp values exclude it. 11 - func TestIsTakenDown_ExpRespected(t *testing.T) { 12 - db, err := InitDB("file:TestIsTakenDown_ExpRespected?mode=memory&cache=shared", LibsqlConfig{}) 8 + const testLabelerSrc = "did:plc:labeler" 9 + 10 + // TestSetTakedown_RepoLevel verifies that an exact-repo row blocks only that 11 + // repo and leaves siblings untouched. 12 + func TestSetTakedown_RepoLevel(t *testing.T) { 13 + db, err := InitDB("file:TestSetTakedown_RepoLevel?mode=memory&cache=shared", LibsqlConfig{}) 13 14 if err != nil { 14 15 t.Fatalf("init db: %v", err) 15 16 } 16 17 defer db.Close() 17 18 18 - now := time.Now().UTC() 19 - past := now.Add(-1 * time.Hour) 20 - future := now.Add(1 * time.Hour) 19 + if err := SetTakedown(db, testLabelerSrc, "did:plc:user", "myrepo", time.Now().UTC()); err != nil { 20 + t.Fatalf("set takedown: %v", err) 21 + } 21 22 22 - cases := []struct { 23 - name string 24 - did string 25 - exp *time.Time 26 - wantHit bool 27 - }{ 28 - {"no_exp_active", "did:plc:noexp", nil, true}, 29 - {"future_exp_active", "did:plc:future", &future, true}, 30 - {"past_exp_inactive", "did:plc:past", &past, false}, 23 + hit, err := IsTakenDown(db, "did:plc:user", "myrepo") 24 + if err != nil { 25 + t.Fatalf("is taken down (target): %v", err) 26 + } 27 + if !hit { 28 + t.Fatal("expected target repo to be taken down") 29 + } 30 + 31 + hit, err = IsTakenDown(db, "did:plc:user", "otherrepo") 32 + if err != nil { 33 + t.Fatalf("is taken down (sibling): %v", err) 34 + } 35 + if hit { 36 + t.Fatal("sibling repo should not be taken down") 31 37 } 32 38 33 - for _, tc := range cases { 34 - label := &Label{ 35 - Src: "did:plc:labeler", 36 - URI: "at://" + tc.did + "/io.atcr.repo.page/foo", 37 - Val: "!takedown", 38 - Neg: false, 39 - Cts: now, 40 - Exp: tc.exp, 41 - SubjectDID: tc.did, 42 - Seq: 1, 43 - } 44 - if err := UpsertLabel(db, label); err != nil { 45 - t.Fatalf("%s: upsert label: %v", tc.name, err) 46 - } 39 + hit, err = IsTakenDown(db, "did:plc:other", "myrepo") 40 + if err != nil { 41 + t.Fatalf("is taken down (other did): %v", err) 42 + } 43 + if hit { 44 + t.Fatal("repo on different did should not be taken down") 45 + } 46 + } 47 47 48 - got, err := IsTakenDown(db, tc.did, "foo") 48 + // TestSetTakedown_UserLevel verifies that a user-level row (repo=”) matches 49 + // any repo lookup for that DID. 50 + func TestSetTakedown_UserLevel(t *testing.T) { 51 + db, err := InitDB("file:TestSetTakedown_UserLevel?mode=memory&cache=shared", LibsqlConfig{}) 52 + if err != nil { 53 + t.Fatalf("init db: %v", err) 54 + } 55 + defer db.Close() 56 + 57 + if err := SetTakedown(db, testLabelerSrc, "did:plc:user", "", time.Now().UTC()); err != nil { 58 + t.Fatalf("set user-level takedown: %v", err) 59 + } 60 + 61 + for _, repo := range []string{"alpha", "beta", "anything"} { 62 + hit, err := IsTakenDown(db, "did:plc:user", repo) 49 63 if err != nil { 50 - t.Fatalf("%s: IsTakenDown: %v", tc.name, err) 64 + t.Fatalf("is taken down %q: %v", repo, err) 51 65 } 52 - if got != tc.wantHit { 53 - t.Errorf("%s: IsTakenDown = %v, want %v", tc.name, got, tc.wantHit) 66 + if !hit { 67 + t.Fatalf("user-level takedown should cover repo %q", repo) 54 68 } 55 69 } 70 + 71 + hit, err := IsTakenDown(db, "did:plc:bystander", "alpha") 72 + if err != nil { 73 + t.Fatalf("is taken down (bystander): %v", err) 74 + } 75 + if hit { 76 + t.Fatal("user-level takedown should not affect a different did") 77 + } 56 78 } 57 79 58 - // TestIsTakenDown_NegationWinsOverExp verifies that a later negation row 59 - // suppresses an earlier non-expired takedown — exp doesn't shield it from 60 - // being reversed by a !takedown neg=1 with a higher id. 61 - func TestIsTakenDown_NegationWinsOverExp(t *testing.T) { 62 - db, err := InitDB("file:TestIsTakenDown_NegationWinsOverExp?mode=memory&cache=shared", LibsqlConfig{}) 80 + // TestRemoveTakedown_Idempotent verifies that removing a missing row is a 81 + // no-op and removing an existing row flips IsTakenDown back to false. 82 + func TestRemoveTakedown_Idempotent(t *testing.T) { 83 + db, err := InitDB("file:TestRemoveTakedown_Idempotent?mode=memory&cache=shared", LibsqlConfig{}) 63 84 if err != nil { 64 85 t.Fatalf("init db: %v", err) 65 86 } 66 87 defer db.Close() 67 88 68 - did := "did:plc:reversed" 69 - uri := "at://" + did + "/io.atcr.repo.page/bar" 70 - src := "did:plc:labeler" 71 - now := time.Now().UTC() 72 - future := now.Add(24 * time.Hour) 89 + if err := RemoveTakedown(db, testLabelerSrc, "did:plc:ghost", "ghost"); err != nil { 90 + t.Fatalf("remove missing row: %v", err) 91 + } 73 92 74 - if err := UpsertLabel(db, &Label{ 75 - Src: src, URI: uri, Val: "!takedown", Neg: false, 76 - Cts: now, Exp: &future, SubjectDID: did, Seq: 1, 77 - }); err != nil { 78 - t.Fatalf("seed takedown: %v", err) 93 + if err := SetTakedown(db, testLabelerSrc, "did:plc:user", "myrepo", time.Now().UTC()); err != nil { 94 + t.Fatalf("set takedown: %v", err) 79 95 } 80 - if err := UpsertLabel(db, &Label{ 81 - Src: src, URI: uri, Val: "!takedown", Neg: true, 82 - Cts: now.Add(time.Minute), SubjectDID: did, Seq: 2, 83 - }); err != nil { 84 - t.Fatalf("seed reversal: %v", err) 96 + if err := RemoveTakedown(db, testLabelerSrc, "did:plc:user", "myrepo"); err != nil { 97 + t.Fatalf("remove existing row: %v", err) 85 98 } 86 99 87 - got, err := IsTakenDown(db, did, "bar") 100 + hit, err := IsTakenDown(db, "did:plc:user", "myrepo") 88 101 if err != nil { 89 - t.Fatalf("IsTakenDown: %v", err) 102 + t.Fatalf("is taken down after remove: %v", err) 90 103 } 91 - if got { 92 - t.Errorf("IsTakenDown = true, want false (reversal should suppress non-expired takedown)") 104 + if hit { 105 + t.Fatal("expected takedown to be cleared after remove") 93 106 } 94 107 } 95 108 96 - // TestUpsertLabel_ExpUpdatedOnConflict verifies that upserting an existing 97 - // label row updates the exp column (not just cts/seq) — so a labeler that 98 - // extends or removes an expiration is reflected. 99 - func TestUpsertLabel_ExpUpdatedOnConflict(t *testing.T) { 100 - db, err := InitDB("file:TestUpsertLabel_ExpUpdatedOnConflict?mode=memory&cache=shared", LibsqlConfig{}) 109 + // TestSetTakedown_OnConflictUpdatesCts verifies that re-applying with a later 110 + // cts updates the row's timestamp instead of inserting a duplicate. 111 + func TestSetTakedown_OnConflictUpdatesCts(t *testing.T) { 112 + db, err := InitDB("file:TestSetTakedown_OnConflictUpdatesCts?mode=memory&cache=shared", LibsqlConfig{}) 101 113 if err != nil { 102 114 t.Fatalf("init db: %v", err) 103 115 } 104 116 defer db.Close() 105 117 106 - did := "did:plc:updated" 107 - src := "did:plc:labeler" 108 - uri := "at://" + did + "/io.atcr.repo.page/baz" 109 - now := time.Now().UTC() 110 - past := now.Add(-1 * time.Hour) 111 - future := now.Add(1 * time.Hour) 118 + earlier := time.Now().Add(-2 * time.Hour).UTC().Truncate(time.Second) 119 + later := time.Now().UTC().Truncate(time.Second) 112 120 113 - // Seed with an already-expired label — IsTakenDown should be false. 114 - if err := UpsertLabel(db, &Label{ 115 - Src: src, URI: uri, Val: "!takedown", Neg: false, 116 - Cts: now, Exp: &past, SubjectDID: did, Seq: 1, 117 - }); err != nil { 118 - t.Fatalf("seed: %v", err) 121 + if err := SetTakedown(db, testLabelerSrc, "did:plc:user", "repo", earlier); err != nil { 122 + t.Fatalf("set initial: %v", err) 119 123 } 120 - if got, _ := IsTakenDown(db, did, "baz"); got { 121 - t.Fatalf("expected expired label to be inactive before update") 124 + if err := SetTakedown(db, testLabelerSrc, "did:plc:user", "repo", later); err != nil { 125 + t.Fatalf("set update: %v", err) 122 126 } 123 127 124 - // Re-upsert with a future exp — same UNIQUE key, should update exp. 125 - if err := UpsertLabel(db, &Label{ 126 - Src: src, URI: uri, Val: "!takedown", Neg: false, 127 - Cts: now.Add(time.Minute), Exp: &future, SubjectDID: did, Seq: 2, 128 - }); err != nil { 129 - t.Fatalf("re-upsert: %v", err) 128 + var stored string 129 + err = db.QueryRow( 130 + `SELECT cts FROM taken_down_subjects WHERE src=? AND did=? AND repo=?`, 131 + testLabelerSrc, "did:plc:user", "repo", 132 + ).Scan(&stored) 133 + if err != nil { 134 + t.Fatalf("query cts: %v", err) 130 135 } 131 - got, err := IsTakenDown(db, did, "baz") 136 + got, err := time.Parse(time.RFC3339, stored) 132 137 if err != nil { 133 - t.Fatalf("IsTakenDown after update: %v", err) 138 + t.Fatalf("parse stored cts %q: %v", stored, err) 134 139 } 135 - if !got { 136 - t.Errorf("IsTakenDown = false, want true after extending exp into the future") 140 + if !got.Equal(later) { 141 + t.Fatalf("expected cts to be updated to %s, got %s", later, got) 142 + } 143 + } 144 + 145 + // TestCursor_GetSetRoundtrip verifies that an unknown src returns 0 and 146 + // SetCursor/GetCursor round-trip and overwrite. 147 + func TestCursor_GetSetRoundtrip(t *testing.T) { 148 + db, err := InitDB("file:TestCursor_GetSetRoundtrip?mode=memory&cache=shared", LibsqlConfig{}) 149 + if err != nil { 150 + t.Fatalf("init db: %v", err) 151 + } 152 + defer db.Close() 153 + 154 + got, err := GetCursor(db, "did:plc:nobody") 155 + if err != nil { 156 + t.Fatalf("get cursor unknown: %v", err) 157 + } 158 + if got != 0 { 159 + t.Fatalf("expected 0 for unknown src, got %d", got) 160 + } 161 + 162 + if err := SetCursor(db, testLabelerSrc, 42); err != nil { 163 + t.Fatalf("set cursor: %v", err) 164 + } 165 + got, err = GetCursor(db, testLabelerSrc) 166 + if err != nil { 167 + t.Fatalf("get cursor after set: %v", err) 168 + } 169 + if got != 42 { 170 + t.Fatalf("expected 42, got %d", got) 171 + } 172 + 173 + if err := SetCursor(db, testLabelerSrc, 7); err != nil { 174 + t.Fatalf("set cursor (overwrite): %v", err) 175 + } 176 + got, err = GetCursor(db, testLabelerSrc) 177 + if err != nil { 178 + t.Fatalf("get cursor after overwrite: %v", err) 179 + } 180 + if got != 7 { 181 + t.Fatalf("expected 7 after overwrite, got %d", got) 182 + } 183 + } 184 + 185 + // TestSetTakedown_PerSrcIsolation verifies that two labeler srcs can each 186 + // own a row for the same (did, repo) and removing one src's row leaves the 187 + // other intact. 188 + func TestSetTakedown_PerSrcIsolation(t *testing.T) { 189 + db, err := InitDB("file:TestSetTakedown_PerSrcIsolation?mode=memory&cache=shared", LibsqlConfig{}) 190 + if err != nil { 191 + t.Fatalf("init db: %v", err) 192 + } 193 + defer db.Close() 194 + 195 + now := time.Now().UTC() 196 + if err := SetTakedown(db, "did:plc:labeler-a", "did:plc:user", "repo", now); err != nil { 197 + t.Fatalf("set src a: %v", err) 198 + } 199 + if err := SetTakedown(db, "did:plc:labeler-b", "did:plc:user", "repo", now); err != nil { 200 + t.Fatalf("set src b: %v", err) 201 + } 202 + 203 + if err := RemoveTakedown(db, "did:plc:labeler-a", "did:plc:user", "repo"); err != nil { 204 + t.Fatalf("remove src a: %v", err) 205 + } 206 + 207 + hit, err := IsTakenDown(db, "did:plc:user", "repo") 208 + if err != nil { 209 + t.Fatalf("is taken down: %v", err) 210 + } 211 + if !hit { 212 + t.Fatal("repo should still be taken down by src b after src a row removed") 137 213 } 138 214 }
+16
pkg/appview/db/migrations/0025_simplify_labels.yaml
··· 1 + description: Replace labels (append-only mirror) with taken_down_subjects + labeler_cursor (current-state model) 2 + query: | 3 + DROP TABLE IF EXISTS labels; 4 + CREATE TABLE IF NOT EXISTS taken_down_subjects ( 5 + src TEXT NOT NULL, 6 + did TEXT NOT NULL, 7 + repo TEXT NOT NULL DEFAULT '', 8 + cts TIMESTAMP NOT NULL, 9 + PRIMARY KEY (src, did, repo) 10 + ); 11 + CREATE INDEX IF NOT EXISTS idx_taken_down_subjects_did ON taken_down_subjects(did); 12 + CREATE INDEX IF NOT EXISTS idx_taken_down_subjects_did_repo ON taken_down_subjects(did, repo); 13 + CREATE TABLE IF NOT EXISTS labeler_cursor ( 14 + src TEXT PRIMARY KEY, 15 + cursor INTEGER NOT NULL 16 + );
+5 -17
pkg/appview/db/queries.go
··· 19 19 // activeTakedownClause returns a SQL fragment ready to drop into a `WHERE NOT 20 20 // EXISTS (...)` filter for excluding rows whose `(did, repository)` pair is currently 21 21 // taken down. The `alias` argument is the outer table alias (e.g. "m" for manifests, 22 - // "lm" for latest_manifests) and must already be in scope at the use site. 23 - // 24 - // Mirrors the semantics of `IsTakenDown` (defined in labels.go) so listings stay 25 - // consistent with the per-repo page check: a label only counts as active when it has 26 - // neg=0, no newer neg=1 row with the same (src, uri, val), and a non-expired `exp`. 27 - // Without these clauses listings hide a repo forever once you've ever taken it down, 28 - // even after a reversal. 22 + // "lm" for latest_manifests) and must already be in scope at the use site. Mirrors 23 + // `IsTakenDown` so listings stay consistent with the per-repo page check. 29 24 func activeTakedownClause(alias string) string { 30 25 return `NOT EXISTS ( 31 - SELECT 1 FROM labels l1 32 - WHERE l1.subject_did = ` + alias + `.did 33 - AND (l1.subject_repo = ` + alias + `.repository OR l1.subject_repo = '') 34 - AND l1.val = '!takedown' AND l1.neg = 0 35 - AND NOT EXISTS ( 36 - SELECT 1 FROM labels l2 37 - WHERE l2.src = l1.src AND l2.uri = l1.uri AND l2.val = l1.val 38 - AND l2.neg = 1 AND l2.id > l1.id 39 - ) 40 - AND (l1.exp IS NULL OR datetime(l1.exp) > CURRENT_TIMESTAMP) 26 + SELECT 1 FROM taken_down_subjects t 27 + WHERE t.did = ` + alias + `.did 28 + AND (t.repo = ` + alias + `.repository OR t.repo = '') 41 29 )` 42 30 } 43 31
+11 -12
pkg/appview/db/schema.sql
··· 299 299 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP 300 300 ); 301 301 302 - CREATE TABLE IF NOT EXISTS labels ( 303 - id INTEGER PRIMARY KEY AUTOINCREMENT, 302 + CREATE TABLE IF NOT EXISTS taken_down_subjects ( 304 303 src TEXT NOT NULL, 305 - uri TEXT NOT NULL, 306 - val TEXT NOT NULL, 307 - neg BOOLEAN NOT NULL DEFAULT 0, 304 + did TEXT NOT NULL, 305 + repo TEXT NOT NULL DEFAULT '', 308 306 cts TIMESTAMP NOT NULL, 309 - exp TIMESTAMP, 310 - subject_did TEXT NOT NULL, 311 - subject_repo TEXT NOT NULL DEFAULT '', 312 - seq INTEGER NOT NULL DEFAULT 0, 313 - UNIQUE(src, uri, val, neg) 307 + PRIMARY KEY (src, did, repo) 308 + ); 309 + CREATE INDEX IF NOT EXISTS idx_taken_down_subjects_did ON taken_down_subjects(did); 310 + CREATE INDEX IF NOT EXISTS idx_taken_down_subjects_did_repo ON taken_down_subjects(did, repo); 311 + 312 + CREATE TABLE IF NOT EXISTS labeler_cursor ( 313 + src TEXT PRIMARY KEY, 314 + cursor INTEGER NOT NULL 314 315 ); 315 - CREATE INDEX IF NOT EXISTS idx_labels_subject ON labels(subject_did, subject_repo); 316 - CREATE INDEX IF NOT EXISTS idx_labels_val ON labels(val);
+106 -74
pkg/appview/labeler/subscriber.go
··· 19 19 "github.com/gorilla/websocket" 20 20 ) 21 21 22 - // Subscriber connects to a labeler's subscribeLabels endpoint 23 - // and mirrors labels into the appview database. 22 + // TakedownLabelValue is the only label value the appview honors. 23 + const TakedownLabelValue = "!takedown" 24 + 25 + // Subscriber connects to a labeler's subscribeLabels endpoint and mirrors 26 + // the current set of active takedowns into the appview database. 24 27 type Subscriber struct { 25 28 labelerURL string 29 + labelerDID string 26 30 database *sql.DB 27 31 stopCh chan struct{} 28 32 } 29 33 30 - // NewSubscriber creates a new labeler subscriber. 31 - func NewSubscriber(labelerURL string, database *sql.DB) *Subscriber { 34 + // NewSubscriber creates a new labeler subscriber. labelerDIDOrURL is the 35 + // original config value (used to preserve a configured did:web identifier 36 + // when present); labelerURL is the resolved HTTP(S) endpoint. 37 + func NewSubscriber(labelerDIDOrURL, labelerURL string, database *sql.DB) *Subscriber { 32 38 return &Subscriber{ 33 39 labelerURL: labelerURL, 40 + labelerDID: deriveLabelerDID(labelerDIDOrURL, labelerURL), 34 41 database: database, 35 42 stopCh: make(chan struct{}), 36 43 } ··· 76 83 } 77 84 78 85 func (s *Subscriber) connect() error { 79 - // Get cursor from DB 80 - // Use the labeler URL as src identifier 81 - labelerDID := extractDIDFromURL(s.labelerURL) 82 - cursor, err := db.GetLabelCursor(s.database, labelerDID) 86 + cursor, err := db.GetCursor(s.database, s.labelerDID) 83 87 if err != nil { 84 88 return fmt.Errorf("failed to get cursor: %w", err) 85 89 } 86 90 87 - // Build WebSocket URL 88 91 wsURL := toWebSocketURL(s.labelerURL) + "/xrpc/com.atproto.label.subscribeLabels" 89 92 if cursor > 0 { 90 93 wsURL += fmt.Sprintf("?cursor=%d", cursor) ··· 120 123 seq, labels, err := decodeFrame(payload) 121 124 if err != nil { 122 125 if errors.Is(err, errInfoFrame) { 123 - continue // already logged inside decodeFrame 126 + continue 124 127 } 125 128 return fmt.Errorf("decode frame: %w", err) 126 129 } 127 130 128 131 for _, le := range labels { 129 - cts, _ := time.Parse(time.RFC3339, le.Cts) 130 - did, repo := extractSubjectFromURI(le.Uri) 132 + s.applyLabel(le) 133 + } 131 134 132 - // Exp is optional in the ATProto label spec — treat unparseable 133 - // values as "no expiration" rather than dropping the label. 134 - var exp *time.Time 135 - if le.Exp != nil { 136 - if t, err := time.Parse(time.RFC3339, *le.Exp); err == nil { 137 - exp = &t 138 - } 139 - } 135 + if err := db.SetCursor(s.database, s.labelerDID, seq); err != nil { 136 + slog.Warn("Failed to persist labeler cursor", "seq", seq, "error", err) 137 + } 138 + } 139 + } 140 140 141 - label := &db.Label{ 142 - Src: le.Src, 143 - URI: le.Uri, 144 - Val: le.Val, 145 - Neg: le.Neg != nil && *le.Neg, 146 - Cts: cts, 147 - Exp: exp, 148 - SubjectDID: did, 149 - SubjectRepo: repo, 150 - Seq: seq, 151 - } 141 + // applyLabel processes a single label. The appview only honors !takedown labels 142 + // from the configured labeler, and only at the granularity it can enforce — 143 + // user-level (at://<did>) and repo summary (at://<did>/io.atcr.repo/<repo>). 144 + // Per-record labels (per manifest, tag, repo-page) are dropped; the registry 145 + // middleware gates per (did, repo) so finer granularity has no effect. 146 + func (s *Subscriber) applyLabel(le *comatproto.LabelDefs_Label) { 147 + if le == nil { 148 + return 149 + } 150 + if le.Val != TakedownLabelValue { 151 + return 152 + } 153 + if le.Src != s.labelerDID { 154 + slog.Debug("Ignoring label from untrusted source", "src", le.Src, "uri", le.Uri) 155 + return 156 + } 152 157 153 - if err := db.UpsertLabel(s.database, label); err != nil { 154 - slog.Warn("Failed to upsert label", "uri", le.Uri, "error", err) 155 - continue 156 - } 158 + shape := classifyURI(le.Uri) 159 + if shape.kind == uriOther { 160 + slog.Debug("Skipping non-enforced label", "uri", le.Uri) 161 + return 162 + } 157 163 158 - // "Mirrored label X" reads as an apply; reversals are a different action 159 - // from the operator's POV (and a different SQL effect — the NOT EXISTS 160 - // negation clause kicks in), so log them distinctly. 161 - msg := "Mirrored label" 162 - if label.Neg { 163 - msg = "Mirrored label reversal" 164 - } 165 - slog.Info(msg, 166 - "uri", le.Uri, 167 - "val", le.Val, 168 - "neg", label.Neg, 169 - "subject_did", did, 170 - "subject_repo", repo, 171 - ) 164 + negated := le.Neg != nil && *le.Neg 165 + if negated { 166 + if err := db.RemoveTakedown(s.database, le.Src, shape.did, shape.repo); err != nil { 167 + slog.Warn("Failed to remove takedown", "uri", le.Uri, "error", err) 168 + return 172 169 } 170 + slog.Info("Mirrored takedown reversal", 171 + "src", le.Src, "did", shape.did, "repo", shape.repo) 172 + return 173 173 } 174 + 175 + cts, _ := time.Parse(time.RFC3339, le.Cts) 176 + if err := db.SetTakedown(s.database, le.Src, shape.did, shape.repo, cts); err != nil { 177 + slog.Warn("Failed to record takedown", "uri", le.Uri, "error", err) 178 + return 179 + } 180 + slog.Info("Mirrored takedown", 181 + "src", le.Src, "did", shape.did, "repo", shape.repo) 182 + } 183 + 184 + // uriShape captures the parts of a label subject URI that the appview cares about. 185 + type uriShape struct { 186 + kind uriKind 187 + did string 188 + repo string 189 + } 190 + 191 + type uriKind int 192 + 193 + const ( 194 + uriOther uriKind = iota 195 + uriUserLevel 196 + uriRepoSummary 197 + ) 198 + 199 + // classifyURI reports whether the URI is a user-level subject (at://<did>), 200 + // a repo summary (at://<did>/io.atcr.repo/<repo>), or something else 201 + // (per-record manifest/tag/repo-page labels we don't enforce). 202 + func classifyURI(uri string) uriShape { 203 + const prefix = "at://" 204 + if !strings.HasPrefix(uri, prefix) { 205 + return uriShape{} 206 + } 207 + rest := uri[len(prefix):] 208 + parts := strings.SplitN(rest, "/", 3) 209 + if len(parts) == 0 || parts[0] == "" { 210 + return uriShape{} 211 + } 212 + did := parts[0] 213 + if len(parts) == 1 { 214 + return uriShape{kind: uriUserLevel, did: did} 215 + } 216 + if len(parts) == 3 && parts[1] == "io.atcr.repo" && parts[2] != "" { 217 + return uriShape{kind: uriRepoSummary, did: did, repo: parts[2]} 218 + } 219 + return uriShape{kind: uriOther, did: did} 174 220 } 175 221 176 222 // errInfoFrame is returned by decodeFrame when the frame is informational and the ··· 221 267 } 222 268 } 223 269 224 - // extractSubjectFromURI extracts the DID and repository from an AT URI. 225 - // Examples: 226 - // 227 - // at://did:plc:xyz → (did:plc:xyz, "") 228 - // at://did:plc:xyz/io.atcr.manifest/abc → (did:plc:xyz, "") - repo extracted from record 229 - // at://did:plc:xyz/io.atcr.repo/myimage → (did:plc:xyz, "myimage") 230 - func extractSubjectFromURI(uri string) (did, repo string) { 231 - trimmed := strings.TrimPrefix(uri, "at://") 232 - parts := strings.SplitN(trimmed, "/", 3) 233 - if len(parts) == 0 { 234 - return "", "" 270 + // deriveLabelerDID returns the canonical labeler DID for source filtering. 271 + // When the operator gave us a did:... identifier directly, we use it as-is. 272 + // When they gave us a URL, we derive a did:web from its host so dev URLs 273 + // like http://labeler:5002 yield did:web:labeler%3A5002, matching the 274 + // labeler's own self-served identity. 275 + func deriveLabelerDID(labelerDIDOrURL, httpURL string) string { 276 + if strings.HasPrefix(labelerDIDOrURL, "did:") { 277 + return labelerDIDOrURL 235 278 } 236 - did = parts[0] 237 - 238 - // For repo-level summary labels: at://did/io.atcr.repo/reponame 239 - if len(parts) >= 3 && parts[1] == "io.atcr.repo" { 240 - repo = parts[2] 241 - } 242 - return did, repo 243 - } 244 - 245 - // extractDIDFromURL derives a did:web from a labeler URL. 246 - func extractDIDFromURL(labelerURL string) string { 247 - u, err := url.Parse(labelerURL) 279 + u, err := url.Parse(httpURL) 248 280 if err != nil { 249 - return labelerURL 281 + return labelerDIDOrURL 250 282 } 251 283 host := u.Hostname() 252 284 if port := u.Port(); port != "" { ··· 290 322 return nil 291 323 } 292 324 labelerURL := ParseLabelerURL(labelerDIDOrURL) 293 - return NewSubscriber(labelerURL, database) 325 + return NewSubscriber(labelerDIDOrURL, labelerURL, database) 294 326 }