Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

spindle/engine: setup and destroy workflows

During setup, we register cleanup functions which get executed at the
end of the workflow goroutine (deferred exec of DestroyWorkflow).

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.sh>

authored by

Anirudh Oppiliappan and committed by
Tangled
5a3b7020 1a84fc09

+129 -47
+125 -45
spindle/engine/engine.go
··· 8 8 "log/slog" 9 9 "os" 10 10 "path" 11 + "strings" 11 12 "sync" 13 + "syscall" 12 14 13 15 "github.com/docker/docker/api/types/container" 14 16 "github.com/docker/docker/api/types/image" ··· 30 28 workspaceDir = "/tangled/workspace" 31 29 ) 32 30 31 + type cleanupFunc func(context.Context) error 32 + 33 33 type Engine struct { 34 34 docker client.APIClient 35 35 l *slog.Logger ··· 41 37 chanMu sync.RWMutex 42 38 stdoutChans map[string]chan string 43 39 stderrChans map[string]chan string 40 + 41 + cleanupMu sync.Mutex 42 + cleanup map[string][]cleanupFunc 44 43 } 45 44 46 45 func New(ctx context.Context, db *db.DB, n *notifier.Notifier) (*Engine, error) { ··· 64 57 e.stdoutChans = make(map[string]chan string, 100) 65 58 e.stderrChans = make(map[string]chan string, 100) 66 59 60 + e.cleanup = make(map[string][]cleanupFunc) 61 + 67 62 return e, nil 68 - } 69 - 70 - // SetupPipeline sets up a new network for the pipeline, and possibly volumes etc. 71 - // in the future. In here also goes other setup steps. 72 - func (e *Engine) SetupPipeline(ctx context.Context, pipeline *tangled.Pipeline, atUri, id string) error { 73 - e.l.Info("setting up pipeline", "pipeline", id) 74 - 75 - _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 76 - Name: workspaceVolume(id), 77 - Driver: "local", 78 - }) 79 - if err != nil { 80 - return err 81 - } 82 - 83 - _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 84 - Name: nixVolume(id), 85 - Driver: "local", 86 - }) 87 - if err != nil { 88 - return err 89 - } 90 - 91 - _, err = e.docker.NetworkCreate(ctx, pipelineName(id), network.CreateOptions{ 92 - Driver: "bridge", 93 - }) 94 - if err != nil { 95 - return err 96 - } 97 - 98 - err = e.db.CreatePipeline(id, atUri, e.n) 99 - return err 100 63 } 101 64 102 65 func (e *Engine) StartWorkflows(ctx context.Context, pipeline *tangled.Pipeline, id string) error { ··· 80 103 g := errgroup.Group{} 81 104 for _, w := range pipeline.Workflows { 82 105 g.Go(func() error { 106 + err := e.SetupWorkflow(ctx, id, w.Name) 107 + if err != nil { 108 + return err 109 + } 110 + 111 + defer e.DestroyWorkflow(ctx, id, w.Name) 112 + 83 113 // TODO: actual checks for image/registry etc. 84 114 var deps string 85 115 for _, d := range w.Dependencies { ··· 111 127 defer reader.Close() 112 128 io.Copy(os.Stdout, reader) 113 129 114 - err = e.StartSteps(ctx, w.Steps, id, cimg) 130 + err = e.StartSteps(ctx, w.Steps, w.Name, id, cimg) 115 131 if err != nil { 116 132 e.l.Error("pipeline failed!", "id", id, "error", err.Error()) 117 133 return e.db.MarkPipelineFailed(id, -1, err.Error(), e.n) ··· 131 147 return e.db.MarkPipelineSuccess(id, e.n) 132 148 } 133 149 150 + // SetupWorkflow sets up a new network for the workflow and volumes for 151 + // the workspace and Nix store. These are persisted across steps and are 152 + // destroyed at the end of the workflow. 153 + func (e *Engine) SetupWorkflow(ctx context.Context, id, workflowName string) error { 154 + e.l.Info("setting up workflow", "pipeline", id, "workflow", workflowName) 155 + 156 + _, err := e.docker.VolumeCreate(ctx, volume.CreateOptions{ 157 + Name: workspaceVolume(id, workflowName), 158 + Driver: "local", 159 + }) 160 + if err != nil { 161 + return err 162 + } 163 + e.registerCleanup(id, workflowName, func(ctx context.Context) error { 164 + return e.docker.VolumeRemove(ctx, workspaceVolume(id, workflowName), true) 165 + }) 166 + 167 + _, err = e.docker.VolumeCreate(ctx, volume.CreateOptions{ 168 + Name: nixVolume(id, workflowName), 169 + Driver: "local", 170 + }) 171 + if err != nil { 172 + return err 173 + } 174 + e.registerCleanup(id, workflowName, func(ctx context.Context) error { 175 + return e.docker.VolumeRemove(ctx, nixVolume(id, workflowName), true) 176 + }) 177 + 178 + _, err = e.docker.NetworkCreate(ctx, networkName(id, workflowName), network.CreateOptions{ 179 + Driver: "bridge", 180 + }) 181 + if err != nil { 182 + return err 183 + } 184 + e.registerCleanup(id, workflowName, func(ctx context.Context) error { 185 + return e.docker.NetworkRemove(ctx, networkName(id, workflowName)) 186 + }) 187 + 188 + return nil 189 + } 190 + 134 191 // StartSteps starts all steps sequentially with the same base image. 135 192 // ONLY marks pipeline as failed if container's exit code is non-zero. 136 193 // All other errors are bubbled up. 137 - func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, id, image string) error { 194 + func (e *Engine) StartSteps(ctx context.Context, steps []*tangled.Pipeline_Step, workflowName, id, image string) error { 138 195 // set up logging channels 139 196 e.chanMu.Lock() 140 197 if _, exists := e.stdoutChans[id]; !exists { ··· 193 168 }() 194 169 195 170 for _, step := range steps { 196 - hostConfig := hostConfig(id) 171 + hostConfig := hostConfig(id, workflowName) 197 172 resp, err := e.docker.ContainerCreate(ctx, &container.Config{ 198 173 Image: image, 199 174 Cmd: []string{"bash", "-c", step.Command}, ··· 206 181 return fmt.Errorf("creating container: %w", err) 207 182 } 208 183 209 - err = e.docker.NetworkConnect(ctx, pipelineName(id), resp.ID, nil) 184 + err = e.docker.NetworkConnect(ctx, networkName(id, workflowName), resp.ID, nil) 210 185 if err != nil { 211 186 return fmt.Errorf("connecting network: %w", err) 212 187 } ··· 233 208 wg.Wait() 234 209 235 210 state, err := e.WaitStep(ctx, resp.ID) 211 + if err != nil { 212 + return err 213 + } 214 + 215 + err = e.DestroyStep(ctx, resp.ID, id) 236 216 if err != nil { 237 217 return err 238 218 } ··· 340 310 return nil 341 311 } 342 312 313 + func (e *Engine) DestroyStep(ctx context.Context, containerID, pipelineID string) error { 314 + err := e.docker.ContainerKill(ctx, containerID, syscall.SIGKILL.String()) 315 + if err != nil && !isErrContainerNotFoundOrNotRunning(err) { 316 + return err 317 + } 318 + 319 + if err := e.docker.ContainerRemove(ctx, containerID, container.RemoveOptions{ 320 + RemoveVolumes: true, 321 + RemoveLinks: false, 322 + Force: false, 323 + }); err != nil && !isErrContainerNotFoundOrNotRunning(err) { 324 + return err 325 + } 326 + 327 + return nil 328 + } 329 + 330 + func (e *Engine) DestroyWorkflow(ctx context.Context, pipelineID, workflowName string) error { 331 + e.cleanupMu.Lock() 332 + key := fmt.Sprintf("%s-%s", pipelineID, workflowName) 333 + 334 + fns := e.cleanup[key] 335 + delete(e.cleanup, key) 336 + e.cleanupMu.Unlock() 337 + 338 + for _, fn := range fns { 339 + if err := fn(ctx); err != nil { 340 + e.l.Error("failed to cleanup workflow resource", "pipeline", pipelineID, "workflow", workflowName, "err", err) 341 + } 342 + } 343 + return nil 344 + } 345 + 343 346 func (e *Engine) LogChannels(pipelineID string) (stdout <-chan string, stderr <-chan string, ok bool) { 344 347 e.chanMu.RLock() 345 348 defer e.chanMu.RUnlock() ··· 386 323 return stdoutCh, stderrCh, true 387 324 } 388 325 389 - func workspaceVolume(id string) string { 390 - return "workspace-" + id 326 + func (e *Engine) registerCleanup(pipelineID, workflowName string, fn cleanupFunc) { 327 + e.cleanupMu.Lock() 328 + defer e.cleanupMu.Unlock() 329 + 330 + key := fmt.Sprintf("%s-%s", pipelineID, workflowName) 331 + e.cleanup[key] = append(e.cleanup[key], fn) 391 332 } 392 333 393 - func nixVolume(id string) string { 394 - return "nix-" + id 334 + func workspaceVolume(id, name string) string { 335 + return fmt.Sprintf("workspace-%s-%s", id, name) 395 336 } 396 337 397 - func pipelineName(id string) string { 398 - return "pipeline-" + id 338 + func nixVolume(id, name string) string { 339 + return fmt.Sprintf("nix-%s-%s", id, name) 399 340 } 400 341 401 - func hostConfig(id string) *container.HostConfig { 342 + func networkName(id, name string) string { 343 + return fmt.Sprintf("workflow-network-%s-%s", id, name) 344 + } 345 + 346 + func hostConfig(id, name string) *container.HostConfig { 402 347 hostConfig := &container.HostConfig{ 403 348 Mounts: []mount.Mount{ 404 349 { 405 350 Type: mount.TypeVolume, 406 - Source: workspaceVolume(id), 351 + Source: workspaceVolume(id, name), 407 352 Target: workspaceDir, 408 353 }, 409 354 { 410 355 Type: mount.TypeVolume, 411 - Source: nixVolume(id), 356 + Source: nixVolume(id, name), 412 357 Target: "/nix", 413 358 }, 414 359 }, ··· 426 355 } 427 356 428 357 return hostConfig 358 + } 359 + 360 + // thanks woodpecker 361 + func isErrContainerNotFoundOrNotRunning(err error) bool { 362 + // Error response from daemon: Cannot kill container: ...: No such container: ... 363 + // Error response from daemon: Cannot kill container: ...: Container ... is not running" 364 + // Error response from podman daemon: can only kill running containers. ... is in state exited 365 + // Error: No such container: ... 366 + return err != nil && (strings.Contains(err.Error(), "No such container") || strings.Contains(err.Error(), "is not running") || strings.Contains(err.Error(), "can only kill running containers")) 429 367 }
+4 -2
spindle/server.go
··· 122 122 pipelineAtUri := fmt.Sprintf("at://%s/did:web:%s/%s", tangled.PipelineNSID, pipeline.TriggerMetadata.Repo.Knot, msg.Rkey) 123 123 124 124 rkey := TID() 125 - err = s.eng.SetupPipeline(ctx, &pipeline, pipelineAtUri, rkey) 125 + 126 + err = s.db.CreatePipeline(rkey, pipelineAtUri, s.n) 126 127 if err != nil { 127 128 return err 128 129 } 130 + 129 131 return s.eng.StartWorkflows(ctx, &pipeline, rkey) 130 132 }, 131 133 OnFail: func(error) { 132 - s.l.Error("pipeline setup failed", "error", err) 134 + s.l.Error("pipeline run failed", "error", err) 133 135 }, 134 136 }) 135 137 if ok {