Stitch any CI into Tangled
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "log/slog"
8 "testing"
9 "time"
10
11 "tangled.org/core/api/tangled"
12
13 "go.mitchellh.com/tack/internal/k8s"
14)
15
16func newTektonTestProvider(t *testing.T) (*tektonProvider, *store, *broker, *k8s.FakeClient) {
17 t.Helper()
18 st := newTestStore(t)
19 br := newBroker(st)
20 client := k8s.NewFakeClient()
21 p := newTektonProvider(br, st, client, "ci", slog.Default())
22 return p, st, br, client
23}
24
25func TestTektonWorkflowConfig(t *testing.T) {
26 raw := "tack:\n tekton:\n pipeline: repo-ci\n service_account: runner\n params:\n image: example/app\n"
27 cfg, err := parseTektonWorkflowConfig(raw)
28 if err != nil {
29 t.Fatalf("parse: %v", err)
30 }
31 if cfg.Pipeline != "repo-ci" || cfg.ServiceAccount != "runner" {
32 t.Fatalf("cfg mismatch: %+v", cfg)
33 }
34 if got := cfg.Params["image"]; got != "example/app" {
35 t.Fatalf("params[image] = %q", got)
36 }
37
38 if _, err := parseTektonWorkflowConfig("tack:\n tekton: {}\n"); err == nil {
39 t.Fatal("missing pipeline should fail")
40 }
41}
42
43func TestTektonBuildPipelineRun(t *testing.T) {
44 cfg := &tektonWorkflowConfig{
45 Pipeline: "repo-ci",
46 ServiceAccount: "runner",
47 Params: map[string]string{
48 "image": "example/app",
49 },
50 }
51 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef", "main")
52 if len(name) > 63 || name == "" {
53 t.Fatalf("bad generated name: %q", name)
54 }
55
56 obj := buildTektonPipelineRun("ci", name, cfg,
57 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef", "main",
58 &tangled.Pipeline_Workflow{Name: "ci.yml"},
59 )
60 if obj.GetAPIVersion() != tektonAPIVersion || obj.GetKind() != tektonRunKind {
61 t.Fatalf("type meta mismatch: %s %s", obj.GetAPIVersion(), obj.GetKind())
62 }
63 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name")
64 if !ok || pipeline != "repo-ci" {
65 t.Fatalf("pipelineRef.name = %q", pipeline)
66 }
67 sa, ok := obj.NestedString("spec", "serviceAccountName")
68 if !ok || sa != "runner" {
69 t.Fatalf("serviceAccountName = %q", sa)
70 }
71 params, ok := obj.NestedSlice("spec", "params")
72 if !ok || len(params) != 1 {
73 t.Fatalf("params = %+v", params)
74 }
75 if obj.GetAnnotations()[tektonAnnotationActor] != "did:plc:actor" ||
76 obj.GetAnnotations()[tektonAnnotationCommit] != "abcdef" {
77 t.Fatalf("annotations missing identity: %+v", obj.GetAnnotations())
78 }
79}
80
81func TestTektonStatusMapping(t *testing.T) {
82 tests := []struct {
83 name string
84 cond string
85 reason string
86 status string
87 terminal bool
88 ok bool
89 }{
90 {name: "unknown", cond: "Unknown", status: "running", ok: true},
91 {name: "success", cond: "True", status: "success", terminal: true, ok: true},
92 {name: "failed", cond: "False", reason: "Failed", status: "failed", terminal: true, ok: true},
93 {name: "cancelled", cond: "False", reason: "PipelineRunCancelled", status: "cancelled", terminal: true, ok: true},
94 {name: "stopped", cond: "False", reason: "PipelineRunStopped", status: "cancelled", terminal: true, ok: true},
95 }
96 for _, tt := range tests {
97 t.Run(tt.name, func(t *testing.T) {
98 obj := tektonStatusObject(tt.cond, tt.reason)
99 status, terminal, ok := mapTektonPipelineRunStatus(obj)
100 if status != tt.status || terminal != tt.terminal || ok != tt.ok {
101 t.Fatalf("got %q/%v/%v; want %q/%v/%v",
102 status, terminal, ok, tt.status, tt.terminal, tt.ok)
103 }
104 })
105 }
106}
107
108func TestTektonSpawnCreatesPipelineRun(t *testing.T) {
109 p, st, _, client := newTektonTestProvider(t)
110 ctx, cancel := context.WithCancel(context.Background())
111 defer cancel()
112
113 trigger := &tangled.Pipeline_TriggerMetadata{
114 Push: &tangled.Pipeline_PushTriggerData{
115 NewSha: "abcdef0123",
116 Ref: "refs/heads/main",
117 },
118 }
119 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", trigger,
120 []*tangled.Pipeline_Workflow{{Name: "ci.yml",
121 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}},
122 )
123
124 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml")
125 if ref.Namespace != "ci" || ref.PipelineName != "repo-ci" {
126 t.Fatalf("ref mismatch: %+v", ref)
127 }
128
129 obj, err := client.GetObject(context.Background(), pipelineRunsGVR, "ci", ref.PipelineRunName)
130 if err != nil {
131 t.Fatalf("get PipelineRun: %v", err)
132 }
133 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name")
134 if !ok || pipeline != "repo-ci" {
135 t.Fatalf("pipelineRef.name = %q", pipeline)
136 }
137
138 rows, err := st.EventsAfter(context.Background(), 0)
139 if err != nil {
140 t.Fatalf("EventsAfter: %v", err)
141 }
142 if len(rows) != 1 {
143 t.Fatalf("got %d events, want 1", len(rows))
144 }
145 var rec tangled.PipelineStatus
146 if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil {
147 t.Fatalf("decode status: %v", err)
148 }
149 if rec.Status != "pending" || rec.Workflow != "ci.yml" {
150 t.Fatalf("bad pending status: %+v", rec)
151 }
152}
153
154func TestTektonSpawnAlreadyExists(t *testing.T) {
155 p, st, _, client := newTektonTestProvider(t)
156 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef0123", "main")
157 existing := buildTektonPipelineRun("ci", name,
158 &tektonWorkflowConfig{Pipeline: "repo-ci"},
159 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef0123", "main",
160 &tangled.Pipeline_Workflow{Name: "ci.yml"},
161 )
162 existing.SetUID("uid-1")
163 client.SeedObject(pipelineRunsGVR, "ci", existing)
164
165 ctx, cancel := context.WithCancel(context.Background())
166 defer cancel()
167 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor",
168 &tangled.Pipeline_TriggerMetadata{Push: &tangled.Pipeline_PushTriggerData{
169 NewSha: "abcdef0123",
170 Ref: "refs/heads/main",
171 }},
172 []*tangled.Pipeline_Workflow{{Name: "ci.yml",
173 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}},
174 )
175
176 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml")
177 if ref.PipelineRunName != name || ref.PipelineRunUID != "uid-1" {
178 t.Fatalf("ref mismatch: %+v", ref)
179 }
180}
181
182func TestTektonLogsLookup(t *testing.T) {
183 p, st, _, client := newTektonTestProvider(t)
184 ctx := context.Background()
185 if _, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml"); !errors.Is(err, ErrLogsNotFound) {
186 t.Fatalf("logs before mapping err = %v; want ErrLogsNotFound", err)
187 }
188 ref := TektonRunRef{
189 Knot: "knot.example.com",
190 PipelineRkey: "rkey-1",
191 Workflow: "ci.yml",
192 Namespace: "ci",
193 PipelineRunName: "run-1",
194 PipelineRunUID: "uid-1",
195 PipelineName: "repo-ci",
196 PipelineURI: pipelineATURI("knot.example.com", "rkey-1"),
197 }
198 if err := st.InsertTektonRun(ctx, ref); err != nil {
199 t.Fatalf("insert ref: %v", err)
200 }
201 // With the mapping in place but no TaskRuns yet, Logs must NOT
202 // return ErrLogsNotFound: the workflow has been spawned and is
203 // just queueing inside Tekton. Surfacing 404 here mistranslates
204 // "still scheduling" as "doesn't exist" at the HTTP layer (see
205 // http.go's /logs handler). Verify we get an open channel that
206 // stays open until ctx is cancelled.
207 {
208 waitCtx, cancel := context.WithCancel(ctx)
209 ch, err := p.Logs(waitCtx, "knot.example.com", "rkey-1", "ci.yml")
210 if err != nil {
211 cancel()
212 t.Fatalf("logs before TaskRuns err = %v; want nil", err)
213 }
214 if ch == nil {
215 cancel()
216 t.Fatalf("logs before TaskRuns: nil channel")
217 }
218 // Channel must not produce any frames or close before we
219 // cancel. A premature close would mean the goroutine treated
220 // "no TaskRuns yet" as "stream done", which is what we just
221 // fixed.
222 select {
223 case line, ok := <-ch:
224 cancel()
225 if !ok {
226 t.Fatalf("logs channel closed before TaskRuns appeared")
227 }
228 t.Fatalf("unexpected frame before TaskRuns: %+v", line)
229 case <-time.After(50 * time.Millisecond):
230 }
231 cancel()
232 // After cancellation the producer goroutine must close the
233 // channel and not strand any frames.
234 for line := range ch {
235 t.Fatalf("unexpected frame after cancel: %+v", line)
236 }
237 }
238
239 // Seed a terminal PipelineRun so Logs takes the snapshot path
240 // (fetchCompletedTaskRunLogs, Follow=false) and closes the
241 // channel after draining the seeded TaskRun. The non-terminal
242 // path follows pod logs live and would only EOF on ctx cancel,
243 // which is not what this assertion is exercising.
244 client.SeedObject(pipelineRunsGVR, "ci", k8s.Object{
245 "apiVersion": "tekton.dev/v1",
246 "kind": "PipelineRun",
247 "metadata": map[string]any{
248 "name": "run-1",
249 "namespace": "ci",
250 },
251 "status": map[string]any{
252 "conditions": []any{map[string]any{
253 "type": "Succeeded",
254 "status": "True",
255 }},
256 },
257 })
258 client.SeedObject(taskRunsGVR, "ci", k8s.Object{
259 "apiVersion": "tekton.dev/v1",
260 "kind": "TaskRun",
261 "metadata": map[string]any{
262 "name": "task-1",
263 "namespace": "ci",
264 "labels": map[string]any{
265 "tekton.dev/pipelineRun": "run-1",
266 },
267 },
268 })
269 client.SeedPod("ci", k8s.Pod{
270 Name: "pod-1",
271 Namespace: "ci",
272 Labels: map[string]string{
273 "tekton.dev/taskRun": "task-1",
274 },
275 Containers: []k8s.Container{{Name: "step-test"}},
276 })
277 client.SetPodLog("ci", "pod-1", "step-test", "hello\n")
278
279 ch, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml")
280 if err != nil {
281 t.Fatalf("Logs after pods: %v", err)
282 }
283 var got []LogLine
284 for line := range ch {
285 got = append(got, line)
286 }
287 if len(got) < 2 || got[0].StepStatus != StepStatusStart ||
288 got[len(got)-1].StepStatus != StepStatusEnd {
289 t.Fatalf("log frames = %+v", got)
290 }
291}
292
293func tektonStatusObject(condStatus, reason string) k8s.Object {
294 return k8s.Object{
295 "status": map[string]any{
296 "conditions": []any{map[string]any{
297 "type": "Succeeded",
298 "status": condStatus,
299 "reason": reason,
300 }},
301 },
302 }
303}
304
305func waitTektonRef(t *testing.T, st *store, knot, rkey, workflow string) *TektonRunRef {
306 t.Helper()
307 deadline := time.Now().Add(2 * time.Second)
308 for time.Now().Before(deadline) {
309 ref, err := st.LookupTektonRunByTuple(context.Background(), knot, rkey, workflow)
310 if err != nil {
311 t.Fatalf("lookup: %v", err)
312 }
313 if ref != nil {
314 return ref
315 }
316 time.Sleep(20 * time.Millisecond)
317 }
318 t.Fatal("tekton run row not persisted within deadline")
319 return nil
320}