Stitch any CI into Tangled
151
fork

Configure Feed

Select the types of activity you want to include in your feed.

provider/sourcehut: add builds.sr.ht provider #15

open opened by j3s.sh targeting main from j3s.sh/tack: main

Submits jobs via GraphQL submit, polls for status, streams per-task logs from /query/log/ (the path that bypasses sourcehut's anti-bot proxy). Workflows opt in via tack.sourcehut.manifest; tack injects TACK_* env vars into the manifest before submission.

Signed-off-by: Jes Olson j3s@c3f.net

apologies if the LLM comments are a little excessive -- i didn't have time to clean them up fully. i reviewed & tested this code, you can see it in action here: https://tangled.org/j3s.sh/testy

and here is an example of a resulting sr.ht build: https://builds.sr.ht/~capsul/job/1749924

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:pxa3amkp7jhfclaads3zud7q/sh.tangled.repo.pull/3ml5t7m75n422
+1539 -2
Diff #1
+267
internal/sourcehut/sourcehut.go
··· 1 + // Package sourcehut is a minimal builds.sr.ht client used by the 2 + // sourcehut Provider implementation in tack. Job submission and 3 + // state queries go to GraphQL at /query; plain-text task logs are 4 + // fetched from /query/log/<id>[/<task>]/log. 5 + package sourcehut 6 + 7 + import ( 8 + "bytes" 9 + "context" 10 + "encoding/json" 11 + "errors" 12 + "fmt" 13 + "io" 14 + "net/http" 15 + "net/url" 16 + "strings" 17 + "time" 18 + ) 19 + 20 + // DefaultBaseURL is the public builds.sr.ht endpoint. Operators 21 + // running their own sourcehut instance can override it on the Client. 22 + const DefaultBaseURL = "https://builds.sr.ht" 23 + 24 + // ErrNotFound is returned by Get* methods when the upstream returns 25 + // 404 (or, for GraphQL, when the requested job's `job` field comes 26 + // back null). The Provider maps this onto its own ErrLogsNotFound 27 + // for the /logs handler. 28 + var ErrNotFound = errors.New("sourcehut: not found") 29 + 30 + // Client is a thin wrapper around net/http carrying API credentials 31 + // and the target instance base URL. Safe for concurrent use; the 32 + // embedded http.Client is goroutine-safe. 33 + type Client struct { 34 + http *http.Client 35 + baseURL string 36 + token string 37 + } 38 + 39 + // NewClient builds a Client targeting baseURL using the given OAuth2 40 + // personal access token. baseURL should be the scheme+host of the 41 + // builds.sr.ht instance (e.g. "https://builds.sr.ht"); pass empty to 42 + // fall back to DefaultBaseURL. 43 + func NewClient(baseURL, token string) *Client { 44 + if baseURL == "" { 45 + baseURL = DefaultBaseURL 46 + } 47 + return &Client{ 48 + http: &http.Client{Timeout: 30 * time.Second}, 49 + baseURL: strings.TrimRight(baseURL, "/"), 50 + token: token, 51 + } 52 + } 53 + 54 + // Owner is the small slice of the upstream owner object we care 55 + // about. CanonicalName carries the leading "~" (e.g. "~someone"). 56 + type Owner struct { 57 + CanonicalName string `json:"canonicalName"` 58 + } 59 + 60 + // Task is one step inside a sourcehut job. 61 + type Task struct { 62 + Name string `json:"name"` 63 + Status string `json:"status"` 64 + } 65 + 66 + // Job is the subset of the upstream job object we care about. 67 + type Job struct { 68 + ID int64 `json:"id"` 69 + Status string `json:"status"` 70 + Note string `json:"note"` 71 + Owner Owner `json:"owner"` 72 + Tasks []Task `json:"tasks"` 73 + } 74 + 75 + // SubmitRequest mirrors the arguments of the GraphQL `submit` 76 + // mutation. Manifest is the YAML build manifest exactly as the user 77 + // would submit it via the web UI. Execute=true tells builds.sr.ht to 78 + // start the job immediately; Secrets controls whether the runner 79 + // mounts the user's configured secrets. 80 + type SubmitRequest struct { 81 + Manifest string 82 + Tags []string 83 + Note string 84 + Secrets bool 85 + Execute bool 86 + } 87 + 88 + // gqlRequest / gqlResponse are the on-the-wire shapes for a single 89 + // GraphQL POST. We hand-roll them because the entire query surface we 90 + // touch is two operations — pulling in a real GraphQL library would 91 + // dwarf the call sites. 92 + type gqlRequest struct { 93 + Query string `json:"query"` 94 + Variables map[string]any `json:"variables,omitempty"` 95 + } 96 + 97 + type gqlError struct { 98 + Message string `json:"message"` 99 + } 100 + 101 + type gqlResponse struct { 102 + Data json.RawMessage `json:"data"` 103 + Errors []gqlError `json:"errors"` 104 + } 105 + 106 + // SubmitJob fires a job via the GraphQL `submit` mutation and returns 107 + // the freshly created Job. 108 + // 109 + // The submit mutation's response is shallow — it doesn't include the 110 + // per-task list — so callers wanting a populated Tasks slice should 111 + // follow up with GetJob once the runner has scheduled the job. That 112 + // matches how the Provider already drives its watch loop. 113 + func (c *Client) SubmitJob(ctx context.Context, req SubmitRequest) (*Job, error) { 114 + const query = `mutation($manifest:String!, $tags:[String!], $note:String, $execute:Boolean, $secrets:Boolean){ 115 + submit(manifest:$manifest, tags:$tags, note:$note, execute:$execute, secrets:$secrets){ 116 + id status note owner{ canonicalName } 117 + } 118 + }` 119 + vars := map[string]any{ 120 + "manifest": req.Manifest, 121 + "execute": req.Execute, 122 + "secrets": req.Secrets, 123 + } 124 + if len(req.Tags) > 0 { 125 + vars["tags"] = req.Tags 126 + } 127 + if req.Note != "" { 128 + vars["note"] = req.Note 129 + } 130 + 131 + raw, err := c.gql(ctx, query, vars) 132 + if err != nil { 133 + return nil, err 134 + } 135 + var wrap struct { 136 + Submit *Job `json:"submit"` 137 + } 138 + if err := json.Unmarshal(raw, &wrap); err != nil { 139 + return nil, fmt.Errorf("decode submit response: %w", err) 140 + } 141 + if wrap.Submit == nil { 142 + // Sourcehut returns `submit: null` with no errors when the 143 + // manifest fails server-side validation in some edge cases; 144 + // guard so the caller doesn't dereference nil. 145 + return nil, errors.New("sourcehut: submit returned null job") 146 + } 147 + return wrap.Submit, nil 148 + } 149 + 150 + // GetJob fetches the current state of a job via GraphQL. Returns 151 + // ErrNotFound when the upstream `job(id:)` field resolves to null — 152 + // the usual signal for an unknown or deleted job. 153 + func (c *Client) GetJob(ctx context.Context, id int64) (*Job, error) { 154 + const query = `query($id:Int!){ 155 + job(id:$id){ 156 + id status note owner{ canonicalName } tasks{ name status } 157 + } 158 + }` 159 + raw, err := c.gql(ctx, query, map[string]any{"id": id}) 160 + if err != nil { 161 + return nil, err 162 + } 163 + var wrap struct { 164 + Job *Job `json:"job"` 165 + } 166 + if err := json.Unmarshal(raw, &wrap); err != nil { 167 + return nil, fmt.Errorf("decode job response: %w", err) 168 + } 169 + if wrap.Job == nil { 170 + return nil, ErrNotFound 171 + } 172 + return wrap.Job, nil 173 + } 174 + 175 + // GetTaskLog fetches the plain-text log for a single task inside a 176 + // job. taskName must match the task's `name` exactly; pass an empty 177 + // string to fetch the master log (the wrapper output that contains 178 + // setup steps before any task runs). Returns ErrNotFound on 404 — 179 + // common for a task that hasn't started yet, or a master log fetched 180 + // before the runner has produced output. 181 + func (c *Client) GetTaskLog(ctx context.Context, jobID int64, taskName string) (string, error) { 182 + path := fmt.Sprintf("%s/query/log/%d/log", c.baseURL, jobID) 183 + if taskName != "" { 184 + path = fmt.Sprintf("%s/query/log/%d/%s/log", 185 + c.baseURL, jobID, url.PathEscape(taskName), 186 + ) 187 + } 188 + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil) 189 + if err != nil { 190 + return "", fmt.Errorf("build request: %w", err) 191 + } 192 + c.setAuth(httpReq) 193 + 194 + resp, err := c.http.Do(httpReq) 195 + if err != nil { 196 + return "", fmt.Errorf("get task log: %w", err) 197 + } 198 + defer resp.Body.Close() 199 + if resp.StatusCode == http.StatusNotFound { 200 + return "", ErrNotFound 201 + } 202 + if resp.StatusCode != http.StatusOK { 203 + raw, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) 204 + return "", fmt.Errorf("get task log: status %d: %s", 205 + resp.StatusCode, strings.TrimSpace(string(raw)), 206 + ) 207 + } 208 + body, err := io.ReadAll(resp.Body) 209 + if err != nil { 210 + return "", fmt.Errorf("read task log: %w", err) 211 + } 212 + return string(body), nil 213 + } 214 + 215 + // JobWebURL renders the human-visible URL for a job. 216 + func (c *Client) JobWebURL(owner string, jobID int64) string { 217 + return fmt.Sprintf("%s/%s/job/%d", c.baseURL, owner, jobID) 218 + } 219 + 220 + // gql posts a single GraphQL request and returns the raw `data` 221 + // payload. Errors from the upstream "errors" array are surfaced as a 222 + // single Go error so callers don't have to hand-decode them. 223 + func (c *Client) gql(ctx context.Context, query string, vars map[string]any) (json.RawMessage, error) { 224 + body, err := json.Marshal(gqlRequest{Query: query, Variables: vars}) 225 + if err != nil { 226 + return nil, fmt.Errorf("marshal gql request: %w", err) 227 + } 228 + httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, 229 + c.baseURL+"/query", bytes.NewReader(body), 230 + ) 231 + if err != nil { 232 + return nil, fmt.Errorf("build request: %w", err) 233 + } 234 + c.setAuth(httpReq) 235 + httpReq.Header.Set("Content-Type", "application/json") 236 + 237 + resp, err := c.http.Do(httpReq) 238 + if err != nil { 239 + return nil, fmt.Errorf("post gql: %w", err) 240 + } 241 + defer resp.Body.Close() 242 + if resp.StatusCode != http.StatusOK { 243 + raw, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) 244 + return nil, fmt.Errorf("gql: status %d: %s", 245 + resp.StatusCode, strings.TrimSpace(string(raw)), 246 + ) 247 + } 248 + var out gqlResponse 249 + if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { 250 + return nil, fmt.Errorf("decode gql response: %w", err) 251 + } 252 + if len(out.Errors) > 0 { 253 + msgs := make([]string, 0, len(out.Errors)) 254 + for _, e := range out.Errors { 255 + msgs = append(msgs, e.Message) 256 + } 257 + return nil, fmt.Errorf("gql errors: %s", strings.Join(msgs, "; ")) 258 + } 259 + return out.Data, nil 260 + } 261 + 262 + // setAuth applies the OAuth2 Bearer header builds.sr.ht expects. 263 + func (c *Client) setAuth(req *http.Request) { 264 + if c.token != "" { 265 + req.Header.Set("Authorization", "Bearer "+c.token) 266 + } 267 + }
+24 -2
main.go
··· 52 52 // TektonNamespace using its pod's service account credentials. 53 53 TektonEnabled bool 54 54 TektonNamespace string 55 + 56 + // Sourcehut mode is gated on a token: when empty the provider is 57 + // not registered and workflows naming `tack.sourcehut` route 58 + // nowhere. SourcehutInstance overrides the public builds.sr.ht 59 + // endpoint for users running their own deployment; the token must 60 + // authenticate against whichever instance is targeted. 61 + SourcehutToken string 62 + SourcehutInstance string 55 63 } 56 64 57 65 func loadConfig() (config, error) { ··· 68 76 BuildkiteWebhookMode: buildkite.WebhookMode( 69 77 envOr("TACK_BUILDKITE_WEBHOOK_MODE", string(buildkite.WebhookModeToken)), 70 78 ), 71 - TektonEnabled: os.Getenv("TACK_TEKTON_ENABLED") == "1", 72 - TektonNamespace: envOr("TACK_TEKTON_NAMESPACE", "default"), 79 + TektonEnabled: os.Getenv("TACK_TEKTON_ENABLED") == "1", 80 + TektonNamespace: envOr("TACK_TEKTON_NAMESPACE", "default"), 81 + SourcehutToken: os.Getenv("TACK_SOURCEHUT_TOKEN"), 82 + SourcehutInstance: os.Getenv("TACK_SOURCEHUT_INSTANCE"), 73 83 } 74 84 addrFlag := flag.String("addr", cfg.Addr, "HTTP listen address (overrides TACK_LISTEN_ADDR)") 75 85 flag.Parse() ··· 217 227 "namespace", cfg.TektonNamespace, 218 228 ) 219 229 } 230 + if cfg.SourcehutToken != "" { 231 + shProvider := newSourcehutProvider( 232 + br, st, 233 + cfg.SourcehutToken, 234 + cfg.SourcehutInstance, 235 + logger, 236 + ) 237 + providers["sourcehut"] = shProvider 238 + logger.Info("sourcehut provider enabled", 239 + "instance", shProvider.defaultInstance, 240 + ) 241 + } 220 242 provider := newProviderRouter(logger, providers) 221 243 222 244 // Start the knot event-stream consumer first so the jetstream
+580
provider_sourcehut.go
··· 1 + package main 2 + 3 + // sourcehutProvider implements Provider against a builds.sr.ht 4 + // instance. Spawn submits one job per workflow through GraphQL's 5 + // `submit` mutation; status is observed via an in-process poll loop 6 + // — same shape as the Tekton provider. 7 + // 8 + // The build manifest a workflow targets is carried inline in the 9 + // workflow's YAML body under `tack.sourcehut.manifest`. Tack injects 10 + // a small set of TACK_* environment variables into the manifest so 11 + // the user's tasks can dispatch on the originating Tangled trigger. 12 + 13 + import ( 14 + "context" 15 + "encoding/json" 16 + "errors" 17 + "fmt" 18 + "log/slog" 19 + "strings" 20 + "sync" 21 + "time" 22 + 23 + "go.yaml.in/yaml/v2" 24 + "tangled.org/core/api/tangled" 25 + 26 + "go.mitchellh.com/tack/internal/sourcehut" 27 + ) 28 + 29 + // defaultSourcehutPollInterval is how often the watch loop re-fetches 30 + // upstream job state. 31 + const defaultSourcehutPollInterval = 5 * time.Second 32 + 33 + // workflowSourcehutDoc is the tack-flavoured schema we expect inside 34 + // each Tangled workflow's Raw YAML body. Same nesting convention as 35 + // the other providers (`tack: { sourcehut: ... }`) so a workflow can 36 + // grow other top-level keys without colliding with our namespace. 37 + type workflowSourcehutDoc struct { 38 + Tack struct { 39 + Sourcehut sourcehutWorkflowConfig `yaml:"sourcehut"` 40 + } `yaml:"tack"` 41 + } 42 + 43 + // sourcehutWorkflowConfig is the sourcehut-specific subset of a 44 + // workflow YAML. 45 + // 46 + // Manifest is the YAML build manifest the user wants builds.sr.ht to 47 + // run, exactly as they'd paste it into the web UI. We treat it as an 48 + // opaque string until Spawn time, then decode/inject env vars/re-emit 49 + // just before submitting so users keep authorial control of their 50 + // manifests. 51 + // 52 + // Instance optionally overrides the provider's default sourcehut host 53 + // (e.g. for users running their own builds.sr.ht). It must be a full 54 + // URL including scheme so we don't have to guess http vs https. 55 + // 56 + // Tags/Note flow straight through to the submit API — they're useful 57 + // for the user's own filtering on builds.sr.ht's job list view. 58 + // 59 + // Secrets controls whether the runner mounts the user's configured 60 + // sourcehut secrets. Default is false because secrets injection is a 61 + // blast-radius decision that should be opt-in per workflow. 62 + type sourcehutWorkflowConfig struct { 63 + Manifest string `yaml:"manifest"` 64 + Instance string `yaml:"instance"` 65 + Tags []string `yaml:"tags"` 66 + Note string `yaml:"note"` 67 + Secrets bool `yaml:"secrets"` 68 + } 69 + 70 + // parseSourcehutWorkflowConfig decodes `tack.sourcehut` from a workflow 71 + // body and validates the small set of fields we require. An empty body 72 + // or a missing manifest is a structural error so spawnWorkflow can 73 + // short-circuit cleanly instead of submitting a malformed job that 74 + // builds.sr.ht would reject. 75 + func parseSourcehutWorkflowConfig(raw string) (*sourcehutWorkflowConfig, error) { 76 + if strings.TrimSpace(raw) == "" { 77 + return nil, errors.New("workflow body is empty") 78 + } 79 + var doc workflowSourcehutDoc 80 + if err := yaml.Unmarshal([]byte(raw), &doc); err != nil { 81 + return nil, fmt.Errorf("parse workflow yaml: %w", err) 82 + } 83 + cfg := doc.Tack.Sourcehut 84 + if strings.TrimSpace(cfg.Manifest) == "" { 85 + return nil, errors.New("workflow yaml: `tack.sourcehut.manifest` is required") 86 + } 87 + return &cfg, nil 88 + } 89 + 90 + // sourcehutProvider implements Provider. 91 + // 92 + // defaultInstance is the builds.sr.ht instance Spawn submits to when 93 + // a workflow doesn't override it; the configured token must 94 + // authenticate against that instance. defaultClient is the matching 95 + // Client; per-workflow `instance` overrides cause Spawn to mint a 96 + // short-lived sibling Client carrying the same token, since one token 97 + // can address only one sourcehut deployment in practice. 98 + type sourcehutProvider struct { 99 + br *broker 100 + st *store 101 + log *slog.Logger 102 + token string 103 + defaultClient *sourcehut.Client 104 + defaultInstance string 105 + pollInterval time.Duration 106 + 107 + // instanceClients caches a Client per non-default instance URL so a 108 + // workflow that targets a custom builds.sr.ht doesn't mint (and 109 + // leak) a fresh http.Transport + connection pool on every Spawn, 110 + // watch tick, and Logs call. The default instance always uses 111 + // defaultClient and never lands here. 112 + mu sync.Mutex 113 + instanceClients map[string]*sourcehut.Client 114 + } 115 + 116 + var _ Provider = (*sourcehutProvider)(nil) 117 + 118 + // newSourcehutProvider wires a provider to its sourcehut client and 119 + // to the broker it publishes pipeline.status records on. instance is 120 + // the default builds.sr.ht base URL; pass empty to use 121 + // sourcehut.DefaultBaseURL. 122 + func newSourcehutProvider( 123 + br *broker, 124 + st *store, 125 + token string, 126 + instance string, 127 + log *slog.Logger, 128 + ) *sourcehutProvider { 129 + if instance == "" { 130 + instance = sourcehut.DefaultBaseURL 131 + } 132 + return &sourcehutProvider{ 133 + br: br, 134 + st: st, 135 + log: log.With("component", "provider", "kind", "sourcehut"), 136 + token: token, 137 + defaultClient: sourcehut.NewClient(instance, token), 138 + defaultInstance: instance, 139 + pollInterval: defaultSourcehutPollInterval, 140 + instanceClients: map[string]*sourcehut.Client{}, 141 + } 142 + } 143 + 144 + // Spawn satisfies Provider. For each workflow it submits a separate 145 + // builds.sr.ht job so each workflow gets its own status timeline. The 146 + // actual API call runs on a goroutine — submission is one HTTP round 147 + // trip, but Spawn is contractually non-blocking. 148 + func (p *sourcehutProvider) Spawn( 149 + ctx context.Context, 150 + knot string, 151 + pipelineRkey string, 152 + actor string, 153 + trigger *tangled.Pipeline_TriggerMetadata, 154 + workflows []*tangled.Pipeline_Workflow, 155 + ) { 156 + if len(workflows) == 0 { 157 + p.log.Warn("pipeline has no workflows; nothing to spawn", 158 + "knot", knot, "rkey", pipelineRkey, 159 + ) 160 + return 161 + } 162 + for _, wf := range workflows { 163 + if wf == nil || wf.Name == "" { 164 + continue 165 + } 166 + wf := wf 167 + go p.spawnWorkflow(ctx, knot, pipelineRkey, actor, trigger, wf) 168 + } 169 + } 170 + 171 + func (p *sourcehutProvider) spawnWorkflow( 172 + ctx context.Context, 173 + knot string, 174 + pipelineRkey string, 175 + actor string, 176 + trigger *tangled.Pipeline_TriggerMetadata, 177 + wf *tangled.Pipeline_Workflow, 178 + ) { 179 + logger := p.log.With( 180 + "knot", knot, 181 + "pipeline_rkey", pipelineRkey, 182 + "workflow", wf.Name, 183 + "actor", actor, 184 + ) 185 + 186 + cfg, err := parseSourcehutWorkflowConfig(wf.Raw) 187 + if err != nil { 188 + logger.Error("invalid workflow config; refusing to spawn", "err", err) 189 + return 190 + } 191 + 192 + commit, branch := triggerCommitAndBranch(trigger) 193 + manifest, err := injectSourcehutEnvironment(cfg.Manifest, map[string]string{ 194 + "TACK_KNOT": knot, 195 + "TACK_PIPELINE_RKEY": pipelineRkey, 196 + "TACK_WORKFLOW": wf.Name, 197 + "TACK_WORKFLOW_RAW": wf.Raw, 198 + "TACK_ACTOR": actor, 199 + "TACK_COMMIT": commit, 200 + "TACK_BRANCH": branch, 201 + }) 202 + if err != nil { 203 + logger.Error("inject TACK_* env into manifest", "err", err) 204 + return 205 + } 206 + 207 + client, instance := p.clientFor(cfg.Instance) 208 + logger = logger.With("instance", instance) 209 + 210 + tags := cfg.Tags 211 + if len(tags) == 0 { 212 + // Auto-tag with "tack" so an operator browsing the builds.sr.ht 213 + // job list can filter to the jobs originating from this spindle 214 + // without users having to remember to set tags themselves. 215 + tags = []string{"tack"} 216 + } 217 + note := cfg.Note 218 + if note == "" { 219 + note = fmt.Sprintf("tangled: %s @ %s", wf.Name, shortCommit(commit)) 220 + } 221 + 222 + job, err := client.SubmitJob(ctx, sourcehut.SubmitRequest{ 223 + Manifest: manifest, 224 + Tags: tags, 225 + Note: note, 226 + Secrets: cfg.Secrets, 227 + Execute: true, 228 + }) 229 + if err != nil { 230 + logger.Error("submit sourcehut job", "err", err) 231 + return 232 + } 233 + logger.Info("sourcehut job submitted", 234 + "job_id", job.ID, 235 + "owner", job.Owner.CanonicalName, 236 + "web_url", client.JobWebURL(job.Owner.CanonicalName, job.ID), 237 + ) 238 + 239 + pipelineURI := pipelineATURI(knot, pipelineRkey) 240 + ref := SourcehutJobRef{ 241 + Knot: knot, 242 + PipelineRkey: pipelineRkey, 243 + Workflow: wf.Name, 244 + JobID: job.ID, 245 + Owner: job.Owner.CanonicalName, 246 + Instance: instance, 247 + PipelineURI: pipelineURI, 248 + } 249 + if err := p.st.InsertSourcehutJob(ctx, ref); err != nil { 250 + // Without the row the watch goroutine and any /logs lookup 251 + // can't recover the job. Surface loudly and bail. 252 + logger.Error("persist sourcehut job mapping", "err", err, 253 + "job_id", job.ID, 254 + ) 255 + return 256 + } 257 + 258 + if err := p.publishStatus(ctx, pipelineURI, wf.Name, 259 + "pending", job.ID, nil, nil); err != nil { 260 + logger.Error("publish initial pending status", "err", err) 261 + } 262 + 263 + go p.watchJob(ctx, ref, client) 264 + } 265 + 266 + // watchJob polls the job until it reaches a terminal state, publishing 267 + // pipeline.status records on every distinct transition. We mirror the 268 + // Tekton provider's structure: a single goroutine per job, suppressed 269 + // duplicate publishes via `last`, exit on terminal or ctx cancellation. 270 + func (p *sourcehutProvider) watchJob( 271 + ctx context.Context, 272 + ref SourcehutJobRef, 273 + client *sourcehut.Client, 274 + ) { 275 + logger := p.log.With( 276 + "knot", ref.Knot, 277 + "pipeline_rkey", ref.PipelineRkey, 278 + "workflow", ref.Workflow, 279 + "job_id", ref.JobID, 280 + "instance", ref.Instance, 281 + ) 282 + logger.Debug("watchJob: starting") 283 + 284 + last := "" 285 + ticker := time.NewTicker(p.pollInterval) 286 + defer ticker.Stop() 287 + 288 + // Issue one immediate poll so a fast-completing job (e.g. a tiny 289 + // manifest the runner finished before the first tick) doesn't sit 290 + // at "pending" for the whole interval. 291 + for { 292 + job, err := client.GetJob(ctx, ref.JobID) 293 + if errors.Is(err, sourcehut.ErrNotFound) { 294 + logger.Warn("job disappeared while watching") 295 + return 296 + } 297 + if err != nil { 298 + logger.Debug("get sourcehut job", "err", err) 299 + } else { 300 + status, terminal, ok := mapSourcehutStatus(job.Status) 301 + logger.Debug("watchJob: poll", 302 + "upstream_status", job.Status, 303 + "status", status, "terminal", terminal, "ok", ok, "last", last, 304 + ) 305 + if !ok { 306 + // Keep polling on an unrecognised status, but warn 307 + // so a silent infinite loop is at least visible. 308 + logger.Warn("unmapped sourcehut status; continuing to poll", 309 + "upstream_status", job.Status, 310 + ) 311 + } else if status != last { 312 + last = status 313 + if err := p.publishStatus(ctx, ref.PipelineURI, ref.Workflow, 314 + status, ref.JobID, nil, nil); err != nil { 315 + logger.Error("publish sourcehut status", "err", err, "status", status) 316 + } 317 + } 318 + if ok && terminal { 319 + logger.Debug("watchJob: terminal status reached; exiting", "status", status) 320 + return 321 + } 322 + } 323 + 324 + select { 325 + case <-ctx.Done(): 326 + logger.Debug("watchJob: context cancelled") 327 + return 328 + case <-ticker.C: 329 + } 330 + } 331 + } 332 + 333 + // mapSourcehutStatus translates builds.sr.ht's job status into the 334 + // Tangled spindle StatusKind strings the appview consumes. The 335 + // upstream enum is uppercase over GraphQL; we lowercase first so a 336 + // surface change between cases doesn't fall to the not-mapped branch. 337 + func mapSourcehutStatus(state string) (status string, terminal bool, ok bool) { 338 + switch strings.ToLower(state) { 339 + case "pending", "queued": 340 + return "pending", false, true 341 + case "running": 342 + return "running", false, true 343 + case "success": 344 + return "success", true, true 345 + case "failed", "timeout": 346 + return "failed", true, true 347 + case "cancelled": 348 + return "cancelled", true, true 349 + default: 350 + return "", false, false 351 + } 352 + } 353 + 354 + // Logs satisfies Provider. We resolve the (knot, rkey, workflow) tuple 355 + // to a sourcehut job, fetch its tasks, and stream the per-task logs as 356 + // LogLine frames bracketed by per-task control frames. 357 + // 358 + // Snapshot read, not a tail — for an in-progress job each task's log 359 + // is fetched at its current length and the channel closes. The 360 + // appview's repeated /logs calls during a running build give us 361 + // "good enough" liveness. 362 + func (p *sourcehutProvider) Logs( 363 + ctx context.Context, 364 + knot string, 365 + pipelineRkey string, 366 + workflow string, 367 + ) (<-chan LogLine, error) { 368 + ref, err := p.st.LookupSourcehutJobByTuple(ctx, knot, pipelineRkey, workflow) 369 + if err != nil { 370 + return nil, fmt.Errorf("lookup sourcehut job: %w", err) 371 + } 372 + if ref == nil { 373 + return nil, ErrLogsNotFound 374 + } 375 + 376 + client, _ := p.clientFor(ref.Instance) 377 + 378 + job, err := client.GetJob(ctx, ref.JobID) 379 + if err != nil { 380 + if errors.Is(err, sourcehut.ErrNotFound) { 381 + return nil, ErrLogsNotFound 382 + } 383 + return nil, fmt.Errorf("get sourcehut job: %w", err) 384 + } 385 + 386 + out := make(chan LogLine, 32) 387 + go func() { 388 + defer close(out) 389 + stepID := 0 390 + // "setup" is the master log: setup output (sources clone, 391 + // package install) before any task runs. Always emit it as 392 + // step 0 even if empty so the renderer has a stable timeline. 393 + if !p.streamStep(ctx, out, client, ref, "setup", "", stepID) { 394 + return 395 + } 396 + stepID++ 397 + for _, task := range job.Tasks { 398 + name := task.Name 399 + if name == "" { 400 + name = fmt.Sprintf("task %d", stepID) 401 + } 402 + if !p.streamStep(ctx, out, client, ref, name, task.Name, stepID) { 403 + return 404 + } 405 + stepID++ 406 + } 407 + }() 408 + return out, nil 409 + } 410 + 411 + // streamStep emits one step's worth of frames into out: a start 412 + // control, one data frame per non-empty log line, and an end control. 413 + // stepName is the human-visible label; logTask is the name passed to 414 + // GetTaskLog (empty string fetches the master log). Returns false if 415 + // the context fired and the caller should bail. 416 + func (p *sourcehutProvider) streamStep( 417 + ctx context.Context, 418 + out chan<- LogLine, 419 + client *sourcehut.Client, 420 + ref *SourcehutJobRef, 421 + stepName, logTask string, 422 + stepID int, 423 + ) bool { 424 + if !sendLine(ctx, out, LogLine{ 425 + Kind: LogKindControl, Time: time.Now(), 426 + Content: stepName, StepId: stepID, StepStatus: StepStatusStart, 427 + }) { 428 + return false 429 + } 430 + body, err := client.GetTaskLog(ctx, ref.JobID, logTask) 431 + if err != nil && !errors.Is(err, sourcehut.ErrNotFound) { 432 + // Don't fail the whole stream on one task; emit the end frame 433 + // and move on so the renderer at least sees what other tasks 434 + // produced. ErrNotFound (no log yet) is treated as an empty body. 435 + p.log.Debug("fetch sourcehut task log", 436 + "err", err, "job_id", ref.JobID, "task", logTask, 437 + ) 438 + body = "" 439 + } 440 + if !emitLogBody(ctx, out, body, stepID) { 441 + return false 442 + } 443 + return sendLine(ctx, out, LogLine{ 444 + Kind: LogKindControl, Time: time.Now(), 445 + Content: stepName, StepId: stepID, StepStatus: StepStatusEnd, 446 + }) 447 + } 448 + 449 + // emitLogBody pushes one LogLine per non-empty line of body into out. 450 + // Returns false if the context fired so the caller can stop draining. 451 + func emitLogBody(ctx context.Context, out chan<- LogLine, body string, stepID int) bool { 452 + if body == "" { 453 + return true 454 + } 455 + for _, line := range strings.Split(strings.TrimRight(body, "\n"), "\n") { 456 + if line == "" { 457 + continue 458 + } 459 + if !sendLine(ctx, out, LogLine{ 460 + Kind: LogKindData, 461 + Time: time.Now(), 462 + Content: line + "\n", 463 + StepId: stepID, 464 + Stream: "stdout", 465 + }) { 466 + return false 467 + } 468 + } 469 + return true 470 + } 471 + 472 + // publishStatus assembles a tangled.PipelineStatus record and pushes 473 + // it through the broker. jobID is mixed into the rkey so multiple 474 + // status records for the same job don't collide on the events table's 475 + // (rkey) uniqueness, and so an operator grepping the log can find 476 + // every record pertaining to a given sourcehut job. 477 + func (p *sourcehutProvider) publishStatus( 478 + ctx context.Context, 479 + pipelineURI, workflow, status string, 480 + jobID int64, 481 + errMsg *string, 482 + exitCode *int64, 483 + ) error { 484 + rec := tangled.PipelineStatus{ 485 + LexiconTypeID: tangled.PipelineStatusNSID, 486 + Pipeline: pipelineURI, 487 + Workflow: workflow, 488 + Status: status, 489 + CreatedAt: time.Now().UTC().Format(time.RFC3339), 490 + Error: errMsg, 491 + ExitCode: exitCode, 492 + } 493 + body, err := json.Marshal(rec) 494 + if err != nil { 495 + return fmt.Errorf("marshal pipeline.status: %w", err) 496 + } 497 + rkey := fmt.Sprintf("sh-%d-%s-%d", jobID, status, time.Now().UnixNano()) 498 + if _, err := p.br.Publish(ctx, rkey, tangled.PipelineStatusNSID, body); err != nil { 499 + return fmt.Errorf("publish pipeline.status: %w", err) 500 + } 501 + return nil 502 + } 503 + 504 + // injectSourcehutEnvironment decodes the user-supplied manifest, 505 + // merges env into its top-level `environment` map, and re-emits it 506 + // as YAML. We round-trip via map[string]any rather than a typed 507 + // schema so unknown fields aren't dropped; comments and key ordering 508 + // are not preserved. 509 + // 510 + // Existing user-set entries win over our injected vars — an explicit 511 + // `TACK_FOO: ...` is a deliberate override. 512 + // 513 + // `environment:` must be a map. A sequence form is rejected loudly 514 + // rather than silently replaced with our map. 515 + func injectSourcehutEnvironment(manifest string, env map[string]string) (string, error) { 516 + var doc map[string]any 517 + if err := yaml.Unmarshal([]byte(manifest), &doc); err != nil { 518 + return "", fmt.Errorf("parse manifest: %w", err) 519 + } 520 + if doc == nil { 521 + doc = map[string]any{} 522 + } 523 + var current map[any]any 524 + switch existing := doc["environment"].(type) { 525 + case nil: 526 + current = map[any]any{} 527 + case map[any]any: 528 + current = existing 529 + default: 530 + return "", fmt.Errorf( 531 + "manifest `environment` must be a map, got %T", existing, 532 + ) 533 + } 534 + for k, v := range env { 535 + if _, exists := current[k]; exists { 536 + continue 537 + } 538 + current[k] = v 539 + } 540 + doc["environment"] = current 541 + 542 + out, err := yaml.Marshal(doc) 543 + if err != nil { 544 + return "", fmt.Errorf("emit manifest: %w", err) 545 + } 546 + return string(out), nil 547 + } 548 + 549 + // clientFor resolves a workflow's optional `instance` override to the 550 + // client that should issue API calls for that workflow, plus the 551 + // concrete instance URL the row was/should-be persisted as. The 552 + // no-override (or matches-default) case reuses the long-lived default 553 + // client. Custom instances are memoised so repeated calls (per-poll, 554 + // per-Logs) reuse one Client — and therefore one connection pool — 555 + // per distinct upstream. 556 + func (p *sourcehutProvider) clientFor(instance string) (*sourcehut.Client, string) { 557 + if instance == "" || instance == p.defaultInstance { 558 + return p.defaultClient, p.defaultInstance 559 + } 560 + p.mu.Lock() 561 + defer p.mu.Unlock() 562 + if c, ok := p.instanceClients[instance]; ok { 563 + return c, instance 564 + } 565 + c := sourcehut.NewClient(instance, p.token) 566 + p.instanceClients[instance] = c 567 + return c, instance 568 + } 569 + 570 + // shortCommit returns the first 12 hex chars of a commit SHA, or "?" 571 + // when commit is empty. 572 + func shortCommit(commit string) string { 573 + if commit == "" { 574 + return "?" 575 + } 576 + if len(commit) > 12 { 577 + return commit[:12] 578 + } 579 + return commit 580 + }
+487
provider_sourcehut_test.go
··· 1 + package main 2 + 3 + // Provider-level integration tests for the sourcehut implementation: 4 + // Spawn → SubmitJob + persist + initial pending publish, watch loop → 5 + // status transitions, and Logs → snapshot of master + per-task logs. 6 + // builds.sr.ht is stubbed with httptest so the tests don't need 7 + // network access. 8 + 9 + import ( 10 + "context" 11 + "encoding/json" 12 + "fmt" 13 + "io" 14 + "log/slog" 15 + "net/http" 16 + "net/http/httptest" 17 + "strings" 18 + "sync/atomic" 19 + "testing" 20 + "time" 21 + 22 + "tangled.org/core/api/tangled" 23 + ) 24 + 25 + // newSourcehutTestProvider wires a sourcehutProvider against an 26 + // httptest server impersonating builds.sr.ht. 27 + func newSourcehutTestProvider( 28 + t *testing.T, 29 + handler http.HandlerFunc, 30 + ) (*sourcehutProvider, *store, *broker, *httptest.Server) { 31 + t.Helper() 32 + srv := httptest.NewServer(handler) 33 + t.Cleanup(srv.Close) 34 + 35 + st := newTestStore(t) 36 + br := newBroker(st) 37 + logger := slog.Default() 38 + p := newSourcehutProvider(br, st, "tok", srv.URL, logger) 39 + return p, st, br, srv 40 + } 41 + 42 + // gqlBody is a minimal GraphQL request body for routing test stubs 43 + // without pulling in a real GraphQL parser. 44 + type gqlBody struct { 45 + Query string `json:"query"` 46 + Variables map[string]any `json:"variables"` 47 + } 48 + 49 + // gqlOp returns the op keyword ("query" / "mutation" / "subscription") 50 + // at the head of a GraphQL document. Used by test stubs to route a 51 + // single /query endpoint to the right canned response without 52 + // substring-matching on field names — which would silently misroute a 53 + // query that happened to mention "submit" or vice-versa. 54 + func gqlOp(query string) string { 55 + q := strings.TrimSpace(query) 56 + for _, kw := range []string{"mutation", "query", "subscription"} { 57 + if strings.HasPrefix(q, kw) { 58 + return kw 59 + } 60 + } 61 + return "" 62 + } 63 + 64 + // writeGQL is the test-side helper for shaping a successful GraphQL 65 + // response. It wraps `data` under the canonical {"data": …} envelope 66 + // the client decodes against. 67 + func writeGQL(w http.ResponseWriter, data map[string]any) { 68 + w.Header().Set("Content-Type", "application/json") 69 + _ = json.NewEncoder(w).Encode(map[string]any{"data": data}) 70 + } 71 + 72 + // TestSourcehutSpawn covers the full submit path: trigger → API call 73 + // → DB row → "pending" status on the broker. 74 + func TestSourcehutSpawn(t *testing.T) { 75 + manifestCh := make(chan string, 1) 76 + notesCh := make(chan string, 1) 77 + tagsCh := make(chan []string, 1) 78 + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 79 + if r.Method != http.MethodPost || r.URL.Path != "/query" { 80 + http.NotFound(w, r) 81 + return 82 + } 83 + var body gqlBody 84 + _ = json.NewDecoder(r.Body).Decode(&body) 85 + switch gqlOp(body.Query) { 86 + case "mutation": 87 + if m, _ := body.Variables["manifest"].(string); m != "" { 88 + select { 89 + case manifestCh <- m: 90 + default: 91 + } 92 + } 93 + if n, _ := body.Variables["note"].(string); n != "" { 94 + select { 95 + case notesCh <- n: 96 + default: 97 + } 98 + } 99 + if raw, ok := body.Variables["tags"].([]any); ok { 100 + ts := make([]string, 0, len(raw)) 101 + for _, v := range raw { 102 + if s, ok := v.(string); ok { 103 + ts = append(ts, s) 104 + } 105 + } 106 + select { 107 + case tagsCh <- ts: 108 + default: 109 + } 110 + } 111 + writeGQL(w, map[string]any{ 112 + "submit": map[string]any{ 113 + "id": 42, 114 + "status": "PENDING", 115 + "owner": map[string]any{"canonicalName": "~tester"}, 116 + }, 117 + }) 118 + case "query": 119 + // Watch loop; keep the job pending so the test exits 120 + // deterministically once the row and initial publish land. 121 + writeGQL(w, map[string]any{ 122 + "job": map[string]any{ 123 + "id": 42, 124 + "status": "PENDING", 125 + "owner": map[string]any{"canonicalName": "~tester"}, 126 + "tasks": []any{}, 127 + }, 128 + }) 129 + default: 130 + http.Error(w, "unknown op", http.StatusBadRequest) 131 + } 132 + }) 133 + p, st, _, _ := newSourcehutTestProvider(t, handler) 134 + 135 + trigger := &tangled.Pipeline_TriggerMetadata{ 136 + Push: &tangled.Pipeline_PushTriggerData{ 137 + NewSha: "abcdef0123", 138 + Ref: "refs/heads/main", 139 + }, 140 + } 141 + raw := strings.Join([]string{ 142 + "tack:", 143 + " sourcehut:", 144 + " manifest: |", 145 + " image: alpine/edge", 146 + " tasks:", 147 + " - hello: |", 148 + " echo hi", 149 + " tags: [tack, ci]", 150 + " note: integration", 151 + }, "\n") + "\n" 152 + 153 + ctx, cancel := context.WithCancel(context.Background()) 154 + defer cancel() 155 + p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", trigger, 156 + []*tangled.Pipeline_Workflow{{Name: "ci.yml", Raw: raw}}, 157 + ) 158 + 159 + select { 160 + case manifest := <-manifestCh: 161 + // Manifest should round-trip through injectSourcehutEnvironment; 162 + // we expect TACK_* env vars to land in a top-level environment 163 + // block that didn't originally exist. 164 + if !strings.Contains(manifest, "TACK_KNOT") { 165 + t.Fatalf("manifest missing TACK_KNOT: %q", manifest) 166 + } 167 + if !strings.Contains(manifest, "abcdef0123") { 168 + t.Fatalf("manifest missing TACK_COMMIT value: %q", manifest) 169 + } 170 + // User-supplied tasks must be preserved through the round trip. 171 + if !strings.Contains(manifest, "echo hi") { 172 + t.Fatalf("user manifest body lost: %q", manifest) 173 + } 174 + select { 175 + case note := <-notesCh: 176 + if note != "integration" { 177 + t.Fatalf("note=%q; want integration", note) 178 + } 179 + case <-time.After(time.Second): 180 + t.Fatal("note variable not sent") 181 + } 182 + select { 183 + case ts := <-tagsCh: 184 + if len(ts) != 2 || ts[0] != "tack" || ts[1] != "ci" { 185 + t.Fatalf("tags=%v", ts) 186 + } 187 + case <-time.After(time.Second): 188 + t.Fatal("tags variable not sent") 189 + } 190 + case <-time.After(2 * time.Second): 191 + t.Fatal("SubmitJob not called") 192 + } 193 + 194 + // Wait for the row to land — that's the load-bearing artifact. 195 + deadline := time.Now().Add(2 * time.Second) 196 + var ref *SourcehutJobRef 197 + for time.Now().Before(deadline) { 198 + var err error 199 + ref, err = st.LookupSourcehutJobByTuple( 200 + context.Background(), 201 + "knot.example.com", "rkey-1", "ci.yml", 202 + ) 203 + if err != nil { 204 + t.Fatalf("lookup: %v", err) 205 + } 206 + if ref != nil { 207 + break 208 + } 209 + time.Sleep(20 * time.Millisecond) 210 + } 211 + if ref == nil { 212 + t.Fatal("sourcehut job row not persisted within deadline") 213 + } 214 + if ref.JobID != 42 || ref.Owner != "~tester" { 215 + t.Fatalf("ref mismatch: %+v", ref) 216 + } 217 + 218 + rows, err := st.EventsAfter(context.Background(), 0) 219 + if err != nil { 220 + t.Fatalf("EventsAfter: %v", err) 221 + } 222 + if len(rows) == 0 { 223 + t.Fatal("no pending status published") 224 + } 225 + var rec tangled.PipelineStatus 226 + if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil { 227 + t.Fatalf("decode status: %v", err) 228 + } 229 + if rec.Status != "pending" || rec.Workflow != "ci.yml" { 230 + t.Fatalf("unexpected status: %+v", rec) 231 + } 232 + if !strings.Contains(rec.Pipeline, "knot.example.com") || 233 + !strings.Contains(rec.Pipeline, "rkey-1") { 234 + t.Fatalf("pipeline ATURI wrong: %s", rec.Pipeline) 235 + } 236 + } 237 + 238 + // TestSourcehutSpawnInvalidConfig confirms a workflow without a manifest 239 + // is rejected loudly without firing any HTTP call. 240 + func TestSourcehutSpawnInvalidConfig(t *testing.T) { 241 + var called atomic.Bool 242 + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 243 + called.Store(true) 244 + }) 245 + p, st, _, _ := newSourcehutTestProvider(t, handler) 246 + 247 + p.Spawn(context.Background(), "knot.example.com", "rkey-1", "did:plc:actor", 248 + &tangled.Pipeline_TriggerMetadata{Push: &tangled.Pipeline_PushTriggerData{ 249 + NewSha: "abc", Ref: "refs/heads/main", 250 + }}, 251 + []*tangled.Pipeline_Workflow{ 252 + // `tack.sourcehut` block but no manifest. 253 + {Name: "ci.yml", Raw: "tack:\n sourcehut:\n note: hi\n"}, 254 + }, 255 + ) 256 + 257 + time.Sleep(50 * time.Millisecond) 258 + if called.Load() { 259 + t.Fatal("SubmitJob called despite missing manifest") 260 + } 261 + rows, _ := st.EventsAfter(context.Background(), 0) 262 + if len(rows) != 0 { 263 + t.Fatalf("got %d events, want 0", len(rows)) 264 + } 265 + } 266 + 267 + // TestSourcehutWatchTransitions exercises the watch loop: a job that 268 + // flips through PENDING → RUNNING → SUCCESS should produce three 269 + // distinct status records in order, with no duplicates. 270 + func TestSourcehutWatchTransitions(t *testing.T) { 271 + states := []string{"PENDING", "RUNNING", "RUNNING", "SUCCESS"} 272 + var idx atomic.Int32 273 + idx.Store(-1) 274 + 275 + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 276 + if r.Method != http.MethodPost || r.URL.Path != "/query" { 277 + http.NotFound(w, r) 278 + return 279 + } 280 + var body gqlBody 281 + _ = json.NewDecoder(r.Body).Decode(&body) 282 + if gqlOp(body.Query) != "query" { 283 + http.Error(w, "unknown op", http.StatusBadRequest) 284 + return 285 + } 286 + n := int(idx.Add(1)) 287 + if n >= len(states) { 288 + n = len(states) - 1 289 + } 290 + writeGQL(w, map[string]any{ 291 + "job": map[string]any{ 292 + "id": 7, 293 + "status": states[n], 294 + "owner": map[string]any{"canonicalName": "~tester"}, 295 + "tasks": []any{}, 296 + }, 297 + }) 298 + }) 299 + p, st, _, _ := newSourcehutTestProvider(t, handler) 300 + 301 + pipelineURI := pipelineATURI("knot.example.com", "rkey-2") 302 + ref := SourcehutJobRef{ 303 + Knot: "knot.example.com", PipelineRkey: "rkey-2", Workflow: "ci.yml", 304 + JobID: 7, Owner: "~tester", 305 + Instance: p.defaultInstance, PipelineURI: pipelineURI, 306 + } 307 + if err := st.InsertSourcehutJob(context.Background(), ref); err != nil { 308 + t.Fatalf("insert: %v", err) 309 + } 310 + 311 + // Lower the poll interval so the watch loop completes in 312 + // milliseconds rather than waiting for the production 5s tick. 313 + p.pollInterval = 20 * time.Millisecond 314 + 315 + ctx, cancel := context.WithCancel(context.Background()) 316 + defer cancel() 317 + done := make(chan struct{}) 318 + go func() { 319 + defer close(done) 320 + p.watchJob(ctx, ref, p.defaultClient) 321 + }() 322 + 323 + select { 324 + case <-done: 325 + case <-time.After(2 * time.Second): 326 + t.Fatal("watch loop did not exit on terminal status") 327 + } 328 + 329 + rows, err := st.EventsAfter(context.Background(), 0) 330 + if err != nil { 331 + t.Fatalf("EventsAfter: %v", err) 332 + } 333 + gotStatuses := []string{} 334 + for _, r := range rows { 335 + var rec tangled.PipelineStatus 336 + if err := json.Unmarshal(r.EventJSON, &rec); err != nil { 337 + t.Fatalf("decode: %v", err) 338 + } 339 + gotStatuses = append(gotStatuses, rec.Status) 340 + } 341 + want := []string{"pending", "running", "success"} 342 + if fmt.Sprint(gotStatuses) != fmt.Sprint(want) { 343 + t.Fatalf("statuses = %v; want %v", gotStatuses, want) 344 + } 345 + } 346 + 347 + // TestSourcehutLogs covers the snapshot Logs path: master log + each 348 + // task in the job's tasks list, bracketed by control frames. 349 + func TestSourcehutLogs(t *testing.T) { 350 + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 351 + switch { 352 + case r.Method == http.MethodPost && r.URL.Path == "/query": 353 + writeGQL(w, map[string]any{ 354 + "job": map[string]any{ 355 + "id": 99, 356 + "status": "SUCCESS", 357 + "owner": map[string]any{"canonicalName": "~tester"}, 358 + "tasks": []any{ 359 + map[string]any{"name": "build", "status": "SUCCESS"}, 360 + map[string]any{"name": "test", "status": "SUCCESS"}, 361 + }, 362 + }, 363 + }) 364 + case r.Method == http.MethodGet && r.URL.Path == "/query/log/99/log": 365 + _, _ = io.WriteString(w, "setup-output\n") 366 + case r.Method == http.MethodGet && r.URL.Path == "/query/log/99/build/log": 367 + _, _ = io.WriteString(w, "build-line-1\nbuild-line-2\n") 368 + case r.Method == http.MethodGet && r.URL.Path == "/query/log/99/test/log": 369 + _, _ = io.WriteString(w, "test-line\n") 370 + default: 371 + http.NotFound(w, r) 372 + } 373 + }) 374 + p, st, _, _ := newSourcehutTestProvider(t, handler) 375 + 376 + if err := st.InsertSourcehutJob(context.Background(), SourcehutJobRef{ 377 + Knot: "knot.example.com", PipelineRkey: "rkey-3", Workflow: "ci.yml", 378 + JobID: 99, Owner: "~tester", 379 + Instance: p.defaultInstance, 380 + PipelineURI: pipelineATURI("knot.example.com", "rkey-3"), 381 + }); err != nil { 382 + t.Fatalf("insert: %v", err) 383 + } 384 + 385 + ch, err := p.Logs(context.Background(), 386 + "knot.example.com", "rkey-3", "ci.yml") 387 + if err != nil { 388 + t.Fatalf("Logs: %v", err) 389 + } 390 + var lines []LogLine 391 + for line := range ch { 392 + lines = append(lines, line) 393 + } 394 + 395 + wantContents := []string{ 396 + "setup", "setup-output\n", "setup", 397 + "build", "build-line-1\n", "build-line-2\n", "build", 398 + "test", "test-line\n", "test", 399 + } 400 + got := make([]string, 0, len(lines)) 401 + for _, l := range lines { 402 + got = append(got, l.Content) 403 + } 404 + if fmt.Sprint(got) != fmt.Sprint(wantContents) { 405 + t.Fatalf("log contents:\n got: %v\nwant: %v", got, wantContents) 406 + } 407 + if lines[0].Kind != LogKindControl || lines[0].StepStatus != StepStatusStart { 408 + t.Fatalf("first frame not start control: %+v", lines[0]) 409 + } 410 + if lines[len(lines)-1].Kind != LogKindControl || 411 + lines[len(lines)-1].StepStatus != StepStatusEnd { 412 + t.Fatalf("last frame not end control: %+v", lines[len(lines)-1]) 413 + } 414 + } 415 + 416 + // TestSourcehutLogsNotFound asserts the Provider contract: looking up 417 + // logs for a tuple no job was ever spawned for returns ErrLogsNotFound. 418 + func TestSourcehutLogsNotFound(t *testing.T) { 419 + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 420 + t.Fatalf("unexpected request to upstream: %s %s", r.Method, r.URL.Path) 421 + }) 422 + p, _, _, _ := newSourcehutTestProvider(t, handler) 423 + 424 + _, err := p.Logs(context.Background(), 425 + "knot.example.com", "rkey-missing", "ci.yml") 426 + if err != ErrLogsNotFound { 427 + t.Fatalf("err = %v; want ErrLogsNotFound", err) 428 + } 429 + } 430 + 431 + // TestSourcehutMapStatus pins the mapping table independently of the 432 + // rest of the provider, since it's the contract between us and the 433 + // appview's status enum. 434 + func TestSourcehutMapStatus(t *testing.T) { 435 + cases := []struct { 436 + in string 437 + want string 438 + terminal bool 439 + ok bool 440 + }{ 441 + // Sourcehut emits uppercase enum values over GraphQL; the 442 + // lowercased variants are kept here as a regression guard 443 + // against a future surface flip. 444 + {"PENDING", "pending", false, true}, 445 + {"QUEUED", "pending", false, true}, 446 + {"RUNNING", "running", false, true}, 447 + {"SUCCESS", "success", true, true}, 448 + {"FAILED", "failed", true, true}, 449 + {"TIMEOUT", "failed", true, true}, 450 + {"CANCELLED", "cancelled", true, true}, 451 + {"pending", "pending", false, true}, 452 + {"weird", "", false, false}, 453 + } 454 + for _, c := range cases { 455 + got, terminal, ok := mapSourcehutStatus(c.in) 456 + if got != c.want || terminal != c.terminal || ok != c.ok { 457 + t.Errorf("mapSourcehutStatus(%q) = %q,%v,%v; want %q,%v,%v", 458 + c.in, got, terminal, ok, c.want, c.terminal, c.ok) 459 + } 460 + } 461 + } 462 + 463 + // TestSourcehutInjectEnvironmentPreservesUserOverride confirms a 464 + // user-set env entry is not clobbered by our injected default — 465 + // explicit user intent wins. 466 + func TestSourcehutInjectEnvironmentPreservesUserOverride(t *testing.T) { 467 + manifest := strings.Join([]string{ 468 + "image: alpine/edge", 469 + "environment:", 470 + " TACK_KNOT: user-supplied", 471 + "tasks:", 472 + " - hello: echo hi", 473 + }, "\n") + "\n" 474 + out, err := injectSourcehutEnvironment(manifest, map[string]string{ 475 + "TACK_KNOT": "from-tack", 476 + "TACK_COMMIT": "abc", 477 + }) 478 + if err != nil { 479 + t.Fatalf("inject: %v", err) 480 + } 481 + if !strings.Contains(out, "TACK_KNOT: user-supplied") { 482 + t.Fatalf("user override clobbered: %q", out) 483 + } 484 + if !strings.Contains(out, "TACK_COMMIT: abc") { 485 + t.Fatalf("missing injected var: %q", out) 486 + } 487 + }
+69
store.go
··· 541 541 PipelineURI string 542 542 } 543 543 544 + // SourcehutJobRef is the persisted link from a Tangled workflow tuple 545 + // to the builds.sr.ht job tack submitted for it. Owner is the 546 + // canonical name (with leading "~") returned by the upstream API and 547 + // is the path component log URLs are rooted at — without it we can't 548 + // fetch logs even though the job ID alone is unique. Instance is the 549 + // base URL the job lives on; persisted per-row so post-creation API 550 + // calls always target the same server the workflow originally ran on. 551 + type SourcehutJobRef struct { 552 + Knot string 553 + PipelineRkey string 554 + Workflow string 555 + JobID int64 556 + Owner string 557 + Instance string 558 + PipelineURI string 559 + } 560 + 544 561 // InsertBuildkiteBuild records that a Buildkite build was created on 545 562 // behalf of the given (knot, pipelineRkey, workflow) tuple. Uses 546 563 // INSERT OR REPLACE so that an unlikely build-uuid collision (or a ··· 672 689 return nil 673 690 } 674 691 692 + // InsertSourcehutJob records the latest builds.sr.ht job submitted for 693 + // a Tangled workflow tuple. Reusing the tuple as the primary key 694 + // intentionally makes /logs resolve to the newest job for that workflow 695 + // identity — same behaviour as InsertTektonRun. 696 + func (s *store) InsertSourcehutJob(ctx context.Context, ref SourcehutJobRef) error { 697 + now := time.Now().UTC() 698 + _, err := s.db.ExecContext(ctx, 699 + `INSERT INTO sourcehut_jobs ( 700 + knot, pipeline_rkey, workflow, 701 + job_id, owner, instance, 702 + pipeline_uri, created_at, created_unix_ns 703 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 704 + ON CONFLICT(knot, pipeline_rkey, workflow) DO UPDATE SET 705 + job_id = excluded.job_id, 706 + owner = excluded.owner, 707 + instance = excluded.instance, 708 + pipeline_uri = excluded.pipeline_uri, 709 + created_at = excluded.created_at, 710 + created_unix_ns = excluded.created_unix_ns`, 711 + ref.Knot, ref.PipelineRkey, ref.Workflow, 712 + ref.JobID, ref.Owner, ref.Instance, 713 + ref.PipelineURI, now.Format(time.RFC3339Nano), now.UnixNano(), 714 + ) 715 + if err != nil { 716 + return fmt.Errorf("insert sourcehut_job: %w", err) 717 + } 718 + return nil 719 + } 720 + 721 + // LookupSourcehutJobByTuple resolves the appview's path-based identity 722 + // back to the concrete builds.sr.ht job tack submitted for it. 723 + func (s *store) LookupSourcehutJobByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*SourcehutJobRef, error) { 724 + var ref SourcehutJobRef 725 + err := s.db.QueryRowContext(ctx, 726 + `SELECT knot, pipeline_rkey, workflow, 727 + job_id, owner, instance, pipeline_uri 728 + FROM sourcehut_jobs 729 + WHERE knot = ? AND pipeline_rkey = ? AND workflow = ?`, 730 + knot, pipelineRkey, workflow, 731 + ).Scan( 732 + &ref.Knot, &ref.PipelineRkey, &ref.Workflow, 733 + &ref.JobID, &ref.Owner, &ref.Instance, &ref.PipelineURI, 734 + ) 735 + if errors.Is(err, sql.ErrNoRows) { 736 + return nil, nil 737 + } 738 + if err != nil { 739 + return nil, fmt.Errorf("lookup sourcehut_job by tuple: %w", err) 740 + } 741 + return &ref, nil 742 + } 743 + 675 744 // LookupTektonRunByTuple resolves the appview's path-based identity to 676 745 // the concrete PipelineRun tack created in Kubernetes. 677 746 func (s *store) LookupTektonRunByTuple(ctx context.Context, knot, pipelineRkey, workflow string) (*TektonRunRef, error) {
+23
store_migrate.go
··· 142 142 ); 143 143 CREATE INDEX IF NOT EXISTS tekton_runs_uid 144 144 ON tekton_runs (pipeline_run_uid); 145 + 146 + -- Mapping from a Tangled workflow tuple to the latest builds.sr.ht 147 + -- job tack submitted for it. Same shape as tekton_runs: like Tekton, 148 + -- sourcehut is observed by polling rather than webhooks, so the read 149 + -- path is purely tuple → job_id resolution at /logs and watch time. 150 + -- instance is the builds.sr.ht-compatible base URL the job lives on 151 + -- (job IDs are scoped to one instance), preserved per-row so a later 152 + -- change to the provider's default instance can't silently retarget 153 + -- historical lookups at the wrong server. 154 + CREATE TABLE IF NOT EXISTS sourcehut_jobs ( 155 + knot TEXT NOT NULL, 156 + pipeline_rkey TEXT NOT NULL, 157 + workflow TEXT NOT NULL, 158 + job_id INTEGER NOT NULL, 159 + owner TEXT NOT NULL, 160 + instance TEXT NOT NULL, 161 + pipeline_uri TEXT NOT NULL, 162 + created_at TEXT NOT NULL, 163 + created_unix_ns INTEGER NOT NULL, 164 + PRIMARY KEY (knot, pipeline_rkey, workflow) 165 + ); 166 + CREATE INDEX IF NOT EXISTS sourcehut_jobs_id 167 + ON sourcehut_jobs (job_id); 145 168 ` 146 169 147 170 // migrate applies the schema. Safe to call repeatedly.
+1
README.md
··· 110 110 111 111 * [Buildkite](docs/buildkite.md) 112 112 * [Tekton](docs/tekton.md) 113 + * [sourcehut](docs/sourcehut.md)
+88
docs/sourcehut.md
··· 1 + # sourcehut 2 + 3 + The sourcehut provider submits jobs to a 4 + [builds.sr.ht](https://man.sr.ht/builds.sr.ht/) instance. Each Tangled 5 + workflow becomes one job: tack submits the job via GraphQL, polls it 6 + until terminal, and publishes `sh.tangled.pipeline.status` records on 7 + each transition. 8 + 9 + ## Configure tack 10 + 11 + | Env var | Description | 12 + | ----------------------- | ------------------------------------------------------ | 13 + | `TACK_SOURCEHUT_TOKEN` | Personal access token for builds.sr.ht (enables provider) | 14 + | `TACK_SOURCEHUT_INSTANCE` | Base URL override (default `https://builds.sr.ht`) | 15 + 16 + Generate a token at `https://meta.sr.ht/oauth2/personal-token` with 17 + `builds.sr.ht/JOBS:RW` access, set the `TACK_SOURCEHUT_TOKEN=$token` env var, 18 + then start tack. 19 + 20 + ## Workflow YAML 21 + 22 + Your sourcehut build manifest is defined via tangled workflow inline - see `tack.sourcehut.manifest`. 23 + It is submitted to builds.sr.ht verbatim, with a few `TACK_*` environment 24 + variables merged into its top-level `environment:` map. 25 + 26 + ```yaml 27 + when: 28 + - event: ["push"] 29 + branch: ["main"] 30 + 31 + engine: tack 32 + 33 + tack: 34 + sourcehut: 35 + manifest: | 36 + image: alpine/edge 37 + sources: 38 + - https://tangled.org/j3s.sh/testy 39 + tasks: 40 + - test: | 41 + ls -l testy 42 + ``` 43 + 44 + Optional fields: 45 + 46 + ```yaml 47 + tack: 48 + sourcehut: 49 + instance: https://selfhosted.sr.ht.example.org 50 + tags: ["tack", "ci"] 51 + note: "manual note for the job list" 52 + secrets: true 53 + ``` 54 + 55 + * `instance`: full URL (with scheme) of an alternate builds.sr.ht 56 + deployment. Defaults to the provider's configured instance. 57 + * `tags`: passed through to the submit API. Defaults to `["tack"]` so 58 + jobs are filterable in the builds.sr.ht UI. 59 + * `note`: passed through to the submit API. Defaults to 60 + `tangled: <workflow> @ <short-commit>`. 61 + * `secrets`: opt in to sourcehut secret injection. Default `false`. 62 + 63 + ## Injected environment 64 + 65 + Tack merges the following into the manifest's `environment:` map 66 + before submitting. These can be overridden via user definition. 67 + 68 + | Variable | Value | 69 + | -------------------- | -------------------------------------- | 70 + | `TACK_KNOT` | Knot host the trigger came from | 71 + | `TACK_PIPELINE_RKEY` | Pipeline record rkey | 72 + | `TACK_WORKFLOW` | Workflow name | 73 + | `TACK_WORKFLOW_RAW` | Raw workflow YAML body | 74 + | `TACK_ACTOR` | Triggering DID | 75 + | `TACK_COMMIT` | Commit SHA | 76 + | `TACK_BRANCH` | Branch ref | 77 + 78 + ## Status mapping 79 + 80 + | builds.sr.ht status | tack status | 81 + | --------------------- | ----------- | 82 + | `pending`, `queued` | `pending` | 83 + | `running` | `running` | 84 + | `success` | `success` | 85 + | `failed`, `timeout` | `failed` | 86 + | `cancelled` | `cancelled` | 87 + 88 + Unknown upstream statuses are logged.

History

2 rounds 0 comments
sign up or login to add to the discussion
2 commits
expand
provider/sourcehut: add builds.sr.ht provider
sourcehut: add docs
merge conflicts detected
expand
  • main.go:52
  • store.go:541
  • store_migrate.go:142
expand 0 comments
j3s.sh submitted #0
1 commit
expand
provider/sourcehut: add builds.sr.ht provider
expand 0 comments