this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 169 lines 5.2 kB view raw
1//go:generate go run github.com/steebchen/prisma-client-go generate 2 3package main 4 5import ( 6 "context" 7 "fmt" 8 "os" 9 "os/signal" 10 "syscall" 11 "time" 12 13 "git.inpt.fr/churros/notella" 14 "github.com/common-nighthawk/go-figure" 15 ll "github.com/gwennlbh/label-logger-go" 16 "github.com/nats-io/nats.go" 17 "github.com/nats-io/nats.go/jetstream" 18) 19 20var Version = "DEV" 21 22func main() { 23 figure.NewColorFigure("Notella", "", "yellow", true).Print() 24 fmt.Printf("%38s\n", fmt.Sprintf("美味しそう〜 v%s", Version)) 25 fmt.Println() 26 27 // Setup a context to handle graceful shutdowns 28 ctx, cancel := context.WithCancel(context.Background()) 29 defer cancel() 30 31 config, _ := notella.LoadConfiguration() 32 33 ll.Info("Server time is %s", time.Now().Format("2006-01-02 15:04:05 -07:00:00")) 34 if config.DryRunMode && len(config.DryRunExceptions) > 0 { 35 ll.Info("Running [bold]in dry run mode, [red]except for %+v[reset] with", config.DryRunExceptions) 36 } else if config.DryRunMode { 37 ll.Info("Running [bold]in dry run mode[reset] with") 38 } else { 39 ll.Info("Running with config") 40 } 41 ll.Log("", "reset", "Schedule recovery: [bold][dim]at startup [reset][bold]%s[reset]", config.StartupScheduleRestoration) 42 ll.Log("", "reset", "contact email: [bold]%s[reset]", config.ContactEmail) 43 ll.Log("", "reset", "NATS URL: [bold]%s[reset]", redactURL(config.NatsURL)) 44 ll.Log("", "reset", "Churros DB URL: [bold]%s[reset]", redactURL(config.ChurrosDatabaseURL)) 45 ll.Log("", "reset", "Redis URL: [bold]%s[reset]", redactURL(config.RedisURL)) 46 ll.Log("", "reset", "Health check on: [bold]:%d/health[reset]", config.HealthCheckPort) 47 ll.Log("", "reset", "App Package ID: [bold]%s[reset]", config.AppPackageId) 48 if config.VapidPublicKey != "" && config.VapidPrivateKey != "" { 49 ll.Log("", "reset", "VAPID keys: [bold][green]set[reset]") 50 } else { 51 ll.Log("", "reset", "VAPID keys: [bold][red]not set[reset]") 52 } 53 if config.HasValidFirebaseServiceAccount() { 54 ll.Log("", "reset", "Firebase: [bold][green]configured[reset]") 55 } else { 56 ll.Log("", "reset", "Firebase: [bold][red]unconfigured[reset]") 57 } 58 fmt.Println() 59 60 if config.StartupScheduleRestoration != "disabled" { 61 notella.RestoreSchedule(config.StartupScheduleRestoration == "eager") 62 } 63 notella.DisplaySchedule() 64 65 ll.Info("starting scheduler") 66 go notella.StartScheduler() 67 68 ll.Log("Connecting", "cyan", "to Churros database at [bold]%s[reset]", config.ChurrosDatabaseURL) 69 err := notella.ConnectToDababase() 70 if err != nil { 71 ll.ErrorDisplay("could not connect to database", err) 72 } 73 74 ll.Log("Connecting", "cyan", "to NATS server at [bold]%s[reset]", config.NatsURL) 75 nc, err := nats.Connect(config.NatsURL) 76 if err != nil { 77 ll.ErrorDisplay("could not connect to NATS at %s", err, config.NatsURL) 78 return 79 } 80 81 js, err := jetstream.New(nc) 82 if err != nil { 83 ll.ErrorDisplay("could not connect to Jetstream", err) 84 return 85 } 86 87 ll.Log("Initializing", "cyan", "a Jetstream stream [bold]%s[reset], listening for subject [bold]%s[reset]", notella.StreamName, notella.SubjectName) 88 89 stream, err := js.CreateStream(ctx, jetstream.StreamConfig{ 90 Name: notella.StreamName, 91 Subjects: []string{notella.SubjectName}, 92 }) 93 if err != nil { 94 ll.ErrorDisplay("could not create stream", err) 95 return 96 } 97 98 ll.Log("Initializing", "cyan", "Jetstream consumer [bold]%s[reset] with [bold]AckExplicitPolicy[reset]", notella.ConsumerName) 99 100 consumer, err := stream.CreateConsumer(ctx, jetstream.ConsumerConfig{ 101 Durable: notella.ConsumerName, 102 Name: notella.ConsumerName, 103 AckPolicy: jetstream.AckExplicitPolicy, 104 }) 105 if err != nil { 106 ll.ErrorDisplay("could not create consumer", err) 107 return 108 } 109 110 ll.Log("Starting", "cyan", "consumer [bold]%s[reset]", notella.ConsumerName) 111 112 cc, err := consumer.Consume( 113 func(msg jetstream.Msg) { 114 err := notella.NatsReceiver(msg) 115 if err != nil { 116 ll.ErrorDisplay("Could not process message", err) 117 } 118 msg.Ack() // Acknowledge the message 119 }, 120 jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, err error) { 121 ll.ErrorDisplay("Error while consuming message", err) 122 }), 123 ) 124 if err != nil { 125 ll.ErrorDisplay("could not start consumer", err) 126 return 127 } 128 129 defer cc.Stop() 130 131 // Capture OS signals for graceful shutdown 132 go func() { 133 sigChan := make(chan os.Signal, 1) 134 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) 135 <-sigChan 136 ll.Log("Shuting down", "magenta", "because of signal received") 137 cancel() 138 }() 139 140 // Start healthcheck endpoint 141 go notella.StartHealthCheckEndpoint(config.HealthCheckPort) 142 143 // Self-check 144 ll.Log("Checking", "cyan", "health") 145 result := notella.CheckHealth() 146 if result.AllGood() { 147 ll.Log("Health check", "green", "succeeded") 148 } else { 149 ll.Error("Health check failed: %#v", result) 150 } 151 152 // Send EventShowScheduledJobs to the stream every 5 minutes and save schedule to redis 153 go func() { 154 for { 155 select { 156 case <-ctx.Done(): 157 return 158 default: 159 time.Sleep(5 * time.Minute) 160 notella.DisplaySchedule() 161 notella.SaveSchedule() 162 } 163 } 164 }() 165 166 // Block until the context is canceled (i.e., server shutdown signal received) 167 <-ctx.Done() 168 ll.Log("Stopped", "red", "server") 169}