this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

limit DIDs to 1000 collections each; list-collections adds a badwords filter and hides collections with fewer than 5 users

authored by

Brian Olson and committed by
Brian Olson
b8d53b7f f0433c99

+167 -26
+25
cmd/collectiondir/pebble.go
··· 282 282 283 283 var trueValue = [1]byte{'t'} 284 284 285 + func (pcd *PebbleCollectionDirectory) CountDidCollections(did string) (int, error) { 286 + lower := make([]byte, 1+len(did)) 287 + lower[0] = 'D' 288 + copy(lower[1:1+len(did)], did) 289 + upper := make([]byte, len(lower)) 290 + copy(upper, lower) 291 + upper[len(upper)-1]++ 292 + ctx := context.Background() 293 + iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{ 294 + LowerBound: lower, 295 + UpperBound: upper, 296 + }) 297 + if err != nil { 298 + return 0, fmt.Errorf("did iter start, %w", err) 299 + } 300 + defer iter.Close() 301 + count := 0 302 + for iter.First(); iter.Valid(); iter.Next() { 303 + //key := iter.Key() 304 + //xdid, xcollectionId := parseByDidKey(key) 305 + count++ 306 + } 307 + return count, nil 308 + } 309 + 285 310 func (pcd *PebbleCollectionDirectory) MaybeSetCollection(did, collection string) error { 286 311 collectionId, err := pcd.CollectionToId(collection) 287 312 if err != nil {
+142 -26
cmd/collectiondir/serve.go
··· 4 4 "compress/gzip" 5 5 "context" 6 6 "encoding/csv" 7 + "encoding/json" 7 8 "fmt" 8 - comatproto "github.com/bluesky-social/indigo/api/atproto" 9 - "github.com/bluesky-social/indigo/events" 10 - "github.com/bluesky-social/indigo/xrpc" 11 - "github.com/labstack/echo/v4" 12 - "github.com/labstack/echo/v4/middleware" 13 - "github.com/prometheus/client_golang/prometheus/promhttp" 14 - "github.com/urfave/cli/v2" 9 + lru "github.com/hashicorp/golang-lru/v2" 15 10 "log/slog" 16 11 "net" 17 12 "net/http" ··· 19 14 "os" 20 15 "os/signal" 21 16 "path/filepath" 17 + "regexp" 22 18 "sort" 23 19 "strconv" 24 20 "strings" 25 21 "sync" 26 22 "syscall" 27 23 "time" 24 + 25 + comatproto "github.com/bluesky-social/indigo/api/atproto" 26 + "github.com/bluesky-social/indigo/events" 27 + "github.com/bluesky-social/indigo/xrpc" 28 + 29 + "github.com/labstack/echo/v4" 30 + "github.com/labstack/echo/v4/middleware" 31 + "github.com/prometheus/client_golang/prometheus/promhttp" 32 + "github.com/urfave/cli/v2" 28 33 ) 29 34 30 35 var serveCmd = &cli.Command{ ··· 70 75 Usage: "secret for friend PDSes", 71 76 EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP", "RATE_LIMIT_HEADER"}, 72 77 }, 78 + &cli.Uint64Flag{ 79 + Name: "clist-min-dids", 80 + Usage: "filter collection list to >= N dids", 81 + Value: 5, 82 + EnvVars: []string{"COLLECTIONS_CLIST_MIN_DIDS"}, 83 + }, 84 + &cli.IntFlag{ 85 + Name: "max-did-collections", 86 + Usage: "stop recording new collections per did after it has >= this many collections", 87 + Value: 1000, 88 + EnvVars: []string{"COLLECTIONS_MAX_DID_COLLECTIONS"}, 89 + }, 90 + &cli.StringFlag{ 91 + Name: "sets-json-path", 92 + Usage: "file path of JSON file containing static word sets", 93 + EnvVars: []string{"HEPA_SETS_JSON_PATH", "COLLECTIONS_SETS_JSON_PATH"}, 94 + }, 73 95 &cli.BoolFlag{ 74 96 Name: "verbose", 75 97 }, ··· 78 100 var server collectionServer 79 101 return server.run(cctx) 80 102 }, 103 + } 104 + 105 + type BadwordChecker interface { 106 + 
HasBadword(string) bool 81 107 } 82 108 83 109 type collectionServer struct { ··· 122 148 apiServer *http.Server 123 149 //esrv *echo.Echo 124 150 metricsServer *http.Server 125 - } 126 151 127 - const defaultPerPDSCrawlQPS = 100 152 + MinDidsForCollectionList uint64 153 + MaxDidCollections int 154 + 155 + didCollectionCounts *lru.Cache[string, int] 156 + 157 + badwords BadwordChecker 158 + } 128 159 129 160 func (cs *collectionServer) run(cctx *cli.Context) error { 130 161 signals := make(chan os.Signal, 1) ··· 134 165 if cctx.Bool("verbose") { 135 166 level = slog.LevelDebug 136 167 } 168 + log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level})) 169 + slog.SetDefault(log) 170 + 137 171 if cctx.IsSet("ratelimit-header") { 138 172 cs.ratelimitHeader = cctx.String("ratelimit-header") 139 173 } 140 - log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level})) 174 + if cctx.IsSet("sets-json-path") { 175 + badwords, err := loadBadwords(cctx.String("sets-json-path")) 176 + if err != nil { 177 + return err 178 + } 179 + cs.badwords = badwords 180 + } 181 + cs.MinDidsForCollectionList = cctx.Uint64("clist-min-dids") 182 + cs.MaxDidCollections = cctx.Int("max-did-collections") 141 183 cs.ingestFirehose = make(chan DidCollection, 1000) 142 184 cs.ingestCrawl = make(chan DidCollection, 1000) 185 + var err error 186 + cs.didCollectionCounts, err = lru.New[string, int](1_000_000) // TODO: configurable LRU size 187 + if err != nil { 188 + return fmt.Errorf("lru init, %w", err) 189 + } 143 190 cs.wg.Add(1) 144 191 go cs.ingestReceiver() 145 192 cs.log = log ··· 150 197 cs.pcd = &PebbleCollectionDirectory{ 151 198 log: cs.log, 152 199 } 153 - err := cs.pcd.Open(pebblePath) 200 + err = cs.pcd.Open(pebblePath) 154 201 if err != nil { 155 202 return fmt.Errorf("%s: failed to open pebble db: %w", pebblePath, err) 156 203 } ··· 239 286 close(cs.shutdown) 240 287 go func() { 241 288 cs.log.Info("metrics shutdown start") 242 - 
cs.metricsServer.Shutdown(context.Background()) 243 - cs.log.Info("metrics shutdown") 289 + sherr := cs.metricsServer.Shutdown(context.Background()) 290 + cs.log.Info("metrics shutdown", "err", sherr) 244 291 }() 245 292 cs.log.Info("api shutdown start...") 246 293 err := cs.apiServer.Shutdown(context.Background()) ··· 545 592 } 546 593 } 547 594 595 + func (cs *collectionServer) hasBadword(collection string) bool { 596 + if cs.badwords != nil { 597 + return cs.badwords.HasBadword(collection) 598 + } 599 + return false 600 + } 601 + 548 602 // /v1/listCollections?c={}&cursor={}&limit={50<=limit<=1000} 549 603 // 550 604 // admin may set ?stalesec={} for a maximum number of seconds stale data is accepted ··· 593 647 count := 0 594 648 for _, collection := range allCollections { 595 649 if (cursor == "") || (collection > cursor) { 650 + if cs.hasBadword(collection) { 651 + // don't show badwords in public list of collections 652 + continue 653 + } 654 + if stats.CollectionCounts[collection] < cs.MinDidsForCollectionList { 655 + // don't show experimental/spam collections only implemented by a few DIDs 656 + continue 657 + } 658 + // TODO: probably regex based filter for collection-spam 596 659 out.Collections[collection] = stats.CollectionCounts[collection] 597 660 count++ 598 661 if count >= limit { ··· 620 683 cs.log.Info("ingestFirehose closed") 621 684 return 622 685 } 623 - err := cs.pcd.MaybeSetCollection(didc.Did, didc.Collection) 686 + err := cs.ingestDidc(didc, true) 624 687 if err != nil { 625 - cs.log.Warn("pcd write", "err", err) 626 688 errcount++ 627 689 } else { 628 690 errcount = 0 629 691 } 630 - if cs.dauDirectory != nil { 631 - err = cs.maybeDauWrite(didc) 632 - if err != nil { 633 - cs.log.Warn("dau write", "err", err) 634 - errcount++ 635 - } else { 636 - errcount = 0 637 - } 638 - } 639 692 case didc := <-cs.ingestCrawl: 640 - err := cs.pcd.MaybeSetCollection(didc.Did, didc.Collection) 693 + err := cs.ingestDidc(didc, false) 641 694 if err != 
nil { 642 - cs.log.Warn("pcd write", "err", err) 643 695 errcount++ 644 696 } else { 645 697 errcount = 0 ··· 650 702 return // TODO: cancel parent somehow 651 703 } 652 704 } 705 + } 706 + 707 + func (cs *collectionServer) ingestDidc(didc DidCollection, dau bool) error { 708 + count, ok := cs.didCollectionCounts.Get(didc.Did) 709 + var err error 710 + if !ok { 711 + count, err = cs.pcd.CountDidCollections(didc.Did) 712 + if err != nil { 713 + return fmt.Errorf("count did collections, %s %w", didc.Did, err) 714 + } 715 + cs.didCollectionCounts.Add(didc.Did, count) 716 + } 717 + if count >= cs.MaxDidCollections { 718 + cs.log.Warn("did too many collections", "did", didc.Did) 719 + return nil 720 + } 721 + err = cs.pcd.MaybeSetCollection(didc.Did, didc.Collection) 722 + if err != nil { 723 + cs.log.Warn("pcd write", "err", err) 724 + return err 725 + } 726 + if dau && cs.dauDirectory != nil { 727 + err = cs.maybeDauWrite(didc) 728 + if err != nil { 729 + cs.log.Warn("dau write", "err", err) 730 + return err 731 + } 732 + } 733 + return nil 653 734 } 654 735 655 736 // write {dauDirectoryDir}/d{YYYY-MM-DD}.pebble stats summary to {dauDirectoryDir}/d{YYYY-MM-DD}.csv.gz ··· 874 955 // TODO: check database or upstream health? 
// loadBadwords reads the JSON file at path and builds a substring checker from
// its "worst-words" set.
//
// The file must decode to a JSON object mapping set names to lists of strings.
// Only the "worst-words" list is used. Empty entries in that list are skipped,
// and a missing or empty list yields a checker that never matches.
//
// It returns an error if the file cannot be opened, is not valid JSON of that
// shape, or the resulting pattern fails to compile.
func loadBadwords(path string) (*BadwordsRE, error) {
	fin, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("%s: could not open badwords, %w", path, err)
	}
	// Read-only handle: a Close failure cannot lose data, so it is not reported.
	defer fin.Close()

	var rules map[string][]string
	if err := json.NewDecoder(fin).Decode(&rules); err != nil {
		return nil, fmt.Errorf("%s: badwords json, %w", path, err)
	}

	// compile a regex to search a string for any instance of a bad word,
	// because we're expecting things runpooptogether
	badwords := rules["worst-words"]
	rwords := make([]string, 0, len(badwords))
	for _, word := range badwords {
		if word == "" {
			// An empty alternative matches at offset 0 of every input and,
			// with leftmost-first matching, hides every real word after it.
			continue
		}
		rwords = append(rwords, regexp.QuoteMeta(word))
	}
	if len(rwords) == 0 {
		return &BadwordsRE{}, nil
	}

	re, err := regexp.Compile(strings.Join(rwords, "|"))
	if err != nil {
		return nil, fmt.Errorf("%s: badwords regex, %w", path, err)
	}
	return &BadwordsRE{re: re}, nil
}

// BadwordsRE reports whether a string contains any word from a fixed list,
// using a single compiled alternation of the quoted words. A BadwordsRE with
// no pattern never matches.
type BadwordsRE struct {
	re *regexp.Regexp
}

// HasBadword reports whether s contains any of the configured words as a
// substring. Matching is case-sensitive.
func (bw *BadwordsRE) HasBadword(s string) bool {
	if bw == nil || bw.re == nil {
		return false
	}
	// TODO: if this is too slow, try more specialized algorithm e.g. https://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_algorithm
	return bw.re.MatchString(s)
}