Stitch any CI into Tangled
151
fork

Configure Feed

Select the types of activity you want to include in your feed.

provider/tekton: keep Logs open while waiting for TaskRuns

Logs returned ErrLogsNotFound whenever the store had a mapping for the
(knot, pipelineRkey, workflow) tuple but Tekton had not yet scheduled
any TaskRuns. The HTTP /logs handler translates ErrLogsNotFound into a
real 404 before upgrading to a WebSocket, so a freshly-spawned or
still-queueing PipelineRun appeared nonexistent to the appview. That
contradicts the provider contract, which reserves ErrLogsNotFound for
the case where the workflow never ran on this provider at all.

Logs now only returns ErrLogsNotFound when the store mapping is
missing. Once the mapping is found the call always returns an open
channel and defers TaskRun discovery to the producer goroutine, which
polls via a new waitForTaskRuns helper until a TaskRun appears, the
PipelineRun reaches a terminal state with no TaskRuns scheduled, or
ctx is cancelled. The terminal-with-no-TaskRuns and ctx-cancelled
cases close the channel cleanly without sending anything, so the
goroutine cannot leak across a disconnecting client.

authored by

Mitchell Hashimoto and committed by
Tangled
b621a513 9dd713fd

+107 -16
+71 -14
provider_tekton.go
··· 465 465 return nil, fmt.Errorf("lookup tekton run mapping: %w", err) 466 466 } 467 467 if ref == nil { 468 + // No mapping at all means this provider never spawned a 469 + // PipelineRun for the requested tuple, so a 404 is the 470 + // honest answer. 468 471 return nil, ErrLogsNotFound 469 472 } 470 473 471 - taskRuns, err := p.taskRunsForPipelineRun(ctx, *ref) 472 - if err != nil { 473 - return nil, err 474 - } 475 - p.log.Debug("Logs: found TaskRuns for PipelineRun", 476 - "pipeline_run", ref.PipelineRunName, "count", len(taskRuns), 477 - ) 478 - if len(taskRuns) == 0 { 479 - return nil, ErrLogsNotFound 480 - } 481 - 482 - terminal := p.isPipelineRunTerminal(ctx, *ref) 483 - p.log.Debug("Logs: pipeline run terminal state", "pipeline_run", ref.PipelineRunName, "terminal", terminal) 484 - 474 + // At this point the workflow *was* spawned — we have a row in the 475 + // store mapping the tuple to a PipelineRun. The TaskRuns the 476 + // PipelineRun fans out to are created asynchronously by Tekton 477 + // once the run is admitted, so a freshly-spawned or still-queueing 478 + // PipelineRun will momentarily report zero TaskRuns. Returning 479 + // ErrLogsNotFound here would mistranslate that into a 404 at the 480 + // HTTP layer (see provider.go contract: ErrLogsNotFound means the 481 + // workflow never ran), making just-spawned runs look nonexistent 482 + // to the appview. Instead, hand back an open channel and poll 483 + // inside the goroutine until TaskRuns materialize, ctx is 484 + // cancelled, or the PipelineRun reaches a terminal state with 485 + // nothing to stream. 485 486 out := make(chan LogLine, 32) 486 487 go func() { 487 488 defer close(out) 489 + 490 + taskRuns := p.waitForTaskRuns(ctx, *ref) 491 + if len(taskRuns) == 0 { 492 + // Either ctx was cancelled, or the PipelineRun 493 + // reached a terminal state without ever scheduling 494 + // any TaskRuns. Both cases close an empty stream; 495 + // there's nothing to send. 496 + return 497 + } 498 + p.log.Debug("Logs: found TaskRuns for PipelineRun", 499 + "pipeline_run", ref.PipelineRunName, "count", len(taskRuns), 500 + ) 501 + 502 + terminal := p.isPipelineRunTerminal(ctx, *ref) 503 + p.log.Debug("Logs: pipeline run terminal state", 504 + "pipeline_run", ref.PipelineRunName, "terminal", terminal, 505 + ) 506 + 488 507 stepID := 0 489 508 for _, tr := range taskRuns { 490 509 taskName := tr.GetName() ··· 523 542 p.log.Debug("Logs: all TaskRuns streamed", "pipeline_run", ref.PipelineRunName) 524 543 }() 525 544 return out, nil 545 + } 546 + 547 + // waitForTaskRuns blocks until at least one TaskRun belonging to the 548 + // PipelineRun in ref is observable, the PipelineRun reaches a terminal 549 + // state with no TaskRuns, or ctx is cancelled. It exists so Logs can 550 + // keep the channel open across the gap between PipelineRun creation 551 + // and Tekton scheduling its TaskRuns, instead of mistranslating that 552 + // gap into a 404. The first iteration always probes immediately so a 553 + // PipelineRun that already has TaskRuns is returned without waiting. 554 + func (p *tektonProvider) waitForTaskRuns( 555 + ctx context.Context, 556 + ref TektonRunRef, 557 + ) []k8s.Object { 558 + const interval = 1 * time.Second 559 + for { 560 + taskRuns, err := p.taskRunsForPipelineRun(ctx, ref) 561 + if err != nil { 562 + p.log.Debug("waitForTaskRuns: list TaskRuns failed", 563 + "err", err, "pipeline_run", ref.PipelineRunName, 564 + ) 565 + } else if len(taskRuns) > 0 { 566 + return taskRuns 567 + } 568 + // If the PipelineRun has already reached a terminal state and 569 + // still has no TaskRuns, there's nothing more to wait for — 570 + // return empty so the caller closes the stream cleanly. 571 + if p.isPipelineRunTerminal(ctx, ref) { 572 + p.log.Debug("waitForTaskRuns: PipelineRun terminal with no TaskRuns", 573 + "pipeline_run", ref.PipelineRunName, 574 + ) 575 + return nil 576 + } 577 + select { 578 + case <-ctx.Done(): 579 + return nil 580 + case <-time.After(interval): 581 + } 582 + } 526 583 } 527 584 528 585 // isPipelineRunTerminal returns true if the PipelineRun is in a terminal state right now.
+36 -2
provider_tekton_test.go
··· 198 198 if err := st.InsertTektonRun(ctx, ref); err != nil { 199 199 t.Fatalf("insert ref: %v", err) 200 200 } 201 - if _, err := p.Logs(ctx, "knot.example.com", "rkey-1", "ci.yml"); !errors.Is(err, ErrLogsNotFound) { 202 - t.Fatalf("logs before TaskRuns err = %v; want ErrLogsNotFound", err) 201 + // With the mapping in place but no TaskRuns yet, Logs must NOT 202 + // return ErrLogsNotFound: the workflow has been spawned and is 203 + // just queueing inside Tekton. Surfacing 404 here mistranslates 204 + // "still scheduling" as "doesn't exist" at the HTTP layer (see 205 + // http.go's /logs handler). Verify we get an open channel that 206 + // stays open until ctx is cancelled. 207 + { 208 + waitCtx, cancel := context.WithCancel(ctx) 209 + ch, err := p.Logs(waitCtx, "knot.example.com", "rkey-1", "ci.yml") 210 + if err != nil { 211 + cancel() 212 + t.Fatalf("logs before TaskRuns err = %v; want nil", err) 213 + } 214 + if ch == nil { 215 + cancel() 216 + t.Fatalf("logs before TaskRuns: nil channel") 217 + } 218 + // Channel must not produce any frames or close before we 219 + // cancel. A premature close would mean the goroutine treated 220 + // "no TaskRuns yet" as "stream done", which is what we just 221 + // fixed. 222 + select { 223 + case line, ok := <-ch: 224 + cancel() 225 + if !ok { 226 + t.Fatalf("logs channel closed before TaskRuns appeared") 227 + } 228 + t.Fatalf("unexpected frame before TaskRuns: %+v", line) 229 + case <-time.After(50 * time.Millisecond): 230 + } 231 + cancel() 232 + // After cancellation the producer goroutine must close the 233 + // channel and not strand any frames. 234 + for line := range ch { 235 + t.Fatalf("unexpected frame after cancel: %+v", line) 236 + } 203 237 } 204 238 205 239 client.SeedObject(taskRunsGVR, "ci", k8s.Object{