Stitch any CI into Tangled
77
fork

Configure Feed

Select the types of activity you want to include in your feed.

jetstream: rewind cursor a few seconds on reconnect #5

open opened by mitchellh.com targeting main from push-wmonqmkmtqwq

Jetstream cursors are time-based and the upstream docs explicitly note that exact-boundary replay across a disconnect is not guaranteed gapless. Resuming from the precise saved TimeUS could therefore drop events that straddle the reconnect window.

On every (re)connect, subtract a fixed jetstreamRewind (5s) from the loaded cursor before handing it to ConnectAndRead, clamping at zero so a tiny saved cursor can't go negative. The replayed events are safe to re-apply: applyCommit dispatches only to UPSERTs and DELETEs keyed on (did, rkey), so duplicates collapse into the same row state.

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:onu3oqfahfubgbetlr4giknc/sh.tangled.repo.pull/3mktuh5eozt22
+26 -1
Diff #0
+26 -1
jetstream.go
··· 38 38 jsOpDelete = "delete" 39 39 ) 40 40 41 + // jetstreamRewind is how far before the persisted cursor we resume on 42 + // reconnect. Jetstream's docs recommend rewinding a few seconds because 43 + // the cursor is a time-based filter and exact-boundary replay is not 44 + // guaranteed gapless across disconnects. The handler's mutations are 45 + // idempotent (UPSERTs and DELETEs keyed on (did, rkey)) so the small 46 + // amount of duplicate replay this introduces is harmless. 47 + const jetstreamRewind = 5 * time.Second 48 + 41 49 // startJetstream dials the configured jetstream endpoint and spawns a 42 50 // background goroutine that consumes events for the lifetime of ctx. It 43 51 // returns once the client is constructed; connection errors surface in ··· 106 114 logger.Warn("ignoring unreadable cursor; resuming from now", "err", err) 107 115 cur = nil 108 116 } 117 + // Rewind a few seconds before the saved cursor on reconnect. 118 + // Jetstream cursors are time-based and the docs explicitly note 119 + // that exact-boundary replay is not guaranteed gapless, so a 120 + // small negative buffer protects against missing events that 121 + // straddle the disconnect. Duplicates the rewind produces are 122 + // safe: every applyCommit path is an idempotent upsert/delete 123 + // keyed on (did, rkey), and SaveCursor only moves forward in 124 + // practice because TimeUS is monotonic. 109 125 if cur != nil { 110 - logger.Info("connecting to jetstream", "cursor_us", *cur) 126 + rewound := *cur - int64(jetstreamRewind/time.Microsecond) 127 + if rewound < 0 { 128 + rewound = 0 129 + } 130 + logger.Info("connecting to jetstream", 131 + "cursor_us", *cur, 132 + "rewound_us", rewound, 133 + "rewind", jetstreamRewind, 134 + ) 135 + cur = &rewound 111 136 } else { 112 137 logger.Info("connecting to jetstream from now (no cursor)") 113 138 }

History

1 round 0 comments
sign up or login to add to the discussion
mitchellh.com submitted #0
1 commit
expand
jetstream: rewind cursor a few seconds on reconnect
merge conflicts detected
expand
  • jetstream.go:38
expand 0 comments