Stitch any CI into Tangled
151
fork

Configure Feed

Select the types of activity you want to include in your feed.

jetstream: hold cursor on transient apply failures

Previously, `handleJetstreamEvent` saved the time-based cursor after
every event regardless of whether `applyCommit` succeeded. That is fine
for permanently bad records (malformed JSON, schema violations) where
replaying achieves nothing, but wrong for transient infra failures
(SQLite busy, store closed during shutdown, disk full): the cursor
would advance past a perfectly good event and silently drop the
membership or repo row that backs it, with no way to recover short of
a manual replay.

`applyCommit` now distinguishes the two classes via a new
`badRecordError` wrapper. JSON decode failures in `applySpindleMember`,
`applyRepo`, and `applyRepoCollaborator` are wrapped with
`badRecord(...)` so they remain cursor-advancing. Everything else
returned from `applyCommit` is treated as transient:
`handleJetstreamEvent` logs it, returns the error to the scheduler, and
skips `SaveCursor` so the next reconnect (which already rewinds by
`jetstreamRewind`) will redeliver and retry.

+133 -11
+75 -11
jetstream.go
··· 20 20 import ( 21 21 "context" 22 22 "encoding/json" 23 + "errors" 23 24 "fmt" 24 25 "time" 25 26 ··· 159 160 } 160 161 161 162 // handleJetstreamEvent is the per-event callback for the JetStream. It 162 - // applies the event to the store and advances the persisted cursor. Any 163 - // returned error is logged by the scheduler but does not tear down the 164 - // connection — the next event will retry the cursor write implicitly. 163 + // applies the event to the store and, when appropriate, advances the 164 + // persisted cursor. Any returned error is logged by the scheduler but 165 + // does not tear down the connection. 166 + // 167 + // Cursor-advancement policy: 168 + // 169 + // - Apply succeeded: advance the cursor. 170 + // - Apply failed with a badRecordError (malformed record / unusable 171 + // input): advance anyway, since replaying a permanently-broken 172 + // event on every reconnect would stall the firehose forever. 173 + // - Apply failed with anything else (treated as transient: store 174 + // hiccup, SQLite busy, disk full, shutdown race, ...): do NOT 175 + // advance. Saving the cursor here would permanently skip the 176 + // record, which for membership/repo state means we'd silently 177 + // lose a row that the rest of tack relies on. Leaving the cursor 178 + // in place gives the next reconnect (which rewinds by 179 + // jetstreamRewind) a chance to re-deliver and re-apply it. 165 180 func handleJetstreamEvent(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { 166 181 // We only care about commits, which are the actual record CRUD 167 182 // operations on a user's PDS. Account/identity events are ignored ··· 174 189 // Dispatch on collection. Unknown collections shouldn't happen given 175 190 // our wantedCollections filter, but be defensive — jetstream may 176 191 // send schema changes ahead of us updating the filter. 177 - if err := applyCommit(ctx, st, knots, hostname, evt); err != nil { 192 + applyErr := applyCommit(ctx, st, knots, hostname, evt) 193 + if applyErr != nil { 178 194 logger.Error("apply commit", 179 - "err", err, 195 + "err", applyErr, 180 196 "did", evt.Did, 181 197 "collection", evt.Commit.Collection, 182 198 "op", evt.Commit.Operation, 183 199 "rkey", evt.Commit.RKey, 200 + "transient", !isBadRecord(applyErr), 184 201 ) 185 202 186 - // Fall through to cursor save: a single bad record shouldn't 187 - // stall the cursor forever and force us to re-process every 188 - // subsequent event after a restart. 203 + // Transient failure: bail without advancing the cursor so the 204 + // next delivery can retry this event. Returning the error 205 + // surfaces it in the scheduler's logs too. 206 + if !isBadRecord(applyErr) { 207 + return applyErr 208 + } 209 + // Otherwise (badRecordError) fall through to cursor save: a 210 + // single bad record shouldn't stall the cursor forever and 211 + // force us to re-process every subsequent event after a 212 + // restart. 189 213 } 190 214 191 215 // Advance the cursor. TimeUS is the jetstream-assigned microsecond ··· 199 223 return nil 200 224 } 201 225 226 + // badRecordError marks an applyCommit failure as caused by the *record 227 + // itself* being permanently unusable, e.g. a malformed JSON body, or a 228 + // field that violates an invariant we can't recover from on retry. 229 + // 230 + // We need this distinction because the jetstream cursor is time-based 231 + // and once advanced past an event we will never see it again. So: 232 + // 233 + // - Permanent bad-input failures should still advance the cursor: 234 + // replaying the same broken record on every restart accomplishes 235 + // nothing and would stall progress on every later event behind it. 236 + // - Transient infrastructure failures (SQLite busy, disk full, store 237 + // closed mid-shutdown, etc.) must NOT advance the cursor: the 238 + // record is fine and the next attempt, on reconnect or after the 239 + // transient condition clears, should reapply it. Skipping past 240 + // such a failure can permanently lose membership/repo state. 241 + // 242 + // Anything returned from applyCommit that isn't a badRecordError is 243 + // treated as transient by handleJetstreamEvent. 244 + type badRecordError struct{ err error } 245 + 246 + func (e *badRecordError) Error() string { return e.err.Error() } 247 + func (e *badRecordError) Unwrap() error { return e.err } 248 + 249 + // badRecord wraps err so handleJetstreamEvent recognizes it as a 250 + // permanent, cursor-advancing failure. Use this for any error caused by 251 + // the contents of the record (decode errors, schema violations); never 252 + // for store/IO errors. 253 + func badRecord(err error) error { return &badRecordError{err: err} } 254 + 255 + // isBadRecord reports whether err (or anything it wraps) is a 256 + // badRecordError. 257 + func isBadRecord(err error) bool { 258 + var b *badRecordError 259 + return errors.As(err, &b) 260 + } 261 + 202 262 // applyCommit routes a commit to the right store mutation based on its 203 263 // collection NSID and operation. 204 264 func applyCommit(ctx context.Context, st *store, knots KnotConsumer, hostname string, evt *jsmodels.Event) error { ··· 224 284 case jsOpCreate, jsOpUpdate: 225 285 var rec tangled.SpindleMember 226 286 if err := json.Unmarshal(c.Record, &rec); err != nil { 227 - return fmt.Errorf("decode spindle.member: %w", err) 287 + // Decode failures are a permanent property of the record's 288 + // bytes; mark as bad so the cursor can advance past it. 289 + return badRecord(fmt.Errorf("decode spindle.member: %w", err)) 228 290 } 229 291 return st.UpsertSpindleMember(ctx, did, c.RKey, rec.Instance, rec.Subject, rec.CreatedAt) 230 292 case jsOpDelete: ··· 238 300 case jsOpCreate, jsOpUpdate: 239 301 var rec tangled.Repo 240 302 if err := json.Unmarshal(c.Record, &rec); err != nil { 241 - return fmt.Errorf("decode repo: %w", err) 303 + // See applySpindleMember: decode errors are permanent. 304 + return badRecord(fmt.Errorf("decode repo: %w", err)) 242 305 } 243 306 244 307 // Capture the prior (knot, spindle) before the upsert so the ··· 340 403 case jsOpCreate, jsOpUpdate: 341 404 var rec tangled.RepoCollaborator 342 405 if err := json.Unmarshal(c.Record, &rec); err != nil { 343 - return fmt.Errorf("decode repo.collaborator: %w", err) 406 + // See applySpindleMember: decode errors are permanent. 407 + return badRecord(fmt.Errorf("decode repo.collaborator: %w", err)) 344 408 } 345 409 return st.UpsertRepoCollaborator(ctx, did, c.RKey, 346 410 deref(rec.Repo), deref(rec.RepoDid),
+58
jetstream_test.go
··· 10 10 import ( 11 11 "context" 12 12 "encoding/json" 13 + "path/filepath" 13 14 "testing" 14 15 15 16 jsmodels "github.com/bluesky-social/jetstream/pkg/models" ··· 255 256 t.Fatalf("bad record should not have inserted; got %d rows", n) 256 257 } 257 258 requireCursor(t, s, 1000) 259 + } 260 + 261 + // TestHandleTransientStoreErrorDoesNotAdvanceCursor is the partner to 262 + // TestHandleBadRecordAdvancesCursor: the cursor must hold its previous 263 + // position when applyCommit fails for an *infrastructure* reason 264 + // (here: store closed mid-flight, simulating SQLite busy / shutdown 265 + // races). Saving the cursor through such a failure would permanently 266 + // skip a perfectly good record and silently lose membership state. 267 + func TestHandleTransientStoreErrorDoesNotAdvanceCursor(t *testing.T) { 268 + // Build the store inline (not via newTestStore) so the cleanup 269 + // tolerates the explicit Close we do below to provoke errors. 270 + path := filepath.Join(t.TempDir(), "tack.db") 271 + s, err := openStore(path) 272 + if err != nil { 273 + t.Fatalf("openStore: %v", err) 274 + } 275 + ctx := context.Background() 276 + 277 + // Seed a baseline cursor; after the failed apply it must still be 278 + // 500, not 1000. 279 + if err := s.SaveCursor(ctx, 500); err != nil { 280 + t.Fatalf("seed cursor: %v", err) 281 + } 282 + 283 + // Close the underlying DB. Subsequent store calls will fail with 284 + // "sql: database is closed", which is a transient-style error from 285 + // applyCommit's perspective: the record itself is well-formed. 286 + if err := s.db.Close(); err != nil { 287 + t.Fatalf("close db: %v", err) 288 + } 289 + 290 + rec := tangled.SpindleMember{ 291 + Instance: "https://spindle.example", 292 + Subject: "did:plc:alice", 293 + CreatedAt: "2026-01-01T00:00:00Z", 294 + } 295 + evt := commitEvent(1000, "did:plc:owner", tangled.SpindleMemberNSID, jsOpCreate, "rk", rec) 296 + 297 + // Expect a non-nil error: handler propagates transient failures. 298 + if err := handleJetstreamEvent(ctx, s, nil, "", evt); err == nil { 299 + t.Fatalf("handle should return transient error, got nil") 300 + } 301 + 302 + // Re-open the same DB file to inspect the persisted cursor; the 303 + // closed handle obviously can't answer queries. 304 + s2, err := openStore(path) 305 + if err != nil { 306 + t.Fatalf("re-open: %v", err) 307 + } 308 + defer s2.Close() 309 + got, err := s2.LoadCursor(ctx) 310 + if err != nil { 311 + t.Fatalf("load cursor: %v", err) 312 + } 313 + if got == nil || *got != 500 { 314 + t.Fatalf("cursor = %v, want 500 (must NOT advance on transient error)", got) 315 + } 258 316 } 259 317 260 318 // TestRepoEventSubscribesKnotForOurSpindle confirms that observing a