forked from
tangled.org/core
Monorepo for Tangled
1package spindle
2
import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"
	"time"

	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/engines/qemu"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)
33
// defaultMotd holds the contents of the "motd" file embedded at build time.
// New installs it as the initial MOTD of every Spindle it creates.
//
//go:embed motd
var defaultMotd []byte
36
const (
	// rbacDomain is the domain passed to the RBAC enforcer for the
	// spindle-level calls made in this file (AddSpindle, the owner lookup,
	// and adding/removing the owner).
	rbacDomain = "thisserver"
)
40
// Spindle bundles the dependencies of a spindle server. Instances are
// created by New and run with Start.
type Spindle struct {
	// jc is the jetstream client; New starts it with the ingest handler.
	jc *jetstream.JetstreamClient
	// db is the spindle database set up from cfg.Server.DBPath.
	db *db.DB
	// e is the RBAC enforcer.
	e *rbac.Enforcer
	// l is the base logger; sub-loggers are derived from it.
	l *slog.Logger
	// n is the notifier handed to status updates and to the XRPC router.
	n *notifier.Notifier
	// engs maps an engine name (as referenced by a workflow) to its engine.
	engs map[string]models.Engine
	// jq is the job queue pipelines are enqueued on.
	jq *queue.Queue
	// cfg is the loaded server configuration.
	cfg *config.Config
	// ks is the knot event consumer; it is started by Start.
	ks *eventconsumer.Consumer
	// res is the identity resolver.
	res *idresolver.Resolver
	// vault is the secrets manager selected by the configuration.
	vault secrets.Manager
	// motd is the content served at "/"; access is guarded by motdMu.
	motd []byte
	// motdMu guards motd.
	motdMu sync.RWMutex
}
56
57// New creates a new Spindle server with the provided configuration and engines.
58func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) {
59 logger := log.FromContext(ctx)
60
61 d, err := db.Make(cfg.Server.DBPath)
62 if err != nil {
63 return nil, fmt.Errorf("failed to setup db: %w", err)
64 }
65
66 e, err := rbac.NewEnforcer(cfg.Server.DBPath)
67 if err != nil {
68 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err)
69 }
70 e.E.EnableAutoSave(true)
71
72 n := notifier.New()
73
74 var vault secrets.Manager
75 switch cfg.Server.Secrets.Provider {
76 case "openbao":
77 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" {
78 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider")
79 }
80 vault, err = secrets.NewOpenBaoManager(
81 cfg.Server.Secrets.OpenBao.ProxyAddr,
82 logger,
83 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount),
84 )
85 if err != nil {
86 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err)
87 }
88 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount)
89 case "sqlite", "":
90 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets"))
91 if err != nil {
92 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err)
93 }
94 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath)
95 default:
96 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider)
97 }
98
99 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount)
100 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount)
101
102 collections := []string{
103 tangled.SpindleMemberNSID,
104 tangled.RepoNSID,
105 tangled.RepoCollaboratorNSID,
106 }
107 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true)
108 if err != nil {
109 return nil, fmt.Errorf("failed to setup jetstream client: %w", err)
110 }
111 jc.AddDid(cfg.Server.Owner)
112
113 // Check if the spindle knows about any Dids;
114 dids, err := d.GetAllDids()
115 if err != nil {
116 return nil, fmt.Errorf("failed to get all dids: %w", err)
117 }
118 for _, d := range dids {
119 jc.AddDid(d)
120 }
121
122 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl)
123
124 spindle := &Spindle{
125 jc: jc,
126 e: e,
127 db: d,
128 l: logger,
129 n: &n,
130 engs: engines,
131 jq: jq,
132 cfg: cfg,
133 res: resolver,
134 vault: vault,
135 motd: defaultMotd,
136 }
137
138 err = e.AddSpindle(rbacDomain)
139 if err != nil {
140 return nil, fmt.Errorf("failed to set rbac domain: %w", err)
141 }
142 err = spindle.configureOwner()
143 if err != nil {
144 return nil, err
145 }
146 logger.Info("owner set", "did", cfg.Server.Owner)
147
148 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath)
149 if err != nil {
150 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err)
151 }
152
153 err = jc.StartJetstream(ctx, spindle.ingest())
154 if err != nil {
155 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err)
156 }
157
158 // for each incoming sh.tangled.pipeline, we execute
159 // spindle.processPipeline, which in turn enqueues the pipeline
160 // job in the above registered queue.
161 ccfg := eventconsumer.NewConsumerConfig()
162 ccfg.Logger = log.SubLogger(logger, "eventconsumer")
163 ccfg.Dev = cfg.Server.Dev
164 ccfg.ProcessFunc = spindle.processPipeline
165 ccfg.CursorStore = cursorStore
166 knownKnots, err := d.Knots()
167 if err != nil {
168 return nil, err
169 }
170 for _, knot := range knownKnots {
171 logger.Info("adding source start", "knot", knot)
172 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{}
173 }
174 spindle.ks = eventconsumer.NewConsumer(*ccfg)
175
176 return spindle, nil
177}
178
// DB returns the database handle held by this Spindle.
func (s *Spindle) DB() *db.DB {
	return s.db
}
183
// Queue returns the job queue that pipeline jobs are enqueued on.
func (s *Spindle) Queue() *queue.Queue {
	return s.jq
}
188
// Engines returns the map of available engines, keyed by engine name.
// The Spindle's own map is returned, not a copy.
func (s *Spindle) Engines() map[string]models.Engine {
	return s.engs
}
193
// Vault returns the secrets manager held by this Spindle.
func (s *Spindle) Vault() secrets.Manager {
	return s.vault
}
198
// Notifier returns the notifier held by this Spindle.
func (s *Spindle) Notifier() *notifier.Notifier {
	return s.n
}
203
// Enforcer returns the RBAC enforcer held by this Spindle.
func (s *Spindle) Enforcer() *rbac.Enforcer {
	return s.e
}
208
209// SetMotdContent sets custom MOTD content, replacing the embedded default.
210func (s *Spindle) SetMotdContent(content []byte) {
211 s.motdMu.Lock()
212 defer s.motdMu.Unlock()
213 s.motd = content
214}
215
216// GetMotdContent returns the current MOTD content.
217func (s *Spindle) GetMotdContent() []byte {
218 s.motdMu.RLock()
219 defer s.motdMu.RUnlock()
220 return s.motd
221}
222
223// Start starts the Spindle server (blocking).
224func (s *Spindle) Start(ctx context.Context) error {
225 // starts a job queue runner in the background
226 s.jq.Start()
227 defer s.jq.Stop()
228
229 // Stop vault token renewal if it implements Stopper
230 if stopper, ok := s.vault.(secrets.Stopper); ok {
231 defer stopper.Stop()
232 }
233
234 go func() {
235 s.l.Info("starting knot event consumer")
236 s.ks.Start(ctx)
237 }()
238
239 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr)
240 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router())
241}
242
243func Run(ctx context.Context) error {
244 cfg, err := config.Load(ctx)
245 if err != nil {
246 return fmt.Errorf("failed to load config: %w", err)
247 }
248
249 nixeryEng, err := nixery.New(ctx, cfg)
250 if err != nil {
251 return err
252 }
253
254 qemuEng, err := qemu.New(ctx, cfg)
255 if err != nil {
256 return err
257 }
258
259 s, err := New(ctx, cfg, map[string]models.Engine{
260 "nixery": nixeryEng,
261 "qemu": qemuEng,
262 })
263 if err != nil {
264 return err
265 }
266
267 return s.Start(ctx)
268}
269
270func (s *Spindle) Router() http.Handler {
271 mux := chi.NewRouter()
272
273 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
274 w.Write(s.GetMotdContent())
275 })
276 mux.HandleFunc("/events", s.Events)
277 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs)
278
279 mux.Mount("/xrpc", s.XrpcRouter())
280 return mux
281}
282
283func (s *Spindle) XrpcRouter() http.Handler {
284 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String())
285
286 l := log.SubLogger(s.l, "xrpc")
287
288 x := xrpc.Xrpc{
289 Logger: l,
290 Db: s.db,
291 Enforcer: s.e,
292 Engines: s.engs,
293 Config: s.cfg,
294 Resolver: s.res,
295 Vault: s.vault,
296 Notifier: s.Notifier(),
297 ServiceAuth: serviceAuth,
298 }
299
300 return x.Router()
301}
302
303func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error {
304 if msg.Nsid == tangled.PipelineNSID {
305 tpl := tangled.Pipeline{}
306 err := json.Unmarshal(msg.EventJson, &tpl)
307 if err != nil {
308 s.l.Error("failed to unmarshal pipeline event", "err", err)
309 return err
310 }
311
312 if tpl.TriggerMetadata == nil {
313 return fmt.Errorf("no trigger metadata found")
314 }
315
316 if tpl.TriggerMetadata.Repo == nil {
317 return fmt.Errorf("no repo data found")
318 }
319
320 if src.Key() != tpl.TriggerMetadata.Repo.Knot {
321 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot)
322 }
323
324 // filter by repos
325 repoName := ""
326 if tpl.TriggerMetadata.Repo.Repo != nil {
327 repoName = *tpl.TriggerMetadata.Repo.Repo
328 }
329
330 _, err = s.db.GetRepo(
331 tpl.TriggerMetadata.Repo.Knot,
332 tpl.TriggerMetadata.Repo.Did,
333 repoName,
334 )
335 if err != nil {
336 return fmt.Errorf("failed to get repo: %w", err)
337 }
338
339 pipelineId := models.PipelineId{
340 Knot: src.Key(),
341 Rkey: msg.Rkey,
342 }
343
344 workflows := make(map[models.Engine][]models.Workflow)
345
346 // Build pipeline environment variables once for all workflows
347 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId)
348
349 for _, w := range tpl.Workflows {
350 if w != nil {
351 if _, ok := s.engs[w.Engine]; !ok {
352 err = s.db.StatusFailed(models.WorkflowId{
353 PipelineId: pipelineId,
354 Name: w.Name,
355 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n)
356 if err != nil {
357 return fmt.Errorf("db.StatusFailed: %w", err)
358 }
359
360 continue
361 }
362
363 eng := s.engs[w.Engine]
364
365 if _, ok := workflows[eng]; !ok {
366 workflows[eng] = []models.Workflow{}
367 }
368
369 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl)
370 if err != nil {
371 return fmt.Errorf("init workflow: %w", err)
372 }
373
374 // inject TANGLED_* env vars after InitWorkflow
375 // This prevents user-defined env vars from overriding them
376 if ewf.Environment == nil {
377 ewf.Environment = make(map[string]string)
378 }
379 maps.Copy(ewf.Environment, pipelineEnv)
380
381 // if in dev mode we have to replace localhost with the correct host alias
382 if s.cfg.Server.Dev {
383 if repoUrl, ok := ewf.Environment["TANGLED_REPO_URL"]; ok {
384 ewf.Environment["TANGLED_REPO_URL"] = models.RewriteLocalhost(repoUrl, w.Engine)
385 }
386 }
387
388 workflows[eng] = append(workflows[eng], *ewf)
389
390 err = s.db.StatusPending(models.WorkflowId{
391 PipelineId: pipelineId,
392 Name: w.Name,
393 }, s.n)
394 if err != nil {
395 return fmt.Errorf("db.StatusPending: %w", err)
396 }
397 }
398 }
399
400 ok := s.jq.Enqueue(queue.Job{
401 Run: func() error {
402 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{
403 RepoOwner: tpl.TriggerMetadata.Repo.Did,
404 RepoName: repoName,
405 Workflows: workflows,
406 }, pipelineId)
407 return nil
408 },
409 OnFail: func(jobError error) {
410 s.l.Error("pipeline run failed", "error", jobError)
411 },
412 })
413 if ok {
414 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey)
415 } else {
416 s.l.Error("failed to enqueue pipeline: queue is full")
417 }
418 }
419
420 return nil
421}
422
423func (s *Spindle) configureOwner() error {
424 cfgOwner := s.cfg.Server.Owner
425
426 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain)
427 if err != nil {
428 return err
429 }
430
431 switch len(existing) {
432 case 0:
433 // no owner configured, continue
434 case 1:
435 // find existing owner
436 existingOwner := existing[0]
437
438 // no ownership change, this is okay
439 if existingOwner == s.cfg.Server.Owner {
440 break
441 }
442
443 // remove existing owner
444 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner)
445 if err != nil {
446 return nil
447 }
448 default:
449 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath)
450 }
451
452 return s.e.AddSpindleOwner(rbacDomain, cfgOwner)
453}