Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

spindle: limit concurrent workflows to cap total memory usage

Add a channel-based semaphore (SPINDLE_SERVER_MAX_CONCURRENT_WORKFLOWS,
default 8) that blocks workflow goroutines from starting a container until
a slot is free. Each workflow acquires a slot before SetupWorkflow and
releases it on exit via defer.

Combined with the 6 GiB per-container limit (MAX_JOB_MEMORY_MB), this
bounds total container memory to ~48 GiB on the current 64 GiB host.
MAX_JOB_COUNT still controls pipeline-level concurrency at the queue.
Without this semaphore, max workflows were effectively unbounded and
(wrongly) assumed to be capped at MAX_JOB_COUNT.

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.org>

authored by

Anirudh Oppiliappan and committed by
Tangled
36a59c30 5dde2df5

+27 -17
+2 -1
spindle/config/config.go
··· 19 19 Secrets Secrets `env:",prefix=SECRETS_"` 20 20 LogDir string `env:"LOG_DIR, default=/var/log/spindle"` 21 21 QueueSize int `env:"QUEUE_SIZE, default=100"` 22 - MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of jobs that run at a time 22 + MaxJobCount int `env:"MAX_JOB_COUNT, default=2"` // max number of pipelines that run at a time 23 + MaxConcurrentWorkflows int `env:"MAX_CONCURRENT_WORKFLOWS, default=8"` // max number of workflow containers running at once (memory cap) 23 24 } 24 25 25 26 func (s Server) Did() syntax.DID {
+5 -1
spindle/engine/engine.go
··· 21 21 ErrWorkflowFailed = errors.New("workflow failed") 22 22 ) 23 23 24 - func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 24 + func StartWorkflows(l *slog.Logger, vault secrets.Manager, cfg *config.Config, db *db.DB, n *notifier.Notifier, workflowSem chan struct{}, ctx context.Context, pipeline *models.Pipeline, pipelineId models.PipelineId) { 25 25 l.Info("starting all workflows in parallel", "pipeline", pipelineId) 26 26 27 27 // extract secrets ··· 80 80 l.Error("failed to set workflow status to running", "wid", wid, "err", err) 81 81 return 82 82 } 83 + 84 + // acquire semaphore slot before starting the container 85 + workflowSem <- struct{}{} 86 + defer func() { <-workflowSem }() 83 87 84 88 err = eng.SetupWorkflow(ctx, wid, &w, wfLogger) 85 89 if err != nil {
+20 -15
spindle/server.go
··· 48 48 cfg *config.Config 49 49 ks *eventconsumer.Consumer 50 50 res *idresolver.Resolver 51 - vault secrets.Manager 52 - motd []byte 53 - motdMu sync.RWMutex 51 + vault secrets.Manager 52 + motd []byte 53 + motdMu sync.RWMutex 54 + workflowSem chan struct{} 54 55 } 55 56 56 57 // New creates a new Spindle server with the provided configuration and engines. ··· 98 99 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 99 100 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 100 101 102 + workflowSem := make(chan struct{}, cfg.Server.MaxConcurrentWorkflows) 103 + logger.Info("initialized workflow semaphore", "maxConcurrentWorkflows", cfg.Server.MaxConcurrentWorkflows) 104 + 101 105 collections := []string{ 102 106 tangled.SpindleMemberNSID, 103 107 tangled.RepoNSID, ··· 121 125 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 122 126 123 127 spindle := &Spindle{ 124 - jc: jc, 125 - e: e, 126 - db: d, 127 - l: logger, 128 - n: &n, 129 - engs: engines, 130 - jq: jq, 131 - cfg: cfg, 132 - res: resolver, 133 - vault: vault, 134 - motd: defaultMotd, 128 + jc: jc, 129 + e: e, 130 + db: d, 131 + l: logger, 132 + n: &n, 133 + engs: engines, 134 + jq: jq, 135 + cfg: cfg, 136 + res: resolver, 137 + vault: vault, 138 + motd: defaultMotd, 139 + workflowSem: workflowSem, 135 140 } 136 141 137 142 err = e.AddSpindle(rbacDomain) ··· 393 398 394 399 ok := s.jq.Enqueue(queue.Job{ 395 400 Run: func() error { 396 - engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 401 + engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, s.workflowSem, ctx, &models.Pipeline{ 397 402 RepoOwner: tpl.TriggerMetadata.Repo.Did, 398 403 RepoName: repoName, 399 404 Workflows: workflows,