this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

automod: identical reply rule (#466)

Two enabling features:

- cheap consistent non-cryptographic-strength hashing of strings for use
in counter keys (went with uint64 murmur3, which was already in
dependency tree)
- ability to increment a counter for a single time period, to control
counter key space growth (for redis)

This initial version of the rule counts replies to any other user in the
same bucket, not distinct-accounts-with-same-reply-text. I'm a little
worried about redis memory growth if we have a HyperLogLog for each
author+text combination (as opposed to simple counter int). Maybe the
redis implementation is clever and efficient for the small-distinct
case? Or maybe RAM is cheap enough?

This branch will conflict with
https://github.com/bluesky-social/indigo/pull/464. Plan to merge that
one first, then I'll rebase this one.

authored by

bnewbold and committed by
GitHub
7d7e1f14 25a236f7

+255 -33
+99
automod/countstore.go
··· 1 + package automod 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "time" 8 + ) 9 + 10 + const ( 11 + PeriodTotal = "total" 12 + PeriodDay = "day" 13 + PeriodHour = "hour" 14 + ) 15 + 16 + type CountStore interface { 17 + GetCount(ctx context.Context, name, val, period string) (int, error) 18 + Increment(ctx context.Context, name, val string) error 19 + IncrementPeriod(ctx context.Context, name, val, period string) error 20 + // TODO: batch increment method 21 + GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) 22 + IncrementDistinct(ctx context.Context, name, bucket, val string) error 23 + } 24 + 25 + // TODO: this implementation isn't race-safe (yet)! 26 + type MemCountStore struct { 27 + Counts map[string]int 28 + DistinctCounts map[string]map[string]bool 29 + } 30 + 31 + func NewMemCountStore() MemCountStore { 32 + return MemCountStore{ 33 + Counts: make(map[string]int), 34 + DistinctCounts: make(map[string]map[string]bool), 35 + } 36 + } 37 + 38 + func PeriodBucket(name, val, period string) string { 39 + switch period { 40 + case PeriodTotal: 41 + return fmt.Sprintf("%s/%s", name, val) 42 + case PeriodDay: 43 + t := time.Now().UTC().Format(time.DateOnly) 44 + return fmt.Sprintf("%s/%s/%s", name, val, t) 45 + case PeriodHour: 46 + t := time.Now().UTC().Format(time.RFC3339)[0:13] 47 + return fmt.Sprintf("%s/%s/%s", name, val, t) 48 + default: 49 + slog.Warn("unhandled counter period", "period", period) 50 + return fmt.Sprintf("%s/%s", name, val) 51 + } 52 + } 53 + 54 + func (s MemCountStore) GetCount(ctx context.Context, name, val, period string) (int, error) { 55 + v, ok := s.Counts[PeriodBucket(name, val, period)] 56 + if !ok { 57 + return 0, nil 58 + } 59 + return v, nil 60 + } 61 + 62 + func (s MemCountStore) Increment(ctx context.Context, name, val string) error { 63 + for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} { 64 + s.IncrementPeriod(ctx, name, val, p) 65 + } 66 + return nil 67 + } 68 + 69 
+ func (s MemCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error { 70 + k := PeriodBucket(name, val, period) 71 + v, ok := s.Counts[k] 72 + if !ok { 73 + v = 0 74 + } 75 + v = v + 1 76 + s.Counts[k] = v 77 + return nil 78 + } 79 + 80 + func (s MemCountStore) GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) { 81 + v, ok := s.DistinctCounts[PeriodBucket(name, bucket, period)] 82 + if !ok { 83 + return 0, nil 84 + } 85 + return len(v), nil 86 + } 87 + 88 + func (s MemCountStore) IncrementDistinct(ctx context.Context, name, bucket, val string) error { 89 + for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} { 90 + k := PeriodBucket(name, bucket, p) 91 + m, ok := s.DistinctCounts[k] 92 + if !ok { 93 + m = make(map[string]bool) 94 + } 95 + m[val] = true 96 + s.DistinctCounts[k] = m 97 + } 98 + return nil 99 + }
+5 -4
automod/countstore/countstore.go
··· 27 27 // Incrementing -- both the "Increment" and "IncrementDistinct" variants -- increases 28 28 // a count in each supported period bucket size. 29 29 // In other words, one call to CountStore.Increment causes three increments internally: 30 - // one to the count for the hour, one to the count for the day, and one to thte all-time count. 30 + // one to the count for the hour, one to the count for the day, and one to the all-time count. 31 + // The "IncrementPeriod" method allows only incrementing a single period bucket. Care must be taken to match the "GetCount" period with the incremented period when using this variant. 31 32 // 32 33 // The exact implementation and precision of the "*Distinct" methods may vary: 33 34 // in the MemCountStore implementation, it is precise (it's based on large maps); 34 35 // in the RedisCountStore implementation, it uses the Redis "pfcount" feature, 35 - // which is based on a HyperLogLog datastructure which has probablistic properties 36 + // which is based on a HyperLogLog datastructure which has probabilistic properties 36 37 // (see https://redis.io/commands/pfcount/ ). 37 38 // 38 - // Memory growth and availablity of information over time also varies by implementation. 39 + // Memory growth and availability of information over time also varies by implementation. 39 40 // The RedisCountStore implementation uses Redis's key expiration primitives; 40 41 // only the all-time counts go without expiration. 41 42 // The MemCountStore grows without bound (it's intended to be used in testing 42 43 // and other non-production operations). 
43 - // 44 44 type CountStore interface { 45 45 GetCount(ctx context.Context, name, val, period string) (int, error) 46 46 Increment(ctx context.Context, name, val string) error 47 + IncrementPeriod(ctx context.Context, name, val, period string) error 47 48 // TODO: batch increment method 48 49 GetCountDistinct(ctx context.Context, name, bucket, period string) (int, error) 49 50 IncrementDistinct(ctx context.Context, name, bucket, val string) error
+12 -5
automod/countstore/countstore_mem.go
··· 33 33 34 34 func (s MemCountStore) Increment(ctx context.Context, name, val string) error { 35 35 for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} { 36 - k := periodBucket(name, val, p) 37 - s.Counts.Compute(k, func(oldVal int, _ bool) (int, bool) { 38 - return oldVal+1, false 39 - }) 36 + if err := s.IncrementPeriod(ctx, name, val, p); err != nil { 37 + return err 38 + } 40 39 } 40 + return nil 41 + } 42 + 43 + func (s MemCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error { 44 + k := periodBucket(name, val, period) 45 + s.Counts.Compute(k, func(oldVal int, _ bool) (int, bool) { 46 + return oldVal + 1, false 47 + }) 41 48 return nil 42 49 } 43 50 ··· 52 59 func (s MemCountStore) IncrementDistinct(ctx context.Context, name, bucket, val string) error { 53 60 for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} { 54 61 k := periodBucket(name, bucket, p) 55 - s.DistinctCounts.Compute(k,func(nested *xsync.MapOf[string, bool], _ bool) (*xsync.MapOf[string, bool], bool) { 62 + s.DistinctCounts.Compute(k, func(nested *xsync.MapOf[string, bool], _ bool) (*xsync.MapOf[string, bool], bool) { 56 63 if nested == nil { 57 64 nested = xsync.NewMapOf[string, bool]() 58 65 }
+20
automod/countstore/countstore_redis.go
··· 65 65 return err 66 66 } 67 67 68 + // Variant of Increment() which only acts on a single specified time period. The intended use of this variant is to control the total number of counters persisted, by using a relatively short time period, for which the counters will expire. 69 + func (s *RedisCountStore) IncrementPeriod(ctx context.Context, name, val, period string) error { 70 + 71 + // multiple ops in a single redis round-trip 72 + multi := s.Client.Pipeline() 73 + 74 + key := redisCountPrefix + periodBucket(name, val, period) 75 + multi.Incr(ctx, key) 76 + 77 + switch period { 78 + case PeriodHour: 79 + multi.Expire(ctx, key, 2*time.Hour) 80 + case PeriodDay: 81 + multi.Expire(ctx, key, 48*time.Hour) 82 + } 83 + 84 + _, err := multi.Exec(ctx) 85 + return err 86 + } 87 + 68 88 func (s *RedisCountStore) GetCountDistinct(ctx context.Context, name, val, period string) (int, error) { 69 89 key := redisDistinctPrefix + periodBucket(name, val, period) 70 90 c, err := s.Client.PFCount(ctx, key).Result()
+79 -23
automod/event.go
··· 16 16 } 17 17 18 18 type CounterRef struct { 19 - Name string 20 - Val string 19 + Name string 20 + Val string 21 + Period *string 21 22 } 22 23 23 24 type CounterDistinctRef struct { ··· 26 27 Val string 27 28 } 28 29 29 - // base type for events specific to an account, usually derived from a repo event stream message (one such message may result in multiple `RepoEvent`) 30 + // Base type for events specific to an account, usually derived from a repo event stream message (one such message may result in multiple `RepoEvent`) 31 + // 32 + // Events are both containers for data about the event itself (similar to an HTTP request type); aggregate results and state (counters, mod actions) to be persisted after all rules are run; and act as an API for additional network reads and operations. 30 33 // 31 - // events are both containers for data about the event itself (similar to an HTTP request type); aggregate results and state (counters, mod actions) to be persisted after all rules are run; and act as an API for additional network reads and operations. 34 + // Handling of moderation actions (such as labels, flags, and reports) are deferred until the end of all rule execution, then de-duplicated against any pre-existing actions on the account. 32 35 type RepoEvent struct { 33 - Engine *Engine 34 - Err error 35 - Logger *slog.Logger 36 - Account AccountMeta 37 - CounterIncrements []CounterRef 36 + // Back-reference to Engine that is processing this event. Pointer, but must not be nil. 37 + Engine *Engine 38 + // Any error encountered while processing the event can be stashed in this field and handled at the end of all processing. 39 + Err error 40 + // slog logger handle, with event-specific structured fields pre-populated. Pointer, but expected to not be nil. 
41 + Logger *slog.Logger 42 + // Metadata for the account (identity) associated with this event (aka, the repo owner) 43 + Account AccountMeta 44 + // List of counters which should be incremented as part of processing this event. These are collected during rule execution and persisted in bulk at the end. 45 + CounterIncrements []CounterRef 46 + // Similar to "CounterIncrements", but for "distinct" style counters 38 47 CounterDistinctIncrements []CounterDistinctRef // TODO: better variable names 39 - AccountLabels []string 40 - AccountFlags []string 41 - AccountReports []ModReport 42 - AccountTakedown bool 48 + // Label values which should be applied to the overall account, as a result of rule execution. 49 + AccountLabels []string 50 + // Moderation flags (similar to labels, but private) which should be applied to the overall account, as a result of rule execution. 51 + AccountFlags []string 52 + // Reports which should be filed against this account, as a result of rule execution. 53 + AccountReports []ModReport 54 + // If "true", indicates that a rule indicates that the entire account should have a takedown. 55 + AccountTakedown bool 43 56 } 44 57 58 + // Immediately fetches a count from the event's engine's countstore. Returns 0 by default (if counter has never been incremented). 59 + // 60 + // "name" is the counter namespace. 61 + // "val" is the specific counter with that namespace. 62 + // "period" is the time period bucket (one of the fixed "Period*" values) 45 63 func (e *RepoEvent) GetCount(name, val, period string) int { 46 64 v, err := e.Engine.GetCount(name, val, period) 47 65 if err != nil { ··· 51 69 return v 52 70 } 53 71 72 + // Enqueues the named counter to be incremented at the end of all rule processing. Will automatically increment for all time periods. 73 + // 74 + // "name" is the counter namespace. 75 + // "val" is the specific counter with that namespace. 
54 76 func (e *RepoEvent) Increment(name, val string) { 55 77 e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val}) 56 78 } 57 79 80 + // Enqueues the named counter to be incremented at the end of all rule processing. Will only increment the indicated time period bucket. 81 + func (e *RepoEvent) IncrementPeriod(name, val string, period string) { 82 + e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val, Period: &period}) 83 + } 84 + 85 + // Immediate fetches an estimated (statistical) count of distinct string values in the indicated bucket and time period. 58 86 func (e *RepoEvent) GetCountDistinct(name, bucket, period string) int { 59 87 v, err := e.Engine.GetCountDistinct(name, bucket, period) 60 88 if err != nil { ··· 64 92 return v 65 93 } 66 94 95 + // Enqueues the named "distinct value" counter based on the supplied string value ("val") to be incremented at the end of all rule processing. Will automatically increment for all time periods. 67 96 func (e *RepoEvent) IncrementDistinct(name, bucket, val string) { 68 97 e.CounterDistinctIncrements = append(e.CounterDistinctIncrements, CounterDistinctRef{Name: name, Bucket: bucket, Val: val}) 69 98 } 70 99 100 + // Checks the Engine's setstore for whether the indicated "val" is a member of the "name" set. 71 101 func (e *RepoEvent) InSet(name, val string) bool { 72 102 v, err := e.Engine.InSet(name, val) 73 103 if err != nil { ··· 77 107 return v 78 108 } 79 109 110 + // Enqueues the entire account to be taken down at the end of rule processing. 80 111 func (e *RepoEvent) TakedownAccount() { 81 112 e.AccountTakedown = true 82 113 } 83 114 115 + // Enqueues the provided label (string value) to be added to the account at the end of rule processing. 
84 116 func (e *RepoEvent) AddAccountLabel(val string) { 85 117 e.AccountLabels = append(e.AccountLabels, val) 86 118 } 87 119 120 + // Enqueues the provided flag (string value) to be recorded (in the Engine's flagstore) at the end of rule processing. 88 121 func (e *RepoEvent) AddAccountFlag(val string) { 89 122 e.AccountFlags = append(e.AccountFlags, val) 90 123 } 91 124 125 + // Enqueues a moderation report to be filed against the account at the end of rule processing. 92 126 func (e *RepoEvent) ReportAccount(reason, comment string) { 93 127 e.AccountReports = append(e.AccountReports, ModReport{ReasonType: reason, Comment: comment}) 94 128 } ··· 247 281 func (e *RepoEvent) PersistCounters(ctx context.Context) error { 248 282 // TODO: dedupe this array 249 283 for _, ref := range e.CounterIncrements { 250 - err := e.Engine.Counters.Increment(ctx, ref.Name, ref.Val) 251 - if err != nil { 252 - return err 284 + if ref.Period != nil { 285 + err := e.Engine.Counters.IncrementPeriod(ctx, ref.Name, ref.Val, *ref.Period) 286 + if err != nil { 287 + return err 288 + } 289 + } else { 290 + err := e.Engine.Counters.Increment(ctx, ref.Name, ref.Val) 291 + if err != nil { 292 + return err 293 + } 253 294 } 254 295 } 255 296 for _, ref := range e.CounterDistinctIncrements { ··· 270 311 ) 271 312 } 272 313 314 + // Alias of RepoEvent 273 315 type IdentityEvent struct { 274 316 RepoEvent 275 317 } 276 318 319 + // Extends RepoEvent. Represents the creation of a single record in the given repository. 277 320 type RecordEvent struct { 278 321 RepoEvent 279 322 280 - Record any 281 - Collection string 282 - RecordKey string 283 - CID string 284 - RecordLabels []string 323 + // The un-marshalled record, as a go struct, from the api/atproto or api/bsky type packages. 324 + Record any 325 + // The "collection" part of the repo path for this record. Must be an NSID, though this isn't indicated by the type of this field. 
326 + Collection string 327 + // The "record key" (rkey) part of repo path. 328 + RecordKey string 329 + // CID of the canonical CBOR version of the record, as matches the repo value. 330 + CID string 331 + // Same as "AccountLabels", but at record-level 332 + RecordLabels []string 333 + // Same as "AccountTakedown", but at record-level 285 334 RecordTakedown bool 286 - RecordReports []ModReport 287 - RecordFlags []string 335 + // Same as "AccountReports", but at record-level 336 + RecordReports []ModReport 337 + // Same as "AccountFlags", but at record-level 338 + RecordFlags []string 288 339 // TODO: commit metadata 289 340 } 290 341 342 + // Enqueues the record to be taken down at the end of rule processing. 291 343 func (e *RecordEvent) TakedownRecord() { 292 344 e.RecordTakedown = true 293 345 } 294 346 347 + // Enqueues the provided label (string value) to be added to the record at the end of rule processing. 295 348 func (e *RecordEvent) AddRecordLabel(val string) { 296 349 e.RecordLabels = append(e.RecordLabels, val) 297 350 } 298 351 352 + // Enqueues the provided flag (string value) to be recorded (in the Engine's flagstore) at the end of rule processing. 299 353 func (e *RecordEvent) AddRecordFlag(val string) { 300 354 e.RecordFlags = append(e.RecordFlags, val) 301 355 } 302 356 357 + // Enqueues a moderation report to be filed against the record at the end of rule processing. 303 358 func (e *RecordEvent) ReportRecord(reason, comment string) { 304 359 e.RecordReports = append(e.RecordReports, ModReport{ReasonType: reason, Comment: comment}) 305 360 } ··· 414 469 ) 415 470 } 416 471 472 + // Extends RepoEvent. Represents the deletion of a single record in the given repository. 417 473 type RecordDeleteEvent struct { 418 474 RepoEvent 419 475
+1
automod/rules/all.go
··· 18 18 KeywordPostRule, 19 19 ReplySingleKeywordPostRule, 20 20 AggressivePromotionRule, 21 + IdenticalReplyPostRule, 21 22 }, 22 23 ProfileRules: []automod.ProfileRuleFunc{ 23 24 GtubeProfileRule,
+10
automod/rules/helpers.go
··· 9 9 appbsky "github.com/bluesky-social/indigo/api/bsky" 10 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 11 "github.com/bluesky-social/indigo/automod" 12 + 13 + "github.com/spaolacci/murmur3" 12 14 ) 13 15 14 16 func dedupeStrings(in []string) []string { ··· 177 179 } 178 180 return false 179 181 } 182 + 183 + // returns a fast, compact hash of a string 184 + // 185 + // current implementation uses murmur3, default seed, and hex encoding 186 + func HashOfString(s string) string { 187 + val := murmur3.Sum64([]byte(s)) 188 + return fmt.Sprintf("%016x", val) 189 + }
+7
automod/rules/helpers_test.go
··· 53 53 assert.Equal(fix.out, ExtractTextURLs(fix.s)) 54 54 } 55 55 } 56 + 57 + func TestHashOfString(t *testing.T) { 58 + assert := assert.New(t) 59 + 60 + // hashing function should be consistent over time 61 + assert.Equal("4e6f69c0e3d10992", HashOfString("dummy-value")) 62 + }
+21
automod/rules/replies.go
··· 28 28 evt.IncrementDistinct("reply-to", did, parentURI.Authority().String()) 29 29 return nil 30 30 } 31 + 32 + var identicalReplyLimit = 5 33 + 34 + // Looks for accounts posting the exact same text multiple times. Does not currently count the number of distinct accounts replied to, just counts replies at all. 35 + // 36 + // There can be legitimate situations that trigger this rule, so in most situations should be a "report" not "label" action. 37 + func IdenticalReplyPostRule(evt *automod.RecordEvent, post *appbsky.FeedPost) error { 38 + if post.Reply == nil || IsSelfThread(evt, post) { 39 + return nil 40 + } 41 + 42 + // use a specific period (IncrementPeriod()) to reduce the number of counters (one per unique post text) 43 + period := automod.PeriodDay 44 + bucket := evt.Account.Identity.DID.String() + "/" + HashOfString(post.Text) 45 + if evt.GetCount("reply-text", bucket, period) >= identicalReplyLimit { 46 + evt.AddAccountFlag("multi-identical-reply") 47 + } 48 + 49 + evt.IncrementPeriod("reply-text", bucket, period) 50 + return nil 51 + }
+1 -1
go.mod
··· 153 153 github.com/samber/lo v1.38.1 // indirect 154 154 github.com/scylladb/go-reflectx v1.0.1 // indirect 155 155 github.com/segmentio/asm v1.2.0 // indirect 156 - github.com/spaolacci/murmur3 v1.1.0 // indirect 156 + github.com/spaolacci/murmur3 v1.1.0 157 157 github.com/valyala/bytebufferpool v1.0.0 // indirect 158 158 github.com/valyala/fasttemplate v1.2.2 // indirect 159 159 github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect