ai cooking
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

let watchdogs check if staples are ready (#448)

* let watchdogs check if staples are ready

* simple setup

* ugh watchdog protects

* okay have our watchdog package

* write somthign back to body

* remove unncessary

---------

Co-authored-by: paul miller <paul.miller>

authored by

Paul Miller
paul miller
and committed by
GitHub
9cfc831e df860115

+251 -3
+5
cmd/careme/web.go
··· 26 26 "careme/internal/static" 27 27 "careme/internal/templates" 28 28 "careme/internal/users" 29 + "careme/internal/watchdog" 29 30 30 31 utypes "careme/internal/users/types" 31 32 ) ··· 81 82 recipeHandler.Register(appRoutes) 82 83 83 84 actowiz.NewServer(locationStorage).Register(infraRoutes) 85 + 86 + watchdogServer := watchdog.Server{} 87 + watchdogServer.Add("staples", generator, 6.*time.Hour) 88 + watchdogServer.Register(infraRoutes) 84 89 85 90 adminMux := http.NewServeMux() 86 91 adminMux.Handle("/users", users.AdminUsersPage(userStorage))
+7 -2
internal/mail/mail.go
··· 14 14 "os" 15 15 "time" 16 16 17 + "careme/internal/ai" 17 18 "careme/internal/cache" 18 19 "careme/internal/config" 19 20 "careme/internal/locations" ··· 42 43 43 44 type emailClient interface { 44 45 Send(message *mail.SGMailV3) (*rest.Response, error) 46 + } 47 + 48 + type generator interface { 49 + GenerateRecipes(ctx context.Context, p *recipes.GeneratorParams) (*ai.ShoppingList, error) 45 50 } 46 51 47 52 type mailer struct { 48 53 cache cache.Cache 49 54 userStorage *users.Storage 50 - generator *recipes.Generator // interface requires making params public 55 + generator generator // interface requires making params public 51 56 locServer locServer 52 57 client emailClient 53 58 publicOrigin string ··· 83 88 return &mailer{ 84 89 cache: cache, 85 90 userStorage: userStorage, 86 - generator: generator.(*recipes.Generator), // TODO do better 91 + generator: generator, 87 92 locServer: locationserver, 88 93 client: sendgrid.NewSendClient(sendgridkey), 89 94 publicOrigin: cfg.ResolvedPublicOrigin(),
+18 -1
internal/recipes/generator.go
··· 45 45 io ingredientio 46 46 } 47 47 48 - func NewGenerator(cfg *config.Config, io ingredientio) (generator, error) { 48 + func NewGenerator(cfg *config.Config, io ingredientio) (generatorPlus, error) { 49 49 if cfg.Mocks.Enable { 50 50 return mock{}, nil 51 51 } ··· 215 215 216 216 func (g *Generator) Ready(ctx context.Context) error { 217 217 return g.aiClient.Ready(ctx) 218 + } 219 + 220 + // this is a little expnsive so unlike ready above needs to be protected by a once by. 221 + func (g *Generator) Watchdog(ctx context.Context) error { 222 + storeIDs := []string{ 223 + "wholefoods_10153", // bellevue 224 + "safeway_490", // bellevue 225 + "70500874", // qfc in bellevue 226 + "starmarket_3566", // boston 227 + "acmemarkets_806", // newark 228 + } 229 + _, err := parallelism.Flatten(storeIDs, func(storeID string) ([]kroger.Ingredient, error) { 230 + // defeats point of watch dog to read from cache but we could write to it as a courtesy. 231 + return g.staplesProvider.FetchStaples(ctx, storeID) 232 + }) 233 + 234 + return err 218 235 } 219 236 220 237 // toStr returns the string value if non-nil, or "empty" otherwise.
+4
internal/recipes/mock.go
··· 363 363 return nil 364 364 } 365 365 366 + func (m mock) Watchdog(ctx context.Context) error { 367 + return nil 368 + } 369 + 366 370 func (m mock) GenerateRecipes(ctx context.Context, p *generatorParams) (*ai.ShoppingList, error) { 367 371 id := p.ConversationID 368 372 if id == "" {
+3
internal/recipes/params.go
··· 45 45 PriorSavedHashes []string `json:"-"` 46 46 } 47 47 48 + // exist for mail's interface be careful please. 49 + type GeneratorParams = generatorParams 50 + 48 51 func DefaultParams(l *locations.Location, date time.Time) *generatorParams { 49 52 // normalize to midnight (shave hours, minutes, seconds, nanoseconds) 50 53 date = time.Date(date.Year(), date.Month(), date.Day(), 0, 0, 0, 0, date.Location())
+6
internal/recipes/server.go
··· 77 77 AskQuestion(ctx context.Context, question string, conversationID string) (string, error) 78 78 GenerateRecipeImage(ctx context.Context, recipe ai.Recipe) (*ai.GeneratedImage, error) 79 79 PickAWine(ctx context.Context, location string, recipe ai.Recipe, date time.Time) (*ai.WineSelection, error) 80 + } 81 + 82 + // should we have new generator just return two interfaces instead of gluing? 83 + type generatorPlus interface { 84 + generator 80 85 Ready(ctx context.Context) error 86 + Watchdog(ctx context.Context) error 81 87 } 82 88 83 89 type server struct {
+40
internal/watchdog/onceper.go
··· 1 + package watchdog 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "sync" 7 + "time" 8 + ) 9 + 10 + // oncePer is a very dumb rate limiter to protect watchdogs from abuse. 11 + // It becomes pretty useless as replicas expand. 12 + // There is a fancier version in branch fancyonceper, 13 + // but another option is to just hide watchdog calls from public. 14 + type oncePer struct { 15 + mu sync.Mutex 16 + last time.Time 17 + period time.Duration 18 + dog watchdog 19 + } 20 + 21 + func NewOncePer(period time.Duration, dog watchdog) oncePer { 22 + return oncePer{ 23 + period: period, 24 + dog: dog, 25 + } 26 + } 27 + 28 + var errTooSoon = errors.New("too soon to call this again") 29 + 30 + func (o *oncePer) Watchdog(ctx context.Context) error { 31 + o.mu.Lock() 32 + defer o.mu.Unlock() 33 + 34 + if !o.last.IsZero() && time.Since(o.last) < o.period { 35 + return errTooSoon 36 + } 37 + 38 + o.last = time.Now() 39 + return o.dog.Watchdog(ctx) // allow more tries if this fails? 40 + }
+38
internal/watchdog/onceper_test.go
··· 1 + package watchdog 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "testing" 7 + "time" 8 + ) 9 + 10 + func TestOncePerDo(t *testing.T) { 11 + t.Parallel() 12 + 13 + dog := &stubWatchdog{} 14 + guard := NewOncePer(time.Hour, dog) 15 + 16 + if err := guard.Watchdog(context.Background()); err != nil { 17 + t.Fatalf("first call: %v", err) 18 + } 19 + if got, want := dog.calls, 1; got != want { 20 + t.Fatalf("calls after first run = %d, want %d", got, want) 21 + } 22 + 23 + err := guard.Watchdog(context.Background()) 24 + if !errors.Is(err, errTooSoon) { 25 + t.Fatalf("second call error = %v, want %v", err, errTooSoon) 26 + } 27 + if got, want := dog.calls, 1; got != want { 28 + t.Fatalf("calls after blocked run = %d, want %d", got, want) 29 + } 30 + 31 + guard.last = time.Now().Add(-2 * time.Hour) 32 + if err := guard.Watchdog(context.Background()); err != nil { 33 + t.Fatalf("third call after period: %v", err) 34 + } 35 + if got, want := dog.calls, 2; got != want { 36 + t.Fatalf("calls after third run = %d, want %d", got, want) 37 + } 38 + }
+56
internal/watchdog/server.go
··· 1 + package watchdog 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "log/slog" 8 + "net/http" 9 + "time" 10 + 11 + "careme/internal/routing" 12 + ) 13 + 14 + type watchdog interface { 15 + Watchdog(ctx context.Context) error 16 + } 17 + 18 + type watcher struct { 19 + name string 20 + period time.Duration 21 + dog watchdog 22 + } 23 + 24 + type Server struct { 25 + watchers []watcher 26 + } 27 + 28 + func (s *Server) Add(name string, dog watchdog, period time.Duration) { 29 + guard := NewOncePer(period, dog) 30 + s.watchers = append(s.watchers, watcher{ 31 + name: name, 32 + period: period, 33 + dog: &guard, 34 + }) 35 + } 36 + 37 + func (s *Server) Register(mux routing.Registrar) { 38 + for _, watcher := range s.watchers { 39 + mux.HandleFunc("GET /watchdogs/"+watcher.name, func(w http.ResponseWriter, r *http.Request) { 40 + err := watcher.dog.Watchdog(r.Context()) 41 + if errors.Is(err, errTooSoon) { 42 + http.Error(w, fmt.Sprintf("can only call watchdog every %v", watcher.period), http.StatusServiceUnavailable) 43 + return 44 + } 45 + if err != nil { 46 + http.Error(w, fmt.Sprintf("%s not ready: %v", watcher.name, err), http.StatusServiceUnavailable) 47 + return 48 + } 49 + 50 + if _, err := w.Write([]byte("OK")); err != nil { 51 + slog.ErrorContext(r.Context(), "failed to write readiness response", "error", err) 52 + } 53 + w.WriteHeader(http.StatusOK) 54 + }) 55 + } 56 + }
+74
internal/watchdog/server_test.go
··· 1 + package watchdog 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "net/http" 7 + "net/http/httptest" 8 + "strings" 9 + "testing" 10 + "time" 11 + ) 12 + 13 + type stubWatchdog struct { 14 + calls int 15 + err error 16 + } 17 + 18 + func (s *stubWatchdog) Watchdog(context.Context) error { 19 + s.calls++ 20 + return s.err 21 + } 22 + 23 + func TestServerRegisterWatchdog(t *testing.T) { 24 + t.Parallel() 25 + 26 + dog := &stubWatchdog{} 27 + server := &Server{} 28 + server.Add("staples", dog, 6*time.Hour) 29 + mux := http.NewServeMux() 30 + server.Register(mux) 31 + 32 + first := httptest.NewRecorder() 33 + mux.ServeHTTP(first, httptest.NewRequest(http.MethodGet, "/watchdogs/staples", nil)) 34 + if got, want := first.Code, http.StatusOK; got != want { 35 + t.Fatalf("first status = %d, want %d", got, want) 36 + } 37 + if got, want := dog.calls, 1; got != want { 38 + t.Fatalf("watchdog calls after first request = %d, want %d", got, want) 39 + } 40 + 41 + second := httptest.NewRecorder() 42 + mux.ServeHTTP(second, httptest.NewRequest(http.MethodGet, "/watchdogs/staples", nil)) 43 + if got, want := second.Code, http.StatusServiceUnavailable; got != want { 44 + t.Fatalf("second status = %d, want %d", got, want) 45 + } 46 + if !strings.Contains(second.Body.String(), "can only call watchdog every "+(6*time.Hour).String()) { 47 + t.Fatalf("second body = %q, want rate limit message", second.Body.String()) 48 + } 49 + if got, want := dog.calls, 1; got != want { 50 + t.Fatalf("watchdog calls after second request = %d, want %d", got, want) 51 + } 52 + } 53 + 54 + func TestServerRegisterWatchdogError(t *testing.T) { 55 + t.Parallel() 56 + 57 + dog := &stubWatchdog{err: errors.New("boom")} 58 + server := &Server{} 59 + server.Add("produce", dog, 30*time.Minute) 60 + mux := http.NewServeMux() 61 + server.Register(mux) 62 + 63 + rec := httptest.NewRecorder() 64 + mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/watchdogs/produce", nil)) 65 + if got, want := rec.Code, http.StatusServiceUnavailable; got != want { 66 + t.Fatalf("status = %d, want %d", got, want) 67 + } 68 + if !strings.Contains(rec.Body.String(), "produce not ready: boom") { 69 + t.Fatalf("body = %q, want watchdog error", rec.Body.String()) 70 + } 71 + if got, want := dog.calls, 1; got != want { 72 + t.Fatalf("watchdog calls = %d, want %d", got, want) 73 + } 74 + }