this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

outbox only mode

dholms 9b0d5143 61bf02fe

+42 -31
+28 -18
cmd/nexus/main.go
··· 118 118 Value: "info", 119 119 EnvVars: []string{"NEXUS_LOG_LEVEL", "LOG_LEVEL"}, 120 120 }, 121 + &cli.BoolFlag{ 122 + Name: "outbox-only", 123 + Usage: "run in outbox-only mode (no firehose, resync, or enumeration)", 124 + EnvVars: []string{"NEXUS_OUTBOX_ONLY"}, 125 + }, 121 126 } 122 127 123 128 app.Action = runNexus ··· 149 154 DisableAcks: cctx.Bool("disable-acks"), 150 155 WebhookURL: cctx.String("webhook-url"), 151 156 CollectionFilters: cctx.StringSlice("collection-filters"), 157 + OutboxOnly: cctx.Bool("outbox-only"), 152 158 } 153 159 154 160 logger.Info("creating nexus service") ··· 157 163 return err 158 164 } 159 165 160 - if config.SignalCollection != "" { 161 - go func() { 162 - if err := nexus.Crawler.EnumerateNetworkByCollection(ctx, config.SignalCollection); err != nil { 163 - logger.Error("collection enumeration failed", "error", err, "collection", config.SignalCollection) 164 - } 165 - }() 166 - } else if config.FullNetworkMode { 167 - go func() { 168 - if err := nexus.Crawler.EnumerateNetwork(ctx); err != nil { 169 - logger.Error("network enumeration failed", "error", err) 170 - } 171 - }() 166 + if !config.OutboxOnly { 167 + if config.SignalCollection != "" { 168 + go func() { 169 + if err := nexus.Crawler.EnumerateNetworkByCollection(ctx, config.SignalCollection); err != nil { 170 + logger.Error("collection enumeration failed", "error", err, "collection", config.SignalCollection) 171 + } 172 + }() 173 + } else if config.FullNetworkMode { 174 + go func() { 175 + if err := nexus.Crawler.EnumerateNetwork(ctx); err != nil { 176 + logger.Error("network enumeration failed", "error", err) 177 + } 178 + }() 179 + } 172 180 } 173 181 174 182 svcErr := make(chan error, 1) 175 183 176 - go func() { 177 - logger.Info("starting firehose consumer") 178 - if err := nexus.FirehoseConsumer.Run(ctx); err != nil { 179 - svcErr <- err 180 - } 181 - }() 184 + if !config.OutboxOnly { 185 + go func() { 186 + logger.Info("starting firehose consumer") 187 + if err := nexus.FirehoseConsumer.Run(ctx); err != nil { 188 + svcErr <- err 189 + } 190 + }() 191 + } 182 192 183 193 go nexus.Run(ctx) 184 194
+14 -11
cmd/nexus/nexus.go
··· 54 54 DisableAcks bool 55 55 WebhookURL string 56 56 CollectionFilters []string // e.g., ["app.bsky.feed.post", "app.bsky.graph.*"] 57 + OutboxOnly bool 57 58 } 58 59 59 60 func NewNexus(config NexusConfig) (*Nexus, error) { ··· 158 159 159 160 // Run starts internal background workers for resync, cursor saving, and outbox delivery. 160 161 func (n *Nexus) Run(ctx context.Context) { 161 - resyncParallelism := n.config.ResyncParallelism 162 - if resyncParallelism == 0 { 163 - resyncParallelism = 5 164 - } 165 - for i := 0; i < resyncParallelism; i++ { 166 - go n.runResyncWorker(ctx, i) 167 - } 162 + if !n.config.OutboxOnly { 163 + resyncParallelism := n.config.ResyncParallelism 164 + if resyncParallelism == 0 { 165 + resyncParallelism = 5 166 + } 167 + for i := 0; i < resyncParallelism; i++ { 168 + go n.runResyncWorker(ctx, i) 169 + } 168 170 169 - cursorSaveInterval := n.config.FirehoseCursorSaveInterval 170 - if cursorSaveInterval == 0 { 171 - cursorSaveInterval = 5 * time.Second 171 + cursorSaveInterval := n.config.FirehoseCursorSaveInterval 172 + if cursorSaveInterval == 0 { 173 + cursorSaveInterval = 5 * time.Second 174 + } 175 + go n.EventProcessor.RunCursorSaver(ctx, cursorSaveInterval) 172 176 } 173 - go n.EventProcessor.RunCursorSaver(ctx, cursorSaveInterval) 174 177 175 178 go n.Outbox.Run(ctx) 176 179 }
-2
cmd/nexus/outbox.go
··· 115 115 116 116 did := outboxEvt.DID() 117 117 118 - // Fast path: try to load existing worker 119 118 if val, ok := o.didWorkers.Load(did); ok { 120 119 worker := val.(*DIDWorker) 121 120 worker.addEvent(outboxEvt) 122 121 return 123 122 } 124 123 125 - // Slow path: create new worker 126 124 worker := &DIDWorker{ 127 125 did: did, 128 126 notifChan: make(chan struct{}, 1),