Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

spindle/server: refactor spindle code to allow calling functions separate from Run()

Signed-off-by: Evan Jarrett <evan@evanjarrett.com>

authored by

Evan Jarrett and committed by
Tangled
ee43a967 428c002e

+85 -40
+85 -40
spindle/server.go
··· 49 49 vault secrets.Manager 50 50 } 51 51 52 - func Run(ctx context.Context) error { 52 + // New creates a new Spindle server with the provided configuration and engines. 53 + func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 53 54 logger := log.FromContext(ctx) 54 - 55 - cfg, err := config.Load(ctx) 56 - if err != nil { 57 - return fmt.Errorf("failed to load config: %w", err) 58 - } 59 55 60 56 d, err := db.Make(cfg.Server.DBPath) 61 57 if err != nil { 62 - return fmt.Errorf("failed to setup db: %w", err) 58 + return nil, fmt.Errorf("failed to setup db: %w", err) 63 59 } 64 60 65 61 e, err := rbac.NewEnforcer(cfg.Server.DBPath) 66 62 if err != nil { 67 - return fmt.Errorf("failed to setup rbac enforcer: %w", err) 63 + return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 68 64 } 69 65 e.E.EnableAutoSave(true) 70 66 ··· 70 74 switch cfg.Server.Secrets.Provider { 71 75 case "openbao": 72 76 if cfg.Server.Secrets.OpenBao.ProxyAddr == "" { 73 - return fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 77 + return nil, fmt.Errorf("openbao proxy address is required when using openbao secrets provider") 74 78 } 75 79 vault, err = secrets.NewOpenBaoManager( 76 80 cfg.Server.Secrets.OpenBao.ProxyAddr, ··· 78 82 secrets.WithMountPath(cfg.Server.Secrets.OpenBao.Mount), 79 83 ) 80 84 if err != nil { 81 - return fmt.Errorf("failed to setup openbao secrets provider: %w", err) 85 + return nil, fmt.Errorf("failed to setup openbao secrets provider: %w", err) 82 86 } 83 87 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 84 88 case "sqlite", "": 85 89 vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 86 90 if err != nil { 87 - return fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 91 + return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 88 92 } 89 93 logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 90 94 default: 91 - return fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 92 - } 93 - 94 - nixeryEng, err := nixery.New(ctx, cfg) 95 - if err != nil { 96 - return err 95 + return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 97 96 } 98 97 99 98 jq := queue.NewQueue(cfg.Server.QueueSize, cfg.Server.MaxJobCount) ··· 101 110 } 102 111 jc, err := jetstream.NewJetstreamClient(cfg.Server.JetstreamEndpoint, "spindle", collections, nil, log.SubLogger(logger, "jetstream"), d, true, true) 103 112 if err != nil { 104 - return fmt.Errorf("failed to setup jetstream client: %w", err) 113 + return nil, fmt.Errorf("failed to setup jetstream client: %w", err) 105 114 } 106 115 jc.AddDid(cfg.Server.Owner) 107 116 108 117 // Check if the spindle knows about any Dids; 109 118 dids, err := d.GetAllDids() 110 119 if err != nil { 111 - return fmt.Errorf("failed to get all dids: %w", err) 120 + return nil, fmt.Errorf("failed to get all dids: %w", err) 112 121 } 113 122 for _, d := range dids { 114 123 jc.AddDid(d) ··· 116 125 117 126 resolver := idresolver.DefaultResolver(cfg.Server.PlcUrl) 118 127 119 - spindle := Spindle{ 128 + spindle := &Spindle{ 120 129 jc: jc, 121 130 e: e, 122 131 db: d, 123 132 l: logger, 124 133 n: &n, 125 - engs: map[string]models.Engine{"nixery": nixeryEng}, 134 + engs: engines, 126 135 jq: jq, 127 136 cfg: cfg, 128 137 res: resolver, ··· 131 140 132 141 err = e.AddSpindle(rbacDomain) 133 142 if err != nil { 134 - return fmt.Errorf("failed to set rbac domain: %w", err) 143 + return nil, fmt.Errorf("failed to set rbac domain: %w", err) 135 144 } 136 145 err = spindle.configureOwner() 137 146 if err != nil { 138 - return err 147 + return nil, err 139 148 } 140 149 logger.Info("owner set", "did", cfg.Server.Owner) 141 150 142 - // starts a job queue runner in the background 143 - jq.Start() 144 - defer jq.Stop() 145 - 146 - // Stop vault token renewal if it implements Stopper 147 - if stopper, ok := vault.(secrets.Stopper); ok { 148 - defer stopper.Stop() 149 - } 150 - 151 151 cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 152 152 if err != nil { 153 - return fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 153 + return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 154 154 } 155 155 156 156 err = jc.StartJetstream(ctx, spindle.ingest()) 157 157 if err != nil { 158 - return fmt.Errorf("failed to start jetstream consumer: %w", err) 158 + return nil, fmt.Errorf("failed to start jetstream consumer: %w", err) 159 159 } 160 160 161 161 // for each incoming sh.tangled.pipeline, we execute ··· 159 177 ccfg.CursorStore = cursorStore 160 178 knownKnots, err := d.Knots() 161 179 if err != nil { 162 - return err 180 + return nil, err 163 181 } 164 182 for _, knot := range knownKnots { 165 183 logger.Info("adding source start", "knot", knot) ··· 167 185 } 168 186 spindle.ks = eventconsumer.NewConsumer(*ccfg) 169 187 188 + return spindle, nil 189 + } 190 + 191 + // DB returns the database instance. 192 + func (s *Spindle) DB() *db.DB { 193 + return s.db 194 + } 195 + 196 + // Queue returns the job queue instance. 197 + func (s *Spindle) Queue() *queue.Queue { 198 + return s.jq 199 + } 200 + 201 + // Engines returns the map of available engines. 202 + func (s *Spindle) Engines() map[string]models.Engine { 203 + return s.engs 204 + } 205 + 206 + // Vault returns the secrets manager instance. 207 + func (s *Spindle) Vault() secrets.Manager { 208 + return s.vault 209 + } 210 + 211 + // Notifier returns the notifier instance. 212 + func (s *Spindle) Notifier() *notifier.Notifier { 213 + return s.n 214 + } 215 + 216 + // Enforcer returns the RBAC enforcer instance. 217 + func (s *Spindle) Enforcer() *rbac.Enforcer { 218 + return s.e 219 + } 220 + 221 + // Start starts the Spindle server (blocking). 222 + func (s *Spindle) Start(ctx context.Context) error { 223 + // starts a job queue runner in the background 224 + s.jq.Start() 225 + defer s.jq.Stop() 226 + 227 + // Stop vault token renewal if it implements Stopper 228 + if stopper, ok := s.vault.(secrets.Stopper); ok { 229 + defer stopper.Stop() 230 + } 231 + 170 232 go func() { 171 - logger.Info("starting knot event consumer") 172 - spindle.ks.Start(ctx) 233 + s.l.Info("starting knot event consumer") 234 + s.ks.Start(ctx) 173 235 }() 174 236 175 - logger.Info("starting spindle server", "address", cfg.Server.ListenAddr) 176 - logger.Error("server error", "error", http.ListenAndServe(cfg.Server.ListenAddr, spindle.Router())) 237 + s.l.Info("starting spindle server", "address", s.cfg.Server.ListenAddr) 238 + return http.ListenAndServe(s.cfg.Server.ListenAddr, s.Router()) 239 + } 177 240 178 - return nil 241 + func Run(ctx context.Context) error { 242 + cfg, err := config.Load(ctx) 243 + if err != nil { 244 + return fmt.Errorf("failed to load config: %w", err) 245 + } 246 + 247 + nixeryEng, err := nixery.New(ctx, cfg) 248 + if err != nil { 249 + return err 250 + } 251 + 252 + s, err := New(ctx, cfg, map[string]models.Engine{ 253 + "nixery": nixeryEng, 254 + }) 255 + if err != nil { 256 + return err 257 + } 258 + 259 + return s.Start(ctx) 179 260 } 180 261 181 262 func (s *Spindle) Router() http.Handler {