···2929 "encoding/json"
3030 "fmt"
3131 "log/slog"
3232+ "time"
32333334 "tangled.org/core/api/tangled"
3435 "tangled.org/core/eventconsumer"
// knotConsumer consumes knot event streams and, for now, drives the
// fake pipeline runs that publish status records out via the broker.
type knotConsumer struct {
	// c is the underlying event-stream consumer this type wraps.
	c *eventconsumer.Consumer
	// log is the component-scoped logger (component = "knotconsumer").
	log *slog.Logger
	// br is how we publish synthesized sh.tangled.pipeline.status
	// records back out to /events subscribers. Today it's driven by
	// the fake-job stand-in in process(); once we hook up Buildkite,
	// the webhook handler will be the primary publisher.
	br *broker
}
77837884// Compile-time interface conformance check.
···8894// restart is harmless. When we start translating triggers into real
8995// Buildkite builds, this should switch to a SQLite-backed cursor store
9096// to avoid duplicate builds.
9191-func startKnotConsumer(ctx context.Context, cfg config, st *store) (*knotConsumer, error) {
9797+func startKnotConsumer(ctx context.Context, cfg config, st *store, br *broker) (*knotConsumer, error) {
9298 logger := loggerFrom(ctx).With("component", "knotconsumer")
939994100 knots, err := st.KnotsForSpindle(ctx, cfg.Hostname)
···96102 return nil, fmt.Errorf("load known knots: %w", err)
97103 }
981049999- kc := &knotConsumer{log: logger}
105105+ kc := &knotConsumer{log: logger, br: br}
100106101107 ccfg := eventconsumer.NewConsumerConfig()
102108 ccfg.Logger = logger
···194200 "workflows", len(p.Workflows),
195201 )
196202203203+ // Stand-in for the real Buildkite integration. Spawn one fake
204204+ // job per workflow so the /events fan-out has something to
205205+ // emit and the appview can show progress end to end. We hand
206206+ // each goroutine the worker ctx (app-scoped) so they survive
207207+ // process() returning but exit cleanly on shutdown.
208208+ k.spawnFakeJobs(ctx, src.Key(), msg.Rkey, p.Workflows)
209209+197210 default:
198211 // Knots may publish other record types over the same stream; we
199212 // don't care about them yet. Debug-only so it's available when
···208221209222 return nil
210223}
// fakeJob constants. Pulled out so it's obvious where the timing
// numbers come from, and trivially adjustable when we want to dial the
// fake up or down. Both are consumed only by runFakeJob.
const (
	// fakeJobDuration is the wall-clock length of a fake run. Total
	// publishes per workflow = (fakeJobDuration / fakeJobInterval) + 1
	// (one final "success").
	fakeJobDuration = 30 * time.Second
	// fakeJobInterval is how often we emit a "running" heartbeat.
	fakeJobInterval = 5 * time.Second
)
236236+237237+// spawnFakeJobs starts a goroutine per workflow. They each emit a
238238+// stream of sh.tangled.pipeline.status records via the broker until
239239+// either the fake duration elapses (success) or ctx is cancelled.
240240+//
241241+// This is a deliberate stand-in: it lets us validate the entire
242242+// jetstream → knot → broker → /events → appview pipeline before the
243243+// real Buildkite plumbing is in place.
244244+func (k *knotConsumer) spawnFakeJobs(ctx context.Context, knot, pipelineRkey string, workflows []*tangled.Pipeline_Workflow) {
245245+ if len(workflows) == 0 {
246246+ // Nothing to fake — without a workflow name there's no valid
247247+ // pipeline.status record to publish.
248248+ k.log.Warn("pipeline has no workflows; skipping fake run",
249249+ "knot", knot, "rkey", pipelineRkey,
250250+ )
251251+ return
252252+ }
253253+ for _, wf := range workflows {
254254+ if wf == nil || wf.Name == "" {
255255+ continue
256256+ }
257257+ go k.runFakeJob(ctx, knot, pipelineRkey, wf.Name)
258258+ }
259259+}
260260+261261+// runFakeJob emits a "running" status every fakeJobInterval for
262262+// fakeJobDuration, then a final "success". It returns early if ctx is
263263+// cancelled (shutdown) — without doing a final publish, since we'd be
264264+// writing to a broker whose store may be closing.
265265+func (k *knotConsumer) runFakeJob(ctx context.Context, knot, pipelineRkey, workflow string) {
266266+ // pipelineURI is what the appview parses out of the status record
267267+ // to associate it with the originating pipeline. Format mirrors
268268+ // what the upstream spindle emits: at://did:web:<knot>/<nsid>/<rkey>
269269+ // — the appview strips the did:web: prefix and uses the hostname
270270+ // as the knot identifier.
271271+ pipelineURI := fmt.Sprintf("at://did:web:%s/%s/%s",
272272+ knot, tangled.PipelineNSID, pipelineRkey,
273273+ )
274274+275275+ logger := k.log.With(
276276+ "knot", knot,
277277+ "pipeline_rkey", pipelineRkey,
278278+ "workflow", workflow,
279279+ )
280280+281281+ // Heartbeat phase. seq doubles as a per-workflow disambiguator in
282282+ // the synthesized status rkey so multiple fakes don't collide.
283283+ deadline := time.Now().Add(fakeJobDuration)
284284+ seq := 0
285285+ for time.Now().Before(deadline) {
286286+ if err := k.publishStatus(ctx, pipelineURI, workflow, "running", seq); err != nil {
287287+ logger.Error("publish fake running status", "err", err, "seq", seq)
288288+ return
289289+ }
290290+ seq++
291291+ select {
292292+ case <-ctx.Done():
293293+ logger.Debug("fake job cancelled mid-run", "seq", seq)
294294+ return
295295+ case <-time.After(fakeJobInterval):
296296+ }
297297+ }
298298+299299+ // Terminal status. Marked as "success" using the upstream
300300+ // StatusKind enum's success label (see tangled.org/core/spindle/models).
301301+ if err := k.publishStatus(ctx, pipelineURI, workflow, "success", seq); err != nil {
302302+ logger.Error("publish fake success status", "err", err, "seq", seq)
303303+ return
304304+ }
305305+ logger.Info("fake job complete")
306306+}
307307+308308+// publishStatus assembles a tangled.PipelineStatus, marshals it, and
309309+// hands it to the broker for persistence + fan-out. The rkey we mint
310310+// is purely synthetic — it just needs to be unique across our event
311311+// log; the appview keys its rows on (spindle, rkey).
312312+func (k *knotConsumer) publishStatus(ctx context.Context, pipelineURI, workflow, status string, seq int) error {
313313+ rec := tangled.PipelineStatus{
314314+ LexiconTypeID: tangled.PipelineStatusNSID,
315315+ Pipeline: pipelineURI,
316316+ Workflow: workflow,
317317+ Status: status,
318318+ CreatedAt: time.Now().UTC().Format(time.RFC3339),
319319+ }
320320+ body, err := json.Marshal(rec)
321321+ if err != nil {
322322+ return fmt.Errorf("marshal pipeline.status: %w", err)
323323+ }
324324+ rkey := fmt.Sprintf("fake-%d-%s-%d", time.Now().UnixNano(), workflow, seq)
325325+ if _, err := k.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil {
326326+ return fmt.Errorf("publish pipeline.status: %w", err)
327327+ }
328328+ return nil
329329+}
+4-2
main.go
···115115 br := newBroker(st)
116116117117 // Start the knot event-stream consumer first so the jetstream
118118- // loop has somewhere to register newly-observed knots into.
119119- knots, err := startKnotConsumer(ctx, cfg, st)
118118+ // loop has somewhere to register newly-observed knots into. It
119119+ // gets the broker so its (currently fake) pipeline runner can
120120+ // publish sh.tangled.pipeline.status events back out via /events.
121121+ knots, err := startKnotConsumer(ctx, cfg, st, br)
120122 if err != nil {
121123 logger.Error("failed to start knot consumer", "err", err)
122124 os.Exit(1)