this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

automod: redis counters; some refactors

+162 -75
+11 -11
automod/countstore.go
··· 14 14 ) 15 15 16 16 type CountStore interface { 17 - GetCount(ctx context.Context, key, period string) (int, error) 18 - Increment(ctx context.Context, key string) error 17 + GetCount(ctx context.Context, name, val, period string) (int, error) 18 + Increment(ctx context.Context, name, val string) error 19 19 // TODO: batch increment method 20 20 } 21 21 ··· 30 30 } 31 31 } 32 32 33 - func PeriodKey(key, period string) string { 33 + func PeriodBucket(name, val, period string) string { 34 34 switch period { 35 35 case PeriodTotal: 36 - return key 36 + return fmt.Sprintf("%s/%s", name, val) 37 37 case PeriodDay: 38 38 t := time.Now().UTC().Format(time.DateOnly) 39 - return fmt.Sprintf("%s:%s", key, t) 39 + return fmt.Sprintf("%s/%s/%s", name, val, t) 40 40 case PeriodHour: 41 41 t := time.Now().UTC().Format(time.RFC3339)[0:13] 42 - return fmt.Sprintf("%s:%s", key, t) 42 + return fmt.Sprintf("%s/%s/%s", name, val, t) 43 43 default: 44 44 slog.Warn("unhandled counter period", "period", period) 45 - return key 45 + return fmt.Sprintf("%s/%s", name, val) 46 46 } 47 47 } 48 48 49 - func (s MemCountStore) GetCount(ctx context.Context, key, period string) (int, error) { 50 - v, ok := s.Counts[PeriodKey(key, period)] 49 + func (s MemCountStore) GetCount(ctx context.Context, name, val, period string) (int, error) { 50 + v, ok := s.Counts[PeriodBucket(name, val, period)] 51 51 if !ok { 52 52 return 0, nil 53 53 } 54 54 return v, nil 55 55 } 56 56 57 - func (s MemCountStore) Increment(ctx context.Context, key string) error { 57 + func (s MemCountStore) Increment(ctx context.Context, name, val string) error { 58 58 for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} { 59 - k := PeriodKey(key, p) 59 + k := PeriodBucket(name, val, p) 60 60 v, ok := s.Counts[k] 61 61 if !ok { 62 62 v = 0
+5 -14
automod/engine.go
··· 54 54 return evt.Err 55 55 } 56 56 evt.CanonicalLogLine() 57 - if err := evt.PersistAccountActions(ctx); err != nil { 58 - return err 59 - } 60 - if err := evt.PersistCounters(ctx); err != nil { 57 + if err := evt.PersistActions(ctx); err != nil { 61 58 return err 62 59 } 63 60 return nil ··· 95 92 return evt.Err 96 93 } 97 94 evt.CanonicalLogLine() 98 - if err := evt.PersistAccountActions(ctx); err != nil { 99 - return err 100 - } 101 - if err := evt.PersistRecordActions(ctx); err != nil { 95 + if err := evt.PersistActions(ctx); err != nil { 102 96 return err 103 97 } 104 98 if err := evt.PersistCounters(ctx); err != nil { ··· 114 108 return evt.Err 115 109 } 116 110 evt.CanonicalLogLine() 117 - if err := evt.PersistAccountActions(ctx); err != nil { 118 - return err 119 - } 120 - if err := evt.PersistRecordActions(ctx); err != nil { 111 + if err := evt.PersistActions(ctx); err != nil { 121 112 return err 122 113 } 123 114 if err := evt.PersistCounters(ctx); err != nil { ··· 167 158 } 168 159 } 169 160 170 - func (e *Engine) GetCount(key, period string) (int, error) { 171 - return e.Counters.GetCount(context.TODO(), key, period) 161 + func (e *Engine) GetCount(name, val, period string) (int, error) { 162 + return e.Counters.GetCount(context.TODO(), name, val, period) 172 163 } 173 164 174 165 // checks if `val` is an element of set `name`
+2 -2
automod/engine_test.go
··· 16 16 func simpleRule(evt *PostEvent) error { 17 17 for _, tag := range evt.Post.Tags { 18 18 if evt.InSet("banned-hashtags", tag) { 19 - evt.AddLabel("bad-hashtag") 19 + evt.AddRecordLabel("bad-hashtag") 20 20 break 21 21 } 22 22 } ··· 25 25 if feat.RichtextFacet_Tag != nil { 26 26 tag := feat.RichtextFacet_Tag.Tag 27 27 if evt.InSet("banned-hashtags", tag) { 28 - evt.AddLabel("bad-hashtag") 28 + evt.AddRecordLabel("bad-hashtag") 29 29 break 30 30 } 31 31 }
+30 -31
automod/event.go
··· 21 21 // TODO: createdAt / age 22 22 } 23 23 24 + type CounterRef struct { 25 + Name string 26 + Val string 27 + } 28 + 24 29 // base type for events. events are both containers for data about the event itself (similar to an HTTP request type); aggregate results and state (counters, mod actions) to be persisted after all rules are run; and act as an API for additional network reads and operations. 25 30 type Event struct { 26 31 Engine *Engine 27 32 Err error 28 33 Logger *slog.Logger 29 34 Account AccountMeta 30 - CounterIncrements []string 35 + CounterIncrements []CounterRef 31 36 AccountLabels []string 32 37 AccountFlags []string 33 38 AccountReports []ModReport 34 39 AccountTakedown bool 35 40 } 36 41 37 - func (e *Event) CountTotal(key string) int { 38 - v, err := e.Engine.GetCount(key, PeriodTotal) 39 - if err != nil { 40 - e.Err = err 41 - return 0 42 - } 43 - return v 44 - } 45 - 46 - func (e *Event) CountDay(key string) int { 47 - v, err := e.Engine.GetCount(key, PeriodDay) 48 - if err != nil { 49 - e.Err = err 50 - return 0 51 - } 52 - return v 53 - } 54 - 55 - func (e *Event) CountHour(key string) int { 56 - v, err := e.Engine.GetCount(key, PeriodHour) 42 + func (e *Event) GetCount(name, val, period string) int { 43 + v, err := e.Engine.GetCount(name, val, period) 57 44 if err != nil { 58 45 e.Err = err 59 46 return 0 ··· 70 57 return v 71 58 } 72 59 73 - func (e *Event) IncrementCounter(key string) { 74 - e.CounterIncrements = append(e.CounterIncrements, key) 60 + func (e *Event) Increment(name, val string) { 61 + e.CounterIncrements = append(e.CounterIncrements, CounterRef{Name: name, Val: val}) 75 62 } 76 63 77 64 func (e *Event) TakedownAccount() { 78 65 e.AccountTakedown = true 79 66 } 80 67 81 - func (e *Event) AddLabelAccount(val string) { 68 + func (e *Event) AddAccountLabel(val string) { 82 69 e.AccountLabels = append(e.AccountLabels, val) 83 70 } 84 71 85 - func (e *Event) AddFlag(val string) { 72 + func (e *Event) AddAccountFlag(val string) { 86 73 e.AccountFlags = append(e.AccountFlags, val) 87 74 } 88 75 ··· 144 131 return nil 145 132 } 146 133 134 + func (e *Event) PersistActions(ctx context.Context) error { 135 + return e.PersistAccountActions(ctx) 136 + } 137 + 147 138 func (e *Event) PersistCounters(ctx context.Context) error { 148 - for _, k := range dedupeStrings(e.CounterIncrements) { 149 - err := e.Engine.Counters.Increment(ctx, k) 139 + // TODO: dedupe this array 140 + for _, ref := range e.CounterIncrements { 141 + err := e.Engine.Counters.Increment(ctx, ref.Name, ref.Val) 150 142 if err != nil { 151 143 return err 152 144 } ··· 180 172 // TODO: commit metadata 181 173 } 182 174 183 - func (e *RecordEvent) Takedown() { 175 + func (e *RecordEvent) TakedownRecord() { 184 176 e.RecordTakedown = true 185 177 } 186 178 187 - func (e *RecordEvent) AddLabel(val string) { 179 + func (e *RecordEvent) AddRecordLabel(val string) { 188 180 e.RecordLabels = append(e.RecordLabels, val) 189 181 } 190 182 191 - func (e *RecordEvent) AddFlag(val string) { 183 + func (e *RecordEvent) AddRecordFlag(val string) { 192 184 e.RecordFlags = append(e.RecordFlags, val) 193 185 } 194 186 195 - func (e *RecordEvent) Report(reason, comment string) { 187 + func (e *RecordEvent) ReportRecord(reason, comment string) { 196 188 e.RecordReports = append(e.RecordReports, ModReport{ReasonType: reason, Comment: comment}) 197 189 } 198 190 ··· 247 239 } 248 240 } 249 241 return nil 242 + } 243 + 244 + func (e *RecordEvent) PersistActions(ctx context.Context) error { 245 + if err := e.PersistAccountActions(ctx); err != nil { 246 + return err 247 + } 248 + return e.PersistRecordActions(ctx) 250 249 } 251 250 252 251 func (e *RecordEvent) CanonicalLogLine() {
+65
automod/redis_counters.go
··· 1 + package automod 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + "github.com/redis/go-redis/v9" 8 + ) 9 + 10 + var redisCountPrefix string = "count/" 11 + 12 + type RedisCountStore struct { 13 + Client *redis.Client 14 + } 15 + 16 + func NewRedisCountStore(redisURL string) (*RedisCountStore, error) { 17 + opt, err := redis.ParseURL(redisURL) 18 + if err != nil { 19 + return nil, err 20 + } 21 + rdb := redis.NewClient(opt) 22 + // check redis connection 23 + _, err = rdb.Ping(context.TODO()).Result() 24 + if err != nil { 25 + return nil, err 26 + } 27 + rcs := RedisCountStore{ 28 + Client: rdb, 29 + } 30 + return &rcs, nil 31 + } 32 + 33 + func (s *RedisCountStore) GetCount(ctx context.Context, name, val, period string) (int, error) { 34 + key := redisCountPrefix + PeriodBucket(name, val, period) 35 + c, err := s.Client.Get(ctx, key).Int() 36 + if err == redis.Nil { 37 + return 0, nil 38 + } else if err != nil { 39 + return 0, err 40 + } 41 + return c, nil 42 + } 43 + 44 + func (s *RedisCountStore) Increment(ctx context.Context, name, val string) error { 45 + 46 + var key string 47 + 48 + // increment multiple counters in a single redis round-trip 49 + multi := s.Client.Pipeline() 50 + 51 + key = redisCountPrefix + PeriodBucket(name, val, PeriodHour) 52 + multi.Incr(ctx, key) 53 + multi.Expire(ctx, key, 2*time.Hour) 54 + 55 + key = redisCountPrefix + PeriodBucket(name, val, PeriodDay) 56 + multi.Incr(ctx, key) 57 + multi.Expire(ctx, key, 48*time.Hour) 58 + 59 + key = redisCountPrefix + PeriodBucket(name, val, PeriodTotal) 60 + multi.Incr(ctx, key) 61 + // no expiration for total 62 + 63 + _, err := multi.Exec(ctx) 64 + return err 65 + }
+11 -9
automod/redis_directory.go
··· 15 15 "github.com/redis/go-redis/v9" 16 16 ) 17 17 18 + var redisDirPrefix string = "dir/" 19 + 18 20 // uses redis as a cache for identity lookups. includes a local cache layer as well, for hot keys 19 21 type RedisDirectory struct { 20 22 Inner identity.Directory ··· 93 95 } 94 96 d.handleCache.Set(&cache.Item{ 95 97 Ctx: ctx, 96 - Key: h.String(), 98 + Key: redisDirPrefix + h.String(), 97 99 Value: he, 98 100 TTL: d.ErrTTL, 99 101 }) ··· 113 115 114 116 d.identityCache.Set(&cache.Item{ 115 117 Ctx: ctx, 116 - Key: ident.DID.String(), 118 + Key: redisDirPrefix + ident.DID.String(), 117 119 Value: entry, 118 120 TTL: d.HitTTL, 119 121 }) 120 122 d.handleCache.Set(&cache.Item{ 121 123 Ctx: ctx, 122 - Key: h.String(), 124 + Key: redisDirPrefix + h.String(), 123 125 Value: he, 124 126 TTL: d.HitTTL, 125 127 }) ··· 128 130 129 131 func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { 130 132 var entry HandleEntry 131 - err := d.handleCache.Get(ctx, h.String(), &entry) 133 + err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), &entry) 132 134 if err != nil && err != cache.ErrCacheMiss { 133 135 return "", err 134 136 } ··· 147 149 select { 148 150 case <-val.(chan struct{}): 149 151 // The result should now be in the cache 150 - err := d.handleCache.Get(ctx, h.String(), entry) 152 + err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), entry) 151 153 if err != nil && err != cache.ErrCacheMiss { 152 154 return "", err 153 155 } ··· 198 200 199 201 d.identityCache.Set(&cache.Item{ 200 202 Ctx: ctx, 201 - Key: did.String(), 203 + Key: redisDirPrefix + did.String(), 202 204 Value: entry, 203 205 TTL: d.HitTTL, 204 206 }) 205 207 if he != nil { 206 208 d.handleCache.Set(&cache.Item{ 207 209 Ctx: ctx, 208 - Key: ident.Handle.String(), 210 + Key: redisDirPrefix + ident.Handle.String(), 209 211 Value: *he, 210 212 TTL: d.HitTTL, 211 213 }) ··· 215 217 216 218 func (d *RedisDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identity.Identity, error) { 217 219 var entry IdentityEntry 218 - err := d.identityCache.Get(ctx, did.String(), &entry) 220 + err := d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) 219 221 if err != nil && err != cache.ErrCacheMiss { 220 222 return nil, err 221 223 } ··· 234 236 select { 235 237 case <-val.(chan struct{}): 236 238 // The result should now be in the cache 237 - err = d.identityCache.Get(ctx, did.String(), &entry) 239 + err = d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) 238 240 if err != nil && err != cache.ErrCacheMiss { 239 241 return nil, err 240 242 }
+1
automod/rules/all.go
··· 9 9 PostRules: []automod.PostRuleFunc{ 10 10 MisleadingURLPostRule, 11 11 MisleadingMentionPostRule, 12 + ReplyCountPostRule, 12 13 BanHashtagsPostRule, 13 14 }, 14 15 }
+2 -2
automod/rules/hashtags.go
··· 7 7 func BanHashtagsPostRule(evt *automod.PostEvent) error { 8 8 for _, tag := range evt.Post.Tags { 9 9 if evt.InSet("banned-hashtags", tag) { 10 - evt.AddLabel("bad-hashtag") 10 + evt.AddRecordLabel("bad-hashtag") 11 11 break 12 12 } 13 13 } ··· 16 16 if feat.RichtextFacet_Tag != nil { 17 17 tag := feat.RichtextFacet_Tag.Tag 18 18 if evt.InSet("banned-hashtags", tag) { 19 - evt.AddLabel("bad-hashtag") 19 + evt.AddRecordLabel("bad-hashtag") 20 20 break 21 21 } 22 22 }
+5 -5
automod/rules/misleading.go
··· 14 14 if feat.RichtextFacet_Link != nil { 15 15 if int(facet.Index.ByteEnd) > len([]byte(evt.Post.Text)) || facet.Index.ByteStart > facet.Index.ByteEnd { 16 16 evt.Logger.Warn("invalid facet range") 17 - evt.AddLabel("invalid") // TODO: or some other "this record is corrupt" indicator? 17 + evt.AddRecordLabel("invalid") // TODO: or some other "this record is corrupt" indicator? 18 18 continue 19 19 } 20 20 txt := string([]byte(evt.Post.Text)[facet.Index.ByteStart:facet.Index.ByteEnd]) ··· 36 36 // this public code will obviously get discovered and bypassed. this doesn't earn you any security cred! 37 37 if linkURL.Host != textURL.Host { 38 38 evt.Logger.Warn("misleading mismatched domains", "link", linkURL.Host, "text", textURL.Host) 39 - evt.AddLabel("misleading") 39 + evt.AddRecordLabel("misleading") 40 40 } 41 41 } 42 42 } ··· 52 52 if feat.RichtextFacet_Mention != nil { 53 53 if int(facet.Index.ByteEnd) > len([]byte(evt.Post.Text)) || facet.Index.ByteStart > facet.Index.ByteEnd { 54 54 evt.Logger.Warn("invalid facet range") 55 - evt.AddLabel("invalid") // TODO: or some other "this record is corrupt" indicator? 55 + evt.AddRecordLabel("invalid") // TODO: or some other "this record is corrupt" indicator? 56 56 continue 57 57 } 58 58 txt := string([]byte(evt.Post.Text)[facet.Index.ByteStart:facet.Index.ByteEnd]) ··· 68 68 mentioned, err := evt.Engine.Directory.LookupHandle(ctx, handle) 69 69 if err != nil { 70 70 evt.Logger.Warn("could not resolve handle", "handle", handle) 71 - evt.AddLabel("misleading") 71 + evt.AddRecordLabel("misleading") 72 72 break 73 73 } 74 74 75 75 // TODO: check if mentioned DID was recently updated? might be a caching issue 76 76 if mentioned.DID.String() != feat.RichtextFacet_Mention.Did { 77 77 evt.Logger.Warn("misleading mention", "text", txt, "did", mentioned.DID) 78 - evt.AddLabel("misleading") 78 + evt.AddRecordLabel("misleading") 79 79 continue 80 80 } 81 81 }
+16
automod/rules/replies.go
··· 1 + package rules 2 + 3 + import ( 4 + "github.com/bluesky-social/indigo/automod" 5 + ) 6 + 7 + func ReplyCountPostRule(evt *automod.PostEvent) error { 8 + if evt.Post.Reply != nil { 9 + did := evt.Account.Identity.DID.String() 10 + if evt.GetCount("reply", did, automod.PeriodDay) > 3 { 11 + evt.AddAccountFlag("frequent-replier") 12 + } 13 + evt.Increment("reply", did) 14 + } 15 + return nil 16 + }
+1
cmd/hepa/main.go
··· 144 144 ModUsername: cctx.String("mod-handle"), 145 145 ModPassword: cctx.String("mod-password"), 146 146 SetsFileJSON: cctx.String("sets-json-path"), 147 + RedisURL: cctx.String("redis-url"), 147 148 }, 148 149 ) 149 150 if err != nil {
+13 -1
cmd/hepa/server.go
··· 31 31 ModUsername string 32 32 ModPassword string 33 33 SetsFileJSON string 34 + RedisURL string 34 35 Logger *slog.Logger 35 36 } 36 37 ··· 79 80 } 80 81 } 81 82 83 + var counters automod.CountStore 84 + if config.RedisURL != "" { 85 + c, err := automod.NewRedisCountStore(config.RedisURL) 86 + if err != nil { 87 + return nil, err 88 + } 89 + counters = c 90 + } else { 91 + counters = automod.NewMemCountStore() 92 + } 93 + 82 94 engine := automod.Engine{ 83 95 Logger: logger, 84 96 Directory: dir, 85 - Counters: automod.NewMemCountStore(), 97 + Counters: counters, 86 98 Sets: sets, 87 99 Rules: rules.DefaultRules(), 88 100 AdminClient: xrpcc,