this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

goat: add firehose verify args

+211 -11
+179 -11
cmd/goat/firehose.go
··· 11 11 "net/url" 12 12 "os" 13 13 "strings" 14 + "time" 14 15 15 16 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 17 "github.com/bluesky-social/indigo/atproto/data" 18 + "github.com/bluesky-social/indigo/atproto/identity" 17 19 "github.com/bluesky-social/indigo/atproto/repo" 18 20 "github.com/bluesky-social/indigo/atproto/syntax" 19 21 "github.com/bluesky-social/indigo/events" ··· 49 51 Usage: "only print account and identity events", 50 52 }, 51 53 &cli.BoolFlag{ 54 + Name: "quiet", 55 + Aliases: []string{"q"}, 56 + Usage: "don't actually print events to stdout (eg, errors only)", 57 + }, 58 + &cli.BoolFlag{ 59 + Name: "verify-basic", 60 + Usage: "parse events and do basic syntax and structure checks", 61 + }, 62 + &cli.BoolFlag{ 63 + Name: "verify-sig", 64 + Usage: "verify account signatures on commits", 65 + }, 66 + &cli.BoolFlag{ 67 + Name: "verify-mst", 68 + Usage: "run inductive verification of ops and MST structure", 69 + }, 70 + &cli.BoolFlag{ 52 71 Name: "ops", 53 72 Aliases: []string{"records"}, 54 73 Usage: "instead of printing entire events, print individual record ops", ··· 58 77 } 59 78 60 79 type GoatFirehoseConsumer struct { 61 - // for pretty-printing events to stdout 62 - EventLogger *slog.Logger 63 80 OpsMode bool 64 81 AccountsOnly bool 82 + Quiet bool 83 + VerifyBasic bool 84 + VerifySig bool 85 + VerifyMST bool 65 86 // filter to specified collections 66 87 CollectionFilter []string 88 + // for signature verification 89 + Dir identity.Directory 67 90 } 68 91 69 92 func runFirehose(cctx *cli.Context) error { 70 93 ctx := context.Background() 71 94 72 - slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stderr, nil))) 95 + slog.SetDefault(configLogger(cctx, os.Stderr)) 96 + 97 + // main thing is skipping handle verification 98 + bdir := identity.BaseDirectory{ 99 + SkipHandleVerification: true, 100 + TryAuthoritativeDNS: false, 101 + SkipDNSDomainSuffixes: []string{".bsky.social"}, 102 + UserAgent: "goat/" + versioninfo.Short(), 103 + } 104 + cdir := identity.NewCacheDirectory(&bdir, 1_000_000, time.Hour*24, time.Minute*2, time.Minute*5) 73 105 74 106 gfc := GoatFirehoseConsumer{ 75 - EventLogger: slog.New(slog.NewJSONHandler(os.Stdout, nil)), 76 107 OpsMode: cctx.Bool("ops"), 77 108 AccountsOnly: cctx.Bool("account-events"), 78 109 CollectionFilter: cctx.StringSlice("collection"), 110 + Quiet: cctx.Bool("quiet"), 111 + VerifyBasic: cctx.Bool("verify-basic"), 112 + VerifySig: cctx.Bool("verify-sig"), 113 + VerifyMST: cctx.Bool("verify-mst"), 114 + Dir: &cdir, 79 115 } 80 116 81 117 var relayHost string ··· 104 140 u.RawQuery = fmt.Sprintf("cursor=%d", cctx.Int("cursor")) 105 141 } 106 142 urlString := u.String() 107 - slog.Debug("GET", "url", urlString) 108 143 con, _, err := dialer.Dial(urlString, http.Header{ 109 144 "User-Agent": []string{fmt.Sprintf("goat/%s", versioninfo.Short())}, 110 145 }) ··· 114 149 115 150 rsc := &events.RepoStreamCallbacks{ 116 151 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 117 - slog.Debug("commit event", "did", evt.Repo, "seq", evt.Seq) 152 + //slog.Debug("commit event", "did", evt.Repo, "seq", evt.Seq) 118 153 if !gfc.AccountsOnly && !gfc.OpsMode { 119 154 return gfc.handleCommitEvent(ctx, evt) 120 155 } else if !gfc.AccountsOnly && gfc.OpsMode { ··· 123 158 return nil 124 159 }, 125 160 RepoSync: func(evt *comatproto.SyncSubscribeRepos_Sync) error { 126 - slog.Debug("sync event", "did", evt.Did, "seq", evt.Seq) 161 + //slog.Debug("sync event", "did", evt.Did, "seq", evt.Seq) 127 162 if !gfc.AccountsOnly && !gfc.OpsMode { 128 163 return gfc.handleSyncEvent(ctx, evt) 129 164 } 130 165 return nil 131 166 }, 132 167 RepoIdentity: func(evt *comatproto.SyncSubscribeRepos_Identity) error { 133 - slog.Debug("identity event", "did", evt.Did, "seq", evt.Seq) 168 + //slog.Debug("identity event", "did", evt.Did, "seq", evt.Seq) 134 169 if !gfc.OpsMode { 135 170 return gfc.handleIdentityEvent(ctx, evt) 136 171 } 137 172 return nil 138 173 }, 139 174 RepoAccount: func(evt *comatproto.SyncSubscribeRepos_Account) error { 140 - slog.Debug("account event", "did", evt.Did, "seq", evt.Seq) 175 + //slog.Debug("account event", "did", evt.Did, "seq", evt.Seq) 141 176 if !gfc.OpsMode { 142 177 return gfc.handleAccountEvent(ctx, evt) 143 178 } 144 179 return nil 145 180 }, 181 + RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { 182 + if gfc.VerifyBasic { 183 + slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq) 184 + } 185 + return nil 186 + }, 187 + RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error { 188 + if gfc.VerifyBasic { 189 + slog.Info("deprecated event type", "eventType", "migrate", "did", evt.Did, "seq", evt.Seq) 190 + } 191 + return nil 192 + }, 193 + RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { 194 + if gfc.VerifyBasic { 195 + slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq) 196 + } 197 + return nil 198 + }, 146 199 } 147 200 148 201 scheduler := parallel.NewScheduler( ··· 156 209 } 157 210 158 211 func (gfc *GoatFirehoseConsumer) handleIdentityEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) error { 212 + if gfc.VerifySig { 213 + did, err := syntax.ParseDID(evt.Did) 214 + if err != nil { 215 + return err 216 + } 217 + gfc.Dir.Purge(ctx, did.AtIdentifier()) 218 + } 219 + if gfc.VerifyBasic { 220 + if _, err := syntax.ParseDID(evt.Did); err != nil { 221 + slog.Warn("invalid DID", "eventType", "identity", "did", evt.Did, "seq", evt.Seq) 222 + } 223 + } 224 + if gfc.Quiet { 225 + return nil 226 + } 159 227 out := make(map[string]interface{}) 160 228 out["type"] = "identity" 161 229 out["payload"] = evt ··· 168 236 } 169 237 170 238 func (gfc *GoatFirehoseConsumer) handleAccountEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Account) error { 239 + if gfc.VerifyBasic { 240 + if _, err := syntax.ParseDID(evt.Did); err != nil { 241 + slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq) 242 + } 243 + } 244 + if gfc.Quiet { 245 + return nil 246 + } 171 247 out := make(map[string]interface{}) 172 248 out["type"] = "account" 173 249 out["payload"] = evt ··· 184 260 if err != nil { 185 261 return err 186 262 } 263 + if gfc.VerifyBasic { 264 + if err := commit.VerifyStructure(); err != nil { 265 + slog.Warn("bad commit object", "eventType", "sync", "did", evt.Did, "seq", evt.Seq, "err", err) 266 + } 267 + if _, err := syntax.ParseDID(evt.Did); err != nil { 268 + slog.Warn("invalid DID", "eventType", "account", "did", evt.Did, "seq", evt.Seq) 269 + } 270 + } 271 + if gfc.Quiet { 272 + return nil 273 + } 187 274 evt.Blocks = nil 188 275 out := make(map[string]interface{}) 189 276 out["type"] = "sync" ··· 200 287 // this is the simple version, when not in "records" mode: print the event as JSON, but don't include blocks 201 288 func (gfc *GoatFirehoseConsumer) handleCommitEvent(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error { 202 289 290 + if gfc.VerifyBasic || gfc.VerifySig || gfc.VerifyMST { 291 + 292 + logger := slog.With("eventType", "commit", "did", evt.Repo, "seq", evt.Seq, "rev", evt.Rev) 293 + 294 + did, err := syntax.ParseDID(evt.Repo) 295 + if err != nil { 296 + return err 297 + } 298 + 299 + commit, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks)) 300 + if err != nil { 301 + return err 302 + } 303 + 304 + if gfc.VerifySig { 305 + ident, err := gfc.Dir.LookupDID(ctx, did) 306 + if err != nil { 307 + return err 308 + } 309 + pubkey, err := ident.PublicKey() 310 + if err != nil { 311 + return err 312 + } 313 + logger = logger.With("pds", ident.PDSEndpoint()) 314 + if err := commit.VerifySignature(pubkey); err != nil { 315 + logger.Warn("commit signature validation failed", "err", err) 316 + } 317 + } 318 + 319 + if len(evt.Blocks) == 0 { 320 + logger.Warn("commit message missing blocks") 321 + } 322 + 323 + if gfc.VerifyBasic { 324 + // the commit itself 325 + if err := commit.VerifyStructure(); err != nil { 326 + logger.Warn("bad commit object", "err", err) 327 + } 328 + // the event fields 329 + rev, err := syntax.ParseTID(evt.Rev) 330 + if err != nil { 331 + logger.Warn("bad TID syntax in commit rev", "err", err) 332 + } 333 + if rev.String() != commit.Rev { 334 + logger.Warn("event rev != commit rev", "commitRev", commit.Rev) 335 + } 336 + if did.String() != commit.DID { 337 + logger.Warn("event DID != commit DID", "commitDID", commit.DID) 338 + } 339 + _, err = syntax.ParseDatetime(evt.Time) 340 + if err != nil { 341 + logger.Warn("bad datetime syntax in commit time", "time", evt.Time, "err", err) 342 + } 343 + if evt.TooBig { 344 + logger.Warn("deprecated tooBig commit flag set") 345 + } 346 + if evt.Rebase { 347 + logger.Warn("deprecated rebase commit flag set") 348 + } 349 + } 350 + 351 + if gfc.VerifyMST { 352 + if evt.PrevData == nil { 353 + logger.Warn("prevData is nil, skipping MST check") 354 + } else { 355 + // TODO: break out this function in to smaller chunks 356 + if _, err := repo.VerifyCommitMessage(ctx, evt); err != nil { 357 + logger.Warn("failed to invert commit MST", "err", err) 358 + } 359 + } 360 + } 361 + } 362 + 363 + if gfc.Quiet { 364 + return nil 365 + } 366 + 203 367 // apply collections filter 204 368 if len(gfc.CollectionFilter) > 0 { 205 369 keep := false ··· 308 472 if err != nil { 309 473 return err 310 474 } 311 - fmt.Println(string(b)) 475 + if !gfc.Quiet { 476 + fmt.Println(string(b)) 477 + } 312 478 case "delete": 313 479 out["action"] = "delete" 314 480 b, err := json.Marshal(out) 315 481 if err != nil { 316 482 return err 317 483 } 318 - fmt.Println(string(b)) 484 + if !gfc.Quiet { 485 + fmt.Println(string(b)) 486 + } 319 487 default: 320 488 logger.Error("unexpected record op kind") 321 489 }
+7
cmd/goat/main.go
··· 23 23 Name: "goat", 24 24 Usage: "Go AT protocol CLI tool", 25 25 Version: versioninfo.Short(), 26 + Flags: []cli.Flag{ 27 + &cli.StringFlag{ 28 + Name: "log-level", 29 + Usage: "log verbosity level (eg: warn, info, debug)", 30 + EnvVars: []string{"GOAT_LOG_LEVEL", "GO_LOG_LEVEL", "LOG_LEVEL"}, 31 + }, 32 + }, 26 33 } 27 34 app.Commands = []*cli.Command{ 28 35 cmdRecordGet,
+25
cmd/goat/util.go
··· 3 3 import ( 4 4 "context" 5 5 "io" 6 + "log/slog" 6 7 "os" 8 + "strings" 7 9 8 10 "github.com/bluesky-social/indigo/atproto/identity" 9 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 + 13 + "github.com/urfave/cli/v2" 10 14 ) 11 15 12 16 func resolveIdent(ctx context.Context, arg string) (*identity.Identity, error) { ··· 42 46 } 43 47 return file, nil 44 48 } 49 + 50 + func configLogger(cctx *cli.Context, writer io.Writer) *slog.Logger { 51 + var level slog.Level 52 + switch strings.ToLower(cctx.String("log-level")) { 53 + case "error": 54 + level = slog.LevelError 55 + case "warn": 56 + level = slog.LevelWarn 57 + case "info": 58 + level = slog.LevelInfo 59 + case "debug": 60 + level = slog.LevelDebug 61 + default: 62 + level = slog.LevelInfo 63 + } 64 + logger := slog.New(slog.NewJSONHandler(writer, &slog.HandlerOptions{ 65 + Level: level, 66 + })) 67 + slog.SetDefault(logger) 68 + return logger 69 + }