this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

automod: refactor 'admin' and Ozone clients (#647)

We don't actually need the "ozone" client to have a session via
password; it just uses admin auth and provides DID as CreatedBy arg when
emitting events. It can also talk directly to the Ozone instance.

The PDS admin client also is admin-auth only, and talks to entryway
(optional).

authored by

bnewbold and committed by
GitHub
2c9022a4 8a9a11f7

+89 -111
+4 -2
automod/engine/engine.go
··· 35 35 Flags flagstore.FlagStore 36 36 // unlike the other sub-modules, this field (Notifier) may be nil 37 37 Notifier Notifier 38 - // use to fetch public account metadata from AppView 38 + // use to fetch public account metadata from AppView; no auth 39 39 BskyClient *xrpc.Client 40 - // used to persist moderation actions in mod service (optional) 40 + // used to persist moderation actions in ozone moderation service; optional, admin auth 41 + OzoneClient *xrpc.Client 42 + // used to fetch private account metadata from PDS or entryway; optional, admin auth 41 43 AdminClient *xrpc.Client 42 44 // used to fetch blobs from upstream PDS instances 43 45 BlobClient *http.Client
+6 -6
automod/engine/persist.go
··· 77 77 } 78 78 79 79 // if we can't actually talk to service, bail out early 80 - if eng.AdminClient == nil { 80 + if eng.OzoneClient == nil { 81 81 if anyModActions { 82 82 c.Logger.Warn("not persisting actions, mod service client not configured") 83 83 } 84 84 return nil 85 85 } 86 86 87 - xrpcc := eng.AdminClient 87 + xrpcc := eng.OzoneClient 88 88 89 89 if len(newLabels) > 0 { 90 90 c.Logger.Info("labeling record", "newLabels", newLabels) ··· 166 166 167 167 atURI := c.RecordOp.ATURI().String() 168 168 newLabels := dedupeStrings(c.effects.RecordLabels) 169 - if len(newLabels) > 0 && eng.AdminClient != nil { 170 - rv, err := toolsozone.ModerationGetRecord(ctx, eng.AdminClient, c.RecordOp.CID.String(), c.RecordOp.ATURI().String()) 169 + if len(newLabels) > 0 && eng.OzoneClient != nil { 170 + rv, err := toolsozone.ModerationGetRecord(ctx, eng.OzoneClient, c.RecordOp.CID.String(), c.RecordOp.ATURI().String()) 171 171 if err != nil { 172 172 c.Logger.Warn("failed to fetch private record metadata", "err", err) 173 173 } else { ··· 234 234 return nil 235 235 } 236 236 237 - if eng.AdminClient == nil { 237 + if eng.OzoneClient == nil { 238 238 c.Logger.Warn("not persisting actions because mod service client not configured") 239 239 return nil 240 240 } ··· 249 249 Uri: atURI, 250 250 } 251 251 252 - xrpcc := eng.AdminClient 252 + xrpcc := eng.OzoneClient 253 253 if len(newLabels) > 0 { 254 254 c.Logger.Info("labeling record", "newLabels", newLabels) 255 255 for _, val := range newLabels {
+40 -40
cmd/hepa/main.go
··· 53 53 EnvVars: []string{"ATP_PLC_HOST"}, 54 54 }, 55 55 &cli.StringFlag{ 56 - Name: "atp-mod-host", 57 - Usage: "method, hostname, and port of moderation service", 58 - Value: "https://api.bsky.app", 59 - EnvVars: []string{"ATP_MOD_HOST"}, 60 - }, 61 - &cli.StringFlag{ 62 56 Name: "atp-bsky-host", 63 - Usage: "method, hostname, and port of bsky API (appview) service", 64 - Value: "https://api.bsky.app", 57 + Usage: "method, hostname, and port of bsky API (appview) service. does not use auth", 58 + Value: "https://public.api.bsky.app", 65 59 EnvVars: []string{"ATP_BSKY_HOST"}, 66 60 }, 67 61 &cli.StringFlag{ 68 - Name: "redis-url", 69 - Usage: "redis connection URL", 70 - // redis://<user>:<pass>@localhost:6379/<db> 71 - // redis://localhost:6379/0 72 - EnvVars: []string{"HEPA_REDIS_URL"}, 62 + Name: "atp-ozone-host", 63 + Usage: "method, hostname, and port of ozone instance. requires ozone-admin-token as well", 64 + Value: "https://mod.bsky.app", 65 + EnvVars: []string{"ATP_OZONE_HOST", "ATP_MOD_HOST"}, 73 66 }, 74 67 &cli.StringFlag{ 75 - Name: "mod-handle", 76 - Usage: "for mod service login", 77 - EnvVars: []string{"HEPA_MOD_AUTH_HANDLE"}, 68 + Name: "ozone-did", 69 + Usage: "DID of account to attribute ozone actions to", 70 + EnvVars: []string{"HEPA_OZONE_DID"}, 78 71 }, 79 72 &cli.StringFlag{ 80 - Name: "mod-password", 81 - Usage: "for mod service login", 82 - EnvVars: []string{"HEPA_MOD_AUTH_PASSWORD"}, 73 + Name: "ozone-admin-token", 74 + Usage: "admin authentication password for mod service", 75 + EnvVars: []string{"HEPA_OZONE_AUTH_ADMIN_TOKEN", "HEPA_MOD_AUTH_ADMIN_TOKEN"}, 76 + }, 77 + &cli.StringFlag{ 78 + Name: "atp-pds-host", 79 + Usage: "method, hostname, and port of PDS (or entryway) for admin account info; uses admin auth", 80 + Value: "https://bsky.social", 81 + EnvVars: []string{"ATP_PDS_HOST"}, 82 + }, 83 + &cli.StringFlag{ 84 + Name: "pds-admin-token", 85 + Usage: "admin authentication password for PDS (or entryway)", 86 + EnvVars: []string{"HEPA_PDS_AUTH_ADMIN_TOKEN"}, 83 87 }, 84 88 &cli.StringFlag{ 85 - Name: "mod-admin-token", 86 - Usage: "admin authentication password for mod service", 87 - EnvVars: []string{"HEPA_MOD_AUTH_ADMIN_TOKEN"}, 89 + Name: "redis-url", 90 + Usage: "redis connection URL", 91 + // redis://<user>:<pass>@localhost:6379/<db> 92 + // redis://localhost:6379/0 93 + EnvVars: []string{"HEPA_REDIS_URL"}, 88 94 }, 89 95 &cli.IntFlag{ 90 96 Name: "plc-rate-limit", ··· 219 225 srv, err := NewServer( 220 226 dir, 221 227 Config{ 228 + Logger: logger, 222 229 RelayHost: cctx.String("atp-relay-host"), 223 230 BskyHost: cctx.String("atp-bsky-host"), 224 - Logger: logger, 225 - ModHost: cctx.String("atp-mod-host"), 226 - ModAdminToken: cctx.String("mod-admin-token"), 227 - ModUsername: cctx.String("mod-handle"), 228 - ModPassword: cctx.String("mod-password"), 231 + OzoneHost: cctx.String("atp-ozone-host"), 232 + OzoneDID: cctx.String("ozone-did"), 233 + OzoneAdminToken: cctx.String("ozone-admin-token"), 234 + PDSHost: cctx.String("atp-pds-host"), 235 + PDSAdminToken: cctx.String("pds-admin-token"), 229 236 SetsFileJSON: cctx.String("sets-json-path"), 230 237 RedisURL: cctx.String("redis-url"), 231 238 SlackWebhookURL: cctx.String("slack-webhook-url"), ··· 257 264 } 258 265 }() 259 266 260 - if srv.engine.AdminClient != nil { 261 - go func() { 262 - if err := srv.RunRefreshAdminClient(ctx); err != nil { 263 - slog.Error("session refresh failed", "err", err) 264 - } 265 - }() 266 - } 267 - 268 267 // the main service loop 269 268 if err := srv.RunConsumer(ctx); err != nil { 270 269 return fmt.Errorf("failure consuming and processing firehose: %w", err) ··· 286 285 return NewServer( 287 286 dir, 288 287 Config{ 288 + Logger: logger, 289 289 RelayHost: cctx.String("atp-relay-host"), 290 290 BskyHost: cctx.String("atp-bsky-host"), 291 - Logger: logger, 292 - ModHost: cctx.String("atp-mod-host"), 293 - ModAdminToken: cctx.String("mod-admin-token"), 294 - ModUsername: cctx.String("mod-handle"), 295 - ModPassword: cctx.String("mod-password"), 291 + OzoneHost: cctx.String("atp-ozone-host"), 292 + OzoneDID: cctx.String("ozone-did"), 293 + OzoneAdminToken: cctx.String("ozone-admin-token"), 294 + PDSHost: cctx.String("atp-pds-host"), 295 + PDSAdminToken: cctx.String("pds-admin-token"), 296 296 SetsFileJSON: cctx.String("sets-json-path"), 297 297 RedisURL: cctx.String("redis-url"), 298 298 HiveAPIToken: cctx.String("hiveai-api-token"),
+39 -63
cmd/hepa/server.go
··· 10 10 "sync/atomic" 11 11 "time" 12 12 13 - comatproto "github.com/bluesky-social/indigo/api/atproto" 14 13 "github.com/bluesky-social/indigo/atproto/identity" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 15 "github.com/bluesky-social/indigo/automod" 16 16 "github.com/bluesky-social/indigo/automod/cachestore" 17 17 "github.com/bluesky-social/indigo/automod/countstore" ··· 41 41 } 42 42 43 43 type Config struct { 44 + Logger *slog.Logger 44 45 RelayHost string 45 46 BskyHost string 46 - ModHost string 47 - ModAdminToken string 48 - ModUsername string 49 - ModPassword string 47 + OzoneHost string 48 + OzoneDID string 49 + OzoneAdminToken string 50 + PDSHost string 51 + PDSAdminToken string 50 52 SetsFileJSON string 51 53 RedisURL string 52 54 SlackWebhookURL string ··· 56 58 RulesetName string 57 59 RatelimitBypass string 58 60 FirehoseParallelism int 59 - Logger *slog.Logger 60 61 } 61 62 62 63 func NewServer(dir identity.Directory, config Config) (*Server, error) { ··· 72 73 return nil, fmt.Errorf("specified relay host must include 'ws://' or 'wss://'") 73 74 } 74 75 75 - // TODO: this isn't a very robust way to handle a persistent client 76 - var xrpcc *xrpc.Client 77 - if config.ModAdminToken != "" { 78 - xrpcc = &xrpc.Client{ 76 + var ozoneClient *xrpc.Client 77 + if config.OzoneAdminToken != "" && config.OzoneDID != "" { 78 + ozoneClient = &xrpc.Client{ 79 79 Client: util.RobustHTTPClient(), 80 - Host: config.ModHost, 81 - AdminToken: &config.ModAdminToken, 80 + Host: config.OzoneHost, 81 + AdminToken: &config.OzoneAdminToken, 82 82 Auth: &xrpc.AuthInfo{}, 83 83 } 84 84 if config.RatelimitBypass != "" { 85 - xrpcc.Headers = make(map[string]string) 86 - xrpcc.Headers["x-ratelimit-bypass"] = config.RatelimitBypass 85 + ozoneClient.Headers = make(map[string]string) 86 + ozoneClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass 87 87 } 88 - 89 - auth, err := comatproto.ServerCreateSession(context.TODO(), xrpcc, &comatproto.ServerCreateSession_Input{ 90 - Identifier: config.ModUsername, 91 - Password: config.ModPassword, 92 - }) 88 + od, err := syntax.ParseDID(config.OzoneDID) 93 89 if err != nil { 94 - return nil, fmt.Errorf("connecting to mod service: %v", err) 90 + return nil, fmt.Errorf("ozone account DID supplied was not valid: %v", err) 95 91 } 96 - xrpcc.Auth.AccessJwt = auth.AccessJwt 97 - xrpcc.Auth.RefreshJwt = auth.RefreshJwt 98 - xrpcc.Auth.Did = auth.Did 99 - xrpcc.Auth.Handle = auth.Handle 92 + ozoneClient.Auth.Did = od.String() 93 + logger.Info("configured ozone admin client", "did", od.String(), "host", config.OzoneHost) 94 + } else { 95 + logger.Info("did not configure ozone client") 96 + } 97 + 98 + var adminClient *xrpc.Client 99 + if config.PDSAdminToken != "" { 100 + adminClient = &xrpc.Client{ 101 + Client: util.RobustHTTPClient(), 102 + Host: config.PDSHost, 103 + AdminToken: &config.PDSAdminToken, 104 + Auth: &xrpc.AuthInfo{}, 105 + } 106 + if config.RatelimitBypass != "" { 107 + adminClient.Headers = make(map[string]string) 108 + adminClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass 109 + } 110 + logger.Info("configured PDS admin client", "host", config.PDSHost) 111 + } else { 112 + logger.Info("did not configure PDS admin client") 100 113 } 101 114 102 115 sets := setstore.NewMemSetStore() ··· 200 213 Cache: cache, 201 214 Rules: ruleset, 202 215 Notifier: notifier, 203 - AdminClient: xrpcc, 204 216 BskyClient: &bskyClient, 217 + OzoneClient: ozoneClient, 218 + AdminClient: adminClient, 205 219 BlobClient: blobClient, 206 220 } 207 221 ··· 250 264 } 251 265 err := s.rdb.Set(ctx, cursorKey, lastSeq, 14*24*time.Hour).Err() 252 266 return err 253 - } 254 - 255 - // Periodically refreshes the engine's admin XRPC client JWT auth token. 256 - // 257 - // Expects to be run in a goroutine, and to be the only running code which touches the auth fields (aka, there is no locking). 258 - // TODO: this is a hack until we have an XRPC client which handles these details automatically. 259 - func (s *Server) RunRefreshAdminClient(ctx context.Context) error { 260 - if s.engine.AdminClient == nil { 261 - return nil 262 - } 263 - ac := s.engine.AdminClient 264 - ticker := time.NewTicker(1 * time.Hour) 265 - for { 266 - select { 267 - case <-ticker.C: 268 - // uses a temporary xrpc client instead of the existing one because we need to put refreshJwt in the position of accessJwt, and that would cause an error for any concurrent requests 269 - tmpClient := xrpc.Client{ 270 - Host: ac.Host, 271 - Auth: &xrpc.AuthInfo{ 272 - Did: ac.Auth.Did, 273 - Handle: ac.Auth.Handle, 274 - AccessJwt: ac.Auth.RefreshJwt, 275 - RefreshJwt: ac.Auth.RefreshJwt, 276 - }, 277 - } 278 - refresh, err := comatproto.ServerRefreshSession(ctx, &tmpClient) 279 - if err != nil { 280 - // don't return an error, just log, and attempt again on the next tick 281 - s.logger.Error("failed to refresh admin client session", "err", err, "host", ac.Host) 282 - } else { 283 - s.engine.AdminClient.Auth.RefreshJwt = refresh.RefreshJwt 284 - s.engine.AdminClient.Auth.AccessJwt = refresh.AccessJwt 285 - s.logger.Info("refreshed admin client session") 286 - } 287 - case <-ctx.Done(): 288 - return nil 289 - } 290 - } 291 267 } 292 268 293 269 // this method runs in a loop, persisting the current cursor state every 5 seconds