this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

beemo: refactor and add mention notifications (#674)

authored by

bnewbold and committed by
GitHub
d4ddb540 b5826759

+486 -189
+144
cmd/beemo/firehose_consumer.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "log/slog" 8 + "net/http" 9 + "net/url" 10 + "strings" 11 + 12 + comatproto "github.com/bluesky-social/indigo/api/atproto" 13 + appbsky "github.com/bluesky-social/indigo/api/bsky" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 + lexutil "github.com/bluesky-social/indigo/lex/util" 17 + 18 + "github.com/bluesky-social/indigo/events" 19 + "github.com/bluesky-social/indigo/repo" 20 + "github.com/bluesky-social/indigo/repomgr" 21 + "github.com/carlmjohnson/versioninfo" 22 + "github.com/gorilla/websocket" 23 + ) 24 + 25 + func RunFirehoseConsumer(ctx context.Context, logger *slog.Logger, relayHost string, postCallback func(context.Context, syntax.DID, syntax.RecordKey, appbsky.FeedPost) error) error { 26 + 27 + dialer := websocket.DefaultDialer 28 + u, err := url.Parse(relayHost) 29 + if err != nil { 30 + return fmt.Errorf("invalid relayHost URI: %w", err) 31 + } 32 + // always continue at the current cursor offset (don't provide cursor query param) 33 + u.Path = "xrpc/com.atproto.sync.subscribeRepos" 34 + logger.Info("subscribing to repo event stream", "upstream", relayHost) 35 + con, _, err := dialer.Dial(u.String(), http.Header{ 36 + "User-Agent": []string{fmt.Sprintf("beemo/%s", versioninfo.Short())}, 37 + }) 38 + if err != nil { 39 + return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 40 + } 41 + 42 + rsc := &events.RepoStreamCallbacks{ 43 + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 44 + return HandleRepoCommit(ctx, logger, evt, postCallback) 45 + }, 46 + // NOTE: could add other callbacks as needed 47 + } 48 + 49 + var scheduler events.Scheduler 50 + // use parallel scheduler 51 + parallelism := 4 52 + scheduler = parallel.NewScheduler( 53 + parallelism, 54 + 1000, 55 + relayHost, 56 + rsc.EventHandler, 57 + ) 58 + logger.Info("beemo firehose scheduler configured", "scheduler", "parallel", "workers", parallelism) 59 + 60 + return events.HandleRepoStream(ctx, con, scheduler) 61 + } 62 + 63 + // TODO: move this to a "ParsePath" helper in syntax package? 64 + func splitRepoPath(path string) (syntax.NSID, syntax.RecordKey, error) { 65 + parts := strings.SplitN(path, "/", 3) 66 + if len(parts) != 2 { 67 + return "", "", fmt.Errorf("invalid record path: %s", path) 68 + } 69 + collection, err := syntax.ParseNSID(parts[0]) 70 + if err != nil { 71 + return "", "", err 72 + } 73 + rkey, err := syntax.ParseRecordKey(parts[1]) 74 + if err != nil { 75 + return "", "", err 76 + } 77 + return collection, rkey, nil 78 + } 79 + 80 + // NOTE: for now, this function basically never errors, just logs and returns nil. Should think through error processing better. 81 + func HandleRepoCommit(ctx context.Context, logger *slog.Logger, evt *comatproto.SyncSubscribeRepos_Commit, postCallback func(context.Context, syntax.DID, syntax.RecordKey, appbsky.FeedPost) error) error { 82 + 83 + logger = logger.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq) 84 + logger.Debug("received commit event") 85 + 86 + if evt.TooBig { 87 + logger.Warn("skipping tooBig events for now") 88 + return nil 89 + } 90 + 91 + did, err := syntax.ParseDID(evt.Repo) 92 + if err != nil { 93 + logger.Error("bad DID syntax in event", "err", err) 94 + return nil 95 + } 96 + 97 + rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 98 + if err != nil { 99 + logger.Error("failed to read repo from car", "err", err) 100 + return nil 101 + } 102 + 103 + for _, op := range evt.Ops { 104 + logger = logger.With("eventKind", op.Action, "path", op.Path) 105 + collection, rkey, err := splitRepoPath(op.Path) 106 + if err != nil { 107 + logger.Error("invalid path in repo op") 108 + return nil 109 + } 110 + 111 + ek := repomgr.EventKind(op.Action) 112 + switch ek { 113 + case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 114 + // read the record bytes from blocks, and verify CID 115 + rc, recordCBOR, err := rr.GetRecordBytes(ctx, op.Path) 116 + if err != nil { 117 + logger.Error("reading record from event blocks (CAR)", "err", err) 118 + continue 119 + } 120 + if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 121 + logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 122 + continue 123 + } 124 + 125 + switch collection { 126 + case "app.bsky.feed.post": 127 + var post appbsky.FeedPost 128 + if err := post.UnmarshalCBOR(bytes.NewReader(*recordCBOR)); err != nil { 129 + logger.Error("failed to parse app.bsky.feed.post record", "err", err) 130 + continue 131 + } 132 + if err := postCallback(ctx, did, rkey, post); err != nil { 133 + logger.Error("failed to process post record", "err", err) 134 + continue 135 + } 136 + } 137 + 138 + default: 139 + // ignore other events 140 + } 141 + } 142 + 143 + return nil 144 + }
+82 -189
cmd/beemo/main.go
··· 4 4 package main 5 5 6 6 import ( 7 - "bytes" 8 - "context" 9 - "encoding/json" 10 - "fmt" 11 - "net/http" 7 + "io" 8 + "log/slog" 12 9 "os" 13 10 "strings" 14 - "time" 15 - 16 - comatproto "github.com/bluesky-social/indigo/api/atproto" 17 - toolsozone "github.com/bluesky-social/indigo/api/ozone" 18 - "github.com/bluesky-social/indigo/util" 19 - "github.com/bluesky-social/indigo/xrpc" 20 11 21 12 _ "github.com/joho/godotenv/autoload" 22 13 _ "go.uber.org/automaxprocs" 23 14 24 15 "github.com/carlmjohnson/versioninfo" 25 - logging "github.com/ipfs/go-log" 26 16 "github.com/urfave/cli/v2" 27 17 ) 28 18 29 - var log = logging.Logger("beemo") 30 - 31 19 func main() { 32 20 if err := run(os.Args); err != nil { 33 - log.Fatal(err) 21 + slog.Error("exiting", "err", err) 22 + os.Exit(-1) 34 23 } 35 24 } 36 25 ··· 44 33 45 34 app.Flags = []cli.Flag{ 46 35 &cli.StringFlag{ 47 - Name: "pds-host", 48 - Usage: "method, hostname, and port of PDS instance", 49 - Value: "http://localhost:4849", 50 - EnvVars: []string{"ATP_PDS_HOST"}, 51 - }, 52 - &cli.StringFlag{ 53 - Name: "admin-host", 54 - Usage: "method, hostname, and port of admin interface (eg, Ozone), for direct links", 55 - Value: "http://localhost:3000", 56 - EnvVars: []string{"ATP_ADMIN_HOST"}, 57 - }, 58 - &cli.StringFlag{ 59 - Name: "handle", 60 - Usage: "for PDS login", 61 - Required: true, 62 - EnvVars: []string{"ATP_AUTH_HANDLE"}, 63 - }, 64 - &cli.StringFlag{ 65 - Name: "password", 66 - Usage: "for PDS login", 67 - Required: true, 68 - EnvVars: []string{"ATP_AUTH_PASSWORD"}, 69 - }, 70 - &cli.StringFlag{ 71 - Name: "admin-password", 72 - Usage: "admin authentication password for PDS", 73 - Required: true, 74 - EnvVars: []string{"ATP_AUTH_ADMIN_PASSWORD"}, 36 + Name: "log-level", 37 + Usage: "log verbosity level (eg: warn, info, debug)", 38 + EnvVars: []string{"BEEMO_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"}, 75 39 }, 76 40 &cli.StringFlag{ 77 41 Name: "slack-webhook-url", ··· 80 44 Required: true, 81 45 EnvVars: []string{"SLACK_WEBHOOK_URL"}, 82 46 }, 83 - &cli.IntFlag{ 84 - Name: "poll-period", 85 - Usage: "API poll period in seconds", 86 - Value: 30, 87 - EnvVars: []string{"POLL_PERIOD"}, 88 - }, 89 47 } 90 48 app.Commands = []*cli.Command{ 91 49 &cli.Command{ 92 50 Name: "notify-reports", 93 51 Usage: "watch for new moderation reports, notify in slack", 94 52 Action: pollNewReports, 53 + Flags: []cli.Flag{ 54 + &cli.StringFlag{ 55 + Name: "pds-host", 56 + Usage: "method, hostname, and port of PDS instance", 57 + Value: "http://localhost:4849", 58 + EnvVars: []string{"ATP_PDS_HOST"}, 59 + }, 60 + &cli.StringFlag{ 61 + Name: "admin-host", 62 + Usage: "method, hostname, and port of admin interface (eg, Ozone), for direct links", 63 + Value: "http://localhost:3000", 64 + EnvVars: []string{"ATP_ADMIN_HOST"}, 65 + }, 66 + &cli.IntFlag{ 67 + Name: "poll-period", 68 + Usage: "API poll period in seconds", 69 + Value: 30, 70 + EnvVars: []string{"POLL_PERIOD"}, 71 + }, 72 + &cli.StringFlag{ 73 + Name: "handle", 74 + Usage: "for PDS login", 75 + Required: true, 76 + EnvVars: []string{"ATP_AUTH_HANDLE"}, 77 + }, 78 + &cli.StringFlag{ 79 + Name: "password", 80 + Usage: "for PDS login", 81 + Required: true, 82 + EnvVars: []string{"ATP_AUTH_PASSWORD"}, 83 + }, 84 + &cli.StringFlag{ 85 + Name: "admin-password", 86 + Usage: "admin authentication password for PDS", 87 + Required: true, 88 + EnvVars: []string{"ATP_AUTH_ADMIN_PASSWORD"}, 89 + }, 90 + }, 91 + }, 92 + &cli.Command{ 93 + Name: "notify-mentions", 94 + Usage: "watch firehose for posts mentioning specific accounts", 95 + Action: notifyMentions, 96 + Flags: []cli.Flag{ 97 + &cli.StringFlag{ 98 + Name: "relay-host", 99 + Usage: "method, hostname, and port of Relay instance (websocket)", 100 + Value: "wss://bsky.network", 101 + EnvVars: []string{"ATP_RELAY_HOST"}, 102 + }, 103 + &cli.StringFlag{ 104 + Name: "mention-dids", 105 + Usage: "DIDs to look for in mentions (comma-separated)", 106 + Required: true, 107 + EnvVars: []string{"BEEMO_MENTION_DIDS"}, 108 + }, 109 + }, 95 110 }, 96 111 } 97 112 return app.Run(args) 98 113 } 99 114 100 - func pollNewReports(cctx *cli.Context) error { 101 - // record last-seen report timestamp 102 - since := time.Now() 103 - // NOTE: uncomment this for testing 104 - //since = time.Now().Add(time.Duration(-12) * time.Hour) 105 - period := time.Duration(cctx.Int("poll-period")) * time.Second 106 - 107 - // create a new session 108 - xrpcc := &xrpc.Client{ 109 - Client: util.RobustHTTPClient(), 110 - Host: cctx.String("pds-host"), 111 - Auth: &xrpc.AuthInfo{Handle: cctx.String("handle")}, 115 + func configLogger(cctx *cli.Context, writer io.Writer) *slog.Logger { 116 + var level slog.Level 117 + switch strings.ToLower(cctx.String("log-level")) { 118 + case "error": 119 + level = slog.LevelError 120 + case "warn": 121 + level = slog.LevelWarn 122 + case "info": 123 + level = slog.LevelInfo 124 + case "debug": 125 + level = slog.LevelDebug 126 + default: 127 + level = slog.LevelInfo 112 128 } 113 - 114 - auth, err := comatproto.ServerCreateSession(context.TODO(), xrpcc, &comatproto.ServerCreateSession_Input{ 115 - Identifier: xrpcc.Auth.Handle, 116 - Password: cctx.String("password"), 117 - }) 118 - if err != nil { 119 - return err 120 - } 121 - xrpcc.Auth.AccessJwt = auth.AccessJwt 122 - xrpcc.Auth.RefreshJwt = auth.RefreshJwt 123 - xrpcc.Auth.Did = auth.Did 124 - xrpcc.Auth.Handle = auth.Handle 125 - 126 - adminToken := cctx.String("admin-password") 127 - if len(adminToken) > 0 { 128 - xrpcc.AdminToken = &adminToken 129 - } 130 - log.Infof("report polling bot starting up...") 131 - // can flip this bool to false to prevent spamming slack channel on startup 132 - if true { 133 - err := sendSlackMsg(cctx, fmt.Sprintf("restarted bot, monitoring for reports since `%s`...", since.Format(time.RFC3339))) 134 - if err != nil { 135 - return err 136 - } 137 - } 138 - for { 139 - // refresh session 140 - xrpcc.Auth.AccessJwt = xrpcc.Auth.RefreshJwt 141 - refresh, err := comatproto.ServerRefreshSession(context.TODO(), xrpcc) 142 - if err != nil { 143 - return err 144 - } 145 - xrpcc.Auth.AccessJwt = refresh.AccessJwt 146 - xrpcc.Auth.RefreshJwt = refresh.RefreshJwt 147 - 148 - // query just new reports (regardless of resolution state) 149 - // ModerationQueryEvents(ctx context.Context, c *xrpc.Client, createdBy string, cursor string, includeAllUserRecords bool, limit int64, sortDirection string, subject string, types []string) (*ModerationQueryEvents_Output, error) 150 - var limit int64 = 50 151 - me, err := toolsozone.ModerationQueryEvents( 152 - cctx.Context, 153 - xrpcc, 154 - nil, 155 - nil, 156 - "", 157 - "", 158 - "", 159 - "", 160 - "", 161 - false, 162 - true, 163 - limit, 164 - nil, 165 - nil, 166 - nil, 167 - "", 168 - "", 169 - []string{"tools.ozone.moderation.defs#modEventReport"}, 170 - ) 171 - if err != nil { 172 - return err 173 - } 174 - // this works out to iterate from newest to oldest, which is the behavior we want (report only newest, then break) 175 - for _, evt := range me.Events { 176 - report := evt.Event.ModerationDefs_ModEventReport 177 - // TODO: filter out based on subject state? similar to old "report.ResolvedByActionIds" 178 - createdAt, err := time.Parse(time.RFC3339, evt.CreatedAt) 179 - if err != nil { 180 - return fmt.Errorf("invalid time format for 'createdAt': %w", err) 181 - } 182 - if createdAt.After(since) { 183 - shortType := "" 184 - if report.ReportType != nil && strings.Contains(*report.ReportType, "#") { 185 - shortType = strings.SplitN(*report.ReportType, "#", 2)[1] 186 - } 187 - // ok, we found a "new" report, need to notify 188 - msg := fmt.Sprintf("⚠️ New report at `%s` ⚠️\n", evt.CreatedAt) 189 - msg += fmt.Sprintf("report id: `%d`\t", evt.Id) 190 - msg += fmt.Sprintf("instance: `%s`\n", cctx.String("pds-host")) 191 - msg += fmt.Sprintf("reasonType: `%s`\t", shortType) 192 - msg += fmt.Sprintf("Admin: %s/reports/%d\n", cctx.String("admin-host"), evt.Id) 193 - //msg += fmt.Sprintf("reportedByDid: `%s`\n", report.ReportedByDid) 194 - log.Infof("found new report, notifying slack: %s", report) 195 - err := sendSlackMsg(cctx, msg) 196 - if err != nil { 197 - return fmt.Errorf("failed to send slack message: %w", err) 198 - } 199 - since = createdAt 200 - break 201 - } else { 202 - log.Debugf("skipping report: %s", report) 203 - } 204 - } 205 - log.Infof("... sleeping for %s", period) 206 - time.Sleep(period) 207 - } 208 - } 209 - 210 - type SlackWebhookBody struct { 211 - Text string `json:"text"` 212 - } 213 - 214 - // sends a simple slack message to a channel via "incoming webhook" 215 - // The slack incoming webhook must be already configured in the slack workplace. 216 - func sendSlackMsg(cctx *cli.Context, msg string) error { 217 - // loosely based on: https://golangcode.com/send-slack-messages-without-a-library/ 218 - 219 - webhookUrl := cctx.String("slack-webhook-url") 220 - body, _ := json.Marshal(SlackWebhookBody{Text: msg}) 221 - req, err := http.NewRequest(http.MethodPost, webhookUrl, bytes.NewBuffer(body)) 222 - if err != nil { 223 - return err 224 - } 225 - req.Header.Add("Content-Type", "application/json") 226 - client := util.RobustHTTPClient() 227 - resp, err := client.Do(req) 228 - if err != nil { 229 - return err 230 - } 231 - 232 - defer resp.Body.Close() 233 - 234 - buf := new(bytes.Buffer) 235 - buf.ReadFrom(resp.Body) 236 - if resp.StatusCode != 200 || buf.String() != "ok" { 237 - // TODO: in some cases print body? eg, if short and text 238 - return fmt.Errorf("failed slack webhook POST request. status=%d", resp.StatusCode) 239 - } 240 - return nil 129 + logger := slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{ 130 + Level: level, 131 + })) 132 + slog.SetDefault(logger) 133 + return logger 241 134 }
+88
cmd/beemo/notify_mentions.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "os" 8 + "strings" 9 + 10 + appbsky "github.com/bluesky-social/indigo/api/bsky" 11 + "github.com/bluesky-social/indigo/atproto/identity" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + 14 + "github.com/urfave/cli/v2" 15 + ) 16 + 17 + type MentionChecker struct { 18 + slackWebhookURL string 19 + mentionDIDs []syntax.DID 20 + logger *slog.Logger 21 + directory identity.Directory 22 + } 23 + 24 + func (mc *MentionChecker) ProcessPost(ctx context.Context, did syntax.DID, rkey syntax.RecordKey, post appbsky.FeedPost) error { 25 + mc.logger.Debug("processing post record", "did", did, "rkey", rkey) 26 + 27 + for _, facet := range post.Facets { 28 + for _, feature := range facet.Features { 29 + mention := feature.RichtextFacet_Mention 30 + if mention == nil { 31 + continue 32 + } 33 + for _, d := range mc.mentionDIDs { 34 + if mention.Did == d.String() { 35 + mc.logger.Info("found mention", "target", d, "author", did, "rkey", rkey) 36 + targetIdent, err := mc.directory.LookupDID(ctx, syntax.DID(mention.Did)) 37 + if err != nil { 38 + return err 39 + } 40 + authorIdent, err := mc.directory.LookupDID(ctx, did) 41 + if err != nil { 42 + return err 43 + } 44 + msg := fmt.Sprintf("Mention of `@%s` by `@%s` (<https://bsky.app/profile/%s/post/%s|post link>):\n```%s```", targetIdent.Handle, authorIdent.Handle, did, rkey, post.Text) 45 + if post.Embed != nil && (post.Embed.EmbedImages != nil || post.Embed.EmbedRecordWithMedia != nil || post.Embed.EmbedRecord != nil || post.Embed.EmbedExternal != nil) { 46 + msg += "\n(post also contains an embed/quote/media)" 47 + } 48 + return sendSlackMsg(ctx, msg, mc.slackWebhookURL) 49 + } 50 + } 51 + } 52 + } 53 + return nil 54 + } 55 + 56 + func notifyMentions(cctx *cli.Context) error { 57 + ctx := context.Background() 58 + logger := configLogger(cctx, os.Stdout) 59 + relayHost := cctx.String("relay-host") 60 + 61 + mentionDIDs := []syntax.DID{} 62 + for _, raw := range strings.Split(cctx.String("mention-dids"), ",") { 63 + did, err := syntax.ParseDID(raw) 64 + if err != nil { 65 + return err 66 + } 67 + mentionDIDs = append(mentionDIDs, did) 68 + } 69 + 70 + checker := MentionChecker{ 71 + slackWebhookURL: cctx.String("slack-webhook-url"), 72 + mentionDIDs: mentionDIDs, 73 + logger: logger, 74 + directory: identity.DefaultDirectory(), 75 + } 76 + 77 + logger.Info("beemo mention checker starting up...", "relayHost", relayHost, "mentionDIDs", mentionDIDs) 78 + 79 + // can flip this bool to false to prevent spamming slack channel on startup 80 + if true { 81 + err := sendSlackMsg(ctx, fmt.Sprintf("beemo booting, looking for account mentions: `%s`", mentionDIDs), checker.slackWebhookURL) 82 + if err != nil { 83 + return err 84 + } 85 + } 86 + 87 + return RunFirehoseConsumer(ctx, logger, relayHost, checker.ProcessPost) 88 + }
+130
cmd/beemo/notify_reports.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "os" 7 + "strings" 8 + "time" 9 + 10 + comatproto "github.com/bluesky-social/indigo/api/atproto" 11 + toolsozone "github.com/bluesky-social/indigo/api/ozone" 12 + "github.com/bluesky-social/indigo/util" 13 + "github.com/bluesky-social/indigo/xrpc" 14 + 15 + "github.com/urfave/cli/v2" 16 + ) 17 + 18 + func pollNewReports(cctx *cli.Context) error { 19 + ctx := context.Background() 20 + logger := configLogger(cctx, os.Stdout) 21 + slackWebhookURL := cctx.String("slack-webhook-url") 22 + 23 + // record last-seen report timestamp 24 + since := time.Now() 25 + // NOTE: uncomment this for testing 26 + //since = time.Now().Add(time.Duration(-12) * time.Hour) 27 + period := time.Duration(cctx.Int("poll-period")) * time.Second 28 + 29 + // create a new session 30 + xrpcc := &xrpc.Client{ 31 + Client: util.RobustHTTPClient(), 32 + Host: cctx.String("pds-host"), 33 + Auth: &xrpc.AuthInfo{Handle: cctx.String("handle")}, 34 + } 35 + 36 + auth, err := comatproto.ServerCreateSession(ctx, xrpcc, &comatproto.ServerCreateSession_Input{ 37 + Identifier: xrpcc.Auth.Handle, 38 + Password: cctx.String("password"), 39 + }) 40 + if err != nil { 41 + return err 42 + } 43 + xrpcc.Auth.AccessJwt = auth.AccessJwt 44 + xrpcc.Auth.RefreshJwt = auth.RefreshJwt 45 + xrpcc.Auth.Did = auth.Did 46 + xrpcc.Auth.Handle = auth.Handle 47 + 48 + adminToken := cctx.String("admin-password") 49 + if len(adminToken) > 0 { 50 + xrpcc.AdminToken = &adminToken 51 + } 52 + logger.Info("report polling bot starting up...") 53 + // can flip this bool to false to prevent spamming slack channel on startup 54 + if true { 55 + err := sendSlackMsg(ctx, fmt.Sprintf("restarted bot, monitoring for reports since `%s`...", since.Format(time.RFC3339)), slackWebhookURL) 56 + if err != nil { 57 + return err 58 + } 59 + } 60 + for { 61 + // refresh session 62 + xrpcc.Auth.AccessJwt = xrpcc.Auth.RefreshJwt 63 + refresh, err := comatproto.ServerRefreshSession(ctx, xrpcc) 64 + if err != nil { 65 + return err 66 + } 67 + xrpcc.Auth.AccessJwt = refresh.AccessJwt 68 + xrpcc.Auth.RefreshJwt = refresh.RefreshJwt 69 + 70 + // query just new reports (regardless of resolution state) 71 + // ModerationQueryEvents(ctx context.Context, c *xrpc.Client, createdBy string, cursor string, includeAllUserRecords bool, limit int64, sortDirection string, subject string, types []string) (*ModerationQueryEvents_Output, error) 72 + var limit int64 = 50 73 + me, err := toolsozone.ModerationQueryEvents( 74 + cctx.Context, 75 + xrpcc, 76 + nil, 77 + nil, 78 + "", 79 + "", 80 + "", 81 + "", 82 + "", 83 + false, 84 + true, 85 + limit, 86 + nil, 87 + nil, 88 + nil, 89 + "", 90 + "", 91 + []string{"tools.ozone.moderation.defs#modEventReport"}, 92 + ) 93 + if err != nil { 94 + return err 95 + } 96 + // this works out to iterate from newest to oldest, which is the behavior we want (report only newest, then break) 97 + for _, evt := range me.Events { 98 + report := evt.Event.ModerationDefs_ModEventReport 99 + // TODO: filter out based on subject state? similar to old "report.ResolvedByActionIds" 100 + createdAt, err := time.Parse(time.RFC3339, evt.CreatedAt) 101 + if err != nil { 102 + return fmt.Errorf("invalid time format for 'createdAt': %w", err) 103 + } 104 + if createdAt.After(since) { 105 + shortType := "" 106 + if report.ReportType != nil && strings.Contains(*report.ReportType, "#") { 107 + shortType = strings.SplitN(*report.ReportType, "#", 2)[1] 108 + } 109 + // ok, we found a "new" report, need to notify 110 + msg := fmt.Sprintf("⚠️ New report at `%s` ⚠️\n", evt.CreatedAt) 111 + msg += fmt.Sprintf("report id: `%d`\t", evt.Id) 112 + msg += fmt.Sprintf("instance: `%s`\n", cctx.String("pds-host")) 113 + msg += fmt.Sprintf("reasonType: `%s`\t", shortType) 114 + msg += fmt.Sprintf("Admin: %s/reports/%d\n", cctx.String("admin-host"), evt.Id) 115 + //msg += fmt.Sprintf("reportedByDid: `%s`\n", report.ReportedByDid) 116 + logger.Info("found new report, notifying slack", "report", report) 117 + err := sendSlackMsg(ctx, msg, slackWebhookURL) 118 + if err != nil { 119 + return fmt.Errorf("failed to send slack message: %w", err) 120 + } 121 + since = createdAt 122 + break 123 + } else { 124 + logger.Debug("skipping report", "report", report) 125 + } 126 + } 127 + logger.Info("... sleeping", "period", period) 128 + time.Sleep(period) 129 + } 130 + }
+42
cmd/beemo/slack.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "net/http" 9 + 10 + "github.com/bluesky-social/indigo/util" 11 + ) 12 + 13 + type SlackWebhookBody struct { 14 + Text string `json:"text"` 15 + } 16 + 17 + // sends a simple slack message to a channel via "incoming webhook" 18 + // The slack incoming webhook must be already configured in the slack workplace. 19 + func sendSlackMsg(ctx context.Context, msg, webhookURL string) error { 20 + // loosely based on: https://golangcode.com/send-slack-messages-without-a-library/ 21 + 22 + body, _ := json.Marshal(SlackWebhookBody{Text: msg}) 23 + req, err := http.NewRequestWithContext(ctx, http.MethodPost, webhookURL, bytes.NewBuffer(body)) 24 + if err != nil { 25 + return err 26 + } 27 + req.Header.Add("Content-Type", "application/json") 28 + client := util.RobustHTTPClient() 29 + resp, err := client.Do(req) 30 + if err != nil { 31 + return err 32 + } 33 + 34 + defer resp.Body.Close() 35 + 36 + buf := new(bytes.Buffer) 37 + buf.ReadFrom(resp.Body) 38 + if resp.StatusCode != 200 || buf.String() != "ok" { 39 + return fmt.Errorf("failed slack webhook POST request. status=%d", resp.StatusCode) 40 + } 41 + return nil 42 + }