Stitch any CI into Tangled
151
fork

Configure Feed

Select the types of activity you want to include in your feed.

provider/tekton: stream non-terminal pod logs with follow=true

The Logs path for a still-running PipelineRun called StreamPodLogs
without follow=true, so the apiserver returned a snapshot of the
container log and EOF'd at the current tail. The provider then
emitted StepStatusEnd and moved on, which made still-running steps
look complete and dropped any subsequent log bytes unless the client
reconnected. The same one-shot pass also took a single snapshot of
TaskRuns, so any TaskRun spawned later in the PipelineRun (sequential
runAfter, finally, retries) was silently ignored.

StreamPodLogs now takes a LogOptions{Follow bool}. The non-terminal
path uses Follow=true so the read only EOFs once the container
actually terminates; the terminal path keeps the snapshot read since
the log is already complete. The Logs goroutine is replaced with a
poll loop that re-lists TaskRuns and re-checks the PipelineRun's
terminal state each iteration, only closing the channel once the
PipelineRun is terminal AND no new TaskRuns appeared in the last
pass. The fake k8s client mirrors the apiserver's follow semantics
by holding the reader open until ctx is cancelled.

authored by

Mitchell Hashimoto and committed by
Tangled
e2ac696c b621a513

+192 -55
+39 -2
internal/k8s/fake.go
··· 228 228 namespace string, 229 229 podName string, 230 230 container string, 231 + opts LogOptions, 231 232 ) (io.ReadCloser, error) { 232 233 select { 233 234 case <-ctx.Done(): ··· 236 237 } 237 238 238 239 c.mu.Lock() 239 - defer c.mu.Unlock() 240 240 raw, ok := c.podLogs[podLogKey{ 241 241 namespace: namespace, 242 242 podName: podName, 243 243 container: container, 244 244 }] 245 + c.mu.Unlock() 245 246 if !ok { 246 247 return nil, fmt.Errorf( 247 248 "%w: pod log %s/%s[%s]", 248 249 ErrNotFound, namespace, podName, container, 249 250 ) 250 251 } 251 - return io.NopCloser(strings.NewReader(raw)), nil 252 + if !opts.Follow { 253 + return io.NopCloser(strings.NewReader(raw)), nil 254 + } 255 + // In Follow mode, mirror the kube apiserver: yield the seeded bytes, 256 + // then keep the stream open until ctx is cancelled. Tests that exercise 257 + // the live-streaming path drive termination by cancelling the context; 258 + // tests that want a one-shot snapshot should leave Follow=false. 259 + return &fakeFollowReader{rest: raw, ctx: ctx}, nil 260 + } 261 + 262 + // fakeFollowReader returns the seeded log bytes, then blocks on ctx.Done() 263 + // before EOFing. This matches the apiserver behaviour for follow=true: 264 + // the connection only closes once the container terminates (or the caller 265 + // hangs up), so callers using bufio.Scanner do not see a premature EOF and 266 + // treat a still-running container as completed. 267 + type fakeFollowReader struct { 268 + rest string 269 + ctx context.Context 270 + done bool 252 271 } 272 + 273 + var _ io.ReadCloser = (*fakeFollowReader)(nil) 274 + 275 + func (r *fakeFollowReader) Read(p []byte) (int, error) { 276 + if len(r.rest) > 0 { 277 + n := copy(p, r.rest) 278 + r.rest = r.rest[n:] 279 + return n, nil 280 + } 281 + if r.done { 282 + return 0, io.EOF 283 + } 284 + r.done = true 285 + <-r.ctx.Done() 286 + return 0, io.EOF 287 + } 288 + 289 + func (r *fakeFollowReader) Close() error { return nil } 253 290 254 291 func (c *FakeClient) createObject( 255 292 gvr GVR,
+9
internal/k8s/incluster.go
··· 242 242 namespace string, 243 243 podName string, 244 244 container string, 245 + opts LogOptions, 245 246 ) (io.ReadCloser, error) { 246 247 query := url.Values{} 247 248 if container != "" { 248 249 query.Set("container", container) 250 + } 251 + // follow=true makes the API server hold the connection open and push 252 + // new log bytes as the container writes them, only closing when the 253 + // container terminates. Without it, the response is a snapshot of 254 + // whatever has been written so far, which silently truncates live 255 + // runs. 256 + if opts.Follow { 257 + query.Set("follow", "true") 249 258 } 250 259 resp, err := c.do(ctx, http.MethodGet, 251 260 coreResourcePath(namespace, podsResource, podName, podLogsSubresource),
+14
internal/k8s/k8s.go
··· 61 61 namespace string, 62 62 podName string, 63 63 container string, 64 + opts LogOptions, 64 65 ) (io.ReadCloser, error) 66 + } 67 + 68 + // LogOptions controls how StreamPodLogs reads container logs. 69 + type LogOptions struct { 70 + // Follow tells the API server to keep the connection open and stream 71 + // new log bytes as the container writes them, only EOFing when the 72 + // container terminates (or ctx is cancelled). Without Follow, the 73 + // returned reader is a snapshot: it yields the bytes that exist at 74 + // request time and then EOFs immediately, even if the container is 75 + // still running. Live-streaming callers MUST set Follow=true; using 76 + // the snapshot mode for a still-running container makes the caller 77 + // look at a frozen view and treat the container as if it had finished. 78 + Follow bool 65 79 } 66 80 67 81 // GVR identifies a Kubernetes resource by the path segments the API server
+111 -53
provider_tekton.go
··· 486 486 out := make(chan LogLine, 32) 487 487 go func() { 488 488 defer close(out) 489 + p.streamPipelineRunLogs(ctx, out, *ref) 490 + }() 491 + return out, nil 492 + } 489 493 490 - taskRuns := p.waitForTaskRuns(ctx, *ref) 491 - if len(taskRuns) == 0 { 492 - // Either ctx was cancelled, or the PipelineRun 493 - // reached a terminal state without ever scheduling 494 - // any TaskRuns. Both cases close an empty stream; 495 - // there's nothing to send. 494 + // streamPipelineRunLogs drives the Logs channel for a single PipelineRun. 495 + // It polls for TaskRuns until the PipelineRun is terminal (or ctx is 496 + // cancelled), streaming each TaskRun's logs exactly once. 497 + // 498 + // The loop is deliberately a poll rather than a one-shot pass for two 499 + // reasons: 500 + // 501 + // 1. Live runs need follow semantics. Streaming a still-running TaskRun 502 + // uses Follow=true on the pod log stream, so streamTaskRunLogs only 503 + // returns once the container actually terminates. The outer loop 504 + // then re-checks for new TaskRuns and the PipelineRun's terminal 505 + // state, instead of closing the channel mid-run. 506 + // 507 + // 2. PipelineRuns can spawn additional TaskRuns over time (sequential 508 + // `runAfter` tasks, finally blocks, retries). A single snapshot 509 + // would silently drop any TaskRun that appears after the snapshot, 510 + // even though the workflow is still running. 511 + // 512 + // The loop terminates only when the PipelineRun is terminal AND the 513 + // most recent listing produced no new TaskRuns, which together mean no 514 + // further TaskRuns will ever appear. 515 + func (p *tektonProvider) streamPipelineRunLogs( 516 + ctx context.Context, 517 + out chan<- LogLine, 518 + ref TektonRunRef, 519 + ) { 520 + const pollInterval = 1 * time.Second 521 + seen := map[string]bool{} 522 + stepID := 0 523 + for { 524 + if err := ctx.Err(); err != nil { 496 525 return 497 526 } 498 - p.log.Debug("Logs: found TaskRuns for PipelineRun", 499 - "pipeline_run", ref.PipelineRunName, "count", len(taskRuns), 500 - ) 527 + 528 + taskRuns, err := p.taskRunsForPipelineRun(ctx, ref) 529 + if err != nil { 530 + p.log.Debug("Logs: list TaskRuns failed", 531 + "err", err, "pipeline_run", ref.PipelineRunName, 532 + ) 533 + } 534 + // Snapshot the terminal state *after* the listing so we never 535 + // observe terminal=true while still missing a TaskRun that 536 + // existed at list time. The reverse race (terminal observed 537 + // before a TaskRun spawn) is handled by the next iteration: we 538 + // only exit when terminal is true AND no new TaskRuns showed up 539 + // in this pass. 540 + terminal := p.isPipelineRunTerminal(ctx, ref) 501 541 502 - terminal := p.isPipelineRunTerminal(ctx, *ref) 503 - p.log.Debug("Logs: pipeline run terminal state", 504 - "pipeline_run", ref.PipelineRunName, "terminal", terminal, 542 + var fresh []k8s.Object 543 + for _, tr := range taskRuns { 544 + name := tr.GetName() 545 + if name == "" || seen[name] { 546 + continue 547 + } 548 + seen[name] = true 549 + fresh = append(fresh, tr) 550 + } 551 + p.log.Debug("Logs: poll iteration", 552 + "pipeline_run", ref.PipelineRunName, 553 + "task_runs_total", len(taskRuns), 554 + "task_runs_new", len(fresh), 555 + "terminal", terminal, 505 556 ) 506 557 507 - stepID := 0 508 - for _, tr := range taskRuns { 558 + for _, tr := range fresh { 509 559 taskName := tr.GetName() 510 560 if taskName == "" { 511 561 taskName = fmt.Sprintf("task %d", stepID) 512 562 } 513 - p.log.Debug("Logs: streaming TaskRun", "task_run", taskName, "step_id", stepID, "terminal", terminal) 563 + p.log.Debug("Logs: streaming TaskRun", 564 + "task_run", taskName, "step_id", stepID, "terminal", terminal, 565 + ) 514 566 if !sendLine(ctx, out, LogLine{ 515 567 Kind: LogKindControl, 516 568 Time: time.Now(), ··· 521 573 return 522 574 } 523 575 576 + // terminal is the snapshot taken at the top of this 577 + // iteration. If the pipeline was terminal then, all 578 + // TaskRuns we see are guaranteed complete and we can 579 + // take the snapshot fast-path; otherwise we follow the 580 + // pod logs live so StepStatusEnd is only emitted once 581 + // the container actually exits. 524 582 if terminal { 525 - p.fetchCompletedTaskRunLogs(ctx, out, *ref, tr, stepID) 583 + p.fetchCompletedTaskRunLogs(ctx, out, ref, tr, stepID) 526 584 } else { 527 - p.streamTaskRunLogs(ctx, out, *ref, tr, stepID) 585 + p.streamTaskRunLogs(ctx, out, ref, tr, stepID) 528 586 } 529 587 530 588 if !sendLine(ctx, out, LogLine{ ··· 536 594 }) { 537 595 return 538 596 } 539 - p.log.Debug("Logs: finished TaskRun", "task_run", taskName, "step_id", stepID) 597 + p.log.Debug("Logs: finished TaskRun", 598 + "task_run", taskName, "step_id", stepID, 599 + ) 540 600 stepID++ 541 601 } 542 - p.log.Debug("Logs: all TaskRuns streamed", "pipeline_run", ref.PipelineRunName) 543 - }() 544 - return out, nil 545 - } 546 602 547 - // waitForTaskRuns blocks until at least one TaskRun belonging to the 548 - // PipelineRun in ref is observable, the PipelineRun reaches a terminal 549 - // state with no TaskRuns, or ctx is cancelled. It exists so Logs can 550 - // keep the channel open across the gap between PipelineRun creation 551 - // and Tekton scheduling its TaskRuns, instead of mistranslating that 552 - // gap into a 404. The first iteration always probes immediately so a 553 - // PipelineRun that already has TaskRuns is returned without waiting. 554 - func (p *tektonProvider) waitForTaskRuns( 555 - ctx context.Context, 556 - ref TektonRunRef, 557 - ) []k8s.Object { 558 - const interval = 1 * time.Second 559 - for { 560 - taskRuns, err := p.taskRunsForPipelineRun(ctx, ref) 561 - if err != nil { 562 - p.log.Debug("waitForTaskRuns: list TaskRuns failed", 563 - "err", err, "pipeline_run", ref.PipelineRunName, 564 - ) 565 - } else if len(taskRuns) > 0 { 566 - return taskRuns 567 - } 568 - // If the PipelineRun has already reached a terminal state and 569 - // still has no TaskRuns, there's nothing more to wait for — 570 - // return empty so the caller closes the stream cleanly. 571 - if p.isPipelineRunTerminal(ctx, ref) { 572 - p.log.Debug("waitForTaskRuns: PipelineRun terminal with no TaskRuns", 603 + // Done condition: the PipelineRun is terminal AND we found no 604 + // new TaskRuns this iteration. Both are required because a 605 + // terminal PipelineRun can still expose a freshly-listed 606 + // TaskRun whose pod we haven't drained yet. 607 + if terminal && len(fresh) == 0 { 608 + p.log.Debug("Logs: pipeline run terminal, no new TaskRuns", 573 609 "pipeline_run", ref.PipelineRunName, 574 610 ) 575 - return nil 611 + return 612 + } 613 + 614 + // When we processed new TaskRuns this round, loop again 615 + // immediately to re-list — Follow=true streaming may have 616 + // blocked us long enough for additional TaskRuns or terminal 617 + // transitions to have happened. 618 + if len(fresh) > 0 { 619 + continue 576 620 } 621 + 577 622 select { 578 623 case <-ctx.Done(): 579 - return nil 580 - case <-time.After(interval): 624 + return 625 + case <-time.After(pollInterval): 581 626 } 582 627 } 583 628 } ··· 661 706 p.log.Debug("fetchCompletedTaskRunLogs: reading container logs", 662 707 "pod", pod.Name, "container", c.Name, 663 708 ) 664 - rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name) 709 + // Snapshot read (Follow=false) is correct here: the TaskRun 710 + // has already terminated, so the API server has the full log 711 + // available and there is nothing more to wait for. 712 + rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name, 713 + k8s.LogOptions{Follow: false}, 714 + ) 665 715 if err != nil { 666 716 p.log.Debug("fetchCompletedTaskRunLogs: stream failed", "err", err, 667 717 "pod", pod.Name, "container", c.Name, ··· 720 770 p.log.Debug("streamTaskRunLogs: streaming container", 721 771 "pod", pod.Name, "container", c.Name, "step_id", stepID, 722 772 ) 723 - rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name) 773 + // Live tail with Follow=true: the apiserver holds the 774 + // connection open until the container terminates (or ctx is 775 + // cancelled), so sendReaderLines only returns once the 776 + // container is actually done. Without this, the read EOFs at 777 + // the current tail position and the caller emits a spurious 778 + // StepStatusEnd while the step is still running. 779 + rc, err := p.client.StreamPodLogs(ctx, ref.Namespace, pod.Name, c.Name, 780 + k8s.LogOptions{Follow: true}, 781 + ) 724 782 if err != nil { 725 783 p.log.Debug("streamTaskRunLogs: stream pod logs failed", "err", err, 726 784 "pod", pod.Name, "container", c.Name,
+19
provider_tekton_test.go
··· 236 236 } 237 237 } 238 238 239 + // Seed a terminal PipelineRun so Logs takes the snapshot path 240 + // (fetchCompletedTaskRunLogs, Follow=false) and closes the 241 + // channel after draining the seeded TaskRun. The non-terminal 242 + // path follows pod logs live and would only EOF on ctx cancel, 243 + // which is not what this assertion is exercising. 244 + client.SeedObject(pipelineRunsGVR, "ci", k8s.Object{ 245 + "apiVersion": "tekton.dev/v1", 246 + "kind": "PipelineRun", 247 + "metadata": map[string]any{ 248 + "name": "run-1", 249 + "namespace": "ci", 250 + }, 251 + "status": map[string]any{ 252 + "conditions": []any{map[string]any{ 253 + "type": "Succeeded", 254 + "status": "True", 255 + }}, 256 + }, 257 + }) 239 258 client.SeedObject(taskRunsGVR, "ci", k8s.Object{ 240 259 "apiVersion": "tekton.dev/v1", 241 260 "kind": "TaskRun",