this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

automod: process individual pre-existing records (by AT-URI)

+147 -56
+36 -9
automod/engine.go
··· 6 6 "log/slog" 7 7 "strings" 8 8 9 + comatproto "github.com/bluesky-social/indigo/api/atproto" 9 10 appbsky "github.com/bluesky-social/indigo/api/bsky" 10 11 "github.com/bluesky-social/indigo/atproto/identity" 11 12 "github.com/bluesky-social/indigo/atproto/syntax" ··· 16 17 // 17 18 // TODO: careful when initializing: several fields should not be null or zero, even though they are pointer type. 18 19 type Engine struct { 19 - Logger *slog.Logger 20 - Directory identity.Directory 21 - Rules RuleSet 22 - Counters CountStore 23 - Sets SetStore 24 - Cache CacheStore 25 - BskyClient *xrpc.Client 20 + Logger *slog.Logger 21 + Directory identity.Directory 22 + Rules RuleSet 23 + Counters CountStore 24 + Sets SetStore 25 + Cache CacheStore 26 + RelayClient *xrpc.Client 27 + BskyClient *xrpc.Client 26 28 // used to persist moderation actions in mod service (optional) 27 29 AdminClient *xrpc.Client 28 30 } ··· 68 70 69 71 func (e *Engine) ProcessRecord(ctx context.Context, did syntax.DID, path, recCID string, rec any) error { 70 72 // similar to an HTTP server, we want to recover any panics from rule execution 71 - /* XXX 72 73 defer func() { 73 74 if r := recover(); r != nil { 74 75 e.Logger.Error("automod event execution exception", "err", r) 75 76 } 76 77 }() 77 - */ 78 78 79 79 ident, err := e.Directory.LookupDID(ctx, did) 80 80 if err != nil { ··· 133 133 } 134 134 135 135 return nil 136 + } 137 + 138 + func (e *Engine) FetchAndProcessRecord(ctx context.Context, uri string) error { 139 + // resolve URI, identity, and record 140 + aturi, err := syntax.ParseATURI(uri) 141 + if err != nil { 142 + return fmt.Errorf("parsing AT-URI argument: %v", err) 143 + } 144 + if aturi.RecordKey() == "" { 145 + return fmt.Errorf("need a full, not partial, AT-URI: %s", uri) 146 + } 147 + if e.RelayClient == nil { 148 + return fmt.Errorf("can't fetch record without relay client configured") 149 + } 150 + ident, err := e.Directory.Lookup(ctx, aturi.Authority()) 151 + if err != nil { 152 + return fmt.Errorf("resolving AT-URI authority: %v", err) 153 + } 154 + e.Logger.Info("fetching record", "did", ident.DID.String(), "collection", aturi.Collection().String(), "rkey", aturi.RecordKey().String()) 155 + out, err := comatproto.RepoGetRecord(ctx, e.RelayClient, "", aturi.Collection().String(), ident.DID.String(), aturi.RecordKey().String()) 156 + if err != nil { 157 + return fmt.Errorf("fetching record from Relay (%s): %v", aturi, err) 158 + } 159 + if out.Cid == nil { 160 + return fmt.Errorf("expected a CID in getRecord response") 161 + } 162 + return e.ProcessRecord(ctx, ident.DID, aturi.Path(), *out.Cid, out.Value.Val) 136 163 } 137 164 138 165 func (e *Engine) NewPostEvent(am AccountMeta, path, recCID string, post *appbsky.FeedPost) PostEvent {
+100 -47
cmd/hepa/main.go
··· 57 57 Value: "https://api.bsky.app", 58 58 EnvVars: []string{"ATP_BSKY_HOST"}, 59 59 }, 60 - } 61 - 62 - app.Commands = []*cli.Command{ 63 - runCmd, 64 - } 65 - 66 - return app.Run(args) 67 - } 68 - 69 - var runCmd = &cli.Command{ 70 - Name: "run", 71 - Usage: "run the hepa daemon", 72 - Flags: []cli.Flag{ 73 60 &cli.StringFlag{ 74 - Name: "metrics-listen", 75 - Usage: "IP or address, and port, to listen on for metrics APIs", 76 - Value: ":3989", 77 - EnvVars: []string{"HEPA_METRICS_LISTEN"}, 78 - }, 79 - &cli.IntFlag{ 80 - Name: "plc-rate-limit", 81 - Usage: "max number of requests per second to PLC registry", 82 - Value: 100, 83 - EnvVars: []string{"HEPA_PLC_RATE_LIMIT"}, 61 + Name: "redis-url", 62 + Usage: "redis connection URL", 63 + // redis://<user>:<pass>@localhost:6379/<db> 64 + // redis://localhost:6379/0 65 + EnvVars: []string{"HEPA_REDIS_URL"}, 84 66 }, 85 67 &cli.StringFlag{ 86 68 Name: "mod-handle", ··· 97 79 Usage: "admin authentication password for mod service", 98 80 EnvVars: []string{"HEPA_MOD_AUTH_ADMIN_TOKEN"}, 99 81 }, 82 + &cli.IntFlag{ 83 + Name: "plc-rate-limit", 84 + Usage: "max number of requests per second to PLC registry", 85 + Value: 100, 86 + EnvVars: []string{"HEPA_PLC_RATE_LIMIT"}, 87 + }, 100 88 &cli.StringFlag{ 101 89 Name: "sets-json-path", 102 90 Usage: "file path of JSON file containing static sets", 103 91 EnvVars: []string{"HEPA_SETS_JSON_PATH"}, 104 92 }, 93 + } 94 + 95 + app.Commands = []*cli.Command{ 96 + runCmd, 97 + processRecordCmd, 98 + } 99 + 100 + return app.Run(args) 101 + } 102 + 103 + func configDirectory(cctx *cli.Context) (identity.Directory, error) { 104 + baseDir := identity.BaseDirectory{ 105 + PLCURL: cctx.String("atp-plc-host"), 106 + HTTPClient: http.Client{ 107 + Timeout: time.Second * 15, 108 + }, 109 + PLCLimiter: rate.NewLimiter(rate.Limit(cctx.Int("plc-rate-limit")), 1), 110 + TryAuthoritativeDNS: true, 111 + SkipDNSDomainSuffixes: []string{".bsky.social", ".staging.bsky.dev"}, 112 + } 113 + var dir identity.Directory 114 + if cctx.String("redis-url") != "" { 115 + rdir, err := automod.NewRedisDirectory(&baseDir, cctx.String("redis-url"), time.Hour*24, time.Minute*2) 116 + if err != nil { 117 + return nil, err 118 + } 119 + dir = rdir 120 + } else { 121 + cdir := identity.NewCacheDirectory(&baseDir, 1_500_000, time.Hour*24, time.Minute*2) 122 + dir = &cdir 123 + } 124 + return dir, nil 125 + } 126 + 127 + var runCmd = &cli.Command{ 128 + Name: "run", 129 + Usage: "run the hepa daemon", 130 + Flags: []cli.Flag{ 105 131 &cli.StringFlag{ 106 - Name: "redis-url", 107 - Usage: "redis connection URL", 108 - // redis://<user>:<pass>@localhost:6379/<db> 109 - // redis://localhost:6379/0 110 - EnvVars: []string{"HEPA_REDIS_URL"}, 132 + Name: "metrics-listen", 133 + Usage: "IP or address, and port, to listen on for metrics APIs", 134 + Value: ":3989", 135 + EnvVars: []string{"HEPA_METRICS_LISTEN"}, 111 136 }, 112 137 }, 113 138 Action: func(cctx *cli.Context) error { ··· 119 144 120 145 configOTEL("hepa") 121 146 122 - baseDir := identity.BaseDirectory{ 123 - PLCURL: cctx.String("atp-plc-host"), 124 - HTTPClient: http.Client{ 125 - Timeout: time.Second * 15, 126 - }, 127 - PLCLimiter: rate.NewLimiter(rate.Limit(cctx.Int("plc-rate-limit")), 1), 128 - TryAuthoritativeDNS: true, 129 - SkipDNSDomainSuffixes: []string{".bsky.social", ".staging.bsky.dev"}, 130 - } 131 - var dir identity.Directory 132 - if cctx.String("redis-url") != "" { 133 - rdir, err := automod.NewRedisDirectory(&baseDir, cctx.String("redis-url"), time.Hour*24, time.Minute*2) 134 - if err != nil { 135 - return err 136 - } 137 - dir = rdir 138 - } else { 139 - cdir := identity.NewCacheDirectory(&baseDir, 1_500_000, time.Hour*24, time.Minute*2) 140 - dir = &cdir 147 + dir, err := configDirectory(cctx) 148 + if err != nil { 149 + return err 141 150 } 142 151 143 152 srv, err := NewServer( ··· 179 188 return nil 180 189 }, 181 190 } 191 + 192 + var processRecordCmd = &cli.Command{ 193 + Name: "process-record", 194 + Usage: "process a single record in isolation", 195 + ArgsUsage: `<at-uri>`, 196 + Flags: []cli.Flag{}, 197 + Action: func(cctx *cli.Context) error { 198 + uri := cctx.Args().First() 199 + if uri == "" { 200 + return fmt.Errorf("expected a single AT-URI argument") 201 + } 202 + 203 + ctx := context.Background() 204 + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 205 + Level: slog.LevelInfo, 206 + })) 207 + slog.SetDefault(logger) 208 + 209 + dir, err := configDirectory(cctx) 210 + if err != nil { 211 + return err 212 + } 213 + 214 + srv, err := NewServer( 215 + dir, 216 + Config{ 217 + BGSHost: cctx.String("atp-bgs-host"), 218 + BskyHost: cctx.String("atp-bsky-host"), 219 + Logger: logger, 220 + ModHost: cctx.String("atp-mod-host"), 221 + ModAdminToken: cctx.String("mod-admin-token"), 222 + ModUsername: cctx.String("mod-handle"), 223 + ModPassword: cctx.String("mod-password"), 224 + SetsFileJSON: cctx.String("sets-json-path"), 225 + RedisURL: cctx.String("redis-url"), 226 + }, 227 + ) 228 + if err != nil { 229 + return err 230 + } 231 + 232 + return srv.engine.FetchAndProcessRecord(ctx, uri) 233 + }, 234 + }
+11
cmd/hepa/server.go
··· 117 117 cache = automod.NewMemCacheStore(5_000, 30*time.Minute) 118 118 } 119 119 120 + relayURL := config.BGSHost 121 + if strings.HasPrefix(relayURL, "ws") { 122 + relayURL = "http" + relayURL[2:] 123 + } 124 + // XXX: 125 + relayURL = "https://bsky.social" 126 + 120 127 engine := automod.Engine{ 121 128 Logger: logger, 122 129 Directory: dir, ··· 128 135 BskyClient: &xrpc.Client{ 129 136 Client: util.RobustHTTPClient(), 130 137 Host: config.BskyHost, 138 + }, 139 + RelayClient: &xrpc.Client{ 140 + Client: util.RobustHTTPClient(), 141 + Host: relayURL, 131 142 }, 132 143 } 133 144