Stitch any CI into Tangled
package main

// HTTP surface of the spindle.
//
// Four roles to keep in mind:
//
//  1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during
//     spindle registration to confirm the operator owns this instance.
//  2. Event stream: the appview holds a long-lived websocket against
//     /events to receive sh.tangled.pipeline.status frames as builds
//     progress. The frames themselves originate from the Buildkite
//     webhook receiver in (3); between frames the stream is kept
//     alive with pings.
//  3. Webhooks: Buildkite POSTs build/job state changes to
//     /webhooks/buildkite, which we translate into pipeline.status
//     events on (2).
//  4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} delegates to the
//     configured Provider so the appview (or a curling operator) can
//     pull captured workflow output for a specific run.

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strconv"
	"time"

	"github.com/gorilla/websocket"
	"tangled.org/core/api/tangled"

	"go.mitchellh.com/tack/internal/buildkite"
)

// wsWriteWait bounds how long any single websocket write (frame or
// control) is allowed to block before we treat the peer as dead. A
// client that stops reading but keeps the TCP connection open would
// otherwise hang the handler indefinitely on a full kernel send buffer.
// 10s is intentionally generous: real backpressure resolves in
// milliseconds, so anything past that is a stuck peer we'd rather drop
// than serve.
const wsWriteWait = 10 * time.Second
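
// Both websocket write paths in this file bound their writes with it;
// sketched here for orientation (both shapes appear verbatim in the
// handlers below). Data frames take the deadline from the conn, set
// before every write; control frames pass it to WriteControl directly:
//
//	_ = conn.SetWriteDeadline(time.Now().Add(wsWriteWait))
//	err := conn.WriteMessage(websocket.TextMessage, frame)
//
//	err = conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(wsWriteWait))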

// runHTTP starts the spindle's HTTP server and blocks until ctx is
// cancelled or the listener returns a fatal error. On ctx cancellation it
// performs a graceful shutdown with a bounded timeout.
//
// The logger is read from ctx via loggerFrom. The broker is the
// in-process pub/sub used by /events to fan published records out to
// connected websocket subscribers. bkProvider may be nil — when a
// deployment runs the fake provider, /webhooks/buildkite still
// registers but responds 503, so a misdirected Buildkite webhook
// gets a clear "this spindle isn't accepting Buildkite events" rather
// than a misleading 200.
func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider, bkProvider *buildkiteProvider) error {
	logger := loggerFrom(ctx)

	mux := http.NewServeMux()
	mux.HandleFunc("GET /", rootHandler())
	mux.HandleFunc("GET /events", eventsHandler(logger, br))
	mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider))
	mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID))
	mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler(logger, bkProvider))

	srv := &http.Server{
		Addr:              cfg.Addr,
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
	}

	// Run ListenAndServe on a goroutine so we can race it against ctx.Done.
	errCh := make(chan error, 1)
	go func() {
		logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID)
		errCh <- srv.ListenAndServe()
	}()

	select {
	case <-ctx.Done():
		logger.Info("shutting down")
	case err := <-errCh:
		// ErrServerClosed means we shut ourselves down cleanly elsewhere
		// — anything else is a real failure to report.
		if err != nil && !errors.Is(err, http.ErrServerClosed) {
			return fmt.Errorf("http server: %w", err)
		}
	}

	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return srv.Shutdown(shutdownCtx)
}
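
// How a caller wires this up is out of scope for this file; purely as
// a sketch (the signal handling and variable names here are
// illustrative, not part of this package's API), the expected shape is
// a context that is cancelled on shutdown:
//
//	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
//	defer stop()
//	if err := runHTTP(ctx, cfg, br, provider, bkProvider); err != nil {
//		// log and exit non-zero
//	}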

// rootHandler responds at "/" with a friendly identifier page. Mainly
// useful as a liveness check during deployment, and as a calling card
// for the curious operator who hits the root in a browser.
func rootHandler() http.HandlerFunc {
	// Plain-text banner. "tack" rendered in figlet's larry3d font; the
	// shape mirrors the style the bluesky PDS serves at its own root,
	// which feels like the right vibe for an atproto-adjacent service.
	// The two backticks in the larry3d output collide with Go's raw
	// string delimiter, so we concatenate them in as interpreted
	// substrings to keep the art byte-identical to figlet's output.
	const banner = ` __                    __
/\ \__                /\ \
\ \ ,_\    __      ___\ \ \/'\
 \ \ \/  /'__` + "`" + `\   /'___\ \ , <
  \ \ \_/\ \L\.\_/\ \__/\ \ \\` + "`" + `\
   \ \__\ \__/.\_\ \____\\ \_\ \_\
    \/__/\/__/\/_/\/____/ \/_/\/_/


This is a tack server: a tangled spindle for other CI services.

Most API routes are under /xrpc/

  Code: https://github.com/mitchellh/tack
`
	return func(w http.ResponseWriter, r *http.Request) {
		// Only respond at exactly "/" — without this guard, the
		// "GET /" pattern would also catch arbitrary unmatched
		// paths like "/foo" and lie about being the root.
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		fmt.Fprint(w, banner)
	}
}

// ownerHandler implements sh.tangled.owner so the Tangled appview can verify
// this spindle's owner during registration.
func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil {
			logger.Error("encode owner response", "err", err)
		}
	}
}
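
// For reference, the registration check is a plain GET. A minimal
// sketch of the client side, assuming a hypothetical host and that
// tangled.Owner_Output round-trips the way we encode it above:
//
//	resp, err := http.Get("https://spindle.example.com/xrpc/" + tangled.OwnerNSID)
//	if err != nil {
//		return err
//	}
//	defer resp.Body.Close()
//	var out tangled.Owner_Output
//	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
//		return err
//	}
//	// Registration succeeds only if out.Owner is the DID of the
//	// account registering this spindle.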

// buildkiteWebhookHandler receives Buildkite Pipelines webhook events,
// authenticates the request against whichever scheme the provider was
// configured with, and hands the decoded payload to the provider for
// translation into a sh.tangled.pipeline.status publish.
//
// Authentication is intentionally fail-closed: when bk is nil (no
// Buildkite provider configured) we 503 instead of accepting events
// silently. The body is buffered up front because signature mode
// HMACs the raw bytes: verification has to see exactly what was sent,
// so we can't let the JSON decoder consume the body first.
//
// Acknowledgement contract with Buildkite: we 200 on any well-formed
// event we accepted (including events we deliberately ignore, like
// job.* or builds we don't track), and 5xx only on internal failure
// the operator should look at. A 4xx/5xx makes Buildkite retry,
// which we don't want for "this isn't an event we care about".
func buildkiteWebhookHandler(logger *slog.Logger, bk *buildkiteProvider) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if bk == nil {
			http.Error(w, "buildkite provider not configured",
				http.StatusServiceUnavailable)
			return
		}

		// Cap body size so a malicious sender can't exhaust memory.
		// Buildkite payloads in practice are well under 64 KiB, so a
		// 1 MiB ceiling is generous without being unbounded.
		body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
		if err != nil {
			logger.Warn("buildkite webhook: read body", "err", err)
			http.Error(w, "read body", http.StatusBadRequest)
			return
		}

		if err := bk.VerifyWebhook(r.Header, body); err != nil {
			logger.Warn("buildkite webhook: verify failed",
				"err", err,
				"remote", r.RemoteAddr,
			)
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}

		var payload buildkite.WebhookPayload
		if err := json.Unmarshal(body, &payload); err != nil {
			logger.Warn("buildkite webhook: decode body", "err", err)
			http.Error(w, "bad payload", http.StatusBadRequest)
			return
		}
		// The X-Buildkite-Event header is authoritative for the
		// event name; the body carries its own copy for convenience,
		// but the two don't always match exactly. Prefer the header.
		if h := r.Header.Get("X-Buildkite-Event"); h != "" {
			payload.Event = h
		}

		// Translate + publish on the request context so a slow
		// store/broker doesn't outlive an aborted webhook
		// connection.
		if err := bk.HandleWebhook(r.Context(), payload); err != nil {
			logger.Error("buildkite webhook: handle", "err", err,
				"event", payload.Event,
				"build_uuid", payload.Build.ID,
			)
			http.Error(w, "internal error", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}
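
// VerifyWebhook's implementation lives on the provider and isn't shown
// here. For orientation, token-mode verification against Buildkite's
// X-Buildkite-Token header would look roughly like this (a hedged
// sketch: p.webhookToken stands in for however the provider stores its
// secret, and signature mode would HMAC the raw body instead):
//
//	func (p *buildkiteProvider) VerifyWebhook(h http.Header, body []byte) error {
//		// crypto/subtle for a constant-time compare of the shared secret.
//		got := []byte(h.Get("X-Buildkite-Token"))
//		if subtle.ConstantTimeCompare(got, []byte(p.webhookToken)) != 1 {
//			return errors.New("buildkite: webhook token mismatch")
//		}
//		return nil // body is only consulted in signature mode
//	}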

// logsHandler serves captured workflow logs over a WebSocket,
// matching the wire protocol of the upstream Tangled spindle so the
// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in
// source. The path shape is
//
//	GET /logs/{knot}/{pipelineRkey}/{workflow}
//
// which matches the (knot, pipelineRkey, workflow) tuple
// Provider.Spawn is invoked with — the same identity used in the
// pipeline ATURI. Workflow names commonly contain a dot (e.g.
// "test.yml"); ServeMux path patterns match a single segment, so the
// literal value flows straight through r.PathValue.
//
// Wire shape per frame: a single TextMessage carrying one JSON
// LogLine record (defined in provider.go; byte-compatible with
// tangled.org/core/spindle/models.LogLine). The Provider hands us a
// channel of LogLine values; we marshal each one and forward it as
// one frame so the appview's per-line decode path works unchanged.
//
// Error mapping is intentionally done *before* the WebSocket upgrade:
// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500
// so the appview's websocket.DefaultDialer surfaces a real HTTP
// status rather than an immediate close.
func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc {
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}
	return func(w http.ResponseWriter, r *http.Request) {
		knot := r.PathValue("knot")
		pipelineRkey := r.PathValue("pipelineRkey")
		workflow := r.PathValue("workflow")

		// Defensive: ServeMux won't match an empty segment, but a
		// future router change shouldn't be allowed to silently
		// produce an "all logs" query.
		if knot == "" || pipelineRkey == "" || workflow == "" {
			http.Error(w, "missing path component", http.StatusBadRequest)
			return
		}

		// Establish the log channel BEFORE the WebSocket upgrade so
		// ErrLogsNotFound / backend errors surface as a real HTTP
		// status to the appview's dialer rather than as an immediate
		// post-upgrade close. ctx scopes the producer's lifetime —
		// it's cancelled below the moment the client disconnects.
		ctx, cancel := context.WithCancel(r.Context())
		defer cancel()

		ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow)
		if err != nil {
			if errors.Is(err, ErrLogsNotFound) {
				http.Error(w, "logs not found", http.StatusNotFound)
				return
			}
			logger.Error("logs fetch failed",
				"err", err,
				"knot", knot,
				"pipeline_rkey", pipelineRkey,
				"workflow", workflow,
			)
			http.Error(w, "logs unavailable", http.StatusInternalServerError)
			return
		}

		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			// Upgrade already wrote a response; just record the
			// failure for diagnostics.
			logger.Error("logs websocket upgrade failed",
				"err", err,
				"knot", knot,
				"pipeline_rkey", pipelineRkey,
				"workflow", workflow,
			)
			return
		}
		defer func() {
			// Send a close frame on the way out so the appview proxy
			// sees a clean shutdown. Mirrors upstream
			// spindle.(*Spindle).Logs. WriteControl honours the
			// deadline argument directly, so a stuck peer can't hang
			// us here.
			_ = conn.WriteControl(
				websocket.CloseMessage,
				websocket.FormatCloseMessage(
					websocket.CloseNormalClosure, "log stream complete",
				),
				time.Now().Add(wsWriteWait),
			)
			conn.Close()
		}()

		// Detect client disconnect by trying to read; we don't expect
		// any payloads from the client, so any read outcome (including
		// EOF) signals the connection has gone away. The cancel hits
		// the producer goroutine inside the Provider, which stops
		// sending and closes ch — our drain loop then exits cleanly.
		go func() {
			for {
				if _, _, err := conn.NextReader(); err != nil {
					cancel()
					return
				}
			}
		}()

		// Drain the channel; closure means the run is complete (or
		// the producer hit ctx). Marshal-then-write each LogLine as
		// a single TextMessage frame.
		for {
			select {
			case <-ctx.Done():
				return
			case line, ok := <-ch:
				if !ok {
					return
				}
				frame, err := json.Marshal(line)
				if err != nil {
					// The struct is fully internal; a marshal failure
					// is a programmer bug. Log and bail rather than
					// poison the stream with a half-frame.
					logger.Error("marshal log line",
						"err", err,
						"knot", knot,
						"pipeline_rkey", pipelineRkey,
						"workflow", workflow,
					)
					return
				}
				// Bound the write so a client that stopped reading
				// but kept the TCP connection open can't hang us on a
				// full kernel send buffer. WriteMessage doesn't take a
				// deadline argument the way WriteControl does — we
				// have to set it on the conn before each frame.
				if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
					logger.Debug("logs set write deadline failed",
						"err", err,
						"knot", knot,
						"pipeline_rkey", pipelineRkey,
						"workflow", workflow,
					)
					return
				}
				if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
					logger.Debug("logs frame write failed",
						"err", err,
						"knot", knot,
						"pipeline_rkey", pipelineRkey,
						"workflow", workflow,
					)
					return
				}
			}
		}
	}
}
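
// From the consuming side the stream is one JSON LogLine per text
// frame until a normal close. A minimal client sketch, with a
// hypothetical host, knot, rkey, and workflow:
//
//	conn, _, err := websocket.DefaultDialer.Dial(
//		"wss://spindle.example.com/logs/knot.example.com/3kabc123/test.yml", nil)
//	if err != nil {
//		return err // pre-upgrade failures carry the real HTTP status
//	}
//	defer conn.Close()
//	for {
//		_, frame, err := conn.ReadMessage()
//		if err != nil {
//			break // CloseNormalClosure: "log stream complete"
//		}
//		var line LogLine
//		if err := json.Unmarshal(frame, &line); err != nil {
//			return err
//		}
//		// ...render line...
//	}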

// eventsHandler upgrades to a WebSocket and streams persisted records
// to the connected client. The wire protocol mirrors the upstream
// Tangled spindle so the appview's eventconsumer treats us as a
// drop-in source:
//
//   - Optional ?cursor=<int64> resumes after that rowid; absent or 0
//     means "from the beginning of our retained log".
//   - We do a backfill pass first (everything with created > cursor),
//     then loop: on each broker signal, drain new rows; on a 30s
//     timer, write a websocket ping so intermediaries don't idle the
//     connection out.
//
// We subscribe to the broker *before* the backfill so any Publish that
// races between the cursor read and the loop entry is captured by the
// pending channel signal — the loop will see it on its first iteration
// and call streamEvents again, which is idempotent on the cursor.
func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc {
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}
	return func(w http.ResponseWriter, r *http.Request) {
		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			logger.Error("websocket upgrade failed", "err", err)
			return
		}
		defer conn.Close()

		// Parse the resume cursor up front. An unparseable cursor is a
		// client bug, but rather than 4xx the upgraded connection we
		// log it and start from zero — same behaviour as the upstream
		// spindle.
		var cursor int64
		if raw := r.URL.Query().Get("cursor"); raw != "" {
			parsed, err := strconv.ParseInt(raw, 10, 64)
			if err != nil {
				logger.Warn("events: bad cursor, starting from 0",
					"cursor", raw, "err", err,
				)
			} else {
				cursor = parsed
			}
		}
		logger.Debug("events client connected",
			"remote", r.RemoteAddr, "cursor", cursor,
		)

		// Subscribe before the backfill so a Publish that races between
		// the EventsAfter read and our select loop is captured by the
		// pending channel signal — we'll re-drain on the first wake-up.
		sig := br.Subscribe()
		defer br.Unsubscribe(sig)

		ctx, cancel := context.WithCancel(r.Context())
		defer cancel()

		// Detect client disconnect by trying to read; we don't expect
		// any payloads from the client, so any read outcome (including
		// EOF) signals the connection has gone away.
		go func() {
			for {
				if _, _, err := conn.NextReader(); err != nil {
					cancel()
					return
				}
			}
		}()

		// Initial backfill. If this fails the connection is unusable
		// (we can't promise ordering after a partial write) so just
		// return and let the client reconnect with the same cursor.
		if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
			logger.Debug("events backfill ended", "err", err, "cursor", cursor)
			return
		}

		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				logger.Debug("events client disconnected",
					"remote", r.RemoteAddr, "cursor", cursor,
				)
				return
			case <-sig:
				if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
					logger.Debug("events stream ended", "err", err, "cursor", cursor)
					return
				}
			case <-ticker.C:
				// WriteControl takes its own deadline argument, so
				// the ping itself can't hang us — but we still want a
				// generous-but-bounded ceiling to match the per-frame
				// write timeout.
				if err := conn.WriteControl(
					websocket.PingMessage, nil,
					time.Now().Add(wsWriteWait),
				); err != nil {
					logger.Debug("events ping failed", "err", err)
					return
				}
			}
		}
	}
}
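
// The resume contract from the client's side: remember the highest
// Created you have processed and present it as ?cursor= on the next
// dial. A minimal sketch (hypothetical host; eventsEnvelope is this
// package's wire type):
//
//	var cursor int64 // zero means "from the beginning of the retained log"
//	conn, _, err := websocket.DefaultDialer.Dial(
//		fmt.Sprintf("wss://spindle.example.com/events?cursor=%d", cursor), nil)
//	if err != nil {
//		return err
//	}
//	defer conn.Close()
//	for {
//		var env eventsEnvelope
//		if err := conn.ReadJSON(&env); err != nil {
//			return err // reconnect later with the saved cursor
//		}
//		cursor = env.Created // mirrors streamEvents' cursor advancement
//	}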

// streamEvents drains every event row with `created > *cursor`, writes
// each as a wire envelope frame, and advances *cursor in lockstep. The
// cursor is updated *after* the write succeeds so a half-flushed batch
// (interrupted by a websocket error) replays cleanly on the next
// connection.
//
// It is safe to call repeatedly: when there are no new rows the query
// returns an empty slice and we no-op.
func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error {
	rows, err := st.EventsAfter(ctx, *cursor)
	if err != nil {
		return fmt.Errorf("read events: %w", err)
	}
	for _, row := range rows {
		frame, err := json.Marshal(eventsEnvelope{
			Rkey:    row.Rkey,
			Nsid:    row.Nsid,
			Event:   row.EventJSON,
			Created: row.Created,
		})
		if err != nil {
			return fmt.Errorf("marshal envelope: %w", err)
		}
		// Bound the per-frame write so a client that stopped reading
		// (but didn't close the TCP connection) can't hang the
		// handler on a full kernel send buffer. WriteMessage has no
		// deadline argument of its own — we set it on the conn.
		if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
			return fmt.Errorf("set write deadline: %w", err)
		}
		if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
			return fmt.Errorf("write frame: %w", err)
		}
		*cursor = row.Created
	}
	return nil
}