this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

treat sync1.1 errors as warnings

+140 -36
+5 -1
cmd/relay/bgs/bgs.go
··· 100 100 101 101 // AdminToken checked against "Authorization: Bearer {}" header 102 102 AdminToken string 103 + 104 + Sync11ErrorsAreWarnings bool 103 105 } 104 106 105 107 func DefaultBGSConfig() *BGSConfig { ··· 111 113 } 112 114 } 113 115 114 - func NewBGS(db *gorm.DB, validator *Validator, evtman *events.EventManager, didd identity.Directory, config *BGSConfig) (*BGS, error) { 116 + func NewBGS(db *gorm.DB, evtman *events.EventManager, didd identity.Directory, config *BGSConfig) (*BGS, error) { 115 117 116 118 if config == nil { 117 119 config = DefaultBGSConfig() ··· 130 132 } 131 133 132 134 uc, _ := lru.New[string, *Account](1_000_000) 135 + 136 + validator := NewValidator(didd, config.InductionTraceLog, config.Sync11ErrorsAreWarnings) 133 137 134 138 bgs := &BGS{ 135 139 db: db,
+127 -31
cmd/relay/bgs/validator.go
··· 20 20 21 21 const defaultMaxRevFuture = time.Hour 22 22 23 - func NewValidator(directory identity.Directory, inductionTraceLog *slog.Logger) *Validator { 23 + func NewValidator(directory identity.Directory, inductionTraceLog *slog.Logger, sync11ErrorsAreWarnings bool) *Validator { 24 24 maxRevFuture := defaultMaxRevFuture // TODO: configurable 25 25 ErrRevTooFarFuture := fmt.Errorf("new rev is > %s in the future", maxRevFuture) 26 26 ··· 30 30 inductionTraceLog: inductionTraceLog, 31 31 directory: directory, 32 32 33 - maxRevFuture: maxRevFuture, 34 - ErrRevTooFarFuture: ErrRevTooFarFuture, 35 - AllowSignatureNotFound: true, // TODO: configurable 33 + maxRevFuture: maxRevFuture, 34 + ErrRevTooFarFuture: ErrRevTooFarFuture, 35 + AllowSignatureNotFound: true, // TODO: configurable 36 + Sync11ErrorsAreWarnings: sync11ErrorsAreWarnings, 36 37 } 37 38 } 38 39 ··· 56 57 // AllowSignatureNotFound enables counting messages without findable public key to pass through with a warning counter 57 58 // TODO: refine this for what kind of 'not found' we accept. 58 59 AllowSignatureNotFound bool 60 + 61 + Sync11ErrorsAreWarnings bool 59 62 } 60 63 61 64 type NextCommitHandler interface { ··· 129 132 hostname := host.Host 130 133 hasWarning := false 131 134 commitVerifyStarts.Inc() 132 - logger := slog.Default().With("did", msg.Repo, "rev", msg.Rev, "seq", msg.Seq, "time", msg.Time) 135 + logger := slog.Default().With("host", hostname, "did", msg.Repo, "rev", msg.Rev, "seq", msg.Seq, "time", msg.Time) 133 136 134 137 did, err := syntax.ParseDID(msg.Repo) 135 138 if err != nil { ··· 147 150 prevTime := prevRev.Time() 148 151 if curTime.Before(prevTime) { 149 152 commitVerifyErrors.WithLabelValues(hostname, "revb").Inc() 150 - dt := prevTime.Sub(curTime) 151 - return nil, &revOutOfOrderError{dt} 153 + if !val.Sync11ErrorsAreWarnings { 154 + dt := prevTime.Sub(curTime) 155 + return nil, &revOutOfOrderError{dt} 156 + } else { 157 + logger.Warn("new rev before old rev", "prev rev", prevRev, "rev", rev) 158 + } 152 159 } 153 160 } 154 161 if rev.Time().After(time.Now().Add(val.maxRevFuture)) { 155 162 commitVerifyErrors.WithLabelValues(hostname, "revf").Inc() 156 - return nil, val.ErrRevTooFarFuture 163 + if !val.Sync11ErrorsAreWarnings { 164 + return nil, val.ErrRevTooFarFuture 165 + } else { 166 + logger.Warn("far future rev", "now", time.Now(), "rev", rev.Time(), "err", err) 167 + } 157 168 } 158 169 _, err = syntax.ParseDatetime(msg.Time) 159 170 if err != nil { 160 171 commitVerifyErrors.WithLabelValues(hostname, "time").Inc() 161 - return nil, err 172 + if !val.Sync11ErrorsAreWarnings { 173 + return nil, err 174 + } else { 175 + logger.Warn("invalid time", "err", err) 176 + } 162 177 } 163 178 164 179 if msg.TooBig { ··· 177 192 commit, repoFragment, err := atrepo.LoadFromCAR(ctx, bytes.NewReader([]byte(msg.Blocks))) 178 193 if err != nil { 179 194 commitVerifyErrors.WithLabelValues(hostname, "car").Inc() 180 - return nil, err 195 + if !val.Sync11ErrorsAreWarnings { 196 + return nil, err 197 + } else { 198 + logger.Warn("invalid car", "err", err) 199 + } 181 200 } 182 201 183 202 if commit.Rev != rev.String() { 184 203 commitVerifyErrors.WithLabelValues(hostname, "rev").Inc() 185 - return nil, fmt.Errorf("rev did not match commit") 204 + if !val.Sync11ErrorsAreWarnings { 205 + return nil, fmt.Errorf("rev did not match commit") 206 + } else { 207 + logger.Warn("message rev != commit.rev") 208 + } 186 209 } 187 210 if commit.DID != did.String() { 188 211 commitVerifyErrors.WithLabelValues(hostname, "did2").Inc() 189 - return nil, fmt.Errorf("rev did not match commit") 212 + if !val.Sync11ErrorsAreWarnings { 213 + return nil, fmt.Errorf("rev did not match commit") 214 + } else { 215 + logger.Warn("message did != commit.did") 216 + } 190 217 } 191 218 192 219 err = val.VerifyCommitSignature(ctx, commit, hostname, &hasWarning) ··· 202 229 nsid, rkey, err := syntax.ParseRepoPath(op.Path) 203 230 if err != nil { 204 231 commitVerifyErrors.WithLabelValues(hostname, "opp").Inc() 205 - return nil, fmt.Errorf("invalid repo path in ops list: %w", err) 232 + if !val.Sync11ErrorsAreWarnings { 233 + return nil, fmt.Errorf("invalid repo path in ops list: %w", err) 234 + } else { 235 + logger.Warn("invalid repo path", "err", err) 236 + } 206 237 } 207 - val, err := repoFragment.GetRecordCID(ctx, nsid, rkey) 238 + rcid, err := repoFragment.GetRecordCID(ctx, nsid, rkey) 208 239 if err != nil { 209 240 commitVerifyErrors.WithLabelValues(hostname, "rcid").Inc() 210 - return nil, err 241 + if !val.Sync11ErrorsAreWarnings { 242 + return nil, err 243 + } else { 244 + logger.Warn("invalid record cid", "err", err) 245 + } 211 246 } 212 - if *c != *val { 247 + if *c != *rcid { 213 248 commitVerifyErrors.WithLabelValues(hostname, "opc").Inc() 214 - return nil, fmt.Errorf("record op doesn't match MST tree value") 249 + if !val.Sync11ErrorsAreWarnings { 250 + return nil, fmt.Errorf("record op doesn't match MST tree value") 251 + } else { 252 + logger.Warn("record op doesn't match MST tree value") 253 + } 215 254 } 216 255 _, _, err = repoFragment.GetRecordBytes(ctx, nsid, rkey) 217 256 if err != nil { 218 257 commitVerifyErrors.WithLabelValues(hostname, "rec").Inc() 219 - return nil, err 258 + if !val.Sync11ErrorsAreWarnings { 259 + return nil, err 260 + } else { 261 + logger.Warn("could not get record bytes", "err", err) 262 + } 220 263 } 221 264 } 222 265 } ··· 257 300 ops, err := ParseCommitOps(msg.Ops) 258 301 if err != nil { 259 302 commitVerifyErrors.WithLabelValues(hostname, "pop").Inc() 260 - return nil, err 303 + if !val.Sync11ErrorsAreWarnings { 304 + return nil, err 305 + } else { 306 + logger.Warn("invalid commit ops", "err", err) 307 + } 261 308 } 262 309 ops, err = atrepo.NormalizeOps(ops) 263 310 if err != nil { 264 311 commitVerifyErrors.WithLabelValues(hostname, "nop").Inc() 265 - return nil, err 312 + if !val.Sync11ErrorsAreWarnings { 313 + return nil, err 314 + } else { 315 + logger.Warn("could not normalize ops", "err", err) 316 + } 266 317 } 267 318 268 319 invTree := repoFragment.MST.Copy() 269 320 for _, op := range ops { 270 321 if err := atrepo.InvertOp(&invTree, &op); err != nil { 271 322 commitVerifyErrors.WithLabelValues(hostname, "inv").Inc() 272 - return nil, err 323 + if !val.Sync11ErrorsAreWarnings { 324 + return nil, err 325 + } else { 326 + logger.Warn("could not invert op", "err", err) 327 + } 273 328 } 274 329 } 275 330 computed, err := invTree.RootCID() 276 331 if err != nil { 277 332 commitVerifyErrors.WithLabelValues(hostname, "it").Inc() 278 - return nil, err 333 + if !val.Sync11ErrorsAreWarnings { 334 + return nil, err 335 + } else { 336 + logger.Warn("inverted tree could not get root cid", "err", err) 337 + } 279 338 } 280 339 if *computed != *c { 281 340 // this is self-inconsistent malformed data 282 341 commitVerifyErrors.WithLabelValues(hostname, "pd").Inc() 283 - return nil, fmt.Errorf("inverted tree root didn't match prevData") 342 + if !val.Sync11ErrorsAreWarnings { 343 + return nil, fmt.Errorf("inverted tree root didn't match prevData") 344 + } else { 345 + logger.Warn("inverted tree root didn't match prevData") 346 + } 284 347 } 285 348 //logger.Debug("prevData matched", "prevData", c.String(), "computed", computed.String()) 286 349 ··· 305 368 func (val *Validator) HandleSync(ctx context.Context, host *models.PDS, msg *atproto.SyncSubscribeRepos_Sync) (newRoot *cid.Cid, err error) { 306 369 hostname := host.Host 307 370 hasWarning := false 371 + logger := slog.Default().With("host", hostname, "did", msg.Did, "rev", msg.Rev, "seq", msg.Seq, "time", msg.Time) 308 372 309 373 did, err := syntax.ParseDID(msg.Did) 310 374 if err != nil { 311 375 syncVerifyErrors.WithLabelValues(hostname, "did").Inc() 312 - return nil, err 376 + if !val.Sync11ErrorsAreWarnings { 377 + return nil, err 378 + } else { 379 + logger.Warn("invalid did", "err", err) 380 + } 313 381 } 314 382 rev, err := syntax.ParseTID(msg.Rev) 315 383 if err != nil { 316 384 syncVerifyErrors.WithLabelValues(hostname, "tid").Inc() 317 - return nil, err 385 + if !val.Sync11ErrorsAreWarnings { 386 + return nil, err 387 + } else { 388 + logger.Warn("invalid rev", "err", err) 389 + } 318 390 } 319 391 if rev.Time().After(time.Now().Add(val.maxRevFuture)) { 320 392 syncVerifyErrors.WithLabelValues(hostname, "revf").Inc() 321 - return nil, val.ErrRevTooFarFuture 393 + if !val.Sync11ErrorsAreWarnings { 394 + return nil, val.ErrRevTooFarFuture 395 + } else { 396 + logger.Warn("invalid rev too far future", "now", time.Now(), "rev", rev.Time()) 397 + } 322 398 } 323 399 _, err = syntax.ParseDatetime(msg.Time) 324 400 if err != nil { 325 401 syncVerifyErrors.WithLabelValues(hostname, "time").Inc() 326 - return nil, err 402 + if !val.Sync11ErrorsAreWarnings { 403 + return nil, err 404 + } else { 405 + logger.Warn("invalid time", "err", err) 406 + } 327 407 } 328 408 329 409 commit, err := atrepo.LoadCARCommit(ctx, bytes.NewReader([]byte(msg.Blocks))) 330 410 if err != nil { 331 411 commitVerifyErrors.WithLabelValues(hostname, "car").Inc() 332 - return nil, err 412 + if !val.Sync11ErrorsAreWarnings { 413 + return nil, err 414 + } else { 415 + logger.Warn("invalid car", "err", err) 416 + } 333 417 } 334 418 335 419 if commit.Rev != rev.String() { 336 420 commitVerifyErrors.WithLabelValues(hostname, "rev").Inc() 337 - return nil, fmt.Errorf("rev did not match commit") 421 + if !val.Sync11ErrorsAreWarnings { 422 + return nil, fmt.Errorf("rev did not match commit") 423 + } else { 424 + logger.Warn("message rev != commit.rev") 425 + } 338 426 } 339 427 if commit.DID != did.String() { 340 428 commitVerifyErrors.WithLabelValues(hostname, "did2").Inc() 341 - return nil, fmt.Errorf("rev did not match commit") 429 + if !val.Sync11ErrorsAreWarnings { 430 + return nil, fmt.Errorf("did did not match commit") 431 + } else { 432 + logger.Warn("message did != commit.did") 433 + } 342 434 } 343 435 344 436 err = val.VerifyCommitSignature(ctx, commit, hostname, &hasWarning) 345 437 if err != nil { 346 438 // signature errors are metrics counted inside VerifyCommitSignature() 347 - return nil, err 439 + if !val.Sync11ErrorsAreWarnings { 440 + return nil, err 441 + } else { 442 + logger.Warn("invalid sig", "err", err) 443 + } 348 444 } 349 445 350 446 return &commit.Data, nil
+8 -4
cmd/relay/main.go
··· 151 151 Value: false, 152 152 Usage: "make outbound firehose sequence number approximately unix microseconds", 153 153 }, 154 + &cli.BoolFlag{ 155 + Name: "sync11-errors-are-warnings", 156 + EnvVars: []string{"RELAY_SYNC11_JUST_WARN"}, 157 + Value: true, 158 + Usage: "don't fail traffic on sync1.1 errors", 159 + }, 154 160 } 155 161 156 162 app.Action = runRelay ··· 211 217 } 212 218 cacheDir := identity.NewCacheDirectory(&baseDir, cctx.Int("did-cache-size"), time.Hour*24, time.Minute*2, time.Minute*5) 213 219 214 - // TODO: rename repoman 215 - repoman := libbgs.NewValidator(&cacheDir, inductionTraceLog) 216 - 217 220 var persister events.EventPersistence 218 221 219 222 dpd := cctx.String("disk-persister-dir") ··· 264 267 bgsConfig.DefaultRepoLimit = cctx.Int64("default-repo-limit") 265 268 bgsConfig.ApplyPDSClientSettings = makePdsClientSetup(ratelimitBypass) 266 269 bgsConfig.InductionTraceLog = inductionTraceLog 270 + bgsConfig.Sync11ErrorsAreWarnings = cctx.Bool("sync11-errors-are-warnings") 267 271 nextCrawlers := cctx.StringSlice("next-crawler") 268 272 if len(nextCrawlers) != 0 { 269 273 nextCrawlerUrls := make([]*url.URL, len(nextCrawlers)) ··· 285 289 bgsConfig.AdminToken = base64.URLEncoding.EncodeToString(rblob[:]) 286 290 logger.Info("generated random admin key", "header", "Authorization: Bearer "+bgsConfig.AdminToken) 287 291 } 288 - bgs, err := libbgs.NewBGS(db, repoman, evtman, &cacheDir, bgsConfig) 292 + bgs, err := libbgs.NewBGS(db, evtman, &cacheDir, bgsConfig) 289 293 if err != nil { 290 294 return err 291 295 }