Stitch any CI into Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

buildkite

+1842 -36
+5 -1
AGENTS.md
··· 9 9 - Comment heavily, but not redundantly. Explain the "why" behind decisions 10 10 clearly, don't repeat the "what." 11 11 - Try to limit line length below 100 characters, but don't be afraid to break 12 - this rule if it improves readability. 12 + this rule if it improves readability. 13 13 - Do not make lines too short to follow this rule either. Try 14 14 to use as much of the max characters as possible without sacrificing 15 15 readability. ··· 18 18 19 19 - Add compile-time interface checks whenever a type implements an interface 20 20 we care about. For example: `var _ io.Reader = (*MyType)(nil)`. 21 + 22 + ## Providers 23 + 24 + - Buildkite API: <https://buildkite.com/docs/apis/rest-api.md>
+29 -13
README.md
··· 35 35 go run . -addr :8080 36 36 ``` 37 37 38 - ## Endpoints (planned) 38 + ## Configuration 39 39 40 - - `GET /events` — WebSocket stream of pipeline status events, 41 - consumed by the Tangled appview. 42 - - `POST /webhooks/buildkite` — Buildkite webhook receiver. 43 - - `POST /xrpc/sh.tangled.pipeline.cancelPipeline` — cancel a running build. 40 + ### Required 41 + 42 + | Env var | Description | 43 + | ---------------- | ----------------------------------------------------------- | 44 + | `TACK_HOSTNAME` | This spindle's hostname (matches `sh.tangled.repo.spindle`) | 45 + | `TACK_OWNER_DID` | DID of the spindle operator | 44 46 45 - ## Configuration (planned) 47 + ### Optional 46 48 47 - | Env var | Description | 48 - | ---------------------- | ------------------------------------ | 49 - | `TACK_BUILDKITE_TOKEN` | Buildkite API token | 50 - | `TACK_BUILDKITE_ORG` | Buildkite organization slug | 51 - | `TACK_JETSTREAM_URL` | Tangled Jetstream WebSocket URL | 52 - | `TACK_DB_PATH` | Local SQLite path for the event log | 53 - | `TACK_OWNER_DID` | DID of the spindle operator | 49 + | Env var | Description | 50 + | -------------------- | -------------------------------------------------------- | 51 + | `TACK_LISTEN_ADDR` | HTTP listen address (default `:8080`) | 52 + | `TACK_DB_PATH` | Local SQLite path (default `tack.db`) | 53 + | `TACK_JETSTREAM_URL` | Tangled Jetstream WebSocket URL | 54 + | `TACK_DEV` | Use `ws://` for knot event-streams (any non-empty value) | 55 + 56 + ### Buildkite 57 + 58 + Setting `TACK_BUILDKITE_TOKEN` enables Buildkite mode; when unset, tack 59 + runs the in-process fake provider for local development. When 60 + Buildkite mode is enabled, every other variable in this section is 61 + required, except `TACK_BUILDKITE_WEBHOOK_MODE`, which is optional and defaults to `token`. 
62 + 63 + | Env var | Description | 64 + | ------------------------------- | ------------------------------------------------------------------------------ | 65 + | `TACK_BUILDKITE_TOKEN` | Buildkite API token (enables Buildkite mode) | 66 + | `TACK_BUILDKITE_ORG` | Buildkite organization slug | 67 + | `TACK_BUILDKITE_PIPELINE` | Buildkite pipeline slug to fire builds on | 68 + | `TACK_BUILDKITE_WEBHOOK_SECRET` | Shared secret for `/webhooks/buildkite` auth | 69 + | `TACK_BUILDKITE_WEBHOOK_MODE` | `token` (default) or `signature` — must match Buildkite's notification setting |
+78 -7
http.go
··· 22 22 "encoding/json" 23 23 "errors" 24 24 "fmt" 25 + "io" 25 26 "log/slog" 26 27 "net/http" 27 28 "strconv" ··· 29 30 30 31 "github.com/gorilla/websocket" 31 32 "tangled.org/core/api/tangled" 33 + 34 + "github.com/mitchellh/tack/internal/buildkite" 32 35 ) 33 36 34 37 // runHTTP starts the spindle's HTTP server and blocks until ctx is ··· 37 40 // 38 41 // The logger is read from ctx via loggerFrom. The broker is the 39 42 // in-process pub/sub used by /events to fan published records out to 40 - // connected websocket subscribers. 41 - func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider) error { 43 + // connected websocket subscribers. bkProvider may be nil — when a 44 + // deployment runs the fake provider, /webhooks/buildkite still 45 + // registers but responds 503, so a misdirected Buildkite webhook 46 + // gets a clear "this spindle isn't accepting Buildkite events" rather 47 + // than a misleading 200. 48 + func runHTTP(ctx context.Context, cfg config, br *broker, provider Provider, bkProvider *buildkiteProvider) error { 42 49 logger := loggerFrom(ctx) 43 50 44 51 mux := http.NewServeMux() ··· 46 53 mux.HandleFunc("GET /events", eventsHandler(logger, br)) 47 54 mux.HandleFunc("GET /logs/{knot}/{pipelineRkey}/{workflow}", logsHandler(logger, provider)) 48 55 mux.HandleFunc("GET /xrpc/"+tangled.OwnerNSID, ownerHandler(logger, cfg.OwnerDID)) 49 - mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler()) 56 + mux.HandleFunc("POST /webhooks/buildkite", buildkiteWebhookHandler(logger, bkProvider)) 50 57 51 58 srv := &http.Server{ 52 59 Addr: cfg.Addr, ··· 96 103 } 97 104 } 98 105 99 - // buildkiteWebhookHandler is a placeholder until we implement Buildkite -> 100 - // pipeline.status translation. 
101 - func buildkiteWebhookHandler() http.HandlerFunc { 106 + // buildkiteWebhookHandler receives Buildkite Pipelines webhook events, 107 + // authenticates the request against whichever scheme the provider was 108 + // configured with, and hands the decoded payload to the provider for 109 + // translation into a sh.tangled.pipeline.status publish. 110 + // 111 + // Authentication is intentionally fail-closed: when bk is nil (no 112 + // Buildkite provider configured) we 503 instead of accepting events 113 + // silently. The body is buffered up front because signature mode 114 + // HMACs the raw bytes — we can't rely on the JSON decoder reading 115 + // the request body before verification. 116 + // 117 + // Acknowledgement contract with Buildkite: we 200 on any well-formed 118 + // event we accepted (including events we deliberately ignore, like 119 + // job.* or builds we don't track), and 5xx only on internal failure 120 + // the operator should look at. A 4xx/5xx makes Buildkite retry, 121 + // which we don't want for "this isn't an event we care about". 122 + func buildkiteWebhookHandler(logger *slog.Logger, bk *buildkiteProvider) http.HandlerFunc { 102 123 return func(w http.ResponseWriter, r *http.Request) { 103 - http.Error(w, "not implemented", http.StatusNotImplemented) 124 + if bk == nil { 125 + http.Error(w, "buildkite provider not configured", 126 + http.StatusServiceUnavailable) 127 + return 128 + } 129 + 130 + // Cap body size so a malicious sender can't exhaust 131 + // memory; Buildkite payloads in practice are well under 132 + // 64 KiB but a generous-but-bounded ceiling is the 133 + // right shape here. 
134 + body, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) 135 + if err != nil { 136 + logger.Warn("buildkite webhook: read body", "err", err) 137 + http.Error(w, "read body", http.StatusBadRequest) 138 + return 139 + } 140 + 141 + if err := bk.VerifyWebhook(r.Header, body); err != nil { 142 + logger.Warn("buildkite webhook: verify failed", 143 + "err", err, 144 + "remote", r.RemoteAddr, 145 + ) 146 + http.Error(w, "unauthorized", http.StatusUnauthorized) 147 + return 148 + } 149 + 150 + var payload buildkite.WebhookPayload 151 + if err := json.Unmarshal(body, &payload); err != nil { 152 + logger.Warn("buildkite webhook: decode body", "err", err) 153 + http.Error(w, "bad payload", http.StatusBadRequest) 154 + return 155 + } 156 + // The X-Buildkite-Event header is authoritative for the 157 + // event name; the body field is convenience but doesn't 158 + // always match exactly. Prefer the header. 159 + if h := r.Header.Get("X-Buildkite-Event"); h != "" { 160 + payload.Event = h 161 + } 162 + 163 + // Translate + publish on the request context so a slow 164 + // store/broker doesn't outlive an aborted webhook 165 + // connection. 166 + if err := bk.HandleWebhook(r.Context(), payload); err != nil { 167 + logger.Error("buildkite webhook: handle", "err", err, 168 + "event", payload.Event, 169 + "build_uuid", payload.Build.ID, 170 + ) 171 + http.Error(w, "internal error", http.StatusInternalServerError) 172 + return 173 + } 174 + w.WriteHeader(http.StatusOK) 104 175 } 105 176 } 106 177
+346
internal/buildkite/buildkite.go
··· 1 + // Package buildkite is a small Buildkite REST + webhook client tack 2 + // uses to drive its Buildkite-backed Provider implementation. 3 + // 4 + // The package deliberately covers a tiny slice of the upstream API 5 + // (create build, get build, fetch job log, decode + authenticate 6 + // webhook payloads). It exists as its own package so the rest of 7 + // tack — particularly the Provider implementation that translates 8 + // Tangled triggers into Buildkite builds — can stay focused on 9 + // translation rather than HTTP plumbing. 10 + // 11 + // Naming convention: types here are *not* prefixed with "Buildkite". 12 + // Imported as `buildkite.Client`, `buildkite.Build`, etc., the package 13 + // path supplies the disambiguation already. 14 + package buildkite 15 + 16 + import ( 17 + "bytes" 18 + "context" 19 + "crypto/hmac" 20 + "crypto/sha256" 21 + "encoding/hex" 22 + "encoding/json" 23 + "errors" 24 + "fmt" 25 + "io" 26 + "net/http" 27 + "strconv" 28 + "strings" 29 + "time" 30 + ) 31 + 32 + // APIBase is the public Buildkite REST API root. Exported as a var 33 + // (not a const) so tests can swap it for an httptest server URL 34 + // without hooking up a real Buildkite account. 35 + var APIBase = "https://api.buildkite.com" 36 + 37 + // ErrNotFound is returned by Get* methods when the upstream returns 38 + // 404. Callers translate it to whatever shape they need (the Provider 39 + // maps it onto its own ErrLogsNotFound for the /logs handler). 40 + var ErrNotFound = errors.New("buildkite: not found") 41 + 42 + // Client is a thin wrapper around net/http carrying API credentials 43 + // + organization context so call sites don't repeat them. Safe for 44 + // concurrent use; the embedded http.Client is goroutine-safe. 45 + type Client struct { 46 + http *http.Client 47 + token string 48 + org string 49 + } 50 + 51 + // NewClient builds a Client with sensible defaults. 
The 30s timeout 52 + // covers individual requests, not the whole client lifetime — 53 + // long-poll-style endpoints aren't used here, so a generous-but-bounded 54 + // per-request timeout is the right default. 55 + func NewClient(token, org string) *Client { 56 + return &Client{ 57 + http: &http.Client{Timeout: 30 * time.Second}, 58 + token: token, 59 + org: org, 60 + } 61 + } 62 + 63 + // Job is the subset of a Buildkite job object we care about: the ID 64 + // and name we need to fetch logs for it, plus its state for 65 + // surfacing in webhook handling. 66 + // 67 + // Buildkite jobs come in several "type" values (script, waiter, 68 + // manual) — only "script" jobs have logs, but we decode the slice 69 + // as-is and let the caller decide whether to skip non-script entries. 70 + type Job struct { 71 + ID string `json:"id"` 72 + Type string `json:"type"` 73 + Name string `json:"name"` 74 + State string `json:"state"` 75 + } 76 + 77 + // Build is the subset of a Buildkite build object we care about. 78 + // Fields not present here are dropped silently by the JSON decoder — 79 + // keep this list lean so additions to the upstream schema don't 80 + // force us to touch this file. 81 + type Build struct { 82 + ID string `json:"id"` 83 + Number int64 `json:"number"` 84 + State string `json:"state"` 85 + WebURL string `json:"web_url"` 86 + Commit string `json:"commit"` 87 + Branch string `json:"branch"` 88 + Message string `json:"message"` 89 + MetaData map[string]string `json:"meta_data"` 90 + Jobs []Job `json:"jobs"` 91 + Pipeline map[string]interface{} `json:"pipeline"` 92 + } 93 + 94 + // CreateBuildRequest is the request body for POST /builds. Only the 95 + // fields callers actively use are exposed — the upstream API accepts 96 + // many more, but we'd just be passing through dead options. 
97 + // 98 + // IgnorePipelineBranchFilters defaults to false on the wire (omitempty 99 + // elides the zero value); callers that want pipeline-level branch 100 + // filters bypassed should set it to true. 101 + type CreateBuildRequest struct { 102 + Commit string `json:"commit"` 103 + Branch string `json:"branch"` 104 + Message string `json:"message,omitempty"` 105 + Env map[string]string `json:"env,omitempty"` 106 + MetaData map[string]string `json:"meta_data,omitempty"` 107 + IgnorePipelineBranchFilters bool `json:"ignore_pipeline_branch_filters,omitempty"` 108 + } 109 + 110 + // CreateBuild fires a build on the named pipeline. Returns the 111 + // decoded response so the caller can persist build_uuid + number for 112 + // later webhook lookup. 113 + // 114 + // Buildkite returns 201 on success; anything else is wrapped into an 115 + // error that includes the response body so a misconfigured pipeline 116 + // (e.g. wrong slug, missing branch) surfaces useful diagnostics into 117 + // the caller's log. 
118 + func (c *Client) CreateBuild( 119 + ctx context.Context, 120 + pipelineSlug string, 121 + req CreateBuildRequest, 122 + ) (*Build, error) { 123 + body, err := json.Marshal(req) 124 + if err != nil { 125 + return nil, fmt.Errorf("marshal build request: %w", err) 126 + } 127 + 128 + url := fmt.Sprintf("%s/v2/organizations/%s/pipelines/%s/builds", 129 + APIBase, c.org, pipelineSlug, 130 + ) 131 + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body)) 132 + if err != nil { 133 + return nil, fmt.Errorf("build request: %w", err) 134 + } 135 + httpReq.Header.Set("Authorization", "Bearer "+c.token) 136 + httpReq.Header.Set("Content-Type", "application/json") 137 + 138 + resp, err := c.http.Do(httpReq) 139 + if err != nil { 140 + return nil, fmt.Errorf("create build: %w", err) 141 + } 142 + defer resp.Body.Close() 143 + 144 + if resp.StatusCode != http.StatusCreated { 145 + raw, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) 146 + return nil, fmt.Errorf("create build: status %d: %s", 147 + resp.StatusCode, strings.TrimSpace(string(raw)), 148 + ) 149 + } 150 + 151 + var out Build 152 + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { 153 + return nil, fmt.Errorf("decode build response: %w", err) 154 + } 155 + return &out, nil 156 + } 157 + 158 + // GetBuild fetches the full build record by number, including the 159 + // jobs slice. Used by callers that need the current set of jobs for 160 + // a known (pipelineSlug, buildNumber) pair. 161 + // 162 + // Returns ErrNotFound when Buildkite responds 404. 
163 + func (c *Client) GetBuild( 164 + ctx context.Context, 165 + pipelineSlug string, 166 + buildNumber int64, 167 + ) (*Build, error) { 168 + url := fmt.Sprintf("%s/v2/organizations/%s/pipelines/%s/builds/%d", 169 + APIBase, c.org, pipelineSlug, buildNumber, 170 + ) 171 + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) 172 + if err != nil { 173 + return nil, fmt.Errorf("build request: %w", err) 174 + } 175 + httpReq.Header.Set("Authorization", "Bearer "+c.token) 176 + 177 + resp, err := c.http.Do(httpReq) 178 + if err != nil { 179 + return nil, fmt.Errorf("get build: %w", err) 180 + } 181 + defer resp.Body.Close() 182 + 183 + if resp.StatusCode == http.StatusNotFound { 184 + return nil, ErrNotFound 185 + } 186 + if resp.StatusCode != http.StatusOK { 187 + raw, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) 188 + return nil, fmt.Errorf("get build: status %d: %s", 189 + resp.StatusCode, strings.TrimSpace(string(raw)), 190 + ) 191 + } 192 + 193 + var out Build 194 + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { 195 + return nil, fmt.Errorf("decode build: %w", err) 196 + } 197 + return &out, nil 198 + } 199 + 200 + // GetJobLog fetches the plain-text log for a single job. Buildkite 201 + // supports several formats; we ask for text/plain explicitly so the 202 + // response is one big string the caller can split on newlines. 203 + // 204 + // Returns ErrNotFound when Buildkite responds 404 (typical for a 205 + // job that hasn't started yet — it has no log to serve). 
206 + func (c *Client) GetJobLog( 207 + ctx context.Context, 208 + pipelineSlug string, 209 + buildNumber int64, 210 + jobID string, 211 + ) (string, error) { 212 + url := fmt.Sprintf("%s/v2/organizations/%s/pipelines/%s/builds/%d/jobs/%s/log", 213 + APIBase, c.org, pipelineSlug, buildNumber, jobID, 214 + ) 215 + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) 216 + if err != nil { 217 + return "", fmt.Errorf("build request: %w", err) 218 + } 219 + httpReq.Header.Set("Authorization", "Bearer "+c.token) 220 + httpReq.Header.Set("Accept", "text/plain") 221 + 222 + resp, err := c.http.Do(httpReq) 223 + if err != nil { 224 + return "", fmt.Errorf("get job log: %w", err) 225 + } 226 + defer resp.Body.Close() 227 + 228 + if resp.StatusCode == http.StatusNotFound { 229 + return "", ErrNotFound 230 + } 231 + if resp.StatusCode != http.StatusOK { 232 + raw, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) 233 + return "", fmt.Errorf("get job log: status %d: %s", 234 + resp.StatusCode, strings.TrimSpace(string(raw)), 235 + ) 236 + } 237 + 238 + body, err := io.ReadAll(resp.Body) 239 + if err != nil { 240 + return "", fmt.Errorf("read job log: %w", err) 241 + } 242 + return string(body), nil 243 + } 244 + 245 + // WebhookPayload is the small slice of the webhook body callers 246 + // actually decode. Build events all wrap a "build" object; job 247 + // events wrap a "job" — keeping both around lets callers add 248 + // job-level mapping later without changing the decoder. 249 + type WebhookPayload struct { 250 + Event string `json:"event"` 251 + Build Build `json:"build"` 252 + Job Job `json:"job"` 253 + } 254 + 255 + // WebhookMode selects how an inbound webhook request is 256 + // authenticated. 
The two values correspond directly to the two 257 + // settings Buildkite's notification service exposes for this: 258 + // WebhookModeToken sends the secret in the X-Buildkite-Token header 259 + // in plain text; WebhookModeSignature sends an HMAC-SHA256 of the 260 + // body in X-Buildkite-Signature. 261 + type WebhookMode string 262 + 263 + const ( 264 + WebhookModeToken WebhookMode = "token" 265 + WebhookModeSignature WebhookMode = "signature" 266 + ) 267 + 268 + // VerifySignature validates the X-Buildkite-Signature header against 269 + // secret using the documented "<timestamp>.<body>" HMAC-SHA256 270 + // scheme. Returns nil when the header is well-formed and the digest 271 + // matches; any other condition returns an error. 272 + // 273 + // We deliberately do NOT enforce a freshness window on timestamp: 274 + // callers in practice consume idempotent or storage-deduplicated 275 + // events, so a replayed event is at worst a duplicate publish. 276 + // Callers that need stricter freshness should layer it on top. 277 + // 278 + // The header format is "timestamp=<unix>,signature=<hex>". 279 + func VerifySignature(header, secret string, body []byte) error { 280 + if header == "" { 281 + return errors.New("missing X-Buildkite-Signature header") 282 + } 283 + if secret == "" { 284 + // A misconfigured server is a programmer bug, but we'd 285 + // rather fail closed than silently accept any signature. 286 + return errors.New("server has no webhook secret configured") 287 + } 288 + 289 + var ts, sig string 290 + for _, part := range strings.Split(header, ",") { 291 + k, v, ok := strings.Cut(strings.TrimSpace(part), "=") 292 + if !ok { 293 + continue 294 + } 295 + switch k { 296 + case "timestamp": 297 + ts = v 298 + case "signature": 299 + sig = v 300 + } 301 + } 302 + if ts == "" || sig == "" { 303 + return errors.New("malformed signature header") 304 + } 305 + // Sanity-check the timestamp is a parseable int. 
The value 306 + // itself isn't validated against the clock (see comment above), 307 + // but a non-numeric timestamp is structurally invalid. 308 + if _, err := strconv.ParseInt(ts, 10, 64); err != nil { 309 + return fmt.Errorf("invalid timestamp: %w", err) 310 + } 311 + 312 + mac := hmac.New(sha256.New, []byte(secret)) 313 + mac.Write([]byte(ts)) 314 + mac.Write([]byte(".")) 315 + mac.Write(body) 316 + expected := hex.EncodeToString(mac.Sum(nil)) 317 + 318 + // Compare in constant time to keep the verifier from leaking 319 + // the expected digest through timing. 320 + if !hmac.Equal([]byte(expected), []byte(sig)) { 321 + return errors.New("signature mismatch") 322 + } 323 + return nil 324 + } 325 + 326 + // VerifyToken handles the simpler X-Buildkite-Token mode: the 327 + // configured secret is sent verbatim in the header. Constant-time 328 + // comparison keeps a brute-forcing attacker from learning the token 329 + // one byte at a time. 330 + // 331 + // Returns nil on match. Two error cases: 332 + // - missing header on the request (caller should 401), 333 + // - server has no expected token configured (caller should 500; 334 + // fail-closed, never accept-anything). 335 + func VerifyToken(header, expected string) error { 336 + if header == "" { 337 + return errors.New("missing X-Buildkite-Token header") 338 + } 339 + if expected == "" { 340 + return errors.New("server has no webhook token configured") 341 + } 342 + if !hmac.Equal([]byte(header), []byte(expected)) { 343 + return errors.New("token mismatch") 344 + } 345 + return nil 346 + }
+207
internal/buildkite/buildkite_test.go
··· 1 + package buildkite 2 + 3 + // Tests for the Buildkite REST client + webhook signature/token 4 + // verifiers. Provider-level (Tangled translation) tests live with 5 + // the provider in the main package. 6 + 7 + import ( 8 + "context" 9 + "crypto/hmac" 10 + "crypto/sha256" 11 + "encoding/hex" 12 + "encoding/json" 13 + "fmt" 14 + "io" 15 + "net/http" 16 + "net/http/httptest" 17 + "strings" 18 + "testing" 19 + ) 20 + 21 + // TestVerifySignature covers the HMAC mode end to end. The reference 22 + // digest is computed with the documented "<timestamp>.<body>" 23 + // preimage so the test pins the wire format, not just the helper. 24 + func TestVerifySignature(t *testing.T) { 25 + const secret = "shhh" 26 + body := []byte(`{"event":"build.finished"}`) 27 + const ts = "1700000000" 28 + 29 + mac := hmac.New(sha256.New, []byte(secret)) 30 + mac.Write([]byte(ts)) 31 + mac.Write([]byte(".")) 32 + mac.Write(body) 33 + good := hex.EncodeToString(mac.Sum(nil)) 34 + 35 + cases := []struct { 36 + name string 37 + header string 38 + secret string 39 + body []byte 40 + wantErr bool 41 + }{ 42 + {"valid", "timestamp=" + ts + ",signature=" + good, secret, body, false}, 43 + {"valid with whitespace", " timestamp=" + ts + " , signature=" + good + " ", secret, body, false}, 44 + {"empty header", "", secret, body, true}, 45 + {"empty server secret", "timestamp=" + ts + ",signature=" + good, "", body, true}, 46 + {"missing timestamp", "signature=" + good, secret, body, true}, 47 + {"missing signature", "timestamp=" + ts, secret, body, true}, 48 + {"non-numeric timestamp", "timestamp=abc,signature=" + good, secret, body, true}, 49 + {"wrong signature", "timestamp=" + ts + ",signature=00", secret, body, true}, 50 + {"wrong body", "timestamp=" + ts + ",signature=" + good, secret, []byte("nope"), true}, 51 + } 52 + for _, c := range cases { 53 + t.Run(c.name, func(t *testing.T) { 54 + err := VerifySignature(c.header, c.secret, c.body) 55 + if (err != nil) != c.wantErr { 56 + 
t.Fatalf("err=%v wantErr=%v", err, c.wantErr) 57 + } 58 + }) 59 + } 60 + } 61 + 62 + // TestVerifyToken pins the token-mode behaviour: it must be a 63 + // constant-time exact-match check, never a prefix or substring. 64 + func TestVerifyToken(t *testing.T) { 65 + cases := []struct { 66 + name string 67 + header string 68 + expect string 69 + wantErr bool 70 + }{ 71 + {"match", "abc123", "abc123", false}, 72 + {"empty header", "", "abc123", true}, 73 + {"empty expected", "abc123", "", true}, 74 + {"mismatch", "abc124", "abc123", true}, 75 + {"prefix is not a match", "abc", "abc123", true}, 76 + } 77 + for _, c := range cases { 78 + t.Run(c.name, func(t *testing.T) { 79 + err := VerifyToken(c.header, c.expect) 80 + if (err != nil) != c.wantErr { 81 + t.Fatalf("err=%v wantErr=%v", err, c.wantErr) 82 + } 83 + }) 84 + } 85 + } 86 + 87 + // TestClientCreateBuild covers the request shape we send (auth 88 + // header, JSON body, URL) and the response decoding for the happy 89 + // path. 90 + func TestClientCreateBuild(t *testing.T) { 91 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 92 + if r.URL.Path != "/v2/organizations/myorg/pipelines/mypipe/builds" { 93 + t.Errorf("bad path %s", r.URL.Path) 94 + } 95 + if got := r.Header.Get("Authorization"); got != "Bearer tok" { 96 + t.Errorf("bad auth %q", got) 97 + } 98 + if got := r.Header.Get("Content-Type"); got != "application/json" { 99 + t.Errorf("bad content-type %q", got) 100 + } 101 + var got CreateBuildRequest 102 + if err := json.NewDecoder(r.Body).Decode(&got); err != nil { 103 + t.Fatalf("decode: %v", err) 104 + } 105 + if got.Commit != "abc" || got.Branch != "main" { 106 + t.Errorf("bad body: %+v", got) 107 + } 108 + if got.MetaData["k"] != "v" { 109 + t.Errorf("missing meta_data: %+v", got.MetaData) 110 + } 111 + w.WriteHeader(http.StatusCreated) 112 + _ = json.NewEncoder(w).Encode(Build{ 113 + ID: "uuid-1", 114 + Number: 42, 115 + State: "scheduled", 116 + }) 117 + })) 
118 + defer srv.Close() 119 + 120 + prev := APIBase 121 + APIBase = srv.URL 122 + defer func() { APIBase = prev }() 123 + 124 + c := NewClient("tok", "myorg") 125 + build, err := c.CreateBuild(context.Background(), "mypipe", CreateBuildRequest{ 126 + Commit: "abc", 127 + Branch: "main", 128 + MetaData: map[string]string{"k": "v"}, 129 + }) 130 + if err != nil { 131 + t.Fatalf("CreateBuild: %v", err) 132 + } 133 + if build.ID != "uuid-1" || build.Number != 42 || build.State != "scheduled" { 134 + t.Fatalf("unexpected build: %+v", build) 135 + } 136 + } 137 + 138 + // TestClientCreateBuildError makes sure non-2xx responses surface 139 + // the upstream error body — that text ends up in operator logs, so 140 + // silently dropping it would make misconfigurations very painful to 141 + // diagnose. 142 + func TestClientCreateBuildError(t *testing.T) { 143 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 144 + w.WriteHeader(http.StatusUnprocessableEntity) 145 + fmt.Fprint(w, `{"message":"branch is required"}`) 146 + })) 147 + defer srv.Close() 148 + 149 + prev := APIBase 150 + APIBase = srv.URL 151 + defer func() { APIBase = prev }() 152 + 153 + c := NewClient("tok", "myorg") 154 + _, err := c.CreateBuild(context.Background(), "mypipe", CreateBuildRequest{}) 155 + if err == nil { 156 + t.Fatal("expected error") 157 + } 158 + if !strings.Contains(err.Error(), "branch is required") { 159 + t.Fatalf("error missing upstream body: %v", err) 160 + } 161 + } 162 + 163 + // TestClientGetJobLog confirms we send the right Accept header (so 164 + // Buildkite returns plain text, not JSON) and surface 404 as 165 + // ErrNotFound for callers to translate. 
166 + func TestClientGetJobLog(t *testing.T) { 167 + t.Run("ok", func(t *testing.T) { 168 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 169 + if got := r.Header.Get("Accept"); got != "text/plain" { 170 + t.Errorf("bad accept %q", got) 171 + } 172 + if !strings.HasSuffix(r.URL.Path, "/builds/7/jobs/job-1/log") { 173 + t.Errorf("bad path %s", r.URL.Path) 174 + } 175 + io.WriteString(w, "line1\nline2\n") 176 + })) 177 + defer srv.Close() 178 + 179 + prev := APIBase 180 + APIBase = srv.URL 181 + defer func() { APIBase = prev }() 182 + 183 + body, err := NewClient("tok", "myorg"). 184 + GetJobLog(context.Background(), "mypipe", 7, "job-1") 185 + if err != nil { 186 + t.Fatalf("GetJobLog: %v", err) 187 + } 188 + if body != "line1\nline2\n" { 189 + t.Fatalf("body = %q", body) 190 + } 191 + }) 192 + t.Run("404", func(t *testing.T) { 193 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 194 + http.Error(w, "no", http.StatusNotFound) 195 + })) 196 + defer srv.Close() 197 + prev := APIBase 198 + APIBase = srv.URL 199 + defer func() { APIBase = prev }() 200 + 201 + _, err := NewClient("tok", "myorg"). 202 + GetJobLog(context.Background(), "mypipe", 7, "job-1") 203 + if err != ErrNotFound { 204 + t.Fatalf("err = %v; want ErrNotFound", err) 205 + } 206 + }) 207 + }
+1 -1
knot.go
··· 205 205 // Spawn is non-blocking — it fans out into provider-owned 206 206 // goroutines so this worker can move on to the next event. 207 207 // The provider keeps ctx around for shutdown coordination. 208 - k.provider.Spawn(ctx, src.Key(), msg.Rkey, p.Workflows) 208 + k.provider.Spawn(ctx, src.Key(), msg.Rkey, p.TriggerMetadata, p.Workflows) 209 209 210 210 default: 211 211 // Knots may publish other record types over the same stream; we
+84 -13
main.go
··· 8 8 "context" 9 9 "errors" 10 10 "flag" 11 + "fmt" 11 12 "log/slog" 12 13 "os" 13 14 "os/signal" 14 15 "syscall" 15 16 16 17 charmlog "github.com/charmbracelet/log" 18 + 19 + "github.com/mitchellh/tack/internal/buildkite" 17 20 ) 18 21 19 22 // config is the runtime configuration, sourced from environment variables and ··· 28 31 // Dev flips the knot event-stream scheme from wss:// to ws://. 29 32 // Useful when running against a local knot during development. 30 33 Dev bool 34 + 35 + // Buildkite-mode configuration. BuildkiteToken is the switch: 36 + // when empty we fall back to the in-process fake provider 37 + // (useful for local development against a real Tangled 38 + // jetstream); when set, the other Buildkite fields are 39 + // required and tack will refuse to start without them. 40 + BuildkiteToken string 41 + BuildkiteOrg string 42 + BuildkitePipeline string 43 + BuildkiteWebhookSecret string 44 + BuildkiteWebhookMode buildkite.WebhookMode 31 45 } 32 46 33 47 func loadConfig() (config, error) { 34 48 cfg := config{ 35 - Addr: envOr("TACK_LISTEN_ADDR", ":8080"), 36 - Hostname: os.Getenv("TACK_HOSTNAME"), 37 - OwnerDID: os.Getenv("TACK_OWNER_DID"), 38 - JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"), 39 - DBPath: envOr("TACK_DB_PATH", "tack.db"), 40 - Dev: os.Getenv("TACK_DEV") != "", 49 + Addr: envOr("TACK_LISTEN_ADDR", ":8080"), 50 + Hostname: os.Getenv("TACK_HOSTNAME"), 51 + OwnerDID: os.Getenv("TACK_OWNER_DID"), 52 + JetstreamURL: envOr("TACK_JETSTREAM_URL", "wss://jetstream1.us-west.bsky.network/subscribe"), 53 + DBPath: envOr("TACK_DB_PATH", "tack.db"), 54 + Dev: os.Getenv("TACK_DEV") != "", 55 + BuildkiteToken: os.Getenv("TACK_BUILDKITE_TOKEN"), 56 + BuildkiteOrg: os.Getenv("TACK_BUILDKITE_ORG"), 57 + BuildkitePipeline: os.Getenv("TACK_BUILDKITE_PIPELINE"), 58 + BuildkiteWebhookSecret: os.Getenv("TACK_BUILDKITE_WEBHOOK_SECRET"), 59 + BuildkiteWebhookMode: buildkite.WebhookMode( 60 + 
envOr("TACK_BUILDKITE_WEBHOOK_MODE", string(buildkite.WebhookModeToken)), 61 + ), 41 62 } 42 63 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)") 43 64 flag.Parse() ··· 56 77 return cfg, errors.New("TACK_HOSTNAME is required") 57 78 } 58 79 80 + // If the operator opted into Buildkite mode (by supplying a 81 + // token), every other Buildkite knob has to be present. Half- 82 + // configured Buildkite leads to confusing failures deep in the 83 + // provider; catch it at startup. 84 + if cfg.BuildkiteToken != "" { 85 + if cfg.BuildkiteOrg == "" { 86 + return cfg, errors.New("TACK_BUILDKITE_ORG is required when TACK_BUILDKITE_TOKEN is set") 87 + } 88 + if cfg.BuildkitePipeline == "" { 89 + return cfg, errors.New("TACK_BUILDKITE_PIPELINE is required when TACK_BUILDKITE_TOKEN is set") 90 + } 91 + if cfg.BuildkiteWebhookSecret == "" { 92 + return cfg, errors.New("TACK_BUILDKITE_WEBHOOK_SECRET is required when TACK_BUILDKITE_TOKEN is set") 93 + } 94 + switch cfg.BuildkiteWebhookMode { 95 + case buildkite.WebhookModeToken, buildkite.WebhookModeSignature: 96 + default: 97 + return cfg, fmt.Errorf("TACK_BUILDKITE_WEBHOOK_MODE must be %q or %q; got %q", 98 + buildkite.WebhookModeToken, buildkite.WebhookModeSignature, 99 + cfg.BuildkiteWebhookMode, 100 + ) 101 + } 102 + } 103 + 59 104 return cfg, nil 60 105 } 61 106 ··· 115 160 br := newBroker(st) 116 161 117 162 // Provider that turns Tangled pipeline triggers into 118 - // pipeline.status events. The fake provider stands in for a real 119 - // CI integration: it emits synthetic running/success heartbeats 120 - // over the broker so the entire jetstream → knot → /events flow 121 - // is exercisable end-to-end. Swap this for a Buildkite-backed 122 - // implementation once that lands. 123 - provider := newFakeProvider(br, logger) 163 + // pipeline.status events. 
The Buildkite provider is the real 164 + // integration; the fake one stands in when no Buildkite token is 165 + // configured so the full jetstream → knot → /events flow is 166 + // still exercisable locally without a Buildkite account. 167 + // 168 + // bkProvider is kept as a typed pointer separately because the 169 + // /webhooks/buildkite handler needs the concrete *buildkiteProvider 170 + // (for HandleWebhook + signature verification), not the abstract 171 + // Provider surface. 172 + var ( 173 + provider Provider 174 + bkProvider *buildkiteProvider 175 + ) 176 + if cfg.BuildkiteToken != "" { 177 + bkProvider = newBuildkiteProvider( 178 + br, st, 179 + buildkite.NewClient(cfg.BuildkiteToken, cfg.BuildkiteOrg), 180 + cfg.BuildkitePipeline, 181 + cfg.BuildkiteWebhookSecret, 182 + cfg.BuildkiteWebhookMode, 183 + logger, 184 + ) 185 + provider = bkProvider 186 + logger.Info("buildkite provider enabled", 187 + "org", cfg.BuildkiteOrg, 188 + "pipeline", cfg.BuildkitePipeline, 189 + "webhook_mode", cfg.BuildkiteWebhookMode, 190 + ) 191 + } else { 192 + provider = newFakeProvider(br, logger) 193 + logger.Info("fake provider enabled (set TACK_BUILDKITE_TOKEN to use buildkite)") 194 + } 124 195 125 196 // Start the knot event-stream consumer first so the jetstream 126 197 // loop has somewhere to register newly-observed knots into. It ··· 143 214 144 215 // Run the HTTP server. This blocks until ctx is cancelled or the 145 216 // listener errors. 146 - if err := runHTTP(ctx, cfg, br, provider); err != nil { 217 + if err := runHTTP(ctx, cfg, br, provider, bkProvider); err != nil { 147 218 logger.Error("http server error", "err", err) 148 219 os.Exit(1) 149 220 }
+5 -1
provider.go
··· 73 73 // knot is the knot hostname the trigger arrived on; it's the 74 74 // authority half of the pipeline ATURI that pipeline.status 75 75 // records reference. pipelineRkey is the trigger record's rkey 76 - // on that knot. workflows is the unmodified slice from the 76 + // on that knot. trigger is the decoded record's TriggerMetadata 77 + // (may be nil — the lexicon doesn't enforce its presence) and 78 + // carries the commit/branch/PR data a real CI provider needs to 79 + // kick off a build. workflows is the unmodified slice from the 77 80 // decoded sh.tangled.pipeline record; implementations should 78 81 // tolerate nil entries and zero-length names defensively, since 79 82 // the lexicon doesn't enforce either. ··· 81 84 ctx context.Context, 82 85 knot string, 83 86 pipelineRkey string, 87 + trigger *tangled.Pipeline_TriggerMetadata, 84 88 workflows []*tangled.Pipeline_Workflow, 85 89 ) 86 90
+552
provider_buildkite.go
··· 1 + package main 2 + 3 + // buildkiteProvider implements Provider against a real Buildkite 4 + // account. Spawn translates a Tangled pipeline trigger into one 5 + // Buildkite build per workflow; status updates flow back asynchronously 6 + // through the /webhooks/buildkite handler (see http.go), which looks 7 + // the build UUID up in the buildkite_builds table to recover the 8 + // (knot, pipelineRkey, workflow) tuple this provider persisted at 9 + // Spawn time and publishes a sh.tangled.pipeline.status record on 10 + // the in-process broker. 11 + // 12 + // Only one Buildkite pipeline is used per spindle (TACK_BUILDKITE_PIPELINE). 13 + // Every Tangled workflow runs as a build on that single pipeline, with 14 + // the workflow identity plumbed through env + meta_data. The operator 15 + // configures their Buildkite pipeline to read those env vars and 16 + // dispatch accordingly (e.g. via `pipeline upload`). Mapping every 17 + // Tangled workflow to its own Buildkite pipeline would force operators 18 + // to provision Buildkite resources for each workflow file in every 19 + // repo that points at the spindle — friction we don't want to impose. 20 + 21 + import ( 22 + "context" 23 + "encoding/json" 24 + "errors" 25 + "fmt" 26 + "log/slog" 27 + "net/http" 28 + "strings" 29 + "time" 30 + 31 + "tangled.org/core/api/tangled" 32 + 33 + "github.com/mitchellh/tack/internal/buildkite" 34 + ) 35 + 36 + // Buildkite-side meta_data keys carrying the Tangled identity of a 37 + // build. Mirrored into env vars (see envFromTuple) so an operator's 38 + // Buildkite pipeline script can also reach them via $TACK_*. They 39 + // stay tightly namespaced so a coexisting Buildkite job that uses 40 + // meta_data for its own purposes won't collide. 41 + const ( 42 + bkMetaKnot = "tack:knot" 43 + bkMetaPipelineRkey = "tack:pipeline_rkey" 44 + bkMetaWorkflow = "tack:workflow" 45 + ) 46 + 47 + // buildkiteProvider implements Provider. 
48 + // 49 + // webhookSecret + webhookMode live on the provider rather than on 50 + // the HTTP server because the provider is the single owner of 51 + // "everything Buildkite-y": colocating the auth knob with the API 52 + // client and the state translator keeps configuration drift to one 53 + // place and makes the http.go side pure transport. 54 + type buildkiteProvider struct { 55 + br *broker 56 + st *store 57 + log *slog.Logger 58 + client *buildkite.Client 59 + pipelineSlug string 60 + webhookSecret string 61 + webhookMode buildkite.WebhookMode 62 + } 63 + 64 + // Compile-time interface conformance check. 65 + var _ Provider = (*buildkiteProvider)(nil) 66 + 67 + // newBuildkiteProvider wires a provider to its Buildkite client and 68 + // to the broker it publishes pipeline.status records on. pipelineSlug 69 + // is the Buildkite pipeline that all builds get fired on (see file 70 + // header for why there's only one). webhookSecret/webhookMode govern 71 + // inbound /webhooks/buildkite request authentication. 72 + func newBuildkiteProvider( 73 + br *broker, 74 + st *store, 75 + client *buildkite.Client, 76 + pipelineSlug string, 77 + webhookSecret string, 78 + webhookMode buildkite.WebhookMode, 79 + log *slog.Logger, 80 + ) *buildkiteProvider { 81 + return &buildkiteProvider{ 82 + br: br, 83 + st: st, 84 + log: log.With("component", "provider", "kind", "buildkite"), 85 + client: client, 86 + pipelineSlug: pipelineSlug, 87 + webhookSecret: webhookSecret, 88 + webhookMode: webhookMode, 89 + } 90 + } 91 + 92 + // VerifyWebhook authenticates an inbound webhook request using 93 + // whichever mode the provider was configured with. Returns nil on 94 + // success; the HTTP handler maps any returned error to 401. 
95 + func (p *buildkiteProvider) VerifyWebhook(headers http.Header, body []byte) error { 96 + switch p.webhookMode { 97 + case buildkite.WebhookModeSignature: 98 + return buildkite.VerifySignature( 99 + headers.Get("X-Buildkite-Signature"), 100 + p.webhookSecret, body, 101 + ) 102 + default: 103 + // Token mode is the Buildkite default and our default, so 104 + // any unrecognised value falls through to it rather than 105 + // fail-closed at startup. 106 + return buildkite.VerifyToken( 107 + headers.Get("X-Buildkite-Token"), 108 + p.webhookSecret, 109 + ) 110 + } 111 + } 112 + 113 + // Spawn satisfies Provider. For each workflow it fires a separate 114 + // Buildkite build off the configured pipeline so each workflow gets 115 + // its own status timeline. The actual API call runs on a goroutine — 116 + // CreateBuild is one HTTP round-trip, but we still want Spawn to be 117 + // non-blocking per the interface contract. 118 + // 119 + // On a successful create we persist the build UUID → (knot, rkey, 120 + // workflow) mapping and publish a "pending" pipeline.status so the 121 + // appview sees activity immediately, instead of waiting for the 122 + // first webhook to land. 123 + func (p *buildkiteProvider) Spawn( 124 + ctx context.Context, 125 + knot string, 126 + pipelineRkey string, 127 + trigger *tangled.Pipeline_TriggerMetadata, 128 + workflows []*tangled.Pipeline_Workflow, 129 + ) { 130 + if len(workflows) == 0 { 131 + p.log.Warn("pipeline has no workflows; nothing to spawn", 132 + "knot", knot, "rkey", pipelineRkey, 133 + ) 134 + return 135 + } 136 + 137 + // Derive build inputs once. Every workflow on this trigger 138 + // targets the same commit/branch — only the workflow name 139 + // varies between the per-workflow goroutines below. 
140 + commit, branch := triggerCommitAndBranch(trigger) 141 + if commit == "" { 142 + // Buildkite's create-build API requires a commit; we'd 143 + // rather log loudly and skip than fire builds on "HEAD" 144 + // and silently get whatever main happens to look like. 145 + p.log.Error("trigger has no commit; refusing to spawn", 146 + "knot", knot, "rkey", pipelineRkey, 147 + ) 148 + return 149 + } 150 + 151 + for _, wf := range workflows { 152 + if wf == nil || wf.Name == "" { 153 + continue 154 + } 155 + wf := wf 156 + go p.spawnWorkflow(ctx, knot, pipelineRkey, commit, branch, wf) 157 + } 158 + } 159 + 160 + // spawnWorkflow does the per-workflow API + persistence work for 161 + // Spawn. Errors are logged with full context but not returned — 162 + // nothing in tack consumes the result, and a failed Spawn just 163 + // surfaces as the absence of any status update for the affected 164 + // workflow. 165 + func (p *buildkiteProvider) spawnWorkflow( 166 + ctx context.Context, 167 + knot string, 168 + pipelineRkey string, 169 + commit string, 170 + branch string, 171 + wf *tangled.Pipeline_Workflow, 172 + ) { 173 + logger := p.log.With( 174 + "knot", knot, 175 + "pipeline_rkey", pipelineRkey, 176 + "workflow", wf.Name, 177 + ) 178 + 179 + pipelineURI := pipelineATURI(knot, pipelineRkey) 180 + meta := map[string]string{ 181 + bkMetaKnot: knot, 182 + bkMetaPipelineRkey: pipelineRkey, 183 + bkMetaWorkflow: wf.Name, 184 + } 185 + env := envFromTuple(knot, pipelineRkey, wf) 186 + 187 + req := buildkite.CreateBuildRequest{ 188 + Commit: commit, 189 + Branch: branch, 190 + Message: fmt.Sprintf("tangled: %s", wf.Name), 191 + Env: env, 192 + MetaData: meta, 193 + IgnorePipelineBranchFilters: true, 194 + } 195 + 196 + build, err := p.client.CreateBuild(ctx, p.pipelineSlug, req) 197 + if err != nil { 198 + logger.Error("create buildkite build", "err", err) 199 + return 200 + } 201 + logger.Info("buildkite build created", 202 + "build_uuid", build.ID, 203 + "build_number", 
build.Number, 204 + "web_url", build.WebURL, 205 + ) 206 + 207 + if err := p.st.InsertBuildkiteBuild(ctx, BuildkiteBuildRef{ 208 + BuildUUID: build.ID, 209 + BuildNumber: build.Number, 210 + PipelineSlug: p.pipelineSlug, 211 + Knot: knot, 212 + PipelineRkey: pipelineRkey, 213 + Workflow: wf.Name, 214 + PipelineURI: pipelineURI, 215 + }); err != nil { 216 + // Webhook handlers will fail to translate this build's 217 + // events because they can't recover the tuple. Surface 218 + // loudly and bail; we don't want a half-tracked build 219 + // silently leaking status into the broker. 220 + logger.Error("persist buildkite build mapping", "err", err, 221 + "build_uuid", build.ID, 222 + ) 223 + return 224 + } 225 + 226 + // Initial status publish so the appview shows the build as 227 + // queued without waiting for the first webhook. This mirrors 228 + // the upstream spindle's "schedule then run" cadence. 229 + if err := p.publishStatus( 230 + ctx, pipelineURI, wf.Name, "pending", build.ID, 231 + nil, nil, 232 + ); err != nil { 233 + logger.Error("publish initial pending status", "err", err) 234 + } 235 + } 236 + 237 + // Logs satisfies Provider. We resolve the (knot, rkey, workflow) 238 + // tuple to a Buildkite build via the store, fetch the current jobs 239 + // list, then drain each job's plain-text log into the channel as one 240 + // LogLine per output line. 241 + // 242 + // Per-job control frames bracket each job so the appview's renderer 243 + // has start/end markers to lay out timing — same shape as the fake 244 + // provider and the upstream spindle. 245 + // 246 + // This is a snapshot read, not a tail — finished or in-progress, we 247 + // fetch what's there and close. Live tailing would require Buildkite 248 + // agent log streaming, which the public REST API doesn't expose; the 249 + // appview's repeated /logs calls during a running build give us 250 + // "good enough" liveness without that complexity. 
251 + func (p *buildkiteProvider) Logs( 252 + ctx context.Context, 253 + knot string, 254 + pipelineRkey string, 255 + workflow string, 256 + ) (<-chan LogLine, error) { 257 + ref, err := p.st.LookupBuildkiteBuildByTuple(ctx, knot, pipelineRkey, workflow) 258 + if err != nil { 259 + return nil, fmt.Errorf("lookup build for logs: %w", err) 260 + } 261 + if ref == nil { 262 + return nil, ErrLogsNotFound 263 + } 264 + 265 + // Fresh fetch so we get the current job set, not whatever was 266 + // returned at create time (when most jobs are still nil). The 267 + // upstream's not-found is mapped to the Provider-shaped one 268 + // here because the /logs handler only knows about ErrLogsNotFound. 269 + build, err := p.client.GetBuild(ctx, ref.PipelineSlug, ref.BuildNumber) 270 + if err != nil { 271 + if errors.Is(err, buildkite.ErrNotFound) { 272 + return nil, ErrLogsNotFound 273 + } 274 + return nil, fmt.Errorf("get build for logs: %w", err) 275 + } 276 + 277 + out := make(chan LogLine, 64) 278 + go func() { 279 + defer close(out) 280 + stepID := 0 281 + for _, job := range build.Jobs { 282 + // Only "script" jobs have agent-produced logs. 283 + // Waiter / manual / trigger jobs have no body to 284 + // fetch; skip them so we don't hit Buildkite with 285 + // 404-bound requests. 286 + if job.Type != "" && job.Type != "script" { 287 + continue 288 + } 289 + 290 + name := job.Name 291 + if name == "" { 292 + name = job.ID 293 + } 294 + 295 + // Job-level start frame so the appview can bound 296 + // timing per job. 
297 + if !sendLine(ctx, out, LogLine{ 298 + Kind: LogKindControl, 299 + Time: time.Now(), 300 + Content: name, 301 + StepId: stepID, 302 + StepStatus: StepStatusStart, 303 + }) { 304 + return 305 + } 306 + 307 + body, err := p.client.GetJobLog(ctx, ref.PipelineSlug, ref.BuildNumber, job.ID) 308 + if err != nil { 309 + p.log.Debug("fetch job log", 310 + "err", err, 311 + "build_uuid", ref.BuildUUID, 312 + "job_id", job.ID, 313 + ) 314 + // Don't fail the whole stream on one job; 315 + // emit the end frame and move on so the 316 + // appview at least sees what other jobs 317 + // produced. 318 + body = "" 319 + } 320 + 321 + for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") { 322 + if line == "" { 323 + // Skip the leading empty entry that 324 + // Split produces for empty bodies. 325 + continue 326 + } 327 + if !sendLine(ctx, out, LogLine{ 328 + Kind: LogKindData, 329 + Time: time.Now(), 330 + Content: line + "\n", 331 + StepId: stepID, 332 + Stream: "stdout", 333 + }) { 334 + return 335 + } 336 + } 337 + 338 + if !sendLine(ctx, out, LogLine{ 339 + Kind: LogKindControl, 340 + Time: time.Now(), 341 + Content: name, 342 + StepId: stepID, 343 + StepStatus: StepStatusEnd, 344 + }) { 345 + return 346 + } 347 + stepID++ 348 + } 349 + }() 350 + return out, nil 351 + } 352 + 353 + // publishStatus assembles a tangled.PipelineStatus record and pushes 354 + // it through the broker. buildUUID is mixed into the rkey so multiple 355 + // status events for the same workflow don't collide on the events 356 + // table's (rkey) uniqueness — and so an operator grepping the log 357 + // can find every record that pertains to a given Buildkite build. 358 + // 359 + // errMsg/exitCode are optional; pass nil for non-failure transitions. 
360 + func (p *buildkiteProvider) publishStatus( 361 + ctx context.Context, 362 + pipelineURI, workflow, status, buildUUID string, 363 + errMsg *string, 364 + exitCode *int64, 365 + ) error { 366 + rec := tangled.PipelineStatus{ 367 + LexiconTypeID: tangled.PipelineStatusNSID, 368 + Pipeline: pipelineURI, 369 + Workflow: workflow, 370 + Status: status, 371 + CreatedAt: time.Now().UTC().Format(time.RFC3339), 372 + Error: errMsg, 373 + ExitCode: exitCode, 374 + } 375 + body, err := json.Marshal(rec) 376 + if err != nil { 377 + return fmt.Errorf("marshal pipeline.status: %w", err) 378 + } 379 + rkey := fmt.Sprintf("bk-%s-%s-%d", buildUUID, status, time.Now().UnixNano()) 380 + if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 381 + return fmt.Errorf("publish pipeline.status: %w", err) 382 + } 383 + return nil 384 + } 385 + 386 + // HandleWebhook applies a decoded Buildkite webhook payload: looks 387 + // the build up in the store, translates the Buildkite state into a 388 + // Tangled StatusKind, and publishes a pipeline.status record. Used 389 + // by the HTTP webhook handler so both the ingress logic and the 390 + // translation logic live next to each other. 391 + // 392 + // Returns nil for events we intentionally ignore (job.* events, 393 + // build.scheduled which we already publish locally on Spawn, builds 394 + // we don't have a mapping for) so the handler can 200 them — webhook 395 + // retries from Buildkite on a 4xx/5xx are noisy and not what we want 396 + // for "we just don't care about this event". 397 + func (p *buildkiteProvider) HandleWebhook( 398 + ctx context.Context, 399 + payload buildkite.WebhookPayload, 400 + ) error { 401 + // Only build.* events drive pipeline.status today. Everything 402 + // else (job.*, agent.*, ping) is acknowledged silently. 
403 + if !strings.HasPrefix(payload.Event, "build.") { 404 + return nil 405 + } 406 + 407 + ref, err := p.st.LookupBuildkiteBuildByUUID(ctx, payload.Build.ID) 408 + if err != nil { 409 + return fmt.Errorf("lookup build by uuid: %w", err) 410 + } 411 + if ref == nil { 412 + // Most likely: this build was triggered outside tack and 413 + // just happens to share our webhook URL. Nothing to do. 414 + p.log.Debug("webhook for unknown build; ignoring", 415 + "event", payload.Event, 416 + "build_uuid", payload.Build.ID, 417 + ) 418 + return nil 419 + } 420 + 421 + status, ok := mapBuildkiteState(payload.Build.State) 422 + if !ok { 423 + // Unknown / transient state ("blocked", "skipped", 424 + // "not_run", "waiting"…) — log so we can extend the map 425 + // later, but don't error out the webhook. 426 + p.log.Debug("unmapped buildkite state; ignoring", 427 + "event", payload.Event, 428 + "state", payload.Build.State, 429 + "build_uuid", payload.Build.ID, 430 + ) 431 + return nil 432 + } 433 + 434 + if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 435 + status, ref.BuildUUID, nil, nil); err != nil { 436 + return fmt.Errorf("publish webhook status: %w", err) 437 + } 438 + p.log.Info("buildkite webhook → pipeline.status", 439 + "event", payload.Event, 440 + "state", payload.Build.State, 441 + "status", status, 442 + "build_uuid", payload.Build.ID, 443 + "workflow", ref.Workflow, 444 + ) 445 + return nil 446 + } 447 + 448 + // mapBuildkiteState translates Buildkite's build state strings into 449 + // the Tangled spindle StatusKind enum. The mapping aligns with the 450 + // upstream constants (StatusKindRunning/Failed/Cancelled/Success); 451 + // states that don't have a direct analogue (blocked, skipped, 452 + // not_run) are reported as not-mapped so the caller can decide 453 + // whether to ignore them. 
454 + func mapBuildkiteState(state string) (string, bool) { 455 + switch state { 456 + case "scheduled": 457 + return "pending", true 458 + case "running", "failing": 459 + return "running", true 460 + case "passed": 461 + return "success", true 462 + case "failed": 463 + return "failed", true 464 + case "canceled", "canceling": 465 + return "cancelled", true 466 + default: 467 + return "", false 468 + } 469 + } 470 + 471 + // envFromTuple builds the env block forwarded into the Buildkite 472 + // build. These are the only handle a user's Buildkite pipeline has 473 + // on the originating Tangled trigger: their pipeline.yml typically 474 + // reads $TACK_WORKFLOW and dispatches based on it (e.g. running a 475 + // `pipeline upload` against a workflow-specific YAML file). 476 + // 477 + // TACK_WORKFLOW_RAW carries the entire YAML body of the workflow as 478 + // captured in the Tangled record. It can be empty if the workflow 479 + // definition omitted it; consumers should defend. 480 + func envFromTuple(knot, pipelineRkey string, wf *tangled.Pipeline_Workflow) map[string]string { 481 + return map[string]string{ 482 + "TACK_KNOT": knot, 483 + "TACK_PIPELINE_RKEY": pipelineRkey, 484 + "TACK_WORKFLOW": wf.Name, 485 + "TACK_WORKFLOW_RAW": wf.Raw, 486 + } 487 + } 488 + 489 + // pipelineATURI returns the at-uri the appview joins pipeline.status 490 + // records back to their originating pipeline on. Format mirrors the 491 + // upstream spindle; the appview strips the `did:web:` prefix and 492 + // treats the remainder as the knot identifier. 493 + func pipelineATURI(knot, pipelineRkey string) string { 494 + return fmt.Sprintf("at://did:web:%s/%s/%s", 495 + knot, tangled.PipelineNSID, pipelineRkey, 496 + ) 497 + } 498 + 499 + // triggerCommitAndBranch extracts (commit, branch) from a Tangled 500 + // pipeline trigger, regardless of whether it was a push, a pull 501 + // request, or a manual run. 
Returns empty strings on a fully-empty 502 + // trigger so the caller can decide whether that's fatal. 503 + func triggerCommitAndBranch(trigger *tangled.Pipeline_TriggerMetadata) (string, string) { 504 + if trigger == nil { 505 + return "", "" 506 + } 507 + switch { 508 + case trigger.Push != nil: 509 + // For push events, NewSha is the commit being built and 510 + // Ref is the full ref (e.g. "refs/heads/main") — strip 511 + // the prefix so Buildkite's branch-aware features work. 512 + return trigger.Push.NewSha, refToBranch(trigger.Push.Ref) 513 + case trigger.PullRequest != nil: 514 + // PRs build the source commit on the source branch. 515 + // Buildkite's pipeline can opt into PR-aware behaviour 516 + // via pull_request_id (not currently plumbed through). 517 + return trigger.PullRequest.SourceSha, trigger.PullRequest.SourceBranch 518 + default: 519 + // Manual triggers and any future kinds: fall back to the 520 + // repo default branch with no commit, which the caller 521 + // will treat as fatal — manual triggers will need 522 + // additional plumbing to pick a commit. 523 + if trigger.Repo != nil { 524 + return "", trigger.Repo.DefaultBranch 525 + } 526 + return "", "" 527 + } 528 + } 529 + 530 + // refToBranch strips the conventional refs/heads/ prefix from a git 531 + // ref. Refs that don't match the prefix (tags, refs/pull/N/head) are 532 + // returned as-is so downstream tooling can decide what to do with 533 + // them — Buildkite happily accepts either form in `branch`. 534 + func refToBranch(ref string) string { 535 + const prefix = "refs/heads/" 536 + if strings.HasPrefix(ref, prefix) { 537 + return strings.TrimPrefix(ref, prefix) 538 + } 539 + return ref 540 + } 541 + 542 + // sendLine pushes one LogLine into out, returning false if ctx 543 + // fired first. Centralised so the per-job loop in Logs stays 544 + // focused on the wire-shape decisions. 
545 + func sendLine(ctx context.Context, out chan<- LogLine, line LogLine) bool { 546 + select { 547 + case <-ctx.Done(): 548 + return false 549 + case out <- line: 550 + return true 551 + } 552 + }
+405
provider_buildkite_test.go
··· 1 + package main 2 + 3 + // Provider-level integration tests for the Buildkite implementation: 4 + // Spawn → CreateBuild + persist + initial pending publish, and 5 + // HandleWebhook → translate state + publish status. Buildkite itself 6 + // is stubbed with httptest so the tests don't need network access. 7 + 8 + import ( 9 + "context" 10 + "crypto/hmac" 11 + "crypto/sha256" 12 + "encoding/hex" 13 + "encoding/json" 14 + "fmt" 15 + "io" 16 + "log/slog" 17 + "net/http" 18 + "net/http/httptest" 19 + "strings" 20 + "testing" 21 + "time" 22 + 23 + "tangled.org/core/api/tangled" 24 + 25 + "github.com/mitchellh/tack/internal/buildkite" 26 + ) 27 + 28 + // newBuildkiteTestProvider wires a buildkiteProvider against an 29 + // httptest server impersonating api.buildkite.com. Returns the 30 + // store/broker so tests can inspect publishes + persistence. 31 + func newBuildkiteTestProvider( 32 + t *testing.T, 33 + mode buildkite.WebhookMode, 34 + secret string, 35 + bkHandler http.HandlerFunc, 36 + ) (*buildkiteProvider, *store, *broker, *httptest.Server) { 37 + t.Helper() 38 + srv := httptest.NewServer(bkHandler) 39 + t.Cleanup(srv.Close) 40 + 41 + prev := buildkite.APIBase 42 + buildkite.APIBase = srv.URL 43 + t.Cleanup(func() { buildkite.APIBase = prev }) 44 + 45 + st := newTestStore(t) 46 + br := newBroker(st) 47 + logger := slog.Default() 48 + p := newBuildkiteProvider( 49 + br, st, 50 + buildkite.NewClient("tok", "myorg"), 51 + "mypipe", 52 + secret, mode, 53 + logger, 54 + ) 55 + return p, st, br, srv 56 + } 57 + 58 + // TestBuildkiteSpawn covers the full create-build path: trigger → 59 + // API call → DB row → "pending" status on the broker. 
60 + func TestBuildkiteSpawn(t *testing.T) { 61 + bk := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 62 + w.WriteHeader(http.StatusCreated) 63 + _ = json.NewEncoder(w).Encode(buildkite.Build{ 64 + ID: "uuid-1", 65 + Number: 7, 66 + }) 67 + }) 68 + p, st, _, _ := newBuildkiteTestProvider(t, buildkite.WebhookModeToken, "secret", bk) 69 + 70 + trigger := &tangled.Pipeline_TriggerMetadata{ 71 + Push: &tangled.Pipeline_PushTriggerData{ 72 + NewSha: "abcdef0123", 73 + Ref: "refs/heads/main", 74 + }, 75 + } 76 + workflows := []*tangled.Pipeline_Workflow{ 77 + {Name: "test.yml", Raw: "steps:\n - run: true\n"}, 78 + } 79 + 80 + p.Spawn(context.Background(), "knot.example.com", "rkey-1", trigger, workflows) 81 + 82 + // Spawn fans out into goroutines; wait briefly for the side 83 + // effects to land. The store row is the load-bearing artifact 84 + // — once it's present, the publish has already happened too. 85 + deadline := time.Now().Add(2 * time.Second) 86 + var ref *BuildkiteBuildRef 87 + for time.Now().Before(deadline) { 88 + var err error 89 + ref, err = st.LookupBuildkiteBuildByUUID(context.Background(), "uuid-1") 90 + if err != nil { 91 + t.Fatalf("lookup: %v", err) 92 + } 93 + if ref != nil { 94 + break 95 + } 96 + time.Sleep(20 * time.Millisecond) 97 + } 98 + if ref == nil { 99 + t.Fatal("buildkite build row not persisted within deadline") 100 + } 101 + if ref.Workflow != "test.yml" || ref.Knot != "knot.example.com" || ref.PipelineRkey != "rkey-1" { 102 + t.Fatalf("ref mismatch: %+v", ref) 103 + } 104 + if ref.PipelineSlug != "mypipe" || ref.BuildNumber != 7 { 105 + t.Fatalf("buildkite ref mismatch: %+v", ref) 106 + } 107 + 108 + // One pending status should be on the events log. 
109 + rows, err := st.EventsAfter(context.Background(), 0) 110 + if err != nil { 111 + t.Fatalf("EventsAfter: %v", err) 112 + } 113 + if len(rows) != 1 { 114 + t.Fatalf("got %d events, want 1", len(rows)) 115 + } 116 + var rec tangled.PipelineStatus 117 + if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil { 118 + t.Fatalf("decode status: %v", err) 119 + } 120 + if rec.Status != "pending" || rec.Workflow != "test.yml" { 121 + t.Fatalf("unexpected status: %+v", rec) 122 + } 123 + if !strings.Contains(rec.Pipeline, "knot.example.com") || 124 + !strings.Contains(rec.Pipeline, "rkey-1") { 125 + t.Fatalf("pipeline ATURI wrong: %s", rec.Pipeline) 126 + } 127 + } 128 + 129 + // TestBuildkiteSpawnNoCommit confirms we don't fire a build when the 130 + // trigger has no commit to build — kicking one off would resolve to 131 + // whatever main looks like at agent-fetch time, which is dangerously 132 + // surprising. 133 + func TestBuildkiteSpawnNoCommit(t *testing.T) { 134 + called := false 135 + bk := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 136 + called = true 137 + }) 138 + p, st, _, _ := newBuildkiteTestProvider(t, buildkite.WebhookModeToken, "secret", bk) 139 + 140 + p.Spawn(context.Background(), "knot.example.com", "rkey-1", 141 + &tangled.Pipeline_TriggerMetadata{Manual: &tangled.Pipeline_ManualTriggerData{}}, 142 + []*tangled.Pipeline_Workflow{{Name: "test.yml"}}, 143 + ) 144 + 145 + // Give any rogue goroutine a moment. 146 + time.Sleep(50 * time.Millisecond) 147 + if called { 148 + t.Fatal("CreateBuild called despite missing commit") 149 + } 150 + rows, _ := st.EventsAfter(context.Background(), 0) 151 + if len(rows) != 0 { 152 + t.Fatalf("got %d events, want 0", len(rows)) 153 + } 154 + } 155 + 156 + // TestBuildkiteHandleWebhook checks the translation pipeline: 157 + // recorded build + matching webhook → success status published. 
158 + func TestBuildkiteHandleWebhook(t *testing.T) { 159 + p, st, _, _ := newBuildkiteTestProvider(t, buildkite.WebhookModeToken, "secret", 160 + func(w http.ResponseWriter, r *http.Request) { t.Fatal("buildkite shouldn't be called") }) 161 + 162 + // Pre-seed a known build mapping. 163 + if err := st.InsertBuildkiteBuild(context.Background(), BuildkiteBuildRef{ 164 + BuildUUID: "uuid-1", 165 + BuildNumber: 7, 166 + PipelineSlug: "mypipe", 167 + Knot: "knot.example.com", 168 + PipelineRkey: "rkey-1", 169 + Workflow: "test.yml", 170 + PipelineURI: "at://did:web:knot.example.com/sh.tangled.pipeline/rkey-1", 171 + }); err != nil { 172 + t.Fatalf("InsertBuildkiteBuild: %v", err) 173 + } 174 + 175 + err := p.HandleWebhook(context.Background(), buildkite.WebhookPayload{ 176 + Event: "build.finished", 177 + Build: buildkite.Build{ID: "uuid-1", State: "passed"}, 178 + }) 179 + if err != nil { 180 + t.Fatalf("HandleWebhook: %v", err) 181 + } 182 + 183 + rows, err := st.EventsAfter(context.Background(), 0) 184 + if err != nil { 185 + t.Fatalf("EventsAfter: %v", err) 186 + } 187 + if len(rows) != 1 { 188 + t.Fatalf("got %d events, want 1", len(rows)) 189 + } 190 + var rec tangled.PipelineStatus 191 + if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil { 192 + t.Fatalf("decode: %v", err) 193 + } 194 + if rec.Status != "success" || rec.Workflow != "test.yml" { 195 + t.Fatalf("bad status: %+v", rec) 196 + } 197 + } 198 + 199 + // TestBuildkiteHandleWebhookIgnored covers the "we don't care" paths: 200 + // non-build events and unknown builds must be no-op (no publish, no 201 + // error) so Buildkite doesn't retry them. 202 + func TestBuildkiteHandleWebhookIgnored(t *testing.T) { 203 + p, st, _, _ := newBuildkiteTestProvider(t, buildkite.WebhookModeToken, "secret", 204 + func(w http.ResponseWriter, r *http.Request) {}) 205 + 206 + // Non-build event: no lookup, no publish. 
207 + if err := p.HandleWebhook(context.Background(), buildkite.WebhookPayload{ 208 + Event: "job.started", 209 + Build: buildkite.Build{ID: "uuid-x"}, 210 + }); err != nil { 211 + t.Fatalf("HandleWebhook (job.started): %v", err) 212 + } 213 + 214 + // Build event for unknown UUID: no publish. 215 + if err := p.HandleWebhook(context.Background(), buildkite.WebhookPayload{ 216 + Event: "build.finished", 217 + Build: buildkite.Build{ID: "unknown-uuid", State: "passed"}, 218 + }); err != nil { 219 + t.Fatalf("HandleWebhook (unknown): %v", err) 220 + } 221 + 222 + // Known build but unmapped state: no publish. 223 + if err := st.InsertBuildkiteBuild(context.Background(), BuildkiteBuildRef{ 224 + BuildUUID: "uuid-blocked", PipelineSlug: "mypipe", 225 + Knot: "k", PipelineRkey: "r", Workflow: "w", 226 + PipelineURI: "at://x", 227 + }); err != nil { 228 + t.Fatalf("seed: %v", err) 229 + } 230 + if err := p.HandleWebhook(context.Background(), buildkite.WebhookPayload{ 231 + Event: "build.finished", 232 + Build: buildkite.Build{ID: "uuid-blocked", State: "blocked"}, 233 + }); err != nil { 234 + t.Fatalf("HandleWebhook (blocked): %v", err) 235 + } 236 + 237 + rows, _ := st.EventsAfter(context.Background(), 0) 238 + if len(rows) != 0 { 239 + t.Fatalf("got %d events, want 0", len(rows)) 240 + } 241 + } 242 + 243 + // TestBuildkiteWebhookHandlerHTTP exercises the full HTTP path 244 + // including auth: a request signed with the wrong secret must be 245 + // rejected, and a correctly-signed one must reach the provider. 246 + func TestBuildkiteWebhookHandlerHTTP(t *testing.T) { 247 + // Signature mode is the more interesting code path; we cover 248 + // token mode in the verifier-level tests above. 249 + const secret = "swordfish" 250 + p, st, _, _ := newBuildkiteTestProvider(t, buildkite.WebhookModeSignature, secret, 251 + func(w http.ResponseWriter, r *http.Request) { /* unused */ }) 252 + 253 + // Pre-seed so the provider's HandleWebhook can resolve the build. 
254 + if err := st.InsertBuildkiteBuild(context.Background(), BuildkiteBuildRef{ 255 + BuildUUID: "uuid-2", 256 + BuildNumber: 9, 257 + PipelineSlug: "mypipe", 258 + Knot: "knot.example.com", 259 + PipelineRkey: "rkey-2", 260 + Workflow: "test.yml", 261 + PipelineURI: "at://did:web:knot.example.com/sh.tangled.pipeline/rkey-2", 262 + }); err != nil { 263 + t.Fatalf("seed: %v", err) 264 + } 265 + 266 + body, _ := json.Marshal(map[string]any{ 267 + "event": "build.finished", 268 + "build": map[string]any{ 269 + "id": "uuid-2", 270 + "state": "failed", 271 + }, 272 + }) 273 + 274 + logger := slog.Default() 275 + handler := buildkiteWebhookHandler(logger, p) 276 + 277 + // Unsigned request → 401. 278 + t.Run("rejects unsigned", func(t *testing.T) { 279 + req := httptest.NewRequest(http.MethodPost, "/webhooks/buildkite", 280 + strings.NewReader(string(body))) 281 + req.Header.Set("X-Buildkite-Event", "build.finished") 282 + w := httptest.NewRecorder() 283 + handler(w, req) 284 + if w.Code != http.StatusUnauthorized { 285 + t.Fatalf("status = %d; want 401", w.Code) 286 + } 287 + }) 288 + 289 + // Wrong-secret request → 401. 290 + t.Run("rejects bad signature", func(t *testing.T) { 291 + ts := fmt.Sprintf("%d", time.Now().Unix()) 292 + mac := hmac.New(sha256.New, []byte("wrong")) 293 + mac.Write([]byte(ts)) 294 + mac.Write([]byte(".")) 295 + mac.Write(body) 296 + sig := hex.EncodeToString(mac.Sum(nil)) 297 + 298 + req := httptest.NewRequest(http.MethodPost, "/webhooks/buildkite", 299 + strings.NewReader(string(body))) 300 + req.Header.Set("X-Buildkite-Event", "build.finished") 301 + req.Header.Set("X-Buildkite-Signature", "timestamp="+ts+",signature="+sig) 302 + w := httptest.NewRecorder() 303 + handler(w, req) 304 + if w.Code != http.StatusUnauthorized { 305 + t.Fatalf("status = %d; want 401", w.Code) 306 + } 307 + }) 308 + 309 + // Valid request → 200, status published. 
310 + t.Run("accepts valid", func(t *testing.T) { 311 + ts := fmt.Sprintf("%d", time.Now().Unix()) 312 + mac := hmac.New(sha256.New, []byte(secret)) 313 + mac.Write([]byte(ts)) 314 + mac.Write([]byte(".")) 315 + mac.Write(body) 316 + sig := hex.EncodeToString(mac.Sum(nil)) 317 + 318 + req := httptest.NewRequest(http.MethodPost, "/webhooks/buildkite", 319 + strings.NewReader(string(body))) 320 + req.Header.Set("X-Buildkite-Event", "build.finished") 321 + req.Header.Set("X-Buildkite-Signature", "timestamp="+ts+",signature="+sig) 322 + w := httptest.NewRecorder() 323 + handler(w, req) 324 + if w.Code != http.StatusOK { 325 + b, _ := io.ReadAll(w.Body) 326 + t.Fatalf("status = %d body=%s; want 200", w.Code, string(b)) 327 + } 328 + rows, _ := st.EventsAfter(context.Background(), 0) 329 + if len(rows) != 1 { 330 + t.Fatalf("got %d events, want 1", len(rows)) 331 + } 332 + var rec tangled.PipelineStatus 333 + if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil { 334 + t.Fatalf("decode: %v", err) 335 + } 336 + if rec.Status != "failed" { 337 + t.Fatalf("status = %q; want failed", rec.Status) 338 + } 339 + }) 340 + } 341 + 342 + // TestBuildkiteWebhookHandlerNoProvider confirms the 503 branch when 343 + // tack is running with the fake provider — a misdirected webhook 344 + // must get a clear "not configured here" instead of a misleading 345 + // 200 OK that silently throws the event away. 346 + func TestBuildkiteWebhookHandlerNoProvider(t *testing.T) { 347 + handler := buildkiteWebhookHandler(slog.Default(), nil) 348 + req := httptest.NewRequest(http.MethodPost, "/webhooks/buildkite", 349 + strings.NewReader("{}")) 350 + w := httptest.NewRecorder() 351 + handler(w, req) 352 + if w.Code != http.StatusServiceUnavailable { 353 + t.Fatalf("status = %d; want 503", w.Code) 354 + } 355 + } 356 + 357 + // TestTriggerCommitAndBranch pins the trigger-shape mapping. 
Each 358 + // case pairs an input trigger with the (commit, branch) tuple a 359 + // real CI provider would feed into its build-creation API. 360 + func TestTriggerCommitAndBranch(t *testing.T) { 361 + cases := []struct { 362 + name string 363 + in *tangled.Pipeline_TriggerMetadata 364 + wantCommit string 365 + wantBranch string 366 + }{ 367 + {"nil", nil, "", ""}, 368 + {"push refs/heads", 369 + &tangled.Pipeline_TriggerMetadata{ 370 + Push: &tangled.Pipeline_PushTriggerData{NewSha: "abc", Ref: "refs/heads/main"}, 371 + }, 372 + "abc", "main", 373 + }, 374 + {"push tag ref preserved", 375 + &tangled.Pipeline_TriggerMetadata{ 376 + Push: &tangled.Pipeline_PushTriggerData{NewSha: "abc", Ref: "refs/tags/v1"}, 377 + }, 378 + "abc", "refs/tags/v1", 379 + }, 380 + {"pull request", 381 + &tangled.Pipeline_TriggerMetadata{ 382 + PullRequest: &tangled.Pipeline_PullRequestTriggerData{ 383 + SourceSha: "def", SourceBranch: "feature", 384 + }, 385 + }, 386 + "def", "feature", 387 + }, 388 + {"manual with default branch", 389 + &tangled.Pipeline_TriggerMetadata{ 390 + Manual: &tangled.Pipeline_ManualTriggerData{}, 391 + Repo: &tangled.Pipeline_TriggerRepo{DefaultBranch: "main"}, 392 + }, 393 + "", "main", 394 + }, 395 + } 396 + for _, c := range cases { 397 + t.Run(c.name, func(t *testing.T) { 398 + gotC, gotB := triggerCommitAndBranch(c.in) 399 + if gotC != c.wantCommit || gotB != c.wantBranch { 400 + t.Fatalf("got (%q,%q); want (%q,%q)", 401 + gotC, gotB, c.wantCommit, c.wantBranch) 402 + } 403 + }) 404 + } 405 + }
+1
provider_fake.go
··· 81 81 ctx context.Context, 82 82 knot string, 83 83 pipelineRkey string, 84 + _ *tangled.Pipeline_TriggerMetadata, 84 85 workflows []*tangled.Pipeline_Workflow, 85 86 ) { 86 87 if len(workflows) == 0 {
+103
store.go
··· 322 322 return id, nil 323 323 } 324 324 325 + // BuildkiteBuildRef is the persisted mapping from one Buildkite build 326 + // to the Tangled pipeline tuple that spawned it. It's the row written 327 + // by the Buildkite provider at Spawn time and read back from two 328 + // places: the webhook handler (by build UUID) when an event arrives, 329 + // and the /logs handler (by knot+rkey+workflow) when an appview 330 + // client asks for output. 331 + type BuildkiteBuildRef struct { 332 + BuildUUID string 333 + BuildNumber int64 334 + PipelineSlug string 335 + Knot string 336 + PipelineRkey string 337 + Workflow string 338 + PipelineURI string 339 + } 340 + 341 + // InsertBuildkiteBuild records that a Buildkite build was created on 342 + // behalf of the given (knot, pipelineRkey, workflow) tuple. Uses 343 + // INSERT OR REPLACE so that an unlikely build-uuid collision (or a 344 + // Buildkite-side rebuild that re-fires us) just refreshes the row 345 + // instead of failing. 346 + func (s *store) InsertBuildkiteBuild(ctx context.Context, ref BuildkiteBuildRef) error { 347 + _, err := s.db.ExecContext(ctx, 348 + `INSERT INTO buildkite_builds ( 349 + build_uuid, build_number, pipeline_slug, 350 + knot, pipeline_rkey, workflow, 351 + pipeline_uri, created_at 352 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
353 + ON CONFLICT(build_uuid) DO UPDATE SET 354 + build_number = excluded.build_number, 355 + pipeline_slug = excluded.pipeline_slug, 356 + knot = excluded.knot, 357 + pipeline_rkey = excluded.pipeline_rkey, 358 + workflow = excluded.workflow, 359 + pipeline_uri = excluded.pipeline_uri, 360 + created_at = excluded.created_at`, 361 + ref.BuildUUID, ref.BuildNumber, ref.PipelineSlug, 362 + ref.Knot, ref.PipelineRkey, ref.Workflow, 363 + ref.PipelineURI, time.Now().UTC().Format(time.RFC3339Nano), 364 + ) 365 + if err != nil { 366 + return fmt.Errorf("insert buildkite_build: %w", err) 367 + } 368 + return nil 369 + } 370 + 371 + // LookupBuildkiteBuildByUUID returns the saved mapping for the given 372 + // Buildkite build UUID, or nil when no such build is recorded. 373 + // Returning a nil pointer rather than a sentinel error keeps the 374 + // webhook handler's "we don't know about this build" branch a simple 375 + // nil check. 376 + func (s *store) LookupBuildkiteBuildByUUID(ctx context.Context, buildUUID string) (*BuildkiteBuildRef, error) { 377 + var ref BuildkiteBuildRef 378 + err := s.db.QueryRowContext(ctx, 379 + `SELECT build_uuid, build_number, pipeline_slug, 380 + knot, pipeline_rkey, workflow, pipeline_uri 381 + FROM buildkite_builds WHERE build_uuid = ?`, 382 + buildUUID, 383 + ).Scan( 384 + &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, 385 + &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 386 + ) 387 + if errors.Is(err, sql.ErrNoRows) { 388 + return nil, nil 389 + } 390 + if err != nil { 391 + return nil, fmt.Errorf("lookup buildkite_build by uuid: %w", err) 392 + } 393 + return &ref, nil 394 + } 395 + 396 + // LookupBuildkiteBuildByTuple finds the most recently created build 397 + // for (knot, pipelineRkey, workflow). Returns nil when no build has 398 + // been recorded for that tuple — used by /logs to translate the 399 + // appview's path-based identity back into something Buildkite knows. 
400 + // 401 + // "Most recent" matters because a workflow may have multiple builds 402 + // over time (rebuilds, re-triggers). We always serve logs for the 403 + // latest run; older runs are still queryable by build UUID directly 404 + // if anyone ever wants that. 405 + func (s *store) LookupBuildkiteBuildByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*BuildkiteBuildRef, error) { 406 + var ref BuildkiteBuildRef 407 + err := s.db.QueryRowContext(ctx, 408 + `SELECT build_uuid, build_number, pipeline_slug, 409 + knot, pipeline_rkey, workflow, pipeline_uri 410 + FROM buildkite_builds 411 + WHERE knot = ? AND pipeline_rkey = ? AND workflow = ? 412 + ORDER BY created_at DESC 413 + LIMIT 1`, 414 + knot, pipelineRkey, workflow, 415 + ).Scan( 416 + &ref.BuildUUID, &ref.BuildNumber, &ref.PipelineSlug, 417 + &ref.Knot, &ref.PipelineRkey, &ref.Workflow, &ref.PipelineURI, 418 + ) 419 + if errors.Is(err, sql.ErrNoRows) { 420 + return nil, nil 421 + } 422 + if err != nil { 423 + return nil, fmt.Errorf("lookup buildkite_build by tuple: %w", err) 424 + } 425 + return &ref, nil 426 + } 427 + 325 428 // EventsAfter returns every event row with `created` strictly greater 326 429 // than cursor, in cursor order. Used by /events to backfill a 327 430 // reconnecting subscriber and to drain newly-published rows on each
+26
store_migrate.go
··· 78 78 event_json TEXT NOT NULL, 79 79 inserted_at TEXT NOT NULL 80 80 ); 81 + 82 + -- Mapping from a Buildkite build back to the Tangled pipeline that 83 + -- spawned it. The Buildkite webhook receiver only knows the build 84 + -- UUID; everything we need to publish a pipeline.status record 85 + -- (knot, pipeline rkey, workflow name, full pipeline ATURI) lives 86 + -- on this row. 87 + -- 88 + -- pipeline_uri is denormalized off (knot, pipeline_rkey) so the 89 + -- webhook handler doesn't have to recompute the at:// string on 90 + -- every event — it's a constant for the lifetime of the build and 91 + -- the webhook is the hot path for status fan-out. 92 + -- 93 + -- The (knot, pipeline_rkey, workflow) index supports the /logs 94 + -- handler, which only knows that tuple at request time. 95 + CREATE TABLE IF NOT EXISTS buildkite_builds ( 96 + build_uuid TEXT PRIMARY KEY, 97 + build_number INTEGER NOT NULL, 98 + pipeline_slug TEXT NOT NULL, 99 + knot TEXT NOT NULL, 100 + pipeline_rkey TEXT NOT NULL, 101 + workflow TEXT NOT NULL, 102 + pipeline_uri TEXT NOT NULL, 103 + created_at TEXT NOT NULL 104 + ); 105 + CREATE INDEX IF NOT EXISTS buildkite_builds_lookup 106 + ON buildkite_builds (knot, pipeline_rkey, workflow); 81 107 ` 82 108 83 109 // migrate applies the schema. Safe to call repeatedly.