Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 453 lines 12 kB view raw
package spindle

import (
	"context"
	_ "embed"
	"encoding/json"
	"fmt"
	"log/slog"
	"maps"
	"net/http"
	"sync"

	"github.com/go-chi/chi/v5"
	"tangled.org/core/api/tangled"
	"tangled.org/core/eventconsumer"
	"tangled.org/core/eventconsumer/cursor"
	"tangled.org/core/idresolver"
	"tangled.org/core/jetstream"
	"tangled.org/core/log"
	"tangled.org/core/notifier"
	"tangled.org/core/rbac"
	"tangled.org/core/spindle/config"
	"tangled.org/core/spindle/db"
	"tangled.org/core/spindle/engine"
	"tangled.org/core/spindle/engines/nixery"
	"tangled.org/core/spindle/engines/qemu"
	"tangled.org/core/spindle/models"
	"tangled.org/core/spindle/queue"
	"tangled.org/core/spindle/secrets"
	"tangled.org/core/spindle/xrpc"
	"tangled.org/core/xrpc/serviceauth"
)

// defaultMotd holds the contents of the embedded "motd" file. New installs it
// as the initial MOTD; SetMotdContent can replace it later.
//
//go:embed motd
var defaultMotd []byte

const (
	// rbacDomain is the domain argument passed to the RBAC enforcer calls
	// made in this file (AddSpindle, owner lookup, owner add/remove).
	rbacDomain = "thisserver"
)

// Spindle bundles the long-lived dependencies of one spindle server instance.
// All fields except ks are populated by the struct literal in New; ks is
// assigned at the end of New.
type Spindle struct {
	jc     *jetstream.JetstreamClient // Jetstream client; started in New with s.ingest()
	db     *db.DB                     // database opened from cfg.Server.DBPath
	e      *rbac.Enforcer             // RBAC enforcer opened from the same DB path
	l      *slog.Logger               // base logger taken from the context given to New
	n      *notifier.Notifier         // notifier passed to DB status updates and the engine
	engs   map[string]models.Engine   // available engines, looked up by a workflow's Engine name
	jq     *queue.Queue               // job queue that pipeline runs are enqueued on
	cfg    *config.Config             // server configuration
	ks     *eventconsumer.Consumer    // event consumer whose sources are the knots known to the DB
	res    *idresolver.Resolver       // identity resolver built from cfg.Server.PlcUrl
	vault  secrets.Manager            // secrets backend chosen by cfg.Server.Secrets.Provider
	motd   []byte                     // current MOTD served at "/"; guarded by motdMu
	motdMu sync.RWMutex               // protects motd
}

// New creates a new Spindle server with the provided configuration and engines.
58func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 59 logger := log.FromContext(ctx) 60 61 d, err := db.Make(cfg.Server.DBPath) 62 if err != nil { 63 return nil, fmt.Errorf("failed to setup db: %w", err) 64 } 65 66 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 67 if err != nil { 68 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 69 } 70 e.E.EnableAutoSave(true) 71 72 n := notifier.New() 73 74 var vault secrets.Manager 75 switch cfg.Server.Secrets.Provider { 76 case "openbao": 77 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 78 return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 79 } 80 vault, err = secrets.NewOpenBaoManager( 81 cfg.Server.Secrets.OpenBao.ProxyAddr, 82 logger, 83 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 84 ) 85 if err != nil { 86 return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 87 } 88 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 89 case "sqlite", "": 90 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 91 if err != nil { 92 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 93 } 94 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 95 default: 96 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 97 } 98 99 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) 100 logger.Info("initialized queue", "queueSize", cfg.Server.QueueSize, "numWorkers", cfg.Server.MaxJobCount) 101 102 collections := []string{ 103 tangled.SpindleMemberNSID, 104 tangled.RepoNSID, 105 tangled.RepoCollaboratorNSID, 106 } 107 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 108 if err != nil { 
109 return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 110 } 111 jc.AddDid(cfg.Server.Owner) 112 113 // Check if the spindle knows about any Dids; 114 dids, err := d.GetAllDids() 115 if err != nil { 116 return nil, fmt.Errorf("failed to get all dids: %w", err) 117 } 118 for _, d := range dids { 119 jc.AddDid(d) 120 } 121 122 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 123 124 spindle := &Spindle{ 125 jc: jc, 126 e: e, 127 db: d, 128 l: logger, 129 n: &n, 130 engs: engines, 131 jq: jq, 132 cfg: cfg, 133 res: resolver, 134 vault: vault, 135 motd: defaultMotd, 136 } 137 138 err = e.AddSpindle(rbacDomain) 139 if err != nil { 140 return nil, fmt.Errorf("failed to set rbac domain: %w", err) 141 } 142 err = spindle.configureOwner() 143 if err != nil { 144 return nil, err 145 } 146 logger.Info("owner set", "did", cfg.Server.Owner) 147 148 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 149 if err != nil { 150 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 151 } 152 153 err = jc.StartJetstream(ctx, spindle.ingest()) 154 if err != nil { 155 return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 156 } 157 158 // for each incoming sh.tangled.pipeline, we execute 159 // spindle.processPipeline, which in turn enqueues the pipeline 160 // job in the above registered queue. 161 ccfg := eventconsumer.NewConsumerConfig() 162 ccfg.Logger = log.SubLogger(logger, "eventconsumer") 163 ccfg.Dev = cfg.Server.Dev 164 ccfg.ProcessFunc = spindle.processPipeline 165 ccfg.CursorStore = cursorStore 166 knownKnots, err := d.Knots() 167 if err != nil { 168 return nil, err 169 } 170 for _, knot := range knownKnots { 171 logger.Info("adding source start", "knot", knot) 172 ccfg.Sources[eventconsumer.NewKnotSource(knot)] = struct{}{} 173 } 174 spindle.ks = eventconsumer.NewConsumer(*ccfg) 175 176 return spindle, nil 177} 178 179// DB returns the database instance. 
180func (s *Spindle) DB() *db.DB { 181 return s.db 182} 183 184// Queue returns the job queue instance. 185func (s *Spindle) Queue() *queue.Queue { 186 return s.jq 187} 188 189// Engines returns the map of available engines. 190func (s *Spindle) Engines() map[string]models.Engine { 191 return s.engs 192} 193 194// Vault returns the secrets manager instance. 195func (s *Spindle) Vault() secrets.Manager { 196 return s.vault 197} 198 199// Notifier returns the notifier instance. 200func (s *Spindle) Notifier() *notifier.Notifier { 201 return s.n 202} 203 204// Enforcer returns the RBAC enforcer instance. 205func (s *Spindle) Enforcer() *rbac.Enforcer { 206 return s.e 207} 208 209// SetMotdContent sets custom MOTD content, replacing the embedded default. 210func (s *Spindle) SetMotdContent(content []byte) { 211 s.motdMu.Lock() 212 defer s.motdMu.Unlock() 213 s.motd = content 214} 215 216// GetMotdContent returns the current MOTD content. 217func (s *Spindle) GetMotdContent() []byte { 218 s.motdMu.RLock() 219 defer s.motdMu.RUnlock() 220 return s.motd 221} 222 223// Start starts the Spindle server (blocking). 
224func (s *Spindle) Start(ctx context.Context) error { 225 // starts a job queue runner in the background 226 s.jq.Start() 227 defer s.jq.Stop() 228 229 // Stop vault token renewal if it implements Stopper 230 if stopper, ok := s.vault.(secrets.Stopper); ok { 231 defer stopper.Stop() 232 } 233 234 go func() { 235 s.l.Info("starting knot event consumer") 236 s.ks.Start(ctx) 237 }() 238 239 s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 240 return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 241} 242 243func Run(ctx context.Context) error { 244 cfg, err := config.Load(ctx) 245 if err != nil { 246 return fmt.Errorf("failed to load config: %w", err) 247 } 248 249 nixeryEng, err := nixery.New(ctx, cfg) 250 if err != nil { 251 return err 252 } 253 254 qemuEng, err := qemu.New(ctx, cfg) 255 if err != nil { 256 return err 257 } 258 259 s, err := New(ctx, cfg, map[string]models.Engine{ 260 "nixery": nixeryEng, 261 "qemu": qemuEng, 262 }) 263 if err != nil { 264 return err 265 } 266 267 return s.Start(ctx) 268} 269 270func (s *Spindle) Router() http.Handler { 271 mux := chi.NewRouter() 272 273 mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { 274 w.Write(s.GetMotdContent()) 275 }) 276 mux.HandleFunc("/events", s.Events) 277 mux.HandleFunc("/logs/{knot}/{rkey}/{name}", s.Logs) 278 279 mux.Mount("/xrpc", s.XrpcRouter()) 280 return mux 281} 282 283func (s *Spindle) XrpcRouter() http.Handler { 284 serviceAuth := serviceauth.NewServiceAuth(s.l, s.res, s.cfg.Server.Did().String()) 285 286 l := log.SubLogger(s.l, "xrpc") 287 288 x := xrpc.Xrpc{ 289 Logger: l, 290 Db: s.db, 291 Enforcer: s.e, 292 Engines: s.engs, 293 Config: s.cfg, 294 Resolver: s.res, 295 Vault: s.vault, 296 Notifier: s.Notifier(), 297 ServiceAuth: serviceAuth, 298 } 299 300 return x.Router() 301} 302 303func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 304 if msg.Nsid == tangled.PipelineNSID { 
305 tpl := tangled.Pipeline{} 306 err := json.Unmarshal(msg.EventJson, &tpl) 307 if err != nil { 308 s.l.Error("failed to unmarshal pipeline event", "err", err) 309 return err 310 } 311 312 if tpl.TriggerMetadata == nil { 313 return fmt.Errorf("no trigger metadata found") 314 } 315 316 if tpl.TriggerMetadata.Repo == nil { 317 return fmt.Errorf("no repo data found") 318 } 319 320 if src.Key() != tpl.TriggerMetadata.Repo.Knot { 321 return fmt.Errorf("repo knot does not match event source: %s != %s", src.Key(), tpl.TriggerMetadata.Repo.Knot) 322 } 323 324 // filter by repos 325 repoName := "" 326 if tpl.TriggerMetadata.Repo.Repo != nil { 327 repoName = *tpl.TriggerMetadata.Repo.Repo 328 } 329 330 _, err = s.db.GetRepo( 331 tpl.TriggerMetadata.Repo.Knot, 332 tpl.TriggerMetadata.Repo.Did, 333 repoName, 334 ) 335 if err != nil { 336 return fmt.Errorf("failed to get repo: %w", err) 337 } 338 339 pipelineId := models.PipelineId{ 340 Knot: src.Key(), 341 Rkey: msg.Rkey, 342 } 343 344 workflows := make(map[models.Engine][]models.Workflow) 345 346 // Build pipeline environment variables once for all workflows 347 pipelineEnv := models.PipelineEnvVars(tpl.TriggerMetadata, pipelineId) 348 349 for _, w := range tpl.Workflows { 350 if w != nil { 351 if _, ok := s.engs[w.Engine]; !ok { 352 err = s.db.StatusFailed(models.WorkflowId{ 353 PipelineId: pipelineId, 354 Name: w.Name, 355 }, fmt.Sprintf("unknown engine %#v", w.Engine), -1, s.n) 356 if err != nil { 357 return fmt.Errorf("db.StatusFailed: %w", err) 358 } 359 360 continue 361 } 362 363 eng := s.engs[w.Engine] 364 365 if _, ok := workflows[eng]; !ok { 366 workflows[eng] = []models.Workflow{} 367 } 368 369 ewf, err := s.engs[w.Engine].InitWorkflow(*w, tpl) 370 if err != nil { 371 return fmt.Errorf("init workflow: %w", err) 372 } 373 374 // inject TANGLED_* env vars after InitWorkflow 375 // This prevents user-defined env vars from overriding them 376 if ewf.Environment == nil { 377 ewf.Environment = make(map[string]string) 378 
} 379 maps.Copy(ewf.Environment, pipelineEnv) 380 381 // if in dev mode we have to replace localhost with the correct host alias 382 if s.cfg.Server.Dev { 383 if repoUrl, ok := ewf.Environment["TANGLED_REPO_URL"]; ok { 384 ewf.Environment["TANGLED_REPO_URL"] = models.RewriteLocalhost(repoUrl, w.Engine) 385 } 386 } 387 388 workflows[eng] = append(workflows[eng], *ewf) 389 390 err = s.db.StatusPending(models.WorkflowId{ 391 PipelineId: pipelineId, 392 Name: w.Name, 393 }, s.n) 394 if err != nil { 395 return fmt.Errorf("db.StatusPending: %w", err) 396 } 397 } 398 } 399 400 ok := s.jq.Enqueue(queue.Job{ 401 Run: func() error { 402 engine.StartWorkflows(log.SubLogger(s.l, "engine"), s.vault, s.cfg, s.db, s.n, ctx, &models.Pipeline{ 403 RepoOwner: tpl.TriggerMetadata.Repo.Did, 404 RepoName: repoName, 405 Workflows: workflows, 406 }, pipelineId) 407 return nil 408 }, 409 OnFail: func(jobError error) { 410 s.l.Error("pipeline run failed", "error", jobError) 411 }, 412 }) 413 if ok { 414 s.l.Info("pipeline enqueued successfully", "id", msg.Rkey) 415 } else { 416 s.l.Error("failed to enqueue pipeline: queue is full") 417 } 418 } 419 420 return nil 421} 422 423func (s *Spindle) configureOwner() error { 424 cfgOwner := s.cfg.Server.Owner 425 426 existing, err := s.e.GetSpindleUsersByRole("server:owner", rbacDomain) 427 if err != nil { 428 return err 429 } 430 431 switch len(existing) { 432 case 0: 433 // no owner configured, continue 434 case 1: 435 // find existing owner 436 existingOwner := existing[0] 437 438 // no ownership change, this is okay 439 if existingOwner == s.cfg.Server.Owner { 440 break 441 } 442 443 // remove existing owner 444 err = s.e.RemoveSpindleOwner(rbacDomain, existingOwner) 445 if err != nil { 446 return nil 447 } 448 default: 449 return fmt.Errorf("more than one owner in DB, try deleting %q and starting over", s.cfg.Server.DBPath) 450 } 451 452 return s.e.AddSpindleOwner(rbacDomain, cfgOwner) 453}