Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

spindle: make workflows engine-agnostic

Signed-off-by: Winter <winter@winter.cafe>

authored by

Winter and committed by
Tangled
251f3d44 3ee002a1

+726 -830
api/tangled/cbor_gen.go

This is a binary file and will not be displayed.

api/tangled/tangledpipeline.go

This is a binary file and will not be displayed.

-2
cmd/gen.go
··· 27 27 tangled.KnotMember{}, 28 28 tangled.Pipeline{}, 29 29 tangled.Pipeline_CloneOpts{}, 30 - tangled.Pipeline_Dependency{}, 31 30 tangled.Pipeline_ManualTriggerData{}, 32 31 tangled.Pipeline_Pair{}, 33 32 tangled.Pipeline_PullRequestTriggerData{}, 34 33 tangled.Pipeline_PushTriggerData{}, 35 34 tangled.PipelineStatus{}, 36 - tangled.Pipeline_Step{}, 37 35 tangled.Pipeline_TriggerMetadata{}, 38 36 tangled.Pipeline_TriggerRepo{}, 39 37 tangled.Pipeline_Workflow{},
+26 -3
docs/spindle/pipeline.md
··· 4 4 repo. Generally: 5 5 6 6 * Pipelines are defined in YAML. 7 - * Dependencies can be specified from 8 - [Nixpkgs](https://search.nixos.org) or custom registries. 9 - * Environment variables can be set globally or per-step. 7 + * Workflows can run using different *engines*. 8 + 9 + The most barebones workflow looks like this: 10 + 11 + ```yaml 12 + when: 13 + - event: ["push"] 14 + branch: ["main"] 15 + 16 + engine: "nixery" 17 + 18 + # optional 19 + clone: 20 + skip: false 21 + depth: 50 22 + submodules: true 23 + ``` 24 + 25 + The `when` and `engine` fields are required, while every other aspect 26 + of how the definition is parsed is up to the engine. Currently, a spindle 27 + provides at least one of these built-in engines: 28 + 29 + ## `nixery` 30 + 31 + The Nixery engine uses an instance of [Nixery](https://nixery.dev) to run 32 + steps that use dependencies from [Nixpkgs](https://github.com/NixOS/nixpkgs). 10 33 11 34 Here's an example that uses all fields: 12 35
+7 -63
lexicons/pipeline/pipeline.json
··· 149 149 "type": "object", 150 150 "required": [ 151 151 "name", 152 - "dependencies", 153 - "steps", 154 - "environment", 155 - "clone" 152 + "engine", 153 + "clone", 154 + "raw" 156 155 ], 157 156 "properties": { 158 157 "name": { 159 158 "type": "string" 160 159 }, 161 - "dependencies": { 162 - "type": "array", 163 - "items": { 164 - "type": "ref", 165 - "ref": "#dependency" 166 - } 167 - }, 168 - "steps": { 169 - "type": "array", 170 - "items": { 171 - "type": "ref", 172 - "ref": "#step" 173 - } 174 - }, 175 - "environment": { 176 - "type": "array", 177 - "items": { 178 - "type": "ref", 179 - "ref": "#pair" 180 - } 160 + "engine": { 161 + "type": "string" 181 162 }, 182 163 "clone": { 183 164 "type": "ref", 184 165 "ref": "#cloneOpts" 185 - } 186 - } 187 - }, 188 - "dependency": { 189 - "type": "object", 190 - "required": [ 191 - "registry", 192 - "packages" 193 - ], 194 - "properties": { 195 - "registry": { 196 - "type": "string" 197 166 }, 198 - "packages": { 199 - "type": "array", 200 - "items": { 201 - "type": "string" 202 - } 167 + "raw": { 168 + "type": "string" 203 169 } 204 170 } 205 171 }, ··· 185 219 }, 186 220 "submodules": { 187 221 "type": "boolean" 188 - } 189 - } 190 - }, 191 - "step": { 192 - "type": "object", 193 - "required": [ 194 - "name", 195 - "command" 196 - ], 197 - "properties": { 198 - "name": { 199 - "type": "string" 200 - }, 201 - "command": { 202 - "type": "string" 203 - }, 204 - "environment": { 205 - "type": "array", 206 - "items": { 207 - "type": "ref", 208 - "ref": "#pair" 209 - } 210 222 } 211 223 } 212 224 },
+2 -2
nix/modules/spindle.nix
··· 111 111 "SPINDLE_SERVER_SECRETS_PROVIDER=${cfg.server.secrets.provider}" 112 112 "SPINDLE_SERVER_SECRETS_OPENBAO_PROXY_ADDR=${cfg.server.secrets.openbao.proxyAddr}" 113 113 "SPINDLE_SERVER_SECRETS_OPENBAO_MOUNT=${cfg.server.secrets.openbao.mount}" 114 - "SPINDLE_PIPELINES_NIXERY=${cfg.pipelines.nixery}" 115 - "SPINDLE_PIPELINES_WORKFLOW_TIMEOUT=${cfg.pipelines.workflowTimeout}" 114 + "SPINDLE_NIXERY_PIPELINES_NIXERY=${cfg.pipelines.nixery}" 115 + "SPINDLE_NIXERY_PIPELINES_WORKFLOW_TIMEOUT=${cfg.pipelines.workflowTimeout}" 116 116 ]; 117 117 ExecStart = "${cfg.package}/bin/spindle"; 118 118 Restart = "always";
+4 -4
spindle/config/config.go
··· 16 16 Dev bool `env:"DEV, default=false"` 17 17 Owner string `env:"OWNER, required"` 18 18 Secrets Secrets `env:",prefix=SECRETS_"` 19 + LogDir string `env:"LOG_DIR, default=/var/log/spindle"` 19 20 } 20 21 21 22 func (s Server) Did() syntax.DID { ··· 33 32 Mount string `env:"MOUNT, default=spindle"` 34 33 } 35 34 36 - type Pipelines struct { 35 + type NixeryPipelines struct { 37 36 Nixery string `env:"NIXERY, default=nixery.tangled.sh"` 38 37 WorkflowTimeout string `env:"WORKFLOW_TIMEOUT, default=5m"` 39 - LogDir string `env:"LOG_DIR, default=/var/log/spindle"` 40 38 } 41 39 42 40 type Config struct { 43 - Server Server `env:",prefix=SPINDLE_SERVER_"` 44 - Pipelines Pipelines `env:",prefix=SPINDLE_PIPELINES_"` 41 + Server Server `env:",prefix=SPINDLE_SERVER_"` 42 + NixeryPipelines NixeryPipelines `env:",prefix=SPINDLE_NIXERY_PIPELINES_"` 45 43 } 46 44 47 45 func Load(ctx context.Context) (*Config, error) {
+1 -1
spindle/engine/ansi_stripper.go spindle/engines/nixery/ansi_stripper.go
··· 1 - package engine 1 + package nixery 2 2 3 3 import ( 4 4 "io"
+67 -419
spindle/engine/engine.go
··· 4 4 "context" 5 5 "errors" 6 6 "fmt" 7 - "io" 8 7 "log/slog" 9 - "os" 10 - "strings" 11 - "sync" 12 - "time" 13 8 14 9 securejoin "github.com/cyphar/filepath-securejoin" 15 - "github.com/docker/docker/api/types/container" 16 - "github.com/docker/docker/api/types/image" 17 - "github.com/docker/docker/api/types/mount" 18 - "github.com/docker/docker/api/types/network" 19 - "github.com/docker/docker/api/types/volume" 20 - "github.com/docker/docker/client" 21 - "github.com/docker/docker/pkg/stdcopy" 22 10 "golang.org/x/sync/errgroup" 23 - "tangled.sh/tangled.sh/core/log" 24 11 "tangled.sh/tangled.sh/core/notifier" 25 12 "tangled.sh/tangled.sh/core/spindle/config" 26 13 "tangled.sh/tangled.sh/core/spindle/db" ··· 15 28 "tangled.sh/tangled.sh/core/spindle/secrets" 16 29 ) 17 30 18 - const ( 19 - workspaceDir = "/tangled/workspace" 31 + var ( 32 + ErrTimedOut = errors.New("timed out") 33 + ErrWorkflowFailed = errors.New("workflow failed") 20 34 ) 21 35 22 - type cleanupFunc func(context.Context) error 23 - 24 - type Engine struct { 25 - docker client.APIClient 26 - l *slog.Logger 27 - db *db.DB 28 - n *notifier.Notifier 29 - cfg *config.Config 30 - vault secrets.Manager 31 - 32 - cleanupMu sync.Mutex 33 - cleanup map[string][]cleanupFunc 34 - } 35 - 36 - func New(ctx context.Context, cfg *config.Config, db *db.DB, n *notifier.Notifier, vault secrets.Manager) (*Engine, error) { 37 - dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 38 - if err != nil { 39 - return nil, err 40 - } 41 - 42 - l := log.FromContext(ctx).With("component", "spindle") 43 - 44 - e := &Engine{ 45 - docker: dcli, 46 - l: l, 47 - db: db, 48 - n: n, 49 - cfg: cfg, 50 - vault: vault, 51 - } 52 - 53 - e.cleanup = make(map[string][]cleanupFunc) 54 - 55 - return e, nil 56 - } 57 - 58 - func (e *Engine) StartWorkflows(ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 59 - e.l.Info("starting all workflows in parallel", "pipeline", pipelineId) 36 + func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 37 + l.Info("starting all workflows in parallel", "pipeline", pipelineId) 60 38 61 39 // extract secrets 62 40 var allSecrets []secrets.UnlockedSecret 63 41 if didSlashRepo, err := securejoin.SecureJoin(pipeline.RepoOwner, pipeline.RepoName); err == nil { 64 - if res, err := e.vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil { 42 + if res, err := vault.GetSecretsUnlocked(ctx, secrets.DidSlashRepo(didSlashRepo)); err == nil { 65 43 allSecrets = res 66 44 } 67 45 } 68 46 69 - workflowTimeoutStr := e.cfg.Pipelines.WorkflowTimeout 70 - workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 71 - if err != nil { 72 - e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 73 - workflowTimeout = 5 * time.Minute 74 - } 75 - e.l.Info("using workflow timeout", "timeout", workflowTimeout) 76 - 77 47 eg, ctx := errgroup.WithContext(ctx) 78 - for _, w := range pipeline.Workflows { 79 - eg.Go(func() error { 80 - wid := models.WorkflowId{ 81 - PipelineId: pipelineId, 82 - Name: w.Name, 83 - } 48 + for eng, wfs := range pipeline.Workflows { 49 + workflowTimeout := eng.WorkflowTimeout() 50 + l.Info("using workflow timeout", "timeout", workflowTimeout) 84 51 85 - err := e.db.StatusRunning(wid, e.n) 86 - if err != nil { 87 - return err 88 - } 52 + for _, w := range wfs { 53 + eg.Go(func() error { 54 + wid := models.WorkflowId{ 55 + PipelineId: pipelineId, 56 + Name: w.Name, 57 + } 89 58 90 - err = e.SetupWorkflow(ctx, wid) 91 - if err != nil { 92 - e.l.Error("setting up worklow", "wid", wid, "err", err) 93 - return err 94 - } 95 - defer e.DestroyWorkflow(ctx, wid) 96 - 97 - reader, err := e.docker.ImagePull(ctx, w.Image, image.PullOptions{}) 98 - if err != nil { 99 - e.l.Error("pipeline image pull failed!", "image", w.Image, "workflowId", wid, "error", err.Error()) 100 - 101 - err := e.db.StatusFailed(wid, err.Error(), -1, e.n) 59 + err := db.StatusRunning(wid, n) 102 60 if err != nil { 103 61 return err 104 62 } 105 63 106 - return fmt.Errorf("pulling image: %w", err) 107 - } 108 - defer reader.Close() 109 - io.Copy(os.Stdout, reader) 64 + err = eng.SetupWorkflow(ctx, wid, &w) 65 + if err != nil { 66 + // TODO(winter): Should this always set StatusFailed? 67 + // In the original, we only do in a subset of cases. 68 + l.Error("setting up worklow", "wid", wid, "err", err) 110 69 111 - ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 112 - defer cancel() 113 - 114 - err = e.StartSteps(ctx, wid, w, allSecrets) 115 - if err != nil { 116 - if errors.Is(err, ErrTimedOut) { 117 - dbErr := e.db.StatusTimeout(wid, e.n) 70 + dbErr := db.StatusFailed(wid, err.Error(), -1, n) 118 71 if dbErr != nil { 119 72 return dbErr 120 73 } 74 + return err 75 + } 76 + defer eng.DestroyWorkflow(ctx, wid) 77 + 78 + wfLogger, err := models.NewWorkflowLogger(cfg.Server.LogDir, wid) 79 + if err != nil { 80 + l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 81 + wfLogger = nil 121 82 } else { 122 - dbErr := e.db.StatusFailed(wid, err.Error(), -1, e.n) 123 - if dbErr != nil { 124 - return dbErr 83 + defer wfLogger.Close() 84 + } 85 + 86 + ctx, cancel := context.WithTimeout(ctx, workflowTimeout) 87 + defer cancel() 88 + 89 + for stepIdx, step := range w.Steps { 90 + if wfLogger != nil { 91 + ctl := wfLogger.ControlWriter(stepIdx, step) 92 + ctl.Write([]byte(step.Name())) 93 + } 94 + 95 + err = eng.RunStep(ctx, wid, &w, stepIdx, allSecrets, wfLogger) 96 + if err != nil { 97 + if errors.Is(err, ErrTimedOut) { 98 + dbErr := db.StatusTimeout(wid, n) 99 + if dbErr != nil { 100 + return dbErr 101 + } 102 + } else { 103 + dbErr := db.StatusFailed(wid, err.Error(), -1, n) 104 + if dbErr != nil { 105 + return dbErr 106 + } 107 + } 108 + 109 + return fmt.Errorf("starting steps image: %w", err) 125 110 } 126 111 } 127 112 128 - return fmt.Errorf("starting steps image: %w", err) 129 - } 113 + err = db.StatusSuccess(wid, n) 114 + if err != nil { 115 + return err 116 + } 130 117 131 - err = e.db.StatusSuccess(wid, e.n) 132 - if err != nil { 133 - return err 134 - } 135 - 136 - return nil 137 - }) 118 + return nil 119 + }) 120 + } 138 121 } 139 122 140 - if err = eg.Wait(); err != nil { 141 - e.l.Error("failed to run one or more workflows", "err", err) 123 + if err := eg.Wait(); err != nil { 124 + l.Error("failed to run one or more workflows", "err", err) 142 125 } else { 143 - e.l.Error("successfully ran full pipeline") 126 + l.Error("successfully ran full pipeline") 144 127 } 145 - } 146 - 147 - // SetupWorkflow sets up a new network for the workflow and volumes for 148 - // the workspace and Nix store. These are persisted across steps and are 149 - // destroyed at the end of the workflow. 150 - func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId) error { 151 - e.l.Info("setting up workflow", "workflow", wid) 152 - 153 - _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 154 - Name: workspaceVolume(wid), 155 - Driver: "local", 156 - }) 157 - if err != nil { 158 - return err 159 - } 160 - e.registerCleanup(wid, func(ctx context.Context) error { 161 - return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) 162 - }) 163 - 164 - _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 165 - Name: nixVolume(wid), 166 - Driver: "local", 167 - }) 168 - if err != nil { 169 - return err 170 - } 171 - e.registerCleanup(wid, func(ctx context.Context) error { 172 - return e.docker.VolumeRemove(ctx, nixVolume(wid), true) 173 - }) 174 - 175 - _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 176 - Driver: "bridge", 177 - }) 178 - if err != nil { 179 - return err 180 - } 181 - e.registerCleanup(wid, func(ctx context.Context) error { 182 - return e.docker.NetworkRemove(ctx, networkName(wid)) 183 - }) 184 - 185 - return nil 186 - } 187 - 188 - // StartSteps starts all steps sequentially with the same base image. 189 - // ONLY marks pipeline as failed if container's exit code is non-zero. 190 - // All other errors are bubbled up. 191 - // Fixed version of the step execution logic 192 - func (e *Engine) StartSteps(ctx context.Context, wid models.WorkflowId, w models.Workflow, secrets []secrets.UnlockedSecret) error { 193 - workflowEnvs := ConstructEnvs(w.Environment) 194 - for _, s := range secrets { 195 - workflowEnvs.AddEnv(s.Key, s.Value) 196 - } 197 - 198 - for stepIdx, step := range w.Steps { 199 - select { 200 - case <-ctx.Done(): 201 - return ctx.Err() 202 - default: 203 - } 204 - 205 - envs := append(EnvVars(nil), workflowEnvs...) 206 - for k, v := range step.Environment { 207 - envs.AddEnv(k, v) 208 - } 209 - envs.AddEnv("HOME", workspaceDir) 210 - e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 211 - 212 - hostConfig := hostConfig(wid) 213 - resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 214 - Image: w.Image, 215 - Cmd: []string{"bash", "-c", step.Command}, 216 - WorkingDir: workspaceDir, 217 - Tty: false, 218 - Hostname: "spindle", 219 - Env: envs.Slice(), 220 - }, hostConfig, nil, nil, "") 221 - defer e.DestroyStep(ctx, resp.ID) 222 - if err != nil { 223 - return fmt.Errorf("creating container: %w", err) 224 - } 225 - 226 - err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) 227 - if err != nil { 228 - return fmt.Errorf("connecting network: %w", err) 229 - } 230 - 231 - err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 232 - if err != nil { 233 - return err 234 - } 235 - e.l.Info("started container", "name", resp.ID, "step", step.Name) 236 - 237 - // start tailing logs in background 238 - tailDone := make(chan error, 1) 239 - go func() { 240 - tailDone <- e.TailStep(ctx, resp.ID, wid, stepIdx, step) 241 - }() 242 - 243 - // wait for container completion or timeout 244 - waitDone := make(chan struct{}) 245 - var state *container.State 246 - var waitErr error 247 - 248 - go func() { 249 - defer close(waitDone) 250 - state, waitErr = e.WaitStep(ctx, resp.ID) 251 - }() 252 - 253 - select { 254 - case <-waitDone: 255 - 256 - // wait for tailing to complete 257 - <-tailDone 258 - 259 - case <-ctx.Done(): 260 - e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name) 261 - err = e.DestroyStep(context.Background(), resp.ID) 262 - if err != nil { 263 - e.l.Error("failed to destroy step", "container", resp.ID, "error", err) 264 - } 265 - 266 - // wait for both goroutines to finish 267 - <-waitDone 268 - <-tailDone 269 - 270 - return ErrTimedOut 271 - } 272 - 273 - select { 274 - case <-ctx.Done(): 275 - return ctx.Err() 276 - default: 277 - } 278 - 279 - if waitErr != nil { 280 - return waitErr 281 - } 282 - 283 - err = e.DestroyStep(ctx, resp.ID) 284 - if err != nil { 285 - return err 286 - } 287 - 288 - if state.ExitCode != 0 { 289 - e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled) 290 - if state.OOMKilled { 291 - return ErrOOMKilled 292 - } 293 - return ErrWorkflowFailed 294 - } 295 - } 296 - 297 - return nil 298 - } 299 - 300 - func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 301 - wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 302 - select { 303 - case err := <-errCh: 304 - if err != nil { 305 - return nil, err 306 - } 307 - case <-wait: 308 - } 309 - 310 - e.l.Info("waited for container", "name", containerID) 311 - 312 - info, err := e.docker.ContainerInspect(ctx, containerID) 313 - if err != nil { 314 - return nil, err 315 - } 316 - 317 - return info.State, nil 318 - } 319 - 320 - func (e *Engine) TailStep(ctx context.Context, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 321 - wfLogger, err := NewWorkflowLogger(e.cfg.Pipelines.LogDir, wid) 322 - if err != nil { 323 - e.l.Warn("failed to setup step logger; logs will not be persisted", "error", err) 324 - return err 325 - } 326 - defer wfLogger.Close() 327 - 328 - ctl := wfLogger.ControlWriter(stepIdx, step) 329 - ctl.Write([]byte(step.Name)) 330 - 331 - logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 332 - Follow: true, 333 - ShowStdout: true, 334 - ShowStderr: true, 335 - Details: false, 336 - Timestamps: false, 337 - }) 338 - if err != nil { 339 - return err 340 - } 341 - 342 - _, err = stdcopy.StdCopy( 343 - wfLogger.DataWriter("stdout"), 344 - wfLogger.DataWriter("stderr"), 345 - logs, 346 - ) 347 - if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 348 - return fmt.Errorf("failed to copy logs: %w", err) 349 - } 350 - 351 - return nil 352 - } 353 - 354 - func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { 355 - err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL 356 - if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 357 - return err 358 - } 359 - 360 - if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 361 - RemoveVolumes: true, 362 - RemoveLinks: false, 363 - Force: false, 364 - }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 365 - return err 366 - } 367 - 368 - return nil 369 - } 370 - 371 - func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 372 - e.cleanupMu.Lock() 373 - key := wid.String() 374 - 375 - fns := e.cleanup[key] 376 - delete(e.cleanup, key) 377 - e.cleanupMu.Unlock() 378 - 379 - for _, fn := range fns { 380 - if err := fn(ctx); err != nil { 381 - e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 382 - } 383 - } 384 - return nil 385 - } 386 - 387 - func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 388 - e.cleanupMu.Lock() 389 - defer e.cleanupMu.Unlock() 390 - 391 - key := wid.String() 392 - e.cleanup[key] = append(e.cleanup[key], fn) 393 - } 394 - 395 - func workspaceVolume(wid models.WorkflowId) string { 396 - return fmt.Sprintf("workspace-%s", wid) 397 - } 398 - 399 - func nixVolume(wid models.WorkflowId) string { 400 - return fmt.Sprintf("nix-%s", wid) 401 - } 402 - 403 - func networkName(wid models.WorkflowId) string { 404 - return fmt.Sprintf("workflow-network-%s", wid) 405 - } 406 - 407 - func hostConfig(wid models.WorkflowId) *container.HostConfig { 408 - hostConfig := &container.HostConfig{ 409 - Mounts: []mount.Mount{ 410 - { 411 - Type: mount.TypeVolume, 412 - Source: workspaceVolume(wid), 413 - Target: workspaceDir, 414 - }, 415 - { 416 - Type: mount.TypeVolume, 417 - Source: nixVolume(wid), 418 - Target: "/nix", 419 - }, 420 - { 421 - Type: mount.TypeTmpfs, 422 - Target: "/tmp", 423 - ReadOnly: false, 424 - TmpfsOptions: &mount.TmpfsOptions{ 425 - Mode: 0o1777, // world-writeable sticky bit 426 - Options: [][]string{ 427 - {"exec"}, 428 - }, 429 - }, 430 - }, 431 - { 432 - Type: mount.TypeVolume, 433 - Source: "etc-nix-" + wid.String(), 434 - Target: "/etc/nix", 435 - }, 436 - }, 437 - ReadonlyRootfs: false, 438 - CapDrop: []string{"ALL"}, 439 - CapAdd: []string{"CAP_DAC_OVERRIDE"}, 440 - SecurityOpt: []string{"no-new-privileges"}, 441 - ExtraHosts: []string{"host.docker.internal:host-gateway"}, 442 - } 443 - 444 - return hostConfig 445 - } 446 - 447 - // thanks woodpecker 448 - func isErrContainerNotFoundOrNotRunning(err error) bool { 449 - // Error response from daemon: Cannot kill container: ...: No such container: ... 450 - // Error response from daemon: Cannot kill container: ...: Container ... is not running" 451 - // Error response from podman daemon: can only kill running containers. ... is in state exited 452 - // Error: No such container: ... 453 - return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 454 128 }
+1 -1
spindle/engine/envs.go spindle/engines/nixery/envs.go
··· 1 - package engine 1 + package nixery 2 2 3 3 import ( 4 4 "fmt"
+1 -1
spindle/engine/envs_test.go spindle/engines/nixery/envs_test.go
··· 1 - package engine 1 + package nixery 2 2 3 3 import ( 4 4 "testing"
-9
spindle/engine/errors.go
··· 1 - package engine 2 - 3 - import "errors" 4 - 5 - var ( 6 - ErrOOMKilled = errors.New("oom killed") 7 - ErrTimedOut = errors.New("timed out") 8 - ErrWorkflowFailed = errors.New("workflow failed") 9 - )
+8 -10
spindle/engine/logger.go spindle/models/logger.go
··· 1 - package engine 1 + package models 2 2 3 3 import ( 4 4 "encoding/json" ··· 7 7 "os" 8 8 "path/filepath" 9 9 "strings" 10 - 11 - "tangled.sh/tangled.sh/core/spindle/models" 12 10 ) 13 11 14 12 type WorkflowLogger struct { ··· 14 16 encoder *json.Encoder 15 17 } 16 18 17 - func NewWorkflowLogger(baseDir string, wid models.WorkflowId) (*WorkflowLogger, error) { 19 + func NewWorkflowLogger(baseDir string, wid WorkflowId) (*WorkflowLogger, error) { 18 20 path := LogFilePath(baseDir, wid) 19 21 20 22 file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) ··· 28 30 }, nil 29 31 } 30 32 31 - func LogFilePath(baseDir string, workflowID models.WorkflowId) string { 33 + func LogFilePath(baseDir string, workflowID WorkflowId) string { 32 34 logFilePath := filepath.Join(baseDir, fmt.Sprintf("%s.log", workflowID.String())) 33 35 return logFilePath 34 36 } ··· 45 47 } 46 48 } 47 49 48 - func (l *WorkflowLogger) ControlWriter(idx int, step models.Step) io.Writer { 50 + func (l *WorkflowLogger) ControlWriter(idx int, step Step) io.Writer { 49 51 return &controlWriter{ 50 52 logger: l, 51 53 idx: idx, ··· 60 62 61 63 func (w *dataWriter) Write(p []byte) (int, error) { 62 64 line := strings.TrimRight(string(p), "\r\n") 63 - entry := models.NewDataLogLine(line, w.stream) 65 + entry := NewDataLogLine(line, w.stream) 64 66 if err := w.logger.encoder.Encode(entry); err != nil { 65 67 return 0, err 66 68 } ··· 70 72 type controlWriter struct { 71 73 logger *WorkflowLogger 72 74 idx int 73 - step models.Step 75 + step Step 74 76 } 75 77 76 78 func (w *controlWriter) Write(_ []byte) (int, error) { 77 - entry := models.NewControlLogLine(w.idx, w.step) 79 + entry := NewControlLogLine(w.idx, w.step) 78 80 if err := w.logger.encoder.Encode(entry); err != nil { 79 81 return 0, err 80 82 } 81 - return len(w.step.Name), nil 83 + return len(w.step.Name()), nil 82 84 }
+476
spindle/engines/nixery/engine.go
··· 1 + package nixery 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "io" 8 + "log/slog" 9 + "os" 10 + "path" 11 + "strings" 12 + "sync" 13 + "time" 14 + 15 + "github.com/docker/docker/api/types/container" 16 + "github.com/docker/docker/api/types/image" 17 + "github.com/docker/docker/api/types/mount" 18 + "github.com/docker/docker/api/types/network" 19 + "github.com/docker/docker/api/types/volume" 20 + "github.com/docker/docker/client" 21 + "github.com/docker/docker/pkg/stdcopy" 22 + "gopkg.in/yaml.v3" 23 + "tangled.sh/tangled.sh/core/api/tangled" 24 + "tangled.sh/tangled.sh/core/log" 25 + "tangled.sh/tangled.sh/core/spindle/config" 26 + "tangled.sh/tangled.sh/core/spindle/engine" 27 + "tangled.sh/tangled.sh/core/spindle/models" 28 + "tangled.sh/tangled.sh/core/spindle/secrets" 29 + ) 30 + 31 + const ( 32 + workspaceDir = "/tangled/workspace" 33 + ) 34 + 35 + type cleanupFunc func(context.Context) error 36 + 37 + type Engine struct { 38 + docker client.APIClient 39 + l *slog.Logger 40 + cfg *config.Config 41 + 42 + cleanupMu sync.Mutex 43 + cleanup map[string][]cleanupFunc 44 + } 45 + 46 + type Step struct { 47 + name string 48 + kind models.StepKind 49 + command string 50 + environment map[string]string 51 + } 52 + 53 + func (s Step) Name() string { 54 + return s.name 55 + } 56 + 57 + func (s Step) Command() string { 58 + return s.command 59 + } 60 + 61 + func (s Step) Kind() models.StepKind { 62 + return s.kind 63 + } 64 + 65 + // setupSteps get added to start of Steps 66 + type setupSteps []models.Step 67 + 68 + // addStep adds a step to the beginning of the workflow's steps. 69 + func (ss *setupSteps) addStep(step models.Step) { 70 + *ss = append(*ss, step) 71 + } 72 + 73 + type addlFields struct { 74 + image string 75 + env map[string]string 76 + } 77 + 78 + func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) { 79 + swf := &models.Workflow{} 80 + addl := addlFields{} 81 + 82 + dwf := &struct { 83 + Steps []struct { 84 + Command string `yaml:"command"` 85 + Name string `yaml:"name"` 86 + Environment map[string]string `yaml:"environment"` 87 + } `yaml:"steps"` 88 + Dependencies map[string][]string `yaml:"dependencies"` 89 + Environment map[string]string `yaml:"environment"` 90 + }{} 91 + err := yaml.Unmarshal([]byte(twf.Raw), &dwf) 92 + if err != nil { 93 + return nil, err 94 + } 95 + 96 + for _, dstep := range dwf.Steps { 97 + sstep := Step{} 98 + sstep.environment = dstep.Environment 99 + sstep.command = dstep.Command 100 + sstep.name = dstep.Name 101 + sstep.kind = models.StepKindUser 102 + swf.Steps = append(swf.Steps, sstep) 103 + } 104 + swf.Name = twf.Name 105 + addl.env = dwf.Environment 106 + addl.image = workflowImage(dwf.Dependencies, e.cfg.NixeryPipelines.Nixery) 107 + 108 + setup := &setupSteps{} 109 + 110 + setup.addStep(nixConfStep()) 111 + setup.addStep(cloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev)) 112 + // this step could be empty 113 + if s := dependencyStep(dwf.Dependencies); s != nil { 114 + setup.addStep(*s) 115 + } 116 + 117 + // append setup steps in order to the start of workflow steps 118 + swf.Steps = append(*setup, swf.Steps...) 119 + swf.Data = addl 120 + 121 + return swf, nil 122 + } 123 + 124 + func (e *Engine) WorkflowTimeout() time.Duration { 125 + workflowTimeoutStr := e.cfg.NixeryPipelines.WorkflowTimeout 126 + workflowTimeout, err := time.ParseDuration(workflowTimeoutStr) 127 + if err != nil { 128 + e.l.Error("failed to parse workflow timeout", "error", err, "timeout", workflowTimeoutStr) 129 + workflowTimeout = 5 * time.Minute 130 + } 131 + 132 + return workflowTimeout 133 + } 134 + 135 + func workflowImage(deps map[string][]string, nixery string) string { 136 + var dependencies string 137 + for reg, ds := range deps { 138 + if reg == "nixpkgs" { 139 + dependencies = path.Join(ds...) 140 + } 141 + } 142 + 143 + // load defaults from somewhere else 144 + dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") 145 + 146 + return path.Join(nixery, dependencies) 147 + } 148 + 149 + func New(ctx context.Context, cfg *config.Config) (*Engine, error) { 150 + dcli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) 151 + if err != nil { 152 + return nil, err 153 + } 154 + 155 + l := log.FromContext(ctx).With("component", "spindle") 156 + 157 + e := &Engine{ 158 + docker: dcli, 159 + l: l, 160 + cfg: cfg, 161 + } 162 + 163 + e.cleanup = make(map[string][]cleanupFunc) 164 + 165 + return e, nil 166 + } 167 + 168 + // SetupWorkflow sets up a new network for the workflow and volumes for 169 + // the workspace and Nix store. These are persisted across steps and are 170 + // destroyed at the end of the workflow. 171 + func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow) error { 172 + e.l.Info("setting up workflow", "workflow", wid) 173 + 174 + _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 175 + Name: workspaceVolume(wid), 176 + Driver: "local", 177 + }) 178 + if err != nil { 179 + return err 180 + } 181 + e.registerCleanup(wid, func(ctx context.Context) error { 182 + return e.docker.VolumeRemove(ctx, workspaceVolume(wid), true) 183 + }) 184 + 185 + _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 186 + Name: nixVolume(wid), 187 + Driver: "local", 188 + }) 189 + if err != nil { 190 + return err 191 + } 192 + e.registerCleanup(wid, func(ctx context.Context) error { 193 + return e.docker.VolumeRemove(ctx, nixVolume(wid), true) 194 + }) 195 + 196 + _, err = e.docker.NetworkCreate(ctx, networkName(wid), network.CreateOptions{ 197 + Driver: "bridge", 198 + }) 199 + if err != nil { 200 + return err 201 + } 202 + e.registerCleanup(wid, func(ctx context.Context) error { 203 + return e.docker.NetworkRemove(ctx, networkName(wid)) 204 + }) 205 + 206 + addl := wf.Data.(addlFields) 207 + 208 + reader, err := e.docker.ImagePull(ctx, addl.image, image.PullOptions{}) 209 + if err != nil { 210 + e.l.Error("pipeline image pull failed!", "image", addl.image, "workflowId", wid, "error", err.Error()) 211 + 212 + return fmt.Errorf("pulling image: %w", err) 213 + } 214 + defer reader.Close() 215 + io.Copy(os.Stdout, reader) 216 + 217 + return nil 218 + } 219 + 220 + func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *models.WorkflowLogger) error { 221 + workflowEnvs := ConstructEnvs(w.Data.(addlFields).env) 222 + for _, s := range secrets { 223 + workflowEnvs.AddEnv(s.Key, s.Value) 224 + } 225 + 226 + step := w.Steps[idx].(Step) 227 + 228 + select { 229 + case <-ctx.Done(): 230 + return ctx.Err() 231 + default: 232 + } 233 + 234 + envs := append(EnvVars(nil), workflowEnvs...) 235 + for k, v := range step.environment { 236 + envs.AddEnv(k, v) 237 + } 238 + envs.AddEnv("HOME", workspaceDir) 239 + e.l.Debug("envs for step", "step", step.Name, "envs", envs.Slice()) 240 + 241 + hostConfig := hostConfig(wid) 242 + resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 243 + Image: w.Data.(addlFields).image, 244 + Cmd: []string{"bash", "-c", step.command}, 245 + WorkingDir: workspaceDir, 246 + Tty: false, 247 + Hostname: "spindle", 248 + Env: envs.Slice(), 249 + }, hostConfig, nil, nil, "") 250 + defer e.DestroyStep(ctx, resp.ID) 251 + if err != nil { 252 + return fmt.Errorf("creating container: %w", err) 253 + } 254 + 255 + err = e.docker.NetworkConnect(ctx, networkName(wid), resp.ID, nil) 256 + if err != nil { 257 + return fmt.Errorf("connecting network: %w", err) 258 + } 259 + 260 + err = e.docker.ContainerStart(ctx, resp.ID, container.StartOptions{}) 261 + if err != nil { 262 + return err 263 + } 264 + e.l.Info("started container", "name", resp.ID, "step", step.Name) 265 + 266 + // start tailing logs in background 267 + tailDone := make(chan error, 1) 268 + go func() { 269 + tailDone <- e.tailStep(ctx, wfLogger, resp.ID, wid, idx, step) 270 + }() 271 + 272 + // wait for container completion or timeout 273 + waitDone := make(chan struct{}) 274 + var state *container.State 275 + var waitErr error 276 + 277 + go func() { 278 + defer close(waitDone) 279 + state, waitErr = e.WaitStep(ctx, resp.ID) 280 + }() 281 + 282 + select { 283 + case <-waitDone: 284 + 285 + // wait for tailing to complete 286 + <-tailDone 287 + 288 + case <-ctx.Done(): 289 + e.l.Warn("step timed out; killing container", "container", resp.ID, "step", step.Name) 290 + err = e.DestroyStep(context.Background(), resp.ID) 291 + if err != nil { 292 + e.l.Error("failed to destroy step", "container", resp.ID, "error", err) 293 + } 294 + 295 + // wait for both goroutines to finish 296 + <-waitDone 297 + <-tailDone 298 + 299 + return engine.ErrTimedOut 300 + } 301 + 302 + select { 303 + case <-ctx.Done(): 304 + return ctx.Err() 305 + default: 306 + } 307 + 308 + if waitErr != nil { 309 + return waitErr 310 + } 311 + 312 + err = e.DestroyStep(ctx, resp.ID) 313 + if err != nil { 314 + return err 315 + } 316 + 317 + if state.ExitCode != 0 { 318 + e.l.Error("workflow failed!", "workflow_id", wid.String(), "error", state.Error, "exit_code", state.ExitCode, "oom_killed", state.OOMKilled) 319 + if state.OOMKilled { 320 + return ErrOOMKilled 321 + } 322 + return engine.ErrWorkflowFailed 323 + } 324 + 325 + return nil 326 + } 327 + 328 + func (e *Engine) WaitStep(ctx context.Context, containerID string) (*container.State, error) { 329 + wait, errCh := e.docker.ContainerWait(ctx, containerID, container.WaitConditionNotRunning) 330 + select { 331 + case err := <-errCh: 332 + if err != nil { 333 + return nil, err 334 + } 335 + case <-wait: 336 + } 337 + 338 + e.l.Info("waited for container", "name", containerID) 339 + 340 + info, err := e.docker.ContainerInspect(ctx, containerID) 341 + if err != nil { 342 + return nil, err 343 + } 344 + 345 + return info.State, nil 346 + } 347 + 348 + func (e *Engine) tailStep(ctx context.Context, wfLogger *models.WorkflowLogger, containerID string, wid models.WorkflowId, stepIdx int, step models.Step) error { 349 + if wfLogger == nil { 350 + return nil 351 + } 352 + 353 + logs, err := e.docker.ContainerLogs(ctx, containerID, container.LogsOptions{ 354 + Follow: true, 355 + ShowStdout: true, 356 + ShowStderr: true, 357 + Details: false, 358 + Timestamps: false, 359 + }) 360 + if err != nil { 361 + return err 362 + } 363 + 364 + _, err = stdcopy.StdCopy( 365 + wfLogger.DataWriter("stdout"), 366 + wfLogger.DataWriter("stderr"), 367 + logs, 368 + ) 369 + if err != nil && err != io.EOF && !errors.Is(err, context.DeadlineExceeded) { 370 + return fmt.Errorf("failed to copy logs: %w", err) 371 + } 372 + 373 + return nil 374 + } 375 + 376 + func (e *Engine) DestroyStep(ctx context.Context, containerID string) error { 377 + err := e.docker.ContainerKill(ctx, containerID, "9") // SIGKILL 378 + if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 379 + return err 380 + } 381 + 382 + if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 383 + RemoveVolumes: true, 384 + RemoveLinks: false, 385 + Force: false, 386 + }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 387 + return err 388 + } 389 + 390 + return nil 391 + } 392 + 393 + func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error { 394 + e.cleanupMu.Lock() 395 + key := wid.String() 396 + 397 + fns := e.cleanup[key] 398 + delete(e.cleanup, key) 399 + e.cleanupMu.Unlock() 400 + 401 + for _, fn := range fns { 402 + if err := fn(ctx); err != nil { 403 + e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err) 404 + } 405 + } 406 + return nil 407 + } 408 + 409 + func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) { 410 + e.cleanupMu.Lock() 411 + defer e.cleanupMu.Unlock() 412 + 413 + key := wid.String() 414 + e.cleanup[key] = append(e.cleanup[key], fn) 415 + } 416 + 417 + func workspaceVolume(wid models.WorkflowId) string { 418 + return fmt.Sprintf("workspace-%s", wid) 419 + } 420 + 421 + func nixVolume(wid models.WorkflowId) string { 422 + return fmt.Sprintf("nix-%s", wid) 423 + } 424 + 425 + func networkName(wid models.WorkflowId) string { 426 + return fmt.Sprintf("workflow-network-%s", wid) 427 + } 428 + 429 + func hostConfig(wid models.WorkflowId) *container.HostConfig { 430 + hostConfig := &container.HostConfig{ 431 + Mounts: []mount.Mount{ 432 + { 433 + Type: mount.TypeVolume, 434 + Source: workspaceVolume(wid), 435 + Target: workspaceDir, 436 + }, 437 + { 438 + Type: mount.TypeVolume, 439 + Source: nixVolume(wid), 440 + Target: "/nix", 441 + }, 442 + { 443 + Type: mount.TypeTmpfs, 444 + Target: "/tmp", 445 + ReadOnly: false, 446 + TmpfsOptions: &mount.TmpfsOptions{ 447 + Mode: 0o1777, // world-writeable sticky bit 448 + Options: [][]string{ 449 + {"exec"}, 450 + }, 451 + }, 452 + }, 453 + { 454 + Type: mount.TypeVolume, 455 + Source: "etc-nix-" + wid.String(), 456 + Target: "/etc/nix", 457 + }, 458 + }, 459 + ReadonlyRootfs: false, 460 + CapDrop: []string{"ALL"}, 461 + CapAdd: []string{"CAP_DAC_OVERRIDE"}, 462 + SecurityOpt: []string{"no-new-privileges"}, 463 + ExtraHosts: []string{"host.docker.internal:host-gateway"}, 464 + } 465 + 466 + return hostConfig 467 + } 468 + 469 + // thanks woodpecker 470 + func isErrContainerNotFoundOrNotRunning(err error) bool { 471 + // Error response from daemon: Cannot kill container: ...: No such container: ... 472 + // Error response from daemon: Cannot kill container: ...: Container ... is not running" 473 + // Error response from podman daemon: can only kill running containers. ... is in state exited 474 + // Error: No such container: ... 475 + return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 476 + }
+7
spindle/engines/nixery/errors.go
··· 1 + package nixery 2 + 3 + import "errors" 4 + 5 + var ( 6 + ErrOOMKilled = errors.New("oom killed") 7 + )
+17
spindle/models/engine.go
··· 1 + package models 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + "tangled.sh/tangled.sh/core/api/tangled" 8 + "tangled.sh/tangled.sh/core/spindle/secrets" 9 + ) 10 + 11 + type Engine interface { 12 + InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*Workflow, error) 13 + SetupWorkflow(ctx context.Context, wid WorkflowId, wf *Workflow) error 14 + WorkflowTimeout() time.Duration 15 + DestroyWorkflow(ctx context.Context, wid WorkflowId) error 16 + RunStep(ctx context.Context, wid WorkflowId, w *Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger *WorkflowLogger) error 17 + }
+3 -3
spindle/models/models.go
··· 104 104 func NewControlLogLine(idx int, step Step) LogLine { 105 105 return LogLine{ 106 106 Kind: LogKindControl, 107 - Content: step.Name, 107 + Content: step.Name(), 108 108 StepId: idx, 109 - StepKind: step.Kind, 110 - StepCommand: step.Command, 109 + StepKind: step.Kind(), 110 + StepCommand: step.Command(), 111 111 } 112 112 }
+8 -103
spindle/models/pipeline.go
··· 1 1 package models 2 2 3 - import ( 4 - "path" 5 - 6 - "tangled.sh/tangled.sh/core/api/tangled" 7 - "tangled.sh/tangled.sh/core/spindle/config" 8 - ) 9 - 10 3 type Pipeline struct { 11 4 RepoOwner string 12 5 RepoName string 13 - Workflows []Workflow 6 + Workflows map[Engine][]Workflow 14 7 } 15 8 16 - type Step struct { 17 - Command string 18 - Name string 19 - Environment map[string]string 20 - Kind StepKind 9 + type Step interface { 10 + Name() string 11 + Command() string 12 + Kind() StepKind 21 13 } 22 14 23 15 type StepKind int ··· 22 30 ) 23 31 24 32 type Workflow struct { 25 - Steps []Step 26 - Environment map[string]string 27 - Name string 28 - Image string 29 - } 30 - 31 - // setupSteps get added to start of Steps 32 - type setupSteps []Step 33 - 34 - // addStep adds a step to the beginning of the workflow's steps. 35 - func (ss *setupSteps) addStep(step Step) { 36 - *ss = append(*ss, step) 37 - } 38 - 39 - // ToPipeline converts a tangled.Pipeline into a model.Pipeline. 40 - // In the process, dependencies are resolved: nixpkgs deps 41 - // are constructed atop nixery and set as the Workflow.Image, 42 - // and ones from custom registries 43 - func ToPipeline(pl tangled.Pipeline, cfg config.Config) *Pipeline { 44 - workflows := []Workflow{} 45 - 46 - for _, twf := range pl.Workflows { 47 - swf := &Workflow{} 48 - for _, tstep := range twf.Steps { 49 - sstep := Step{} 50 - sstep.Environment = stepEnvToMap(tstep.Environment) 51 - sstep.Command = tstep.Command 52 - sstep.Name = tstep.Name 53 - sstep.Kind = StepKindUser 54 - swf.Steps = append(swf.Steps, sstep) 55 - } 56 - swf.Name = twf.Name 57 - swf.Environment = workflowEnvToMap(twf.Environment) 58 - swf.Image = workflowImage(twf.Dependencies, cfg.Pipelines.Nixery) 59 - 60 - setup := &setupSteps{} 61 - 62 - setup.addStep(nixConfStep()) 63 - setup.addStep(cloneStep(*twf, *pl.TriggerMetadata, cfg.Server.Dev)) 64 - // this step could be empty 65 - if s := dependencyStep(*twf); s != nil { 66 - setup.addStep(*s) 67 - } 68 - 69 - // append setup steps in order to the start of workflow steps 70 - swf.Steps = append(*setup, swf.Steps...) 71 - 72 - workflows = append(workflows, *swf) 73 - } 74 - repoOwner := pl.TriggerMetadata.Repo.Did 75 - repoName := pl.TriggerMetadata.Repo.Repo 76 - return &Pipeline{ 77 - RepoOwner: repoOwner, 78 - RepoName: repoName, 79 - Workflows: workflows, 80 - } 81 - } 82 - 83 - func workflowEnvToMap(envs []*tangled.Pipeline_Pair) map[string]string { 84 - envMap := map[string]string{} 85 - for _, env := range envs { 86 - if env != nil { 87 - envMap[env.Key] = env.Value 88 - } 89 - } 90 - return envMap 91 - } 92 - 93 - func stepEnvToMap(envs []*tangled.Pipeline_Pair) map[string]string { 94 - envMap := map[string]string{} 95 - for _, env := range envs { 96 - if env != nil { 97 - envMap[env.Key] = env.Value 98 - } 99 - } 100 - return envMap 101 - } 102 - 103 - func workflowImage(deps []*tangled.Pipeline_Dependency, nixery string) string { 104 - var dependencies string 105 - for _, d := range deps { 106 - if d.Registry == "nixpkgs" { 107 - dependencies = path.Join(d.Packages...) 108 - } 109 - } 110 - 111 - // load defaults from somewhere else 112 - dependencies = path.Join(dependencies, "bash", "git", "coreutils", "nix") 113 - 114 - return path.Join(nixery, dependencies) 33 + Steps []Step 34 + Name string 35 + Data any 115 36 }
+10 -13
spindle/models/setup_steps.go spindle/engines/nixery/setup_steps.go
··· 1 - package models 1 + package nixery 2 2 3 3 import ( 4 4 "fmt" ··· 13 13 setupCmd := `echo 'extra-experimental-features = nix-command flakes' >> /etc/nix/nix.conf 14 14 echo 'build-users-group = ' >> /etc/nix/nix.conf` 15 15 return Step{ 16 - Command: setupCmd, 17 - Name: "Configure Nix", 16 + command: setupCmd, 17 + name: "Configure Nix", 18 18 } 19 19 } 20 20 ··· 81 81 commands = append(commands, "git checkout FETCH_HEAD") 82 82 83 83 cloneStep := Step{ 84 - Command: strings.Join(commands, "\n"), 85 - Name: "Clone repository into workspace", 84 + command: strings.Join(commands, "\n"), 85 + name: "Clone repository into workspace", 86 86 } 87 87 return cloneStep 88 88 } ··· 91 91 // For dependencies using a custom registry (i.e. not nixpkgs), it collects 92 92 // all packages and adds a single 'nix profile install' step to the 93 93 // beginning of the workflow's step list. 94 - func dependencyStep(twf tangled.Pipeline_Workflow) *Step { 94 + func dependencyStep(deps map[string][]string) *Step { 95 95 var customPackages []string 96 96 97 - for _, d := range twf.Dependencies { 98 - registry := d.Registry 99 - packages := d.Packages 100 - 97 + for registry, packages := range deps { 101 98 if registry == "nixpkgs" { 102 99 continue 103 100 } ··· 112 115 installCmd := "nix --extra-experimental-features nix-command --extra-experimental-features flakes profile install" 113 116 cmd := fmt.Sprintf("%s %s", installCmd, strings.Join(customPackages, " ")) 114 117 installStep := Step{ 115 - Command: cmd, 116 - Name: "Install custom dependencies", 117 - Environment: map[string]string{ 118 + command: cmd, 119 + name: "Install custom dependencies", 120 + environment: map[string]string{ 118 121 "NIX_NO_COLOR": "1", 119 122 "NIX_SHOW_DOWNLOAD_PROGRESS": "0", 120 123 },
+38 -8
spindle/server.go
··· 20 20 "tangled.sh/tangled.sh/core/spindle/config" 21 21 "tangled.sh/tangled.sh/core/spindle/db" 22 22 "tangled.sh/tangled.sh/core/spindle/engine" 23 + "tangled.sh/tangled.sh/core/spindle/engines/nixery" 23 24 "tangled.sh/tangled.sh/core/spindle/models" 24 25 "tangled.sh/tangled.sh/core/spindle/queue" 25 26 "tangled.sh/tangled.sh/core/spindle/secrets" ··· 40 39 e *rbac.Enforcer 41 40 l *slog.Logger 42 41 n *notifier.Notifier 43 - eng *engine.Engine 42 + engs map[string]models.Engine 44 43 jq *queue.Queue 45 44 cfg *config.Config 46 45 ks *eventconsumer.Consumer ··· 94 93 return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 95 94 } 96 95 97 - eng, err := engine.New(ctx, cfg, d, &n, vault) 96 + nixeryEng, err := nixery.New(ctx, cfg) 98 97 if err != nil { 99 98 return err 100 99 } ··· 129 128 db: d, 130 129 l: logger, 131 130 n: &n, 132 - eng: eng, 131 + engs: map[string]models.Engine{"nixery": nixeryEng}, 133 132 jq: jq, 134 133 cfg: cfg, 135 134 res: resolver, ··· 217 216 Logger: logger, 218 217 Db: s.db, 219 218 Enforcer: s.e, 220 - Engine: s.eng, 219 + Engines: s.engs, 221 220 Config: s.cfg, 222 221 Resolver: s.res, 223 222 Vault: s.vault, ··· 262 261 Rkey: msg.Rkey, 263 262 } 264 263 264 + workflows := make(map[models.Engine][]models.Workflow) 265 + 265 266 for _, w := range tpl.Workflows { 266 267 if w != nil { 267 - err := s.db.StatusPending(models.WorkflowId{ 268 + if _, ok := s.engs[w.Engine]; !ok { 269 + err = s.db.StatusFailed(models.WorkflowId{ 270 + PipelineId: pipelineId, 271 + Name: w.Name, 272 + }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 273 + if err != nil { 274 + return err 275 + } 276 + 277 + continue 278 + } 279 + 280 + eng := s.engs[w.Engine] 281 + 282 + if _, ok := workflows[eng]; !ok { 283 + workflows[eng] = []models.Workflow{} 284 + } 285 + 286 + ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 287 + if err != nil { 288 + return err 289 + } 290 + 291 + workflows[eng] = append(workflows[eng], *ewf) 292 + 293 + err = s.db.StatusPending(models.WorkflowId{ 268 294 PipelineId: pipelineId, 269 295 Name: w.Name, 270 296 }, s.n) ··· 301 273 } 302 274 } 303 275 304 - spl := models.ToPipeline(tpl, *s.cfg) 305 - 306 276 ok := s.jq.Enqueue(queue.Job{ 307 277 Run: func() error { 308 - s.eng.StartWorkflows(ctx, spl, pipelineId) 278 + engine.StartWorkflows(s.l, s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 279 + RepoOwner: tpl.TriggerMetadata.Repo.Did, 280 + RepoName: tpl.TriggerMetadata.Repo.Repo, 281 + Workflows: workflows, 282 + }, pipelineId) 309 283 return nil 310 284 }, 311 285 OnFail: func(jobError error) {
+1 -2
spindle/stream.go
··· 9 9 "strconv" 10 10 "time" 11 11 12 - "tangled.sh/tangled.sh/core/spindle/engine" 13 12 "tangled.sh/tangled.sh/core/spindle/models" 14 13 15 14 "github.com/go-chi/chi/v5" ··· 142 143 } 143 144 isFinished := models.StatusKind(status.Status).IsFinish() 144 145 145 - filePath := engine.LogFilePath(s.cfg.Pipelines.LogDir, wid) 146 + filePath := models.LogFilePath(s.cfg.Server.LogDir, wid) 146 147 147 148 config := tail.Config{ 148 149 Follow: !isFinished,
+2 -2
spindle/xrpc/xrpc.go
··· 17 17 "tangled.sh/tangled.sh/core/rbac" 18 18 "tangled.sh/tangled.sh/core/spindle/config" 19 19 "tangled.sh/tangled.sh/core/spindle/db" 20 - "tangled.sh/tangled.sh/core/spindle/engine" 20 + "tangled.sh/tangled.sh/core/spindle/models" 21 21 "tangled.sh/tangled.sh/core/spindle/secrets" 22 22 ) 23 23 ··· 27 27 Logger *slog.Logger 28 28 Db *db.DB 29 29 Enforcer *rbac.Enforcer 30 - Engine *engine.Engine 30 + Engines map[string]models.Engine 31 31 Config *config.Config 32 32 Resolver *idresolver.Resolver 33 33 Vault secrets.Manager
+17 -36
workflow/compile.go
··· 1 1 package workflow 2 2 3 3 import ( 4 + "errors" 4 5 "fmt" 5 6 6 7 "tangled.sh/tangled.sh/core/api/tangled" ··· 64 63 return fmt.Sprintf("warning: %s: %s: %s", w.Path, w.Type, w.Reason) 65 64 } 66 65 66 + var ( 67 + MissingEngine error = errors.New("missing engine") 68 + ) 69 + 67 70 type WarningKind string 68 71 69 72 var ( ··· 100 95 for _, wf := range p { 101 96 cw := compiler.compileWorkflow(wf) 102 97 103 - // empty workflows are not added to the pipeline 104 - if len(cw.Steps) == 0 { 98 + if cw == nil { 105 99 continue 106 100 } 107 101 108 - cp.Workflows = append(cp.Workflows, &cw) 102 + cp.Workflows = append(cp.Workflows, cw) 109 103 } 110 104 111 105 return cp 112 106 } 113 107 114 - func (compiler *Compiler) compileWorkflow(w Workflow) tangled.Pipeline_Workflow { 115 - cw := tangled.Pipeline_Workflow{} 108 + func (compiler *Compiler) compileWorkflow(w Workflow) *tangled.Pipeline_Workflow { 109 + cw := &tangled.Pipeline_Workflow{} 116 110 117 111 if !w.Match(compiler.Trigger) { 118 112 compiler.Diagnostics.AddWarning( ··· 119 115 WorkflowSkipped, 120 116 fmt.Sprintf("did not match trigger %s", compiler.Trigger.Kind), 121 117 ) 122 - return cw 123 - } 124 - 125 - if len(w.Steps) == 0 { 126 - compiler.Diagnostics.AddWarning( 127 - w.Name, 128 - WorkflowSkipped, 129 - "empty workflow", 130 - ) 131 - return cw 118 + return nil 132 119 } 133 120 134 121 // validate clone options 135 122 compiler.analyzeCloneOptions(w) 136 123 137 124 cw.Name = w.Name 138 - cw.Dependencies = w.Dependencies.AsRecord() 139 - for _, s := range w.Steps { 140 - step := tangled.Pipeline_Step{ 141 - Command: s.Command, 142 - Name: s.Name, 143 - } 144 - for k, v := range s.Environment { 145 - e := &tangled.Pipeline_Pair{ 146 - Key: k, 147 - Value: v, 148 - } 149 - step.Environment = append(step.Environment, e) 150 - } 151 - cw.Steps = append(cw.Steps, &step) 125 + 126 + if w.Engine == "" { 127 + compiler.Diagnostics.AddError(w.Name, MissingEngine) 128 + return nil 152 129 } 153 - for k, v := range w.Environment { 154 - e := &tangled.Pipeline_Pair{ 155 - Key: k, 156 - Value: v, 157 - } 158 - cw.Environment = append(cw.Environment, e) 159 - } 130 + 131 + cw.Engine = w.Engine 132 + cw.Raw = w.Raw 160 133 161 134 o := w.CloneOpts.AsRecord() 162 135 cw.Clone = &o
+23 -29
workflow/compile_test.go
··· 26 26 27 27 func TestCompileWorkflow_MatchingWorkflowWithSteps(t *testing.T) { 28 28 wf := Workflow{ 29 - Name: ".tangled/workflows/test.yml", 30 - When: when, 31 - Steps: []Step{ 32 - {Name: "Test", Command: "go test ./..."}, 33 - }, 29 + Name: ".tangled/workflows/test.yml", 30 + Engine: "nixery", 31 + When: when, 34 32 CloneOpts: CloneOpts{}, // default true 35 33 } 36 34 ··· 41 43 assert.False(t, c.Diagnostics.IsErr()) 42 44 } 43 45 44 - func TestCompileWorkflow_EmptySteps(t *testing.T) { 45 - wf := Workflow{ 46 - Name: ".tangled/workflows/empty.yml", 47 - When: when, 48 - Steps: []Step{}, // no steps 49 - } 50 - 51 - c := Compiler{Trigger: trigger} 52 - cp := c.Compile([]Workflow{wf}) 53 - 54 - assert.Len(t, cp.Workflows, 0) 55 - assert.Len(t, c.Diagnostics.Warnings, 1) 56 - assert.Equal(t, WorkflowSkipped, c.Diagnostics.Warnings[0].Type) 57 - } 58 - 59 46 func TestCompileWorkflow_TriggerMismatch(t *testing.T) { 60 47 wf := Workflow{ 61 - Name: ".tangled/workflows/mismatch.yml", 48 + Name: ".tangled/workflows/mismatch.yml", 49 + Engine: "nixery", 62 50 When: []Constraint{ 63 51 { 64 52 Event: []string{"push"}, 65 53 Branch: []string{"master"}, // different branch 66 54 }, 67 - }, 68 - Steps: []Step{ 69 - {Name: "Lint", Command: "golint ./..."}, 70 55 }, 71 56 } 72 57 ··· 63 82 64 83 func TestCompileWorkflow_CloneFalseWithShallowTrue(t *testing.T) { 65 84 wf := Workflow{ 66 - Name: ".tangled/workflows/clone_skip.yml", 67 - When: when, 68 - Steps: []Step{ 69 - {Name: "Skip", Command: "echo skip"}, 70 - }, 85 + Name: ".tangled/workflows/clone_skip.yml", 86 + Engine: "nixery", 87 + When: when, 71 88 CloneOpts: CloneOpts{ 72 89 Skip: true, 73 90 Depth: 1, ··· 79 100 assert.True(t, cp.Workflows[0].Clone.Skip) 80 101 assert.Len(t, c.Diagnostics.Warnings, 1) 81 102 assert.Equal(t, InvalidConfiguration, c.Diagnostics.Warnings[0].Type) 103 + } 104 + 105 + func TestCompileWorkflow_MissingEngine(t *testing.T) { 106 + wf := Workflow{ 107 + Name: ".tangled/workflows/missing_engine.yml", 108 + When: when, 109 + Engine: "", 110 + } 111 + 112 + c := Compiler{Trigger: trigger} 113 + cp := c.Compile([]Workflow{wf}) 114 + 115 + assert.Len(t, cp.Workflows, 0) 116 + assert.Len(t, c.Diagnostics.Errors, 1) 117 + assert.Equal(t, MissingEngine, c.Diagnostics.Errors[0].Error) 82 118 }
+6 -33
workflow/def.go
··· 24 24 25 25 // this is simply a structural representation of the workflow file 26 26 Workflow struct { 27 - Name string `yaml:"-"` // name of the workflow file 28 - When []Constraint `yaml:"when"` 29 - Dependencies Dependencies `yaml:"dependencies"` 30 - Steps []Step `yaml:"steps"` 31 - Environment map[string]string `yaml:"environment"` 32 - CloneOpts CloneOpts `yaml:"clone"` 27 + Name string `yaml:"-"` // name of the workflow file 28 + Engine string `yaml:"engine"` 29 + When []Constraint `yaml:"when"` 30 + CloneOpts CloneOpts `yaml:"clone"` 31 + Raw string `yaml:"-"` 33 32 } 34 33 35 34 Constraint struct { ··· 36 37 Branch StringList `yaml:"branch"` // this is optional, and only applied on "push" events 37 38 } 38 39 39 - Dependencies map[string][]string 40 - 41 40 CloneOpts struct { 42 41 Skip bool `yaml:"skip"` 43 42 Depth int `yaml:"depth"` 44 43 IncludeSubmodules bool `yaml:"submodules"` 45 - } 46 - 47 - Step struct { 48 - Name string `yaml:"name"` 49 - Command string `yaml:"command"` 50 - Environment map[string]string `yaml:"environment"` 51 44 } 52 45 53 46 StringList []string ··· 68 77 } 69 78 70 79 wf.Name = name 80 + wf.Raw = string(contents) 71 81 72 82 return wf, nil 73 83 } ··· 165 173 } 166 174 167 175 return errors.New("failed to unmarshal StringOrSlice") 168 - } 169 - 170 - // conversion utilities to atproto records 171 - func (d Dependencies) AsRecord() []*tangled.Pipeline_Dependency { 172 - var deps []*tangled.Pipeline_Dependency 173 - for registry, packages := range d { 174 - deps = append(deps, &tangled.Pipeline_Dependency{ 175 - Registry: registry, 176 - Packages: packages, 177 - }) 178 - } 179 - return deps 180 - } 181 - 182 - func (s Step) AsRecord() tangled.Pipeline_Step { 183 - return tangled.Pipeline_Step{ 184 - Command: s.Command, 185 - Name: s.Name, 186 - } 187 176 } 188 177 189 178 func (c CloneOpts) AsRecord() tangled.Pipeline_CloneOpts {
+1 -86
workflow/def_test.go
··· 10 10 yamlData := ` 11 11 when: 12 12 - event: ["push", "pull_request"] 13 - branch: ["main", "develop"] 14 - 15 - dependencies: 16 - nixpkgs: 17 - - go 18 - - git 19 - - curl 20 - 21 - steps: 22 - - name: "Test" 23 - command: | 24 - go test ./...` 13 + branch: ["main", "develop"]` 25 14 26 15 wf, err := FromFile("test.yml", []byte(yamlData)) 27 16 assert.NoError(t, err, "YAML should unmarshal without error") ··· 19 30 assert.ElementsMatch(t, []string{"main", "develop"}, wf.When[0].Branch) 20 31 assert.ElementsMatch(t, []string{"push", "pull_request"}, wf.When[0].Event) 21 32 22 - assert.Len(t, wf.Steps, 1) 23 - assert.Equal(t, "Test", wf.Steps[0].Name) 24 - assert.Equal(t, "go test ./...", wf.Steps[0].Command) 25 - 26 - pkgs, ok := wf.Dependencies["nixpkgs"] 27 - assert.True(t, ok, "`nixpkgs` should be present in dependencies") 28 - assert.ElementsMatch(t, []string{"go", "git", "curl"}, pkgs) 29 - 30 33 assert.False(t, wf.CloneOpts.Skip, "Skip should default to false") 31 - } 32 - 33 - func TestUnmarshalCustomRegistry(t *testing.T) { 34 - yamlData := ` 35 - when: 36 - - event: push 37 - branch: main 38 - 39 - dependencies: 40 - git+https://tangled.sh/@oppi.li/tbsp: 41 - - tbsp 42 - git+https://git.peppe.rs/languages/statix: 43 - - statix 44 - 45 - steps: 46 - - name: "Check" 47 - command: | 48 - statix check` 49 - 50 - wf, err := FromFile("test.yml", []byte(yamlData)) 51 - assert.NoError(t, err, "YAML should unmarshal without error") 52 - 53 - assert.ElementsMatch(t, []string{"push"}, wf.When[0].Event) 54 - assert.ElementsMatch(t, []string{"main"}, wf.When[0].Branch) 55 - 56 - assert.ElementsMatch(t, []string{"tbsp"}, wf.Dependencies["git+https://tangled.sh/@oppi.li/tbsp"]) 57 - assert.ElementsMatch(t, []string{"statix"}, wf.Dependencies["git+https://git.peppe.rs/languages/statix"]) 58 34 } 59 35 60 36 func TestUnmarshalCloneFalse(t *testing.T) { ··· 29 75 30 76 clone: 31 77 skip: true 32 - 33 - dependencies: 34 - nixpkgs: 35 - - python3 36 - 37 - steps: 38 - - name: Notify 39 - command: | 40 - python3 ./notify.py 41 78 ` 42 79 43 80 wf, err := FromFile("test.yml", []byte(yamlData)) ··· 37 92 assert.ElementsMatch(t, []string{"pull_request_close"}, wf.When[0].Event) 38 93 39 94 assert.True(t, wf.CloneOpts.Skip, "Skip should be false") 40 - } 41 - 42 - func TestUnmarshalEnv(t *testing.T) { 43 - yamlData := ` 44 - when: 45 - - event: ["pull_request_close"] 46 - 47 - clone: 48 - skip: false 49 - 50 - environment: 51 - HOME: /home/foo bar/baz 52 - CGO_ENABLED: 1 53 - 54 - steps: 55 - - name: Something 56 - command: echo "hello" 57 - environment: 58 - FOO: bar 59 - BAZ: qux 60 - ` 61 - 62 - wf, err := FromFile("test.yml", []byte(yamlData)) 63 - assert.NoError(t, err) 64 - 65 - assert.Len(t, wf.Environment, 2) 66 - assert.Equal(t, "/home/foo bar/baz", wf.Environment["HOME"]) 67 - assert.Equal(t, "1", wf.Environment["CGO_ENABLED"]) 68 - assert.Equal(t, "bar", wf.Steps[0].Environment["FOO"]) 69 - assert.Equal(t, "qux", wf.Steps[0].Environment["BAZ"]) 70 95 }