···2929 "encoding/json"
3030 "fmt"
3131 "log/slog"
3232- "time"
33323433 "tangled.org/core/api/tangled"
3534 "tangled.org/core/eventconsumer"
···7473type knotConsumer struct {
7574 c *eventconsumer.Consumer
7675 log *slog.Logger
7777- // br is how we publish synthesized sh.tangled.pipeline.status
7878- // records back out to /events subscribers. Today it's driven by
7979- // the fake-job stand-in in process(); once we hook up Buildkite,
8080- // the webhook handler will be the primary publisher.
8181- br *broker
7676+7777+ // provider dispatches each incoming pipeline trigger to whatever
7878+ // backend actually runs it (today: the fake provider; tomorrow:
7979+ // Buildkite). The consumer doesn't care which — it just hands
8080+ // over the decoded record and lets the provider publish status
8181+ // records back through its own broker connection.
8282+ provider Provider
8283}
83848485// Compile-time interface conformance check.
···9495// restart is harmless. When we start translating triggers into real
9596// Buildkite builds, this should switch to a SQLite-backed cursor store
9697// to avoid duplicate builds.
9797-func startKnotConsumer(ctx context.Context, cfg config, st *store, br *broker) (*knotConsumer, error) {
9898+func startKnotConsumer(ctx context.Context, cfg config, st *store, provider Provider) (*knotConsumer, error) {
9899 logger := loggerFrom(ctx).With("component", "knotconsumer")
99100100101 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname)
···102103 return nil, fmt.Errorf("load known knots: %w", err)
103104 }
104105105105- kc := &knotConsumer{log: logger, br: br}
106106+ kc := &knotConsumer{log: logger, provider: provider}
106107107108 ccfg := eventconsumer.NewConsumerConfig()
108109 ccfg.Logger = logger
···200201 "workflows", len(p.Workflows),
201202 )
202203203203- // Stand-in for the real Buildkite integration. Spawn one fake
204204- // job per workflow so the /events fan-out has something to
205205- // emit and the appview can show progress end to end. We hand
206206- // each goroutine the worker ctx (app-scoped) so they survive
207207- // process() returning but exit cleanly on shutdown.
208208- k.spawnFakeJobs(ctx, src.Key(), msg.Rkey, p.Workflows)
204204+ // Hand the trigger to whichever Provider was configured.
205205+ // Spawn is non-blocking — it fans out into provider-owned
206206+ // goroutines so this worker can move on to the next event.
207207+ // The provider keeps ctx around for shutdown coordination.
208208+ k.provider.Spawn(ctx, src.Key(), msg.Rkey, p.Workflows)
209209210210 default:
211211 // Knots may publish other record types over the same stream; we
···221221222222 return nil
223223}
224224-225225-// fakeJob constants. Pulled out so it's obvious where the timing
226226-// numbers come from, and trivially adjustable when we want to dial the
227227-// fake up or down.
228228-const (
229229- // fakeJobDuration is the wall-clock length of a fake run. Total
230230- // publishes per workflow = (fakeJobDuration / fakeJobInterval) + 1
231231- // (one final "success").
232232- fakeJobDuration = 30 * time.Second
233233- // fakeJobInterval is how often we emit a "running" heartbeat.
234234- fakeJobInterval = 5 * time.Second
235235-)
236236-237237-// spawnFakeJobs starts a goroutine per workflow. They each emit a
238238-// stream of sh.tangled.pipeline.status records via the broker until
239239-// either the fake duration elapses (success) or ctx is cancelled.
240240-//
241241-// This is a deliberate stand-in: it lets us validate the entire
242242-// jetstream → knot → broker → /events → appview pipeline before the
243243-// real Buildkite plumbing is in place.
244244-func (k *knotConsumer) spawnFakeJobs(ctx context.Context, knot, pipelineRkey string, workflows []*tangled.Pipeline_Workflow) {
245245- if len(workflows) == 0 {
246246- // Nothing to fake — without a workflow name there's no valid
247247- // pipeline.status record to publish.
248248- k.log.Warn("pipeline has no workflows; skipping fake run",
249249- "knot", knot, "rkey", pipelineRkey,
250250- )
251251- return
252252- }
253253- for _, wf := range workflows {
254254- if wf == nil || wf.Name == "" {
255255- continue
256256- }
257257- go k.runFakeJob(ctx, knot, pipelineRkey, wf.Name)
258258- }
259259-}
260260-261261-// runFakeJob emits a "running" status every fakeJobInterval for
262262-// fakeJobDuration, then a final "success". It returns early if ctx is
263263-// cancelled (shutdown) — without doing a final publish, since we'd be
264264-// writing to a broker whose store may be closing.
265265-func (k *knotConsumer) runFakeJob(ctx context.Context, knot, pipelineRkey, workflow string) {
266266- // pipelineURI is what the appview parses out of the status record
267267- // to associate it with the originating pipeline. Format mirrors
268268- // what the upstream spindle emits: at://did:web:<knot>/<nsid>/<rkey>
269269- // — the appview strips the did:web: prefix and uses the hostname
270270- // as the knot identifier.
271271- pipelineURI := fmt.Sprintf("at://did:web:%s/%s/%s",
272272- knot, tangled.PipelineNSID, pipelineRkey,
273273- )
274274-275275- logger := k.log.With(
276276- "knot", knot,
277277- "pipeline_rkey", pipelineRkey,
278278- "workflow", workflow,
279279- )
280280-281281- // Heartbeat phase. seq doubles as a per-workflow disambiguator in
282282- // the synthesized status rkey so multiple fakes don't collide.
283283- deadline := time.Now().Add(fakeJobDuration)
284284- seq := 0
285285- for time.Now().Before(deadline) {
286286- if err := k.publishStatus(ctx, pipelineURI, workflow, "running", seq); err != nil {
287287- logger.Error("publish fake running status", "err", err, "seq", seq)
288288- return
289289- }
290290- seq++
291291- select {
292292- case <-ctx.Done():
293293- logger.Debug("fake job cancelled mid-run", "seq", seq)
294294- return
295295- case <-time.After(fakeJobInterval):
296296- }
297297- }
298298-299299- // Terminal status. Marked as "success" using the upstream
300300- // StatusKind enum's success label (see tangled.org/core/spindle/models).
301301- if err := k.publishStatus(ctx, pipelineURI, workflow, "success", seq); err != nil {
302302- logger.Error("publish fake success status", "err", err, "seq", seq)
303303- return
304304- }
305305- logger.Info("fake job complete")
306306-}
307307-308308-// publishStatus assembles a tangled.PipelineStatus, marshals it, and
309309-// hands it to the broker for persistence + fan-out. The rkey we mint
310310-// is purely synthetic — it just needs to be unique across our event
311311-// log; the appview keys its rows on (spindle, rkey).
312312-func (k *knotConsumer) publishStatus(ctx context.Context, pipelineURI, workflow, status string, seq int) error {
313313- rec := tangled.PipelineStatus{
314314- LexiconTypeID: tangled.PipelineStatusNSID,
315315- Pipeline: pipelineURI,
316316- Workflow: workflow,
317317- Status: status,
318318- CreatedAt: time.Now().UTC().Format(time.RFC3339),
319319- }
320320- body, err := json.Marshal(rec)
321321- if err != nil {
322322- return fmt.Errorf("marshal pipeline.status: %w", err)
323323- }
324324- rkey := fmt.Sprintf("fake-%d-%s-%d", time.Now().UnixNano(), workflow, seq)
325325- if _, err := k.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
326326- return fmt.Errorf("publish pipeline.status: %w", err)
327327- }
328328- return nil
329329-}
+11-3
main.go
···114114 // them to publish synthetic status events at startup.
115115 br := newBroker(st)
116116117117+ // Provider that turns Tangled pipeline triggers into
118118+ // pipeline.status events. The fake provider stands in for a real
119119+ // CI integration: it emits synthetic running/success heartbeats
120120+ // over the broker so the entire jetstream → knot → /events flow
121121+ // is exercisable end-to-end. Swap this for a Buildkite-backed
122122+ // implementation once that lands.
123123+ provider := newFakeProvider(br, logger)
124124+117125 // Start the knot event-stream consumer first so the jetstream
118126 // loop has somewhere to register newly-observed knots into. It
119119- // gets the broker so its (currently fake) pipeline runner can
120120- // publish sh.tangled.pipeline.status events back out via /events.
121121- knots, err := startKnotConsumer(ctx, cfg, st, br)
127127+ // gets the provider so each incoming pipeline trigger has
128128+ // something to dispatch to.
129129+ knots, err := startKnotConsumer(ctx, cfg, st, provider)
122130 if err != nil {
123131 logger.Error("failed to start knot consumer", "err", err)
124132 os.Exit(1)
+46
provider.go
···11+package main
// This file defines Provider — the abstraction over "the thing that
// turns a Tangled pipeline trigger into pipeline.status events". It
// exists so the rest of tack can stay agnostic to whether a given
// trigger is dispatched to Buildkite, run by a stub for testing, or
// anything else we plug in later. (See the type's doc comment below
// for the concrete contract.)
77+88+import (
99+ "context"
1010+1111+ "tangled.org/core/api/tangled"
1212+)
1313+1414+// Provider dispatches a Tangled pipeline trigger to whatever backend
1515+// actually runs the workflows.
1616+//
1717+// Implementations are responsible for publishing
1818+// sh.tangled.pipeline.status records back through whatever channel
1919+// they were constructed with.
2020+type Provider interface {
2121+ // Spawn kicks off a pipeline run for every workflow in workflows.
2222+ //
2323+ // It MUST be non-blocking: the caller is the eventconsumer worker
2424+ // that's shared across all knot subscriptions, so per-pipeline
2525+ // work has to live on its own goroutine. A typical implementation
2626+ // fans out into a goroutine per workflow and returns immediately.
2727+ //
2828+ // ctx is the consumer's app-scoped context (lives until shutdown,
2929+ // not just for the duration of one event). Implementations are
3030+ // expected to honour cancellation: in-flight runs should wind
3131+ // down without issuing further publishes once ctx is done.
3232+ //
3333+ // knot is the knot hostname the trigger arrived on; it's the
3434+ // authority half of the pipeline ATURI that pipeline.status
3535+ // records reference. pipelineRkey is the trigger record's rkey
3636+ // on that knot. workflows is the unmodified slice from the
3737+ // decoded sh.tangled.pipeline record; implementations should
3838+ // tolerate nil entries and zero-length names defensively, since
3939+ // the lexicon doesn't enforce either.
4040+ Spawn(
4141+ ctx context.Context,
4242+ knot string,
4343+ pipelineRkey string,
4444+ workflows []*tangled.Pipeline_Workflow,
4545+ )
4646+}
+157
provider_fake.go
···11+package main
22+33+// fakeProvider is a stand-in Provider implementation: it doesn't talk
44+// to any external CI. For each workflow in a triggered pipeline it
55+// spawns a goroutine that emits a fixed-cadence stream of
66+// sh.tangled.pipeline.status records — "running" every five seconds
77+// for thirty seconds, then a final "success" — through the broker.
88+//
99+// The point is to exercise the entire trigger → broker → /events →
1010+// appview path end-to-end before any real CI integration exists. Once
1111+// the Buildkite provider lands, this one stays around as a reference
1212+// implementation and as the test double of choice when a test wants
1313+// "something that publishes plausible status updates" without the
1414+// timing weight of real builds.
1515+1616+import (
1717+ "context"
1818+ "encoding/json"
1919+ "fmt"
2020+ "log/slog"
2121+ "time"
2222+2323+ "tangled.org/core/api/tangled"
2424+)
// Timing knobs for the fake provider, hoisted to constants so the
// numbers are easy to find and tune. Total publishes per workflow =
// (fakeJobDuration / fakeJobInterval) "running" heartbeats plus one
// final "success".
const (
	// fakeJobDuration is the wall-clock length of a single fake run.
	fakeJobDuration = 30 * time.Second
	// fakeJobInterval is the gap between "running" heartbeats.
	fakeJobInterval = 5 * time.Second
)
3434+3535+// fakeProvider implements Provider against the in-process broker.
3636+type fakeProvider struct {
3737+ br *broker
3838+ log *slog.Logger
3939+}
4040+4141+// Compile-time interface check — keeps the fake honest if Provider
4242+// ever gains additional methods.
4343+var _ Provider = (*fakeProvider)(nil)
4444+4545+// newFakeProvider constructs a fakeProvider bound to br. The provided
4646+// logger is annotated with component=provider so its output stands
4747+// apart from the knot-consumer / jetstream noise.
4848+func newFakeProvider(br *broker, log *slog.Logger) *fakeProvider {
4949+ return &fakeProvider{
5050+ br: br,
5151+ log: log.With("component", "provider", "kind", "fake"),
5252+ }
5353+}
5454+5555+// Spawn satisfies Provider. It kicks off one runWorkflow goroutine per
5656+// workflow, returning immediately so the eventconsumer worker that
5757+// invoked us isn't blocked. Goroutines inherit ctx (app-scoped) and
5858+// will exit early on cancellation.
5959+func (p *fakeProvider) Spawn(
6060+ ctx context.Context,
6161+ knot string,
6262+ pipelineRkey string,
6363+ workflows []*tangled.Pipeline_Workflow,
6464+) {
6565+ if len(workflows) == 0 {
6666+ // Without a workflow name there's no valid pipeline.status
6767+ // record to publish. Log loudly enough that an operator
6868+ // staring at the logs can tell the trigger arrived but
6969+ // produced no fake activity.
7070+ p.log.Warn("pipeline has no workflows; skipping fake run",
7171+ "knot", knot, "rkey", pipelineRkey,
7272+ )
7373+ return
7474+ }
7575+ for _, wf := range workflows {
7676+ // Defensive: the lexicon allows pointer entries and doesn't
7777+ // enforce non-empty names. We can't publish a status for an
7878+ // unnamed workflow, so just skip it.
7979+ if wf == nil || wf.Name == "" {
8080+ continue
8181+ }
8282+ go p.runWorkflow(ctx, knot, pipelineRkey, wf.Name)
8383+ }
8484+}
8585+8686+// runWorkflow emits a "running" status every fakeJobInterval until
8787+// fakeJobDuration elapses, then a final "success". On ctx
8888+// cancellation it returns without issuing the terminal publish — the
8989+// broker's underlying store may already be closing during shutdown.
9090+func (p *fakeProvider) runWorkflow(ctx context.Context, knot, pipelineRkey, workflow string) {
9191+ // pipelineURI is what the appview parses out of the status record
9292+ // to associate it back with the originating pipeline. Format
9393+ // mirrors the upstream spindle's emission:
9494+ // at://did:web:<knot>/<nsid>/<rkey>. The appview strips the
9595+ // did:web: prefix and treats the remainder as the knot identifier.
9696+ pipelineURI := fmt.Sprintf("at://did:web:%s/%s/%s",
9797+ knot, tangled.PipelineNSID, pipelineRkey,
9898+ )
9999+100100+ logger := p.log.With(
101101+ "knot", knot,
102102+ "pipeline_rkey", pipelineRkey,
103103+ "workflow", workflow,
104104+ )
105105+106106+ // Heartbeat phase. seq doubles as a per-workflow disambiguator
107107+ // in the synthesized status rkey so concurrent fakes (across
108108+ // workflows or pipelines) don't collide.
109109+ deadline := time.Now().Add(fakeJobDuration)
110110+ seq := 0
111111+ for time.Now().Before(deadline) {
112112+ if err := p.publishStatus(ctx, pipelineURI, workflow, "running", seq); err != nil {
113113+ logger.Error("publish fake running status", "err", err, "seq", seq)
114114+ return
115115+ }
116116+ seq++
117117+ select {
118118+ case <-ctx.Done():
119119+ logger.Debug("fake job cancelled mid-run", "seq", seq)
120120+ return
121121+ case <-time.After(fakeJobInterval):
122122+ }
123123+ }
124124+125125+ // Terminal publish. "success" matches the upstream StatusKind
126126+ // enum (see tangled.org/core/spindle/models) — the appview
127127+ // routes status strings through that same enum.
128128+ if err := p.publishStatus(ctx, pipelineURI, workflow, "success", seq); err != nil {
129129+ logger.Error("publish fake success status", "err", err, "seq", seq)
130130+ return
131131+ }
132132+ logger.Info("fake job complete")
133133+}
134134+135135+// publishStatus assembles a tangled.PipelineStatus record, marshals
136136+// it, and pushes it through the broker. The synthesized rkey just
137137+// needs to be unique within our event log; the appview keys its rows
138138+// on (spindle, rkey) so we mix in time + workflow + sequence to avoid
139139+// collisions across concurrent workflows on the same pipeline.
140140+func (p *fakeProvider) publishStatus(ctx context.Context, pipelineURI, workflow, status string, seq int) error {
141141+ rec := tangled.PipelineStatus{
142142+ LexiconTypeID: tangled.PipelineStatusNSID,
143143+ Pipeline: pipelineURI,
144144+ Workflow: workflow,
145145+ Status: status,
146146+ CreatedAt: time.Now().UTC().Format(time.RFC3339),
147147+ }
148148+ body, err := json.Marshal(rec)
149149+ if err != nil {
150150+ return fmt.Errorf("marshal pipeline.status: %w", err)
151151+ }
152152+ rkey := fmt.Sprintf("fake-%d-%s-%d", time.Now().UnixNano(), workflow, seq)
153153+ if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
154154+ return fmt.Errorf("publish pipeline.status: %w", err)
155155+ }
156156+ return nil
157157+}