this repo has no description
1//go:generate go run github.com/steebchen/prisma-client-go generate
2
3package main
4
5import (
6 "context"
7 "fmt"
8 "os"
9 "os/signal"
10 "syscall"
11 "time"
12
13 "git.inpt.fr/churros/notella"
14 "github.com/common-nighthawk/go-figure"
15 ll "github.com/gwennlbh/label-logger-go"
16 "github.com/nats-io/nats.go"
17 "github.com/nats-io/nats.go/jetstream"
18)
19
// Version is the version string printed in the startup banner.
// NOTE(review): the "DEV" default is presumably overridden at build time
// (e.g. via -ldflags "-X main.Version=...") — confirm against the build scripts.
var Version = "DEV"
21
22func main() {
23 figure.NewColorFigure("Notella", "", "yellow", true).Print()
24 fmt.Printf("%38s\n", fmt.Sprintf("美味しそう〜 v%s", Version))
25 fmt.Println()
26
27 // Setup a context to handle graceful shutdowns
28 ctx, cancel := context.WithCancel(context.Background())
29 defer cancel()
30
31 config, _ := notella.LoadConfiguration()
32
33 ll.Info("Server time is %s", time.Now().Format("2006-01-02 15:04:05 -07:00:00"))
34 if config.DryRunMode && len(config.DryRunExceptions) > 0 {
35 ll.Info("Running [bold]in dry run mode, [red]except for %+v[reset] with", config.DryRunExceptions)
36 } else if config.DryRunMode {
37 ll.Info("Running [bold]in dry run mode[reset] with")
38 } else {
39 ll.Info("Running with config")
40 }
41 ll.Log("", "reset", "Schedule recovery: [bold][dim]at startup [reset][bold]%s[reset]", config.StartupScheduleRestoration)
42 ll.Log("", "reset", "contact email: [bold]%s[reset]", config.ContactEmail)
43 ll.Log("", "reset", "NATS URL: [bold]%s[reset]", redactURL(config.NatsURL))
44 ll.Log("", "reset", "Churros DB URL: [bold]%s[reset]", redactURL(config.ChurrosDatabaseURL))
45 ll.Log("", "reset", "Redis URL: [bold]%s[reset]", redactURL(config.RedisURL))
46 ll.Log("", "reset", "Health check on: [bold]:%d/health[reset]", config.HealthCheckPort)
47 ll.Log("", "reset", "App Package ID: [bold]%s[reset]", config.AppPackageId)
48 if config.VapidPublicKey != "" && config.VapidPrivateKey != "" {
49 ll.Log("", "reset", "VAPID keys: [bold][green]set[reset]")
50 } else {
51 ll.Log("", "reset", "VAPID keys: [bold][red]not set[reset]")
52 }
53 if config.HasValidFirebaseServiceAccount() {
54 ll.Log("", "reset", "Firebase: [bold][green]configured[reset]")
55 } else {
56 ll.Log("", "reset", "Firebase: [bold][red]unconfigured[reset]")
57 }
58 fmt.Println()
59
60 if config.StartupScheduleRestoration != "disabled" {
61 notella.RestoreSchedule(config.StartupScheduleRestoration == "eager")
62 }
63 notella.DisplaySchedule()
64
65 ll.Info("starting scheduler")
66 go notella.StartScheduler()
67
68 ll.Log("Connecting", "cyan", "to Churros database at [bold]%s[reset]", config.ChurrosDatabaseURL)
69 err := notella.ConnectToDababase()
70 if err != nil {
71 ll.ErrorDisplay("could not connect to database", err)
72 }
73
74 ll.Log("Connecting", "cyan", "to NATS server at [bold]%s[reset]", config.NatsURL)
75 nc, err := nats.Connect(config.NatsURL)
76 if err != nil {
77 ll.ErrorDisplay("could not connect to NATS at %s", err, config.NatsURL)
78 return
79 }
80
81 js, err := jetstream.New(nc)
82 if err != nil {
83 ll.ErrorDisplay("could not connect to Jetstream", err)
84 return
85 }
86
87 ll.Log("Initializing", "cyan", "a Jetstream stream [bold]%s[reset], listening for subject [bold]%s[reset]", notella.StreamName, notella.SubjectName)
88
89 stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
90 Name: notella.StreamName,
91 Subjects: []string{notella.SubjectName},
92 })
93 if err != nil {
94 ll.ErrorDisplay("could not create stream", err)
95 return
96 }
97
98 ll.Log("Initializing", "cyan", "Jetstream consumer [bold]%s[reset] with [bold]AckExplicitPolicy[reset]", notella.ConsumerName)
99
100 consumer, err := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{
101 Durable: notella.ConsumerName,
102 Name: notella.ConsumerName,
103 AckPolicy: jetstream.AckExplicitPolicy,
104 })
105 if err != nil {
106 ll.ErrorDisplay("could not create consumer", err)
107 return
108 }
109
110 ll.Log("Starting", "cyan", "consumer [bold]%s[reset]", notella.ConsumerName)
111
112 cc, err := consumer.Consume(
113 func(msg jetstream.Msg) {
114 err := notella.NatsReceiver(msg)
115 if err != nil {
116 ll.ErrorDisplay("Could not process message", err)
117 }
118 msg.Ack() // Acknowledge the message
119 },
120 jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, err error) {
121 ll.ErrorDisplay("Error while consuming message", err)
122 }),
123 )
124 if err != nil {
125 ll.ErrorDisplay("could not start consumer", err)
126 return
127 }
128
129 defer cc.Stop()
130
131 // Capture OS signals for graceful shutdown
132 go func() {
133 sigChan := make(chan os.Signal, 1)
134 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
135 <-sigChan
136 ll.Log("Shuting down", "magenta", "because of signal received")
137 cancel()
138 }()
139
140 // Start healthcheck endpoint
141 go notella.StartHealthCheckEndpoint(config.HealthCheckPort)
142
143 // Self-check
144 ll.Log("Checking", "cyan", "health")
145 result := notella.CheckHealth()
146 if result.AllGood() {
147 ll.Log("Health check", "green", "succeeded")
148 } else {
149 ll.Error("Health check failed: %#v", result)
150 }
151
152 // Send EventShowScheduledJobs to the stream every 5 minutes and save schedule to redis
153 go func() {
154 for {
155 select {
156 case <-ctx.Done():
157 return
158 default:
159 time.Sleep(5 * time.Minute)
160 notella.DisplaySchedule()
161 notella.SaveSchedule()
162 }
163 }
164 }()
165
166 // Block until the context is canceled (i.e., server shutdown signal received)
167 <-ctx.Done()
168 ll.Log("Stopped", "red", "server")
169}