this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(nats): create a proper nats jetstream consumer instead of an infinite for{} loop

it starves the gc

see
https://stackoverflow.com/questions/45886359/golang-nats-subscribe-issue/45921617#45921617

ref 108362a42b3463fef9e7cae534f43c7d20bc7176

+27 -54
+3 -3
receiver.go
··· 5 5 "fmt" 6 6 7 7 ll "github.com/gwennlbh/label-logger-go" 8 - "github.com/nats-io/nats.go" 8 + "github.com/nats-io/nats.go/jetstream" 9 9 ) 10 10 11 11 const StreamName = "notella:stream" 12 12 const SubjectName = "notella:notification" 13 13 14 - func NatsReceiver(m *nats.Msg) error { 14 + func NatsReceiver(m jetstream.Msg) error { 15 15 var message Message 16 - err := json.Unmarshal(m.Data, &message) 16 + err := json.Unmarshal(m.Data(), &message) 17 17 if err != nil { 18 18 return fmt.Errorf("while unmarshaling received message: %w", err) 19 19 }
+24 -51
server/main.go
··· 14 14 "github.com/common-nighthawk/go-figure" 15 15 ll "github.com/gwennlbh/label-logger-go" 16 16 "github.com/nats-io/nats.go" 17 + "github.com/nats-io/nats.go/jetstream" 17 18 ) 18 19 19 20 var Version = "DEV" 20 - 21 - var consumerSub *nats.Subscription 22 21 23 22 func main() { 24 23 figure.NewColorFigure("Notella", "", "yellow", true).Print() 25 24 fmt.Printf("%38s\n", fmt.Sprintf("美味しそう〜 v%s", Version)) 26 25 fmt.Println() 27 26 27 + // Setup a context to handle graceful shutdowns 28 + ctx, cancel := context.WithCancel(context.Background()) 29 + defer cancel() 30 + 28 31 config, _ := notella.LoadConfiguration() 29 32 30 33 ll.Info("Server time is %s", time.Now().Format("2006-01-02 15:04:05 -07:00:00")) 31 34 if config.DryRunMode && len(config.DryRunExceptions) > 0 { 32 - ll.Info("Running [bold]in dry run mode, [red]except for %+v[reset] with") 35 + ll.Info("Running [bold]in dry run mode, [red]except for %+v[reset] with", config.DryRunExceptions) 33 36 } else if config.DryRunMode { 34 37 ll.Info("Running [bold]in dry run mode[reset] with") 35 38 } else { ··· 75 78 return 76 79 } 77 80 78 - js, err := nc.JetStream() 81 + js, err := jetstream.New(nc) 79 82 if err != nil { 80 83 ll.ErrorDisplay("could not connect to Jetstream", err) 81 84 return ··· 83 86 84 87 ll.Log("Initializing", "cyan", "a Jetstream stream [bold]%s[reset], listening for subject [bold]%s[reset]", notella.StreamName, notella.SubjectName) 85 88 86 - _, err = js.AddStream(&nats.StreamConfig{ 89 + stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ 87 90 Name: notella.StreamName, 88 91 Subjects: []string{notella.SubjectName}, 89 92 }) ··· 94 97 95 98 ll.Log("Initializing", "cyan", "Jetstream consumer [bold]NotellaConsumer[reset] with [bold]AckExplicitPolicy[reset]") 96 99 97 - _, err = js.AddConsumer(notella.StreamName, &nats.ConsumerConfig{ 100 + consumer, err := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{ 98 101 Durable: "NotellaConsumer", 99 - AckPolicy: nats.AckExplicitPolicy, 102 + Name: "NotellaConsumer", 103 + AckPolicy: jetstream.AckExplicitPolicy, 100 104 }) 101 105 if err != nil { 102 106 ll.ErrorDisplay("could not create consumer", err) ··· 105 109 106 110 ll.Log("Starting", "cyan", "consumer [bold]NotellaConsumer[reset]") 107 111 108 - consumerSub, err = js.PullSubscribe(notella.SubjectName, "NotellaConsumer") 112 + cc, err := consumer.Consume( 113 + func(msg jetstream.Msg) { 114 + err := notella.NatsReceiver(msg) 115 + if err != nil { 116 + ll.ErrorDisplay("Could not process message", err) 117 + } 118 + msg.Ack() // Acknowledge the message 119 + }, 120 + jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, err error) { 121 + ll.ErrorDisplay("Error while consuming message", err) 122 + }), 123 + ) 109 124 if err != nil { 110 125 ll.ErrorDisplay("could not start consumer", err) 111 126 return 112 127 } 113 128 114 - // Setup a context to handle graceful shutdowns 115 - ctx, cancel := context.WithCancel(context.Background()) 116 - defer cancel() 129 + defer cc.Stop() 117 130 118 131 // Capture OS signals for graceful shutdown 119 132 go func() { ··· 137 150 time.Sleep(5 * time.Minute) 138 151 notella.DisplaySchedule() 139 152 notella.SaveSchedule() 140 - } 141 - } 142 - }() 143 - 144 - // Continuously fetch and process messages 145 - go func() { 146 - for { 147 - select { 148 - case <-ctx.Done(): 149 - return 150 - default: 151 - // Fetch messages in batches 152 - msgs, err := consumerSub.Fetch(10, nats.MaxWait(5*time.Second)) 153 - if err != nil { 154 - if err == nats.ErrTimeout { 155 - // ok 156 - } else if err == nats.ErrBadSubscription { 157 - ll.WarnDisplay("Subscription is not valid anymore, trying to reconnect to consumer", err) 158 - consumerSub, err = js.PullSubscribe(notella.SubjectName, "NotellaConsumer") 159 - if err != nil { 160 - ll.ErrorDisplay("could not start consumer", err) 161 - return 162 - } 163 - continue 164 - 165 - } else { 166 - ll.ErrorDisplay("Could not fetch messages", err) 167 - time.Sleep(2 * time.Second) // Wait before retrying 168 - continue 169 - } 170 - } 171 - 172 - // Process each message 173 - for _, msg := range msgs { 174 - err = notella.NatsReceiver(msg) 175 - if err != nil { 176 - ll.ErrorDisplay("Could not process message", err) 177 - } 178 - msg.Ack() // Acknowledge the message 179 - } 180 153 } 181 154 } 182 155 }()