this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

PR feedback mostly reorg a couple chains of actions into functions so bail out is easier

+121 -101
+121 -101
cmd/relay/bgs/validator.go
··· 137 137 did, err := syntax.ParseDID(msg.Repo) 138 138 if err != nil { 139 139 commitVerifyErrors.WithLabelValues(hostname, "did").Inc() 140 - return nil, err 140 + return nil, fmt.Errorf("invalid DID, %w", err) 141 141 } 142 142 rev, err := syntax.ParseTID(msg.Rev) 143 143 if err != nil { 144 144 commitVerifyErrors.WithLabelValues(hostname, "tid").Inc() 145 - return nil, err 145 + return nil, fmt.Errorf("invalid rev TID, %w", err) 146 146 } 147 147 if prevRoot != nil { 148 148 prevRev := prevRoot.GetRev() ··· 201 201 if commit.Rev != rev.String() { 202 202 commitVerifyErrors.WithLabelValues(hostname, "rev").Inc() 203 203 if !val.Sync11ErrorsAreWarnings { 204 - return nil, fmt.Errorf("rev did not match commit") 204 + return nil, fmt.Errorf("message rev != commit.rev") 205 205 } else { 206 206 logger.Warn("message rev != commit.rev") 207 207 } ··· 209 209 if commit.DID != did.String() { 210 210 commitVerifyErrors.WithLabelValues(hostname, "did2").Inc() 211 211 if !val.Sync11ErrorsAreWarnings { 212 - return nil, fmt.Errorf("rev did not match commit") 212 + return nil, fmt.Errorf("message did != commit.did") 213 213 } else { 214 214 logger.Warn("message did != commit.did") 215 215 } ··· 223 223 224 224 // load out all the records 225 225 for _, op := range msg.Ops { 226 - if (op.Action == "create" || op.Action == "update") && op.Cid != nil { 227 - c := (*cid.Cid)(op.Cid) 228 - nsid, rkey, err := syntax.ParseRepoPath(op.Path) 229 - if err != nil { 230 - commitVerifyErrors.WithLabelValues(hostname, "opp").Inc() 231 - if !val.Sync11ErrorsAreWarnings { 232 - return nil, fmt.Errorf("invalid repo path in ops list: %w", err) 233 - } else { 234 - logger.Warn("invalid repo path", "err", err) 235 - } 236 - } 237 - rcid, err := repoFragment.GetRecordCID(ctx, nsid, rkey) 238 - if err != nil { 239 - commitVerifyErrors.WithLabelValues(hostname, "rcid").Inc() 240 - if !val.Sync11ErrorsAreWarnings { 241 - return nil, err 242 - } else { 243 - logger.Warn("invalid record cid", "err", err) 244 - } 245 - } 246 - if *c != *rcid { 247 - commitVerifyErrors.WithLabelValues(hostname, "opc").Inc() 248 - if !val.Sync11ErrorsAreWarnings { 249 - return nil, fmt.Errorf("record op doesn't match MST tree value") 250 - } else { 251 - logger.Warn("record op doesn't match MST tree value") 252 - } 253 - } 254 - _, _, err = repoFragment.GetRecordBytes(ctx, nsid, rkey) 255 - if err != nil { 256 - commitVerifyErrors.WithLabelValues(hostname, "rec").Inc() 257 - if !val.Sync11ErrorsAreWarnings { 258 - return nil, err 259 - } else { 260 - logger.Warn("could not get record bytes", "err", err) 261 - } 262 - } 226 + err = val.verifyCommitMessageOp(ctx, op, hostname, repoFragment, logger) 227 + if err != nil { 228 + // Sync11ErrorsAreWarnings is handled within verifyCommitMessageOp(), at this level, errors are errors 229 + return nil, err 263 230 } 264 231 } 265 232 ··· 284 251 } 285 252 286 253 if msg.PrevData != nil { 287 - c := (*cid.Cid)(msg.PrevData) 288 - if prevRoot != nil { 289 - if *c != prevRoot.GetCid() { 290 - commitVerifyWarnings.WithLabelValues(hostname, "pr").Inc() 291 - val.inductionTraceLog.Warn("commit prevData mismatch", "seq", msg.Seq, "pdsHost", host.Host, "repo", msg.Repo) 292 - hasWarning = true 293 - } 294 - } else { 295 - // see counter below for okish "new" 254 + err = val.verifyCommitMessagePrevData(msg, prevRoot, host, logger, repoFragment, hasWarning) 255 + if err != nil { 256 + // Sync11ErrorsAreWarnings is handled within, at this level errors are errors 257 + return nil, err 296 258 } 259 + } else { 260 + // this source is still on old protocol without new prevData field 261 + commitVerifyOkish.WithLabelValues(hostname, "old").Inc() 262 + } 297 263 298 - // check internal consistency that claimed previous root matches the rest of this message 299 - ops, err := ParseCommitOps(msg.Ops) 264 + return repoFragment, nil 265 + } 266 + 267 + func (val *Validator) verifyCommitMessageOp(ctx context.Context, op *atproto.SyncSubscribeRepos_RepoOp, hostname string, repoFragment *atrepo.Repo, logger *slog.Logger) error { 268 + if (op.Action == "create" || op.Action == "update") && op.Cid != nil { 269 + c := (*cid.Cid)(op.Cid) 270 + nsid, rkey, err := syntax.ParseRepoPath(op.Path) 300 271 if err != nil { 301 - commitVerifyErrors.WithLabelValues(hostname, "pop").Inc() 272 + commitVerifyErrors.WithLabelValues(hostname, "opp").Inc() 302 273 if !val.Sync11ErrorsAreWarnings { 303 - return nil, err 274 + return fmt.Errorf("invalid repo path in ops list: %w", err) 304 275 } else { 305 - logger.Warn("invalid commit ops", "err", err) 276 + logger.Warn("invalid repo path", "err", err) 277 + return nil 306 278 } 307 279 } 308 - ops, err = atrepo.NormalizeOps(ops) 280 + rcid, err := repoFragment.GetRecordCID(ctx, nsid, rkey) 309 281 if err != nil { 310 - commitVerifyErrors.WithLabelValues(hostname, "nop").Inc() 282 + commitVerifyErrors.WithLabelValues(hostname, "rcid").Inc() 311 283 if !val.Sync11ErrorsAreWarnings { 312 - return nil, err 284 + return err 313 285 } else { 314 - logger.Warn("could not normalize ops", "err", err) 286 + logger.Warn("could not get record cid for nsid/rkey path", "err", err, "nsid", nsid, "rkey", rkey) 287 + return nil 315 288 } 316 289 } 317 - 318 - invTree := repoFragment.MST.Copy() 319 - for _, op := range ops { 320 - if err := atrepo.InvertOp(&invTree, &op); err != nil { 321 - commitVerifyErrors.WithLabelValues(hostname, "inv").Inc() 322 - if !val.Sync11ErrorsAreWarnings { 323 - return nil, err 324 - } else { 325 - logger.Warn("could not invert op", "err", err) 326 - } 290 + if *c != *rcid { 291 + commitVerifyErrors.WithLabelValues(hostname, "opc").Inc() 292 + if !val.Sync11ErrorsAreWarnings { 293 + return fmt.Errorf("record op doesn't match MST tree value") 294 + } else { 295 + logger.Warn("record op doesn't match MST tree value") 296 + return nil 327 297 } 328 298 } 329 - computed, err := invTree.RootCID() 299 + _, _, err = repoFragment.GetRecordBytes(ctx, nsid, rkey) 330 300 if err != nil { 331 - commitVerifyErrors.WithLabelValues(hostname, "it").Inc() 301 + commitVerifyErrors.WithLabelValues(hostname, "rec").Inc() 332 302 if !val.Sync11ErrorsAreWarnings { 333 - return nil, err 303 + return err 334 304 } else { 335 - logger.Warn("inverted tree could not get root cid", "err", err) 305 + logger.Warn("could not get record bytes", "err", err, "nsid", nsid, "rkey", rkey) 306 + return nil 336 307 } 337 308 } 338 - if *computed != *c { 339 - // this is self-inconsistent malformed data 340 - commitVerifyErrors.WithLabelValues(hostname, "pd").Inc() 309 + } 310 + return nil 311 + } 312 + 313 + func (val *Validator) verifyCommitMessagePrevData(msg *atproto.SyncSubscribeRepos_Commit, prevRoot *AccountPreviousState, host *models.PDS, logger *slog.Logger, repoFragment *atrepo.Repo, hasWarning bool) error { 314 + hostname := host.Host 315 + c := (*cid.Cid)(msg.PrevData) 316 + if prevRoot != nil { 317 + if *c != prevRoot.GetCid() { 318 + commitVerifyWarnings.WithLabelValues(hostname, "pr").Inc() 319 + val.inductionTraceLog.Warn("commit prevData mismatch", "seq", msg.Seq, "pdsHost", host.Host, "repo", msg.Repo) 320 + hasWarning = true 321 + } 322 + } else { 323 + // see counter below for okish "new" 324 + } 325 + 326 + // check internal consistency that claimed previous root matches the rest of this message 327 + ops, err := ParseCommitOps(msg.Ops) 328 + if err != nil { 329 + commitVerifyErrors.WithLabelValues(hostname, "pop").Inc() 330 + if !val.Sync11ErrorsAreWarnings { 331 + return err 332 + } else { 333 + logger.Warn("invalid commit ops", "err", err) 334 + return nil 335 + } 336 + } 337 + ops, err = atrepo.NormalizeOps(ops) 338 + if err != nil { 339 + commitVerifyErrors.WithLabelValues(hostname, "nop").Inc() 340 + if !val.Sync11ErrorsAreWarnings { 341 + return err 342 + } else { 343 + logger.Warn("could not normalize ops", "err", err) 344 + return nil 345 + } 346 + } 347 + 348 + invTree := repoFragment.MST.Copy() 349 + for _, op := range ops { 350 + if err := atrepo.InvertOp(&invTree, &op); err != nil { 351 + commitVerifyErrors.WithLabelValues(hostname, "inv").Inc() 341 352 if !val.Sync11ErrorsAreWarnings { 342 - return nil, fmt.Errorf("inverted tree root didn't match prevData") 353 + return err 343 354 } else { 344 - logger.Warn("inverted tree root didn't match prevData") 355 + logger.Warn("could not invert op", "err", err) 356 + return nil 345 357 } 346 358 } 347 - //logger.Debug("prevData matched", "prevData", c.String(), "computed", computed.String()) 348 - 349 - if prevRoot == nil { 350 - commitVerifyOkish.WithLabelValues(hostname, "new").Inc() 351 - } else if hasWarning { 352 - commitVerifyOkish.WithLabelValues(hostname, "warn").Inc() 359 + } 360 + computed, err := invTree.RootCID() 361 + if err != nil { 362 + commitVerifyErrors.WithLabelValues(hostname, "it").Inc() 363 + if !val.Sync11ErrorsAreWarnings { 364 + return err 353 365 } else { 354 - // TODO: would it be better to make everything "okish"? 355 - // commitVerifyOkish.WithLabelValues(hostname, "ok").Inc() 356 - commitVerifyOk.WithLabelValues(hostname).Inc() 366 + logger.Warn("inverted tree could not get root cid", "err", err) 367 + return nil 368 + } 369 + } 370 + if *computed != *c { 371 + // this is self-inconsistent malformed data 372 + commitVerifyErrors.WithLabelValues(hostname, "pd").Inc() 373 + if !val.Sync11ErrorsAreWarnings { 374 + return fmt.Errorf("inverted tree root didn't match prevData") 375 + } else { 376 + logger.Warn("inverted tree root didn't match prevData") 377 + return nil 357 378 } 358 - } else { 359 - // this source is still on old protocol without new prevData field 360 - commitVerifyOkish.WithLabelValues(hostname, "old").Inc() 361 379 } 380 + //logger.Debug("prevData matched", "prevData", c.String(), "computed", computed.String()) 362 381 363 - return repoFragment, nil 382 + if prevRoot == nil { 383 + commitVerifyOkish.WithLabelValues(hostname, "new").Inc() 384 + } else if hasWarning { 385 + commitVerifyOkish.WithLabelValues(hostname, "warn").Inc() 386 + } else { 387 + // TODO: would it be better to make everything "okish"? 388 + // commitVerifyOkish.WithLabelValues(hostname, "ok").Inc() 389 + commitVerifyOk.WithLabelValues(hostname).Inc() 390 + } 391 + return nil 364 392 } 365 393 366 394 // HandleSync checks signed commit from a #sync message ··· 372 400 did, err := syntax.ParseDID(msg.Did) 373 401 if err != nil { 374 402 syncVerifyErrors.WithLabelValues(hostname, "did").Inc() 375 - if !val.Sync11ErrorsAreWarnings { 376 - return nil, err 377 - } else { 378 - logger.Warn("invalid did", "err", err) 379 - } 403 + return nil, fmt.Errorf("invalid DID, %w", err) 380 404 } 381 405 rev, err := syntax.ParseTID(msg.Rev) 382 406 if err != nil { 383 407 syncVerifyErrors.WithLabelValues(hostname, "tid").Inc() 384 - if !val.Sync11ErrorsAreWarnings { 385 - return nil, err 386 - } else { 387 - logger.Warn("invalid rev", "err", err) 388 - } 408 + return nil, fmt.Errorf("invalid rev TID, %w", err) 389 409 } 390 410 if rev.Time().After(time.Now().Add(val.maxRevFuture)) { 391 411 syncVerifyErrors.WithLabelValues(hostname, "revf").Inc()