ai cooking
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge pull request #146 from paulgmiller/racefix2

get rid of race condition

authored by

Paul Miller and committed by
GitHub
9823f58c 0d38887d

+131 -35
+24 -3
internal/cache/azure.go
··· 8 8 "os" 9 9 "strings" 10 10 11 + "github.com/Azure/azure-sdk-for-go/sdk/azcore" 11 12 "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" 12 13 "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" 13 14 "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" ··· 95 96 return stream.Body, nil 96 97 } 97 98 98 - func (fc *BlobCache) Set(ctx context.Context, key, value string) error { 99 - _, err := fc.container.NewBlockBlobClient(key).UploadStream(ctx, strings.NewReader(value), &azblob.UploadStreamOptions{}) 100 - return err 99 + func (fc *BlobCache) Put(ctx context.Context, key, value string, opts PutOptions) error { 100 + var access *blob.AccessConditions 101 + if opts.Condition == PutIfNoneMatch { 102 + access = &blob.AccessConditions{} 103 + access.ModifiedAccessConditions = &blob.ModifiedAccessConditions{} 104 + any := azcore.ETag("*") 105 + access.ModifiedAccessConditions.IfNoneMatch = &any 106 + // TODO: IfMatch support. 107 + } 108 + 109 + _, err := fc.container.NewBlockBlobClient(key).UploadStream(ctx, strings.NewReader(value), &azblob.UploadStreamOptions{ 110 + AccessConditions: access, 111 + }) 112 + if err != nil { 113 + if bloberror.HasCode(err, bloberror.BlobAlreadyExists, bloberror.ResourceAlreadyExists) { 114 + return ErrAlreadyExists 115 + } 116 + if bloberror.HasCode(err, bloberror.ConditionNotMet, bloberror.TargetConditionNotMet, bloberror.SourceConditionNotMet) { 117 + return ErrAlreadyExists 118 + } 119 + return err 120 + } 121 + return nil 101 122 } 102 123 103 124 // TODO take a config? let it set container or directory?
+42 -5
internal/cache/file.go
··· 10 10 ) 11 11 12 12 var ErrNotFound = errors.New("cache entry not found") 13 + var ErrAlreadyExists = errors.New("cache entry already exists") 14 + 15 + type PutCondition uint8 16 + 17 + const ( 18 + PutUnconditional PutCondition = iota 19 + PutIfNoneMatch 20 + // PutIfMatch 21 + ) 22 + 23 + type PutOptions struct { 24 + Condition PutCondition 25 + // IfMatch updates the entry only if the current ETag matches this value. 26 + // IfMatch string 27 + } 28 + 29 + func Unconditional() PutOptions { 30 + return PutOptions{Condition: PutUnconditional} 31 + } 32 + 33 + func IfNoneMatch() PutOptions { 34 + return PutOptions{Condition: PutIfNoneMatch} 35 + } 13 36 14 37 type Cache interface { 15 38 Get(ctx context.Context, key string) (io.ReadCloser, error) 16 - //TODO define set behavior if it already exists. Maybe go immutable and error? 17 - Set(ctx context.Context, key, value string) error 18 39 Exists(ctx context.Context, key string) (bool, error) 40 + Put(ctx context.Context, key, value string, opts PutOptions) error 19 41 } 20 42 21 43 type ListCache interface { ··· 62 84 } 63 85 64 86 func (fc *FileCache) Get(_ context.Context, key string) (io.ReadCloser, error) { 65 - 66 87 data, err := os.Open(filepath.Join(fc.Dir, key)) 67 88 if err != nil { 68 89 if os.IsNotExist(err) { ··· 73 94 return data, nil 74 95 } 75 96 76 - func (fc *FileCache) Set(_ context.Context, key, value string) error { 97 + func (fc *FileCache) Put(_ context.Context, key, value string, opts PutOptions) error { 77 98 fullPath := filepath.Join(fc.Dir, key) 78 99 dir := filepath.Dir(fullPath) 79 - // Create parent directories if they don't exist 80 100 if err := os.MkdirAll(dir, 0755); err != nil { 81 101 return err 82 102 } 103 + 104 + if opts.Condition == PutIfNoneMatch { 105 + f, err := os.OpenFile(fullPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0644) 106 + if err != nil { 107 + if os.IsExist(err) { 108 + return ErrAlreadyExists 109 + } 110 + return err 111 + } 112 + defer f.Close() 113 + if _, err := f.WriteString(value); err != nil { 114 + return err 115 + } 116 + return nil 117 + } 118 + 119 + // TODO: IfMatch support (write only if etag matches). 83 120 return os.WriteFile(fullPath, []byte(value), 0644) 84 121 }
+1 -1
internal/recipes/generator.go
··· 158 158 slog.ErrorContext(ctx, "failed to marshal ingredients", "location", p.String(), "error", err) 159 159 return nil, err 160 160 } 161 - if err := g.cache.Set(ctx, p.LocationHash(), string(allingredientsJSON)); err != nil { 161 + if err := g.cache.Put(ctx, p.LocationHash(), string(allingredientsJSON), cache.Unconditional()); err != nil { 162 162 slog.ErrorContext(ctx, "failed to cache ingredients", "location", p.String(), "error", err) 163 163 return nil, err 164 164 }
+9 -23
internal/recipes/io.go
··· 63 63 recipe := &recipes[i] 64 64 recipe.OriginHash = originHash 65 65 hash := recipe.ComputeHash() 66 - exists, err := rio.Cache.Exists(ctx, recipeCachePrefix+hash) 67 - if err != nil { 68 - slog.ErrorContext(ctx, "failed to check existing recipe in cache", "recipe", recipe.Title, "error", err) 69 - errs = append(errs, fmt.Errorf("error checking %s, %w", hash, err)) 70 - continue 71 - } 72 - if exists { 73 - continue 74 - } 75 66 76 67 slog.InfoContext(ctx, "storing recipe", "title", recipe.Title, "hash", hash) 77 68 recipeJSON := lo.Must(json.Marshal(recipe)) 78 - if err := rio.Cache.Set(ctx, recipeCachePrefix+hash, string(recipeJSON)); err != nil { 69 + if err := rio.Cache.Put(ctx, recipeCachePrefix+hash, string(recipeJSON), cache.IfNoneMatch()); err != nil { 70 + if errors.Is(err, cache.ErrAlreadyExists) { 71 + continue 72 + } 79 73 slog.ErrorContext(ctx, "failed to cache individual recipe", "recipe", recipe.Title, "error", err) 80 74 errs = append(errs, fmt.Errorf("error saving %s, %w", hash, err)) 81 75 } ··· 86 80 var AlreadyExists = errors.New("already exists") 87 81 88 82 func (rio *recipeio) SaveParams(ctx context.Context, p *generatorParams) error { 89 - //not atomic push this into cache 90 - exists, err := rio.Cache.Exists(ctx, p.Hash()+".params") 91 - if err != nil { 92 - slog.ErrorContext(ctx, "failed to check existing params in cache", "location", p.String(), "error", err) 93 - return err 94 - } 95 - 96 - if exists { 97 - return AlreadyExists 98 - } 99 - 100 83 paramsJSON := lo.Must(json.Marshal(p)) 101 - if err := rio.Cache.Set(ctx, p.Hash()+".params", string(paramsJSON)); err != nil { 84 + if err := rio.Cache.Put(ctx, p.Hash()+".params", string(paramsJSON), cache.IfNoneMatch()); err != nil { 85 + if errors.Is(err, cache.ErrAlreadyExists) { 86 + return AlreadyExists 87 + } 102 88 slog.ErrorContext(ctx, "failed to cache params", "location", p.String(), "error", err) 103 89 return err 104 90 } ··· 112 98 } 113 99 // we could actually nuke out the rest of recipe and lazily load but not yet 114 100 shoppingJSON := lo.Must(json.Marshal(shoppingList)) 115 - if err := rio.Cache.Set(ctx, hash, string(shoppingJSON)); err != nil { 101 + if err := rio.Cache.Put(ctx, hash, string(shoppingJSON), cache.Unconditional()); err != nil { 116 102 slog.ErrorContext(ctx, "failed to cache shopping list document", "hash", hash, "error", err) 117 103 return err 118 104 }
+52
internal/recipes/io_test.go
··· 1 + package recipes 2 + 3 + import ( 4 + "careme/internal/cache" 5 + "careme/internal/locations" 6 + "errors" 7 + "os" 8 + "sync" 9 + "testing" 10 + "time" 11 + ) 12 + 13 + func TestSaveParams_IsAtomic(t *testing.T) { 14 + tmpDir, err := os.MkdirTemp("", "careme-test-saveparams-*") 15 + if err != nil { 16 + t.Fatalf("failed to create temp dir: %v", err) 17 + } 18 + defer os.RemoveAll(tmpDir) 19 + 20 + rio := IO(cache.NewFileCache(tmpDir)) 21 + p := DefaultParams(&locations.Location{ID: "123", Name: "Test Store"}, time.Date(2026, 1, 25, 0, 0, 0, 0, time.UTC)) 22 + 23 + const n = 32 24 + var wg sync.WaitGroup 25 + wg.Add(n) 26 + 27 + errs := make(chan error, n) 28 + for range n { 29 + go func() { 30 + defer wg.Done() 31 + errs <- rio.SaveParams(t.Context(), p) 32 + }() 33 + } 34 + wg.Wait() 35 + close(errs) 36 + 37 + var ok, alreadyExists, other int 38 + for err := range errs { 39 + switch { 40 + case err == nil: 41 + ok++ 42 + case errors.Is(err, AlreadyExists): 43 + alreadyExists++ 44 + default: 45 + other++ 46 + } 47 + } 48 + 49 + if ok != 1 || other != 0 || alreadyExists != n-1 { 50 + t.Fatalf("expected 1 success + %d AlreadyExists, got ok=%d alreadyExists=%d other=%d", n-1, ok, alreadyExists, other) 51 + } 52 + }
+1 -1
internal/recipes/storage_test.go
··· 25 25 26 26 hash := recipe.ComputeHash() 27 27 recipeJSON, _ := json.Marshal(recipe) 28 - err = fileCache.Set(t.Context(), "recipe/"+hash, string(recipeJSON)) 28 + err = fileCache.Put(t.Context(), "recipe/"+hash, string(recipeJSON), cache.Unconditional()) 29 29 if err != nil { 30 30 t.Fatalf("failed to save recipe: %v", err) 31 31 }
+2 -2
internal/users/storage.go
··· 166 166 if err := s.Update(&newUser); err != nil { 167 167 return nil, fmt.Errorf("failed to create new user: %w", err) 168 168 } 169 - if err := s.cache.Set(context.TODO(), emailPrefix+newUser.Email[0], newUser.ID); err != nil { 169 + if err := s.cache.Put(context.TODO(), emailPrefix+newUser.Email[0], newUser.ID, cache.Unconditional()); err != nil { 170 170 return nil, fmt.Errorf("failed to index new user by email: %w", err) 171 171 } 172 172 return &newUser, nil ··· 181 181 if err != nil { 182 182 return fmt.Errorf("failed to marshal user: %w", err) 183 183 } 184 - if err := s.cache.Set(context.TODO(), userPrefix+user.ID, string(userBytes)); err != nil { 184 + if err := s.cache.Put(context.TODO(), userPrefix+user.ID, string(userBytes), cache.Unconditional()); err != nil { 185 185 return fmt.Errorf("failed to update user: %w", err) 186 186 } 187 187 return nil