this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

finish cleaning up lifecycle stuff

dholms e77417be 6eeeb023

+43 -43
+37 -11
cmd/nexus/main.go
··· 104 104 } 105 105 106 106 func runNexus(cctx *cli.Context) error { 107 - ctx := cctx.Context 107 + ctx, cancel := context.WithCancel(cctx.Context) 108 108 logger := configLogger(cctx, os.Stdout) 109 109 slog.SetDefault(logger) 110 110 ··· 124 124 CollectionFilters: cctx.StringSlice("collection-filters"), 125 125 } 126 126 127 - logger.Info("creating nexus service", "config", config) 127 + logger.Info("creating nexus service") 128 128 nexus, err := NewNexus(config) 129 129 if err != nil { 130 130 return err 131 131 } 132 132 133 - logger.Info("starting nexus background workers") 134 - if err := nexus.Start(ctx); err != nil { 135 - return err 133 + if config.SignalCollection != "" { 134 + go func() { 135 + if err := nexus.Crawler.EnumerateNetworkByCollection(ctx, config.SignalCollection); err != nil { 136 + logger.Error("collection enumeration failed", "error", err, "collection", config.SignalCollection) 137 + } 138 + }() 139 + } else if config.FullNetworkMode { 140 + go func() { 141 + if err := nexus.Crawler.EnumerateNetwork(ctx); err != nil { 142 + logger.Error("network enumeration failed", "error", err) 143 + } 144 + }() 136 145 } 137 146 138 147 svcErr := make(chan error, 1) 148 + 149 + go func() { 150 + logger.Info("starting firehose consumer") 151 + if err := nexus.FirehoseConsumer.Run(ctx); err != nil { 152 + svcErr <- err 153 + } 154 + }() 155 + 156 + go nexus.Run(ctx) 157 + 139 158 go func() { 140 159 logger.Info("starting HTTP server", "addr", cctx.String("bind")) 141 - err := nexus.echo.Start(cctx.String("bind")) 142 - svcErr <- err 160 + if err := nexus.Server.Start(cctx.String("bind")); err != nil { 161 + svcErr <- err 162 + } 143 163 }() 144 164 145 165 logger.Info("startup complete") ··· 148 168 logger.Info("received shutdown signal") 149 169 case err := <-svcErr: 150 170 if err != nil { 151 - logger.Error("server error", "err", err) 171 + logger.Error("service error", "err", err) 152 172 } 153 173 } 154 174 155 175 logger.Info("shutting down") 156 - shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 157 - defer cancel() 176 + cancel() 158 177 159 - if err := nexus.Shutdown(shutdownCtx); err != nil { 178 + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) 179 + defer shutdownCancel() 180 + 181 + if err := nexus.Server.Shutdown(shutdownCtx); err != nil { 160 182 logger.Error("error during shutdown", "err", err) 183 + return err 184 + } 185 + 186 + if err := nexus.CloseDb(shutdownCtx); err != nil { 161 187 return err 162 188 } 163 189
+6 -32
cmd/nexus/nexus.go
··· 151 151 RelayHost: config.RelayHost, 152 152 } 153 153 154 - return n, nil 155 - } 156 - 157 - func (n *Nexus) Start(ctx context.Context) error { 158 154 if err := n.resetPartiallyResynced(); err != nil { 159 - return err 155 + return nil, err 160 156 } 161 157 162 - if n.config.SignalCollection != "" { 163 - go func() { 164 - if err := n.Crawler.EnumerateNetworkByCollection(ctx, n.config.SignalCollection); err != nil { 165 - n.logger.Error("collection enumeration failed", "error", err, "collection", n.config.SignalCollection) 166 - } 167 - }() 168 - } else if n.config.FullNetworkMode { 169 - go func() { 170 - if err := n.Crawler.EnumerateNetwork(ctx); err != nil { 171 - n.logger.Error("network enumeration failed", "error", err) 172 - } 173 - }() 174 - } 158 + return n, nil 159 + } 175 160 161 + func (n *Nexus) Run(ctx context.Context) { 176 162 resyncParallelism := n.config.ResyncParallelism 177 163 if resyncParallelism == 0 { 178 164 resyncParallelism = 5 ··· 188 174 go n.EventProcessor.RunCursorSaver(ctx, cursorSaveInterval) 189 175 190 176 go n.Outbox.Run(ctx) 191 - 192 - return n.FirehoseConsumer.Run(ctx) 193 - go func() { 194 - if err := n.FirehoseConsumer.Run(ctx); err != nil { 195 - n.logger.Error("firehose error", "error", err) 196 - } 197 - }() 198 - 199 - return nil 200 177 } 201 178 202 - func (n *Nexus) Shutdown(ctx context.Context) error { 203 - n.logger.Info("shutting down nexus server") 204 - if err := n.echo.Shutdown(ctx); err != nil { 205 - n.logger.Error("error shutting down echo", "error", err) 206 - } 179 + func (n *Nexus) CloseDb(ctx context.Context) error { 180 + n.logger.Info("shutting down nexus") 207 181 208 182 sqlDB, err := n.db.DB() 209 183 if err != nil {