···11+package qemu
22+33+import (
44+ "context"
55+ "crypto/ed25519"
66+ "crypto/rand"
77+ "encoding/json"
88+ "fmt"
99+ "log/slog"
1010+ "net"
1111+ "os"
1212+ "os/exec"
1313+ "path/filepath"
1414+ "sync"
1515+ "time"
1616+1717+ "github.com/digitalocean/go-qemu/qmp"
1818+ "golang.org/x/crypto/ssh"
1919+ "gopkg.in/yaml.v3"
2020+2121+ "tangled.org/core/api/tangled"
2222+ "tangled.org/core/log"
2323+ "tangled.org/core/spindle/config"
2424+ "tangled.org/core/spindle/engine"
2525+ "tangled.org/core/spindle/engines/qemu/bakers"
2626+ "tangled.org/core/spindle/models"
2727+ "tangled.org/core/spindle/secrets"
2828+)
2929+3030+type ResolvedImage struct {
3131+ kernel string
3232+ initrd string
3333+ disk string
3434+ cmdline string
3535+ shell string
3636+}
// cleanupFunc releases one workflow resource; executed (LIFO) by DestroyWorkflow.
type cleanupFunc func(context.Context) error
// Engine runs workflow steps inside ephemeral QEMU virtual machines.
type Engine struct {
	l   *slog.Logger
	cfg *config.Config
	// cleanupMu guards cleanup, which maps a workflow id (string form) to
	// the ordered list of teardown functions registered for that workflow.
	cleanupMu sync.Mutex
	cleanup   map[string][]cleanupFunc
}
4646+4747+func New(ctx context.Context, cfg *config.Config) (*Engine, error) {
4848+ return &Engine{
4949+ l: log.FromContext(ctx).With("component", "engine.qemu"),
5050+ cfg: cfg,
5151+ cleanup: make(map[string][]cleanupFunc),
5252+ }, nil
5353+}
// Step is a single workflow step executed over SSH inside the guest VM.
// It implements models.Step via the accessor methods below.
type Step struct {
	name        string
	kind        models.StepKind
	command     string
	environment map[string]string // per-step env vars, applied on top of workflow env in RunStep
}
// Name returns the step's display name.
func (s Step) Name() string { return s.name }

// Command returns the shell command the step executes in the guest.
func (s Step) Command() string { return s.command }

// Kind reports whether the step is user-defined or system-generated.
func (s Step) Kind() models.StepKind { return s.kind }
// setupSteps accumulates system steps (e.g. the clone step) that are
// prepended to the user-defined steps of a workflow.
type setupSteps []models.Step

// addStep appends a step to the setup list.
func (ss *setupSteps) addStep(step models.Step) { *ss = append(*ss, step) }
6969+7070+func (e *Engine) InitWorkflow(twf tangled.Pipeline_Workflow, tpl tangled.Pipeline) (*models.Workflow, error) {
7171+ swf := &models.Workflow{}
7272+ var dwf manifestWorkflow
7373+7474+ if err := yaml.Unmarshal([]byte(twf.Raw), &dwf); err != nil {
7575+ return nil, err
7676+ }
7777+7878+ for _, dstep := range dwf.Steps {
7979+ swf.Steps = append(swf.Steps, Step{
8080+ name: dstep.Name,
8181+ kind: models.StepKindUser,
8282+ command: dstep.Command,
8383+ environment: dstep.Environment,
8484+ })
8585+ }
8686+ swf.Name = twf.Name
8787+ swf.Environment = dwf.Environment
8888+8989+ setup := &setupSteps{}
9090+ setup.addStep(models.BuildCloneStep(twf, *tpl.TriggerMetadata, e.cfg.Server.Dev))
9191+9292+ swf.Steps = append(*setup, swf.Steps...)
9393+9494+ img, err := e.resolveImage(dwf.Image)
9595+ if err != nil {
9696+ return nil, err
9797+ }
9898+9999+ swf.Data = vmState{
100100+ kernel: img.kernel,
101101+ initrd: img.initrd,
102102+ disk: img.disk,
103103+ cmdline: img.cmdline,
104104+ shell: img.shell,
105105+ }
106106+107107+ return swf, nil
108108+}
109109+110110+// discover and resolve kernel, initrd, and disk from an image subfolder
111111+func (e *Engine) resolveImage(name string) (ResolvedImage, error) {
112112+ var img ResolvedImage
113113+ if name == "" {
114114+ name = e.cfg.QemuPipelines.DefaultImage
115115+ }
116116+ if name == "" {
117117+ return img, fmt.Errorf("no image specified in workflow and SPINDLE_QEMU_PIPELINES_DEFAULT_IMAGE is not set")
118118+ }
119119+120120+ imageDir := filepath.Join(e.cfg.QemuPipelines.ImageDir, name)
121121+ kernelPath := filepath.Join(imageDir, "kernel")
122122+ initrdPath := filepath.Join(imageDir, "initrd")
123123+ diskPath := filepath.Join(imageDir, "disk.qcow2")
124124+ configPath := filepath.Join(imageDir, "config.json")
125125+126126+ if _, err := os.Stat(diskPath); err == nil {
127127+ img.disk = diskPath
128128+ }
129129+ if _, err := os.Stat(kernelPath); err == nil {
130130+ img.kernel = kernelPath
131131+ }
132132+ if _, err := os.Stat(initrdPath); err == nil {
133133+ img.initrd = initrdPath
134134+ }
135135+ if b, err := os.ReadFile(configPath); err == nil {
136136+ var meta bakers.ImageMetadata
137137+ if err := json.Unmarshal(b, &meta); err == nil {
138138+ if meta.Cmdline != "" {
139139+ img.cmdline = meta.Cmdline
140140+ }
141141+ if meta.Shell != "" {
142142+ img.shell = meta.Shell
143143+ }
144144+ }
145145+ }
146146+147147+ if img.disk == "" {
148148+ return img, fmt.Errorf("missing 'disk.qcow2' in %s", imageDir)
149149+ }
150150+ if img.kernel != "" && (img.initrd == "" || img.cmdline == "") {
151151+ return img, fmt.Errorf("kernel requires initrd and cmdline, but 'initrd' and/or 'cmdline' is missing for %s", name)
152152+ }
153153+ if img.shell == "" {
154154+ return img, fmt.Errorf("shell is not configured for %s", name)
155155+ }
156156+ return img, nil
157157+}
// SetupWorkflow boots a fresh QEMU VM for the workflow: it generates a
// one-off ed25519 SSH keypair, builds a cloud-init (nocloud) seed ISO
// authorizing that key, launches qemu with a copy-on-write snapshot of the
// image disk, waits for the guest to report "running" over QMP and for sshd
// to accept connections, then stores the live VM handles back into wf.Data
// as vmState for RunStep. Acquired resources (tempdir, VM process) are
// registered with registerCleanup and released LIFO by DestroyWorkflow.
func (e *Engine) SetupWorkflow(ctx context.Context, wid models.WorkflowId, wf *models.Workflow, wfLogger models.WorkflowLogger) error {
	l := e.l.With("workflow", wid)
	l.Info("setting up qemu workflow")

	// synthetic system step used only to bracket setup output in the logs
	setupStep := Step{name: "qemu vm setup", kind: models.StepKindSystem}
	setupStepIdx := -1

	wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusStart).Write([]byte{0})
	defer wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusEnd).Write([]byte{0})

	// generate ed25519 keypair for the engine to ssh into the guest
	pubKey, privKey, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return fmt.Errorf("generating keypair: %w", err)
	}

	sshSigner, err := ssh.NewSignerFromKey(privKey)
	if err != nil {
		return err
	}
	sshPubKey, err := ssh.NewPublicKey(pubKey)
	if err != nil {
		return err
	}
	enginePubKeyStr := string(ssh.MarshalAuthorizedKey(sshPubKey))

	// the tempdir is configurable since some systems may have tmpfs as /tmp,
	// which is not ideal if a workflow uses a lot of space.
	targetTempDir := e.cfg.QemuPipelines.OverlayDir
	if targetTempDir == "" {
		targetTempDir = os.TempDir()
	}

	tempDir, err := os.MkdirTemp(targetTempDir, "qemu-wf-*")
	if err != nil {
		return err
	}
	// registered first so it runs last (LIFO): the VM teardown below must
	// complete before its tempdir is removed
	e.registerCleanup(wid, func(ctx context.Context) error {
		return os.RemoveAll(tempDir)
	})

	// boot parameters resolved earlier by InitWorkflow
	state := wf.Data.(vmState)

	// generate nocloud seed iso containing engine's authorized_keys
	if err := generateSeedISO(tempDir, enginePubKeyStr); err != nil {
		return fmt.Errorf("generating seed iso: %w", err)
	}

	// note: there is an inherent TOCTOU race here between releasing the port
	// and qemu binding it. nothing we can do about this in userspace without
	// privilege, but in practice it's rarely an issue.
	nl, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return fmt.Errorf("finding free port: %w", err)
	}
	sshPort := nl.Addr().(*net.TCPAddr).Port
	nl.Close()

	qmpSock := filepath.Join(tempDir, "qmp.sock")
	// todo(dawn): ideally would be nice if we used qemu with the microvm enabled here...
	// but that is not compatible with cloud-init since it expects real hw enumeration...
	// and we would not be able to use standard cloud images, which is kind of annoying.
	// we also have to manage a virtiofsd process for the filesystem, instead of having to
	// manage a qcow overlay (see https://ubuntu.com/server/docs/explanation/virtualisation/qemu-microvm/).
	// also also need to be able to have some scripts for generating our own images
	// (though we should already do this anyway since some like alpine don't provide
	// "cloud ready" image files, at least with kernel and initrd)
	argv := []string{
		// todo(dawn): ideally probably have "tiers" and let the spindle concile the tier using
		// what the user wants and what the user has exposed to them by the spindle operator?
		"-m", e.cfg.QemuPipelines.Memory, "-smp", fmt.Sprintf("%d", e.cfg.QemuPipelines.SMP),
		"-display", "none", "-nodefaults", "-no-user-config",
		// use snapshot=on to do copy-on-write without having us manage qcow overlays manually
		"-drive", fmt.Sprintf("file=%s,media=disk,snapshot=on,if=virtio", state.disk),
		"-drive", fmt.Sprintf("file=%s,media=cdrom", filepath.Join(tempDir, "cloud-init.iso")),
		// forward a random loopback port on the host to the guest's sshd
		"-netdev", fmt.Sprintf("user,id=net0,hostfwd=tcp:127.0.0.1:%d-:22", sshPort),
		"-device", "virtio-net-pci,netdev=net0",
		"-qmp", fmt.Sprintf("unix:%s,server,nowait", qmpSock),
		"-monitor", "none",
	}

	// in dev mode, mirror the guest serial console onto our stdio for debugging
	if e.cfg.Server.Dev {
		argv = append(argv, "-serial", "stdio")
	} else {
		argv = append(argv, "-serial", "none")
	}

	// support booting using qemu bios still, but otherwise we make it faster!
	// incase someone wants to do this for whatever reason...
	if state.kernel != "" {
		argv = append(argv, "-kernel", state.kernel)
		if state.initrd != "" {
			argv = append(argv, "-initrd", state.initrd)
		}
		if state.cmdline != "" {
			argv = append(argv, "-append", state.cmdline)
		}
	} else {
		// if we are booting with bios, we need to tell it to boot from the disk
		argv = append(argv, "-boot", "order=c")
	}

	// KVM is only usable when /dev/kvm is accessible; otherwise fall back to
	// software emulation (functional but much slower)
	enableKVM := e.cfg.QemuPipelines.EnableKVM
	if _, err := os.Stat("/dev/kvm"); err != nil {
		if enableKVM {
			l.Warn("kvm was requested but /dev/kvm is not accessible; falling back to software emulation", "error", err)
		}
		enableKVM = false
	}
	if enableKVM {
		argv = append(argv, "-enable-kvm", "-cpu", "host")
	}

	// todo(dawn): same with above, we assume x86_64 here, but should allow other archs,
	// probably just auto detect as a default
	qemuCmd := exec.Command("qemu-system-x86_64", argv...)
	qemuCmd.Env = append(os.Environ(), "TMPDIR="+tempDir)
	qemuCmd.Stdout = os.Stdout
	qemuCmd.Stderr = os.Stderr

	startBoot := time.Now()
	if err := qemuCmd.Start(); err != nil {
		return fmt.Errorf("starting qemu: %w", err)
	}

	var mon *qmp.SocketMonitor
	qmpCtx, cancelQmp := context.WithTimeout(ctx, 10*time.Second)
	defer cancelQmp()

	// retry qmp connect until the emulator is ready with its unix socket
	for {
		mon, err = qmp.NewSocketMonitor("unix", qmpSock, 2*time.Second)
		if err == nil {
			if err = mon.Connect(); err == nil {
				break
			}
		}
		select {
		case <-qmpCtx.Done():
			_ = qemuCmd.Process.Kill()
			return fmt.Errorf("qmp connect timeout: %w", err)
		case <-time.After(10 * time.Millisecond):
		}
	}

	// check if the guest is running
	raw, err := mon.Run([]byte(`{"execute":"query-status"}`))
	if err != nil {
		_ = qemuCmd.Process.Kill()
		_ = mon.Disconnect()
		return fmt.Errorf("qmp query-status failed: %w", err)
	}
	var resp map[string]any
	if err := json.Unmarshal(raw, &resp); err != nil {
		_ = qemuCmd.Process.Kill()
		_ = mon.Disconnect()
		return fmt.Errorf("qmp query-status parse failed: %w", err)
	}
	status, _ := resp["return"].(map[string]any)["status"].(string)
	l.Info("qemu guest status", "status", status)
	if status != "running" {
		_ = qemuCmd.Process.Kill()
		_ = mon.Disconnect()
		return fmt.Errorf("qemu guest not running (status: %s)", status)
	}

	// host key checking is disabled: the guest is freshly created and only
	// reachable through the loopback port forward established above
	sshConfig := &ssh.ClientConfig{
		User:            "build",
		Auth:            []ssh.AuthMethod{ssh.PublicKeys(sshSigner)},
		HostKeyCallback: ssh.InsecureIgnoreHostKey(),
		Timeout:         5 * time.Second,
	}

	bootCtx, cancelBoot := context.WithTimeout(ctx, 90*time.Second)
	defer cancelBoot()

	// backoff until the guest finishes booting and starts sshd
	var sshClient *ssh.Client
	for {
		sshClient, err = ssh.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", sshPort), sshConfig)
		if err == nil {
			bootDuration := time.Since(startBoot).Round(time.Millisecond)
			l.Debug("vm booted and ssh ready", "elapsed", bootDuration)
			break
		}
		select {
		case <-bootCtx.Done():
			_ = qemuCmd.Process.Kill()
			_ = mon.Disconnect()
			return ErrBootTimeout
		case <-time.After(100 * time.Millisecond):
		}
	}

	// VM teardown: close ssh, request a graceful shutdown, and hard-kill if
	// the process has not exited within a second
	e.registerCleanup(wid, func(ctx context.Context) error {
		_ = sshClient.Close()

		// graceful powerdown so guest can sync filesystem
		_, _ = mon.Run([]byte(`{"execute": "system_powerdown"}`))

		done := make(chan error, 1)
		go func() {
			done <- qemuCmd.Wait()
		}()
		select {
		case <-done:
		case <-time.After(time.Second):
			_ = qemuCmd.Process.Kill()
			// drain Wait after kill to avoid zombie
			<-done
		}

		_ = mon.Disconnect()
		return nil
	})

	// replace the boot-time parameters with live handles for RunStep
	wf.Data = vmState{
		process:   qemuCmd.Process,
		qmpMon:    mon,
		sshClient: sshClient,
		sshPort:   sshPort,
		tempDir:   tempDir,
		kernel:    state.kernel,
		initrd:    state.initrd,
		disk:      state.disk,
		shell:     state.shell,
	}

	return nil
}
// RunStep executes step idx of the workflow inside the guest VM over a fresh
// SSH session, streaming its stdout/stderr to wfLogger. Environment variables
// are merged in the order: workflow env, unlocked secrets, then per-step env
// (each value shell-escaped and prefixed as K=V to the command line). Returns
// engine.ErrTimedOut if ctx expires mid-run and engine.ErrWorkflowFailed if
// the remote command exits non-zero.
func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
	state := w.Data.(vmState)
	step := w.Steps[idx]

	var envStr string
	appendEnv := func(k, v string) {
		// we pass the env vars to shell instead of through ssh
		// (we would need to make sshd config have AcceptEnv in seed iso but that kinda sucks
		// because that would reload sshd during boot, which is not ideal, so we can just do this.)
		// todo(dawn): make images be prepared with sshd configured
		envStr += fmt.Sprintf("%s=%s ", k, shellescape(v))
	}

	for k, v := range w.Environment {
		appendEnv(k, v)
	}
	for _, s := range secrets {
		appendEnv(s.Key, s.Value)
	}
	// per-step environment only exists on this engine's own Step type
	if s, ok := step.(Step); ok {
		for k, v := range s.environment {
			appendEnv(k, v)
		}
	}

	session, err := state.sshClient.NewSession()
	if err != nil {
		return fmt.Errorf("ssh new session: %w", err)
	}
	defer session.Close()

	stdoutPipe, err := session.StdoutPipe()
	if err != nil {
		return fmt.Errorf("ssh stdout pipe: %w", err)
	}
	stderrPipe, err := session.StderrPipe()
	if err != nil {
		return fmt.Errorf("ssh stderr pipe: %w", err)
	}

	// stream output concurrently; tailDone receives once streamLogs returns
	tailDone := make(chan error, 1)
	go func() {
		tailDone <- streamLogs(ctx, stdoutPipe, stderrPipe, idx, wfLogger)
	}()

	// execute with the image's specified shell
	cmd := fmt.Sprintf("%s%s -c %s", envStr, state.shell, shellescape(step.Command()))
	if err := session.Start(cmd); err != nil {
		return fmt.Errorf("session start: %w", err)
	}

	select {
	case err := <-tailDone:
		// streaming errors are logged but do not fail the step on their own
		if err != nil {
			e.l.Warn("log streaming error", "workflow", wid, "step", idx, "error", err)
		}
	case <-ctx.Done():
		// kill the remote process, then wait for the log streamer to drain
		_ = session.Signal(ssh.SIGKILL)
		<-tailDone
		return engine.ErrTimedOut
	}

	if err := session.Wait(); err != nil {
		return engine.ErrWorkflowFailed
	}
	return nil
}
457457+458458+func (e *Engine) DestroyWorkflow(ctx context.Context, wid models.WorkflowId) error {
459459+ fns := e.drainCleanups(wid)
460460+461461+ // cleanups must be executed LIFO to respect resource dependencies
462462+ // (e.g. process shutdown before tempdir removal)
463463+ for i := len(fns) - 1; i >= 0; i-- {
464464+ if err := fns[i](ctx); err != nil {
465465+ e.l.Error("failed to cleanup workflow resource", "workflowId", wid, "error", err)
466466+ }
467467+ }
468468+ return nil
469469+}
470470+471471+func (e *Engine) WorkflowTimeout() time.Duration {
472472+ d, err := time.ParseDuration(e.cfg.QemuPipelines.WorkflowTimeout)
473473+ if err != nil {
474474+ return 5 * time.Minute
475475+ }
476476+ return d
477477+}
478478+479479+func (e *Engine) registerCleanup(wid models.WorkflowId, fn cleanupFunc) {
480480+ e.cleanupMu.Lock()
481481+ defer e.cleanupMu.Unlock()
482482+ key := wid.String()
483483+ e.cleanup[key] = append(e.cleanup[key], fn)
484484+}
485485+486486+func (e *Engine) drainCleanups(wid models.WorkflowId) []cleanupFunc {
487487+ e.cleanupMu.Lock()
488488+ defer e.cleanupMu.Unlock()
489489+ key := wid.String()
490490+ fns := e.cleanup[key]
491491+ delete(e.cleanup, key)
492492+ return fns
493493+}
+8
spindle/engines/qemu/errors.go
···11+package qemu
22+33+import "errors"
// Sentinel errors returned by the qemu engine; compare with errors.Is.
var (
	// ErrOOMKilled indicates the guest VM was terminated by an OOM kill.
	// (Previous message said "container", a leftover from a container-based
	// engine; this engine runs QEMU VMs.)
	ErrOOMKilled = errors.New("VM died due to OOM kill")
	// ErrBootTimeout indicates the guest never became SSH-reachable in time.
	ErrBootTimeout = errors.New("timed out waiting for VM to boot")
)