Stitch any CI into Tangled
151
fork

Configure Feed

Select the types of activity you want to include in your feed.

provider router

+396 -16
+4 -3
README.md
··· 80 80 | `TACK_JETSTREAM_URL` | Tangled Jetstream WebSocket URL | 81 81 | `TACK_DEV` | Use `ws://` for knot event-streams (any non-empty value) | 82 82 83 - When no provider is configured, tack runs an in-process fake provider 84 - that's useful for exercising the jetstream → knot → `/events` flow 85 - locally without a real CI account. 83 + All configured providers are active simultaneously. Each workflow 84 + chooses its provider via the first key under its top-level `tack:` 85 + block. e.g. `tack: { buildkite: { ... } }` runs on Buildkite, 86 + `tack: { fake: {} }` runs on the in-process fake provider. 86 87 87 88 ## Providers 88 89
+14 -13
main.go
··· 159 159 // them to publish synthetic status events at startup. 160 160 br := newBroker(st) 161 161 162 - // Provider that turns Tangled pipeline triggers into 163 - // pipeline.status events. The Buildkite provider is the real 164 - // integration; the fake one stands in when no Buildkite token is 165 - // configured so the full jetstream → knot → /events flow is 166 - // still exercisable locally without a Buildkite account. 162 + // Providers turn Tangled pipeline triggers into pipeline.status 163 + // events. We always wire the fake provider in so workflows can 164 + // opt into it (via `tack: { fake: ... }`) for end-to-end testing 165 + // even when Buildkite credentials are present; the Buildkite 166 + // provider is added on top when configured. Routing is per- 167 + // workflow and driven by the workflow YAML — see providerRouter. 167 168 // 168 169 // bkProvider is kept as a typed pointer separately because the 169 170 // /webhooks/buildkite handler needs the concrete *buildkiteProvider 170 171 // (for HandleWebhook + signature verification), not the abstract 171 172 // Provider surface. 172 - var ( 173 - provider Provider 174 - bkProvider *buildkiteProvider 175 - ) 173 + providers := map[string]Provider{ 174 + "fake": newFakeProvider(br, logger), 175 + } 176 + logger.Info("fake provider enabled (workflow opt-in via `tack.fake:`)") 177 + 178 + var bkProvider *buildkiteProvider 176 179 if cfg.BuildkiteToken != "" { 177 180 bkProvider = newBuildkiteProvider( 178 181 br, st, ··· 182 185 cfg.BuildkiteWebhookMode, 183 186 logger, 184 187 ) 185 - provider = bkProvider 188 + providers["buildkite"] = bkProvider 186 189 logger.Info("buildkite provider enabled", 187 190 "default_org", cfg.BuildkiteOrg, 188 191 "webhook_mode", cfg.BuildkiteWebhookMode, 189 192 ) 190 - } else { 191 - provider = newFakeProvider(br, logger) 192 - logger.Info("fake provider enabled (set TACK_BUILDKITE_TOKEN to use buildkite)") 193 193 } 194 + provider := newProviderRouter(logger, providers) 194 195 195 196 // Start the knot event-stream consumer first so the jetstream 196 197 // loop has somewhere to register newly-observed knots into. It
+149
provider_router.go
··· 1 + package main 2 + 3 + // providerRouter dispatches each incoming workflow to whichever 4 + // configured Provider matches the workflow's YAML body. Selection 5 + // happens per-workflow: we decode the top-level `tack:` map and pick 6 + // the first child key that names one of the registered providers. 7 + // This keeps the trigger plumbing oblivious to which backend any 8 + // given workflow will run on, and lets a single tack instance host 9 + // multiple providers concurrently — the workflow YAML is the source 10 + // of truth for routing, not the operator's env. 11 + // 12 + // Providers are registered as a (key → Provider) map. When a 13 + // workflow names more than one provider key under `tack:` (a config 14 + // mistake in practice) the router walks the YAML's child keys in 15 + // document order and picks the first one that has a registered 16 + // provider — Go's map iteration randomness never enters into it, 17 + // because the YAML's MapSlice preserves order. 18 + 19 + import ( 20 + "context" 21 + "errors" 22 + "fmt" 23 + "log/slog" 24 + "strings" 25 + 26 + "go.yaml.in/yaml/v2" 27 + "tangled.org/core/api/tangled" 28 + ) 29 + 30 + // providerRouter is itself a Provider so the rest of tack (knot 31 + // consumer, HTTP handlers) keeps talking to a single Provider value 32 + // regardless of how many real backends are wired in. 33 + type providerRouter struct { 34 + log *slog.Logger 35 + providers map[string]Provider 36 + } 37 + 38 + // Compile-time interface conformance check. 39 + var _ Provider = (*providerRouter)(nil) 40 + 41 + // newProviderRouter wires a router from a (key → Provider) map. 42 + func newProviderRouter( 43 + log *slog.Logger, 44 + providers map[string]Provider, 45 + ) *providerRouter { 46 + return &providerRouter{ 47 + log: log.With("component", "provider", "kind", "router"), 48 + providers: providers, 49 + } 50 + } 51 + 52 + // Spawn satisfies Provider. Each workflow is routed independently — 53 + // a single pipeline can mix workflows that target different 54 + // providers. Workflows whose YAML doesn't name a known provider are 55 + // logged loudly and skipped, since silently dropping them would 56 + // hide a config error from the operator. 57 + func (r *providerRouter) Spawn( 58 + ctx context.Context, 59 + knot string, 60 + pipelineRkey string, 61 + trigger *tangled.Pipeline_TriggerMetadata, 62 + workflows []*tangled.Pipeline_Workflow, 63 + ) { 64 + for _, wf := range workflows { 65 + // Defensive: the lexicon allows nil entries and doesn't 66 + // require a name. We can't route a workflow that has no 67 + // body to inspect. 68 + if wf == nil || wf.Name == "" { 69 + continue 70 + } 71 + p, err := r.pick(wf.Raw) 72 + if err != nil { 73 + r.log.Error("route workflow", 74 + "err", err, 75 + "knot", knot, 76 + "pipeline_rkey", pipelineRkey, 77 + "workflow", wf.Name, 78 + ) 79 + continue 80 + } 81 + // Hand the workflow off as a single-element slice so the 82 + // downstream provider's existing Spawn loop runs unchanged. 83 + p.Spawn(ctx, knot, pipelineRkey, trigger, 84 + []*tangled.Pipeline_Workflow{wf}, 85 + ) 86 + } 87 + } 88 + 89 + // Logs satisfies Provider. The (knot, rkey, workflow) tuple alone 90 + // doesn't tell us which backend ran the workflow — the YAML body 91 + // isn't carried on the request — so we ask each provider and 92 + // surface the first one that has a stream. ErrLogsNotFound from a 93 + // provider just means "not mine"; we keep walking. Any other error 94 + // is the answer (a real backend failure should surface to the HTTP 95 + // caller, not be masked by the next provider). 96 + // 97 + // Map iteration order is undefined, but in practice exactly one 98 + // provider should know about any given (knot, rkey, workflow) so the 99 + // order is moot. 100 + func (r *providerRouter) Logs( 101 + ctx context.Context, 102 + knot string, 103 + pipelineRkey string, 104 + workflow string, 105 + ) (<-chan LogLine, error) { 106 + for _, p := range r.providers { 107 + ch, err := p.Logs(ctx, knot, pipelineRkey, workflow) 108 + if errors.Is(err, ErrLogsNotFound) { 109 + continue 110 + } 111 + return ch, err 112 + } 113 + return nil, ErrLogsNotFound 114 + } 115 + 116 + // pick decodes raw and returns the first registered provider whose 117 + // key appears as a child of the top-level `tack:` map, walking the 118 + // YAML's children in document order. An empty body or YAML with no 119 + // `tack:` block — or one whose children name no registered provider 120 + // — is a structural error; no routing decision is possible. 121 + func (r *providerRouter) pick(raw string) (Provider, error) { 122 + if strings.TrimSpace(raw) == "" { 123 + return nil, errors.New("workflow body is empty") 124 + } 125 + // MapSlice preserves the on-the-wire ordering of the children 126 + // of `tack:` so "first match" is deterministic w.r.t. the YAML 127 + // document, not Go's randomised map iteration. 128 + var doc struct { 129 + Tack yaml.MapSlice `yaml:"tack"` 130 + } 131 + if err := yaml.Unmarshal([]byte(raw), &doc); err != nil { 132 + return nil, fmt.Errorf("parse workflow yaml: %w", err) 133 + } 134 + for _, item := range doc.Tack { 135 + // YAML map keys are usually strings, but the lexicon 136 + // doesn't enforce that — guard so a stray int/bool key 137 + // doesn't panic the type assertion. 138 + key, ok := item.Key.(string) 139 + if !ok { 140 + continue 141 + } 142 + if p, ok := r.providers[key]; ok { 143 + return p, nil 144 + } 145 + } 146 + return nil, fmt.Errorf( 147 + "workflow yaml has no `tack:` key matching a registered provider", 148 + ) 149 + }
+229
provider_router_test.go
··· 1 + package main 2 + 3 + // Tests for providerRouter. We use a tiny in-test stub Provider — 4 + // stubProvider — that records every Spawn call and serves canned 5 + // Logs responses, so the tests stay focused on routing behaviour 6 + // (which provider got called for which workflow YAML, and how 7 + // ErrLogsNotFound is fanned out) without dragging in either the 8 + // fake or Buildkite providers' end-to-end machinery. 9 + 10 + import ( 11 + "context" 12 + "errors" 13 + "log/slog" 14 + "sync" 15 + "testing" 16 + 17 + "tangled.org/core/api/tangled" 18 + ) 19 + 20 + // stubProvider is a minimal Provider for routing tests. spawnCalls 21 + // captures the workflow names handed to Spawn (single-element slices 22 + // per the router's contract) so a test can assert which provider got 23 + // which workflow. logsErr / logsCh govern what Logs returns; the 24 + // default (zero value) is ErrLogsNotFound + nil channel, which makes 25 + // fan-out tests easy to express by overriding only the provider that 26 + // should "claim" the request. 27 + type stubProvider struct { 28 + mu sync.Mutex 29 + spawnCalls []string 30 + 31 + logsErr error 32 + logsCh chan LogLine 33 + } 34 + 35 + var _ Provider = (*stubProvider)(nil) 36 + 37 + func (s *stubProvider) Spawn( 38 + _ context.Context, 39 + _ string, 40 + _ string, 41 + _ *tangled.Pipeline_TriggerMetadata, 42 + workflows []*tangled.Pipeline_Workflow, 43 + ) { 44 + s.mu.Lock() 45 + defer s.mu.Unlock() 46 + for _, wf := range workflows { 47 + if wf == nil { 48 + continue 49 + } 50 + s.spawnCalls = append(s.spawnCalls, wf.Name) 51 + } 52 + } 53 + 54 + func (s *stubProvider) Logs( 55 + _ context.Context, 56 + _ string, 57 + _ string, 58 + _ string, 59 + ) (<-chan LogLine, error) { 60 + if s.logsErr != nil { 61 + return nil, s.logsErr 62 + } 63 + if s.logsCh != nil { 64 + return s.logsCh, nil 65 + } 66 + return nil, ErrLogsNotFound 67 + } 68 + 69 + // names returns a defensive copy of spawnCalls so the test can read 70 + // it without racing the router's per-workflow loop. The router calls 71 + // Spawn synchronously in the test process, but a copy is the safer 72 + // pattern if that ever changes. 73 + func (s *stubProvider) names() []string { 74 + s.mu.Lock() 75 + defer s.mu.Unlock() 76 + out := make([]string, len(s.spawnCalls)) 77 + copy(out, s.spawnCalls) 78 + return out 79 + } 80 + 81 + // newRouterTest wires a router with a fixed pair of stubs ("a", "b") 82 + // so tests can focus on the YAML → provider mapping. The stubs are 83 + // returned alongside so each case can inspect what it received. 84 + func newRouterTest() (*providerRouter, *stubProvider, *stubProvider) { 85 + a := &stubProvider{} 86 + b := &stubProvider{} 87 + r := newProviderRouter(slog.Default(), map[string]Provider{ 88 + "a": a, 89 + "b": b, 90 + }) 91 + return r, a, b 92 + } 93 + 94 + // TestProviderRouterSpawnRoutesByYAMLKey exercises the basic happy 95 + // path: each workflow's `tack:` block names exactly one provider key, 96 + // and the router hands that workflow to the matching provider only. 97 + func TestProviderRouterSpawnRoutesByYAMLKey(t *testing.T) { 98 + r, a, b := newRouterTest() 99 + 100 + r.Spawn(context.Background(), "knot", "rkey", nil, 101 + []*tangled.Pipeline_Workflow{ 102 + {Name: "wf-a.yml", Raw: "tack:\n a: {}\n"}, 103 + {Name: "wf-b.yml", Raw: "tack:\n b: {}\n"}, 104 + }, 105 + ) 106 + 107 + if got, want := a.names(), []string{"wf-a.yml"}; !equalStrings(got, want) { 108 + t.Fatalf("provider a got %v; want %v", got, want) 109 + } 110 + if got, want := b.names(), []string{"wf-b.yml"}; !equalStrings(got, want) { 111 + t.Fatalf("provider b got %v; want %v", got, want) 112 + } 113 + } 114 + 115 + // TestProviderRouterSpawnFirstYAMLKeyWins pins tie-breaking when a 116 + // workflow lists multiple provider keys: the YAML's document-order 117 + // child of `tack:` wins, regardless of the Go map's iteration order. 118 + func TestProviderRouterSpawnFirstYAMLKeyWins(t *testing.T) { 119 + r, a, b := newRouterTest() 120 + 121 + // `b` is listed first under `tack:` so it should claim the 122 + // workflow even though both keys are registered. 123 + r.Spawn(context.Background(), "knot", "rkey", nil, 124 + []*tangled.Pipeline_Workflow{ 125 + {Name: "both.yml", Raw: "tack:\n b: {}\n a: {}\n"}, 126 + }, 127 + ) 128 + 129 + if got := a.names(); len(got) != 0 { 130 + t.Fatalf("provider a should not have been called; got %v", got) 131 + } 132 + if got, want := b.names(), []string{"both.yml"}; !equalStrings(got, want) { 133 + t.Fatalf("provider b got %v; want %v", got, want) 134 + } 135 + } 136 + 137 + // TestProviderRouterSpawnSkipsUnroutable confirms that workflows 138 + // whose YAML has no matching provider key are skipped (logged but 139 + // not dispatched) and don't poison the rest of the batch. 140 + func TestProviderRouterSpawnSkipsUnroutable(t *testing.T) { 141 + r, a, b := newRouterTest() 142 + 143 + r.Spawn(context.Background(), "knot", "rkey", nil, 144 + []*tangled.Pipeline_Workflow{ 145 + // No `tack:` key at all. 146 + {Name: "bare.yml", Raw: "steps: []\n"}, 147 + // `tack:` present but with an unknown sub-key. 148 + {Name: "unknown.yml", Raw: "tack:\n nope: {}\n"}, 149 + // Empty body — also unroutable. 150 + {Name: "empty.yml", Raw: ""}, 151 + // And one good one to prove the loop kept going. 152 + {Name: "good.yml", Raw: "tack:\n a: {}\n"}, 153 + }, 154 + ) 155 + 156 + if got, want := a.names(), []string{"good.yml"}; !equalStrings(got, want) { 157 + t.Fatalf("provider a got %v; want %v", got, want) 158 + } 159 + if got := b.names(); len(got) != 0 { 160 + t.Fatalf("provider b should not have been called; got %v", got) 161 + } 162 + } 163 + 164 + // TestProviderRouterLogsFanOut verifies that Logs walks the 165 + // providers and returns the channel from the first one that doesn't 166 + // say ErrLogsNotFound. We seed exactly one provider with a real 167 + // channel; map iteration order is unspecified but only one provider 168 + // can possibly answer, so the test is deterministic. 169 + func TestProviderRouterLogsFanOut(t *testing.T) { 170 + r, _, b := newRouterTest() 171 + 172 + want := make(chan LogLine) 173 + b.logsCh = want 174 + 175 + got, err := r.Logs(context.Background(), "k", "p", "w") 176 + if err != nil { 177 + t.Fatalf("Logs: %v", err) 178 + } 179 + if got != (<-chan LogLine)(want) { 180 + t.Fatalf("got channel %v; want %v", got, want) 181 + } 182 + } 183 + 184 + // TestProviderRouterLogsAllNotFound makes sure that when no provider 185 + // claims the tuple, the router surfaces ErrLogsNotFound itself — 186 + // this is what the HTTP handler maps to a 404. 187 + func TestProviderRouterLogsAllNotFound(t *testing.T) { 188 + r, _, _ := newRouterTest() 189 + 190 + ch, err := r.Logs(context.Background(), "k", "p", "w") 191 + if !errors.Is(err, ErrLogsNotFound) { 192 + t.Fatalf("err = %v; want ErrLogsNotFound", err) 193 + } 194 + if ch != nil { 195 + t.Fatalf("channel should be nil on not-found") 196 + } 197 + } 198 + 199 + // TestProviderRouterLogsBackendError confirms that a non-NotFound 200 + // error from a provider is returned verbatim instead of being 201 + // masked by the fan-out — backend failures must reach the HTTP 202 + // caller as 5xx, not be silently retried elsewhere. 203 + func TestProviderRouterLogsBackendError(t *testing.T) { 204 + a := &stubProvider{logsErr: errors.New("boom")} 205 + r := newProviderRouter(slog.Default(), map[string]Provider{"a": a}) 206 + 207 + ch, err := r.Logs(context.Background(), "k", "p", "w") 208 + if err == nil || err.Error() != "boom" { 209 + t.Fatalf("err = %v; want boom", err) 210 + } 211 + if ch != nil { 212 + t.Fatalf("channel should be nil on backend error") 213 + } 214 + } 215 + 216 + // equalStrings is a small helper to compare ordered string slices — 217 + // the router preserves workflow order within Spawn, so the tests 218 + // assert against ordered slices rather than sets. 219 + func equalStrings(a, b []string) bool { 220 + if len(a) != len(b) { 221 + return false 222 + } 223 + for i := range a { 224 + if a[i] != b[i] { 225 + return false 226 + } 227 + } 228 + return true 229 + }