···2929 "encoding/json"
3030 "fmt"
3131 "log/slog"
3232+ "time"
32333334 "tangled.org/core/api/tangled"
3435 "tangled.org/core/eventconsumer"
···7576 c *eventconsumer.Consumer
7677 log *slog.Logger
77787979+ // cursors is the persistent cursor store the underlying consumer
8080+ // reads at every (re)connect. We retain a reference here so we
8181+ // can pre-seed an entry to "now" the first time we ever see a
8282+ // knot — see seedCursorIfMissing for why.
8383+ cursors cursor.Store
8484+7885 // provider dispatches each incoming pipeline trigger to whatever
7986 // backend actually runs it (today: the fake provider; tomorrow:
8087 // Buildkite). The consumer doesn't care which — it just hands
···116123 return nil, fmt.Errorf("open knot cursor store: %w", err)
117124 }
118125119119- kc := &knotConsumer{log: logger, provider: provider}
126126+ kc := &knotConsumer{log: logger, cursors: cursorStore, provider: provider}
120127121128 ccfg := eventconsumer.NewConsumerConfig()
122129 ccfg.Logger = logger
···124131 ccfg.ProcessFunc = kc.process
125132 ccfg.CursorStore = cursorStore
126133 for _, k := range knots {
134134+ // Pin a brand-new knot's cursor to "now" before the consumer
135135+ // ever connects, so a fresh tack install doesn't replay every
136136+ // historical pipeline event the knot has retained and fire a
137137+ // duplicate Buildkite build for each one.
138138+ kc.seedCursorIfMissing(k)
127139 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{}
128140 logger.Info("seeding knot source", "knot", k)
129141 }
···145157 if knot == "" {
146158 return
147159 }
160160+ // Same first-run protection as in startKnotConsumer: a knot we
161161+ // have never observed before must not retroactively fire
162162+ // pipelines for triggers older than the moment we learned about
163163+ // it. AddSource reads the cursor synchronously when it dials.
164164+ k.seedCursorIfMissing(knot)
148165 k.log.Info("adding knot source", "knot", knot)
149166 k.c.AddSource(ctx, eventconsumer.NewKnotSource(knot))
167167+}
168168+169169+// seedCursorIfMissing writes the current time (as nanoseconds since
170170+// the unix epoch, the same unit eventconsumer.worker uses when it
171171+// advances cursors after each message) to the cursor store for knot
172172+// iff no cursor is already persisted for it.
173173+//
174174+// Without this, a brand-new install — or the first time we ever
175175+// dial a previously-unseen knot — connects to /events with no
176176+// cursor query parameter, which the knot servers interpret as
177177+// "stream from the beginning of time." For pipeline records that
178178+// would fire one Buildkite build per historical trigger the knot
179179+// still has retained. Pinning the cursor up-front limits us to
180180+// events that arrive *after* tack learned about the knot.
181181+//
182182+// Get returning 0 means "no cursor stored" across all the cursor
183183+// store implementations we use (memory/sqlite/redis), so it's a
184184+// safe sentinel to gate the write.
185185+func (k *knotConsumer) seedCursorIfMissing(knot string) {
186186+ if k.cursors.Get(knot) != 0 {
187187+ return
188188+ }
189189+ now := time.Now().UnixNano()
190190+ k.cursors.Set(knot, now)
191191+ k.log.Info("seeded fresh knot cursor to now",
192192+ "knot", knot,
193193+ "cursor", now,
194194+ )
150195}
151196152197// RemoveKnot is currently a no-op — see the interface comment for the