Stitch any CI into Tangled
82
fork

Configure Feed

Select the types of activity you want to include in your feed.

jetstream: hold cursor on transient apply failures #8

open opened by mitchellh.com targeting main from push-lpurukqqttnr

Previously, handleJetstreamEvent saved the time-based cursor after every event regardless of whether applyCommit succeeded. That is fine for permanently bad records (malformed JSON, schema violations) where replaying achieves nothing, but wrong for transient infra failures (SQLite busy, store closed during shutdown, disk full): the cursor would advance past a perfectly good event and silently drop the membership or repo row that backs it, with no way to recover short of a manual replay.

applyCommit now distinguishes the two classes via a new badRecordError wrapper. JSON decode failures in applySpindleMember, applyRepo, and applyRepoCollaborator are wrapped with badRecord(...) so they remain cursor-advancing. Everything else returned from applyCommit is treated as transient: handleJetstreamEvent logs it, returns the error to the scheduler, and skips SaveCursor so the next reconnect (which already rewinds by jetstreamRewind) will redeliver and retry.

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:onu3oqfahfubgbetlr4giknc/sh.tangled.repo.pull/3mktv7cgxwh22
+133 -11
Diff #0
+75 -11
jetstream.go
··· 20 20 import ( 21 21 "context" 22 22 "encoding/json" 23 + "errors" 23 24 "fmt" 24 25 "time" 25 26 ··· 159 160 } 160 161 161 162 // handleJetstreamEvent is the per-event callback for the JetStream. It 162 - // applies the event to the store and advances the persisted cursor. Any 163 - // returned error is logged by the scheduler but does not tear down the 164 - // connection — the next event will retry the cursor write implicitly. 163 + // applies the event to the store and, when appropriate, advances the 164 + // persisted cursor. Any returned error is logged by the scheduler but 165 + // does not tear down the connection. 166 + // 167 + // Cursor-advancement policy: 168 + // 169 + // - Apply succeeded: advance the cursor. 170 + // - Apply failed with a badRecordError (malformed record / unusable 171 + // input): advance anyway, since replaying a permanently-broken 172 + // event on every reconnect would stall the firehose forever. 173 + // - Apply failed with anything else (treated as transient: store 174 + // hiccup, SQLite busy, disk full, shutdown race, ...): do NOT 175 + // advance. Saving the cursor here would permanently skip the 176 + // record, which for membership/repo state means we'd silently 177 + // lose a row that the rest of tack relies on. Leaving the cursor 178 + // in place gives the next reconnect (which rewinds by 179 + // jetstreamRewind) a chance to re-deliver and re-apply it. 165 180 func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 166 181 // We only care about commits, which are the actual record CRUD 167 182 // operations on a user's PDS. Account/identity events are ignored ··· 174 189 // Dispatch on collection. Unknown collections shouldn't happen given 175 190 // our wantedCollections filter, but be defensive — jetstream may 176 191 // send schema changes ahead of us updating the filter. 177 - if err := applyCommit(ctx, st, knots, hostname, evt); err != nil { 192 + applyErr := applyCommit(ctx, st, knots, hostname, evt) 193 + if applyErr != nil { 178 194 logger.Error("apply commit", 179 - "err", err, 195 + "err", applyErr, 180 196 "did", evt.Did, 181 197 "collection", evt.Commit.Collection, 182 198 "op", evt.Commit.Operation, 183 199 "rkey", evt.Commit.RKey, 200 + "transient", !isBadRecord(applyErr), 184 201 ) 185 202 186 - // Fall through to cursor save: a single bad record shouldn't 187 - // stall the cursor forever and force us to re-process every 188 - // subsequent event after a restart. 203 + // Transient failure: bail without advancing the cursor so the 204 + // next delivery can retry this event. Returning the error 205 + // surfaces it in the scheduler's logs too. 206 + if !isBadRecord(applyErr) { 207 + return applyErr 208 + } 209 + // Otherwise (badRecordError) fall through to cursor save: a 210 + // single bad record shouldn't stall the cursor forever and 211 + // force us to re-process every subsequent event after a 212 + // restart. 189 213 } 190 214 191 215 // Advance the cursor. TimeUS is the jetstream-assigned microsecond ··· 199 223 return nil 200 224 } 201 225 226 + // badRecordError marks an applyCommit failure as caused by the *record 227 + // itself* being permanently unusable, e.g. a malformed JSON body, or a 228 + // field that violates an invariant we can't recover from on retry. 229 + // 230 + // We need this distinction because the jetstream cursor is time-based 231 + // and once advanced past an event we will never see it again. So: 232 + // 233 + // - Permanent bad-input failures should still advance the cursor: 234 + // replaying the same broken record on every restart accomplishes 235 + // nothing and would stall progress on every later event behind it. 236 + // - Transient infrastructure failures (SQLite busy, disk full, store 237 + // closed mid-shutdown, etc.) must NOT advance the cursor: the 238 + // record is fine and the next attempt, on reconnect or after the 239 + // transient condition clears, should reapply it. Skipping past 240 + // such a failure can permanently lose membership/repo state. 241 + // 242 + // Anything returned from applyCommit that isn't a badRecordError is 243 + // treated as transient by handleJetstreamEvent. 244 + type badRecordError struct{ err error } 245 + 246 + func (e *badRecordError) Error() string { return e.err.Error() } 247 + func (e *badRecordError) Unwrap() error { return e.err } 248 + 249 + // badRecord wraps err so handleJetstreamEvent recognizes it as a 250 + // permanent, cursor-advancing failure. Use this for any error caused by 251 + // the contents of the record (decode errors, schema violations); never 252 + // for store/IO errors. 253 + func badRecord(err error) error { return &badRecordError{err: err} } 254 + 255 + // isBadRecord reports whether err (or anything it wraps) is a 256 + // badRecordError. 257 + func isBadRecord(err error) bool { 258 + var b *badRecordError 259 + return errors.As(err, &b) 260 + } 261 + 202 262 // applyCommit routes a commit to the right store mutation based on its 203 263 // collection NSID and operation. 204 264 func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { ··· 224 284 case jsOpCreate, jsOpUpdate: 225 285 var rec tangled.SpindleMember 226 286 if err := json.Unmarshal(c.Record, &rec); err != nil { 227 - return fmt.Errorf("decode spindle.member: %w", err) 287 + // Decode failures are a permanent property of the record's 288 + // bytes; mark as bad so the cursor can advance past it. 289 + return badRecord(fmt.Errorf("decode spindle.member: %w", err)) 228 290 } 229 291 return st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt) 230 292 case jsOpDelete: ··· 238 300 case jsOpCreate, jsOpUpdate: 239 301 var rec tangled.Repo 240 302 if err := json.Unmarshal(c.Record, &rec); err != nil { 241 - return fmt.Errorf("decode repo: %w", err) 303 + // See applySpindleMember: decode errors are permanent. 304 + return badRecord(fmt.Errorf("decode repo: %w", err)) 242 305 } 243 306 244 307 // Capture the prior (knot, spindle) before the upsert so the ··· 340 403 case jsOpCreate, jsOpUpdate: 341 404 var rec tangled.RepoCollaborator 342 405 if err := json.Unmarshal(c.Record, &rec); err != nil { 343 - return fmt.Errorf("decode repo.collaborator: %w", err) 406 + // See applySpindleMember: decode errors are permanent. 407 + return badRecord(fmt.Errorf("decode repo.collaborator: %w", err)) 344 408 } 345 409 return st.UpsertRepoCollaborator(ctx, did, c.RKey, 346 410 deref(rec.Repo), deref(rec.RepoDid),
+58
jetstream_test.go
··· 10 10 import ( 11 11 "context" 12 12 "encoding/json" 13 + "path/filepath" 13 14 "testing" 14 15 15 16 jsmodels "github.com/bluesky-social/jetstream/pkg/models" ··· 257 258 requireCursor(t, s, 1000) 258 259 } 259 260 261 + // TestHandleTransientStoreErrorDoesNotAdvanceCursor is the partner to 262 + // TestHandleBadRecordAdvancesCursor: the cursor must hold its previous 263 + // position when applyCommit fails for an *infrastructure* reason 264 + // (here: store closed mid-flight, simulating SQLite busy / shutdown 265 + // races). Saving the cursor through such a failure would permanently 266 + // skip a perfectly good record and silently lose membership state. 267 + func TestHandleTransientStoreErrorDoesNotAdvanceCursor(t *testing.T) { 268 + // Build the store inline (not via newTestStore) so the cleanup 269 + // tolerates the explicit Close we do below to provoke errors. 270 + path := filepath.Join(t.TempDir(), "tack.db") 271 + s, err := openStore(path) 272 + if err != nil { 273 + t.Fatalf("openStore: %v", err) 274 + } 275 + ctx := context.Background() 276 + 277 + // Seed a baseline cursor; after the failed apply it must still be 278 + // 500, not 1000. 279 + if err := s.SaveCursor(ctx, 500); err != nil { 280 + t.Fatalf("seed cursor: %v", err) 281 + } 282 + 283 + // Close the underlying DB. Subsequent store calls will fail with 284 + // "sql: database is closed", which is a transient-style error from 285 + // applyCommit's perspective: the record itself is well-formed. 286 + if err := s.db.Close(); err != nil { 287 + t.Fatalf("close db: %v", err) 288 + } 289 + 290 + rec := tangled.SpindleMember{ 291 + Instance: "https://spindle.example", 292 + Subject: "did:plc:alice", 293 + CreatedAt: "2026-01-01T00:00:00Z", 294 + } 295 + evt := commitEvent(1000, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk", rec) 296 + 297 + // Expect a non-nil error: handler propagates transient failures. 298 + if err := handleJetstreamEvent(ctx, s, nil, "", evt); err == nil { 299 + t.Fatalf("handle should return transient error, got nil") 300 + } 301 + 302 + // Re-open the same DB file to inspect the persisted cursor; the 303 + // closed handle obviously can't answer queries. 304 + s2, err := openStore(path) 305 + if err != nil { 306 + t.Fatalf("re-open: %v", err) 307 + } 308 + defer s2.Close() 309 + got, err := s2.LoadCursor(ctx) 310 + if err != nil { 311 + t.Fatalf("load cursor: %v", err) 312 + } 313 + if got == nil || *got != 500 { 314 + t.Fatalf("cursor = %v, want 500 (must NOT advance on transient error)", got) 315 + } 316 + } 317 + 260 318 // TestRepoEventSubscribesKnotForOurSpindle confirms that observing a 261 319 // sh.tangled.repo whose .spindle field equals our hostname results in a 262 320 // dynamic AddKnot call. This is the hot path for picking up new repos

History

1 round 0 comments
sign up or login to add to the discussion
mitchellh.com submitted #0
1 commit
expand
jetstream: hold cursor on transient apply failures
merge conflicts detected
expand
  • jetstream.go:20
  • jetstream_test.go:10
expand 0 comments