ai cooking
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Asynccritique (#489)

* close

* close

* okay that kind of works

* okay thats something

* think we're good

* lint failure

* todo and mail wait

---------

Co-authored-by: paul miller <paul.miller>

authored by

Paul Miller
paul miller
and committed by
GitHub
52ef392e 6fbdea5f

+195 -129
+9 -5
cmd/careme/web.go
··· 58 58 59 59 userStorage := users.NewStorage(cache) 60 60 61 - generator, err := recipes.NewGenerator(cfg, recipes.IO(cache)) 61 + mc := recipes.NewMultiCritiquer(cfg, cache) 62 + generator, err := recipes.NewGenerator(cfg, cache, mc) 62 63 if err != nil { 63 64 return fmt.Errorf("failed to create recipe generator: %w", err) 64 65 } ··· 150 151 }) 151 152 152 153 ro := &readyOnce{} 153 - ro.Add(generator, locationServer) 154 + ro.Add(generator, locationServer, mc) 154 155 155 156 // no logging for readyiness too noisy. 156 157 rootMux.Handle("/ready", &recoverer{ro}) ··· 182 183 return nil 183 184 case sig := <-shutdown: 184 185 slog.Info("Shutdown signal received", "signal", sig) 185 - return gracefulShutdown(server, recipeHandler.Wait) 186 + return gracefulShutdown(server, func() { 187 + recipeHandler.Wait() 188 + mc.Wait() 189 + }) 186 190 } 187 191 } 188 192 189 - func gracefulShutdown(svr *http.Server, recipesWait func()) error { 193 + func gracefulShutdown(svr *http.Server, wait func()) error { 190 194 // Give outstanding requests 25 seconds to complete (kubernetes has 30 second grace period) 191 195 time.Sleep(5 * time.Second) // buffer to allow ingress ot update. only needed in prod 192 196 ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second) ··· 204 208 205 209 done := make(chan struct{}) 206 210 go func() { 207 - recipesWait() 211 + wait() 208 212 close(done) 209 213 }() 210 214
+1 -1
cmd/careme/web_e2e_test.go
··· 163 163 cacheStore := cache.NewFileCache(cacheDir) 164 164 userStorage := users.NewStorage(cacheStore) 165 165 166 - generator, err := recipes.NewGenerator(cfg, recipes.IO(cacheStore)) 166 + generator, err := recipes.NewGenerator(cfg, cacheStore, recipes.NewMultiCritiquer(cfg, cacheStore)) 167 167 if err != nil { 168 168 t.Fatalf("failed to create generator: %v", err) 169 169 }
+6 -2
internal/mail/mail.go
··· 56 56 locServer locServer 57 57 client emailClient 58 58 publicOrigin string 59 + wait func() 59 60 } 60 61 61 62 // TODO share some of this with web.go? good for mocking? ··· 66 67 } 67 68 68 69 userStorage := users.NewStorage(cache) 69 - 70 - generator, err := recipes.NewGenerator(cfg, recipes.IO(cache)) 70 + mc := recipes.NewMultiCritiquer(cfg, cache) 71 + generator, err := recipes.NewGenerator(cfg, cache, mc) 71 72 if err != nil { 72 73 return nil, fmt.Errorf("failed to create recipe generator: %w", err) 73 74 } ··· 92 93 locServer: locationserver, 93 94 client: sendgrid.NewSendClient(sendgridkey), 94 95 publicOrigin: cfg.ResolvedPublicOrigin(), 96 + wait: mc.Wait, 95 97 }, nil 96 98 } 97 99 ··· 106 108 for _, user := range users { 107 109 m.sendEmail(ctx, user) 108 110 } 111 + m.wait() 112 + slog.InfoContext(ctx, "finished user email run") 109 113 } 110 114 111 115 func (m *mailer) sendEmail(ctx context.Context, user utypes.User) {
+65
internal/recipes/caching_critiquer.go
··· 1 + package recipes 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "log/slog" 8 + 9 + "careme/internal/ai" 10 + "careme/internal/cache" 11 + ) 12 + 13 + type recipeCritiquer interface { 14 + CritiqueRecipe(ctx context.Context, recipe ai.Recipe) (*ai.RecipeCritique, error) 15 + Ready(ctx context.Context) error 16 + } 17 + 18 + // TODO move critique.go over here and get rid of this iterface chacing admin_page 19 + type critiqueCache interface { 20 + CritiqueFromCache(ctx context.Context, hash string) (*ai.RecipeCritique, error) 21 + SaveCritique(ctx context.Context, hash string, critique *ai.RecipeCritique) error 22 + } 23 + 24 + var _ recipeCritiquer = &cachingCritiquer{} 25 + 26 + type cachingCritiquer struct { 27 + critiquer recipeCritiquer 28 + cache critiqueCache 29 + } 30 + 31 + func newCachingCritiquer(critiquer recipeCritiquer, cache cache.Cache) *cachingCritiquer { 32 + if critiquer == nil || cache == nil { 33 + panic("critiquer and cache must not be nil") 34 + } 35 + return &cachingCritiquer{ 36 + critiquer: critiquer, 37 + cache: IO(cache), 38 + } 39 + } 40 + 41 + func (c *cachingCritiquer) Ready(ctx context.Context) error { 42 + return c.critiquer.Ready(ctx) 43 + } 44 + 45 + func (c *cachingCritiquer) CritiqueRecipe(ctx context.Context, recipe ai.Recipe) (*ai.RecipeCritique, error) { 46 + hash := recipe.ComputeHash() 47 + critique, err := c.cache.CritiqueFromCache(ctx, hash) 48 + if err == nil { 49 + return critique, nil 50 + } 51 + if !errors.Is(err, cache.ErrNotFound) { 52 + slog.ErrorContext(ctx, "failed to load cached recipe critique", "recipe", recipe.Title, "hash", hash, "error", err) 53 + return nil, fmt.Errorf("load cached critique for recipe %q (%s): %w", recipe.Title, hash, err) 54 + } 55 + 56 + critique, err = c.critiquer.CritiqueRecipe(ctx, recipe) 57 + if err != nil { 58 + return nil, err 59 + } 60 + if err := c.cache.SaveCritique(ctx, hash, critique); err != nil { 61 + slog.ErrorContext(ctx, "failed to cache recipe critique", "recipe", recipe.Title, "hash", hash, "error", err) 62 + // not actually fatal 63 + } 64 + return critique, nil 65 + }
+101 -71
internal/recipes/generator.go
··· 10 10 "log/slog" 11 11 "slices" 12 12 "strings" 13 + "sync" 13 14 "time" 14 15 15 16 "careme/internal/ai" ··· 33 34 Ready(ctx context.Context) error 34 35 } 35 36 36 - type recipeCritiquer interface { 37 - CritiqueRecipe(ctx context.Context, recipe ai.Recipe) (*ai.RecipeCritique, error) 37 + type multiCritiquier interface { 38 + CritiqueRecipes(ctx context.Context, recipes []ai.Recipe) <-chan recipeCritiqueResult 39 + } 40 + 41 + // TODO move this out of generator.go 42 + type multiCritiquierPlus interface { 43 + multiCritiquier 44 + Wait() 38 45 Ready(ctx context.Context) error 39 46 } 40 47 48 + type rubberstamp struct{} 49 + 50 + func (r rubberstamp) CritiqueRecipes(ctx context.Context, recipes []ai.Recipe) <-chan recipeCritiqueResult { 51 + results := make(chan recipeCritiqueResult, len(recipes)) 52 + for _, r := range recipes { 53 + results <- recipeCritiqueResult{ 54 + Critique: &ai.RecipeCritique{OverallScore: 10}, 55 + Recipe: &r, 56 + err: nil, 57 + } 58 + } 59 + close(results) 60 + return results 61 + } 62 + 63 + func (r rubberstamp) Wait() {} 64 + func (r rubberstamp) Ready(ctx context.Context) error { return nil } 65 + 66 + type MultiCritiquer struct { 67 + critiquer recipeCritiquer 68 + wg sync.WaitGroup 69 + } 70 + 71 + func (mc *MultiCritiquer) Ready(ctx context.Context) error { 72 + return mc.critiquer.Ready(ctx) 73 + } 74 + 75 + func NewMultiCritiquer(cfg *config.Config, cache cache.Cache) multiCritiquierPlus { 76 + if !cfg.Gemini.IsEnabled() { 77 + return rubberstamp{} 78 + } 79 + crit := ai.NewCritiquer(cfg.Gemini.APIKey, cfg.Gemini.CritiqueModel) 80 + cachingCritiquer := newCachingCritiquer(crit, cache) 81 + return &MultiCritiquer{critiquer: cachingCritiquer} 82 + } 83 + 84 + func (mc *MultiCritiquer) CritiqueRecipes(ctx context.Context, recipes []ai.Recipe) <-chan recipeCritiqueResult { 85 + results := make(chan recipeCritiqueResult, len(recipes)) 86 + mc.wg.Add(len(recipes)) 87 + 88 + var localWg sync.WaitGroup 89 + for _, recipe := range recipes { 90 + localWg.Go(func() { 91 + defer mc.wg.Done() 92 + critique, err := mc.critiquer.CritiqueRecipe(ctx, recipe) 93 + results <- recipeCritiqueResult{ 94 + Recipe: &recipe, 95 + Critique: critique, 96 + err: err, 97 + } 98 + }) 99 + } 100 + go func() { 101 + localWg.Wait() 102 + close(results) 103 + }() 104 + return results 105 + } 106 + 107 + func (mc *MultiCritiquer) Wait() { 108 + mc.wg.Wait() 109 + } 110 + 41 111 type ingredientio interface { 42 112 SaveIngredients(ctx context.Context, hash string, ingredients []kroger.Ingredient) error 43 113 IngredientsFromCache(ctx context.Context, hash string) ([]kroger.Ingredient, error) 44 114 } 45 115 46 - type critiqueIO interface { 47 - SaveCritique(ctx context.Context, hash string, critique *ai.RecipeCritique) error 48 - } 49 - 50 116 const minimumRecipeCritiqueScore = 8 51 117 52 118 type Generator struct { 53 119 config *config.Config 54 120 aiClient aiClient 55 - critiquer recipeCritiquer 121 + critiquer multiCritiquier 56 122 staplesProvider staplesProvider 57 - io ingredientio 58 - cio critiqueIO // pull this out? 59 - } 60 - 61 - type allIO interface { 62 - ingredientio 63 - critiqueIO 123 + // TODO move ingrededientio into staples provider and remove from generator. 124 + io ingredientio 64 125 } 65 126 66 - func NewGenerator(cfg *config.Config, io allIO) (generatorPlus, error) { 127 + // this is kind of a factory. Could instead take stapes/criqiue and ai client isntead of creating them 128 + func NewGenerator(cfg *config.Config, cache cache.Cache, mc multiCritiquier) (generatorPlus, error) { 67 129 if cfg.Mocks.Enable { 68 130 return mock{}, nil 69 131 } ··· 73 135 return nil, fmt.Errorf("failed to create staples provider: %w", err) 74 136 } 75 137 76 - var critiquer recipeCritiquer 77 - if cfg.Gemini.IsEnabled() { 78 - critiquer = ai.NewCritiquer(cfg.Gemini.APIKey, cfg.Gemini.CritiqueModel) 79 - } 80 - 81 138 return &Generator{ 82 - io: io, 83 - cio: io, // pull this out? 139 + io: IO(cache), 84 140 config: cfg, 85 141 aiClient: ai.NewClient(cfg.AI.APIKey, "TODOMODEL"), 86 - critiquer: critiquer, 87 142 staplesProvider: stapesProvider, 143 + critiquer: mc, 88 144 }, nil 89 145 } 90 146 ··· 193 249 if err != nil { 194 250 return nil, err 195 251 } 196 - // how to pipe this back to ai client? should ai client hjave its own critiquer or do we just call regenerate once? 197 - 198 - // should never happen? How do you get save on first generte? 199 - // shoppingList.Recipes = append(shoppingList.Recipes, p.Saved...) 200 252 201 253 // TODO this does not get saved in params and thus must be loaded from html 202 254 // could update params after first generation or pregenerate before we save params. ··· 248 300 }) 249 301 } 250 302 303 + // TODO pass in ai client so web.go can check aiclients readiness. 251 304 func (g *Generator) Ready(ctx context.Context) error { 252 305 if err := g.aiClient.Ready(ctx); err != nil { 253 306 return err 254 307 } 255 - if g.critiquer != nil { 256 - if err := g.critiquer.Ready(ctx); err != nil { 257 - return fmt.Errorf("gemini critique client not ready: %w", err) 258 - } 259 - } 260 308 return nil 261 309 } 262 310 ··· 309 357 type recipeCritiqueResult struct { 310 358 Recipe *ai.Recipe // just here so we can give the model the title 311 359 Critique *ai.RecipeCritique 360 + err error 312 361 } 313 362 314 363 func (g *Generator) critiqueAndMaybeRetry(ctx context.Context, shoppingList *ai.ShoppingList) (*ai.ShoppingList, error) { 315 - results, err := g.cacheRecipeCritiques(ctx, shoppingList.Recipes) 316 - if err != nil { 317 - return nil, fmt.Errorf("failed to cache recipe critiques: %w", err) 364 + if g.critiquer == nil { 365 + return shoppingList, nil 318 366 } 367 + 368 + results := g.critiquer.CritiqueRecipes(ctx, shoppingList.Recipes) 319 369 var garbage []recipeCritiqueResult 320 370 var good []ai.Recipe 321 - for _, result := range results { 322 - if result.Critique.OverallScore >= minimumRecipeCritiqueScore { 323 - slog.InfoContext(ctx, "acceptable", "hash", result.Recipe.ComputeHash(), "title", result.Recipe.Title, "score", result.Critique.OverallScore) 371 + for result := range results { 372 + if result.err != nil { 373 + slog.ErrorContext(ctx, "failed to critique recipe", "hash", result.Recipe.ComputeHash(), "title", result.Recipe.Title, "error", result.err) 374 + good = append(good, *result.Recipe) 375 + continue 376 + } 324 377 378 + if result.Critique.OverallScore >= minimumRecipeCritiqueScore { 325 379 good = append(good, *result.Recipe) 326 - } else { 327 - // if there are no issues should we still retry? wasted of tokens 328 - slog.InfoContext(ctx, "low scoring recipe", "hash", result.Recipe.ComputeHash(), "title", result.Recipe.Title, "score", result.Critique.OverallScore) 329 - garbage = append(garbage, result) 380 + continue 330 381 } 382 + // if there are no issues should we still retry? wasted of tokens 383 + slog.InfoContext(ctx, "low scoring recipe", "hash", result.Recipe.ComputeHash(), "title", result.Recipe.Title, "score", result.Critique.OverallScore) 384 + garbage = append(garbage, result) 385 + 331 386 } 332 387 if len(garbage) == 0 { 333 388 return shoppingList, nil ··· 340 395 return nil, fmt.Errorf("conversation ID is required for critique retry") 341 396 } 342 397 343 - shoppingList, err = g.aiClient.Regenerate(ctx, critiqueRetryInstructions(garbage), shoppingList.ConversationID) 398 + shoppingList, err := g.aiClient.Regenerate(ctx, critiqueRetryInstructions(garbage), shoppingList.ConversationID) 344 399 if err != nil { 345 400 return nil, fmt.Errorf("failed to regenerate recipes from critique feedback: %w", err) 346 401 } 402 + newRecipes := shoppingList.Recipes 347 403 shoppingList.Recipes = append(shoppingList.Recipes, good...) 348 404 shoppingList.Discarded = lo.Map(garbage, func(result recipeCritiqueResult, _ int) ai.Recipe { 349 405 return *result.Recipe 350 406 }) 351 407 352 - // async as this is just debug not retrying twice yet. 353 - if _, err := g.cacheRecipeCritiques(ctx, shoppingList.Recipes); err != nil { 354 - return nil, fmt.Errorf("failed to cache recipe critiques: %w", err) 355 - } 408 + // fire this off async 409 + _ = g.critiquer.CritiqueRecipes(ctx, newRecipes) 356 410 return shoppingList, nil 357 - } 358 - 359 - func (g *Generator) cacheRecipeCritiques(ctx context.Context, recipes []ai.Recipe) ([]recipeCritiqueResult, error) { 360 - if g.critiquer == nil || g.cio == nil { 361 - // yuck refactor tests to make this alway present 362 - return nil, nil 363 - } 364 - return parallelism.MapWithErrors(recipes, func(recipe ai.Recipe) (recipeCritiqueResult, error) { 365 - hash := recipe.ComputeHash() 366 - critique, err := g.critiquer.CritiqueRecipe(ctx, recipe) 367 - if err != nil { 368 - slog.ErrorContext(ctx, "failed to critique recipe", "recipe", recipe.Title, "hash", hash, "error", err) 369 - return recipeCritiqueResult{}, fmt.Errorf("critique recipe %q (%s): %w", recipe.Title, hash, err) 370 - } 371 - // should we background the saving of this? too fast to matter? 372 - if err := g.cio.SaveCritique(ctx, hash, critique); err != nil { 373 - slog.ErrorContext(ctx, "failed to cache recipe critique", "recipe", recipe.Title, "hash", hash, "error", err) 374 - return recipeCritiqueResult{}, fmt.Errorf("cache critique for recipe %q (%s): %w", recipe.Title, hash, err) 375 - } 376 - return recipeCritiqueResult{ 377 - Recipe: &recipe, 378 - Critique: critique, 379 - }, nil 380 - }) 381 411 } 382 412 383 413 func critiqueRetryInstructions(results []recipeCritiqueResult) []string {
+12 -48
internal/recipes/generator_test.go
··· 2 2 3 3 import ( 4 4 "context" 5 - "errors" 6 5 "slices" 7 6 "sync" 8 7 "testing" ··· 405 404 }, 406 405 } 407 406 critiquer := &captureCritiquer{} 407 + cachedCrit := newCachingCritiquer(critiquer, cacheStore) 408 + mc := &MultiCritiquer{critiquer: cachedCrit} 408 409 g := &Generator{ 409 410 io: io, 410 - cio: io, 411 411 aiClient: aiStub, 412 - critiquer: critiquer, 412 + critiquer: mc, 413 413 } 414 414 415 415 got, err := g.GenerateRecipes(t.Context(), params) ··· 433 433 } 434 434 } 435 435 436 - func TestGenerateRecipes_CritiqueFailuresFailGeneration(t *testing.T) { 437 - cacheStore := cache.NewFileCache(t.TempDir()) 438 - io := IO(cacheStore) 439 - params := DefaultParams(&locations.Location{ID: "70004001", Name: "Store"}, time.Now()) 440 - if err := io.SaveIngredients(t.Context(), params.LocationHash(), []kroger.Ingredient{{Description: loPtr("Chicken")}}); err != nil { 441 - t.Fatalf("failed to seed ingredients cache: %v", err) 442 - } 443 - 444 - recipe := ai.Recipe{Title: "Roast Chicken", Description: "Crisp and simple", Instructions: []string{"Roast the chicken."}} 445 - g := &Generator{ 446 - io: io, 447 - cio: io, 448 - aiClient: &captureGenerateAIClient{ 449 - shoppingList: &ai.ShoppingList{ 450 - ConversationID: "conv-123", 451 - Recipes: []ai.Recipe{recipe}, 452 - }, 453 - }, 454 - critiquer: &captureCritiquer{err: errors.New("gemini down")}, 455 - } 456 - 457 - got, err := g.GenerateRecipes(t.Context(), params) 458 - if err == nil { 459 - t.Fatal("expected GenerateRecipes to fail when critique caching fails") 460 - } 461 - if got != nil { 462 - t.Fatalf("expected no shopping list on critique failure, got %+v", got) 463 - } 464 - if _, err := io.CritiqueFromCache(t.Context(), recipe.ComputeHash()); !errors.Is(err, cache.ErrNotFound) { 465 - t.Fatalf("expected no cached critique after failure, got %v", err) 466 - } 467 - } 468 - 469 436 func TestGenerateRecipes_RegenerateCritiquesOnlyFreshRecipes(t *testing.T) { 470 437 alreadySaved := ai.Recipe{Title: "Already Saved", Description: "Saved earlier"} 471 438 newResult := ai.Recipe{Title: "Brand New Dinner", Description: "Fresh idea"} ··· 473 440 critiquer := &captureCritiquer{} 474 441 g := &Generator{ 475 442 io: IO(cache.NewInMemoryCache()), 476 - cio: IO(cache.NewInMemoryCache()), 477 443 aiClient: &captureRegenerateAIClient{shoppingList: &ai.ShoppingList{ConversationID: "conv-123", Recipes: []ai.Recipe{newResult}}}, 478 - critiquer: critiquer, 444 + critiquer: &MultiCritiquer{critiquer: critiquer}, 479 445 } 480 446 481 447 params := DefaultParams(&locations.Location{ID: "70004001", Name: "Store"}, time.Now()) ··· 542 508 } 543 509 }, 544 510 } 511 + 512 + mc := &MultiCritiquer{critiquer: newCachingCritiquer(critiquer, cacheStore)} 545 513 g := &Generator{ 546 514 io: io, 547 - cio: io, 548 515 aiClient: aiStub, 549 - critiquer: critiquer, 516 + critiquer: mc, 550 517 } 551 518 552 519 got, err := g.GenerateRecipes(t.Context(), params) ··· 572 539 if got := aiStub.regenerateConversation; !slices.Equal(got, []string{"conv-initial"}) { 573 540 t.Fatalf("unexpected critique retry conversation IDs: got %v", got) 574 541 } 542 + mc.Wait() 575 543 if len(critiquer.recipes) != 2 { 576 544 t.Fatalf("expected two critique passes, got %d", len(critiquer.recipes)) 577 545 } ··· 633 601 } 634 602 g := &Generator{ 635 603 io: io, 636 - cio: io, 637 604 aiClient: aiStub, 638 - critiquer: critiquer, 605 + critiquer: &MultiCritiquer{critiquer: critiquer}, 639 606 } 640 607 641 608 got, err := g.GenerateRecipes(t.Context(), params) ··· 668 635 } 669 636 g := &Generator{ 670 637 io: io, 671 - cio: io, 672 638 aiClient: aiStub, 673 - critiquer: &captureCritiquer{}, 639 + critiquer: &MultiCritiquer{critiquer: &captureCritiquer{}}, 674 640 } 675 641 676 642 got, err := g.GenerateRecipes(t.Context(), params) ··· 733 699 } 734 700 g := &Generator{ 735 701 io: io, 736 - cio: io, 737 702 aiClient: aiStub, 738 - critiquer: critiquer, 703 + critiquer: &MultiCritiquer{critiquer: critiquer}, 739 704 } 740 705 741 706 params := DefaultParams(&locations.Location{ID: "70004001", Name: "Store"}, time.Now()) ··· 806 771 } 807 772 g := &Generator{ 808 773 io: io, 809 - cio: io, 810 774 aiClient: aiStub, 811 - critiquer: critiquer, 775 + critiquer: &MultiCritiquer{critiquer: critiquer}, 812 776 } 813 777 814 778 got, err := g.GenerateRecipes(t.Context(), params)
+1 -2
internal/recipes/server.go
··· 1043 1043 return 1044 1044 } 1045 1045 1046 - // add saved recipes here rather than each 1047 - 1046 + // should save be inside generator or shouild saved merging happen out here? 1048 1047 if err := s.SaveShoppingList(ctx, shoppingList, hash); err != nil { 1049 1048 slog.ErrorContext(ctx, "save error", "error", err) 1050 1049 return