Stitch any CI into Tangled
86
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 320 lines 10 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "log/slog" 8 "testing" 9 "time" 10 11 "tangled.org/core/api/tangled" 12 13 "go.mitchellh.com/tack/internal/k8s" 14) 15 16func newTektonTestProvider(t *testing.T) (*tektonProvider, *store, *broker, *k8s.FakeClient) { 17 t.Helper() 18 st := newTestStore(t) 19 br := newBroker(st) 20 client := k8s.NewFakeClient() 21 p := newTektonProvider(br, st, client, "ci", slog.Default()) 22 return p, st, br, client 23} 24 25func TestTektonWorkflowConfig(t *testing.T) { 26 raw := "tack:\n tekton:\n pipeline: repo-ci\n service_account: runner\n params:\n image: example/app\n" 27 cfg, err := parseTektonWorkflowConfig(raw) 28 if err != nil { 29 t.Fatalf("parse: %v", err) 30 } 31 if cfg.Pipeline != "repo-ci" || cfg.ServiceAccount != "runner" { 32 t.Fatalf("cfg mismatch: %+v", cfg) 33 } 34 if got := cfg.Params["image"]; got != "example/app" { 35 t.Fatalf("params[image] = %q", got) 36 } 37 38 if _, err := parseTektonWorkflowConfig("tack:\n tekton: {}\n"); err == nil { 39 t.Fatal("missing pipeline should fail") 40 } 41} 42 43func TestTektonBuildPipelineRun(t *testing.T) { 44 cfg := &tektonWorkflowConfig{ 45 Pipeline: "repo-ci", 46 ServiceAccount: "runner", 47 Params: map[string]string{ 48 "image": "example/app", 49 }, 50 } 51 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef", "main") 52 if len(name) > 63 || name == "" { 53 t.Fatalf("bad generated name: %q", name) 54 } 55 56 obj := buildTektonPipelineRun("ci", name, cfg, 57 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef", "main", 58 &tangled.Pipeline_Workflow{Name: "ci.yml"}, 59 ) 60 if obj.GetAPIVersion() != tektonAPIVersion || obj.GetKind() != tektonRunKind { 61 t.Fatalf("type meta mismatch: %s %s", obj.GetAPIVersion(), obj.GetKind()) 62 } 63 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name") 64 if !ok || pipeline != "repo-ci" { 65 t.Fatalf("pipelineRef.name = %q", pipeline) 66 } 67 sa, ok := obj.NestedString("spec", "serviceAccountName") 68 if !ok || sa != "runner" { 69 t.Fatalf("serviceAccountName = %q", sa) 70 } 71 params, ok := obj.NestedSlice("spec", "params") 72 if !ok || len(params) != 1 { 73 t.Fatalf("params = %+v", params) 74 } 75 if obj.GetAnnotations()[tektonAnnotationActor] != "did:plc:actor" || 76 obj.GetAnnotations()[tektonAnnotationCommit] != "abcdef" { 77 t.Fatalf("annotations missing identity: %+v", obj.GetAnnotations()) 78 } 79} 80 81func TestTektonStatusMapping(t *testing.T) { 82 tests := []struct { 83 name string 84 cond string 85 reason string 86 status string 87 terminal bool 88 ok bool 89 }{ 90 {name: "unknown", cond: "Unknown", status: "running", ok: true}, 91 {name: "success", cond: "True", status: "success", terminal: true, ok: true}, 92 {name: "failed", cond: "False", reason: "Failed", status: "failed", terminal: true, ok: true}, 93 {name: "cancelled", cond: "False", reason: "PipelineRunCancelled", status: "cancelled", terminal: true, ok: true}, 94 {name: "stopped", cond: "False", reason: "PipelineRunStopped", status: "cancelled", terminal: true, ok: true}, 95 } 96 for _, tt := range tests { 97 t.Run(tt.name, func(t *testing.T) { 98 obj := tektonStatusObject(tt.cond, tt.reason) 99 status, terminal, ok := mapTektonPipelineRunStatus(obj) 100 if status != tt.status || terminal != tt.terminal || ok != tt.ok { 101 t.Fatalf("got %q/%v/%v; want %q/%v/%v", 102 status, terminal, ok, tt.status, tt.terminal, tt.ok) 103 } 104 }) 105 } 106} 107 108func TestTektonSpawnCreatesPipelineRun(t *testing.T) { 109 p, st, _, client := newTektonTestProvider(t) 110 ctx, cancel := context.WithCancel(context.Background()) 111 defer cancel() 112 113 trigger := &tangled.Pipeline_TriggerMetadata{ 114 Push: &tangled.Pipeline_PushTriggerData{ 115 NewSha: "abcdef0123", 116 Ref: "refs/heads/main", 117 }, 118 } 119 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", trigger, 120 []*tangled.Pipeline_Workflow{{Name: "ci.yml", 121 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}}, 122 ) 123 124 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml") 125 if ref.Namespace != "ci" || ref.PipelineName != "repo-ci" { 126 t.Fatalf("ref mismatch: %+v", ref) 127 } 128 129 obj, err := client.GetObject(context.Background(), pipelineRunsGVR, "ci", ref.PipelineRunName) 130 if err != nil { 131 t.Fatalf("get PipelineRun: %v", err) 132 } 133 pipeline, ok := obj.NestedString("spec", "pipelineRef", "name") 134 if !ok || pipeline != "repo-ci" { 135 t.Fatalf("pipelineRef.name = %q", pipeline) 136 } 137 138 rows, err := st.EventsAfter(context.Background(), 0) 139 if err != nil { 140 t.Fatalf("EventsAfter: %v", err) 141 } 142 if len(rows) != 1 { 143 t.Fatalf("got %d events, want 1", len(rows)) 144 } 145 var rec tangled.PipelineStatus 146 if err := json.Unmarshal(rows[0].EventJSON, &rec); err != nil { 147 t.Fatalf("decode status: %v", err) 148 } 149 if rec.Status != "pending" || rec.Workflow != "ci.yml" { 150 t.Fatalf("bad pending status: %+v", rec) 151 } 152} 153 154func TestTektonSpawnAlreadyExists(t *testing.T) { 155 p, st, _, client := newTektonTestProvider(t) 156 name := tektonPipelineRunName("knot.example.com", "rkey-1", "ci.yml", "abcdef0123", "main") 157 existing := buildTektonPipelineRun("ci", name, 158 &tektonWorkflowConfig{Pipeline: "repo-ci"}, 159 "knot.example.com", "rkey-1", "did:plc:actor", "abcdef0123", "main", 160 &tangled.Pipeline_Workflow{Name: "ci.yml"}, 161 ) 162 existing.SetUID("uid-1") 163 client.SeedObject(pipelineRunsGVR, "ci", existing) 164 165 ctx, cancel := context.WithCancel(context.Background()) 166 defer cancel() 167 p.Spawn(ctx, "knot.example.com", "rkey-1", "did:plc:actor", 168 &tangled.Pipeline_TriggerMetadata{Push: &tangled.Pipeline_PushTriggerData{ 169 NewSha: "abcdef0123", 170 Ref: "refs/heads/main", 171 }}, 172 []*tangled.Pipeline_Workflow{{Name: "ci.yml", 173 Raw: "tack:\n tekton:\n pipeline: repo-ci\n"}}, 174 ) 175 176 ref := waitTektonRef(t, st, "knot.example.com", "rkey-1", "ci.yml") 177 if ref.PipelineRunName != name || ref.PipelineRunUID != "uid-1" { 178 t.Fatalf("ref mismatch: %+v", ref) 179 } 180} 181 182func TestTektonLogsLookup(t *testing.T) { 183 p, st, _, client := newTektonTestProvider(t) 184 ctx := context.Background() 185 if _, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml"); !errors.Is(err, ErrLogsNotFound) { 186 t.Fatalf("logs before mapping err = %v; want ErrLogsNotFound", err) 187 } 188 ref := TektonRunRef{ 189 Knot: "knot.example.com", 190 PipelineRkey: "rkey-1", 191 Workflow: "ci.yml", 192 Namespace: "ci", 193 PipelineRunName: "run-1", 194 PipelineRunUID: "uid-1", 195 PipelineName: "repo-ci", 196 PipelineURI: pipelineATURI("knot.example.com", "rkey-1"), 197 } 198 if err := st.InsertTektonRun(ctx, ref); err != nil { 199 t.Fatalf("insert ref: %v", err) 200 } 201 // With the mapping in place but no TaskRuns yet, Logs must NOT 202 // return ErrLogsNotFound: the workflow has been spawned and is 203 // just queueing inside Tekton. Surfacing 404 here mistranslates 204 // "still scheduling" as "doesn't exist" at the HTTP layer (see 205 // http.go's /logs handler). Verify we get an open channel that 206 // stays open until ctx is cancelled. 207 { 208 waitCtx, cancel := context.WithCancel(ctx) 209 ch, err := p.Logs(waitCtx, "knot.example.com", "rkey-1", "ci.yml") 210 if err != nil { 211 cancel() 212 t.Fatalf("logs before TaskRuns err = %v; want nil", err) 213 } 214 if ch == nil { 215 cancel() 216 t.Fatalf("logs before TaskRuns: nil channel") 217 } 218 // Channel must not produce any frames or close before we 219 // cancel. A premature close would mean the goroutine treated 220 // "no TaskRuns yet" as "stream done", which is what we just 221 // fixed. 222 select { 223 case line, ok := <-ch: 224 cancel() 225 if !ok { 226 t.Fatalf("logs channel closed before TaskRuns appeared") 227 } 228 t.Fatalf("unexpected frame before TaskRuns: %+v", line) 229 case <-time.After(50 * time.Millisecond): 230 } 231 cancel() 232 // After cancellation the producer goroutine must close the 233 // channel and not strand any frames. 234 for line := range ch { 235 t.Fatalf("unexpected frame after cancel: %+v", line) 236 } 237 } 238 239 // Seed a terminal PipelineRun so Logs takes the snapshot path 240 // (fetchCompletedTaskRunLogs, Follow=false) and closes the 241 // channel after draining the seeded TaskRun. The non-terminal 242 // path follows pod logs live and would only EOF on ctx cancel, 243 // which is not what this assertion is exercising. 244 client.SeedObject(pipelineRunsGVR, "ci", k8s.Object{ 245 "apiVersion": "tekton.dev/v1", 246 "kind": "PipelineRun", 247 "metadata": map[string]any{ 248 "name": "run-1", 249 "namespace": "ci", 250 }, 251 "status": map[string]any{ 252 "conditions": []any{map[string]any{ 253 "type": "Succeeded", 254 "status": "True", 255 }}, 256 }, 257 }) 258 client.SeedObject(taskRunsGVR, "ci", k8s.Object{ 259 "apiVersion": "tekton.dev/v1", 260 "kind": "TaskRun", 261 "metadata": map[string]any{ 262 "name": "task-1", 263 "namespace": "ci", 264 "labels": map[string]any{ 265 "tekton.dev/pipelineRun": "run-1", 266 }, 267 }, 268 }) 269 client.SeedPod("ci", k8s.Pod{ 270 Name: "pod-1", 271 Namespace: "ci", 272 Labels: map[string]string{ 273 "tekton.dev/taskRun": "task-1", 274 }, 275 Containers: []k8s.Container{{Name: "step-test"}}, 276 }) 277 client.SetPodLog("ci", "pod-1", "step-test", "hello\n") 278 279 ch, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml") 280 if err != nil { 281 t.Fatalf("Logs after pods: %v", err) 282 } 283 var got []LogLine 284 for line := range ch { 285 got = append(got, line) 286 } 287 if len(got) < 2 || got[0].StepStatus != StepStatusStart || 288 got[len(got)-1].StepStatus != StepStatusEnd { 289 t.Fatalf("log frames = %+v", got) 290 } 291} 292 293func tektonStatusObject(condStatus, reason string) k8s.Object { 294 return k8s.Object{ 295 "status": map[string]any{ 296 "conditions": []any{map[string]any{ 297 "type": "Succeeded", 298 "status": condStatus, 299 "reason": reason, 300 }}, 301 }, 302 } 303} 304 305func waitTektonRef(t *testing.T, st *store, knot, rkey, workflow string) *TektonRunRef { 306 t.Helper() 307 deadline := time.Now().Add(2 * time.Second) 308 for time.Now().Before(deadline) { 309 ref, err := st.LookupTektonRunByTuple(context.Background(), knot, rkey, workflow) 310 if err != nil { 311 t.Fatalf("lookup: %v", err) 312 } 313 if ref != nil { 314 return ref 315 } 316 time.Sleep(20 * time.Millisecond) 317 } 318 t.Fatal("tekton run row not persisted within deadline") 319 return nil 320}