this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

beemo: add mention notifier

+324 -49
+139
cmd/beemo/firehose_consumer.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "log/slog" 8 + "net/http" 9 + "net/url" 10 + "strings" 11 + 12 + comatproto "github.com/bluesky-social/indigo/api/atproto" 13 + appbsky "github.com/bluesky-social/indigo/api/bsky" 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/bluesky-social/indigo/events/schedulers/autoscaling" 16 + lexutil "github.com/bluesky-social/indigo/lex/util" 17 + 18 + "github.com/bluesky-social/indigo/events" 19 + "github.com/bluesky-social/indigo/repo" 20 + "github.com/bluesky-social/indigo/repomgr" 21 + "github.com/carlmjohnson/versioninfo" 22 + "github.com/gorilla/websocket" 23 + ) 24 + 25 + func RunFirehoseConsumer(ctx context.Context, logger *slog.Logger, relayHost string, postCallback func(context.Context, syntax.DID, syntax.RecordKey, appbsky.FeedPost) error) error { 26 + 27 + dialer := websocket.DefaultDialer 28 + u, err := url.Parse(relayHost) 29 + if err != nil { 30 + return fmt.Errorf("invalid relayHost URI: %w", err) 31 + } 32 + // always continue at the current cursor offset (don't provide cursor query param) 33 + u.Path = "xrpc/com.atproto.sync.subscribeRepos" 34 + logger.Info("subscribing to repo event stream", "upstream", relayHost) 35 + con, _, err := dialer.Dial(u.String(), http.Header{ 36 + "User-Agent": []string{fmt.Sprintf("beemo/%s", versioninfo.Short())}, 37 + }) 38 + if err != nil { 39 + return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 40 + } 41 + 42 + rsc := &events.RepoStreamCallbacks{ 43 + RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 44 + return HandleRepoCommit(ctx, logger, evt, postCallback) 45 + }, 46 + // NOTE: could add other callbacks as needed 47 + } 48 + 49 + var scheduler events.Scheduler 50 + // use auto-scaling scheduler 51 + scaleSettings := autoscaling.DefaultAutoscaleSettings() 52 + scheduler = autoscaling.NewScheduler(scaleSettings, relayHost, rsc.EventHandler) 53 + logger.Info("beemo firehose scheduler configured", "scheduler", "autoscaling", "initial", scaleSettings.Concurrency, "max", scaleSettings.MaxConcurrency) 54 + 55 + return events.HandleRepoStream(ctx, con, scheduler) 56 + } 57 + 58 + // TODO: move this to a "ParsePath" helper in syntax package? 59 + func splitRepoPath(path string) (syntax.NSID, syntax.RecordKey, error) { 60 + parts := strings.SplitN(path, "/", 3) 61 + if len(parts) != 2 { 62 + return "", "", fmt.Errorf("invalid record path: %s", path) 63 + } 64 + collection, err := syntax.ParseNSID(parts[0]) 65 + if err != nil { 66 + return "", "", err 67 + } 68 + rkey, err := syntax.ParseRecordKey(parts[1]) 69 + if err != nil { 70 + return "", "", err 71 + } 72 + return collection, rkey, nil 73 + } 74 + 75 + // NOTE: for now, this function basically never errors, just logs and returns nil. Should think through error processing better. 76 + func HandleRepoCommit(ctx context.Context, logger *slog.Logger, evt *comatproto.SyncSubscribeRepos_Commit, postCallback func(context.Context, syntax.DID, syntax.RecordKey, appbsky.FeedPost) error) error { 77 + 78 + logger = logger.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq) 79 + logger.Debug("received commit event") 80 + 81 + if evt.TooBig { 82 + logger.Warn("skipping tooBig events for now") 83 + return nil 84 + } 85 + 86 + did, err := syntax.ParseDID(evt.Repo) 87 + if err != nil { 88 + logger.Error("bad DID syntax in event", "err", err) 89 + return nil 90 + } 91 + 92 + rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 93 + if err != nil { 94 + logger.Error("failed to read repo from car", "err", err) 95 + return nil 96 + } 97 + 98 + for _, op := range evt.Ops { 99 + logger = logger.With("eventKind", op.Action, "path", op.Path) 100 + collection, rkey, err := splitRepoPath(op.Path) 101 + if err != nil { 102 + logger.Error("invalid path in repo op") 103 + return nil 104 + } 105 + 106 + ek := repomgr.EventKind(op.Action) 107 + switch ek { 108 + case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 109 + // read the record bytes from blocks, and verify CID 110 + rc, recordCBOR, err := rr.GetRecordBytes(ctx, op.Path) 111 + if err != nil { 112 + logger.Error("reading record from event blocks (CAR)", "err", err) 113 + continue 114 + } 115 + if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 116 + logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 117 + continue 118 + } 119 + 120 + switch collection { 121 + case "app.bsky.feed.post": 122 + var post appbsky.FeedPost 123 + if err := post.UnmarshalCBOR(bytes.NewReader(*recordCBOR)); err != nil { 124 + logger.Error("failed to parse app.bsky.feed.post record", "err", err) 125 + continue 126 + } 127 + if err := postCallback(ctx, did, rkey, post); err != nil { 128 + logger.Error("failed to process post record", "err", err) 129 + continue 130 + } 131 + } 132 + 133 + default: 134 + // ignore other events 135 + } 136 + } 137 + 138 + return nil 139 + }
+85 -38
cmd/beemo/main.go
··· 4 4 package main 5 5 6 6 import ( 7 + "io" 8 + "log/slog" 7 9 "os" 10 + "strings" 8 11 9 12 _ "github.com/joho/godotenv/autoload" 10 13 _ "go.uber.org/automaxprocs" 11 14 12 15 "github.com/carlmjohnson/versioninfo" 13 - logging "github.com/ipfs/go-log" 14 16 "github.com/urfave/cli/v2" 15 17 ) 16 18 17 - var log = logging.Logger("beemo") 18 - 19 19 func main() { 20 20 if err := run(os.Args); err != nil { 21 - log.Fatal(err) 21 + slog.Error("exiting", "err", err) 22 + os.Exit(-1) 22 23 } 23 24 } 24 25 ··· 32 33 33 34 app.Flags = []cli.Flag{ 34 35 &cli.StringFlag{ 35 - Name: "pds-host", 36 - Usage: "method, hostname, and port of PDS instance", 37 - Value: "http://localhost:4849", 38 - EnvVars: []string{"ATP_PDS_HOST"}, 39 - }, 40 - &cli.StringFlag{ 41 - Name: "admin-host", 42 - Usage: "method, hostname, and port of admin interface (eg, Ozone), for direct links", 43 - Value: "http://localhost:3000", 44 - EnvVars: []string{"ATP_ADMIN_HOST"}, 45 - }, 46 - &cli.StringFlag{ 47 - Name: "handle", 48 - Usage: "for PDS login", 49 - Required: true, 50 - EnvVars: []string{"ATP_AUTH_HANDLE"}, 51 - }, 52 - &cli.StringFlag{ 53 - Name: "password", 54 - Usage: "for PDS login", 55 - Required: true, 56 - EnvVars: []string{"ATP_AUTH_PASSWORD"}, 57 - }, 58 - &cli.StringFlag{ 59 - Name: "admin-password", 60 - Usage: "admin authentication password for PDS", 61 - Required: true, 62 - EnvVars: []string{"ATP_AUTH_ADMIN_PASSWORD"}, 36 + Name: "log-level", 37 + Usage: "log verbosity level (eg: warn, info, debug)", 38 + EnvVars: []string{"BEEMO_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"}, 63 39 }, 64 40 &cli.StringFlag{ 65 41 Name: "slack-webhook-url", ··· 68 44 Required: true, 69 45 EnvVars: []string{"SLACK_WEBHOOK_URL"}, 70 46 }, 71 - &cli.IntFlag{ 72 - Name: "poll-period", 73 - Usage: "API poll period in seconds", 74 - Value: 30, 75 - EnvVars: []string{"POLL_PERIOD"}, 76 - }, 77 47 } 78 48 app.Commands = []*cli.Command{ 79 49 &cli.Command{ 80 50 Name: "notify-reports", 81 51 Usage: "watch for new moderation reports, notify in slack", 82 52 Action: pollNewReports, 53 + Flags: []cli.Flag{ 54 + &cli.StringFlag{ 55 + Name: "pds-host", 56 + Usage: "method, hostname, and port of PDS instance", 57 + Value: "http://localhost:4849", 58 + EnvVars: []string{"ATP_PDS_HOST"}, 59 + }, 60 + &cli.StringFlag{ 61 + Name: "admin-host", 62 + Usage: "method, hostname, and port of admin interface (eg, Ozone), for direct links", 63 + Value: "http://localhost:3000", 64 + EnvVars: []string{"ATP_ADMIN_HOST"}, 65 + }, 66 + &cli.IntFlag{ 67 + Name: "poll-period", 68 + Usage: "API poll period in seconds", 69 + Value: 30, 70 + EnvVars: []string{"POLL_PERIOD"}, 71 + }, 72 + &cli.StringFlag{ 73 + Name: "handle", 74 + Usage: "for PDS login", 75 + Required: true, 76 + EnvVars: []string{"ATP_AUTH_HANDLE"}, 77 + }, 78 + &cli.StringFlag{ 79 + Name: "password", 80 + Usage: "for PDS login", 81 + Required: true, 82 + EnvVars: []string{"ATP_AUTH_PASSWORD"}, 83 + }, 84 + &cli.StringFlag{ 85 + Name: "admin-password", 86 + Usage: "admin authentication password for PDS", 87 + Required: true, 88 + EnvVars: []string{"ATP_AUTH_ADMIN_PASSWORD"}, 89 + }, 90 + }, 91 + }, 92 + &cli.Command{ 93 + Name: "notify-mentions", 94 + Usage: "watch firehose for posts mentioning specific accounts", 95 + Action: notifyMentions, 96 + Flags: []cli.Flag{ 97 + &cli.StringFlag{ 98 + Name: "relay-host", 99 + Usage: "method, hostname, and port of Relay instance (websocket)", 100 + Value: "wss://bsky.network", 101 + EnvVars: []string{"ATP_RELAY_HOST"}, 102 + }, 103 + &cli.StringFlag{ 104 + Name: "mention-dids", 105 + Usage: "DIDs to look for in mentions (comma-separated)", 106 + Required: true, 107 + EnvVars: []string{"BEEMO_MENTION_DIDS"}, 108 + }, 109 + }, 83 110 }, 84 111 } 85 112 return app.Run(args) 86 113 } 87 114 115 + func configLogger(cctx *cli.Context, writer io.Writer) *slog.Logger { 116 + var level slog.Level 117 + switch strings.ToLower(cctx.String("log-level")) { 118 + case "error": 119 + level = slog.LevelError 120 + case "warn": 121 + level = slog.LevelWarn 122 + case "info": 123 + level = slog.LevelInfo 124 + case "debug": 125 + level = slog.LevelDebug 126 + default: 127 + level = slog.LevelInfo 128 + } 129 + logger := slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{ 130 + Level: level, 131 + })) 132 + slog.SetDefault(logger) 133 + return logger 134 + }
+86
cmd/beemo/notify_mentions.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "log/slog" 7 + "os" 8 + "strings" 9 + 10 + appbsky "github.com/bluesky-social/indigo/api/bsky" 11 + "github.com/bluesky-social/indigo/atproto/identity" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + 14 + "github.com/urfave/cli/v2" 15 + ) 16 + 17 + type MentionChecker struct { 18 + slackWebhookURL string 19 + mentionDIDs []syntax.DID 20 + logger *slog.Logger 21 + directory identity.Directory 22 + } 23 + 24 + func (mc *MentionChecker) ProcessPost(ctx context.Context, did syntax.DID, rkey syntax.RecordKey, post appbsky.FeedPost) error { 25 + mc.logger.Debug("processing post record", "did", did, "rkey", rkey) 26 + 27 + for _, facet := range post.Facets { 28 + for _, feature := range facet.Features { 29 + mention := feature.RichtextFacet_Mention 30 + if mention == nil { 31 + continue 32 + } 33 + for _, d := range mc.mentionDIDs { 34 + if mention.Did == d.String() { 35 + mc.logger.Info("found mention", "target", d, "author", did, "rkey", rkey) 36 + targetIdent, err := mc.directory.LookupDID(ctx, syntax.DID(mention.Did)) 37 + if err != nil { 38 + return err 39 + } 40 + authorIdent, err := mc.directory.LookupDID(ctx, did) 41 + if err != nil { 42 + return err 43 + } 44 + msg := fmt.Sprintf("Mention of `@%s` by `@%s` (<https://bsky.app/profile/%s/post/%s|post link>):\n```%s```", targetIdent.Handle, authorIdent.Handle, did, rkey, post.Text) 45 + return sendSlackMsg(ctx, msg, mc.slackWebhookURL) 46 + } 47 + } 48 + } 49 + } 50 + return nil 51 + } 52 + 53 + func notifyMentions(cctx *cli.Context) error { 54 + ctx := context.Background() 55 + logger := configLogger(cctx, os.Stdout) 56 + relayHost := cctx.String("relay-host") 57 + 58 + mentionDIDs := []syntax.DID{} 59 + for _, raw := range strings.Split(cctx.String("mention-dids"), ",") { 60 + fmt.Println(raw) 61 + did, err := syntax.ParseDID(raw) 62 + if err != nil { 63 + return err 64 + } 65 + mentionDIDs = append(mentionDIDs, did) 66 + } 67 + 68 + checker := MentionChecker{ 69 + slackWebhookURL: cctx.String("slack-webhook-url"), 70 + mentionDIDs: mentionDIDs, 71 + logger: logger, 72 + directory: identity.DefaultDirectory(), 73 + } 74 + 75 + logger.Info("beemo mention checker starting up...", "relayHost", relayHost, "mentionDIDs", mentionDIDs) 76 + 77 + // can flip this bool to false to prevent spamming slack channel on startup 78 + if true { 79 + err := sendSlackMsg(ctx, fmt.Sprintf("beemo booting, looking for account mentions: `%s`", mentionDIDs), checker.slackWebhookURL) 80 + if err != nil { 81 + return err 82 + } 83 + } 84 + 85 + return RunFirehoseConsumer(ctx, logger, relayHost, checker.ProcessPost) 86 + }
+11 -6
cmd/beemo/notify_reports.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "os" 6 7 "strings" 7 8 "time" 8 9 ··· 15 16 ) 16 17 17 18 func pollNewReports(cctx *cli.Context) error { 19 + ctx := context.Background() 20 + logger := configLogger(cctx, os.Stdout) 21 + slackWebhookURL := cctx.String("slack-webhook-url") 22 + 18 23 // record last-seen report timestamp 19 24 since := time.Now() 20 25 // NOTE: uncomment this for testing ··· 44 49 if len(adminToken) > 0 { 45 50 xrpcc.AdminToken = &adminToken 46 51 } 47 - log.Infof("report polling bot starting up...") 52 + logger.Info("report polling bot starting up...") 48 53 // can flip this bool to false to prevent spamming slack channel on startup 49 54 if true { 50 - err := sendSlackMsg(cctx, fmt.Sprintf("restarted bot, monitoring for reports since `%s`...", since.Format(time.RFC3339))) 55 + err := sendSlackMsg(ctx, fmt.Sprintf("restarted bot, monitoring for reports since `%s`...", since.Format(time.RFC3339)), slackWebhookURL) 51 56 if err != nil { 52 57 return err 53 58 } ··· 108 113 msg += fmt.Sprintf("reasonType: `%s`\t", shortType) 109 114 msg += fmt.Sprintf("Admin: %s/reports/%d\n", cctx.String("admin-host"), evt.Id) 110 115 //msg += fmt.Sprintf("reportedByDid: `%s`\n", report.ReportedByDid) 111 - log.Infof("found new report, notifying slack: %s", report) 112 - err := sendSlackMsg(cctx, msg) 116 + logger.Info("found new report, notifying slack", "report", report) 117 + err := sendSlackMsg(ctx, msg, slackWebhookURL) 113 118 if err != nil { 114 119 return fmt.Errorf("failed to send slack message: %w", err) 115 120 } 116 121 since = createdAt 117 122 break 118 123 } else { 119 - log.Debugf("skipping report: %s", report) 124 + logger.Debug("skipping report", "report", report) 120 125 } 121 126 } 122 - log.Infof("... sleeping for %s", period) 127 + logger.Info("... sleeping", "period", period) 123 128 time.Sleep(period) 124 129 } 125 130 }
+3 -5
cmd/beemo/slack.go
··· 2 2 3 3 import ( 4 4 "bytes" 5 + "context" 5 6 "encoding/json" 6 7 "fmt" 7 8 "net/http" 8 9 9 10 "github.com/bluesky-social/indigo/util" 10 - 11 - "github.com/urfave/cli/v2" 12 11 ) 13 12 14 13 type SlackWebhookBody struct { ··· 17 16 18 17 // sends a simple slack message to a channel via "incoming webhook" 19 18 // The slack incoming webhook must be already configured in the slack workplace. 20 - func sendSlackMsg(cctx *cli.Context, msg string) error { 19 + func sendSlackMsg(ctx context.Context, msg, webhookURL string) error { 21 20 // loosely based on: https://golangcode.com/send-slack-messages-without-a-library/ 22 21 23 - webhookUrl := cctx.String("slack-webhook-url") 24 22 body, _ := json.Marshal(SlackWebhookBody{Text: msg}) 25 - req, err := http.NewRequest(http.MethodPost, webhookUrl, bytes.NewBuffer(body)) 23 + req, err := http.NewRequestWithContext(ctx, http.MethodPost, webhookURL, bytes.NewBuffer(body)) 26 24 if err != nil { 27 25 return err 28 26 }