···2233// HTTP surface of the spindle.
44//
55-// Three roles to keep in mind:
55+// Four roles to keep in mind:
66//
77// 1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during
88// spindle registration to confirm the operator owns this instance.
···1313// 3. Webhooks: Buildkite POSTs build/job state changes to
1414// /webhooks/buildkite, which we'll translate into pipeline.status
1515// events on (2).
1616+// 4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} delegates to the
1717+// configured Provider so the appview (or a curling operator) can
1818+// pull captured workflow output for a specific run.
16191720import (
1821 "context"
···3538// The logger is read from ctx via loggerFrom. The broker is the
3639// in-process pub/sub used by /events to fan published records out to
3740// connected websocket subscribers.
3838-func runHTTP(ctx context.Context, cfg config, br *broker) error {
4141+func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider) error {
3942 logger := loggerFrom(ctx)
40434144 mux := http.NewServeMux()
4245 mux.HandleFunc("GET /", rootHandler())
4346 mux.HandleFunc("GET /events", eventsHandler(logger, br))
4747+ mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider))
4448 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID))
4549 mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler())
4650···97101func buildkiteWebhookHandler() http.HandlerFunc {
98102 return func(w http.ResponseWriter, r *http.Request) {
99103 http.Error(w, "not implemented", http.StatusNotImplemented)
104104+ }
105105+}
106106+107107+// logsHandler serves captured workflow logs over a WebSocket,
108108+// matching the wire protocol of the upstream Tangled spindle so the
109109+// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in
110110+// source. The path shape is
111111+//
112112+// GET /logs/{knot}/{pipelineRkey}/{workflow}
113113+//
114114+// which matches the (knot, pipelineRkey, workflow) tuple
115115+// Provider.Spawn is invoked with — the same identity used in the
116116+// pipeline ATURI. Workflow names commonly contain a dot (e.g.
117117+// "test.yml"); ServeMux path patterns match a single segment, so the
118118+// literal value flows straight through r.PathValue.
119119+//
120120+// Wire shape per frame: a single TextMessage carrying one JSON
121121+// LogLine record (defined in provider.go; byte-compatible with
122122+// tangled.org/core/spindle/models.LogLine). The Provider hands us a
123123+// channel of LogLine values; we marshal each one and forward it as
124124+// one frame so the appview's per-line decode path works unchanged.
125125+//
126126+// Error mapping is intentionally done *before* the WebSocket upgrade:
127127+// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500
128128+// so the appview's websocket.DefaultDialer surfaces a real HTTP
129129+// status rather than an immediate close.
130130+func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc {
131131+ upgrader := websocket.Upgrader{
132132+ ReadBufferSize: 1024,
133133+ WriteBufferSize: 1024,
134134+ }
135135+ return func(w http.ResponseWriter, r *http.Request) {
136136+ knot := r.PathValue("knot")
137137+ pipelineRkey := r.PathValue("pipelineRkey")
138138+ workflow := r.PathValue("workflow")
139139+140140+ // Defensive: ServeMux won't match an empty segment, but a
141141+ // future router change shouldn't be allowed to silently
142142+ // produce an "all logs" query.
143143+ if knot == "" || pipelineRkey == "" || workflow == "" {
144144+ http.Error(w, "missing path component", http.StatusBadRequest)
145145+ return
146146+ }
147147+148148+ // Establish the log channel BEFORE the WebSocket upgrade so
149149+ // ErrLogsNotFound / backend errors surface as a real HTTP
150150+ // status to the appview's dialer rather than as an immediate
151151+ // post-upgrade close. ctx scopes the producer's lifetime —
152152+ // it's cancelled below the moment the client disconnects.
153153+ ctx, cancel := context.WithCancel(r.Context())
154154+ defer cancel()
155155+156156+ ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow)
157157+ if err != nil {
158158+ if errors.Is(err, ErrLogsNotFound) {
159159+ http.Error(w, "logs not found", http.StatusNotFound)
160160+ return
161161+ }
162162+ logger.Error("logs fetch failed",
163163+ "err", err,
164164+ "knot", knot,
165165+ "pipeline_rkey", pipelineRkey,
166166+ "workflow", workflow,
167167+ )
168168+ http.Error(w, "logs unavailable", http.StatusInternalServerError)
169169+ return
170170+ }
171171+172172+ conn, err := upgrader.Upgrade(w, r, nil)
173173+ if err != nil {
174174+ // Upgrade already wrote a response; just record the
175175+ // failure for diagnostics.
176176+ logger.Error("logs websocket upgrade failed",
177177+ "err", err,
178178+ "knot", knot,
179179+ "pipeline_rkey", pipelineRkey,
180180+ "workflow", workflow,
181181+ )
182182+ return
183183+ }
184184+ defer func() {
185185+ // Send a close frame on the way out so the appview proxy
186186+ // sees a clean shutdown. Mirrors upstream
187187+ // spindle.(*Spindle).Logs.
188188+ _ = conn.WriteControl(
189189+ websocket.CloseMessage,
190190+ websocket.FormatCloseMessage(
191191+ websocket.CloseNormalClosure, "log stream complete",
192192+ ),
193193+ time.Now().Add(time.Second),
194194+ )
195195+ conn.Close()
196196+ }()
197197+198198+ // Detect client disconnect by trying to read; we don't expect
199199+ // any payloads from the client, so any read outcome (including
200200+ // EOF) signals the connection has gone away. The cancel hits
201201+ // the producer goroutine inside the Provider, which stops
202202+ // sending and closes ch — our drain loop then exits cleanly.
203203+ go func() {
204204+ for {
205205+ if _, _, err := conn.NextReader(); err != nil {
206206+ cancel()
207207+ return
208208+ }
209209+ }
210210+ }()
211211+212212+ // Drain the channel; closure means the run is complete (or
213213+ // the producer hit ctx). Marshal-then-write each LogLine as
214214+ // a single TextMessage frame.
215215+ for {
216216+ select {
217217+ case <-ctx.Done():
218218+ return
219219+ case line, ok := <-ch:
220220+ if !ok {
221221+ return
222222+ }
223223+ frame, err := json.Marshal(line)
224224+ if err != nil {
225225+ // The struct is fully internal; a marshal failure
226226+ // is a programmer bug. Log and bail rather than
227227+ // poison the stream with a half-frame.
228228+ logger.Error("marshal log line",
229229+ "err", err,
230230+ "knot", knot,
231231+ "pipeline_rkey", pipelineRkey,
232232+ "workflow", workflow,
233233+ )
234234+ return
235235+ }
236236+ if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
237237+ logger.Debug("logs frame write failed",
238238+ "err", err,
239239+ "knot", knot,
240240+ "pipeline_rkey", pipelineRkey,
241241+ "workflow", workflow,
242242+ )
243243+ return
244244+ }
245245+ }
246246+ }
100247 }
101248}
102249
+1-1
main.go
···143143144144 // Run the HTTP server. This blocks until ctx is cancelled or the
145145 // listener errors.
146146- if err := runHTTP(ctx, cfg, br); err != nil {
146146+ if err := runHTTP(ctx, cfg, br, provider); err != nil {
147147 logger.Error("http server error", "err", err)
148148 os.Exit(1)
149149 }
+75-1
provider.go
···7788import (
99 "context"
1010+ "errors"
1111+ "time"
10121113 "tangled.org/core/api/tangled"
1214)
13151616+// LogLine is the on-the-wire shape of a single log frame emitted by a
1717+// Provider. It mirrors tangled.org/core/spindle/models.LogLine on the
1818+// JSON level without importing the upstream package — that package
1919+// transitively pulls in git, vault, redis and a few hundred other
2020+// modules just to expose a handful of types we don't otherwise need.
2121+//
2222+// The JSON tags here MUST stay byte-compatible with the upstream
2323+// struct: the appview's log proxy decodes against the upstream type,
2424+// so any tag drift breaks the appview's renderer.
2525+type LogLine struct {
2626+ Kind string `json:"kind"`
2727+ Content string `json:"content"`
2828+ Time time.Time `json:"time"`
2929+ StepId int `json:"step_id"`
3030+ Stream string `json:"stream,omitempty"`
3131+ StepStatus string `json:"step_status,omitempty"`
3232+ StepKind string `json:"step_kind,omitempty"`
3333+ StepCommand string `json:"step_command,omitempty"`
3434+}
// Wire values for LogLine.Kind and LogLine.StepStatus, matching the
// upstream constants (LogKindData, LogKindControl, StepStatusStart,
// StepStatusEnd) byte-for-byte. Use these instead of bare strings so
// we can't drift from the upstream enum.
const (
	// LogKindData marks a frame carrying captured output.
	LogKindData = "data"
	// LogKindControl marks a step-lifecycle marker frame.
	LogKindControl = "control"
	// StepStatusStart / StepStatusEnd are the step_status values
	// carried by control frames.
	StepStatusStart = "start"
	StepStatusEnd   = "end"
)
4545+4646+// ErrLogsNotFound is returned by Provider.Logs when the requested
4747+// (knot, pipelineRkey, workflow) tuple has no recorded logs — either
4848+// because that workflow never ran on this spindle, or because the
4949+// provider has since dropped it. The HTTP /logs handler translates
5050+// this into a 404 *before* upgrading to WebSocket so the appview's
5151+// dialer sees a real HTTP error rather than an immediate close.
5252+var ErrLogsNotFound = errors.New("logs not found")
5353+1454// Provider dispatches a Tangled pipeline trigger to whatever backend
1515-// actually runs the workflows.
5555+// actually runs the workflows, and exposes per-workflow log retrieval.
1656//
1757// Implementations are responsible for publishing
1858// sh.tangled.pipeline.status records back through whatever channel
···4383 pipelineRkey string,
4484 workflows []*tangled.Pipeline_Workflow,
4585 )
8686+8787+ // Logs returns a channel of log frames for a single workflow run,
8888+ // identified by the same (knot, pipelineRkey, workflow) tuple
8989+ // Spawn was invoked with.
9090+ //
9191+ // Each LogLine corresponds 1:1 to one frame written by the HTTP
9292+ // /logs WebSocket handler — the handler marshals the LogLine and
9393+ // emits it as a single TextMessage so appview clients see the
9494+ // exact same wire shape they would from the stock Tangled spindle
9595+ // (whose on-disk log file holds the same records, one per line).
9696+ //
9797+ // The returned channel is closed by the provider when the stream
9898+ // is complete; a closed channel is the only "end of stream"
9999+ // signal — there is no separate done channel. Implementations
100100+ // MUST also stop sending and close the channel when ctx is
101101+ // cancelled, so a disconnecting client doesn't leak a producer
102102+ // goroutine.
103103+ //
104104+ // Backend errors that occur *after* a successful return (e.g.
105105+ // the upstream Buildkite log socket dying mid-stream) are logged
106106+ // internally and surface to the consumer as an early channel
107107+ // close. The same observable behaviour the appview already
108108+ // handles for a websocket that closes mid-stream.
109109+ //
110110+ // Returns ErrLogsNotFound if no logs exist for the requested
111111+ // tuple. Any other returned error indicates a backend failure
112112+ // and should surface as a 5xx to the HTTP caller. On a non-nil
113113+ // error, the channel is nil and there is nothing to drain.
114114+ Logs(
115115+ ctx context.Context,
116116+ knot string,
117117+ pipelineRkey string,
118118+ workflow string,
119119+ ) (<-chan LogLine, error)
46120}
+122-8
provider_fake.go
···55// spawns a goroutine that emits a fixed-cadence stream of
66// sh.tangled.pipeline.status records — "running" every five seconds
77// for thirty seconds, then a final "success" — through the broker.
88+// In parallel it appends synthetic LogLine records into an in-memory
99+// buffer, so /logs queries against a fake run replay something that
1010+// matches the upstream spindle's wire format.
811//
912// The point is to exercise the entire trigger → broker → /events →
1013// appview path end-to-end before any real CI integration exists. Once
···1821 "encoding/json"
1922 "fmt"
2023 "log/slog"
2424+ "sync"
2125 "time"
22262327 "tangled.org/core/api/tangled"
···3236 fakeJobInterval = 5 * time.Second
3337)
// fakeLogChanBuffer sizes the buffered channel handed out by
// fakeProvider.Logs. Large enough that a snapshot of a completed fake
// run (≈ 8 lines today) lands without ever blocking the producer
// goroutine on a briefly-slow consumer; small enough that a consumer
// that stops draining can't pin a runaway amount of memory.
const fakeLogChanBuffer = 64
4545+3546// fakeProvider implements Provider against the in-process broker.
4747+//
4848+// logs holds the captured per-workflow LogLine slices, keyed by the
4949+// fakeLogKey of (knot, pipelineRkey, workflow). The mutex guards the
5050+// map *and* concurrent appends to any contained slice — runWorkflow
5151+// is the only writer for a given key, but Logs() may snapshot the
5252+// slice concurrently.
3653type fakeProvider struct {
3754 br *broker
3855 log *slog.Logger
5656+5757+ mu sync.Mutex
5858+ logs map[string][]LogLine
3959}
40604161// Compile-time interface check — keeps the fake honest if Provider
···4767// apart from the knot-consumer / jetstream noise.
4868func newFakeProvider(br *broker, log *slog.Logger) *fakeProvider {
4969 return &fakeProvider{
5050- br: br,
5151- log: log.With("component", "provider", "kind", "fake"),
7070+ br: br,
7171+ log: log.With("component", "provider", "kind", "fake"),
7272+ logs: make(map[string][]LogLine),
5273 }
5374}
5475···83104 }
84105}
85106107107+// Logs satisfies Provider. It snapshots the in-memory LogLine slice
108108+// for (knot, pipelineRkey, workflow) and returns a buffered channel
109109+// that a goroutine drains the snapshot into. The snapshot is taken
110110+// under the mutex so subsequent appends by a still-running workflow
111111+// won't be reflected — clients that want live updates re-poll. We
112112+// don't follow live writes here; the fake's purpose is to exercise
113113+// the wire format end-to-end, not to reproduce hpcloud/tail.
114114+//
115115+// The producer goroutine honours ctx so that a disconnecting HTTP
116116+// client tears the channel send down promptly instead of waiting on
117117+// an unbuffered receiver.
118118+func (p *fakeProvider) Logs(
119119+ ctx context.Context,
120120+ knot string,
121121+ pipelineRkey string,
122122+ workflow string,
123123+) (<-chan LogLine, error) {
124124+ key := fakeLogKey(knot, pipelineRkey, workflow)
125125+126126+ p.mu.Lock()
127127+ stored, ok := p.logs[key]
128128+ if !ok {
129129+ p.mu.Unlock()
130130+ return nil, ErrLogsNotFound
131131+ }
132132+ // Copy under the lock so subsequent appendLogLine writes can't
133133+ // race with our drain goroutine reading the slice header.
134134+ snapshot := make([]LogLine, len(stored))
135135+ copy(snapshot, stored)
136136+ p.mu.Unlock()
137137+138138+ out := make(chan LogLine, fakeLogChanBuffer)
139139+ go func() {
140140+ defer close(out)
141141+ for _, line := range snapshot {
142142+ select {
143143+ case <-ctx.Done():
144144+ return
145145+ case out <- line:
146146+ }
147147+ }
148148+ }()
149149+ return out, nil
150150+}
151151+86152// runWorkflow emits a "running" status every fakeJobInterval until
8787-// fakeJobDuration elapses, then a final "success". On ctx
8888-// cancellation it returns without issuing the terminal publish — the
8989-// broker's underlying store may already be closing during shutdown.
153153+// fakeJobDuration elapses, then a final "success". Alongside each
154154+// status it appends a corresponding LogLine into the in-memory log
155155+// buffer so /logs returns something coherent.
156156+//
157157+// On ctx cancellation it returns without issuing the terminal publish
158158+// or the closing control frame — the broker's underlying store may
159159+// already be closing during shutdown.
90160func (p *fakeProvider) runWorkflow(ctx context.Context, knot, pipelineRkey, workflow string) {
91161 // pipelineURI is what the appview parses out of the status record
92162 // to associate it back with the originating pipeline. Format
···103173 "workflow", workflow,
104174 )
105175176176+ // Start control frame. The appview's log renderer keys timing on
177177+ // matching start/end control frames per step_id, so we always
178178+ // emit one even though the fake has only a single synthetic step.
179179+ p.appendLogLine(knot, pipelineRkey, workflow, LogLine{
180180+ Kind: LogKindControl,
181181+ Time: time.Now(),
182182+ Content: workflow,
183183+ StepId: 0,
184184+ StepStatus: StepStatusStart,
185185+ })
186186+106187 // Heartbeat phase. seq doubles as a per-workflow disambiguator
107188 // in the synthesized status rkey so concurrent fakes (across
108189 // workflows or pipelines) don't collide.
109190 deadline := time.Now().Add(fakeJobDuration)
110191 seq := 0
111192 for time.Now().Before(deadline) {
193193+ p.appendLogLine(knot, pipelineRkey, workflow, LogLine{
194194+ Kind: LogKindData,
195195+ Time: time.Now(),
196196+ Content: fmt.Sprintf("[fake] heartbeat %d at %s\n",
197197+ seq, time.Now().UTC().Format(time.RFC3339),
198198+ ),
199199+ StepId: 0,
200200+ Stream: "stdout",
201201+ })
112202 if err := p.publishStatus(ctx, pipelineURI, workflow, "running", seq); err != nil {
113203 logger.Error("publish fake running status", "err", err, "seq", seq)
114204 return
···122212 }
123213 }
124214125125- // Terminal publish. "success" matches the upstream StatusKind
126126- // enum (see tangled.org/core/spindle/models) — the appview
127127- // routes status strings through that same enum.
215215+ // End control + terminal status publish. "success" matches the
216216+ // upstream StatusKind enum (see tangled.org/core/spindle/models)
217217+ // — the appview routes status strings through that same enum.
218218+ p.appendLogLine(knot, pipelineRkey, workflow, LogLine{
219219+ Kind: LogKindControl,
220220+ Time: time.Now(),
221221+ Content: workflow,
222222+ StepId: 0,
223223+ StepStatus: StepStatusEnd,
224224+ })
128225 if err := p.publishStatus(ctx, pipelineURI, workflow, "success", seq); err != nil {
129226 logger.Error("publish fake success status", "err", err, "seq", seq)
130227 return
···155252 }
156253 return nil
157254}
255255+256256+// appendLogLine records line as the next captured frame for the
257257+// workflow, allocating a slice on first use. Holding the mutex across
258258+// the append ensures a concurrent Logs() snapshot doesn't observe a
259259+// half-grown slice header.
260260+func (p *fakeProvider) appendLogLine(knot, pipelineRkey, workflow string, line LogLine) {
261261+ key := fakeLogKey(knot, pipelineRkey, workflow)
262262+ p.mu.Lock()
263263+ defer p.mu.Unlock()
264264+ p.logs[key] = append(p.logs[key], line)
265265+}
// fakeLogKey builds the canonical map key for the in-memory log
// buffer. Centralised so reader and writer can't drift on separator
// choice; NUL is used because it cannot appear in any component.
func fakeLogKey(knot, pipelineRkey, workflow string) string {
	const sep = "\x00"
	return knot + sep + pipelineRkey + sep + workflow
}