···32323333 "tangled.org/core/api/tangled"
3434 "tangled.org/core/eventconsumer"
3535+ "tangled.org/core/eventconsumer/cursor"
3536)
36373738// KnotConsumer is the surface area the rest of tack uses to interact
···9091// background, and returns the wrapper. The consumer keeps running until
9192// ctx is cancelled.
9293//
9393-// Cursor persistence is intentionally in-memory for now: we only log
9494-// events, so re-receiving a few seconds of pipeline triggers after a
9595-// restart is harmless. When we start translating triggers into real
9696-// Buildkite builds, this should switch to a SQLite-backed cursor store
9797-// to avoid duplicate builds.
9494+// Cursor persistence is backed by tangled-core's SQLite cursor store
9595+// pointed at the same database file as the rest of tack. Without
9696+// this, a restart would replay every recent pipeline event from each
9797+// knot and fire a duplicate Buildkite build for each one. The
9898+// cursor.SqliteStore opens its own *sql.DB on the file, but
9999+// mattn/go-sqlite3 in WAL mode tolerates concurrent connections, and
100100+// the `cursors` table the upstream package creates doesn't collide
101101+// with any of our migrations.
98102func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) {
99103 logger := loggerFrom(ctx).With("component", "knotconsumer")
100104···103107 return nil, fmt.Errorf("load known knots: %w", err)
104108 }
105109110110+ // Persistent per-source cursor store. Keyed by source.Key() (the
111111+ // knot hostname for KnotSource), so each knot resumes exactly
112112+ // where it left off and we don't re-fire builds for events that
113113+ // were already processed before the previous shutdown.
114114+ cursorStore, err := cursor.NewSQLiteStore(cfg.DBPath)
115115+ if err != nil {
116116+ return nil, fmt.Errorf("open knot cursor store: %w", err)
117117+ }
118118+106119 kc := &knotConsumer{log: logger, provider: provider}
107120108121 ccfg := eventconsumer.NewConsumerConfig()
109122 ccfg.Logger = logger
110123 ccfg.Dev = cfg.Dev
111124 ccfg.ProcessFunc = kc.process
125125+ ccfg.CursorStore = cursorStore
112126 for _, k := range knots {
113127 ccfg.Sources[eventconsumer.NewKnotSource(k)] = struct{}{}
114128 logger.Info("seeding knot source", "knot", k)