Stitch any CI into Tangled
package main

// HTTP surface of the spindle.
//
// Four roles to keep in mind:
//
// 1. Verification: the Tangled appview hits /xrpc/sh.tangled.owner during
//    spindle registration to confirm the operator owns this instance.
// 2. Event stream: the appview holds a long-lived websocket against
//    /events to receive sh.tangled.pipeline.status frames as builds
//    progress; an optional ?cursor resumes a dropped connection.
// 3. Webhooks: Buildkite POSTs build/job state changes to
//    /webhooks/buildkite, which the provider translates into
//    pipeline.status events on (2).
// 4. Logs: GET /logs/{knot}/{pipelineRkey}/{workflow} delegates to the
//    configured Provider so the appview (or a curling operator) can
//    pull captured workflow output for a specific run.

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strconv"
	"time"

	"github.com/gorilla/websocket"
	"tangled.org/core/api/tangled"

	"go.mitchellh.com/tack/internal/buildkite"
)

// wsWriteWait bounds how long any single websocket write (frame or
// control) is allowed to block before we treat the peer as dead. A
// client that stops reading but keeps the TCP connection open would
// otherwise hang the handler indefinitely on a full kernel send buffer.
// 10s is intentionally generous: real backpressure resolves in
// milliseconds, so anything past that is a stuck peer we'd rather drop
// than serve.
const wsWriteWait = 10 * time.Second

// runHTTP starts the spindle's HTTP server and blocks until ctx is
// cancelled or the listener returns a fatal error. On ctx cancellation it
// performs a graceful shutdown with a bounded timeout.
//
// The logger is read from ctx via loggerFrom. The broker is the
// in-process pub/sub used by /events to fan published records out to
// connected websocket subscribers. bkProvider may be nil — when a
// deployment runs the fake provider, /webhooks/buildkite still
// registers but responds 503, so a misdirected Buildkite webhook
// gets a clear "this spindle isn't accepting Buildkite events" rather
// than a misleading 200.
func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider, bkProvider *buildkiteProvider) error {
	logger := loggerFrom(ctx)

	mux := http.NewServeMux()
	mux.HandleFunc("GET /", rootHandler())
	mux.HandleFunc("GET /events", eventsHandler(logger, br))
	mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider))
	mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID))
	mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler(logger, bkProvider))

	srv := &http.Server{
		Addr:              cfg.Addr,
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
	}

	// Run ListenAndServe on a goroutine so we can race it against ctx.Done.
	errCh := make(chan error, 1)
	go func() {
		logger.Info("listening", "addr", cfg.Addr, "owner", cfg.OwnerDID)
		errCh <- srv.ListenAndServe()
	}()

	select {
	case <-ctx.Done():
		logger.Info("shutting down")
	case err := <-errCh:
		// ErrServerClosed means we shut ourselves down cleanly elsewhere
		// — anything else is a real failure to report.
		if err != nil && !errors.Is(err, http.ErrServerClosed) {
			return fmt.Errorf("http server: %w", err)
		}
	}

	shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return srv.Shutdown(shutdownCtx)
}

// rootHandler responds at "/" with a friendly identifier page. Mainly
// useful as a liveness check during deployment, and as a calling card
// for the curious operator who hits the root in a browser.
func rootHandler() http.HandlerFunc {
	// Plain-text banner. "tack" rendered in figlet's larry3d font; the
	// shape mirrors the style the bluesky PDS serves at its own root,
	// which feels like the right vibe for an atproto-adjacent service.
	// The two backticks in the larry3d output collide with Go's raw
	// string delimiter, so we concatenate them in as interpreted
	// substrings to keep the art byte-identical to figlet's output.
	const banner = ` __                    __
/\ \__                /\ \
\ \ ,_\    __      ___\ \ \/'\
 \ \ \/  /'__` + "`" + `\   /'___\ \ , <
  \ \ \_/\ \L\.\_/\ \__/\ \ \\` + "`" + `\
   \ \__\ \__/.\_\ \____\\ \_\ \_\
    \/__/\/__/\/_/\/____/ \/_/\/_/


This is a tack server: a tangled spindle for other CI services.

Most API routes are under /xrpc/

  Code: https://github.com/mitchellh/tack
`
	return func(w http.ResponseWriter, r *http.Request) {
		// Only respond at exactly "/" — without this guard, the
		// "GET /" pattern would also catch arbitrary unmatched
		// paths like "/foo" and lie about being the root.
		if r.URL.Path != "/" {
			http.NotFound(w, r)
			return
		}
		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
		fmt.Fprint(w, banner)
	}
}

// ownerHandler implements sh.tangled.owner so the Tangled appview can verify
// this spindle's owner during registration.
func ownerHandler(logger *slog.Logger, owner string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(tangled.Owner_Output{Owner: owner}); err != nil {
			logger.Error("encode owner response", "err", err)
		}
	}
}
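
// verifyOwnerSketch is a minimal illustration, not part of the server:
// it sketches what the appview-side registration check (role 1 in the
// header comment) plausibly boils down to — GET /xrpc/sh.tangled.owner
// and compare the returned DID. The name and signature are assumptions;
// the real appview code lives upstream.
func verifyOwnerSketch(ctx context.Context, base, wantDID string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet,
		base+"/xrpc/"+tangled.OwnerNSID, nil)
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("owner endpoint: %s", resp.Status)
	}
	var out tangled.Owner_Output
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return fmt.Errorf("decode owner: %w", err)
	}
	if out.Owner != wantDID {
		return fmt.Errorf("owner mismatch: got %q, want %q", out.Owner, wantDID)
	}
	return nil
}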

// buildkiteWebhookHandler receives Buildkite Pipelines webhook events,
// authenticates the request against whichever scheme the provider was
// configured with, and hands the decoded payload to the provider for
// translation into a sh.tangled.pipeline.status publish.
//
// Authentication is intentionally fail-closed: when bk is nil (no
// Buildkite provider configured) we 503 instead of accepting events
// silently. The body is buffered up front because signature mode
// HMACs the raw bytes — we can't rely on the JSON decoder reading
// the request body before verification.
//
// Acknowledgement contract with Buildkite: we 200 on any well-formed
// event we accept (including events we deliberately ignore, like
// job.* or builds we don't track), and 5xx only on internal failure
// the operator should look at. A 4xx/5xx makes Buildkite retry,
// which we don't want for "this isn't an event we care about".
func buildkiteWebhookHandler(logger *slog.Logger, bk *buildkiteProvider) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if bk == nil {
			http.Error(w, "buildkite provider not configured",
				http.StatusServiceUnavailable)
			return
		}

		// Cap body size so a malicious sender can't exhaust
		// memory; Buildkite payloads in practice are well under
		// 64 KiB but a generous-but-bounded ceiling is the
		// right shape here.
		body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
		if err != nil {
			logger.Warn("buildkite webhook: read body", "err", err)
			http.Error(w, "read body", http.StatusBadRequest)
			return
		}

		if err := bk.VerifyWebhook(r.Header, body); err != nil {
			logger.Warn("buildkite webhook: verify failed",
				"err", err,
				"remote", r.RemoteAddr,
			)
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}

		var payload buildkite.WebhookPayload
		if err := json.Unmarshal(body, &payload); err != nil {
			logger.Warn("buildkite webhook: decode body", "err", err)
			http.Error(w, "bad payload", http.StatusBadRequest)
			return
		}
		// The X-Buildkite-Event header is authoritative for the
		// event name; the body field is convenience but doesn't
		// always match exactly. Prefer the header.
		if h := r.Header.Get("X-Buildkite-Event"); h != "" {
			payload.Event = h
		}

		// Translate + publish on the request context so a slow
		// store/broker doesn't outlive an aborted webhook
		// connection.
		if err := bk.HandleWebhook(r.Context(), payload); err != nil {
			logger.Error("buildkite webhook: handle", "err", err,
				"event", payload.Event,
				"build_uuid", payload.Build.ID,
			)
			http.Error(w, "internal error", http.StatusInternalServerError)
			return
		}
		w.WriteHeader(http.StatusOK)
	}
}
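
// replayWebhookSketch is a minimal illustration, not part of the
// server: it shows the request shape the handler above expects, e.g.
// for poking a local spindle from a test harness. The name and
// signature are assumptions. body is a raw Buildkite-shaped JSON
// payload; the event name rides in X-Buildkite-Event, which the
// handler treats as authoritative. A real spindle will 401 this
// unless the request also satisfies bk.VerifyWebhook's configured
// token or signature scheme, which this sketch makes no attempt at.
func replayWebhookSketch(ctx context.Context, base, event string, body io.Reader) (int, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		base+"/webhooks/buildkite", body)
	if err != nil {
		return 0, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Buildkite-Event", event)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	// Per the acknowledgement contract above: 200 = accepted (possibly
	// deliberately ignored), 401 = failed verification, 503 = no
	// Buildkite provider configured.
	return resp.StatusCode, nil
}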

// logsHandler serves captured workflow logs over a WebSocket,
// matching the wire protocol of the upstream Tangled spindle so the
// appview's log proxy (appview/pipelines.Logs) treats us as a drop-in
// source. The path shape is
//
//	GET /logs/{knot}/{pipelineRkey}/{workflow}
//
// which matches the (knot, pipelineRkey, workflow) tuple
// Provider.Spawn is invoked with — the same identity used in the
// pipeline ATURI. Workflow names commonly contain a dot (e.g.
// "test.yml"); ServeMux path patterns match a single segment, so the
// literal value flows straight through r.PathValue.
//
// Wire shape per frame: a single TextMessage carrying one JSON
// LogLine record (defined in provider.go; byte-compatible with
// tangled.org/core/spindle/models.LogLine). The Provider hands us a
// channel of LogLine values; we marshal each one and forward it as
// one frame so the appview's per-line decode path works unchanged.
//
// Error mapping is intentionally done *before* the WebSocket upgrade:
// ErrLogsNotFound becomes 404 and any other Logs() error becomes 500
// so the appview's websocket.DefaultDialer surfaces a real HTTP
// status rather than an immediate close.
func logsHandler(logger *slog.Logger, provider Provider) http.HandlerFunc {
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}
	return func(w http.ResponseWriter, r *http.Request) {
		knot := r.PathValue("knot")
		pipelineRkey := r.PathValue("pipelineRkey")
		workflow := r.PathValue("workflow")

		// Defensive: ServeMux won't match an empty segment, but a
		// future router change shouldn't be allowed to silently
		// produce an "all logs" query.
		if knot == "" || pipelineRkey == "" || workflow == "" {
			http.Error(w, "missing path component", http.StatusBadRequest)
			return
		}

		// Establish the log channel BEFORE the WebSocket upgrade so
		// ErrLogsNotFound / backend errors surface as a real HTTP
		// status to the appview's dialer rather than as an immediate
		// post-upgrade close. ctx scopes the producer's lifetime —
		// it's cancelled below the moment the client disconnects.
		ctx, cancel := context.WithCancel(r.Context())
		defer cancel()

		ch, err := provider.Logs(ctx, knot, pipelineRkey, workflow)
		if err != nil {
			if errors.Is(err, ErrLogsNotFound) {
				http.Error(w, "logs not found", http.StatusNotFound)
				return
			}
			logger.Error("logs fetch failed",
				"err", err,
				"knot", knot,
				"pipeline_rkey", pipelineRkey,
				"workflow", workflow,
			)
			http.Error(w, "logs unavailable", http.StatusInternalServerError)
			return
		}

		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			// Upgrade already wrote a response; just record the
			// failure for diagnostics.
			logger.Error("logs websocket upgrade failed",
				"err", err,
				"knot", knot,
				"pipeline_rkey", pipelineRkey,
				"workflow", workflow,
			)
			return
		}
		defer func() {
			// Send a close frame on the way out so the appview proxy
			// sees a clean shutdown. Mirrors upstream
			// spindle.(*Spindle).Logs. WriteControl honours the
			// deadline argument directly, so a stuck peer can't hang
			// us here.
			_ = conn.WriteControl(
				websocket.CloseMessage,
				websocket.FormatCloseMessage(
					websocket.CloseNormalClosure, "log stream complete",
				),
				time.Now().Add(wsWriteWait),
			)
			conn.Close()
		}()

		// Detect client disconnect by trying to read; we don't expect
		// any payloads from the client, so any read outcome (including
		// EOF) signals the connection has gone away. The cancel hits
		// the producer goroutine inside the Provider, which stops
		// sending and closes ch — our drain loop then exits cleanly.
		go func() {
			for {
				if _, _, err := conn.NextReader(); err != nil {
					cancel()
					return
				}
			}
		}()

		// Drain the channel; closure means the run is complete (or
		// the producer hit ctx). Marshal-then-write each LogLine as
		// a single TextMessage frame.
		for {
			select {
			case <-ctx.Done():
				return
			case line, ok := <-ch:
				if !ok {
					return
				}
				frame, err := json.Marshal(line)
				if err != nil {
					// The struct is fully internal; a marshal failure
					// is a programmer bug. Log and bail rather than
					// poison the stream with a half-frame.
					logger.Error("marshal log line",
						"err", err,
						"knot", knot,
						"pipeline_rkey", pipelineRkey,
						"workflow", workflow,
					)
					return
				}
				// Bound the write so a client that stopped reading
				// but kept the TCP connection open can't hang us on a
				// full kernel send buffer. WriteMessage doesn't take a
				// deadline argument the way WriteControl does — we
				// have to set it on the conn before each frame.
				if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
					logger.Debug("logs set write deadline failed",
						"err", err,
						"knot", knot,
						"pipeline_rkey", pipelineRkey,
						"workflow", workflow,
					)
					return
				}
				if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
					logger.Debug("logs frame write failed",
						"err", err,
						"knot", knot,
						"pipeline_rkey", pipelineRkey,
						"workflow", workflow,
					)
					return
				}
			}
		}
	}
}
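
// tailLogsSketch is a minimal illustration, not part of the server: a
// hedged sketch of the client half of the wire protocol documented on
// logsHandler — one TextMessage frame per JSON LogLine, ended by a
// normal-closure close frame. base is assumed to be a ws:// URL and
// the path components are assumed URL-safe (no escaping done here).
func tailLogsSketch(ctx context.Context, base, knot, rkey, workflow string) error {
	url := fmt.Sprintf("%s/logs/%s/%s/%s", base, knot, rkey, workflow)
	conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, nil)
	if err != nil {
		// Pre-upgrade error mapping means a missing run surfaces
		// here as a real 404 rather than a post-upgrade close.
		return fmt.Errorf("dial logs: %w", err)
	}
	defer conn.Close()
	for {
		_, frame, err := conn.ReadMessage()
		if err != nil {
			if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
				return nil // "log stream complete"
			}
			return err
		}
		var line LogLine // defined in provider.go
		if err := json.Unmarshal(frame, &line); err != nil {
			return fmt.Errorf("decode log line: %w", err)
		}
		fmt.Printf("%+v\n", line)
	}
}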

// eventsHandler upgrades to a WebSocket and streams persisted records
// to the connected client. The wire protocol mirrors the upstream
// Tangled spindle so the appview's eventconsumer treats us as a
// drop-in source:
//
// - Optional ?cursor=<int64> resumes after that rowid; absent or 0
//   means "from the beginning of our retained log".
// - We do a backfill pass first (everything with created > cursor),
//   then loop: on each broker signal, drain new rows; on a 30s
//   timer, write a websocket ping so intermediaries don't idle the
//   connection out.
//
// We subscribe to the broker *before* the backfill so any Publish that
// races between the cursor read and the loop entry is captured by the
// pending channel signal — the loop will see it on its first iteration
// and call streamEvents again, which is idempotent on the cursor.
func eventsHandler(logger *slog.Logger, br *broker) http.HandlerFunc {
	upgrader := websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}
	return func(w http.ResponseWriter, r *http.Request) {
		conn, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			logger.Error("websocket upgrade failed", "err", err)
			return
		}
		defer conn.Close()

		// Parse the resume cursor up front. An unparseable cursor is a
		// client bug, but rather than 4xx the upgraded connection we
		// log it and start from zero — same behaviour as the upstream
		// spindle.
		var cursor int64
		if raw := r.URL.Query().Get("cursor"); raw != "" {
			parsed, err := strconv.ParseInt(raw, 10, 64)
			if err != nil {
				logger.Warn("events: bad cursor, starting from 0",
					"cursor", raw, "err", err,
				)
			} else {
				cursor = parsed
			}
		}
		logger.Debug("events client connected",
			"remote", r.RemoteAddr, "cursor", cursor,
		)

		// Subscribe before the backfill so a Publish that races between
		// the EventsAfter read and our select loop is captured by the
		// pending channel signal — we'll re-drain on the first wake-up.
		sig := br.Subscribe()
		defer br.Unsubscribe(sig)

		ctx, cancel := context.WithCancel(r.Context())
		defer cancel()

		// Detect client disconnect by trying to read; we don't expect
		// any payloads from the client, so any read outcome (including
		// EOF) signals the connection has gone away.
		go func() {
			for {
				if _, _, err := conn.NextReader(); err != nil {
					cancel()
					return
				}
			}
		}()

		// Initial backfill. If this fails the connection is unusable
		// (we can't promise ordering after a partial write) so just
		// return and let the client reconnect with the same cursor.
		if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
			logger.Debug("events backfill ended", "err", err, "cursor", cursor)
			return
		}

		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				logger.Debug("events client disconnected",
					"remote", r.RemoteAddr, "cursor", cursor,
				)
				return
			case <-sig:
				if err := streamEvents(ctx, conn, br.st, &cursor); err != nil {
					logger.Debug("events stream ended", "err", err, "cursor", cursor)
					return
				}
			case <-ticker.C:
				// WriteControl takes its own deadline argument, so
				// the ping itself can't hang us — but we still want a
				// generous-but-bounded ceiling to match the per-frame
				// write timeout.
				if err := conn.WriteControl(
					websocket.PingMessage, nil,
					time.Now().Add(wsWriteWait),
				); err != nil {
					logger.Debug("events ping failed", "err", err)
					return
				}
			}
		}
	}
}
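
// readEventsOnce is a minimal illustration, not part of the server: it
// sketches one connection's worth of the /events protocol documented
// on eventsHandler — dial with a resume cursor, decode one
// eventsEnvelope per text frame, and return the highest Created seen
// so the caller can resume. The name and signature are assumptions.
// Server pings are answered automatically by gorilla/websocket's
// default pong handler during ReadMessage.
func readEventsOnce(ctx context.Context, base string, cursor int64) (int64, error) {
	conn, _, err := websocket.DefaultDialer.DialContext(
		ctx, fmt.Sprintf("%s/events?cursor=%d", base, cursor), nil)
	if err != nil {
		return cursor, err
	}
	defer conn.Close()
	for {
		_, frame, err := conn.ReadMessage()
		if err != nil {
			return cursor, err
		}
		var env eventsEnvelope
		if err := json.Unmarshal(frame, &env); err != nil {
			return cursor, fmt.Errorf("decode envelope: %w", err)
		}
		// A real consumer would act on env.Nsid / env.Rkey / env.Event
		// here before advancing its cursor.
		cursor = env.Created
	}
}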

// streamEvents drains every event row with `created > *cursor`, writes
// each as a wire envelope frame, and advances *cursor in lockstep. The
// cursor is updated *after* the write succeeds so a half-flushed batch
// (interrupted by a websocket error) replays cleanly on the next
// connection.
//
// It is safe to call repeatedly: when there are no new rows the query
// returns an empty slice and we noop.
func streamEvents(ctx context.Context, conn *websocket.Conn, st *store, cursor *int64) error {
	rows, err := st.EventsAfter(ctx, *cursor)
	if err != nil {
		return fmt.Errorf("read events: %w", err)
	}
	for _, row := range rows {
		frame, err := json.Marshal(eventsEnvelope{
			Rkey:    row.Rkey,
			Nsid:    row.Nsid,
			Event:   row.EventJSON,
			Created: row.Created,
		})
		if err != nil {
			return fmt.Errorf("marshal envelope: %w", err)
		}
		// Bound the per-frame write so a client that stopped reading
		// (but didn't close the TCP connection) can't hang the
		// handler on a full kernel send buffer. WriteMessage has no
		// deadline argument of its own — we set it on the conn.
		if err := conn.SetWriteDeadline(time.Now().Add(wsWriteWait)); err != nil {
			return fmt.Errorf("set write deadline: %w", err)
		}
		if err := conn.WriteMessage(websocket.TextMessage, frame); err != nil {
			return fmt.Errorf("write frame: %w", err)
		}
		*cursor = row.Created
	}
	return nil
}
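
// consumeEvents is a minimal illustration, not part of the server: it
// sketches the reconnect contract streamEvents is written for. Because
// *cursor only advances after a frame write succeeds, redialing with
// the last Created value replays any half-flushed batch — delivery is
// at-least-once, so a consumer should process frames idempotently (or
// dedupe on Nsid+Rkey). readEventsOnce is the sketch above; the fixed
// one-second backoff is a placeholder.
func consumeEvents(ctx context.Context, base string) error {
	var cursor int64
	for ctx.Err() == nil {
		// Disconnects (server restart, idle intermediary) are
		// expected; resume from wherever the last connection got to.
		cursor, _ = readEventsOnce(ctx, base, cursor)
		time.Sleep(time.Second)
	}
	return ctx.Err()
}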