this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

rewrite validation code

+219 -476
+1 -2
cmd/rerelay/main.go
··· 257 257 } 258 258 259 259 evtman := eventmgr.NewEventManager(persister) 260 - vldtr := relay.NewValidator(&dir) 261 260 262 261 logger.Info("constructing relay service") 263 - r, err := relay.NewRelay(db, vldtr, evtman, &dir, relayConfig) 262 + r, err := relay.NewRelay(db, evtman, &dir, relayConfig) 264 263 if err != nil { 265 264 return err 266 265 }
+2 -3
cmd/rerelay/relay/account.go
··· 11 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 12 "github.com/bluesky-social/indigo/cmd/rerelay/relay/models" 13 13 14 - "github.com/ipfs/go-cid" 15 14 "gorm.io/gorm" 16 15 ) 17 16 ··· 261 260 return accounts, nil 262 261 } 263 262 264 - func (r *Relay) UpsertAccountRepo(uid uint64, rev syntax.TID, commitCID, commitDataCID cid.Cid) error { 265 - return r.db.Exec("INSERT INTO account_repo (uid, rev, commit_cid, commit_data) VALUES (?, ?, ?, ?) ON CONFLICT (uid) DO UPDATE SET rev = EXCLUDED.rev, commit_cid = EXCLUDED.commit_cid, commit_data = EXCLUDED.commit_data", uid, rev, commitCID.String(), commitDataCID.String()).Error 263 + func (r *Relay) UpsertAccountRepo(uid uint64, rev syntax.TID, commitCID, commitDataCID string) error { 264 + return r.db.Exec("INSERT INTO account_repo (uid, rev, commit_cid, commit_data) VALUES (?, ?, ?, ?) ON CONFLICT (uid) DO UPDATE SET rev = EXCLUDED.rev, commit_cid = EXCLUDED.commit_cid, commit_data = EXCLUDED.commit_data", uid, rev, commitCID, commitDataCID).Error 266 265 } 267 266 268 267 // this function with exact name and args implements the `diskpersist.UidSource` interface
cmd/rerelay/relay/firehose.go cmd/rerelay/relay/broadcast.go
+30 -34
cmd/rerelay/relay/ingest.go
··· 11 11 "github.com/bluesky-social/indigo/cmd/rerelay/relay/models" 12 12 "github.com/bluesky-social/indigo/cmd/rerelay/stream" 13 13 14 - "github.com/ipfs/go-cid" 15 14 "go.opentelemetry.io/otel/attribute" 16 15 "gorm.io/gorm" 17 16 ) ··· 65 64 if err != nil { 66 65 return fmt.Errorf("invalid DID in message: %w", err) 67 66 } 67 + 68 68 // XXX: did = did.Normalize() 69 69 account, err := r.GetAccount(ctx, did) 70 70 if err != nil { ··· 81 81 return err 82 82 } 83 83 } 84 + 84 85 if account == nil { 85 86 return ErrAccountNotFound 86 87 } ··· 134 135 } 135 136 } 136 137 137 - // TODO: very messy fetch code here 138 - var repo *models.AccountRepo 139 - err = r.db.First(repo, account.UID).Error 140 - if err != nil && !errors.Is(err, gorm.ErrRecordNotFound) { 141 - logger.Error("failed to get previous root", "err", err) 142 - repo = nil 138 + ident, err := r.dir.LookupDID(ctx, did) 139 + if err != nil { 140 + // XXX: handle more granularly (eg, true not-founds vs errors); and add tests 141 + logger.Warn("failed to load identity") 143 142 } 144 - var prevRev *syntax.TID 145 - var prevData *cid.Cid 146 - if repo != nil { 147 - c, err := cid.Parse(repo.CommitData) 148 - if err != nil { 149 - return fmt.Errorf("parsing commitDataCID from database: %w", err) 143 + 144 + var prevRepo *models.AccountRepo 145 + err = r.db.First(prevRepo, account.UID).Error 146 + if err != nil { 147 + if !errors.Is(err, gorm.ErrRecordNotFound) { 148 + // TODO: should this be a hard error? 149 + logger.Error("failed to read previous repo state", "err", err) 150 150 } 151 - prevData = &c 152 - t := syntax.TID(repo.Rev) 153 - prevRev = &t 154 - } 155 - evtPrevDataStr := "" 156 - if evt.PrevData != nil { 157 - evtPrevDataStr = ((*cid.Cid)(evt.PrevData)).String() 151 + prevRepo = nil 158 152 } 159 - commitDataCID, err := r.Validator.HandleCommit(ctx, hostname, account, evt, prevRev, prevData) 153 + 154 + // most commit validation happens in this method. Note that is handles lenient/strict modes. 155 + newRepo, err := r.VerifyRepoCommit(ctx, evt, ident, prevRepo, hostname) 160 156 if err != nil { 161 - // XXX: induction trace log 162 - logger.Error("commit bad", "prevData", evtPrevDataStr, "err", err) 163 - logger.Warn("failed handling event", "err", err, "commitCID", evt.Commit.String()) 164 - return fmt.Errorf("handle user event failed: %w", err) 157 + logger.Warn("commit message failed verification", "err", err) 158 + return err 165 159 } 166 160 167 - // TID syntax has been verified by validator 168 - rev := syntax.TID(evt.Rev) 169 - 170 - err = r.UpsertAccountRepo(account.UID, rev, cid.Cid(evt.Commit), *commitDataCID) 161 + err = r.UpsertAccountRepo(account.UID, syntax.TID(newRepo.Rev), newRepo.CommitCID, newRepo.CommitData) 171 162 if err != nil { 172 - return fmt.Errorf("failed to set previous root uid=%d: %w", account.UID, err) 163 + return fmt.Errorf("failed to upsert account repo (%s): %w", account.DID, err) 173 164 } 174 165 175 166 // Broadcast the identity event to all consumers 167 + // TODO: is this copy important? 176 168 commitCopy := *evt 177 169 err = r.Events.AddEvent(ctx, &stream.XRPCStreamEvent{ 178 170 RepoCommit: &commitCopy, ··· 209 201 return fmt.Errorf("could not get user for did %#v: %w", evt.Did, err) 210 202 } 211 203 212 - commitCID, commitDataCID, err := r.Validator.HandleSync(ctx, hostname, evt) 204 + ident, err := r.dir.LookupDID(ctx, did) 205 + if err != nil { 206 + // XXX: handle more granularly (eg, true not-founds vs errors); and add tests 207 + logger.Warn("failed to load identity") 208 + } 209 + 210 + newRepo, err := r.VerifyRepoSync(ctx, evt, ident, hostname) 213 211 if err != nil { 214 212 return err 215 213 } 216 - // TID syntax has been verified by validator 217 - rev := syntax.TID(evt.Rev) 218 214 219 215 // TODO: should this happen before or after firehose persist/broadcast? 220 - err = r.UpsertAccountRepo(account.UID, rev, *commitCID, *commitDataCID) 216 + err = r.UpsertAccountRepo(account.UID, syntax.TID(newRepo.Rev), newRepo.CommitCID, newRepo.CommitData) 221 217 if err != nil { 222 218 return fmt.Errorf("failed to upsert repo state (uid %d): %w", account.UID, err) 223 219 }
+2 -3
cmd/rerelay/relay/relay.go
··· 22 22 Logger *slog.Logger 23 23 Slurper *Slurper 24 24 Events *eventmgr.EventManager 25 - Validator *Validator 26 25 HostChecker HostChecker 27 26 Config RelayConfig 28 27 ··· 47 46 MaxQueuePerHost int64 48 47 ApplyHostClientSettings func(c *xrpc.Client) 49 48 SkipAccountHostCheck bool // XXX: only used for testing 49 + LenientSyncValidation bool // XXX: wire through config 50 50 51 51 // if true, ignore "requestCrawl" 52 52 DisableNewHosts bool ··· 62 62 } 63 63 } 64 64 65 - func NewRelay(db *gorm.DB, vldtr *Validator, evtman *eventmgr.EventManager, dir identity.Directory, config *RelayConfig) (*Relay, error) { 65 + func NewRelay(db *gorm.DB, evtman *eventmgr.EventManager, dir identity.Directory, config *RelayConfig) (*Relay, error) { 66 66 67 67 if config == nil { 68 68 config = DefaultRelayConfig() ··· 77 77 dir: dir, 78 78 Logger: slog.Default().With("system", "relay"), 79 79 Events: evtman, 80 - Validator: vldtr, 81 80 HostChecker: hc, 82 81 Config: *config, 83 82
-432
cmd/rerelay/relay/validator.go
··· 1 - package relay 2 - 3 - import ( 4 - "bytes" 5 - "context" 6 - "fmt" 7 - "log/slog" 8 - "sync" 9 - "sync/atomic" 10 - "time" 11 - 12 - comatproto "github.com/bluesky-social/indigo/api/atproto" 13 - "github.com/bluesky-social/indigo/atproto/identity" 14 - "github.com/bluesky-social/indigo/atproto/repo" 15 - "github.com/bluesky-social/indigo/atproto/syntax" 16 - "github.com/bluesky-social/indigo/cmd/rerelay/relay/models" 17 - 18 - "github.com/ipfs/go-cid" 19 - "go.opentelemetry.io/otel" 20 - ) 21 - 22 - const defaultMaxRevFuture = time.Hour 23 - 24 - func NewValidator(directory identity.Directory) *Validator { 25 - maxRevFuture := defaultMaxRevFuture // TODO: configurable 26 - ErrRevTooFarFuture := fmt.Errorf("new rev is > %s in the future", maxRevFuture) 27 - 28 - return &Validator{ 29 - userLocks: make(map[uint64]*userLock), 30 - log: slog.Default().With("system", "validator"), 31 - directory: directory, 32 - 33 - maxRevFuture: maxRevFuture, 34 - ErrRevTooFarFuture: ErrRevTooFarFuture, 35 - AllowSignatureNotFound: true, // TODO: configurable 36 - } 37 - } 38 - 39 - // Validator contains the context and code necessary to validate #commit and #sync messages 40 - type Validator struct { 41 - lklk sync.Mutex 42 - userLocks map[uint64]*userLock 43 - 44 - log *slog.Logger 45 - 46 - directory identity.Directory 47 - 48 - // maxRevFuture is added to time.Now() for a limit of clock skew we'll accept a `rev` in the future for 49 - maxRevFuture time.Duration 50 - 51 - // ErrRevTooFarFuture is the error we return 52 - // held here because we fmt.Errorf() once with our configured maxRevFuture into the message 53 - ErrRevTooFarFuture error 54 - 55 - // AllowSignatureNotFound enables counting messages without findable public key to pass through with a warning counter 56 - // TODO: refine this for what kind of 'not found' we accept. 57 - AllowSignatureNotFound bool 58 - } 59 - 60 - type NextCommitHandler interface { 61 - HandleCommit(ctx context.Context, hostname string, uid uint64, did string, commit *comatproto.SyncSubscribeRepos_Commit) error 62 - } 63 - 64 - type userLock struct { 65 - lk sync.Mutex 66 - waiters atomic.Int32 67 - } 68 - 69 - // lockUser re-serializes access per-user after events may have been fanned out to many worker threads by events/schedulers/parallel 70 - func (val *Validator) lockUser(ctx context.Context, uid uint64) func() { 71 - ctx, span := otel.Tracer("validator").Start(ctx, "userLock") 72 - defer span.End() 73 - 74 - val.lklk.Lock() 75 - 76 - ulk, ok := val.userLocks[uid] 77 - if !ok { 78 - ulk = &userLock{} 79 - val.userLocks[uid] = ulk 80 - } 81 - 82 - ulk.waiters.Add(1) 83 - 84 - val.lklk.Unlock() 85 - 86 - ulk.lk.Lock() 87 - 88 - return func() { 89 - val.lklk.Lock() 90 - defer val.lklk.Unlock() 91 - 92 - ulk.lk.Unlock() 93 - 94 - nv := ulk.waiters.Add(-1) 95 - 96 - if nv == 0 { 97 - delete(val.userLocks, uid) 98 - } 99 - } 100 - } 101 - 102 - func (val *Validator) HandleCommit(ctx context.Context, hostname string, account *models.Account, commit *comatproto.SyncSubscribeRepos_Commit, prevRev *syntax.TID, prevData *cid.Cid) (commitDataCID *cid.Cid, err error) { 103 - uid := account.UID 104 - unlock := val.lockUser(ctx, uid) 105 - defer unlock() 106 - repoFragment, err := val.VerifyCommitMessage(ctx, hostname, commit, prevRev, prevData) 107 - if err != nil { 108 - return nil, err 109 - } 110 - commitDataCID, err = repoFragment.MST.RootCID() 111 - if err != nil { 112 - return nil, err 113 - } 114 - return commitDataCID, nil 115 - } 116 - 117 - type revOutOfOrderError struct { 118 - dt time.Duration 119 - } 120 - 121 - func (roooe *revOutOfOrderError) Error() string { 122 - return fmt.Sprintf("new rev is before previous rev by %s", roooe.dt.String()) 123 - } 124 - 125 - var ErrNewRevBeforePrevRev = &revOutOfOrderError{} 126 - 127 - func (val *Validator) VerifyCommitMessage(ctx context.Context, hostname string, msg *comatproto.SyncSubscribeRepos_Commit, prevRev *syntax.TID, prevData *cid.Cid) (*repo.Repo, error) { 128 - hasWarning := false 129 - commitVerifyStarts.Inc() 130 - logger := slog.Default().With("did", msg.Repo, "rev", msg.Rev, "seq", msg.Seq, "time", msg.Time) 131 - 132 - did, err := syntax.ParseDID(msg.Repo) 133 - if err != nil { 134 - commitVerifyErrors.WithLabelValues(hostname, "did").Inc() 135 - return nil, err 136 - } 137 - rev, err := syntax.ParseTID(msg.Rev) 138 - if err != nil { 139 - commitVerifyErrors.WithLabelValues(hostname, "tid").Inc() 140 - return nil, err 141 - } 142 - if prevRev != nil { 143 - curTime := rev.Time() 144 - prevTime := prevRev.Time() 145 - if curTime.Before(prevTime) { 146 - commitVerifyErrors.WithLabelValues(hostname, "revb").Inc() 147 - dt := prevTime.Sub(curTime) 148 - return nil, &revOutOfOrderError{dt} 149 - } 150 - } 151 - if rev.Time().After(time.Now().Add(val.maxRevFuture)) { 152 - commitVerifyErrors.WithLabelValues(hostname, "revf").Inc() 153 - return nil, val.ErrRevTooFarFuture 154 - } 155 - _, err = syntax.ParseDatetime(msg.Time) 156 - if err != nil { 157 - commitVerifyErrors.WithLabelValues(hostname, "time").Inc() 158 - return nil, err 159 - } 160 - 161 - if msg.TooBig { 162 - //logger.Warn("event with tooBig flag set") 163 - commitVerifyWarnings.WithLabelValues(hostname, "big").Inc() 164 - // XXX: induction trace log 165 - val.log.Warn("commit tooBig", "seq", msg.Seq, "host", hostname, "repo", msg.Repo) 166 - hasWarning = true 167 - } 168 - if msg.Rebase { 169 - //logger.Warn("event with rebase flag set") 170 - commitVerifyWarnings.WithLabelValues(hostname, "reb").Inc() 171 - // XXX: induction trace log 172 - val.log.Warn("commit rebase", "seq", msg.Seq, "host", hostname, "repo", msg.Repo) 173 - hasWarning = true 174 - } 175 - 176 - commit, repoFragment, err := repo.LoadRepoFromCAR(ctx, bytes.NewReader([]byte(msg.Blocks))) 177 - if err != nil { 178 - commitVerifyErrors.WithLabelValues(hostname, "car").Inc() 179 - return nil, err 180 - } 181 - 182 - if commit.Rev != rev.String() { 183 - commitVerifyErrors.WithLabelValues(hostname, "rev").Inc() 184 - return nil, fmt.Errorf("rev did not match commit") 185 - } 186 - if commit.DID != did.String() { 187 - commitVerifyErrors.WithLabelValues(hostname, "did2").Inc() 188 - return nil, fmt.Errorf("rev did not match commit") 189 - } 190 - 191 - err = val.VerifyCommitSignature(ctx, commit, hostname, &hasWarning) 192 - if err != nil { 193 - // signature errors are metrics counted inside VerifyCommitSignature() 194 - return nil, err 195 - } 196 - 197 - // load out all the records 198 - for _, op := range msg.Ops { 199 - if (op.Action == "create" || op.Action == "update") && op.Cid != nil { 200 - c := (*cid.Cid)(op.Cid) 201 - nsid, rkey, err := syntax.ParseRepoPath(op.Path) 202 - if err != nil { 203 - commitVerifyErrors.WithLabelValues(hostname, "opp").Inc() 204 - return nil, fmt.Errorf("invalid repo path in ops list: %w", err) 205 - } 206 - val, err := repoFragment.GetRecordCID(ctx, nsid, rkey) 207 - if err != nil { 208 - commitVerifyErrors.WithLabelValues(hostname, "rcid").Inc() 209 - return nil, err 210 - } 211 - if *c != *val { 212 - commitVerifyErrors.WithLabelValues(hostname, "opc").Inc() 213 - return nil, fmt.Errorf("record op doesn't match MST tree value") 214 - } 215 - _, _, err = repoFragment.GetRecordBytes(ctx, nsid, rkey) 216 - if err != nil { 217 - commitVerifyErrors.WithLabelValues(hostname, "rec").Inc() 218 - return nil, err 219 - } 220 - } 221 - } 222 - 223 - // TODO: once firehose format is fully shipped, remove this 224 - for _, o := range msg.Ops { 225 - switch o.Action { 226 - case "delete": 227 - if o.Prev == nil { 228 - logger.Debug("can't invert legacy op", "action", o.Action) 229 - // XXX: induction trace log 230 - val.log.Warn("commit delete op", "seq", msg.Seq, "host", hostname, "repo", msg.Repo) 231 - commitVerifyOkish.WithLabelValues(hostname, "del").Inc() 232 - return repoFragment, nil 233 - } 234 - case "update": 235 - if o.Prev == nil { 236 - logger.Debug("can't invert legacy op", "action", o.Action) 237 - // XXX: induction trace log 238 - val.log.Warn("commit update op", "seq", msg.Seq, "host", hostname, "repo", msg.Repo) 239 - commitVerifyOkish.WithLabelValues(hostname, "up").Inc() 240 - return repoFragment, nil 241 - } 242 - } 243 - } 244 - 245 - if msg.PrevData != nil { 246 - c := (*cid.Cid)(msg.PrevData) 247 - if prevData != nil { 248 - if *c != *prevData { 249 - commitVerifyWarnings.WithLabelValues(hostname, "pr").Inc() 250 - // XXX: induction trace log 251 - val.log.Warn("commit prevData mismatch", "seq", msg.Seq, "host", hostname, "repo", msg.Repo) 252 - hasWarning = true 253 - } 254 - } else { 255 - // see counter below for okish "new" 256 - } 257 - 258 - // check internal consistency that claimed previous root matches the rest of this message 259 - ops, err := ParseCommitOps(msg.Ops) 260 - if err != nil { 261 - commitVerifyErrors.WithLabelValues(hostname, "pop").Inc() 262 - return nil, err 263 - } 264 - ops, err = repo.NormalizeOps(ops) 265 - if err != nil { 266 - commitVerifyErrors.WithLabelValues(hostname, "nop").Inc() 267 - return nil, err 268 - } 269 - 270 - invTree := repoFragment.MST.Copy() 271 - for _, op := range ops { 272 - if err := repo.InvertOp(&invTree, &op); err != nil { 273 - commitVerifyErrors.WithLabelValues(hostname, "inv").Inc() 274 - return nil, err 275 - } 276 - } 277 - computed, err := invTree.RootCID() 278 - if err != nil { 279 - commitVerifyErrors.WithLabelValues(hostname, "it").Inc() 280 - return nil, err 281 - } 282 - if *computed != *c { 283 - // this is self-inconsistent malformed data 284 - commitVerifyErrors.WithLabelValues(hostname, "pd").Inc() 285 - return nil, fmt.Errorf("inverted tree root didn't match prevData") 286 - } 287 - //logger.Debug("prevData matched", "prevData", c.String(), "computed", computed.String()) 288 - 289 - if prevData == nil { 290 - commitVerifyOkish.WithLabelValues(hostname, "new").Inc() 291 - } else if hasWarning { 292 - commitVerifyOkish.WithLabelValues(hostname, "warn").Inc() 293 - } else { 294 - // TODO: would it be better to make everything "okish"? 295 - // commitVerifyOkish.WithLabelValues(hostname, "ok").Inc() 296 - commitVerifyOk.WithLabelValues(hostname).Inc() 297 - } 298 - } else { 299 - // this source is still on old protocol without new prevData field 300 - commitVerifyOkish.WithLabelValues(hostname, "old").Inc() 301 - } 302 - 303 - return repoFragment, nil 304 - } 305 - 306 - // HandleSync checks signed commit from a #sync message 307 - func (val *Validator) HandleSync(ctx context.Context, hostname string, msg *comatproto.SyncSubscribeRepos_Sync) (commitCID, commitDataCID *cid.Cid, err error) { 308 - hasWarning := false 309 - 310 - did, err := syntax.ParseDID(msg.Did) 311 - if err != nil { 312 - syncVerifyErrors.WithLabelValues(hostname, "did").Inc() 313 - return nil, nil, err 314 - } 315 - rev, err := syntax.ParseTID(msg.Rev) 316 - if err != nil { 317 - syncVerifyErrors.WithLabelValues(hostname, "tid").Inc() 318 - return nil, nil, err 319 - } 320 - if rev.Time().After(time.Now().Add(val.maxRevFuture)) { 321 - syncVerifyErrors.WithLabelValues(hostname, "revf").Inc() 322 - return nil, nil, val.ErrRevTooFarFuture 323 - } 324 - _, err = syntax.ParseDatetime(msg.Time) 325 - if err != nil { 326 - syncVerifyErrors.WithLabelValues(hostname, "time").Inc() 327 - return nil, nil, err 328 - } 329 - 330 - commit, commitCID, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader([]byte(msg.Blocks))) 331 - if err != nil { 332 - commitVerifyErrors.WithLabelValues(hostname, "car").Inc() 333 - return nil, nil, err 334 - } 335 - 336 - if commit.Rev != rev.String() { 337 - commitVerifyErrors.WithLabelValues(hostname, "rev").Inc() 338 - return nil, nil, fmt.Errorf("rev did not match commit") 339 - } 340 - if commit.DID != did.String() { 341 - commitVerifyErrors.WithLabelValues(hostname, "did2").Inc() 342 - return nil, nil, fmt.Errorf("rev did not match commit") 343 - } 344 - 345 - err = val.VerifyCommitSignature(ctx, commit, hostname, &hasWarning) 346 - if err != nil { 347 - // signature errors are metrics counted inside VerifyCommitSignature() 348 - return nil, nil, err 349 - } 350 - 351 - return commitCID, &commit.Data, nil 352 - } 353 - 354 - // TODO: lift back to indigo/atproto/repo util code? 355 - func ParseCommitOps(ops []*comatproto.SyncSubscribeRepos_RepoOp) ([]repo.Operation, error) { 356 - out := []repo.Operation{} 357 - for _, rop := range ops { 358 - switch rop.Action { 359 - case "create": 360 - if rop.Cid == nil || rop.Prev != nil { 361 - return nil, fmt.Errorf("invalid repoOp: create") 362 - } 363 - op := repo.Operation{ 364 - Path: rop.Path, 365 - Prev: nil, 366 - Value: (*cid.Cid)(rop.Cid), 367 - } 368 - out = append(out, op) 369 - case "delete": 370 - if rop.Cid != nil || rop.Prev == nil { 371 - return nil, fmt.Errorf("invalid repoOp: delete") 372 - } 373 - op := repo.Operation{ 374 - Path: rop.Path, 375 - Prev: (*cid.Cid)(rop.Prev), 376 - Value: nil, 377 - } 378 - out = append(out, op) 379 - case "update": 380 - if rop.Cid == nil || rop.Prev == nil { 381 - return nil, fmt.Errorf("invalid repoOp: update") 382 - } 383 - op := repo.Operation{ 384 - Path: rop.Path, 385 - Prev: (*cid.Cid)(rop.Prev), 386 - Value: (*cid.Cid)(rop.Cid), 387 - } 388 - out = append(out, op) 389 - default: 390 - return nil, fmt.Errorf("invalid repoOp action: %s", rop.Action) 391 - } 392 - } 393 - return out, nil 394 - } 395 - 396 - // VerifyCommitSignature get's repo's registered public key from Identity Directory, verifies Commit 397 - // hostname is just for metrics in case of error 398 - func (val *Validator) VerifyCommitSignature(ctx context.Context, commit *repo.Commit, hostname string, hasWarning *bool) error { 399 - if val.directory == nil { 400 - return nil 401 - } 402 - xdid, err := syntax.ParseDID(commit.DID) 403 - if err != nil { 404 - commitVerifyErrors.WithLabelValues(hostname, "sig1").Inc() 405 - return fmt.Errorf("bad car DID: %w", err) 406 - } 407 - ident, err := val.directory.LookupDID(ctx, xdid) 408 - if err != nil { 409 - if val.AllowSignatureNotFound { 410 - // allow not-found conditions to pass without signature check 411 - commitVerifyWarnings.WithLabelValues(hostname, "nok").Inc() 412 - if hasWarning != nil { 413 - *hasWarning = true 414 - } 415 - return nil 416 - } 417 - commitVerifyErrors.WithLabelValues(hostname, "sig2").Inc() 418 - return fmt.Errorf("DID lookup failed: %w", err) 419 - } 420 - pk, err := ident.PublicKey() 421 - if err != nil { 422 - commitVerifyErrors.WithLabelValues(hostname, "sig3").Inc() 423 - return fmt.Errorf("no atproto pubkey: %w", err) 424 - } 425 - err = commit.VerifySignature(pk) 426 - if err != nil { 427 - // TODO: if the DID document was stale, force re-fetch from source and re-try if pubkey has changed 428 - commitVerifyErrors.WithLabelValues(hostname, "sig4").Inc() 429 - return fmt.Errorf("invalid signature: %w", err) 430 - } 431 - return nil 432 - }
+182
cmd/rerelay/relay/verify.go
··· 1 + package relay 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "errors" 7 + "fmt" 8 + "time" 9 + 10 + comatproto "github.com/bluesky-social/indigo/api/atproto" 11 + "github.com/bluesky-social/indigo/atproto/identity" 12 + "github.com/bluesky-social/indigo/atproto/repo" 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + "github.com/bluesky-social/indigo/cmd/rerelay/relay/models" 15 + ) 16 + 17 + var ( 18 + ErrFutureRev = errors.New("commit revision in the future") 19 + ErrRevSequence = errors.New("commit revision out of order") 20 + ) 21 + 22 + const futureRevTolerance = time.Minute * 5 23 + 24 + // High-level entrypoint for verifying #commit messages. 25 + // 26 + // Always verifies: loading commit and repo; field syntax; commit signature; future rev 27 + // 28 + // Strict verification: use of deprecated fields; MST inversion; all ops present in blocks 29 + // 30 + // Does not check: account/host matching; host-level sequence; account-level rev ordering; DID syntax 31 + // 32 + // `ident` arg may be nil (if resolution failed) 33 + // `prevRepo` arg represents previous state, and is optional/nullable. 34 + // `hostname` arg is piped through just for logging, not for validating account/host match 35 + // returns an AccountRepo with empty UID, containing metadata about *this* commit 36 + func (r *Relay) VerifyRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit, ident *identity.Identity, prevRepo *models.AccountRepo, hostname string) (*models.AccountRepo, error) { 37 + logger := r.Logger.With("host", hostname, "did", evt.Repo, "rev", evt.Rev) 38 + 39 + // even in lenient/legacy mode (eg, tooBig), we need to verify commit 40 + commit, commitCID, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks)) 41 + if err != nil { 42 + return nil, err 43 + } 44 + 45 + if err := r.VerifyCommitObject(ctx, commit, ident, hostname); err != nil { 46 + return nil, err 47 + } 48 + 49 + // consistency between event fields and commit fields 50 + if evt.Repo != commit.DID { 51 + return nil, fmt.Errorf("mismatched inner commit DID field: %s", commit.DID) 52 + } 53 + if evt.Rev != commit.Rev { 54 + return nil, fmt.Errorf("mismatched inner commit rev field: %s", commit.Rev) 55 + } 56 + 57 + err = r.VerifyCommitMessageStrict(ctx, evt, commit, prevRepo, hostname) 58 + if err != nil { 59 + if r.Config.LenientSyncValidation { 60 + logger.Warn("allowing commit message which failed strict validation", "problem", err) 61 + } else { 62 + return nil, err 63 + } 64 + } 65 + 66 + resp := models.AccountRepo{ 67 + Rev: commit.Rev, 68 + CommitCID: commitCID.String(), 69 + CommitData: commit.Data.String(), 70 + } 71 + return &resp, nil 72 + } 73 + 74 + // the parts of basic verification which are common between #commit and #sync messages 75 + func (r *Relay) VerifyCommitObject(ctx context.Context, commit *repo.Commit, ident *identity.Identity, hostname string) error { 76 + logger := r.Logger.With("host", hostname, "did", commit.DID, "rev", commit.Rev) 77 + 78 + // TODO: what parts does this actually do? 79 + if err := commit.VerifyStructure(); err != nil { 80 + return err 81 + } 82 + 83 + // if identity is available, verify the signature 84 + if ident != nil { 85 + // NOTE: may eventually want to cache cryptographic key parsing 86 + pubkey, err := ident.PublicKey() 87 + if err != nil { 88 + return fmt.Errorf("commit verification: %w", err) 89 + } 90 + 91 + if err := commit.VerifySignature(pubkey); err != nil { 92 + return fmt.Errorf("commit verification: %w", err) 93 + } 94 + } else { 95 + logger.Warn("skipping commit signature validation", "reason", "ident unavailable") 96 + } 97 + return nil 98 + } 99 + 100 + func (r *Relay) VerifyCommitMessageStrict(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit, commit *repo.Commit, prevRepo *models.AccountRepo, hostname string) error { 101 + 102 + logger := r.Logger.With("host", hostname, "did", commit.DID, "rev", commit.Rev) 103 + 104 + // first check things which would skip MST inversion entirely 105 + if len(evt.Blocks) == 0 { 106 + return fmt.Errorf("commit messaging missing blocks") 107 + } 108 + if evt.TooBig { 109 + return fmt.Errorf("deprecated tooBig commit flag set") 110 + } 111 + if evt.PrevData == nil { 112 + return fmt.Errorf("missing prevData field") 113 + } 114 + if prevRepo != nil { 115 + if evt.PrevData.String() != prevRepo.CommitData { 116 + logger.Warn("commit with miss-matching prevData", "prevData", evt.PrevData, "prevRepo.CommitData", prevRepo.CommitData) 117 + } 118 + if evt.Since != nil && *evt.Since != prevRepo.Rev { 119 + logger.Warn("commit with miss-matching since", "since", evt.Since, "prevRepo.Rev", prevRepo.Rev) 120 + } 121 + } 122 + 123 + // this parse is redundant with earlier parse; once lenient mode is removed we should do only a single pass 124 + origRepo, _, err := repo.LoadRepoFromCAR(ctx, bytes.NewReader(evt.Blocks)) 125 + if err != nil { 126 + return err 127 + } 128 + 129 + // TODO: break out this function in to smaller chunks 130 + _ = origRepo 131 + if _, err := repo.VerifyCommitMessage(ctx, evt); err != nil { 132 + logger.Warn("failed to invert commit MST", "err", err) 133 + } 134 + 135 + // finally less-important checks 136 + if evt.Rebase { 137 + return fmt.Errorf("deprecated rebase commit flag set") 138 + } 139 + _, err = syntax.ParseDatetime(evt.Time) 140 + if err != nil { 141 + return fmt.Errorf("commit timestamp syntax: %w", err) 142 + } 143 + return nil 144 + } 145 + 146 + // High-level entrypoint for verifying #sync messages. 147 + // 148 + // Always verifies: loading commit and repo; field syntax; commit signature; future rev 149 + // 150 + // Does not check: account/host matching; host-level sequence; account-level rev ordering; DID syntax 151 + // 152 + // `ident` arg may be nil (if resolution failed) 153 + // `hostname` arg is piped through just for logging, not for validating account/host match 154 + // returns an AccountRepo with empty UID, containing metadata about *this* commit 155 + func (r *Relay) VerifyRepoSync(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync, ident *identity.Identity, hostname string) (*models.AccountRepo, error) { 156 + //logger := r.Logger.With("host", hostname, "did", evt.Did, "rev", evt.Rev) 157 + 158 + // even in lenient/legacy mode (eg, tooBig), we need to verify commit 159 + commit, commitCID, err := repo.LoadCommitFromCAR(ctx, bytes.NewReader(evt.Blocks)) 160 + if err != nil { 161 + return nil, err 162 + } 163 + 164 + if err := r.VerifyCommitObject(ctx, commit, ident, hostname); err != nil { 165 + return nil, err 166 + } 167 + 168 + // consistency between event fields and commit fields 169 + if evt.Did != commit.DID { 170 + return nil, fmt.Errorf("mismatched inner commit DID field: %s", commit.DID) 171 + } 172 + if evt.Rev != commit.Rev { 173 + return nil, fmt.Errorf("mismatched inner commit rev field: %s", commit.Rev) 174 + } 175 + 176 + resp := models.AccountRepo{ 177 + Rev: commit.Rev, 178 + CommitCID: commitCID.String(), 179 + CommitData: commit.Data.String(), 180 + } 181 + return &resp, nil 182 + }
+2 -2
cmd/rerelay/testing/runner.go
··· 35 35 relayConfig := relay.DefaultRelayConfig() 36 36 relayConfig.SSL = false 37 37 relayConfig.SkipAccountHostCheck = true 38 + relayConfig.LenientSyncValidation = true 38 39 39 40 db, err := cliutil.SetupDatabase("sqlite://:memory:", 40) 40 41 if err != nil { ··· 46 47 if err != nil { 47 48 panic(err) 48 49 } 49 - vldtr := relay.NewValidator(dir) 50 50 evtman := eventmgr.NewEventManager(persister) 51 51 52 - r, err := relay.NewRelay(db, vldtr, evtman, dir, relayConfig) 52 + r, err := relay.NewRelay(db, evtman, dir, relayConfig) 53 53 if err != nil { 54 54 panic(err) 55 55 }