Deployment and lifecycle management for Nix
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

go-cli: drop builder and channel client

-578
-379
cmd/cli/builder/main.go
··· 1 - package builder 2 - 3 - import ( 4 - "bufio" 5 - "context" 6 - "encoding/json" 7 - "errors" 8 - "fmt" 9 - "log/slog" 10 - "net/url" 11 - "os" 12 - "os/exec" 13 - "strings" 14 - "sync" 15 - 16 - "codeberg.org/adamcstephens/sower/cmd/cli/commands" 17 - "github.com/golang-queue/queue" 18 - ) 19 - 20 - type evalResult struct { 21 - Attr string `json:"attr"` 22 - AttrPath []string `json:"attrPath"` 23 - DrvPath string `json:"drvPath"` 24 - Error string `json:"error"` 25 - InputDrvs inputDrv `json:"inputDrvs"` 26 - Name string `json:"name"` 27 - Outputs map[string]string `json:"outputs"` 28 - System string `json:"system"` 29 - } 30 - 31 - type inputDrv map[string][]string 32 - 33 - // Uploader defines the interface for pushing built outputs to a remote cache 34 - type Uploader interface { 35 - Push(outputs []string) error 36 - } 37 - 38 - // AtticUploader pushes to an Attic cache 39 - type AtticUploader struct { 40 - Cache string 41 - Jobs string 42 - } 43 - 44 - // Push implements Uploader for AtticUploader 45 - func (a *AtticUploader) Push(outputs []string) error { 46 - outputStr := strings.Join(outputs, "\n") 47 - 48 - // Create temporary file to capture output (prevents noisy progress bars in CI) 49 - logFile, err := os.CreateTemp("", "attic-push-*.log") 50 - if err != nil { 51 - return fmt.Errorf("failed to create temp log file: %v", err) 52 - } 53 - defer os.Remove(logFile.Name()) 54 - defer logFile.Close() 55 - 56 - pushCmd := exec.Command("attic", "push", "--stdin", "--ignore-upstream-cache-filter", "--jobs", a.Jobs, a.Cache) 57 - 58 - // Redirect both stdout and stderr to log file (progress bars overwrite in file) 59 - pushCmd.Stdout = logFile 60 - pushCmd.Stderr = logFile 61 - 62 - stdin, err := pushCmd.StdinPipe() 63 - if err != nil { 64 - return fmt.Errorf("error creating stdin: %v", err) 65 - } 66 - 67 - err = pushCmd.Start() 68 - if err != nil { 69 - return fmt.Errorf("failed to start command: %v", err) 70 - } 71 - 72 - _, err = stdin.Write([]byte(outputStr)) 73 - if err != nil { 74 - return fmt.Errorf("failed to send stdin to push: %v", err) 75 - } 76 - stdin.Close() 77 - 78 - err = pushCmd.Wait() 79 - if err != nil { 80 - // Display log on error 81 - if _, seekErr := logFile.Seek(0, 0); seekErr != nil { 82 - slog.Warn("Failed to seek log file", "error", seekErr) 83 - } 84 - logContent, _ := os.ReadFile(logFile.Name()) 85 - slog.Error("Attic push failed", "output", string(logContent)) 86 - return fmt.Errorf("failed to wait for run command: %v", err) 87 - } 88 - 89 - // Display final output (without progress bar spam) 90 - if _, err := logFile.Seek(0, 0); err != nil { 91 - slog.Warn("Failed to seek log file", "error", err) 92 - return nil 93 - } 94 - logContent, err := os.ReadFile(logFile.Name()) 95 - if err != nil { 96 - slog.Warn("Failed to read attic output", "error", err) 97 - } else if len(logContent) > 0 { 98 - fmt.Fprint(os.Stdout, string(logContent)) 99 - } 100 - 101 - return nil 102 - } 103 - 104 - // NixCopyUploader pushes using nix copy 105 - type NixCopyUploader struct { 106 - Remote string 107 - } 108 - 109 - // Push implements Uploader for NixCopyUploader 110 - func (n *NixCopyUploader) Push(outputs []string) error { 111 - args := append([]string{"copy", "--to", n.Remote}, outputs...) 112 - cmd := exec.Command("nix", args...) 113 - 114 - err := commands.SimpleRun(cmd) 115 - if err != nil { 116 - return fmt.Errorf("failed to run nix copy: %v", err) 117 - } 118 - 119 - return nil 120 - } 121 - 122 - // MultiUploader pushes to multiple targets in parallel 123 - type MultiUploader struct { 124 - Uploaders []Uploader 125 - } 126 - 127 - // Push implements Uploader for MultiUploader 128 - func (m *MultiUploader) Push(outputs []string) error { 129 - var wg sync.WaitGroup 130 - errChan := make(chan error, len(m.Uploaders)) 131 - 132 - for _, u := range m.Uploaders { 133 - wg.Add(1) 134 - go func(uploader Uploader) { 135 - defer wg.Done() 136 - if err := uploader.Push(outputs); err != nil { 137 - slog.Error("Failed to push to target", "error", err) 138 - errChan <- err 139 - } 140 - }(u) 141 - } 142 - 143 - wg.Wait() 144 - close(errChan) 145 - 146 - // Collect all errors 147 - var errs []error 148 - for err := range errChan { 149 - errs = append(errs, err) 150 - } 151 - 152 - if len(errs) > 0 { 153 - return fmt.Errorf("failed to push to %d target(s): %w", len(errs), errors.Join(errs...)) 154 - } 155 - return nil 156 - } 157 - 158 - // NewUploader creates an Uploader from a target string 159 - func NewUploader(target string) (Uploader, error) { 160 - parts := strings.SplitN(target, ":", 2) 161 - if len(parts) != 2 { 162 - return nil, fmt.Errorf("invalid target format: %s (expected type:target)", target) 163 - } 164 - 165 - uploadType := parts[0] 166 - targetSpec := parts[1] 167 - 168 - switch uploadType { 169 - case "attic": 170 - // Parse cache name and optional query params 171 - cache := targetSpec 172 - jobs := "20" // default 173 - 174 - // Check if there are query params 175 - if idx := strings.Index(targetSpec, "?"); idx != -1 { 176 - cache = targetSpec[:idx] 177 - queryStr := targetSpec[idx+1:] 178 - 179 - params, err := url.ParseQuery(queryStr) 180 - if err != nil { 181 - return nil, fmt.Errorf("invalid query parameters in attic target: %v", err) 182 - } 183 - 184 - if j := params.Get("jobs"); j != "" { 185 - jobs = j 186 - } 187 - } 188 - 189 - return &AtticUploader{Cache: cache, Jobs: jobs}, nil 190 - 191 - case "nix-copy": 192 - return &NixCopyUploader{Remote: targetSpec}, nil 193 - 194 - default: 195 - return nil, fmt.Errorf("unknown upload target type %q, supported types: attic, nix-copy", uploadType) 196 - } 197 - } 198 - 199 - // NewMultiUploader creates an Uploader from multiple target strings 200 - func NewMultiUploader(targets []string) (Uploader, error) { 201 - if len(targets) == 0 { 202 - return nil, fmt.Errorf("at least one upload target is required") 203 - } 204 - 205 - // Single target - return simple uploader (no wrapper overhead) 206 - if len(targets) == 1 { 207 - return NewUploader(targets[0]) 208 - } 209 - 210 - // Multiple targets - create MultiUploader 211 - uploaders := make([]Uploader, 0, len(targets)) 212 - for _, target := range targets { 213 - u, err := NewUploader(target) 214 - if err != nil { 215 - return nil, err 216 - } 217 - uploaders = append(uploaders, u) 218 - } 219 - 220 - return &MultiUploader{Uploaders: uploaders}, nil 221 - } 222 - 223 - func Push(workers int, system string, uploadTargets []string) error { 224 - uploader, err := NewMultiUploader(uploadTargets) 225 - if err != nil { 226 - return err 227 - } 228 - 229 - q := queue.NewPool(int64(workers)) 230 - defer q.Release() 231 - 232 - // Create a closure that captures the uploader 233 - pushResultFunc := func(result evalResult) error { 234 - return pushResult(result, uploader) 235 - } 236 - 237 - err = evalJobs(workers, system, q, pushResultFunc) 238 - if err != nil { 239 - return err 240 - } 241 - 242 - if q.FailureTasks() > 0 { 243 - return fmt.Errorf("failed to build one or more output") 244 - } 245 - 246 - return nil 247 - } 248 - 249 - func Build(workers int, system string) error { 250 - q := queue.NewPool(int64(workers)) 251 - defer q.Release() 252 - 253 - err := evalJobs(workers, system, q, buildResult) 254 - if err != nil { 255 - return err 256 - } 257 - 258 - if q.FailureTasks() > 0 { 259 - return fmt.Errorf("failed to build one or more output") 260 - } 261 - 262 - return nil 263 - } 264 - 265 - func Eval(workers int, system string) error { 266 - q := queue.NewPool(1) 267 - defer q.Release() 268 - 269 - err := evalJobs(workers, system, q, printResult) 270 - if err != nil { 271 - return err 272 - } 273 - 274 - return nil 275 - } 276 - 277 - func evalJobs(workers int, system string, resultQueue *queue.Queue, resultFunc func(evalResult) error) error { 278 - if workers == 0 { 279 - return fmt.Errorf("no workers specified") 280 - } 281 - 282 - target := ".#sowerJobs" 283 - if system != "" { 284 - target = fmt.Sprintf(".#sowerJobs.%s", system) 285 - } 286 - 287 - cmd := exec.Command("nix-eval-jobs", "--flake", target, "--force-recurse", "--workers", fmt.Sprint(workers)) 288 - 289 - stdout, err := cmd.StdoutPipe() 290 - if err != nil { 291 - return fmt.Errorf("error creating stdout: %v", err) 292 - } 293 - 294 - stdoutDone := make(chan struct{}) 295 - stdoutScanner := bufio.NewScanner(stdout) 296 - 297 - go func() { 298 - for stdoutScanner.Scan() { 299 - var result evalResult 300 - 301 - line := stdoutScanner.Text() 302 - err := json.Unmarshal([]byte(line), &result) 303 - 304 - if err != nil { 305 - slog.Error("failed to parse eval result", "error", err) 306 - continue 307 - } 308 - 309 - if result.Error != "" { 310 - slog.Error("failed eval result", "result", result) 311 - } else { 312 - if err := resultQueue.QueueTask(func(ctx context.Context) error { 313 - err := resultFunc(result) 314 - if err != nil { 315 - return err 316 - } 317 - 318 - return nil 319 - }); err != nil { 320 - panic(err) 321 - } 322 - } 323 - } 324 - 325 - stdoutDone <- struct{}{} 326 - }() 327 - 328 - slog.Debug("Running command", "cmd", cmd.String()) 329 - err = cmd.Start() 330 - if err != nil { 331 - return fmt.Errorf("error starting command: %v", err) 332 - } 333 - 334 - <-stdoutDone 335 - err = cmd.Wait() 336 - if err != nil { 337 - return fmt.Errorf("failure during nix-eval-jobs: %v", err) 338 - } 339 - 340 - return nil 341 - } 342 - 343 - func buildResult(result evalResult) error { 344 - slog.Debug("Building result", "result", result) 345 - err := commands.SimpleRun(exec.Command("nix", "build", fmt.Sprintf("%v^*", result.DrvPath))) 346 - if err != nil { 347 - return fmt.Errorf("failed to build: %v", err) 348 - } 349 - 350 - return nil 351 - } 352 - 353 - func printResult(result evalResult) error { 354 - slog.Info("Eval result", "result", result) 355 - 356 - return nil 357 - } 358 - 359 - func pushResult(result evalResult, uploader Uploader) error { 360 - outputs, err := commands.Run(exec.Command("nix", "build", "--print-out-paths", fmt.Sprintf("%v^*", result.DrvPath))) 361 - if err != nil { 362 - return fmt.Errorf("failed to build: %v", err) 363 - } 364 - 365 - output_list := strings.Split(outputs, "\n") 366 - // Filter out empty strings 367 - var filteredOutputs []string 368 - for _, output := range output_list { 369 - if output != "" { 370 - filteredOutputs = append(filteredOutputs, output) 371 - } 372 - } 373 - 374 - slog.Debug("Build result", "outputs", filteredOutputs) 375 - 376 - _ = printResult(result) 377 - 378 - return uploader.Push(filteredOutputs) 379 - }
-102
cmd/cli/channel-client.go
··· 1 - package main 2 - 3 - import ( 4 - "fmt" 5 - "log/slog" 6 - "net/url" 7 - 8 - "github.com/nshafer/phx" 9 - ) 10 - 11 - type channelClient struct { 12 - config config 13 - socket *phx.Socket 14 - } 15 - 16 - func newClient(config config) *channelClient { 17 - return &channelClient{config: config} 18 - } 19 - 20 - func (c *channelClient) connect() error { 21 - slog.Debug("Starting daemon") 22 - 23 - endpoint, err := url.Parse(fmt.Sprintf("%s/client", c.config.Endpoint)) 24 - if err != nil { 25 - return fmt.Errorf("failed to parse endpoint: %v", err) 26 - } 27 - endpoint.RawQuery = fmt.Sprintf("token=%s", url.QueryEscape(c.config.AccessToken)) 28 - 29 - socket := phx.NewSocket(endpoint) 30 - socket.Logger = &logger{} 31 - 32 - // Wait for the socket to connect before continuing. If it's not able to, it will keep 33 - // retrying forever. 34 - cont := make(chan bool) 35 - socket.OnOpen(func() { 36 - cont <- true 37 - }) 38 - socket.OnError(func(err error) { 39 - slog.Error("failed to open socket connection", "error", err) 40 - }) 41 - 42 - // Tell the socket to connect (or start retrying until it can connect) 43 - err = socket.Connect() 44 - if err != nil { 45 - slog.Error("failed to connect to server", "error", err) 46 - } 47 - 48 - // Wait for the connection 49 - <-cont 50 - 51 - c.socket = socket 52 - 53 - err = c.joinLobby() 54 - if err != nil { 55 - slog.Error("failed to join lobby", "error", err) 56 - } 57 - 58 - return nil 59 - } 60 - 61 - func (c *channelClient) run() { 62 - select {} 63 - } 64 - 65 - func (c *channelClient) joinLobby() error { 66 - cont := make(chan bool) 67 - channel := c.socket.Channel("client:all", nil) 68 - 69 - join, err := channel.Join() 70 - if err != nil { 71 - return fmt.Errorf("failed to join client:all") 72 - } 73 - 74 - // ensure successfully joined 75 - join.Receive("ok", func(response any) { 76 - cont <- true 77 - }) 78 - 79 - <-cont 80 - 81 - return nil 82 - } 83 - 84 - // func (c *channelClient) submitSeed(seed seed.Seed) error { 85 - // cont := make(chan error) 86 - // channel := c.socket.Channel("client:all", nil) 87 - // 88 - // push, err := channel.Push("seed:submit", seed) 89 - // if err != nil { 90 - // return fmt.Errorf("failed to push seed:submit") 91 - // } 92 - // 93 - // push.Receive("ok", func(response any) { 94 - // cont <- nil 95 - // }) 96 - // 97 - // push.Receive("error", func(response any) { 98 - // cont <- fmt.Errorf("failed to submit seed") 99 - // }) 100 - // 101 - // return <-cont 102 - // }
-28
cmd/cli/log.go
··· 1 - package main 2 - 3 - import ( 4 - "fmt" 5 - "log/slog" 6 - 7 - "github.com/nshafer/phx" 8 - ) 9 - 10 - type logger struct{} 11 - 12 - func (l *logger) Print(level phx.LoggerLevel, kind string, v ...any) { l.Println(level, kind, v) } 13 - func (l *logger) Println(level phx.LoggerLevel, kind string, v ...any) { 14 - switch level { 15 - case phx.LogDebug: 16 - slog.Debug(fmt.Sprintf("%v", v)) 17 - case phx.LogInfo: 18 - slog.Info(fmt.Sprintf("%v", v)) 19 - case phx.LogWarning: 20 - slog.Warn(fmt.Sprintf("%v", v)) 21 - case phx.LogError: 22 - slog.Error(fmt.Sprintf("%v", v)) 23 - } 24 - } 25 - 26 - func (l *logger) Printf(level phx.LoggerLevel, kind string, format string, v ...any) { 27 - l.Println(level, kind, fmt.Sprintf(format, v...)) 28 - }
-69
cmd/cli/main.go
··· 12 12 "time" 13 13 14 14 "codeberg.org/adamcstephens/sower/client-go" 15 - "codeberg.org/adamcstephens/sower/cmd/cli/builder" 16 15 "github.com/adrg/xdg" 17 16 "github.com/alexflint/go-arg" 18 17 "github.com/lmittmann/tint" ··· 22 21 var version = "dev" 23 22 24 23 type config struct { 25 - Builder *builderCmd `arg:"subcommand:builder"` 26 - Daemon *daemonCmd `arg:"subcommand:daemon"` 27 24 Seed *seedCmd `arg:"subcommand:seed"` 28 25 Services *servicesCmd `arg:"subcommand:services"` 29 26 ··· 33 30 Debug bool `arg:"--debug"` 34 31 Endpoint string `arg:"--endpoint,-e,env:SOWER_ENDPOINT"` 35 32 Version bool `arg:"--version"` 36 - } 37 - 38 - type builderCmd struct { 39 - Eval *builderEvalCmd `arg:"subcommand:eval"` 40 - Build *builderBuildCmd `arg:"subcommand:build"` 41 - Push *builderPushCmd `arg:"subcommand:push"` 42 - } 43 - 44 - type builderBuildCmd struct { 45 - Workers int `arg:"--workers,-w"` 46 - System string `arg:"--system"` 47 - } 48 - 49 - type builderEvalCmd struct { 50 - Workers int `arg:"--workers,-w"` 51 - System string `arg:"--system"` 52 - } 53 - 54 - type builderPushCmd struct { 55 - Workers int `arg:"--workers,-w"` 56 - System string `arg:"--system"` 57 - Targets []string `arg:"--target,-t,required,separate" help:"Upload target (attic:<cache>[?jobs=N] or nix-copy:<remote>). Can be repeated."` 58 - } 59 - 60 - type daemonCmd struct { 61 33 } 62 34 63 35 type seedCmd struct { ··· 190 162 case parseResult != nil: 191 163 slog.Error("Unknown error", "error", parseResult) 192 164 os.Exit(1) 193 - case args.Builder != nil: 194 - buildCommand(cfg) 195 - case args.Daemon != nil: 196 - daemonCommand(cfg) 197 165 case args.Seed != nil: 198 166 err := seedSubcommand(cfg) 199 167 if err != nil { ··· 228 196 })) 229 197 230 198 slog.SetDefault(logger) 231 - } 232 - 233 - func buildCommand(cfg config) { 234 - switch { 235 - case cfg.Builder.Build != nil: 236 - err := builder.Build(cfg.Builder.Build.Workers, cfg.Builder.Build.System) 237 - if err != nil { 238 - slog.Error("Failed to eval", "error", err) 239 - os.Exit(1) 240 - } 241 - case cfg.Builder.Eval != nil: 242 - err := builder.Eval(cfg.Builder.Eval.Workers, cfg.Builder.Eval.System) 243 - if err != nil { 244 - slog.Error("Failed to eval", "error", err) 245 - os.Exit(1) 246 - } 247 - case cfg.Builder.Push != nil: 248 - err := builder.Push(cfg.Builder.Push.Workers, cfg.Builder.Push.System, cfg.Builder.Push.Targets) 249 - if err != nil { 250 - slog.Error("Failed to eval", "error", err) 251 - os.Exit(1) 252 - } 253 - default: 254 - slog.Error("No subcommand specified") 255 - os.Exit(1) 256 - } 257 - } 258 - 259 - func daemonCommand(cfg config) { 260 - client := newClient(cfg) 261 - 262 - err := client.connect() 263 - if err != nil { 264 - slog.Error("failed to connect") 265 - } 266 - 267 - client.run() 268 199 } 269 200 270 201 func seedSubcommand(cfg config) error {