···11package qemu
2233import (
44- "bufio"
54 "context"
66- "encoding/base64"
75 "encoding/json"
86 "fmt"
97 "log/slog"
1010- "net"
118 "os"
1212- "os/exec"
139 "path/filepath"
1410 "sync"
1511 "time"
16121717- "github.com/digitalocean/go-qemu/qmp"
1813 "gopkg.in/yaml.v3"
19142015 "tangled.org/core/api/tangled"
···2217 "tangled.org/core/spindle/config"
2318 "tangled.org/core/spindle/engine"
2419 "tangled.org/core/spindle/engines/qemu/bakers"
2020+ "tangled.org/core/spindle/engines/qemu/virt"
2521 "tangled.org/core/spindle/models"
2622 "tangled.org/core/spindle/secrets"
2723)
···8177 return nil, err
8278 }
83798484- swf.Data = vmState{img: img}
8080+ swf.Data = &virt.VMState{Img: img}
8581 return swf, nil
8682}
87838884// discover and resolve kernel, initrd, and disk and other config from an image subfolder
8989-func (e *Engine) resolveImage(name string) (ResolvedImage, error) {
9090- var img ResolvedImage
8585+func (e *Engine) resolveImage(name string) (virt.ResolvedImage, error) {
8686+ var img virt.ResolvedImage
9187 if name == "" {
9288 name = e.cfg.QemuPipelines.DefaultImage
9389 }
···10298 configPath := bakers.ConfigPath(imageDir)
10399104100 if _, err := os.Stat(diskPath); err == nil {
105105- img.disk = diskPath
101101+ img.Disk = diskPath
106102 }
107103 if _, err := os.Stat(kernelPath); err == nil {
108108- img.kernel = kernelPath
104104+ img.Kernel = kernelPath
109105 }
110106 if _, err := os.Stat(initrdPath); err == nil {
111111- img.initrd = initrdPath
107107+ img.Initrd = initrdPath
112108 }
113109 if b, err := os.ReadFile(configPath); err == nil {
114110 var meta bakers.ImageMetadata
115111 if err := json.Unmarshal(b, &meta); err == nil {
116112 if meta.Cmdline != "" {
117117- img.cmdline = meta.Cmdline
113113+ img.Cmdline = meta.Cmdline
118114 }
119115 if meta.Shell != "" {
120120- img.shell = meta.Shell
116116+ img.Shell = meta.Shell
121117 }
122118 }
123119 }
124120 if b, err := os.ReadFile(filepath.Join(imageDir, bakers.UserDataName)); err == nil {
125125- img.userData = string(b)
121121+ img.UserData = string(b)
126122 }
127123128128- if img.disk == "" {
124124+ if img.Disk == "" {
129125 return img, fmt.Errorf("missing '%s' in %s", bakers.DiskName, imageDir)
130126 }
131131- if img.kernel != "" && (img.initrd == "" || img.cmdline == "") {
127127+ if img.Kernel != "" && (img.Initrd == "" || img.Cmdline == "") {
132128 return img, fmt.Errorf("kernel requires initrd and cmdline, but 'initrd' and/or 'cmdline' is missing for %s", name)
133129 }
134134- if img.shell == "" {
130130+ if img.Shell == "" {
135131 return img, fmt.Errorf("shell is not configured for %s", name)
136132 }
137133 return img, nil
···147143 wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusStart).Write([]byte{0})
148144 defer wfLogger.ControlWriter(setupStepIdx, setupStep, models.StepStatusEnd).Write([]byte{0})
149145150150- // some systems have tmpfs at /tmp which is not ideal for large workloads
151151- targetTempDir := e.cfg.QemuPipelines.OverlayDir
152152- if targetTempDir == "" {
153153- targetTempDir = os.TempDir()
154154- }
155155-156156- tempDir, err := os.MkdirTemp(targetTempDir, "qemu-wf-*")
157157- if err != nil {
158158- return err
159159- }
160160- e.registerCleanup(wid, func(ctx context.Context) error {
161161- return os.RemoveAll(tempDir)
162162- })
163163-164164- state := wf.Data.(vmState)
165165- img := state.img
166166-167167- if err := generateSeedISO(tempDir, img.userData); err != nil {
168168- return fmt.Errorf("generating seed iso: %w", err)
169169- }
170170-171171- qmpSock := filepath.Join(tempDir, "qmp.sock")
172172- qgaSock := filepath.Join(tempDir, "qga.sock")
173173-174174- // todo(dawn): ideally would be nice if we used qemu with the microvm enabled here...
175175- // but that is not compatible with cloud-init since it expects real hw enumeration...
176176- // and we would not be able to use standard cloud images, which is kind of annoying.
177177- // we also have to manage a virtiofsd process for the filesystem, instead of having to
178178- // manage a qcow overlay (see https://ubuntu.com/server/docs/explanation/virtualisation/qemu-microvm/).
179179- // also also need to be able to have some scripts for generating our own images
180180- // (though we should already do this anyway since some like alpine don't provide
181181- // "cloud ready" image files, at least with kernel and initrd)
182182- argv := []string{
183183- // todo(dawn): ideally probably have "tiers" and let the spindle concile the tier using
184184- // what the user wants and what the user has exposed to them by the spindle operator?
185185- "-m", e.cfg.QemuPipelines.Memory, "-smp", fmt.Sprintf("%d", e.cfg.QemuPipelines.SMP),
186186- "-display", "none", "-monitor", "none", "-nodefaults", "-no-user-config",
187187- // use snapshot=on to do copy-on-write without having us manage qcow overlays manually
188188- "-drive", fmt.Sprintf("file=%s,media=disk,snapshot=on,if=virtio", img.disk),
189189- "-drive", fmt.Sprintf("file=%s,media=cdrom", bakers.SeedISOPath(tempDir)),
190190- "-netdev", "user,id=net0",
191191- "-device", "virtio-net-pci,netdev=net0",
192192- "-qmp", fmt.Sprintf("unix:%s,server,nowait", qmpSock),
193193- "-chardev", fmt.Sprintf("socket,path=%s,server,nowait,id=qga0", qgaSock),
194194- "-device", "virtio-serial",
195195- "-device", "virtserialport,chardev=qga0,name=org.qemu.guest_agent.0",
196196- }
197197-198198- if e.cfg.Server.Dev {
199199- argv = append(argv, "-serial", "stdio")
200200- } else {
201201- argv = append(argv, "-serial", "none")
202202- }
203203-204204- // support booting using qemu bios still, but otherwise we make it faster!
205205- // incase someone wants to do this for whatever reason...
206206- if img.kernel != "" {
207207- argv = append(argv, "-kernel", img.kernel)
208208- if img.initrd != "" {
209209- argv = append(argv, "-initrd", img.initrd)
210210- }
211211- if img.cmdline != "" {
212212- argv = append(argv, "-append", img.cmdline)
213213- }
214214- } else {
215215- // booting with bios: explicitly select disk
216216- argv = append(argv, "-boot", "order=c")
217217- }
218218-219219- enableKVM := e.cfg.QemuPipelines.EnableKVM
220220- if _, err := os.Stat("/dev/kvm"); err != nil {
221221- if enableKVM {
222222- l.Warn("kvm was requested but /dev/kvm is not accessible; falling back to software emulation", "error", err)
223223- }
224224- enableKVM = false
225225- }
226226- if enableKVM {
227227- argv = append(argv, "-enable-kvm", "-cpu", "host")
228228- }
229229-230230- // todo(dawn): same with above, we assume x86_64 here, but should allow other archs,
231231- // probably just auto detect as a default
232232- qemuCmd := exec.Command("qemu-system-x86_64", argv...)
233233- qemuCmd.Env = append(os.Environ(), "TMPDIR="+tempDir)
234234- qemuCmd.Stdout = os.Stdout
235235- qemuCmd.Stderr = os.Stderr
236236-237237- startedBootAt := time.Now()
238238- if err := qemuCmd.Start(); err != nil {
239239- return fmt.Errorf("starting qemu: %w", err)
240240- }
241241-242242- var mon *qmp.SocketMonitor
243243-244244- // cleanup qemu if we fail to setup at any point below
245245- setupOk := false
246246- defer func() {
247247- if !setupOk {
248248- _ = qemuCmd.Process.Kill()
249249- if mon != nil {
250250- _ = mon.Disconnect()
251251- }
252252- }
253253- }()
254254-255255- qmpCtx, cancelQmp := context.WithTimeout(ctx, 10*time.Second)
256256- defer cancelQmp()
257257-258258- // wait for qmp to be ready
259259- for {
260260- mon, err = qmp.NewSocketMonitor("unix", qmpSock, 2*time.Second)
261261- if err == nil {
262262- if err = mon.Connect(); err == nil {
263263- break
264264- }
265265- }
266266- select {
267267- case <-qmpCtx.Done():
268268- return fmt.Errorf("qmp connect timeout: %w", err)
269269- case <-time.After(10 * time.Millisecond):
270270- }
271271- }
146146+ state := wf.Data.(*virt.VMState)
147147+ img := state.Img
272148273273- status, err := e.qmpQueryStatus(mon)
149149+ actualState, err := virt.StartVM(ctx, virt.VMConfig{
150150+ Memory: e.cfg.QemuPipelines.Memory,
151151+ SMP: e.cfg.QemuPipelines.SMP,
152152+ EnableKVM: e.cfg.QemuPipelines.EnableKVM,
153153+ Dev: e.cfg.Server.Dev,
154154+ OverlayDir: e.cfg.QemuPipelines.OverlayDir,
155155+ }, img, l)
274156 if err != nil {
275157 return err
276158 }
277159278278- l.Info("qemu guest status", "status", status)
279279- if status != "running" {
280280- return fmt.Errorf("qemu guest not running (status: %s)", status)
281281- }
282282-283283- qgaCtx, cancelQga := context.WithTimeout(ctx, time.Minute)
284284- defer cancelQga()
285285-286286- // wait for guest agent to be ready (aka boot)
287287- for {
288288- if _, err := os.Stat(qgaSock); err == nil {
289289- pingCtx, cancelPing := context.WithTimeout(qgaCtx, 5*time.Second)
290290- err = e.qgaGuestPing(pingCtx, qgaSock)
291291- cancelPing()
292292- if err == nil {
293293- l.Info("vm booted and guest-agent ready", "elapsed", time.Since(startedBootAt).Round(time.Millisecond))
294294- break
295295- }
296296- l.Debug("qga guest-ping failed", "error", err)
297297- } else {
298298- l.Debug("qga socket not found yet", "path", qgaSock)
299299- }
300300-301301- select {
302302- case <-qgaCtx.Done():
303303- return fmt.Errorf("qga connect timeout: %w", err)
304304- case <-time.After(100 * time.Millisecond):
305305- }
306306- }
307307-308160 e.registerCleanup(wid, func(ctx context.Context) error {
309161 // graceful powerdown so guest can sync filesystem
310310- if err := e.qmpSystemPowerdown(mon); err != nil {
162162+ if err := actualState.QMPSystemPowerdown(); err != nil {
311163 l.Error("failed to powerdown qemu guest", "workflow", wid, "error", err)
312164 }
313165314166 done := make(chan error, 1)
315315- go func() { done <- qemuCmd.Wait() }()
167167+ go func() {
168168+ _, err := actualState.Process.Wait()
169169+ done <- err
170170+ }()
316171317172 select {
318173 case <-done:
319174 case <-time.After(5 * time.Second):
320320- _ = qemuCmd.Process.Kill()
175175+ _ = actualState.Process.Kill()
321176 <-done // drain to avoid zombie
322177 }
323178324324- _ = mon.Disconnect()
179179+ _ = actualState.QMPMon.Disconnect()
180180+ _ = os.RemoveAll(actualState.TempDir)
325181 return nil
326182 })
327183328328- wf.Data = vmState{
329329- process: qemuCmd.Process,
330330- qmpMon: mon,
331331- qgaPath: qgaSock,
332332- tempDir: tempDir,
333333- img: img,
334334- }
335335-336336- setupOk = true
184184+ wf.Data = actualState
337185 return nil
338186}
339187340340-func (e *Engine) qmpRun(mon *qmp.SocketMonitor, command qmp.Command) ([]byte, error) {
341341- b, err := json.Marshal(command)
342342- if err != nil {
343343- return nil, err
344344- }
345345- return mon.Run(b)
346346-}
347347-348348-// sends a command to the qemu guest agent and returns the response
349349-func (e *Engine) qgaRun(ctx context.Context, sock string, command qmp.Command) ([]byte, error) {
350350- b, err := json.Marshal(command)
351351- if err != nil {
352352- return nil, err
353353- }
354354-355355- conn, err := (&net.Dialer{}).DialContext(ctx, "unix", sock)
356356- if err != nil {
357357- return nil, err
358358- }
359359- defer conn.Close()
360360-361361- if dl, ok := ctx.Deadline(); ok {
362362- _ = conn.SetDeadline(dl)
363363- }
364364-365365- if _, err := conn.Write(append(b, '\n')); err != nil {
366366- return nil, err
367367- }
368368-369369- return bufio.NewReader(conn).ReadBytes('\n')
370370-}
371371-372372-// query the qemu guest status
373373-func (e *Engine) qmpQueryStatus(mon *qmp.SocketMonitor) (string, error) {
374374- raw, err := e.qmpRun(mon, qmp.Command{Execute: "query-status"})
375375- if err != nil {
376376- return "", fmt.Errorf("qmp query-status failed: %w", err)
377377- }
378378-379379- var resp struct {
380380- Return struct {
381381- Status string `json:"status"`
382382- } `json:"return"`
383383- }
384384- if err := json.Unmarshal(raw, &resp); err != nil {
385385- return "", fmt.Errorf("qmp query-status parse: %w", err)
386386- }
387387- return resp.Return.Status, nil
388388-}
389389-390390-// send a command to qemu to powerdown the guest gracefully
391391-func (e *Engine) qmpSystemPowerdown(mon *qmp.SocketMonitor) error {
392392- _, err := e.qmpRun(mon, qmp.Command{Execute: "system_powerdown"})
393393- return err
394394-}
395395-396396-// ping the guest agent to see if it's ready
397397-func (e *Engine) qgaGuestPing(ctx context.Context, sock string) error {
398398- _, err := e.qgaRun(ctx, sock, qmp.Command{Execute: "guest-ping"})
399399- return err
400400-}
401401-402402-// execute a command on the guest and return its pid
403403-func (e *Engine) qgaGuestExec(ctx context.Context, sock string, shell string, command string, env []string) (int, error) {
404404- cmdArgs := []string{shell, "-c", command}
405405- raw, err := e.qgaRun(ctx, sock, qmp.Command{
406406- Execute: "guest-exec",
407407- Args: map[string]any{
408408- "path": shell,
409409- "arg": cmdArgs[1:],
410410- "env": env,
411411- "capture-output": true,
412412- },
413413- })
414414- if err != nil {
415415- return 0, fmt.Errorf("qga guest-exec: %w", err)
416416- }
417417-418418- var resp struct {
419419- Return struct {
420420- Pid int `json:"pid"`
421421- } `json:"return"`
422422- }
423423- if err := json.Unmarshal(raw, &resp); err != nil {
424424- return 0, fmt.Errorf("qga guest-exec parse: %w", err)
425425- }
426426- return resp.Return.Pid, nil
427427-}
428428-429429-type guestExecStatus struct {
430430- Exited bool
431431- ExitCode int
432432- // these are since the last time we asked for status
433433- OutData []byte
434434- ErrData []byte
435435-}
436436-437437-// get the status of a command executed with guest-exec
438438-func (e *Engine) qgaGuestExecStatus(ctx context.Context, sock string, pid int) (guestExecStatus, error) {
439439- var status guestExecStatus
440440- raw, err := e.qgaRun(ctx, sock, qmp.Command{
441441- Execute: "guest-exec-status",
442442- Args: map[string]any{"pid": pid},
443443- })
444444- if err != nil {
445445- return status, fmt.Errorf("qga guest-exec-status: %w", err)
446446- }
447447-448448- var resp struct {
449449- Return struct {
450450- Exited bool `json:"exited"`
451451- ExitCode int `json:"exitcode"`
452452- OutData string `json:"out-data"`
453453- ErrData string `json:"err-data"`
454454- } `json:"return"`
455455- }
456456- if err := json.Unmarshal(raw, &resp); err != nil {
457457- return status, fmt.Errorf("qga guest-exec-status parse: %w", err)
458458- }
459459-460460- status.Exited = resp.Return.Exited
461461- status.ExitCode = resp.Return.ExitCode
462462- if resp.Return.OutData != "" {
463463- status.OutData, _ = base64.StdEncoding.DecodeString(resp.Return.OutData)
464464- }
465465- if resp.Return.ErrData != "" {
466466- status.ErrData, _ = base64.StdEncoding.DecodeString(resp.Return.ErrData)
467467- }
468468-469469- return status, nil
470470-}
471471-472188func (e *Engine) RunStep(ctx context.Context, wid models.WorkflowId, w *models.Workflow, idx int, secrets []secrets.UnlockedSecret, wfLogger models.WorkflowLogger) error {
473473- state := w.Data.(vmState)
189189+ state := w.Data.(*virt.VMState)
474190 step := w.Steps[idx]
475191476192 env := make([]string, 0, len(w.Environment)+len(secrets))
···486202 }
487203 }
488204489489- pid, err := e.qgaGuestExec(ctx, state.qgaPath, state.img.shell, step.Command(), env)
205205+ pid, err := state.QGAGuestExec(ctx, step.Command(), env)
490206 if err != nil {
491207 return err
492208 }
493209494210 for {
495495- status, err := e.qgaGuestExecStatus(ctx, state.qgaPath, pid)
211211+ status, err := state.QGAGuestExecStatus(ctx, pid)
496212 if err != nil {
497213 return err
498214 }
-8
spindle/engines/qemu/errors.go
···11-package qemu
22-33-import "errors"
44-55-var (
66- ErrOOMKilled = errors.New("container died due to OOM kill")
77- ErrBootTimeout = errors.New("timed out waiting for VM to boot")
88-)