this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

automod polish (#456)

Bunch of changes here, getting automod to roughly "v0". Intent is to get
this PR merged, then going forward do proper code review of all future
changes.

New here:

- "distinct value" counters (uses redis HyperLogLog; in-memory is
possibly-huge `map[string]bool`
- persist "flags" in redis
- slack webhook notifications for "new mod actions"
- fixes to exiting rules, and disable some trivial examples from
"default" ruleset
- new trivial/example rules, such as GTUBE spam string, counters
- new rule: interaction churn (follow/unfollow)
- new rule: new account reply promo
- helper command to re-process most-recent N posts from an account
- various helper routines in the rules package

authored by

bnewbold and committed by
GitHub
a18c4f45 a857fdc1

+1485 -145
+64 -5
automod/README.md
··· 1 - indigo/automod 2 - ============== 1 + `indigo/automod` 2 + ================ 3 3 4 4 This package (`github.com/bluesky-social/indigo/automod`) contains a "rules engine" to augment human moderators in the atproto network. Batches of rules are processed for novel "events" such as a new post or update of an account handle. Counters and other statistics are collected, which can drive subsequent rule invocations. The outcome of rules can be moderation events like "report account for human review" or "label post". A lot of what this package does is collect and maintain caches of relevant metadata about accounts and pieces of content, so that rules have efficient access to this information. 5 5 ··· 7 7 8 8 Some example rules are included in the `automod/rules` package, but the expectation is that some real-world rules will be kept secret. 9 9 10 - Code for subscribing to a firehose is not included here; see `cmd/hepa` for a complete service built on this library. 10 + Code for subscribing to a firehose is not included here; see `../cmd/hepa` for a service daemon built on this package. 11 + 12 + API reference documentation can be found on [pkg.go.dev](https://pkg.go.dev/github.com/bluesky-social/indigo/automod). 13 + 14 + ## Architecture 15 + 16 + The runtime (`automod.Engine`) manages network requests, caching, and configuration. Outside calling code makes concurrent calls to the `Process*Event` methods that the runtime provides. The runtime constructs event structs (eg, `automod.RecordEvent`), hydrates relevant context metadata from (cached) external services, and then executes a configured set of rules on the event. Rules may request additional context, do arbitrary local compute, and mute the event with any moderation "actions". After all rules have run, the runtime will inspect the event, update counter state, and push any new moderation actions to external services. 17 + 18 + The runtime keeps state in several "stores", each of which has an interface and both in-memory and Redis implementations. It is expected that Redis is used in virtually all deployments. The store types are: 19 + 20 + - `automod.CacheStore`: generic data caching with expiration (TTL) and explicit purging. Used to cache account-level metadata, including identity lookups and (if available) private account metadata 21 + - `automod.CountStore`: keyed integer counters with time bucketing (eg, "hour", "day", "total"). Also includes probabilistic "distinct value" counters (eg, Redis HyperLogLog counters, with roughly 2% precision) 22 + - `automod.SetStore`: configurable static string sets. May eventually be runtime configurable 23 + - `automod.FlagStore`: mechanism to keep track of automod-generated "flags" (like labels or hashtags) on accounts or records. Mostly used to detect *new* flags. May eventually be moved in to the moderation service itself, similar to labels 24 + 25 + 26 + ## Rule API 27 + 28 + Here is a simple example rule, which handles creation of new events: 29 + 30 + ```golang 31 + var gtubeString = "XJS*C4JDBQADN1.NSBN3*2IDNEN*GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X" 32 + 33 + func GtubePostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { 34 + if strings.Contains(post.Text, gtubeString) { 35 + evt.AddRecordLabel("spam") 36 + } 37 + return nil 38 + } 39 + ``` 40 + 41 + Every new post record will be inspected to see if it contains a static test string. If it does, the label `spam` will be applied to the record itself. 42 + 43 + The `evt` parameter provides access to relevant pre-fetched metadata; methods to fetch additional metadata from the network; a `slog` logging interface; and methods to store output decisions. The runtime will catch and recover from unexpected panics, and will log returned errors, but rules are generally expected to run robustly and efficiently, and not have complex control flow needs. 44 + 45 + Some of the more commonly used features of `evt` (`automod.RecordEvent`): 46 + 47 + - `evt.Logger`: a `log/slog` logging interface 48 + - `evt.Account.Identity`: atproto identity for the author account, including DID, handle, and PDS endpoint 49 + - `evt.Account.Private`: when not-null (aka, when the runtime has administrator access) will contain things like `.IndexedAt` (account first seen) and `.Email` (the current registered account email) 50 + - `evt.Account.Profile`: a cached subset of the account's `app.bsky.actor.profile` record (if non-null) 51 + - `evt.GetCount(<namespace>, <value>, <time-period>)` and `evt.Increment(<namespace>, <value>)`: to access and update simple counters (by hour, day, or total). Incrementing counters is lazy and happens in batch after all rules have executed: this means that multiple calls are de-duplicated, and that `GetCount` will not reflect any prior `Increment` calls in the same rule (or between rules). 52 + - `evt.GetCountDistinct(<namespace>, <bucket>, <time-period>)` and `evt.IncrementDistinct(<namespace>, <bucket>, <value>)`: similar to simple counters, but counts "unique distinct values" 53 + - `evt.InSet(<set-name>, <value>)`: checks if a string is in a named set 11 54 12 55 13 - ## Design 56 + ## Developing New Rules 14 57 15 - Prior art and inspiration: 58 + The current tl;dr process to deploy a new rule: 59 + 60 + - copy a similar existing rule from `automod/rules` 61 + - add the new rule to a `RuleSet`, so it will be invoked 62 + - test against content that triggers the new rule 63 + - deploy 64 + 65 + You'll usually want to start with both a known pattern you are looking for, and some example real-world content which you want to trigger on. 66 + 67 + The `automod/rules` package contains a set of example rules and some shared helper functions, and demonstrates some patterns for how to use counters, sets, filters, and account metadata to compose a rule pattern. 68 + 69 + The `hepa` command provides `process-record` and `process-recent` sub-commands which will pull an existing individual record (by AT-URI) or all recent bsky posts for an account (by handle or DID), which can be helpful for testing. 70 + 71 + When deploying a new rule, it is recommended to start with a minimal action, like setting a flag or just logging. Any "action" (including new flag creation) can result in a Slack notification. You can gain confidence in the rule by running against the full firehose with these limited actions, tweaking the rule until it seems to have acceptable sensitivity (eg, few false positives), and then escalate the actions to reporting (adds to the human review queue), or action-and-report (label or takedown, and concurrently report for humans to review the action). 72 + 73 + 74 + ## Prior Art 16 75 17 76 * The [SQRL language](https://sqrl-lang.github.io/sqrl/) and runtime was originally developed by an industry vendor named Smyte, then acquired by Twitter, with some core Javascript components released open source in 2023. The SQRL documentation is extensive and describes many of the design trade-offs and features specific to rules engines. Bluesky considered adopting SQRL but decided to start with a simpler runtime with rules in a known language (golang). 18 77
+24 -9
automod/account_meta.go
··· 26 26 27 27 // information about a repo/account/identity, always pre-populated and relevant to many rules 28 28 type AccountMeta struct { 29 - Identity *identity.Identity 30 - Profile ProfileSummary 31 - Private *AccountPrivate 32 - AccountLabels []string 33 - FollowersCount int64 34 - FollowsCount int64 35 - PostsCount int64 29 + Identity *identity.Identity 30 + Profile ProfileSummary 31 + Private *AccountPrivate 32 + AccountLabels []string 33 + AccountNegatedLabels []string 34 + AccountFlags []string 35 + FollowersCount int64 36 + FollowsCount int64 37 + PostsCount int64 38 + Takendown bool 36 39 } 37 40 38 41 func (e *Engine) GetAccountMeta(ctx context.Context, ident *identity.Identity) (*AccountMeta, error) { ··· 71 74 } 72 75 73 76 var labels []string 77 + var negLabels []string 74 78 for _, lbl := range pv.Labels { 75 - labels = append(labels, lbl.Val) 79 + if lbl.Neg != nil && *lbl.Neg == true { 80 + negLabels = append(negLabels, lbl.Val) 81 + } else { 82 + labels = append(labels, lbl.Val) 83 + } 84 + } 85 + 86 + flags, err := e.Flags.Get(ctx, ident.DID.String()) 87 + if err != nil { 88 + return nil, err 76 89 } 77 90 78 91 am := AccountMeta{ ··· 82 95 Description: pv.Description, 83 96 DisplayName: pv.DisplayName, 84 97 }, 85 - AccountLabels: dedupeStrings(labels), 98 + AccountLabels: dedupeStrings(labels), 99 + AccountNegatedLabels: dedupeStrings(negLabels), 100 + AccountFlags: flags, 86 101 } 87 102 if pv.PostsCount != nil { 88 103 am.PostsCount = *pv.PostsCount
+6
automod/cachestore.go
··· 10 10 type CacheStore interface { 11 11 Get(ctx context.Context, name, key string) (string, error) 12 12 Set(ctx context.Context, name, key string, val string) error 13 + Purge(ctx context.Context, name, key string) error 13 14 } 14 15 15 16 type MemCacheStore struct { ··· 34 35 s.Data.Add(name+"/"+key, val) 35 36 return nil 36 37 } 38 + 39 + func (s MemCacheStore) Purge(ctx context.Context, name, key string) error { 40 + s.Data.Remove(name + "/" + key) 41 + return nil 42 + }
+27 -2
automod/countstore.go
··· 17 17 GetCount(ctx context.Context, name, val, period string) (int, error) 18 18 Increment(ctx context.Context, name, val string) error 19 19 // TODO: batch increment method 20 + GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) 21 + IncrementDistinct(ctx context.Context, name, bucket, val string) error 20 22 } 21 23 22 24 // TODO: this implementation isn't race-safe (yet)! 23 25 type MemCountStore struct { 24 - Counts map[string]int 26 + Counts map[string]int 27 + DistinctCounts map[string]map[string]bool 25 28 } 26 29 27 30 func NewMemCountStore() MemCountStore { 28 31 return MemCountStore{ 29 - Counts: make(map[string]int), 32 + Counts: make(map[string]int), 33 + DistinctCounts: make(map[string]map[string]bool), 30 34 } 31 35 } 32 36 ··· 66 70 } 67 71 return nil 68 72 } 73 + 74 + func (s MemCountStore) GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) { 75 + v, ok := s.DistinctCounts[PeriodBucket(name, bucket, period)] 76 + if !ok { 77 + return 0, nil 78 + } 79 + return len(v), nil 80 + } 81 + 82 + func (s MemCountStore) IncrementDistinct(ctx context.Context, name, bucket, val string) error { 83 + for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} { 84 + k := PeriodBucket(name, bucket, p) 85 + m, ok := s.DistinctCounts[k] 86 + if !ok { 87 + m = make(map[string]bool) 88 + } 89 + m[val] = true 90 + s.DistinctCounts[k] = m 91 + } 92 + return nil 93 + }
+40
automod/countstore_test.go
··· 1 + package automod 2 + 3 + import ( 4 + "context" 5 + "testing" 6 + 7 + "github.com/stretchr/testify/assert" 8 + ) 9 + 10 + func TestMemCountStoreBasics(t *testing.T) { 11 + assert := assert.New(t) 12 + ctx := context.Background() 13 + 14 + cs := NewMemCountStore() 15 + 16 + c, err := cs.GetCount(ctx, "test1", "val1", PeriodTotal) 17 + assert.NoError(err) 18 + assert.Equal(0, c) 19 + assert.NoError(cs.Increment(ctx, "test1", "val1")) 20 + assert.NoError(cs.Increment(ctx, "test1", "val1")) 21 + c, err = cs.GetCount(ctx, "test1", "val1", PeriodTotal) 22 + assert.NoError(err) 23 + assert.Equal(2, c) 24 + 25 + c, err = cs.GetCountDistinct(ctx, "test2", "val2", PeriodTotal) 26 + assert.NoError(err) 27 + assert.Equal(0, c) 28 + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "one")) 29 + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "one")) 30 + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "one")) 31 + c, err = cs.GetCountDistinct(ctx, "test2", "val2", PeriodTotal) 32 + assert.NoError(err) 33 + assert.Equal(1, c) 34 + 35 + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "two")) 36 + assert.NoError(cs.IncrementDistinct(ctx, "test2", "val2", "three")) 37 + c, err = cs.GetCountDistinct(ctx, "test2", "val2", PeriodTotal) 38 + assert.NoError(err) 39 + assert.Equal(3, c) 40 + }
+112 -9
automod/engine.go
··· 22 22 Counters CountStore 23 23 Sets SetStore 24 24 Cache CacheStore 25 + Flags FlagStore 25 26 RelayClient *xrpc.Client 26 27 BskyClient *xrpc.Client 27 28 // used to persist moderation actions in mod service (optional) 28 - AdminClient *xrpc.Client 29 + AdminClient *xrpc.Client 30 + SlackWebhookURL string 29 31 } 30 32 31 33 func (e *Engine) ProcessIdentityEvent(ctx context.Context, t string, did syntax.DID) error { 32 34 // similar to an HTTP server, we want to recover any panics from rule execution 33 35 defer func() { 34 36 if r := recover(); r != nil { 35 - e.Logger.Error("automod event execution exception", "err", r) 37 + e.Logger.Error("automod event execution exception", "err", r, "did", did, "type", t) 36 38 } 37 39 }() 38 40 ··· 62 64 return evt.Err 63 65 } 64 66 evt.CanonicalLogLine() 67 + e.PurgeAccountCaches(ctx, am.Identity.DID) 65 68 if err := evt.PersistActions(ctx); err != nil { 66 69 return err 67 70 } 71 + if err := evt.PersistCounters(ctx); err != nil { 72 + return err 73 + } 68 74 return nil 69 75 } 70 76 ··· 72 78 // similar to an HTTP server, we want to recover any panics from rule execution 73 79 defer func() { 74 80 if r := recover(); r != nil { 75 - e.Logger.Error("automod event execution exception", "err", r) 81 + e.Logger.Error("automod event execution exception", "err", r, "did", did, "path", path) 76 82 } 77 83 }() 78 84 ··· 97 103 return evt.Err 98 104 } 99 105 evt.CanonicalLogLine() 106 + // purge the account meta cache when profile is updated 107 + if evt.Collection == "app.bsky.actor.profile" { 108 + e.PurgeAccountCaches(ctx, am.Identity.DID) 109 + } 100 110 if err := evt.PersistActions(ctx); err != nil { 101 111 return err 102 112 } 103 113 if err := evt.PersistCounters(ctx); err != nil { 104 114 return err 105 115 } 116 + return nil 117 + } 106 118 119 + func (e *Engine) ProcessRecordDelete(ctx context.Context, did syntax.DID, path string) error { 120 + // similar to an HTTP server, we want to recover any panics from rule execution 121 + defer func() { 122 + if r := recover(); r != nil { 123 + e.Logger.Error("automod event execution exception", "err", r, "did", did, "path", path) 124 + } 125 + }() 126 + 127 + ident, err := e.Directory.LookupDID(ctx, did) 128 + if err != nil { 129 + return fmt.Errorf("resolving identity: %w", err) 130 + } 131 + if ident == nil { 132 + return fmt.Errorf("identity not found for did: %s", did.String()) 133 + } 134 + 135 + am, err := e.GetAccountMeta(ctx, ident) 136 + if err != nil { 137 + return err 138 + } 139 + evt := e.NewRecordDeleteEvent(*am, path) 140 + e.Logger.Debug("processing record deletion", "did", ident.DID, "path", path) 141 + if err := e.Rules.CallRecordDeleteRules(&evt); err != nil { 142 + return err 143 + } 144 + if evt.Err != nil { 145 + return evt.Err 146 + } 147 + evt.CanonicalLogLine() 148 + // purge the account meta cache when profile is updated 149 + if evt.Collection == "app.bsky.actor.profile" { 150 + e.PurgeAccountCaches(ctx, am.Identity.DID) 151 + } 152 + if err := evt.PersistActions(ctx); err != nil { 153 + return err 154 + } 155 + if err := evt.PersistCounters(ctx); err != nil { 156 + return err 157 + } 107 158 return nil 108 159 } 109 160 110 - func (e *Engine) FetchAndProcessRecord(ctx context.Context, uri string) error { 161 + func (e *Engine) FetchAndProcessRecord(ctx context.Context, aturi syntax.ATURI) error { 111 162 // resolve URI, identity, and record 112 - aturi, err := syntax.ParseATURI(uri) 113 - if err != nil { 114 - return fmt.Errorf("parsing AT-URI argument: %v", err) 115 - } 116 163 if aturi.RecordKey() == "" { 117 - return fmt.Errorf("need a full, not partial, AT-URI: %s", uri) 164 + return fmt.Errorf("need a full, not partial, AT-URI: %s", aturi) 118 165 } 119 166 ident, err := e.Directory.Lookup(ctx, aturi.Authority()) 120 167 if err != nil { ··· 137 184 return e.ProcessRecord(ctx, ident.DID, aturi.Path(), *out.Cid, out.Value.Val) 138 185 } 139 186 187 + func (e *Engine) FetchAndProcessRecent(ctx context.Context, atid syntax.AtIdentifier, limit int) error { 188 + 189 + ident, err := e.Directory.Lookup(ctx, atid) 190 + if err != nil { 191 + return fmt.Errorf("failed to resolve AT identifier: %v", err) 192 + } 193 + pdsURL := ident.PDSEndpoint() 194 + if pdsURL == "" { 195 + return fmt.Errorf("could not resolve PDS endpoint for account: %s", ident.DID.String()) 196 + } 197 + pdsClient := xrpc.Client{Host: ident.PDSEndpoint()} 198 + 199 + resp, err := comatproto.RepoListRecords(ctx, &pdsClient, "app.bsky.feed.post", "", int64(limit), ident.DID.String(), false, "", "") 200 + if err != nil { 201 + return fmt.Errorf("failed to fetch record list: %v", err) 202 + } 203 + 204 + e.Logger.Info("got recent posts", "did", ident.DID.String(), "pds", pdsURL, "count", len(resp.Records)) 205 + // records are most-recent first; we want recent but oldest-first, so iterate backwards 206 + for i := range resp.Records { 207 + rec := resp.Records[len(resp.Records)-i-1] 208 + aturi, err := syntax.ParseATURI(rec.Uri) 209 + if err != nil { 210 + return fmt.Errorf("parsing PDS record response: %v", err) 211 + } 212 + err = e.ProcessRecord(ctx, ident.DID, aturi.Path(), rec.Cid, rec.Value.Val) 213 + if err != nil { 214 + return err 215 + } 216 + } 217 + return nil 218 + } 219 + 140 220 func (e *Engine) NewRecordEvent(am AccountMeta, path, recCID string, rec any) RecordEvent { 141 221 parts := strings.SplitN(path, "/", 2) 142 222 return RecordEvent{ ··· 156 236 } 157 237 } 158 238 239 + func (e *Engine) NewRecordDeleteEvent(am AccountMeta, path string) RecordDeleteEvent { 240 + parts := strings.SplitN(path, "/", 2) 241 + return RecordDeleteEvent{ 242 + RepoEvent{ 243 + Engine: e, 244 + Logger: e.Logger.With("did", am.Identity.DID, "collection", parts[0], "rkey", parts[1]), 245 + Account: am, 246 + }, 247 + parts[0], 248 + parts[1], 249 + } 250 + } 251 + 159 252 func (e *Engine) GetCount(name, val, period string) (int, error) { 160 253 return e.Counters.GetCount(context.TODO(), name, val, period) 161 254 } 162 255 256 + func (e *Engine) GetCountDistinct(name, bucket, period string) (int, error) { 257 + return e.Counters.GetCountDistinct(context.TODO(), name, bucket, period) 258 + } 259 + 163 260 // checks if `val` is an element of set `name` 164 261 func (e *Engine) InSet(name, val string) (bool, error) { 165 262 return e.Sets.InSet(context.TODO(), name, val) 166 263 } 264 + 265 + // purge caches of any exiting metadata 266 + func (e *Engine) PurgeAccountCaches(ctx context.Context, did syntax.DID) error { 267 + e.Directory.Purge(ctx, did.AtIdentifier()) 268 + return e.Cache.Purge(ctx, "acct", did.String()) 269 + }
+9 -4
automod/engine_test.go
··· 4 4 "context" 5 5 "log/slog" 6 6 "testing" 7 + "time" 7 8 8 9 appbsky "github.com/bluesky-social/indigo/api/bsky" 9 10 "github.com/bluesky-social/indigo/atproto/identity" ··· 14 15 15 16 func simpleRule(evt *RecordEvent, post *appbsky.FeedPost) error { 16 17 for _, tag := range post.Tags { 17 - if evt.InSet("banned-hashtags", tag) { 18 + if evt.InSet("bad-hashtags", tag) { 18 19 evt.AddRecordLabel("bad-hashtag") 19 20 break 20 21 } ··· 23 24 for _, feat := range facet.Features { 24 25 if feat.RichtextFacet_Tag != nil { 25 26 tag := feat.RichtextFacet_Tag.Tag 26 - if evt.InSet("banned-hashtags", tag) { 27 + if evt.InSet("bad-hashtags", tag) { 27 28 evt.AddRecordLabel("bad-hashtag") 28 29 break 29 30 } ··· 39 40 simpleRule, 40 41 }, 41 42 } 43 + cache := NewMemCacheStore(10, time.Hour) 44 + flags := NewMemFlagStore() 42 45 sets := NewMemSetStore() 43 - sets.Sets["banned-hashtags"] = make(map[string]bool) 44 - sets.Sets["banned-hashtags"]["slur"] = true 46 + sets.Sets["bad-hashtags"] = make(map[string]bool) 47 + sets.Sets["bad-hashtags"]["slur"] = true 45 48 dir := identity.NewMockDirectory() 46 49 id1 := identity.Identity{ 47 50 DID: syntax.DID("did:plc:abc111"), ··· 53 56 Directory: &dir, 54 57 Counters: NewMemCountStore(), 55 58 Sets: sets, 59 + Flags: flags, 60 + Cache: cache, 56 61 Rules: rules, 57 62 } 58 63 return engine
+174 -24
automod/event.go
··· 4 4 "context" 5 5 "fmt" 6 6 "log/slog" 7 + "strings" 7 8 8 9 comatproto "github.com/bluesky-social/indigo/api/atproto" 9 10 appbsky "github.com/bluesky-social/indigo/api/bsky" ··· 17 18 type CounterRef struct { 18 19 Name string 19 20 Val string 21 + } 22 + 23 + type CounterDistinctRef struct { 24 + Name string 25 + Bucket string 26 + Val string 20 27 } 21 28 22 29 // base type for events specific to an account, usually derived from a repo event stream message (one such message may result in multiple `RepoEvent`) 23 30 // 24 31 // events are both containers for data about the event itself (similar to an HTTP request type); aggregate results and state (counters, mod actions) to be persisted after all rules are run; and act as an API for additional network reads and operations. 25 32 type RepoEvent struct { 26 - Engine *Engine 27 - Err error 28 - Logger *slog.Logger 29 - Account AccountMeta 30 - CounterIncrements []CounterRef 31 - AccountLabels []string 32 - AccountFlags []string 33 - AccountReports []ModReport 34 - AccountTakedown bool 33 + Engine *Engine 34 + Err error 35 + Logger *slog.Logger 36 + Account AccountMeta 37 + CounterIncrements []CounterRef 38 + CounterDistinctIncrements []CounterDistinctRef // TODO: better variable names 39 + AccountLabels []string 40 + AccountFlags []string 41 + AccountReports []ModReport 42 + AccountTakedown bool 35 43 } 36 44 37 45 func (e *RepoEvent) GetCount(name, val, period string) int { ··· 43 51 return v 44 52 } 45 53 54 + func (e *RepoEvent) Increment(name, val string) { 55 + e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val}) 56 + } 57 + 58 + func (e *RepoEvent) GetCountDistinct(name, bucket, period string) int { 59 + v, err := e.Engine.GetCountDistinct(name, bucket, period) 60 + if err != nil { 61 + e.Err = err 62 + return 0 63 + } 64 + return v 65 + } 66 + 67 + func (e *RepoEvent) IncrementDistinct(name, bucket, val string) { 68 + e.CounterDistinctIncrements = append(e.CounterDistinctIncrements, CounterDistinctRef{Name: name, Bucket: bucket, Val: val}) 69 + } 70 + 46 71 func (e *RepoEvent) InSet(name, val string) bool { 47 72 v, err := e.Engine.InSet(name, val) 48 73 if err != nil { ··· 50 75 return false 51 76 } 52 77 return v 53 - } 54 - 55 - func (e *RepoEvent) Increment(name, val string) { 56 - e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val}) 57 78 } 58 79 59 80 func (e *RepoEvent) TakedownAccount() { ··· 72 93 e.AccountReports = append(e.AccountReports, ModReport{ReasonType: reason, Comment: comment}) 73 94 } 74 95 96 + func slackBody(msg string, newLabels, newFlags []string, newReports []ModReport, newTakedown bool) string { 97 + if len(newLabels) > 0 { 98 + msg += fmt.Sprintf("New Labels: `%s`\n", strings.Join(newLabels, ", ")) 99 + } 100 + if len(newFlags) > 0 { 101 + msg += fmt.Sprintf("New Flags: `%s`\n", strings.Join(newFlags, ", ")) 102 + } 103 + for _, rep := range newReports { 104 + msg += fmt.Sprintf("Report `%s`: %s\n", rep.ReasonType, rep.Comment) 105 + } 106 + if newTakedown { 107 + msg += fmt.Sprintf("Takedown!\n") 108 + } 109 + return msg 110 + } 111 + 112 + // Persists account-level moderation actions: new labels, new flags, new takedowns, and reports. 113 + // 114 + // If necessary, will "purge" identity and account caches, so that state updates will be picked up for subsequent events. 115 + // 116 + // TODO: de-dupe reports based on existing state, similar to other state 75 117 func (e *RepoEvent) PersistAccountActions(ctx context.Context) error { 118 + 119 + // de-dupe actions 120 + newLabels := []string{} 121 + for _, val := range dedupeStrings(e.AccountLabels) { 122 + exists := false 123 + for _, e := range e.Account.AccountNegatedLabels { 124 + if val == e { 125 + exists = true 126 + break 127 + } 128 + } 129 + for _, e := range e.Account.AccountLabels { 130 + if val == e { 131 + exists = true 132 + break 133 + } 134 + } 135 + if !exists { 136 + newLabels = append(newLabels, val) 137 + } 138 + } 139 + newFlags := []string{} 140 + for _, val := range dedupeStrings(e.AccountFlags) { 141 + exists := false 142 + for _, e := range e.Account.AccountFlags { 143 + if val == e { 144 + exists = true 145 + break 146 + } 147 + } 148 + if !exists { 149 + newFlags = append(newFlags, val) 150 + } 151 + } 152 + newReports := e.AccountReports 153 + newTakedown := e.AccountTakedown && !e.Account.Takendown 154 + 155 + if newTakedown || len(newLabels) > 0 || len(newFlags) > 0 || len(newReports) > 0 { 156 + if e.Engine.SlackWebhookURL != "" { 157 + msg := fmt.Sprintf("⚠️ Automod Account Action ⚠️\n") 158 + msg += fmt.Sprintf("`%s` / `%s` / <https://bsky.app/profile/%s|bsky> / <https://admin.prod.bsky.dev/repositories/%s|ozone>\n", 159 + e.Account.Identity.DID, 160 + e.Account.Identity.Handle, 161 + e.Account.Identity.DID, 162 + e.Account.Identity.DID, 163 + ) 164 + msg = slackBody(msg, newLabels, newFlags, newReports, newTakedown) 165 + if err := e.Engine.SendSlackMsg(ctx, msg); err != nil { 166 + e.Logger.Error("sending slack webhook", "err", err) 167 + } 168 + } 169 + } 170 + 76 171 if e.Engine.AdminClient == nil { 77 172 return nil 78 173 } 174 + 175 + needsPurge := false 79 176 xrpcc := e.Engine.AdminClient 80 - if len(e.AccountLabels) > 0 { 177 + if len(newLabels) > 0 { 81 178 comment := "automod" 82 179 _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ 83 180 CreatedBy: xrpcc.Auth.Did, 84 181 Event: &comatproto.AdminEmitModerationEvent_Input_Event{ 85 182 AdminDefs_ModEventLabel: &comatproto.AdminDefs_ModEventLabel{ 86 - CreateLabelVals: dedupeStrings(e.AccountLabels), 183 + CreateLabelVals: newLabels, 184 + NegateLabelVals: []string{}, 87 185 Comment: &comment, 88 186 }, 89 187 }, ··· 96 194 if err != nil { 97 195 return err 98 196 } 197 + needsPurge = true 99 198 } 100 - // TODO: AccountFlags 101 - for _, mr := range e.AccountReports { 199 + if len(newFlags) > 0 { 200 + e.Engine.Flags.Add(ctx, e.Account.Identity.DID.String(), newFlags) 201 + needsPurge = true 202 + } 203 + for _, mr := range newReports { 102 204 _, err := comatproto.ModerationCreateReport(ctx, xrpcc, &comatproto.ModerationCreateReport_Input{ 103 205 ReasonType: &mr.ReasonType, 104 206 Reason: &mr.Comment, ··· 112 214 return err 113 215 } 114 216 } 115 - if e.AccountTakedown { 217 + if newTakedown { 116 218 comment := "automod" 117 219 _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ 118 220 CreatedBy: xrpcc.Auth.Did, ··· 130 232 if err != nil { 131 233 return err 132 234 } 235 + needsPurge = true 236 + } 237 + if needsPurge { 238 + return e.Engine.PurgeAccountCaches(ctx, e.Account.Identity.DID) 133 239 } 134 240 return nil 135 241 } ··· 142 248 // TODO: dedupe this array 143 249 for _, ref := range e.CounterIncrements { 144 250 err := e.Engine.Counters.Increment(ctx, ref.Name, ref.Val) 251 + if err != nil { 252 + return err 253 + } 254 + } 255 + for _, ref := range e.CounterDistinctIncrements { 256 + err := e.Engine.Counters.IncrementDistinct(ctx, ref.Name, ref.Bucket, ref.Val) 145 257 if err != nil { 146 258 return err 147 259 } ··· 192 304 e.RecordReports = append(e.RecordReports, ModReport{ReasonType: reason, Comment: comment}) 193 305 } 194 306 307 + // Persists some record-level state: labels, takedowns, reports. 308 + // 309 + // NOTE: this method currently does *not* persist record-level flags to any storage, and does not de-dupe most actions, on the assumption that the record is new (from firehose) and has no existing mod state. 195 310 func (e *RecordEvent) PersistRecordActions(ctx context.Context) error { 311 + 312 + // TODO: consider de-duping record-level actions? at least for updates and deletes. 313 + newLabels := dedupeStrings(e.RecordLabels) 314 + newFlags := dedupeStrings(e.RecordFlags) 315 + newReports := e.RecordReports 316 + newTakedown := e.RecordTakedown 317 + atURI := fmt.Sprintf("at://%s/%s/%s", e.Account.Identity.DID, e.Collection, e.RecordKey) 318 + 319 + if newTakedown || len(newLabels) > 0 || len(newFlags) > 0 || len(newReports) > 0 { 320 + if e.Engine.SlackWebhookURL != "" { 321 + msg := fmt.Sprintf("⚠️ Automod Record Action ⚠️\n") 322 + msg += fmt.Sprintf("`%s` / `%s` / <https://bsky.app/profile/%s|bsky> / <https://admin.prod.bsky.dev/repositories/%s|ozone>\n", 323 + e.Account.Identity.DID, 324 + e.Account.Identity.Handle, 325 + e.Account.Identity.DID, 326 + e.Account.Identity.DID, 327 + ) 328 + msg += fmt.Sprintf("`%s`\n", atURI) 329 + msg = slackBody(msg, newLabels, newFlags, newReports, newTakedown) 330 + if err := e.Engine.SendSlackMsg(ctx, msg); err != nil { 331 + e.Logger.Error("sending slack webhook", "err", err) 332 + } 333 + } 334 + } 196 335 if e.Engine.AdminClient == nil { 197 336 return nil 198 337 } 199 338 strongRef := comatproto.RepoStrongRef{ 200 339 Cid: e.CID, 201 - Uri: fmt.Sprintf("at://%s/%s/%s", e.Account.Identity.DID, e.Collection, e.RecordKey), 340 + Uri: atURI, 202 341 } 203 342 xrpcc := e.Engine.AdminClient 204 - if len(e.RecordLabels) > 0 { 343 + if len(newLabels) > 0 { 205 344 comment := "automod" 206 345 _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ 207 346 CreatedBy: xrpcc.Auth.Did, 208 347 Event: &comatproto.AdminEmitModerationEvent_Input_Event{ 209 348 AdminDefs_ModEventLabel: &comatproto.AdminDefs_ModEventLabel{ 210 - CreateLabelVals: dedupeStrings(e.RecordLabels), 349 + CreateLabelVals: newLabels, 350 + NegateLabelVals: []string{}, 211 351 Comment: &comment, 212 352 }, 213 353 }, ··· 219 359 return err 220 360 } 221 361 } 222 - // TODO: AccountFlags 223 - for _, mr := range e.RecordReports { 362 + if len(newFlags) > 0 { 363 + e.Engine.Flags.Add(ctx, atURI, newFlags) 364 + } 365 + for _, mr := range newReports { 224 366 _, err := comatproto.ModerationCreateReport(ctx, xrpcc, &comatproto.ModerationCreateReport_Input{ 225 367 ReasonType: &mr.ReasonType, 226 368 Reason: &mr.Comment, ··· 232 374 return err 233 375 } 234 376 } 235 - if e.RecordTakedown { 377 + if newTakedown { 236 378 comment := "automod" 237 379 _, err := comatproto.AdminEmitModerationEvent(ctx, xrpcc, &comatproto.AdminEmitModerationEvent_Input{ 238 380 CreatedBy: xrpcc.Auth.Did, ··· 272 414 ) 273 415 } 274 416 417 + type RecordDeleteEvent struct { 418 + RepoEvent 419 + 420 + Collection string 421 + RecordKey string 422 + } 423 + 275 424 type IdentityRuleFunc = func(evt *IdentityEvent) error 276 425 type RecordRuleFunc = func(evt *RecordEvent) error 277 426 type PostRuleFunc = func(evt *RecordEvent, post *appbsky.FeedPost) error 278 427 type ProfileRuleFunc = func(evt *RecordEvent, profile *appbsky.ActorProfile) error 428 + type RecordDeleteRuleFunc = func(evt *RecordDeleteEvent) error
+66
automod/flagstore.go
··· 1 + package automod 2 + 3 + import ( 4 + "context" 5 + ) 6 + 7 + type FlagStore interface { 8 + Get(ctx context.Context, key string) ([]string, error) 9 + Add(ctx context.Context, key string, flags []string) error 10 + Remove(ctx context.Context, key string, flags []string) error 11 + } 12 + 13 + type MemFlagStore struct { 14 + Data map[string][]string 15 + } 16 + 17 + func NewMemFlagStore() MemFlagStore { 18 + return MemFlagStore{ 19 + Data: make(map[string][]string), 20 + } 21 + } 22 + 23 + func (s MemFlagStore) Get(ctx context.Context, key string) ([]string, error) { 24 + v, ok := s.Data[key] 25 + if !ok { 26 + return []string{}, nil 27 + } 28 + return v, nil 29 + } 30 + 31 + func (s MemFlagStore) Add(ctx context.Context, key string, flags []string) error { 32 + v, ok := s.Data[key] 33 + if !ok { 34 + v = []string{} 35 + } 36 + for _, f := range flags { 37 + v = append(v, f) 38 + } 39 + v = dedupeStrings(v) 40 + s.Data[key] = v 41 + return nil 42 + } 43 + 44 + // does not error if flags not in set 45 + func (s MemFlagStore) Remove(ctx context.Context, key string, flags []string) error { 46 + if len(flags) == 0 { 47 + return nil 48 + } 49 + v, ok := s.Data[key] 50 + if !ok { 51 + v = []string{} 52 + } 53 + m := make(map[string]bool, len(v)) 54 + for _, f := range v { 55 + m[f] = true 56 + } 57 + for _, f := range flags { 58 + delete(m, f) 59 + } 60 + out := []string{} 61 + for f, _ := range m { 62 + out = append(out, f) 63 + } 64 + s.Data[key] = out 65 + return nil 66 + }
+30
automod/flagstore_test.go
··· 1 + package automod 2 + 3 + import ( 4 + "context" 5 + "testing" 6 + 7 + "github.com/stretchr/testify/assert" 8 + ) 9 + 10 + func TestFlagStoreBasics(t *testing.T) { 11 + assert := assert.New(t) 12 + ctx := context.Background() 13 + 14 + fs := NewMemFlagStore() 15 + 16 + l, err := fs.Get(ctx, "test1") 17 + assert.NoError(err) 18 + assert.Empty(l) 19 + 20 + assert.NoError(fs.Add(ctx, "test1", []string{"red", "green"})) 21 + assert.NoError(fs.Add(ctx, "test1", []string{"red", "blue"})) 22 + l, err = fs.Get(ctx, "test1") 23 + assert.NoError(err) 24 + assert.Equal(3, len(l)) 25 + 26 + assert.NoError(fs.Remove(ctx, "test1", []string{"red", "blue"})) 27 + l, err = fs.Get(ctx, "test1") 28 + assert.NoError(err) 29 + assert.Equal([]string{"green"}, l) 30 + }
+9 -2
automod/redis_cache.go
··· 53 53 } 54 54 55 55 func (s RedisCacheStore) Set(ctx context.Context, name, key string, val string) error { 56 - s.Data.Set(&cache.Item{ 56 + return s.Data.Set(&cache.Item{ 57 57 Ctx: ctx, 58 58 Key: redisCacheKey(name, key), 59 59 Value: val, 60 60 TTL: s.TTL, 61 61 }) 62 - return nil 62 + } 63 + 64 + func (s RedisCacheStore) Purge(ctx context.Context, name, key string) error { 65 + err := s.Data.Delete(ctx, redisCacheKey(name, key)) 66 + if err == cache.ErrCacheMiss { 67 + return nil 68 + } 69 + return err 63 70 }
+35
automod/redis_counters.go
··· 8 8 ) 9 9 10 10 var redisCountPrefix string = "count/" 11 + var redisDistinctPrefix string = "distinct/" 11 12 12 13 type RedisCountStore struct { 13 14 Client *redis.Client ··· 63 64 _, err := multi.Exec(ctx) 64 65 return err 65 66 } 67 + 68 + func (s *RedisCountStore) GetCountDistinct(ctx context.Context, name, val, period string) (int, error) { 69 + key := redisDistinctPrefix + PeriodBucket(name, val, period) 70 + c, err := s.Client.PFCount(ctx, key).Result() 71 + if err == redis.Nil { 72 + return 0, nil 73 + } else if err != nil { 74 + return 0, err 75 + } 76 + return int(c), nil 77 + } 78 + 79 + func (s *RedisCountStore) IncrementDistinct(ctx context.Context, name, bucket, val string) error { 80 + 81 + var key string 82 + 83 + // increment multiple counters in a single redis round-trip 84 + multi := s.Client.Pipeline() 85 + 86 + key = redisDistinctPrefix + PeriodBucket(name, bucket, PeriodHour) 87 + multi.PFAdd(ctx, key, val) 88 + multi.Expire(ctx, key, 2*time.Hour) 89 + 90 + key = redisDistinctPrefix + PeriodBucket(name, bucket, PeriodDay) 91 + multi.PFAdd(ctx, key, val) 92 + multi.Expire(ctx, key, 48*time.Hour) 93 + 94 + key = redisDistinctPrefix + PeriodBucket(name, bucket, PeriodTotal) 95 + multi.PFAdd(ctx, key, val) 96 + // no expiration for total 97 + 98 + _, err := multi.Exec(ctx) 99 + return err 100 + }
+2
automod/redis_directory.go
··· 86 86 } 87 87 88 88 func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*HandleEntry, error) { 89 + h = h.Normalize() 89 90 ident, err := d.Inner.LookupHandle(ctx, h) 90 91 if err != nil { 91 92 he := HandleEntry{ ··· 314 315 func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error { 315 316 handle, err := a.AsHandle() 316 317 if nil == err { // if not an error, is a handle 318 + handle = handle.Normalize() 317 319 return d.handleCache.Delete(ctx, handle.String()) 318 320 } 319 321 did, err := a.AsDID()
+65
automod/redis_flags.go
··· 1 + package automod 2 + 3 + import ( 4 + "context" 5 + 6 + "github.com/redis/go-redis/v9" 7 + ) 8 + 9 + var redisFlagsPrefix string = "flags/" 10 + 11 + type RedisFlagStore struct { 12 + Client *redis.Client 13 + } 14 + 15 + func NewRedisFlagStore(redisURL string) (*RedisFlagStore, error) { 16 + opt, err := redis.ParseURL(redisURL) 17 + if err != nil { 18 + return nil, err 19 + } 20 + rdb := redis.NewClient(opt) 21 + // check redis connection 22 + _, err = rdb.Ping(context.TODO()).Result() 23 + if err != nil { 24 + return nil, err 25 + } 26 + rcs := RedisFlagStore{ 27 + Client: rdb, 28 + } 29 + return &rcs, nil 30 + } 31 + 32 + func (s *RedisFlagStore) Get(ctx context.Context, key string) ([]string, error) { 33 + rkey := redisFlagsPrefix + key 34 + l, err := s.Client.SMembers(ctx, rkey).Result() 35 + if err == redis.Nil { 36 + return []string{}, nil 37 + } else if err != nil { 38 + return nil, err 39 + } 40 + return l, nil 41 + } 42 + 43 + func (s *RedisFlagStore) Add(ctx context.Context, key string, flags []string) error { 44 + if len(flags) == 0 { 45 + return nil 46 + } 47 + l := []interface{}{} 48 + for _, v := range flags { 49 + l = append(l, v) 50 + } 51 + rkey := redisFlagsPrefix + key 52 + return s.Client.SAdd(ctx, rkey, l...).Err() 53 + } 54 + 55 + func (s *RedisFlagStore) Remove(ctx context.Context, key string, flags []string) error { 56 + if len(flags) == 0 { 57 + return nil 58 + } 59 + l := []interface{}{} 60 + for _, v := range flags { 61 + l = append(l, v) 62 + } 63 + rkey := redisFlagsPrefix + key 64 + return s.Client.SRem(ctx, rkey, l...).Err() 65 + }
+35
automod/redis_flagstore_test.go
··· 1 + package automod 2 + 3 + import ( 4 + "context" 5 + "testing" 6 + 7 + "github.com/stretchr/testify/assert" 8 + ) 9 + 10 + func TestRedisFlagStoreBasics(t *testing.T) { 11 + t.Skip("live test, need redis running locally") 12 + assert := assert.New(t) 13 + ctx := context.Background() 14 + 15 + fs, err := NewRedisFlagStore("redis://localhost:6379/0") 16 + if err != nil { 17 + t.Fail() 18 + } 19 + 20 + l, err := fs.Get(ctx, "test1") 21 + assert.NoError(err) 22 + assert.Empty(l) 23 + 24 + assert.NoError(fs.Add(ctx, "test1", []string{"red", "green"})) 25 + assert.NoError(fs.Add(ctx, "test1", []string{"red", "blue"})) 26 + l, err = fs.Get(ctx, "test1") 27 + assert.NoError(err) 28 + assert.Equal(3, len(l)) 29 + 30 + assert.NoError(fs.Remove(ctx, "test1", []string{"red", "blue", "orange"})) 31 + l, err = fs.Get(ctx, "test1") 32 + assert.NoError(err) 33 + assert.Equal([]string{"green"}, l) 34 + assert.NoError(fs.Remove(ctx, "test1", []string{"green"})) 35 + }
+20 -2
automod/rules/all.go
··· 10 10 MisleadingURLPostRule, 11 11 MisleadingMentionPostRule, 12 12 ReplyCountPostRule, 13 - BanHashtagsPostRule, 14 - AccountDemoPostRule, 13 + BadHashtagsPostRule, 14 + //TooManyHashtagsPostRule, 15 + //AccountDemoPostRule, 15 16 AccountPrivateDemoPostRule, 17 + GtubePostRule, 18 + KeywordPostRule, 19 + ReplySingleKeywordPostRule, 20 + AggressivePromotionRule, 21 + }, 22 + ProfileRules: []automod.ProfileRuleFunc{ 23 + GtubeProfileRule, 24 + KeywordProfileRule, 25 + }, 26 + RecordRules: []automod.RecordRuleFunc{ 27 + InteractionChurnRule, 28 + }, 29 + RecordDeleteRules: []automod.RecordDeleteRuleFunc{ 30 + DeleteInteractionRule, 31 + }, 32 + IdentityRules: []automod.IdentityRuleFunc{ 33 + NewAccountRule, 16 34 }, 17 35 } 18 36 return rules
+8 -2
automod/rules/example_sets.json
··· 1 1 { 2 - "banned-hashtags": [ 2 + "bad-hashtags": [ 3 3 "slur", 4 - "anotherslur" 4 + "deathtooutgroup" 5 + ], 6 + "bad-words": [ 7 + "hardar" 8 + ], 9 + "promo-domain": [ 10 + "buy-crypto.example.com" 5 11 ] 6 12 }
+8 -3
automod/rules/fixture_test.go
··· 2 2 3 3 import ( 4 4 "log/slog" 5 + "time" 5 6 6 7 "github.com/bluesky-social/indigo/atproto/identity" 7 8 "github.com/bluesky-social/indigo/atproto/syntax" ··· 12 13 func engineFixture() automod.Engine { 13 14 rules := automod.RuleSet{ 14 15 PostRules: []automod.PostRuleFunc{ 15 - BanHashtagsPostRule, 16 + BadHashtagsPostRule, 16 17 }, 17 18 } 19 + flags := automod.NewMemFlagStore() 20 + cache := automod.NewMemCacheStore(10, time.Hour) 18 21 sets := automod.NewMemSetStore() 19 - sets.Sets["banned-hashtags"] = make(map[string]bool) 20 - sets.Sets["banned-hashtags"]["slur"] = true 22 + sets.Sets["bad-hashtags"] = make(map[string]bool) 23 + sets.Sets["bad-hashtags"]["slur"] = true 21 24 dir := identity.NewMockDirectory() 22 25 id1 := identity.Identity{ 23 26 DID: syntax.DID("did:plc:abc111"), ··· 37 40 Directory: &dir, 38 41 Counters: automod.NewMemCountStore(), 39 42 Sets: sets, 43 + Flags: flags, 44 + Cache: cache, 40 45 Rules: rules, 41 46 AdminClient: &adminc, 42 47 }
+25
automod/rules/gtube.go
··· 1 + package rules 2 + 3 + import ( 4 + "strings" 5 + 6 + appbsky "github.com/bluesky-social/indigo/api/bsky" 7 + "github.com/bluesky-social/indigo/automod" 8 + ) 9 + 10 + // https://en.wikipedia.org/wiki/GTUBE 11 + var gtubeString = "XJS*C4JDBQADN1.NSBN3*2IDNEN*GTUBE-STANDARD-ANTI-UBE-TEST-EMAIL*C.34X" 12 + 13 + func GtubePostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { 14 + if strings.Contains(post.Text, gtubeString) { 15 + evt.AddRecordLabel("spam") 16 + } 17 + return nil 18 + } 19 + 20 + func GtubeProfileRule(evt *automod.RecordEvent, profile *appbsky.ActorProfile) error { 21 + if profile.Description != nil && strings.Contains(*profile.Description, gtubeString) { 22 + evt.AddRecordLabel("spam") 23 + } 24 + return nil 25 + }
+21 -2
automod/rules/hashtags.go
··· 5 5 "github.com/bluesky-social/indigo/automod" 6 6 ) 7 7 8 - func BanHashtagsPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { 8 + // looks for specific hashtags from known lists 9 + func BadHashtagsPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { 9 10 for _, tag := range ExtractHashtags(post) { 10 - if evt.InSet("banned-hashtags", tag) { 11 + tag = NormalizeHashtag(tag) 12 + if evt.InSet("bad-hashtags", tag) { 11 13 evt.AddRecordFlag("bad-hashtag") 12 14 break 13 15 } 14 16 } 15 17 return nil 16 18 } 19 + 20 + // if a post is "almost all" hashtags, it might be a form of search spam 21 + func TooManyHashtagsPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { 22 + tags := ExtractHashtags(post) 23 + tagChars := 0 24 + for _, tag := range tags { 25 + tagChars += len(tag) 26 + } 27 + tagTextRatio := float64(tagChars) / float64(len(post.Text)) 28 + // if there is an image, allow some more tags 29 + if len(tags) > 4 && tagTextRatio > 0.6 && post.Embed.EmbedImages == nil { 30 + evt.AddRecordFlag("many-hashtags") 31 + } else if len(tags) > 7 && tagTextRatio > 0.8 { 32 + evt.AddRecordFlag("many-hashtags") 33 + } 34 + return nil 35 + }
+3 -3
automod/rules/hashtags_test.go
··· 11 11 "github.com/stretchr/testify/assert" 12 12 ) 13 13 14 - func TestBanHashtagPostRule(t *testing.T) { 14 + func TestBadHashtagPostRule(t *testing.T) { 15 15 assert := assert.New(t) 16 16 17 17 engine := engineFixture() ··· 27 27 Text: "some post blah", 28 28 } 29 29 evt1 := engine.NewRecordEvent(am1, path, cid1, &p1) 30 - assert.NoError(BanHashtagsPostRule(&evt1, &p1)) 30 + assert.NoError(BadHashtagsPostRule(&evt1, &p1)) 31 31 assert.Empty(evt1.RecordFlags) 32 32 33 33 p2 := appbsky.FeedPost{ ··· 35 35 Tags: []string{"one", "slur"}, 36 36 } 37 37 evt2 := engine.NewRecordEvent(am1, path, cid1, &p2) 38 - assert.NoError(BanHashtagsPostRule(&evt2, &p2)) 38 + assert.NoError(BadHashtagsPostRule(&evt2, &p2)) 39 39 assert.NotEmpty(evt2.RecordFlags) 40 40 }
+101
automod/rules/helpers.go
··· 2 2 3 3 import ( 4 4 "fmt" 5 + "regexp" 6 + "strings" 7 + "unicode" 5 8 6 9 appbsky "github.com/bluesky-social/indigo/api/bsky" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + "github.com/bluesky-social/indigo/automod" 7 12 ) 8 13 9 14 func dedupeStrings(in []string) []string { ··· 31 36 } 32 37 } 33 38 return dedupeStrings(tags) 39 + } 40 + 41 + func NormalizeHashtag(raw string) string { 42 + return strings.ToLower(raw) 34 43 } 35 44 36 45 type PostFacet struct { ··· 76 85 } 77 86 return out, nil 78 87 } 88 + 89 + func ExtractPostBlobCIDsPost(post *appbsky.FeedPost) []string { 90 + var out []string 91 + if post.Embed.EmbedImages != nil { 92 + for _, img := range post.Embed.EmbedImages.Images { 93 + out = append(out, img.Image.Ref.String()) 94 + } 95 + } 96 + if post.Embed.EmbedRecordWithMedia != nil { 97 + media := post.Embed.EmbedRecordWithMedia.Media 98 + if media.EmbedImages != nil { 99 + for _, img := range media.EmbedImages.Images { 100 + out = append(out, img.Image.Ref.String()) 101 + } 102 + } 103 + } 104 + return dedupeStrings(out) 105 + } 106 + 107 + func ExtractBlobCIDsProfile(profile *appbsky.ActorProfile) []string { 108 + var out []string 109 + if profile.Avatar != nil { 110 + out = append(out, profile.Avatar.Ref.String()) 111 + } 112 + if profile.Banner != nil { 113 + out = append(out, profile.Banner.Ref.String()) 114 + } 115 + return dedupeStrings(out) 116 + } 117 + 118 + // NOTE: this function has not been optimiszed at all! 119 + func ExtractTextTokens(raw string) []string { 120 + raw = strings.ToLower(raw) 121 + f := func(c rune) bool { 122 + return !unicode.IsLetter(c) && !unicode.IsNumber(c) 123 + } 124 + return strings.FieldsFunc(raw, f) 125 + } 126 + 127 + func ExtractTextTokensPost(post *appbsky.FeedPost) []string { 128 + return ExtractTextTokens(post.Text) 129 + } 130 + 131 + func ExtractTextTokensProfile(profile *appbsky.ActorProfile) []string { 132 + s := "" 133 + if profile.Description != nil { 134 + s += " " + *profile.Description 135 + } 136 + if profile.DisplayName != nil { 137 + s += " " + *profile.DisplayName 138 + } 139 + return ExtractTextTokens(s) 140 + } 141 + 142 + // based on: https://stackoverflow.com/a/48769624, with no trailing period allowed 143 + var urlRegex = regexp.MustCompile(`(?:(?:https?|ftp):\/\/)?[\w/\-?=%.]+\.[\w/\-&?=%.]*[\w/\-&?=%]+`) 144 + 145 + func ExtractTextURLs(raw string) []string { 146 + return urlRegex.FindAllString(raw, -1) 147 + } 148 + 149 + func ExtractTextURLsProfile(profile *appbsky.ActorProfile) []string { 150 + s := "" 151 + if profile.Description != nil { 152 + s += " " + *profile.Description 153 + } 154 + if profile.DisplayName != nil { 155 + s += " " + *profile.DisplayName 156 + } 157 + return ExtractTextURLs(s) 158 + } 159 + 160 + // checks if the post event is a reply post for which the author is replying to themselves, or author is the root author (OP) 161 + func IsSelfThread(evt *automod.RecordEvent, post *appbsky.FeedPost) bool { 162 + if post.Reply == nil { 163 + return false 164 + } 165 + did := evt.Account.Identity.DID.String() 166 + parentURI, err := syntax.ParseATURI(post.Reply.Parent.Uri) 167 + if err != nil { 168 + return false 169 + } 170 + rootURI, err := syntax.ParseATURI(post.Reply.Root.Uri) 171 + if err != nil { 172 + return false 173 + } 174 + 175 + if parentURI.Authority().String() == did || rootURI.Authority().String() == did { 176 + return true 177 + } 178 + return false 179 + }
+55
automod/rules/helpers_test.go
··· 1 + package rules 2 + 3 + import ( 4 + "testing" 5 + 6 + "github.com/stretchr/testify/assert" 7 + ) 8 + 9 + func TestTokenizeText(t *testing.T) { 10 + assert := assert.New(t) 11 + 12 + fixtures := []struct { 13 + s string 14 + out []string 15 + }{ 16 + { 17 + s: "1 'Two' three!", 18 + out: []string{"1", "two", "three"}, 19 + }, 20 + { 21 + s: " foo1;bar2,baz3...", 22 + out: []string{"foo1", "bar2", "baz3"}, 23 + }, 24 + { 25 + s: "https://example.com/index.html", 26 + out: []string{"https", "example", "com", "index", "html"}, 27 + }, 28 + } 29 + 30 + for _, fix := range fixtures { 31 + assert.Equal(fix.out, ExtractTextTokens(fix.s)) 32 + } 33 + } 34 + 35 + func TestExtractURL(t *testing.T) { 36 + assert := assert.New(t) 37 + 38 + fixtures := []struct { 39 + s string 40 + out []string 41 + }{ 42 + { 43 + s: "this is a description with example.com mentioned in the middle", 44 + out: []string{"example.com"}, 45 + }, 46 + { 47 + s: "this is another example with https://en.wikipedia.org/index.html: and archive.org, and https://eff.org/... and bsky.app.", 48 + out: []string{"https://en.wikipedia.org/index.html", "archive.org", "https://eff.org/", "bsky.app"}, 49 + }, 50 + } 51 + 52 + for _, fix := range fixtures { 53 + assert.Equal(fix.out, ExtractTextURLs(fix.s)) 54 + } 55 + }
+45
automod/rules/identity.go
··· 1 + package rules 2 + 3 + import ( 4 + "net/url" 5 + "strings" 6 + "time" 7 + 8 + "github.com/bluesky-social/indigo/automod" 9 + ) 10 + 11 + // triggers on first identity event for an account (DID) 12 + func NewAccountRule(evt *automod.IdentityEvent) error { 13 + // need access to IndexedAt for this rule 14 + if evt.Account.Private == nil || evt.Account.Identity == nil { 15 + return nil 16 + } 17 + 18 + did := evt.Account.Identity.DID.String() 19 + age := time.Since(evt.Account.Private.IndexedAt) 20 + if age > 2*time.Hour { 21 + return nil 22 + } 23 + exists := evt.GetCount("acct/exists", did, automod.PeriodTotal) 24 + if exists == 0 { 25 + evt.Logger.Info("new account") 26 + evt.Increment("acct/exists", did) 27 + 28 + pdsURL, err := url.Parse(evt.Account.Identity.PDSEndpoint()) 29 + if err != nil { 30 + evt.Logger.Warn("invalid PDS URL", "err", err, "endpoint", evt.Account.Identity.PDSEndpoint()) 31 + return nil 32 + } 33 + pdsHost := strings.ToLower(pdsURL.Host) 34 + existingAccounts := evt.GetCount("host/newacct", pdsHost, automod.PeriodTotal) 35 + evt.Increment("host/newacct", pdsHost) 36 + 37 + // new PDS host 38 + if existingAccounts == 0 { 39 + evt.Logger.Info("new PDS instance", "host", pdsHost) 40 + evt.Increment("host", "new") 41 + evt.AddAccountFlag("host-first-account") 42 + } 43 + } 44 + return nil 45 + }
+44
automod/rules/interaction.go
··· 1 + package rules 2 + 3 + import ( 4 + "github.com/bluesky-social/indigo/automod" 5 + ) 6 + 7 + var interactionDailyThreshold = 500 8 + 9 + // looks for accounts which do frequent interaction churn, such as follow-unfollow. 10 + func InteractionChurnRule(evt *automod.RecordEvent) error { 11 + did := evt.Account.Identity.DID.String() 12 + switch evt.Collection { 13 + case "app.bsky.feed.like": 14 + evt.Increment("like", did) 15 + created := evt.GetCount("like", did, automod.PeriodDay) 16 + deleted := evt.GetCount("unlike", did, automod.PeriodDay) 17 + ratio := float64(deleted) / float64(created) 18 + if created > interactionDailyThreshold && deleted > interactionDailyThreshold && ratio > 0.5 { 19 + evt.Logger.Info("high-like-churn", "created-today", created, "deleted-today", deleted) 20 + evt.AddAccountFlag("high-like-churn") 21 + } 22 + case "app.bsky.graph.follow": 23 + evt.Increment("follow", did) 24 + created := evt.GetCount("follow", did, automod.PeriodDay) 25 + deleted := evt.GetCount("unfollow", did, automod.PeriodDay) 26 + ratio := float64(deleted) / float64(created) 27 + if created > interactionDailyThreshold && deleted > interactionDailyThreshold && ratio > 0.5 { 28 + evt.Logger.Info("high-follow-churn", "created-today", created, "deleted-today", deleted) 29 + evt.AddAccountFlag("high-follow-churn") 30 + } 31 + } 32 + return nil 33 + } 34 + 35 + func DeleteInteractionRule(evt *automod.RecordDeleteEvent) error { 36 + did := evt.Account.Identity.DID.String() 37 + switch evt.Collection { 38 + case "app.bsky.feed.like": 39 + evt.Increment("unlike", did) 40 + case "app.bsky.graph.follow": 41 + evt.Increment("unfollow", did) 42 + } 43 + return nil 44 + }
+36
automod/rules/keyword.go
··· 1 + package rules 2 + 3 + import ( 4 + appbsky "github.com/bluesky-social/indigo/api/bsky" 5 + "github.com/bluesky-social/indigo/automod" 6 + ) 7 + 8 + func KeywordPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { 9 + for _, tok := range ExtractTextTokensPost(post) { 10 + if evt.InSet("bad-words", tok) { 11 + evt.AddRecordFlag("bad-word") 12 + break 13 + } 14 + } 15 + return nil 16 + } 17 + 18 + func KeywordProfileRule(evt *automod.RecordEvent, profile *appbsky.ActorProfile) error { 19 + for _, tok := range ExtractTextTokensProfile(profile) { 20 + if evt.InSet("bad-words", tok) { 21 + evt.AddRecordFlag("bad-word") 22 + break 23 + } 24 + } 25 + return nil 26 + } 27 + 28 + func ReplySingleKeywordPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { 29 + if post.Reply != nil && !IsSelfThread(evt, post) { 30 + tokens := ExtractTextTokensPost(post) 31 + if len(tokens) == 1 && evt.InSet("bad-words", tokens[0]) { 32 + evt.AddRecordFlag("reply-single-bad-word") 33 + } 34 + } 35 + return nil 36 + }
+81 -34
automod/rules/misleading.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "log/slog" 5 6 "net/url" 6 7 "strings" 8 + "unicode" 7 9 8 10 appbsky "github.com/bluesky-social/indigo/api/bsky" 9 11 "github.com/bluesky-social/indigo/atproto/syntax" 10 12 "github.com/bluesky-social/indigo/automod" 11 13 ) 12 14 15 + func isMisleadingURLFacet(facet PostFacet, logger *slog.Logger) bool { 16 + linkURL, err := url.Parse(*facet.URL) 17 + if err != nil { 18 + logger.Warn("invalid link metadata URL", "url", facet.URL) 19 + return false 20 + } 21 + 22 + // basic text string pre-cleanups 23 + text := strings.ToLower(strings.TrimSpace(facet.Text)) 24 + 25 + // remove square brackets 26 + if strings.HasPrefix(text, "[") && strings.HasSuffix(text, "]") { 27 + text = text[1 : len(text)-1] 28 + } 29 + 30 + // truncated and not an obvious prefix hack (TODO: more special domains? regex?) 31 + if strings.HasSuffix(text, "...") && !strings.HasSuffix(text, ".com...") && !strings.HasSuffix(text, ".org...") { 32 + return false 33 + } 34 + if strings.HasSuffix(text, "…") && !strings.HasSuffix(text, ".com…") && !strings.HasSuffix(text, ".org…") { 35 + return false 36 + } 37 + 38 + // remove any other truncation suffix 39 + text = strings.TrimSuffix(strings.TrimSuffix(text, "..."), "…") 40 + 41 + if len(text) == 0 { 42 + logger.Warn("empty facet text", "text", facet.Text) 43 + return false 44 + } 45 + 46 + // if really not-a-domain, just skip 47 + if !strings.Contains(text, ".") { 48 + return false 49 + } 50 + 51 + // hostnames can't start with a digit (eg, arxiv or DOI links) 52 + for _, c := range text[0:1] { 53 + if unicode.IsNumber(c) { 54 + return false 55 + } 56 + } 57 + 58 + // try to fix any missing method in the text 59 + if !strings.Contains(text, "://") { 60 + text = "https://" + text 61 + } 62 + 63 + // try parsing as a full URL (with whitespace trimmed) 64 + textURL, err := url.Parse(text) 65 + if err != nil { 66 + logger.Warn("invalid link text URL", "url", facet.Text) 67 + return false 68 + } 69 + 70 + // for now just compare domains to handle the most obvious cases 71 + // this public code will obviously get discovered and bypassed. this doesn't earn you any security cred! 72 + linkHost := strings.TrimPrefix(strings.ToLower(linkURL.Host), "www.") 73 + textHost := strings.TrimPrefix(strings.ToLower(textURL.Host), "www.") 74 + if textHost != linkHost { 75 + logger.Warn("misleading mismatched domains", "linkHost", linkURL.Host, "textHost", textURL.Host, "text", facet.Text) 76 + return true 77 + } 78 + return false 79 + } 80 + 13 81 func MisleadingURLPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { 82 + // TODO: make this an InSet() config? 83 + if evt.Account.Identity.Handle == "nowbreezing.ntw.app" { 84 + return nil 85 + } 14 86 facets, err := ExtractFacets(post) 15 87 if err != nil { 16 88 evt.Logger.Warn("invalid facets", "err", err) 17 - evt.AddRecordFlag("invalid") // TODO: or some other "this record is corrupt" indicator? 89 + // TODO: or some other "this record is corrupt" indicator? 90 + //evt.AddRecordFlag("broken-post") 18 91 return nil 19 92 } 20 93 for _, facet := range facets { 21 94 if facet.URL != nil { 22 - linkURL, err := url.Parse(*facet.URL) 23 - if err != nil { 24 - evt.Logger.Warn("invalid link metadata URL", "url", facet.URL) 25 - continue 26 - } 27 - 28 - // basic text string pre-cleanups 29 - text := strings.ToLower(strings.TrimSuffix(strings.TrimSpace(facet.Text), "...")) 30 - // if really not a domain, just skipp 31 - if !strings.Contains(text, ".") { 32 - continue 33 - } 34 - // try to fix any missing method in the text 35 - if !strings.Contains(text, "://") { 36 - text = "https://" + text 37 - } 38 - 39 - // try parsing as a full URL (with whitespace trimmed) 40 - textURL, err := url.Parse(text) 41 - if err != nil { 42 - evt.Logger.Warn("invalid link text URL", "url", facet.Text) 43 - continue 44 - } 45 - 46 - // for now just compare domains to handle the most obvious cases 47 - // this public code will obviously get discovered and bypassed. this doesn't earn you any security cred! 48 - if linkURL.Host != textURL.Host && linkURL.Host != "www."+linkURL.Host { 49 - evt.Logger.Warn("misleading mismatched domains", "linkHost", linkURL.Host, "textHost", textURL.Host, "text", facet.Text) 50 - evt.AddRecordFlag("misleading") 95 + if isMisleadingURLFacet(facet, evt.Logger) { 96 + evt.AddRecordFlag("misleading-link") 51 97 } 52 98 } 53 99 } ··· 60 106 facets, err := ExtractFacets(post) 61 107 if err != nil { 62 108 evt.Logger.Warn("invalid facets", "err", err) 63 - evt.AddRecordFlag("invalid") // TODO: or some other "this record is corrupt" indicator? 109 + // TODO: or some other "this record is corrupt" indicator? 110 + //evt.AddRecordFlag("broken-post") 64 111 return nil 65 112 } 66 113 for _, facet := range facets { ··· 69 116 if txt[0] == '@' { 70 117 txt = txt[1:] 71 118 } 72 - handle, err := syntax.ParseHandle(txt) 119 + handle, err := syntax.ParseHandle(strings.ToLower(txt)) 73 120 if err != nil { 74 121 evt.Logger.Warn("mention was not a valid handle", "text", txt) 75 122 continue ··· 78 125 mentioned, err := evt.Engine.Directory.LookupHandle(ctx, handle) 79 126 if err != nil { 80 127 evt.Logger.Warn("could not resolve handle", "handle", handle) 81 - evt.AddRecordFlag("misleading") 128 + evt.AddRecordFlag("broken-mention") 82 129 break 83 130 } 84 131 85 132 // TODO: check if mentioned DID was recently updated? might be a caching issue 86 133 if mentioned.DID.String() != *facet.DID { 87 134 evt.Logger.Warn("misleading mention", "text", txt, "did", facet.DID) 88 - evt.AddRecordFlag("misleading") 135 + evt.AddRecordFlag("misleading-mention") 89 136 continue 90 137 } 91 138 }
+83
automod/rules/misleading_test.go
··· 1 1 package rules 2 2 3 3 import ( 4 + "log/slog" 4 5 "testing" 5 6 6 7 appbsky "github.com/bluesky-social/indigo/api/bsky" ··· 80 81 assert.NoError(MisleadingMentionPostRule(&evt1, &p1)) 81 82 assert.NotEmpty(evt1.RecordFlags) 82 83 } 84 + 85 + func pstr(raw string) *string { 86 + return &raw 87 + } 88 + 89 + func TestIsMisleadingURL(t *testing.T) { 90 + assert := assert.New(t) 91 + logger := slog.Default() 92 + 93 + fixtures := []struct { 94 + facet PostFacet 95 + out bool 96 + }{ 97 + { 98 + facet: PostFacet{ 99 + Text: "https://atproto.com", 100 + URL: pstr("https://atproto.com"), 101 + }, 102 + out: false, 103 + }, 104 + { 105 + facet: PostFacet{ 106 + Text: "https://atproto.com", 107 + URL: pstr("https://evil.com"), 108 + }, 109 + out: true, 110 + }, 111 + { 112 + facet: PostFacet{ 113 + Text: "https://www.atproto.com", 114 + URL: pstr("https://atproto.com"), 115 + }, 116 + out: false, 117 + }, 118 + { 119 + facet: PostFacet{ 120 + Text: "https://atproto.com", 121 + URL: pstr("https://www.atproto.com"), 122 + }, 123 + out: false, 124 + }, 125 + { 126 + facet: PostFacet{ 127 + Text: "[example.com]", 128 + URL: pstr("https://www.example.com"), 129 + }, 130 + out: false, 131 + }, 132 + { 133 + facet: PostFacet{ 134 + Text: "example.com...", 135 + URL: pstr("https://example.com.evil.com"), 136 + }, 137 + out: true, 138 + }, 139 + { 140 + facet: PostFacet{ 141 + Text: "ATPROTO.com...", 142 + URL: pstr("https://atproto.com"), 143 + }, 144 + out: false, 145 + }, 146 + { 147 + facet: PostFacet{ 148 + Text: "1234.5678", 149 + URL: pstr("https://arxiv.org/abs/1234.5678"), 150 + }, 151 + out: false, 152 + }, 153 + { 154 + facet: PostFacet{ 155 + Text: "www.techdirt.com…", 156 + URL: pstr("https://www.techdirt.com/"), 157 + }, 158 + out: false, 159 + }, 160 + } 161 + 162 + for _, fix := range fixtures { 163 + assert.Equal(fix.out, isMisleadingURLFacet(fix.facet, logger)) 164 + } 165 + }
+60
automod/rules/promo.go
··· 1 + package rules 2 + 3 + import ( 4 + "net/url" 5 + "strings" 6 + "time" 7 + 8 + appbsky "github.com/bluesky-social/indigo/api/bsky" 9 + "github.com/bluesky-social/indigo/automod" 10 + ) 11 + 12 + // looks for new accounts, with a commercial or donation link in profile, which directly reply to several accounts 13 + // 14 + // this rule depends on ReplyCountPostRule() to set counts 15 + func AggressivePromotionRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { 16 + if evt.Account.Private == nil || evt.Account.Identity == nil { 17 + return nil 18 + } 19 + // TODO: helper for account age 20 + age := time.Since(evt.Account.Private.IndexedAt) 21 + if age > 7*24*time.Hour { 22 + return nil 23 + } 24 + if post.Reply == nil || IsSelfThread(evt, post) { 25 + return nil 26 + } 27 + 28 + allURLs := ExtractTextURLs(post.Text) 29 + if evt.Account.Profile.Description != nil { 30 + profileURLs := ExtractTextURLs(*evt.Account.Profile.Description) 31 + allURLs = append(allURLs, profileURLs...) 32 + } 33 + hasPromo := false 34 + for _, s := range allURLs { 35 + if !strings.Contains(s, "://") { 36 + s = "https://" + s 37 + } 38 + u, err := url.Parse(s) 39 + if err != nil { 40 + evt.Logger.Warn("failed to parse URL", "url", s) 41 + continue 42 + } 43 + host := strings.TrimPrefix(strings.ToLower(u.Host), "www.") 44 + if evt.InSet("promo-domain", host) { 45 + hasPromo = true 46 + break 47 + } 48 + } 49 + if !hasPromo { 50 + return nil 51 + } 52 + 53 + did := evt.Account.Identity.DID.String() 54 + uniqueReplies := evt.GetCountDistinct("reply-to", did, automod.PeriodDay) 55 + if uniqueReplies >= 5 { 56 + evt.AddAccountFlag("promo-multi-reply") 57 + } 58 + 59 + return nil 60 + }
+18 -6
automod/rules/replies.go
··· 2 2 3 3 import ( 4 4 appbsky "github.com/bluesky-social/indigo/api/bsky" 5 + "github.com/bluesky-social/indigo/atproto/syntax" 5 6 "github.com/bluesky-social/indigo/automod" 6 7 ) 7 8 9 + // does not count "self-replies" (direct to self, or in own post thread) 8 10 func ReplyCountPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { 9 - if post.Reply != nil { 10 - did := evt.Account.Identity.DID.String() 11 - if evt.GetCount("reply", did, automod.PeriodDay) > 3 { 12 - evt.AddAccountFlag("frequent-replier") 13 - } 14 - evt.Increment("reply", did) 11 + if post.Reply == nil || IsSelfThread(evt, post) { 12 + return nil 15 13 } 14 + 15 + did := evt.Account.Identity.DID.String() 16 + if evt.GetCount("reply", did, automod.PeriodDay) > 3 { 17 + // TODO: disabled, too noisy for prod 18 + //evt.AddAccountFlag("frequent-replier") 19 + } 20 + evt.Increment("reply", did) 21 + 22 + parentURI, err := syntax.ParseATURI(post.Reply.Parent.Uri) 23 + if err != nil { 24 + evt.Logger.Warn("failed to parse reply AT-URI", "uri", post.Reply.Parent.Uri) 25 + return nil 26 + } 27 + evt.IncrementDistinct("reply-to", did, parentURI.Authority().String()) 16 28 return nil 17 29 }
+18 -4
automod/ruleset.go
··· 7 7 ) 8 8 9 9 type RuleSet struct { 10 - PostRules []PostRuleFunc 11 - ProfileRules []ProfileRuleFunc 12 - RecordRules []RecordRuleFunc 13 - IdentityRules []IdentityRuleFunc 10 + PostRules []PostRuleFunc 11 + ProfileRules []ProfileRuleFunc 12 + RecordRules []RecordRuleFunc 13 + RecordDeleteRules []RecordDeleteRuleFunc 14 + IdentityRules []IdentityRuleFunc 14 15 } 15 16 16 17 func (r *RuleSet) CallRecordRules(evt *RecordEvent) error { ··· 53 54 if evt.Err != nil { 54 55 return evt.Err 55 56 } 57 + } 58 + } 59 + return nil 60 + } 61 + 62 + func (r *RuleSet) CallRecordDeleteRules(evt *RecordDeleteEvent) error { 63 + for _, f := range r.RecordDeleteRules { 64 + err := f(evt) 65 + if err != nil { 66 + return err 67 + } 68 + if evt.Err != nil { 69 + return evt.Err 56 70 } 57 71 } 58 72 return nil
+42
automod/slack.go
··· 1 + package automod 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "net/http" 9 + ) 10 + 11 + type SlackWebhookBody struct { 12 + Text string `json:"text"` 13 + } 14 + 15 + // Sends a simple slack message to a channel via "incoming webhook". 16 + // 17 + // The slack incoming webhook must be already configured in the slack workplace. 18 + func (e *Engine) SendSlackMsg(ctx context.Context, msg string) error { 19 + // loosely based on: https://golangcode.com/send-slack-messages-without-a-library/ 20 + 21 + body, _ := json.Marshal(SlackWebhookBody{Text: msg}) 22 + req, err := http.NewRequestWithContext(ctx, http.MethodPost, e.SlackWebhookURL, bytes.NewBuffer(body)) 23 + if err != nil { 24 + return err 25 + } 26 + req.Header.Add("Content-Type", "application/json") 27 + client := http.DefaultClient 28 + resp, err := client.Do(req) 29 + if err != nil { 30 + return err 31 + } 32 + 33 + defer resp.Body.Close() 34 + 35 + buf := new(bytes.Buffer) 36 + buf.ReadFrom(resp.Body) 37 + if resp.StatusCode != 200 || buf.String() != "ok" { 38 + // TODO: in some cases print body? eg, if short and text 39 + return fmt.Errorf("failed slack webhook POST request. status=%d", resp.StatusCode) 40 + } 41 + return nil 42 + }
+18
cmd/hepa/consumer.go
··· 61 61 } 62 62 return nil 63 63 }, 64 + RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error { 65 + s.lastSeq = evt.Seq 66 + did, err := syntax.ParseDID(evt.Did) 67 + if err != nil { 68 + s.logger.Error("bad DID in RepoMigrate event", "did", evt.Did, "seq", evt.Seq, "err", err) 69 + return nil 70 + } 71 + if err := s.engine.ProcessIdentityEvent(ctx, "migrate", did); err != nil { 72 + s.logger.Error("processing repo migrate failed", "did", evt.Did, "seq", evt.Seq, "err", err) 73 + } 74 + return nil 75 + }, 64 76 // TODO: other event callbacks as needed 65 77 } 66 78 ··· 117 129 } 118 130 119 131 err = s.engine.ProcessRecord(ctx, did, op.Path, op.Cid.String(), rec) 132 + if err != nil { 133 + logger.Error("engine failed to process record", "err", err) 134 + continue 135 + } 136 + case repomgr.EvtKindDeleteRecord: 137 + err = s.engine.ProcessRecordDelete(ctx, did, op.Path) 120 138 if err != nil { 121 139 logger.Error("engine failed to process record", "err", err) 122 140 continue
+79 -12
cmd/hepa/main.go
··· 9 9 "time" 10 10 11 11 "github.com/bluesky-social/indigo/atproto/identity" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 12 13 "github.com/bluesky-social/indigo/automod" 13 14 14 15 "github.com/carlmjohnson/versioninfo" ··· 95 96 app.Commands = []*cli.Command{ 96 97 runCmd, 97 98 processRecordCmd, 99 + processRecentCmd, 98 100 } 99 101 100 102 return app.Run(args) ··· 134 136 Value: ":3989", 135 137 EnvVars: []string{"HEPA_METRICS_LISTEN"}, 136 138 }, 139 + &cli.StringFlag{ 140 + Name: "slack-webhook-url", 141 + // eg: https://hooks.slack.com/services/X1234 142 + Usage: "full URL of slack webhook", 143 + EnvVars: []string{"SLACK_WEBHOOK_URL"}, 144 + }, 137 145 }, 138 146 Action: func(cctx *cli.Context) error { 139 147 ctx := context.Background() ··· 152 160 srv, err := NewServer( 153 161 dir, 154 162 Config{ 155 - BGSHost: cctx.String("atp-bgs-host"), 156 - BskyHost: cctx.String("atp-bsky-host"), 157 - Logger: logger, 158 - ModHost: cctx.String("atp-mod-host"), 159 - ModAdminToken: cctx.String("mod-admin-token"), 160 - ModUsername: cctx.String("mod-handle"), 161 - ModPassword: cctx.String("mod-password"), 162 - SetsFileJSON: cctx.String("sets-json-path"), 163 - RedisURL: cctx.String("redis-url"), 163 + BGSHost: cctx.String("atp-bgs-host"), 164 + BskyHost: cctx.String("atp-bsky-host"), 165 + Logger: logger, 166 + ModHost: cctx.String("atp-mod-host"), 167 + ModAdminToken: cctx.String("mod-admin-token"), 168 + ModUsername: cctx.String("mod-handle"), 169 + ModPassword: cctx.String("mod-password"), 170 + SetsFileJSON: cctx.String("sets-json-path"), 171 + RedisURL: cctx.String("redis-url"), 172 + SlackWebhookURL: cctx.String("slack-webhook-url"), 164 173 }, 165 174 ) 166 175 if err != nil { ··· 195 204 ArgsUsage: `<at-uri>`, 196 205 Flags: []cli.Flag{}, 197 206 Action: func(cctx *cli.Context) error { 198 - uri := cctx.Args().First() 199 - if uri == "" { 207 + uriArg := cctx.Args().First() 208 + if uriArg == "" { 200 209 return fmt.Errorf("expected a single AT-URI argument") 210 + } 211 + aturi, err := syntax.ParseATURI(uriArg) 212 + if err != nil { 213 + return fmt.Errorf("not a valid AT-URI: %v", err) 201 214 } 202 215 203 216 ctx := context.Background() ··· 229 242 return err 230 243 } 231 244 232 - return srv.engine.FetchAndProcessRecord(ctx, uri) 245 + return srv.engine.FetchAndProcessRecord(ctx, aturi) 246 + }, 247 + } 248 + 249 + var processRecentCmd = &cli.Command{ 250 + Name: "process-recent", 251 + Usage: "fetch and process recent posts for an account", 252 + ArgsUsage: `<at-identifier>`, 253 + Flags: []cli.Flag{ 254 + &cli.IntFlag{ 255 + Name: "limit", 256 + Usage: "how many post records to parse", 257 + Value: 20, 258 + }, 259 + }, 260 + Action: func(cctx *cli.Context) error { 261 + idArg := cctx.Args().First() 262 + if idArg == "" { 263 + return fmt.Errorf("expected a single AT identifier (handle or DID) argument") 264 + } 265 + atid, err := syntax.ParseAtIdentifier(idArg) 266 + if err != nil { 267 + return fmt.Errorf("not a valid handle or DID: %v", err) 268 + } 269 + 270 + ctx := context.Background() 271 + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 272 + Level: slog.LevelInfo, 273 + })) 274 + slog.SetDefault(logger) 275 + 276 + dir, err := configDirectory(cctx) 277 + if err != nil { 278 + return err 279 + } 280 + 281 + srv, err := NewServer( 282 + dir, 283 + Config{ 284 + BGSHost: cctx.String("atp-bgs-host"), 285 + BskyHost: cctx.String("atp-bsky-host"), 286 + Logger: logger, 287 + ModHost: cctx.String("atp-mod-host"), 288 + ModAdminToken: cctx.String("mod-admin-token"), 289 + ModUsername: cctx.String("mod-handle"), 290 + ModPassword: cctx.String("mod-password"), 291 + SetsFileJSON: cctx.String("sets-json-path"), 292 + RedisURL: cctx.String("redis-url"), 293 + }, 294 + ) 295 + if err != nil { 296 + return err 297 + } 298 + 299 + return srv.engine.FetchAndProcessRecent(ctx, *atid, cctx.Int("limit")) 233 300 }, 234 301 }
+20 -9
cmd/hepa/server.go
··· 29 29 } 30 30 31 31 type Config struct { 32 - BGSHost string 33 - BskyHost string 34 - ModHost string 35 - ModAdminToken string 36 - ModUsername string 37 - ModPassword string 38 - SetsFileJSON string 39 - RedisURL string 40 - Logger *slog.Logger 32 + BGSHost string 33 + BskyHost string 34 + ModHost string 35 + ModAdminToken string 36 + ModUsername string 37 + ModPassword string 38 + SetsFileJSON string 39 + RedisURL string 40 + SlackWebhookURL string 41 + Logger *slog.Logger 41 42 } 42 43 43 44 func NewServer(dir identity.Directory, config Config) (*Server, error) { ··· 87 88 88 89 var counters automod.CountStore 89 90 var cache automod.CacheStore 91 + var flags automod.FlagStore 90 92 var rdb *redis.Client 91 93 if config.RedisURL != "" { 92 94 // generic client, for cursor state ··· 112 114 return nil, err 113 115 } 114 116 cache = csh 117 + 118 + flg, err := automod.NewRedisFlagStore(config.RedisURL) 119 + if err != nil { 120 + return nil, err 121 + } 122 + flags = flg 115 123 } else { 116 124 counters = automod.NewMemCountStore() 117 125 cache = automod.NewMemCacheStore(5_000, 30*time.Minute) 126 + flags = automod.NewMemFlagStore() 118 127 } 119 128 120 129 engine := automod.Engine{ ··· 122 131 Directory: dir, 123 132 Counters: counters, 124 133 Sets: sets, 134 + Flags: flags, 125 135 Cache: cache, 126 136 Rules: rules.DefaultRules(), 127 137 AdminClient: xrpcc, ··· 129 139 Client: util.RobustHTTPClient(), 130 140 Host: config.BskyHost, 131 141 }, 142 + SlackWebhookURL: config.SlackWebhookURL, 132 143 } 133 144 134 145 s := &Server{
+2 -13
go.sum
··· 80 80 github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= 81 81 github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= 82 82 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= 83 - github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= 84 83 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 85 84 github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 86 85 github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= ··· 301 300 github.com/ipfs/go-ipfs-routing v0.3.0/go.mod h1:dKqtTFIql7e1zYsEuWLyuOU+E0WJWW8JjbTPLParDWo= 302 301 github.com/ipfs/go-ipfs-util v0.0.3 h1:2RFdGez6bu2ZlZdI+rWfIdbQb1KudQp3VGwPtdNCmE0= 303 302 github.com/ipfs/go-ipfs-util v0.0.3/go.mod h1:LHzG1a0Ig4G+iZ26UUOMjHd+lfM84LZCrn17xAKWBvs= 304 - github.com/ipfs/go-ipld-cbor v0.0.7-0.20230126201833-a73d038d90bc h1:eUEo764smNy0EVRuMTSmirmuh552Mf2aBjfpDcLnDa8= 305 - github.com/ipfs/go-ipld-cbor v0.0.7-0.20230126201833-a73d038d90bc/go.mod h1:X7SgEIwC4COC5OWfcepZBWafO5kA1Rmt9ZsLLbhihQk= 306 303 github.com/ipfs/go-ipld-cbor v0.1.0 h1:dx0nS0kILVivGhfWuB6dUpMa/LAwElHPw1yOGYopoYs= 307 304 github.com/ipfs/go-ipld-cbor v0.1.0/go.mod h1:U2aYlmVrJr2wsUBU67K4KgepApSZddGRDWBYR0H4sCk= 308 305 github.com/ipfs/go-ipld-format v0.6.0 h1:VEJlA2kQ3LqFSIm5Vu6eIlSxD/Ze90xtc4Meten1F5U= ··· 324 321 github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY= 325 322 github.com/ipfs/go-peertaskqueue v0.8.1 h1:YhxAs1+wxb5jk7RvS0LHdyiILpNmRIRnZVztekOF0pg= 326 323 github.com/ipfs/go-peertaskqueue v0.8.1/go.mod h1:Oxxd3eaK279FxeydSPPVGHzbwVeHjatZ2GA8XD+KbPU= 327 - github.com/ipfs/go-unixfsnode v1.6.0 h1:JOSA02yaLylRNi2rlB4ldPr5VcZhcnaIVj5zNLcOjDo= 328 - github.com/ipfs/go-unixfsnode v1.6.0/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk= 324 + github.com/ipfs/go-unixfsnode v1.8.0 h1:yCkakzuE365glu+YkgzZt6p38CSVEBPgngL9ZkfnyQU= 325 + github.com/ipfs/go-unixfsnode v1.8.0/go.mod h1:HxRu9HYHOjK6HUqFBAi++7DVoWAHn0o4v/nZ/VA+0g8= 329 326 github.com/ipfs/go-verifcid v0.0.3 h1:gmRKccqhWDocCRkC+a59g5QW7uJw5bpX9HWBevXa0zs= 330 327 github.com/ipfs/go-verifcid v0.0.3/go.mod h1:gcCtGniVzelKrbk9ooUSX/pM3xlH73fZZJDzQJRvOUw= 331 328 github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 h1:oFo19cBmcP0Cmg3XXbrr0V/c+xU9U1huEZp8+OgBzdI= 332 329 github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4/go.mod h1:6nkFF8OmR5wLKBzRKi7/YFJpyYR7+oEn1DX+mMWnlLA= 333 - github.com/ipld/go-car/v2 v2.9.0 h1:mkMSfh9NpnfdFe30xBFTQiKZ6+LY+mwOPrq6r56xsPo= 334 - github.com/ipld/go-car/v2 v2.9.0/go.mod h1:UeIST4b5Je6LEx8GjFysgeCYwxAHKtAcsWxmF6PupNQ= 335 330 github.com/ipld/go-car/v2 v2.13.1 h1:KnlrKvEPEzr5IZHKTXLAEub+tPrzeAFQVRlSQvuxBO4= 336 331 github.com/ipld/go-car/v2 v2.13.1/go.mod h1:QkdjjFNGit2GIkpQ953KBwowuoukoM75nP/JI1iDJdo= 337 332 github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= ··· 377 372 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= 378 373 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= 379 374 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= 380 - github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= 381 375 github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= 382 376 github.com/klauspost/compress v1.17.3 h1:qkRjuerhUU1EmXLYGkSH6EZL+vPSxIrYjLNAK4slzwA= 383 377 github.com/klauspost/compress v1.17.3/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= ··· 629 623 github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= 630 624 github.com/vmihailenco/go-tinylfu v0.2.2 h1:H1eiG6HM36iniK6+21n9LLpzx1G9R3DJa2UjUjbynsI= 631 625 github.com/vmihailenco/go-tinylfu v0.2.2/go.mod h1:CutYi2Q9puTxfcolkliPq4npPuofg9N9t8JVrjzwa3Q= 632 - github.com/vmihailenco/msgpack/v5 v5.3.4 h1:qMKAwOV+meBw2Y8k9cVwAy7qErtYCwBzZ2ellBfvnqc= 633 626 github.com/vmihailenco/msgpack/v5 v5.3.4/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= 634 627 github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= 635 628 github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= ··· 641 634 github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= 642 635 github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 h1:5HZfQkwe0mIfyDmc1Em5GqlNRzcdtlv4HTNmdpt7XH0= 643 636 github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11/go.mod h1:Wlo/SzPmxVp6vXpGt/zaXhHH0fn4IxgqZc82aKg6bpQ= 644 - github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25 h1:yVYDLoN2gmB3OdBXFW8e1UwgVbmCvNlnAKhvHPaNARI= 645 - github.com/whyrusleeping/cbor-gen v0.0.0-20230818171029-f91ae536ca25/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= 646 637 github.com/whyrusleeping/cbor-gen v0.0.0-20230923211252-36a87e1ba72f h1:SBuSxXJL0/ZJMtTxbXZgHZkThl9dNrzyaNhlyaqscRo= 647 638 github.com/whyrusleeping/cbor-gen v0.0.0-20230923211252-36a87e1ba72f/go.mod h1:fgkXqYy7bV2cFeIEOkVTZS/WjXARfBqSH6Q2qHL33hQ= 648 639 github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f h1:jQa4QT2UP9WYv2nzyawpKMOCl+Z/jW7djv2/J50lj9E= ··· 992 983 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 993 984 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 994 985 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 995 - golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk= 996 - golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= 997 986 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= 998 987 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= 999 988 google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=