like malachite (atproto-lastfm-importer) but in go and bluer
go spotify tealfm lastfm atproto
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

sync/rate: respect burst limit, code quality

karitham 575b439c b142047f

+1329 -555
+73 -12
cache/bbolt.go
··· 10 10 "time" 11 11 12 12 "go.etcd.io/bbolt" 13 + berr "go.etcd.io/bbolt/errors" 13 14 ) 14 15 15 16 var _ Storage = (*BoltStorage)(nil) ··· 244 245 } 245 246 246 247 return s.db.Update(func(tx *bbolt.Tx) error { 247 - if err := tx.DeleteBucket([]byte(recordsBucket(did))); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { 248 + if err := tx.DeleteBucket([]byte(recordsBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 248 249 return fmt.Errorf("delete records bucket: %w", err) 249 250 } 250 - if err := tx.DeleteBucket([]byte(processedBucket(did))); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { 251 + if err := tx.DeleteBucket([]byte(processedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 251 252 return fmt.Errorf("delete processed bucket: %w", err) 252 253 } 253 - if err := tx.DeleteBucket([]byte(failedBucket(did))); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { 254 + if err := tx.DeleteBucket([]byte(failedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 254 255 return fmt.Errorf("delete failed bucket: %w", err) 255 256 } 256 257 metaBkt := tx.Bucket([]byte(metaBucket())) ··· 278 279 }) 279 280 280 281 for _, did := range dids { 281 - if err := tx.DeleteBucket([]byte(recordsBucket(did))); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { 282 + if err := tx.DeleteBucket([]byte(recordsBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 282 283 return fmt.Errorf("delete records for %s: %w", did, err) 283 284 } 284 - if err := tx.DeleteBucket([]byte(processedBucket(did))); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { 285 + if err := tx.DeleteBucket([]byte(processedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 285 286 return fmt.Errorf("delete processed for %s: %w", did, err) 286 287 } 287 - if err := tx.DeleteBucket([]byte(failedBucket(did))); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { 288 + if err := tx.DeleteBucket([]byte(failedBucket(did))); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 288 289 return fmt.Errorf("delete failed for %s: %w", did, err) 289 290 } 290 291 } 291 292 292 - if err := tx.DeleteBucket([]byte(metaBucket())); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { 293 + if err := tx.DeleteBucket([]byte(metaBucket())); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 293 294 return fmt.Errorf("delete meta: %w", err) 294 295 } 295 296 296 - if err := tx.DeleteBucket([]byte("quota")); err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { 297 + if err := tx.DeleteBucket([]byte("quota")); err != nil && !errors.Is(err, berr.ErrBucketNotFound) { 297 298 return fmt.Errorf("delete quota: %w", err) 298 299 } 299 300 ··· 317 318 return val, err 318 319 } 319 320 320 - func (s *BoltStorage) Set(key string, val int) error { 321 - return s.db.Update(func(tx *bbolt.Tx) error { 321 + func (s *BoltStorage) IncrBy(key string, n int) (int, error) { 322 + var val int 323 + err := s.db.Update(func(tx *bbolt.Tx) error { 322 324 b, err := tx.CreateBucketIfNotExists([]byte("quota")) 323 325 if err != nil { 324 326 return err 325 327 } 326 - v, err := json.Marshal(val) 328 + v := b.Get([]byte(key)) 329 + if v != nil { 330 + if err := json.Unmarshal(v, &val); err != nil { 331 + return err 332 + } 333 + } 334 + val += n 335 + newV, err := json.Marshal(val) 327 336 if err != nil { 328 337 return err 329 338 } 330 - return b.Put([]byte(key), v) 339 + return b.Put([]byte(key), newV) 340 + }) 341 + return val, err 342 + } 343 + 344 + func (s *BoltStorage) GetMulti(keys []string) (map[string]int, error) { 345 + res := make(map[string]int, len(keys)) 346 + err := s.db.View(func(tx *bbolt.Tx) error { 347 + b := tx.Bucket([]byte("quota")) 348 + if b == nil { 349 + return nil 350 + } 351 + for _, key := range keys { 352 + v := b.Get([]byte(key)) 353 + if v == nil { 354 + res[key] = 0 355 + continue 356 + } 357 + var val int 358 + if err := json.Unmarshal(v, &val); err != nil { 359 + return err 360 + } 361 + res[key] = val 362 + } 363 + return nil 364 + }) 365 + return res, err 366 + } 367 + 368 + func (s *BoltStorage) IncrByMulti(deltas map[string]int) error { 369 + return s.db.Update(func(tx *bbolt.Tx) error { 370 + b, err := tx.CreateBucketIfNotExists([]byte("quota")) 371 + if err != nil { 372 + return err 373 + } 374 + for key, n := range deltas { 375 + var val int 376 + v := b.Get([]byte(key)) 377 + if v != nil { 378 + if err := json.Unmarshal(v, &val); err != nil { 379 + return err 380 + } 381 + } 382 + val += n 383 + newV, err := json.Marshal(val) 384 + if err != nil { 385 + return err 386 + } 387 + if err := b.Put([]byte(key), newV); err != nil { 388 + return err 389 + } 390 + } 391 + return nil 331 392 }) 332 393 } 333 394
-17
cache/cache_test.go
··· 82 82 t.Error("cache should be invalid") 83 83 } 84 84 } 85 - 86 - func TestQuotaKV(t *testing.T) { 87 - storage := newTestStorage(t) 88 - 89 - err := storage.Set("testkey", 123) 90 - if err != nil { 91 - t.Fatalf("Set failed: %v", err) 92 - } 93 - 94 - val, err := storage.Get("testkey") 95 - if err != nil { 96 - t.Fatalf("Get failed: %v", err) 97 - } 98 - if val != 123 { 99 - t.Errorf("expected 123, got %d", val) 100 - } 101 - }
+3 -1
cache/storage.go
··· 26 26 27 27 // KVStore implementation 28 28 Get(key string) (int, error) 29 - Set(key string, val int) error 29 + IncrBy(key string, n int) (int, error) 30 + GetMulti(keys []string) (map[string]int, error) 31 + IncrByMulti(counts map[string]int) error 30 32 } 31 33 32 34 type DBStats struct {
+2 -2
flake.nix
··· 17 17 let 18 18 lazuli = pkgs.buildGoModule rec { 19 19 name = "lazuli"; 20 - version = "0.1.1"; 20 + version = "0.1.2"; 21 21 src = pkgs.nix-gitignore.gitignoreSource [ "*.csv" "*.zip" "*.json" ] ./.; 22 - vendorHash = "sha256-Zr9gGytJARMbf/7120HYkKsfzpeW47MkwdMODD9QTKc="; 22 + vendorHash = "sha256-MfBPv/L7wHuUGXx4BDd+DFq0RB11KuMHCzPjFv6FMgs="; 23 23 ldflags = [ 24 24 "-X" 25 25 "main.Version=${version}"
+17 -14
go.mod
··· 3 3 go 1.25.5 4 4 5 5 require ( 6 - github.com/bluesky-social/indigo v0.0.0-20260114211028-207c9d49d0de 7 - github.com/urfave/cli/v3 v3.6.1 8 - go.etcd.io/bbolt v1.3.10 9 - golang.org/x/text v0.14.0 10 - golang.org/x/time v0.3.0 6 + github.com/bluesky-social/indigo v0.0.0-20260120225912-12d69fa4d209 7 + github.com/failsafe-go/failsafe-go v0.9.5 8 + github.com/urfave/cli/v3 v3.6.2 9 + go.etcd.io/bbolt v1.4.3 10 + golang.org/x/text v0.33.0 11 11 ) 12 12 13 13 require ( 14 14 github.com/beorn7/perks v1.0.1 // indirect 15 - github.com/cespare/xxhash/v2 v2.2.0 // indirect 15 + github.com/bits-and-blooms/bitset v1.24.4 // indirect 16 + github.com/cespare/xxhash/v2 v2.3.0 // indirect 16 17 github.com/earthboundkid/versioninfo/v2 v2.24.1 // indirect 17 18 github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect 18 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect 19 19 github.com/mr-tron/base58 v1.2.0 // indirect 20 - github.com/prometheus/client_golang v1.17.0 // indirect 21 - github.com/prometheus/client_model v0.5.0 // indirect 22 - github.com/prometheus/common v0.45.0 // indirect 23 - github.com/prometheus/procfs v0.12.0 // indirect 20 + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect 21 + github.com/prometheus/client_golang v1.23.2 // indirect 22 + github.com/prometheus/client_model v0.6.2 // indirect 23 + github.com/prometheus/common v0.67.5 // indirect 24 + github.com/prometheus/procfs v0.19.2 // indirect 24 25 gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b // indirect 25 26 gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 // indirect 26 - golang.org/x/crypto v0.21.0 // indirect 27 - golang.org/x/sys v0.22.0 // indirect 28 - google.golang.org/protobuf v1.33.0 // indirect 27 + go.yaml.in/yaml/v2 v2.4.3 // indirect 28 + golang.org/x/crypto v0.47.0 // indirect 29 + golang.org/x/sys v0.40.0 // indirect 30 + golang.org/x/time v0.14.0 // indirect 31 + google.golang.org/protobuf v1.36.11 // indirect 29 32 )
+57 -32
go.sum
··· 1 1 github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= 2 2 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= 3 - github.com/bluesky-social/indigo v0.0.0-20260114211028-207c9d49d0de h1:75emVEzhTQWXwAQoBZV4/Bg2NEULZSgRwLFAdTccTrY= 4 - github.com/bluesky-social/indigo v0.0.0-20260114211028-207c9d49d0de/go.mod h1:KIy0FgNQacp4uv2Z7xhNkV3qZiUSGuRky97s7Pa4v+o= 5 - github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= 6 - github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 3 + github.com/bits-and-blooms/bitset v1.24.4 h1:95H15Og1clikBrKr/DuzMXkQzECs1M6hhoGXLwLQOZE= 4 + github.com/bits-and-blooms/bitset v1.24.4/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= 5 + github.com/bluesky-social/indigo v0.0.0-20260120225912-12d69fa4d209 h1:W01PGqjCexVBzIZ4FoNe4iO8OhI9XbSE7ieWL0QnMu8= 6 + github.com/bluesky-social/indigo v0.0.0-20260120225912-12d69fa4d209/go.mod h1:KIy0FgNQacp4uv2Z7xhNkV3qZiUSGuRky97s7Pa4v+o= 7 + github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 8 + github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 7 9 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= 8 10 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 9 11 github.com/earthboundkid/versioninfo/v2 v2.24.1 h1:SJTMHaoUx3GzjjnUO1QzP3ZXK6Ee/nbWyCm58eY3oUg= 10 12 github.com/earthboundkid/versioninfo/v2 v2.24.1/go.mod h1:VcWEooDEuyUJnMfbdTh0uFN4cfEIg+kHMuWB2CDCLjw= 11 - github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= 12 - github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= 13 + github.com/failsafe-go/failsafe-go v0.9.5 h1:Bgt4wTKV3+n49GssB2njPZ4u5ApjvtKSIQlqIL4E3oo= 14 + github.com/failsafe-go/failsafe-go v0.9.5/go.mod h1:IeRpglkcwzKagjDMh90ZhN2l4Ovt3+jemQBUbThag54= 15 + github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= 16 + github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= 13 17 github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= 14 18 github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= 19 + github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY= 20 + github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y= 15 21 github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= 16 22 github.com/ipfs/go-cid v0.4.1/go.mod h1:uQHwDeX4c6CtyrFwdqyhpNcxVewur1M7l7fNU7LKwZk= 17 23 github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= 18 24 github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= 19 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= 20 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= 25 + github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= 26 + github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= 27 + github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= 28 + github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= 21 29 github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= 22 30 github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= 23 31 github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= ··· 32 40 github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= 33 41 github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= 34 42 github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= 43 + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= 44 + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= 35 45 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= 36 46 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 37 - github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= 38 - github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= 39 - github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= 40 - github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= 41 - github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= 42 - github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= 43 - github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= 44 - github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= 47 + github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= 48 + github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= 49 + github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= 50 + github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= 51 + github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= 52 + github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= 53 + github.com/prometheus/procfs v0.19.2 h1:zUMhqEW66Ex7OXIiDkll3tl9a1ZdilUOd/F6ZXw4Vws= 54 + github.com/prometheus/procfs v0.19.2/go.mod h1:M0aotyiemPhBCM0z5w87kL22CxfcH05ZpYlu+b4J7mw= 55 + github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= 56 + github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= 45 57 github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= 46 58 github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= 47 59 github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= 48 60 github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= 49 - github.com/urfave/cli/v3 v3.6.1 h1:j8Qq8NyUawj/7rTYdBGrxcH7A/j7/G8Q5LhWEW4G3Mo= 50 - github.com/urfave/cli/v3 v3.6.1/go.mod h1:ysVLtOEmg2tOy6PknnYVhDoouyC/6N42TMeoMzskhso= 61 + github.com/urfave/cli/v3 v3.6.2 h1:lQuqiPrZ1cIz8hz+HcrG0TNZFxU70dPZ3Yl+pSrH9A8= 62 + github.com/urfave/cli/v3 v3.6.2/go.mod h1:ysVLtOEmg2tOy6PknnYVhDoouyC/6N42TMeoMzskhso= 51 63 github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e h1:28X54ciEwwUxyHn9yrZfl5ojgF4CBNLWX7LR0rvBkf4= 52 64 github.com/whyrusleeping/cbor-gen v0.2.1-0.20241030202151-b7a6831be65e/go.mod h1:pM99HXyEbSQHcosHc0iW7YFmwnscr+t9Te4ibko05so= 53 65 gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b h1:CzigHMRySiX3drau9C6Q5CAbNIApmLdat5jPMqChvDA= 54 66 gitlab.com/yawning/secp256k1-voi v0.0.0-20230925100816-f2616030848b/go.mod h1:/y/V339mxv2sZmYYR64O07VuCpdNZqCTwO8ZcouTMI8= 55 67 gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02 h1:qwDnMxjkyLmAFgcfgTnfJrmYKWhHnci3GjDqcZp1M3Q= 56 68 gitlab.com/yawning/tuplehash v0.0.0-20230713102510-df83abbf9a02/go.mod h1:JTnUj0mpYiAsuZLmKjTx/ex3AtMowcCgnE7YNyCEP0I= 57 - go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0= 58 - go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ= 59 - golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= 60 - golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= 61 - golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= 62 - golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= 63 - golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI= 64 - golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 65 - golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= 66 - golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= 67 - golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= 68 - golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= 69 + go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= 70 + go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= 71 + go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= 72 + go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= 73 + go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= 74 + go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= 75 + golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8= 76 + golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A= 77 + golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= 78 + golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= 79 + golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= 80 + golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= 81 + golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= 82 + golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= 83 + golang.org/x/text v0.33.0 h1:B3njUFyqtHDUI5jMn1YIr5B0IE2U0qck04r6d4KPAxE= 84 + golang.org/x/text v0.33.0/go.mod h1:LuMebE6+rBincTi9+xWTY8TztLzKHc/9C1uBCG27+q8= 85 + golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= 86 + golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= 69 87 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= 70 88 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= 71 - google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= 72 - google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= 89 + google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs= 90 + google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= 91 + google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= 92 + google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= 93 + google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= 94 + google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= 95 + gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 96 + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= 97 + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= 73 98 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 74 99 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= 75 100 lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI=
+34 -12
main.go
··· 17 17 "tangled.org/karitham.dev/lazuli/sync" 18 18 "tangled.org/karitham.dev/lazuli/sync/logutil" 19 19 20 + "github.com/failsafe-go/failsafe-go" 21 + "github.com/failsafe-go/failsafe-go/retrypolicy" 20 22 "github.com/urfave/cli/v3" 21 23 ) 22 24 ··· 159 161 } 160 162 161 163 limiter := sync.NewRateLimiter(a.storage) 162 - writes, global := limiter.Stats() 164 + writes, global, err := limiter.Stats() 165 + if err != nil { 166 + return fmt.Errorf("failed to get rate limit stats: %w", err) 167 + } 163 168 164 169 if a.outputFormat == "json" { 165 170 out := map[string]any{ ··· 283 288 } 284 289 285 290 // Check rate limit for 1 write 286 - if err := limiter.AllowBulkWrite(ctx, 1); err != nil { 287 - return fmt.Errorf("rate limit wait failed: %w", err) 288 - } 289 291 290 - w, g := limiter.Stats() 291 - res := sync.PublishBatch(ctx, repoClient, did, []sync.PlayRecord{fr.rec}, w, g, a.storage) 292 + res := sync.PublishBatch(ctx, repoClient, did, []sync.PlayRecord{fr.rec}, a.storage) 292 293 293 - if res.ErrorCount == 0 { 294 + if res == nil { 294 295 fmt.Printf("Successfully retried: %s - %s\n", fr.rec.ArtistName(), fr.rec.TrackName) 295 296 // Mark as published (updates processedBucket to 1) 296 297 if err := a.storage.MarkPublished(did, fr.key); err != nil { ··· 302 303 } 303 304 successCount++ 304 305 } else { 305 - fmt.Printf("Failed again: %s - %s: %v\n", fr.rec.ArtistName(), fr.rec.TrackName, res.LastError) 306 + fmt.Printf("Failed again: %s - %s: %v\n", fr.rec.ArtistName(), fr.rec.TrackName, res) 306 307 errorCount++ 307 - limiter.RefundBulkWrite(1) 308 308 } 309 309 310 310 // Optional: small delay between retries? ··· 600 600 ATProtoClient: repoClient, 601 601 ProgressLog: progressLog, 602 602 Storage: a.storage, 603 + Limiter: limiter, 603 604 } 604 605 605 - result := sync.Publish(ctx, authClient, publishOpts, limiter) 606 + result := sync.Publish(ctx, authClient, publishOpts) 606 607 607 608 a.log.Info("Import completed", 608 609 slog.Int("success_count", result.SuccessCount), ··· 646 647 slog.String("elapsed", pr.Elapsed), 647 648 slog.String("eta", pr.ETA), 648 649 slog.String("rate", pr.Rate), 649 - slog.Int("errors", pr.Errors)) 650 + slog.Int("errors", pr.Errors), 651 + slog.Int("writes", pr.WritesConsumed), 652 + slog.Int("global", pr.GlobalConsumed)) 650 653 } 651 654 } 652 655 } ··· 758 761 uri := rec.URI 759 762 parts := strings.Split(uri, "/") 760 763 rkey := parts[len(parts)-1] 761 - err := repoClient.DeleteRecord(ctx, sync.RecordType, rkey) 764 + 765 + retryPolicy := retrypolicy.NewBuilder[any](). 766 + WithMaxRetries(10). 767 + WithBackoff(sync.BaseRetryDelay, 5*time.Minute). 768 + HandleIf(func(_ any, err error) bool { 769 + return sync.IsTransientError(err) 770 + }). 771 + OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[any]) { 772 + a.log.Warn("Delete failed with transient error, retrying", 773 + slog.Duration("retryDelay", e.Delay), 774 + logutil.Error(e.LastError()), 775 + slog.Int("attempt", e.Attempts()), 776 + slog.String("uri", uri)) 777 + }). 778 + Build() 779 + 780 + err := failsafe.With[any](retryPolicy).WithContext(ctx).Run(func() error { 781 + return repoClient.DeleteRecord(ctx, sync.RecordType, rkey) 782 + }) 783 + 762 784 if err != nil { 763 785 a.log.Error("Failed to delete record", logutil.Error(err), slog.String("uri", uri)) 764 786 } else {
+320 -69
sync/adapter.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 + "errors" 6 7 "fmt" 7 8 "log/slog" 9 + "math/rand" 10 + "net" 8 11 "strings" 12 + "time" 9 13 10 14 "github.com/bluesky-social/indigo/atproto/atclient" 11 15 "github.com/bluesky-social/indigo/atproto/syntax" 16 + "github.com/failsafe-go/failsafe-go" 17 + "github.com/failsafe-go/failsafe-go/retrypolicy" 12 18 13 19 "tangled.org/karitham.dev/lazuli/cache" 14 20 "tangled.org/karitham.dev/lazuli/sync/logutil" ··· 45 51 return nil, "", fmt.Errorf("client cannot be nil") 46 52 } 47 53 54 + var outResp struct { 55 + Records []struct { 56 + URI string `json:"uri"` 57 + CID string `json:"cid"` 58 + Value map[string]any `json:"value"` 59 + } `json:"records"` 60 + Cursor string `json:"cursor"` 61 + } 62 + 63 + var chargedAt time.Time 48 64 if c.limiter != nil { 49 65 slog.Debug("waiting for rate limit (read)") 50 - if err := c.limiter.AllowRead(ctx); err != nil { 66 + var err error 67 + chargedAt, err = c.limiter.AllowRead(ctx) 68 + if err != nil { 51 69 slog.Error("rate limit wait cancelled/failed (read)", logutil.Error(err)) 52 70 return nil, "", err 53 71 } 54 72 } 55 73 56 - var out []RecordRef 57 - 58 - for { 59 - select { 60 - case <-ctx.Done(): 61 - return nil, "", ctx.Err() 62 - default: 63 - } 64 - 65 - var outResp struct { 66 - Records []struct { 67 - URI string `json:"uri"` 68 - CID string `json:"cid"` 69 - Value map[string]any `json:"value"` 70 - } `json:"records"` 71 - Cursor string `json:"cursor"` 72 - } 73 - 74 - err := c.client.Get(ctx, syntax.NSID("com.atproto.repo.listRecords"), map[string]any{ 75 - "repo": c.did, 76 - "collection": collection, 77 - "limit": limit, 78 - "cursor": cursor, 79 - }, &outResp) 80 - if err != nil { 81 - return nil, "", err 74 + err := c.client.Get(ctx, syntax.NSID("com.atproto.repo.listRecords"), map[string]any{ 75 + "repo": c.did, 76 + "collection": collection, 77 + "limit": limit, 78 + "cursor": cursor, 79 + }, &outResp) 80 + if err != nil { 81 + if c.limiter != nil && isTransientError(err) { 82 + c.limiter.RefundRead(ctx, chargedAt) 82 83 } 84 + return nil, "", err 85 + } 83 86 84 - for _, r := range outResp.Records { 85 - var playRecord PlayRecord 86 - if r.Value != nil { 87 - b, err := json.Marshal(r.Value) 88 - if err != nil { 89 - slog.Debug("failed to marshal record value", slog.String("uri", r.URI), logutil.Error(err)) 90 - continue 91 - } 92 - if err := json.Unmarshal(b, &playRecord); err != nil { 93 - slog.Debug("failed to unmarshal record", slog.String("uri", r.URI), logutil.Error(err)) 94 - continue 95 - } 96 - slog.Debug("parsed record", slog.String("uri", r.URI), logutil.Track(playRecord.TrackName, playRecord.ArtistName(), playRecord.PlayedTime.Time)) 87 + out := make([]RecordRef, 0, len(outResp.Records)) 88 + for _, r := range outResp.Records { 89 + var playRecord PlayRecord 90 + if r.Value != nil { 91 + b, err := json.Marshal(r.Value) 92 + if err != nil { 93 + slog.Debug("failed to marshal record value", slog.String("uri", r.URI), logutil.Error(err)) 94 + continue 95 + } 96 + if err := json.Unmarshal(b, &playRecord); err != nil { 97 + slog.Debug("failed to unmarshal record", slog.String("uri", r.URI), logutil.Error(err)) 98 + continue 97 99 } 98 - out = append(out, RecordRef{ 99 - URI: r.URI, 100 - CID: r.CID, 101 - Value: playRecord, 102 - }) 103 - } 104 - 105 - if outResp.Cursor == "" || len(outResp.Records) < limit { 106 - break 100 + slog.Debug("parsed record", slog.String("uri", r.URI), logutil.Track(playRecord.TrackName, playRecord.ArtistName(), playRecord.PlayedTime.Time)) 107 101 } 108 - cursor = outResp.Cursor 102 + out = append(out, RecordRef{ 103 + URI: r.URI, 104 + CID: r.CID, 105 + Value: playRecord, 106 + }) 109 107 } 110 108 111 - return out, cursor, nil 109 + return out, outResp.Cursor, nil 112 110 } 113 111 114 112 func (c *RateClient) ApplyWrites(ctx context.Context, collection string, records []PlayRecord) error { ··· 120 118 return fmt.Errorf("client cannot be nil") 121 119 } 122 120 121 + var chargedAt time.Time 123 122 if c.limiter != nil { 124 - slog.Debug("waiting for rate limit (write)") 125 - if err := c.limiter.AllowBulkWrite(ctx, len(records)); err != nil { 126 - slog.Error("rate limit wait cancelled/failed (write)", logutil.Error(err)) 123 + var err error 124 + chargedAt, err = c.limiter.AllowBulkWrite(ctx, len(records)) 125 + if err != nil { 126 + slog.Error("rate limit wait cancelled/failed (write)", 127 + logutil.DID(c.did), 128 + slog.String("collection", collection), 129 + logutil.Error(err), 130 + ) 127 131 return err 128 132 } 129 133 } 130 134 135 + err := applyWrites(ctx, c.client, c.did, collection, records) 136 + if err != nil && isTransientError(err) && c.limiter != nil { 137 + c.limiter.RefundBulkWrite(ctx, len(records), chargedAt) 138 + } 139 + return err 140 + } 141 + 142 + func applyWrites(ctx context.Context, client *atclient.APIClient, did, collection string, records []PlayRecord) error { 143 + if len(records) == 0 { 144 + return nil 145 + } 146 + 147 + if len(records) > 200 { 148 + return fmt.Errorf("too many records in one ApplyWrites call: %d (max 200)", len(records)) 149 + } 150 + 131 151 writes, err := prepareWrites(records, collection) 132 152 if err != nil { 133 153 return err 134 154 } 135 155 136 - err = c.client.Post(ctx, syntax.NSID("com.atproto.repo.applyWrites"), map[string]any{ 137 - "repo": c.did, 156 + return client.Post(ctx, syntax.NSID("com.atproto.repo.applyWrites"), map[string]any{ 157 + "repo": did, 138 158 "writes": writes, 139 159 }, nil) 140 - if err != nil && c.limiter != nil { 141 - c.limiter.RefundBulkWrite(len(records)) 160 + } 161 + 162 + type atprotoClientAdapter struct { 163 + client *atclient.APIClient 164 + did string 165 + } 166 + 167 + func (a *atprotoClientAdapter) ApplyWrites(ctx context.Context, collection string, records []PlayRecord) error { 168 + return applyWrites(ctx, a.client, a.did, collection, records) 169 + } 170 + 171 + func prepareRecords(batch []PlayRecord) []PlayRecord { 172 + atprotoRecords := make([]PlayRecord, 0, len(batch)) 173 + for _, record := range batch { 174 + record.Type = RecordType 175 + record.SubmissionClientAgent = ClientAgent 176 + atprotoRecords = append(atprotoRecords, record) 177 + } 178 + return atprotoRecords 179 + } 180 + 181 + func waitForRetry(ctx context.Context, delay time.Duration) bool { 182 + timer := time.NewTimer(delay) 183 + defer timer.Stop() 184 + 185 + select { 186 + case <-timer.C: 187 + return true 188 + case <-ctx.Done(): 189 + slog.Debug("retry cancelled due to context done") 190 + return false 191 + } 192 + } 193 + 194 + func defaultProgressLog(f func(ProgressReport)) func(ProgressReport) { 195 + if f != nil { 196 + return f 197 + } 198 + return func(pr ProgressReport) { 199 + slog.Info("sync progress", 200 + slog.Int("completed", pr.Completed), 201 + slog.Int("total", pr.Total), 202 + slog.Float64("percent", pr.Percent), 203 + slog.String("elapsed", pr.Elapsed), 204 + slog.String("eta", pr.ETA), 205 + slog.String("rate", pr.Rate), 206 + slog.Int("errors", pr.Errors), 207 + ) 208 + } 209 + } 210 + 211 + func defaultBatchSize(size int) int { 212 + if size > 0 { 213 + return size 214 + } 215 + return DefaultBatchSize 216 + } 217 + 218 + func buildClient(client AuthClient, customClient ATProtoClient) (ATProtoClient, error) { 219 + if customClient != nil { 220 + return customClient, nil 221 + } 222 + 223 + apiClient := client.GetAPIClient() 224 + if apiClient == nil { 225 + slog.Error("failed to get API client", logutil.Error(fmt.Errorf("client is nil"))) 226 + return nil, fmt.Errorf("API client is nil") 227 + } 228 + 229 + return &atprotoClientAdapter{client: apiClient, did: client.GetDID()}, nil 230 + } 231 + 232 + func newPublishResult(success, errors, total int, start time.Time, cancelled bool) PublishResult { 233 + return PublishResult{ 234 + SuccessCount: success, 235 + ErrorCount: errors, 236 + Cancelled: cancelled, 237 + Duration: time.Since(start), 238 + TotalRecords: total, 239 + RecordsPerMinute: ratePerMinute(success, time.Since(start)), 240 + } 241 + } 242 + 243 + func logResult(success, errors int, startTime time.Time) { 244 + if errors > 0 { 245 + slog.Warn("import completed with errors", 246 + slog.Int("success", success), 247 + slog.Int("errors", errors)) 248 + } 249 + slog.Info("import completed", 250 + slog.Int("success", success), 251 + slog.Int("errors", errors), 252 + slog.Duration("duration", time.Since(startTime)), 253 + slog.String("rate", formatRate(ratePerMinute(success, time.Since(startTime))))) 254 + } 255 + 256 + func backoff(attempt int) time.Duration { 257 + if attempt <= 0 { 258 + return BaseRetryDelay 259 + } 260 + 261 + // Calculate exponential delay: BaseRetryDelay * 2^(attempt-1) 262 + // We use uint(attempt-1) because 1<<0 is 1 (for first retry) 263 + exp := min(attempt-1, 31) 264 + 265 + delay := BaseRetryDelay * time.Duration(1<<uint(exp)) 266 + 267 + // Cap the delay before adding jitter 268 + if delay > MaxRetryDelay || delay <= 0 { 269 + delay = MaxRetryDelay 270 + } 271 + 272 + // Add up to 25% jitter 273 + var jitter time.Duration 274 + if delay > 4 { 275 + jitter = time.Duration(rand.Int63n(int64(delay / 4))) 142 276 } 143 - return err 277 + 278 + return delay + jitter 279 + } 280 + 281 + func PublishBatch(ctx context.Context, client ATProtoClient, did string, batch []PlayRecord, storage cache.Storage) error { 282 + if len(batch) == 0 { 283 + return nil 284 + } 285 + 286 + atprotoRecords := prepareRecords(batch) 287 + 288 + err := client.ApplyWrites(ctx, RecordType, atprotoRecords) 289 + if err != nil { 290 + slog.Error("batch publish failed", logutil.Error(err)) 291 + return err 292 + } 293 + 294 + if storage != nil && did != "" { 295 + keys := CreateRecordKeys(atprotoRecords) 296 + cacheEntries := make(map[string][]byte) 297 + for i, rec := range atprotoRecords { 298 + key := keys[i] 299 + value, _ := json.Marshal(rec) 300 + cacheEntries[key] = value 301 + } 302 + 303 + if err := storage.SaveRecords(did, cacheEntries); err != nil { 304 + return fmt.Errorf("failed to save records to storage: %w", err) 305 + } 306 + 307 + if err := storage.MarkPublished(did, keys...); err != nil { 308 + return fmt.Errorf("failed to mark records as published: %w", err) 309 + } 310 + } 311 + 312 + return nil 313 + } 314 + 315 + type ATProtoClient interface { 316 + ApplyWrites(ctx context.Context, collection string, records []PlayRecord) error 317 + } 318 + 319 + func ratePerMinute(count int, duration time.Duration) float64 { 320 + if duration == 0 { 321 + return 0 322 + } 323 + return float64(count) / duration.Minutes() 324 + } 325 + 326 + type AuthClient interface { 327 + GetAPIClient() *atclient.APIClient 328 + GetDID() string 329 + } 330 + 331 + func IsTransientError(err error) bool { 332 + return isTransientError(err) 333 + } 334 + 335 + func Backoff(attempt int) time.Duration { 336 + return backoff(attempt) 337 + } 338 + 339 + func WaitForRetry(ctx context.Context, delay time.Duration) bool { 340 + return waitForRetry(ctx, delay) 341 + } 342 + 343 + func isTransientError(err error) bool { 344 + if err == nil { 345 + return false 346 + } 347 + 348 + var apiErr *atclient.APIError 349 + if errors.As(err, &apiErr) { 350 + switch apiErr.StatusCode { 351 + case 429, 500, 502, 503, 504: 352 + return true 353 + } 354 + return false 355 + } 356 + 357 + var netErr net.Error 358 + if errors.As(err, &netErr) { 359 + return netErr.Timeout() 360 + } 361 + 362 + return false 144 363 } 145 364 146 365 func (c *RateClient) DeleteRecord(ctx context.Context, collection, rkey string) error { ··· 148 367 return fmt.Errorf("client is nil") 149 368 } 150 369 370 + var chargedAt time.Time 151 371 if c.limiter != nil { 152 372 slog.Debug("waiting for rate limit (delete)") 153 - if err := c.limiter.AllowBulkWrite(ctx, 1); err != nil { 373 + var err error 374 + chargedAt, err = c.limiter.AllowBulkWrite(ctx, 1) 375 + if err != nil { 154 376 slog.Error("rate limit wait cancelled/failed (delete)", logutil.Error(err)) 155 377 return err 156 378 } ··· 165 387 "rkey": {rkey}, 166 388 }, 167 389 }) 168 - if err != nil && c.limiter != nil { 169 - c.limiter.RefundBulkWrite(1) 390 + if err != nil && c.limiter != nil && isTransientError(err) { 391 + c.limiter.RefundBulkWrite(ctx, 1, chargedAt) 170 392 } 171 393 return err 172 394 } ··· 201 423 } 202 424 203 425 allRecords := make([]ExistingRecord, 0, 1024) 426 + return fetchExistingLoop(ctx, client, did, storage, allRecords) 427 + } 428 + 429 + func fetchExistingLoop(ctx context.Context, client RepoClient, did string, storage cache.Storage, allRecords []ExistingRecord) ([]ExistingRecord, error) { 204 430 const batchSize = 100 205 431 var cursor string 206 432 433 + type fetchResult struct { 434 + records []RecordRef 435 + cursor string 436 + } 437 + 438 + retryPolicy := retrypolicy.NewBuilder[fetchResult](). 439 + WithMaxRetries(10). 440 + WithBackoff(BaseRetryDelay, 5*time.Minute). 441 + HandleIf(func(_ fetchResult, err error) bool { 442 + return isTransientError(err) 443 + }). 444 + OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[fetchResult]) { 445 + slog.Warn("fetch failed with transient error, retrying", 446 + slog.Duration("retryDelay", e.Delay), 447 + logutil.Error(e.LastError()), 448 + slog.Int("attempt", e.Attempts())) 449 + }). 450 + Build() 451 + 207 452 for { 208 453 select { 209 454 case <-ctx.Done(): ··· 211 456 default: 212 457 } 213 458 214 - records, newCursor, err := client.ListRecords(ctx, RecordType, batchSize, cursor) 459 + result, err := failsafe.With(retryPolicy). 460 + WithContext(ctx). 461 + Get(func() (fetchResult, error) { 462 + recs, next, err := client.ListRecords(ctx, RecordType, batchSize, cursor) 463 + if err != nil { 464 + return fetchResult{}, err 465 + } 466 + 467 + return fetchResult{records: recs, cursor: next}, nil 468 + }) 215 469 if err != nil { 216 470 return nil, err 217 471 } 218 472 219 - for _, rec := range records { 473 + for _, rec := range result.records { 220 474 allRecords = append(allRecords, ExistingRecord(rec)) 221 475 } 222 476 223 - if newCursor == "" || len(records) < batchSize { 477 + if result.cursor == "" || len(result.records) < batchSize { 224 478 break 225 479 } 226 - cursor = newCursor 480 + cursor = result.cursor 227 481 } 228 482 229 483 if storage != nil { 230 484 cacheEntries := make(map[string][]byte) 231 485 keys := make([]string, 0, len(allRecords)) 232 486 for _, rec := range allRecords { 233 - // use the rkey from URI if available, otherwise fallback to generating it 234 - // URI is at://did/collection/rkey 235 487 parts := strings.Split(rec.URI, "/") 236 488 key := parts[len(parts)-1] 237 489 if key == "" { ··· 246 498 return nil, err 247 499 } 248 500 249 - // Mark remote records as published locally to prevent redundant syncs 250 501 if err := storage.MarkPublished(did, keys...); err != nil { 251 502 return nil, err 252 503 }
+364
sync/batch_test.go
··· 1 + package sync 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "errors" 7 + "net/http" 8 + "strings" 9 + "sync" 10 + "sync/atomic" 11 + "testing" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/atclient" 15 + "tangled.org/karitham.dev/lazuli/cache" 16 + ) 17 + 18 + type mockRoundTripper func(req *http.Request) (*http.Response, error) 19 + 20 + func (f mockRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { 21 + return f(req) 22 + } 23 + 24 + // Mock Storage 25 + 26 + type mockStorage struct { 27 + cache.Storage 28 + unpublished map[string][]byte 29 + published map[string]bool 30 + failed map[string]string 31 + kv map[string]int 32 + mu sync.Mutex 33 + } 34 + 35 + func newMockStorage() *mockStorage { 36 + return &mockStorage{ 37 + unpublished: make(map[string][]byte), 38 + published: make(map[string]bool), 39 + failed: make(map[string]string), 40 + kv: make(map[string]int), 41 + } 42 + } 43 + 44 + func (m *mockStorage) SaveRecords(did string, records map[string][]byte) error { 45 + m.mu.Lock() 46 + defer m.mu.Unlock() 47 + for k, v := range records { 48 + m.unpublished[k] = v 49 + } 50 + return nil 51 + } 52 + 53 + func (m *mockStorage) IterateUnpublished(did string, fn func(key string, rec []byte) error) error { 54 + m.mu.Lock() 55 + // Copy to avoid deadlock if fn calls back 56 + keys := make([]string, 0, len(m.unpublished)) 57 + for k := range m.unpublished { 58 + keys = append(keys, k) 59 + } 60 + m.mu.Unlock() 61 + 62 + for _, k := range keys { 63 + m.mu.Lock() 64 + rec, ok := m.unpublished[k] 65 + m.mu.Unlock() 66 + if ok { 67 + if err := fn(k, rec); err != nil { 68 + return err 69 + } 70 + } 71 + } 72 + return nil 73 + } 74 + 75 + func (m *mockStorage) MarkPublished(did string, keys ...string) error { 76 + m.mu.Lock() 77 + defer m.mu.Unlock() 78 + for _, k := range keys { 79 + delete(m.unpublished, k) 80 + m.published[k] = true 81 + } 82 + return nil 83 + } 84 + 85 + func (m *mockStorage) MarkFailed(did string, keys []string, err string) error { 86 + m.mu.Lock() 87 + defer m.mu.Unlock() 88 + for _, k := range keys { 89 + m.failed[k] = err 90 + } 91 + return nil 92 + } 93 + 94 + func (m *mockStorage) Get(key string) (int, error) { 95 + m.mu.Lock() 96 + defer m.mu.Unlock() 97 + return m.kv[key], nil 98 + } 99 + 100 + func (m *mockStorage) IncrBy(key string, n int) (int, error) { 101 + m.mu.Lock() 102 + defer m.mu.Unlock() 103 + m.kv[key] += n 104 + return m.kv[key], nil 105 + } 106 + 107 + // Mock RateLimiter 108 + type mockLimiter struct { 109 + refunds int32 110 + } 111 + 112 + func (m *mockLimiter) AllowRead(ctx context.Context) (time.Time, error) { 113 + return time.Now(), nil 114 + } 115 + 116 + func (m *mockLimiter) AllowBulkWrite(ctx context.Context, n int) (time.Time, error) { 117 + return time.Now(), nil 118 + } 119 + 120 + func (m *mockLimiter) RefundBulkWrite(ctx context.Context, n int, chargedAt time.Time) { 121 + atomic.AddInt32(&m.refunds, 1) 122 + } 123 + 124 + func (m *mockLimiter) RefundRead(ctx context.Context, chargedAt time.Time) { 125 + atomic.AddInt32(&m.refunds, 1) 126 + } 127 + 128 + func (m *mockLimiter) Stats() (int, int, error) { 129 + return 0, 0, nil 130 + } 131 + 132 + // Mock ATProtoClient 133 + type mockATProtoClient struct { 134 + applyWritesFunc func(ctx context.Context, collection string, records []PlayRecord) error 135 + } 136 + 137 + func (m *mockATProtoClient) ApplyWrites(ctx context.Context, collection string, records []PlayRecord) error { 138 + if m.applyWritesFunc != nil { 139 + return m.applyWritesFunc(ctx, collection, records) 140 + } 141 + return nil 142 + } 143 + 144 + // Mock AuthClient 145 + type mockAuthClient struct { 146 + did string 147 + } 148 + 149 + func (m *mockAuthClient) GetAPIClient() *atclient.APIClient { return nil } 150 + func (m *mockAuthClient) GetDID() string { return m.did } 151 + 152 + type timeoutError struct{} 153 + 154 + func (e timeoutError) Error() string { return "timeout" } 155 + func (e timeoutError) Timeout() bool { return true } 156 + func (e timeoutError) Temporary() bool { return true } 157 + 158 + func TestIsTransientError(t *testing.T) { 159 + tests := []struct { 160 + name string 161 + err error 162 + want bool 163 + }{ 164 + {"nil", nil, false}, 165 + {"generic error", errors.New("some error"), false}, 166 + {"API 400", &atclient.APIError{StatusCode: 400}, false}, 167 + {"API 429", &atclient.APIError{StatusCode: 429}, true}, 168 + {"API 500", &atclient.APIError{StatusCode: 500}, true}, 169 + {"API 503", &atclient.APIError{StatusCode: 503}, true}, 170 + {"net timeout", timeoutError{}, true}, 171 + {"net non-timeout", errors.New("network is down"), false}, 172 + } 173 + 174 + for _, tt := range tests { 175 + t.Run(tt.name, func(t *testing.T) { 176 + if got := isTransientError(tt.err); got != tt.want { 177 + t.Errorf("isTransientError() = %v, want %v", got, tt.want) 178 + } 179 + }) 180 + } 181 + } 182 + 183 + func TestApplyWrites_RateClient(t *testing.T) { 184 + ctx := context.Background() 185 + 186 + t.Run("Empty records", func(t *testing.T) { 187 + limiter := &mockLimiter{} 188 + client := NewRateClient(nil, "did:example:123", limiter) 189 + err := client.ApplyWrites(ctx, "test", nil) 190 + if err != nil { 191 + t.Errorf("ApplyWrites(nil) error = %v", err) 192 + } 193 + }) 194 + 195 + t.Run("Too many records", func(t *testing.T) { 196 + limiter := &mockLimiter{} 197 + client := NewRateClient(&atclient.APIClient{}, "did:example:123", limiter) 198 + records := make([]PlayRecord, 201) 199 + err := client.ApplyWrites(ctx, "test", records) 200 + if err == nil { 201 + t.Fatal("expected error for > 200 records") 202 + } 203 + expected := "too many records in one ApplyWrites call: 201 (max 200)" 204 + if err.Error() != expected { 205 + t.Errorf("expected error %q, got %q", expected, err.Error()) 206 + } 207 + }) 208 + 209 + t.Run("Transient error refunds tokens", func(t *testing.T) { 210 + limiter := &mockLimiter{} 211 + apiClient := atclient.NewAPIClient("https://example.com") 212 + apiClient.Client.Transport = mockRoundTripper(func(req *http.Request) (*http.Response, error) { 213 + return &http.Response{ 214 + StatusCode: 503, 215 + Body: http.NoBody, 216 + }, nil 217 + }) 218 + client := NewRateClient(apiClient, "did:example:123", limiter) 219 + 220 + err := client.ApplyWrites(ctx, "test", []PlayRecord{{TrackName: "Song 1"}}) 221 + if err == nil { 222 + t.Fatal("expected error") 223 + } 224 + if atomic.LoadInt32(&limiter.refunds) != 1 { 225 + t.Errorf("expected 1 refund, got %d", limiter.refunds) 226 + } 227 + }) 228 + 229 + t.Run("Non-transient error does NOT refund", func(t *testing.T) { 230 + limiter := &mockLimiter{} 231 + apiClient := atclient.NewAPIClient("https://example.com") 232 + apiClient.Client.Transport = mockRoundTripper(func(req *http.Request) (*http.Response, error) { 233 + return &http.Response{ 234 + StatusCode: 400, 235 + Body: http.NoBody, 236 + }, nil 237 + }) 238 + client := NewRateClient(apiClient, "did:example:123", limiter) 239 + 240 + err := client.ApplyWrites(ctx, "test", []PlayRecord{{TrackName: "Song 1"}}) 241 + if err == nil { 242 + t.Fatal("expected error") 243 + } 244 + if atomic.LoadInt32(&limiter.refunds) != 0 { 245 + t.Errorf("expected 0 refunds, got %d", limiter.refunds) 246 + } 247 + }) 248 + } 249 + 250 + func TestPublishBatch(t *testing.T) { 251 + ctx := context.Background() 252 + did := "did:example:123" 253 + batch := []PlayRecord{{TrackName: "Song 1"}} 254 + 255 + t.Run("Success", func(t *testing.T) { 256 + storage := newMockStorage() 257 + client := &mockATProtoClient{} 258 + err := PublishBatch(ctx, client, did, batch, storage) 259 + if err != nil { 260 + t.Fatal(err) 261 + } 262 + if len(storage.published) != 1 { 263 + t.Errorf("expected 1 published record, got %d", len(storage.published)) 264 + } 265 + }) 266 + 267 + t.Run("ApplyWrites failure", func(t *testing.T) { 268 + storage := newMockStorage() 269 + expectedErr := errors.New("apply failed") 270 + client := &mockATProtoClient{ 271 + applyWritesFunc: func(ctx context.Context, collection string, records []PlayRecord) error { 272 + return expectedErr 273 + }, 274 + } 275 + err := PublishBatch(ctx, client, did, batch, storage) 276 + if !errors.Is(err, expectedErr) { 277 + t.Errorf("expected error %v, got %v", expectedErr, err) 278 + } 279 + if len(storage.published) != 0 { 280 + t.Error("expected 0 published records") 281 + } 282 + }) 283 + 284 + t.Run("Storage failure after ApplyWrites success", func(t *testing.T) { 285 + storage := &failingStorage{} 286 + client := &mockATProtoClient{} 287 + err := PublishBatch(ctx, client, did, batch, storage) 288 + if err == nil || !strings.Contains(err.Error(), "failed to save records") { 289 + t.Errorf("expected storage save error, got %v", err) 290 + } 291 + }) 292 + } 293 + 294 + type failingStorage struct { 295 + mockStorage 296 + } 297 + 298 + func (s *failingStorage) SaveRecords(did string, records map[string][]byte) error { 299 + return errors.New("failed to save records") 300 + } 301 + 302 + func TestPublish_Iterative(t *testing.T) { 303 + ctx := context.Background() 304 + did := "did:example:123" 305 + 306 + rec1, _ := json.Marshal(PlayRecord{TrackName: "Song 1"}) 307 + rec2, _ := json.Marshal(PlayRecord{TrackName: "Song 2"}) 308 + 309 + t.Run("Retry on transient error", func(t *testing.T) { 310 + storage := newMockStorage() 311 + storage.SaveRecords(did, map[string][]byte{"k1": rec1, "k2": rec2}) 312 + 313 + var attempts int32 314 + client := &mockATProtoClient{ 315 + applyWritesFunc: func(ctx context.Context, collection string, records []PlayRecord) error { 316 + if atomic.AddInt32(&attempts, 1) <= 2 { 317 + return &atclient.APIError{StatusCode: 503} 318 + } 319 + return nil 320 + }, 321 + } 322 + 323 + oldBase := BaseRetryDelay 324 + BaseRetryDelay = time.Millisecond 325 + defer func() { BaseRetryDelay = oldBase }() 326 + 327 + res := Publish(ctx, &mockAuthClient{did: did}, PublishOptions{ 328 + BatchSize: 1, 329 + ATProtoClient: client, 330 + Storage: storage, 331 + }) 332 + 333 + if res.SuccessCount != 2 { 334 + t.Errorf("expected 2 successes, got %d", res.SuccessCount) 335 + } 336 + if atomic.LoadInt32(&attempts) < 3 { 337 + t.Errorf("expected at least 3 attempts (2 fails + 1 success), got %d", attempts) 338 + } 339 + }) 340 + 341 + t.Run("Fail fast on non-transient error", func(t *testing.T) { 342 + storage := newMockStorage() 343 + storage.SaveRecords(did, map[string][]byte{"k1": rec1}) 344 + 345 + client := &mockATProtoClient{ 346 + applyWritesFunc: func(ctx context.Context, collection string, records []PlayRecord) error { 347 + return &atclient.APIError{StatusCode: 400} 348 + }, 349 + } 350 + 351 + res := Publish(ctx, &mockAuthClient{did: did}, PublishOptions{ 352 + BatchSize: 1, 353 + ATProtoClient: client, 354 + Storage: storage, 355 + }) 356 + 357 + if res.SuccessCount != 0 { 358 + t.Errorf("expected 0 successes, got %d", res.SuccessCount) 359 + } 360 + if res.ErrorCount != 1 { 361 + t.Errorf("expected 1 error, got %d", res.ErrorCount) 362 + } 363 + }) 364 + }
+2 -1
sync/config.go
··· 13 13 CacheVersion = 1 14 14 SlingshotResolverURL = "https://slingshot.microcosm.blue/xrpc/com.bad-example.identity.resolveMiniDoc" 15 15 MaxRetryDelay = 15 * time.Minute 16 - BaseRetryDelay = 2 * time.Second 17 16 MaxRetries = 1000 18 17 ) 18 + 19 + var BaseRetryDelay = 2 * time.Second 19 20 20 21 type ImportMode string 21 22
+41 -2
sync/import_test.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 + "fmt" 6 7 "io" 7 8 "io/fs" 8 9 "testing" ··· 36 37 } 37 38 38 39 func (m *mockRepoClient) ApplyWrites(ctx context.Context, collection string, records []sync.PlayRecord) error { 40 + if len(records) > 200 { 41 + return fmt.Errorf("too many records") 42 + } 39 43 m.applied = append(m.applied, records...) 40 44 return nil 41 45 } ··· 52 56 func (m *mockAuthClient) GetAPIClient() *atclient.APIClient { return nil } 53 57 func (m *mockAuthClient) GetDID() string { return m.did } 54 58 59 + type mockKV struct { 60 + data map[string]int 61 + } 62 + 63 + func (m *mockKV) GetMulti(keys []string) (map[string]int, error) { 64 + out := make(map[string]int) 65 + for _, k := range keys { 66 + out[k] = m.data[k] 67 + } 68 + return out, nil 69 + } 70 + 71 + func (m *mockKV) IncrByMulti(counts map[string]int) error { 72 + for k, v := range counts { 73 + m.data[k] += v 74 + } 75 + return nil 76 + } 77 + 78 + func (m *mockKV) Get(key string) (int, error) { 79 + return m.data[key], nil 80 + } 81 + 82 + func (m *mockKV) Set(key string, val int) error { 83 + m.data[key] = val 84 + return nil 85 + } 86 + 87 + func (m *mockKV) IncrBy(key string, n int) (int, error) { 88 + m.data[key] += n 89 + return m.data[key], nil 90 + } 91 + 55 92 func TestImportE2E(t *testing.T) { 56 93 ctx := context.Background() 57 94 did := "did:plc:test" ··· 124 161 } 125 162 126 163 // 7. Publish 127 - limiter := sync.NewRateLimiter(storage) 164 + kv := &mockKV{data: make(map[string]int)} 165 + limiter := sync.NewRateLimiter(kv) 128 166 publishOpts := sync.PublishOptions{ 129 167 BatchSize: 10, 130 168 ATProtoClient: mockRepo, 131 169 Storage: storage, 170 + Limiter: limiter, 132 171 } 133 172 134 173 auth := &mockAuthClient{did: did} 135 - result := sync.Publish(ctx, auth, publishOpts, limiter) 174 + result := sync.Publish(ctx, auth, publishOpts) 136 175 137 176 if result.SuccessCount != 1 { 138 177 t.Errorf("expected 1 successful publish, got %d", result.SuccessCount)
+27 -15
sync/progress.go
··· 118 118 LastLogTime time.Time 119 119 mu sync.Mutex 120 120 121 + limiter RateLimiter 121 122 LogInterval time.Duration 122 123 LogRecordsMetric int 123 124 } 124 125 125 - func NewProgressTracker(total int) *ProgressTracker { 126 + func NewProgressTracker(total int, limiter RateLimiter) *ProgressTracker { 126 127 return &ProgressTracker{ 127 128 Total: total, 128 129 StartTime: time.Now(), 129 130 LastLogTime: time.Now(), 131 + limiter: limiter, 130 132 LogInterval: 30 * time.Second, 131 133 LogRecordsMetric: 1000, 132 134 } ··· 191 193 } 192 194 193 195 type ProgressReport struct { 194 - Total int `json:"total"` 195 - Completed int `json:"completed"` 196 - Percent float64 `json:"percent"` 197 - Errors int `json:"errors"` 198 - Elapsed string `json:"elapsed"` 199 - ETA string `json:"eta,omitempty"` 200 - Rate string `json:"rate"` 196 + Total int `json:"total"` 197 + Completed int `json:"completed"` 198 + Percent float64 `json:"percent"` 199 + Errors int `json:"errors"` 200 + Elapsed string `json:"elapsed"` 201 + ETA string `json:"eta,omitempty"` 202 + Rate string `json:"rate"` 203 + WritesConsumed int `json:"writesConsumed,omitempty"` 204 + GlobalConsumed int `json:"globalConsumed,omitempty"` 201 205 } 202 206 203 207 func (t *ProgressTracker) Report() ProgressReport { ··· 206 210 if eta > 0 { 207 211 etaStr = FormatDuration(eta) 208 212 } 213 + 214 + var w, g int 215 + if t.limiter != nil { 216 + w, g, _ = t.limiter.Stats() 217 + } 218 + 209 219 return ProgressReport{ 210 - Total: t.Total, 211 - Completed: t.Completed, 212 - Percent: percent, 213 - Errors: t.Errors, 214 - Elapsed: elapsed.Round(time.Second).String(), 215 - ETA: etaStr, 216 - Rate: rate, 220 + Total: t.Total, 221 + Completed: t.Completed, 222 + Percent: percent, 223 + Errors: t.Errors, 224 + Elapsed: elapsed.Round(time.Second).String(), 225 + ETA: etaStr, 226 + Rate: rate, 227 + WritesConsumed: w, 228 + GlobalConsumed: g, 217 229 } 218 230 } 219 231
+43 -268
sync/publish.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 - "errors" 7 6 "fmt" 8 7 "log/slog" 9 - "math/rand" 10 8 "time" 11 9 12 - "github.com/bluesky-social/indigo/atproto/atclient" 13 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 + "github.com/failsafe-go/failsafe-go" 12 + "github.com/failsafe-go/failsafe-go/retrypolicy" 14 13 15 14 "tangled.org/karitham.dev/lazuli/cache" 16 15 "tangled.org/karitham.dev/lazuli/sync/logutil" ··· 22 21 ATProtoClient ATProtoClient 23 22 ProgressLog func(ProgressReport) 24 23 Storage cache.Storage 24 + Limiter RateLimiter 25 25 } 26 26 27 - func Publish(ctx context.Context, client AuthClient, opts PublishOptions, limiter RateLimiter) PublishResult { 27 + func Publish(ctx context.Context, client AuthClient, opts PublishOptions) PublishResult { 28 28 startTime := time.Now() 29 29 30 30 batchSize := defaultBatchSize(opts.BatchSize) ··· 59 59 slog.Int("daily_token_limit", GlobalLimitDay), 60 60 slog.String("rate_limit", fmt.Sprintf("1 write per %.1fs", 86400.0/WriteLimitDay))) 61 61 62 - tracker := NewProgressTracker(totalRecords) 62 + tracker := NewProgressTracker(totalRecords, opts.Limiter) 63 63 progressLog := defaultProgressLog(opts.ProgressLog) 64 64 totalSuccess := 0 65 65 totalErrors := 0 ··· 86 86 return nil 87 87 } 88 88 89 - if err := limiter.AllowBulkWrite(ctx, len(batch)); err != nil { 90 - slog.Error("rate limit wait failed", logutil.Error(err)) 91 - return err 92 - } 93 - 94 - var lastResult BatchResult 95 89 did := client.GetDID() 96 - attempt := 0 97 - for { 98 - w, g := limiter.Stats() 99 - lastResult = PublishBatch(ctx, atprotoClient, did, batch, w, g, opts.Storage) 100 - 101 - if lastResult.ErrorCount == 0 { 102 - break 103 - } 104 - 105 - limiter.RefundBulkWrite(len(batch)) 106 - 107 - attempt++ 108 - var apiErr *atclient.APIError 109 - is500 := errors.As(lastResult.LastError, &apiErr) && apiErr.StatusCode >= 500 90 + retryPolicy := retrypolicy.NewBuilder[any](). 91 + WithMaxRetries(10). 92 + WithBackoff(BaseRetryDelay, 5*time.Minute). 93 + HandleIf(func(_ any, err error) bool { 94 + return isTransientError(err) 95 + }). 96 + OnRetryScheduled(func(e failsafe.ExecutionScheduledEvent[any]) { 97 + slog.Warn("batch failed with transient error, retrying", 98 + slog.Int("count", len(batch)), 99 + slog.Duration("retryDelay", e.Delay), 100 + logutil.Error(e.LastError()), 101 + slog.Int("attempt", e.Attempts())) 102 + }). 103 + Build() 110 104 111 - if is500 || attempt >= MaxRetries { 112 - if is500 { 113 - first := batch[0] 114 - last := batch[len(batch)-1] 115 - slog.Error("batch failed with 500 error, marking as failed and moving on", 116 - logutil.Error(lastResult.LastError), 117 - slog.Int("count", len(batch)), 118 - slog.Group("range", 119 - slog.Attr(logutil.Track(first.TrackName, first.ArtistName(), first.PlayedTime.Time)), 120 - slog.Attr(logutil.Track(last.TrackName, last.ArtistName(), last.PlayedTime.Time)))) 121 - } else { 122 - slog.Error("batch failed after max retries", slog.Int("errorCount", lastResult.ErrorCount)) 123 - } 105 + err := failsafe.With[any](retryPolicy).WithContext(ctx).Run(func() error { 106 + return PublishBatch(ctx, atprotoClient, did, batch, opts.Storage) 107 + }) 108 + if err != nil { 109 + slog.Error("batch failed after retries", 110 + logutil.Error(err), 111 + slog.Int("count", len(batch))) 124 112 125 - if opts.Storage != nil { 126 - errMsg := lastResult.LastError.Error() 127 - if err := opts.Storage.MarkFailed(did, batchKeys, errMsg); err != nil { 128 - slog.Error("failed to mark records as failed", logutil.Error(err)) 129 - } 113 + if opts.Storage != nil { 114 + if markErr := opts.Storage.MarkFailed(did, batchKeys, err.Error()); markErr != nil { 115 + slog.Error("failed to mark records as failed", logutil.Error(markErr)) 130 116 } 131 - break 132 117 } 133 118 134 - delay := backoff(attempt) 135 - slog.Warn("batch failed, retrying with backoff", 136 - slog.Int("errorCount", lastResult.ErrorCount), 137 - slog.Duration("retryDelay", delay), 138 - logutil.Error(lastResult.LastError), 139 - slog.Int("attempt", attempt)) 140 - 141 - if !waitForRetry(ctx, delay) { 142 - return ctx.Err() 143 - } 119 + totalErrors += len(batch) 120 + tracker.IncrementErrors(len(batch)) 144 121 145 - if err := limiter.AllowBulkWrite(ctx, len(batch)); err != nil { 146 - return err 147 - } 122 + batch = batch[:0] 123 + batchKeys = batchKeys[:0] 124 + return nil // Return nil so we continue with the next batch 148 125 } 149 126 150 - totalSuccess += lastResult.SuccessCount 151 - totalErrors += lastResult.ErrorCount 152 - tracker.Increment(lastResult.SuccessCount) 153 - tracker.IncrementErrors(lastResult.ErrorCount) 154 - 155 - if lastResult.SuccessCount > 0 && opts.Storage != nil { 156 - keys := CreateRecordKeys(batch[:lastResult.SuccessCount]) 157 - if err := opts.Storage.MarkPublished(did, keys...); err != nil { 158 - slog.Error("failed to mark records as published", logutil.Error(err)) 159 - } 160 - } 127 + totalSuccess += len(batch) 128 + tracker.Increment(len(batch)) 161 129 162 130 if tracker.ShouldLog() { 163 131 progressLog(tracker.Report()) ··· 177 145 178 146 var record PlayRecord 179 147 if err := json.Unmarshal(rec, &record); err != nil { 180 - return nil // skip malformed 148 + slog.Error("malformed record in storage", slog.String("key", key), logutil.Error(err)) 149 + if opts.Storage != nil { 150 + _ = opts.Storage.MarkFailed(client.GetDID(), []string{key}, "malformed record") 151 + } 152 + totalErrors++ 153 + tracker.IncrementErrors(1) 154 + return nil 181 155 } 182 156 183 157 batch = append(batch, record) ··· 204 178 logResult(totalSuccess, totalErrors, startTime) 205 179 return newPublishResult(totalSuccess, totalErrors, totalRecords, startTime, cancelled) 206 180 } 207 - 208 - // waitForRetry waits for the specified duration, returning false if context is cancelled. 209 - func waitForRetry(ctx context.Context, delay time.Duration) bool { 210 - select { 211 - case <-time.After(delay): 212 - return true 213 - case <-ctx.Done(): 214 - slog.Debug("retry cancelled due to context done") 215 - return false 216 - } 217 - } 218 - 219 - func defaultProgressLog(f func(ProgressReport)) func(ProgressReport) { 220 - if f != nil { 221 - return f 222 - } 223 - return func(pr ProgressReport) { 224 - slog.Info("sync progress", 225 - slog.Int("completed", pr.Completed), 226 - slog.Int("total", pr.Total), 227 - slog.Float64("percent", pr.Percent), 228 - slog.String("elapsed", pr.Elapsed), 229 - slog.String("eta", pr.ETA), 230 - slog.String("rate", pr.Rate), 231 - slog.Int("errors", pr.Errors), 232 - ) 233 - } 234 - } 235 - 236 - func defaultBatchSize(size int) int { 237 - if size > 0 { 238 - return size 239 - } 240 - return DefaultBatchSize 241 - } 242 - 243 - func buildClient(client AuthClient, customClient ATProtoClient) (ATProtoClient, error) { 244 - if customClient != nil { 245 - return customClient, nil 246 - } 247 - 248 - apiClient := client.GetAPIClient() 249 - if apiClient == nil { 250 - slog.Error("failed to get API client", logutil.Error(fmt.Errorf("client is nil"))) 251 - return nil, fmt.Errorf("API client is nil") 252 - } 253 - 254 - return &atprotoClientAdapter{client: apiClient, did: client.GetDID()}, nil 255 - } 256 - 257 - func newPublishResult(success, errors, total int, start time.Time, cancelled bool) PublishResult { 258 - return PublishResult{ 259 - SuccessCount: success, 260 - ErrorCount: errors, 261 - Cancelled: cancelled, 262 - Duration: time.Since(start), 263 - TotalRecords: total, 264 - RecordsPerMinute: ratePerMinute(success, time.Since(start)), 265 - } 266 - } 267 - 268 - func logResult(success, errors int, startTime time.Time) { 269 - if errors > 0 { 270 - slog.Warn("import completed with errors", 271 - slog.Int("success", success), 272 - slog.Int("errors", errors)) 273 - } 274 - slog.Info("import completed", 275 - slog.Int("success", success), 276 - slog.Int("errors", errors), 277 - slog.Duration("duration", time.Since(startTime)), 278 - slog.String("rate", formatRate(ratePerMinute(success, time.Since(startTime))))) 279 - } 280 - 281 - func backoff(attempt int) time.Duration { 282 - if attempt <= 0 { 283 - return BaseRetryDelay 284 - } 285 - 286 - // Calculate exponential delay: BaseRetryDelay * 2^(attempt-1) 287 - // We use uint(attempt-1) because 1<<0 is 1 (for first retry) 288 - exp := min(attempt-1, 31) 289 - 290 - delay := BaseRetryDelay * time.Duration(1<<uint(exp)) 291 - 292 - // Cap the delay before adding jitter 293 - if delay > MaxRetryDelay || delay <= 0 { 294 - delay = MaxRetryDelay 295 - } 296 - 297 - // Add up to 25% jitter 298 - var jitter time.Duration 299 - if delay > 4 { 300 - jitter = time.Duration(rand.Int63n(int64(delay / 4))) 301 - } 302 - 303 - return delay + jitter 304 - } 305 - 306 - type BatchResult struct { 307 - SuccessCount int 308 - ErrorCount int 309 - FailedRecords []PlayRecord 310 - LastError error 311 - } 312 - 313 - func PublishBatch(ctx context.Context, client ATProtoClient, did string, batch []PlayRecord, consumedW, consumedG int, storage cache.Storage) BatchResult { 314 - if len(batch) == 0 { 315 - return BatchResult{} 316 - } 317 - 318 - atprotoRecords := prepareRecords(batch) 319 - 320 - err := client.ApplyWrites(ctx, RecordType, atprotoRecords) 321 - if err != nil { 322 - slog.Error("batch publish failed", logutil.Error(err)) 323 - return BatchResult{ErrorCount: len(atprotoRecords), FailedRecords: atprotoRecords, LastError: err} 324 - } 325 - 326 - logBatch(atprotoRecords, consumedW, consumedG) 327 - 328 - if storage != nil && did != "" { 329 - keys := CreateRecordKeys(atprotoRecords) 330 - cacheEntries := make(map[string][]byte) 331 - for i, rec := range atprotoRecords { 332 - key := keys[i] 333 - value, _ := json.Marshal(rec) 334 - cacheEntries[key] = value 335 - } 336 - if err := storage.SaveRecords(did, cacheEntries); err != nil { 337 - slog.Debug("failed to add records to cache", logutil.Error(err)) 338 - } 339 - } 340 - 341 - return BatchResult{SuccessCount: len(atprotoRecords)} 342 - } 343 - 344 - func prepareRecords(batch []PlayRecord) []PlayRecord { 345 - atprotoRecords := make([]PlayRecord, 0, len(batch)) 346 - for _, record := range batch { 347 - record.Type = RecordType 348 - record.SubmissionClientAgent = ClientAgent 349 - atprotoRecords = append(atprotoRecords, record) 350 - } 351 - return atprotoRecords 352 - } 353 - 354 - func logBatch(atprotoRecords []PlayRecord, consumedW, consumedG int) { 355 - first := atprotoRecords[0] 356 - last := atprotoRecords[len(atprotoRecords)-1] 357 - slog.Debug("batch published", 358 - slog.Int("records", len(atprotoRecords)), 359 - slog.Int("writes_consumed", consumedW), 360 - slog.Int("writes_limit", WriteLimitDay), 361 - slog.Int("writes_remaining", WriteLimitDay-consumedW), 362 - slog.Int("global_consumed", consumedG), 363 - slog.Int("global_limit", GlobalLimitDay), 364 - slog.Int("global_remaining", GlobalLimitDay-consumedG), 365 - logutil.Track(first.TrackName, first.ArtistName(), first.PlayedTime.Time), 366 - logutil.Track(last.TrackName, last.ArtistName(), last.PlayedTime.Time)) 367 - } 368 - 369 - // removed getArtistName from here to favor the one in record.go 370 - 371 - func ratePerMinute(count int, duration time.Duration) float64 { 372 - if duration == 0 { 373 - return 0 374 - } 375 - return float64(count) / duration.Minutes() 376 - } 377 - 378 - type ATProtoClient interface { 379 - ApplyWrites(ctx context.Context, collection string, records []PlayRecord) error 380 - } 381 - 382 - type AuthClient interface { 383 - GetAPIClient() *atclient.APIClient 384 - GetDID() string 385 - } 386 - 387 - type atprotoClientAdapter struct { 388 - client *atclient.APIClient 389 - did string 390 - } 391 - 392 - func (a *atprotoClientAdapter) ApplyWrites(ctx context.Context, collection string, records []PlayRecord) error { 393 - writes, err := prepareWrites(records, collection) 394 - if err != nil { 395 - return err 396 - } 397 - if writes == nil { 398 - return nil 399 - } 400 - 401 - return a.client.Post(ctx, syntax.NSID("com.atproto.repo.applyWrites"), map[string]any{ 402 - "repo": a.did, 403 - "writes": writes, 404 - }, nil) 405 - }
+210 -86
sync/rate.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "crypto/rand" 6 + "encoding/binary" 5 7 "fmt" 6 8 "log/slog" 9 + "math" 7 10 "sync" 8 11 "time" 12 + 13 + "tangled.org/karitham.dev/lazuli/sync/logutil" 9 14 ) 10 15 11 16 const ( 12 17 // Limits 13 - WriteLimitDay = 9000 14 - GlobalLimitDay = 35000 18 + WriteLimitMinute = 100 19 + WriteLimitHour = 1000 20 + WriteLimitDay = 9000 21 + 22 + GlobalLimitMinute = 300 23 + GlobalLimitHour = 3000 24 + GlobalLimitDay = 35000 15 25 16 26 // Costs 17 27 ReadGlobalCost = 1 18 28 WriteOnlyCost = 1 19 29 WriteGlobalCost = 3 20 - 21 - secondsPerDay = 86400 22 30 ) 23 31 24 32 type RateLimiter interface { 25 - // AllowBulkWrite blocks or returns error until N writes are permissible 26 - AllowBulkWrite(ctx context.Context, n int) error 27 - // AllowRead blocks or returns error until a read is permissible 28 - AllowRead(ctx context.Context) error 29 - // RefundBulkWrite restores N writes to the quota (e.g. after a failed write) 30 - RefundBulkWrite(n int) 33 + // AllowBulkWrite blocks or returns error until N writes are permissible. 34 + // Returns the timestamp of when the quota was charged for bucket-accurate refunds. 35 + AllowBulkWrite(ctx context.Context, n int) (time.Time, error) 36 + // AllowRead blocks or returns error until a read is permissible. 37 + AllowRead(ctx context.Context) (time.Time, error) 38 + // RefundBulkWrite restores N writes to the quota using the original charge time. 39 + RefundBulkWrite(ctx context.Context, n int, chargedAt time.Time) 40 + // RefundRead restores a read to the quota using the original charge time. 41 + RefundRead(ctx context.Context, chargedAt time.Time) 31 42 // Stats returns current consumption (writes, global) 32 - Stats() (int, int) 43 + Stats() (int, int, error) 33 44 } 34 45 35 46 type KVStore interface { 36 - Get(key string) (int, error) 37 - Set(key string, val int) error 47 + GetMulti(keys []string) (map[string]int, error) 48 + IncrByMulti(counts map[string]int) error 38 49 } 39 50 40 51 type Clock interface { ··· 46 57 func (realClock) Now() time.Time { return time.Now().UTC() } 47 58 48 59 type quotaLimiter struct { 49 - mu sync.Mutex 50 60 kv KVStore 51 61 prefix string 52 62 clock Clock 63 + mu sync.Mutex 53 64 } 54 65 55 - func (l *quotaLimiter) Stats() (int, int) { 56 - wKey, gKey := l.getKeys() 57 - w, _ := l.kv.Get(wKey) 58 - g, _ := l.kv.Get(gKey) 59 - return w, g 66 + func (l *quotaLimiter) Stats() (int, int, error) { 67 + wd, gd, _, _, _, _ := l.getKeys(l.clock.Now()) 68 + vals, err := l.kv.GetMulti([]string{wd, gd}) 69 + if err != nil { 70 + return 0, 0, fmt.Errorf("failed to get stats: %w", err) 71 + } 72 + return vals[wd], vals[gd], nil 60 73 } 61 74 62 75 func NewRateLimiter(kv KVStore) RateLimiter { ··· 67 80 } 68 81 } 69 82 70 - func (l *quotaLimiter) getKeys() (string, string) { 71 - day := l.clock.Now().Format("2006-01-02") 72 - return fmt.Sprintf("%s:writes:%s", l.prefix, day), fmt.Sprintf("%s:global:%s", l.prefix, day) 83 + func (l *quotaLimiter) getKeys(t time.Time) (string, string, string, string, string, string) { 84 + day := t.Format("2006-01-02") 85 + hour := t.Format("2006-01-02-15") 86 + minute := t.Format("2006-01-02-15-04") 87 + return fmt.Sprintf("%s:writes:d:%s", l.prefix, day), fmt.Sprintf("%s:global:d:%s", l.prefix, day), 88 + fmt.Sprintf("%s:writes:h:%s", l.prefix, hour), fmt.Sprintf("%s:global:h:%s", l.prefix, hour), 89 + fmt.Sprintf("%s:writes:m:%s", l.prefix, minute), fmt.Sprintf("%s:global:m:%s", l.prefix, minute) 73 90 } 74 91 75 - func (l *quotaLimiter) AllowBulkWrite(ctx context.Context, n int) error { 76 - wKey, gKey := l.getKeys() 92 + func (l *quotaLimiter) AllowBulkWrite(ctx context.Context, n int) (time.Time, error) { 77 93 wCost := n * WriteOnlyCost 78 94 gCost := n * WriteGlobalCost 79 95 80 - return l.wait(ctx, wKey, gKey, wCost, gCost, WriteLimitDay, GlobalLimitDay) 81 - } 96 + for { 97 + now := l.clock.Now() 98 + wKeys, gKeys := l.getAllKeys(now) 82 99 83 - func (l *quotaLimiter) AllowRead(ctx context.Context) error { 84 - _, gKey := l.getKeys() 85 - return l.wait(ctx, "", gKey, 0, ReadGlobalCost, 0, GlobalLimitDay) 86 - } 100 + l.mu.Lock() 101 + maxWait, err := l.checkQuota(now, wKeys, gKeys, wCost, gCost) 102 + if err != nil { 103 + l.mu.Unlock() 104 + return now, err 105 + } 87 106 88 - func (l *quotaLimiter) RefundBulkWrite(n int) { 89 - l.mu.Lock() 90 - defer l.mu.Unlock() 107 + if maxWait > 0 { 108 + l.mu.Unlock() 109 + slog.Info("Rate limit reached, sleeping until next window", slog.Duration("wait", maxWait.Round(time.Second))) 110 + // Add a tiny bit of buffer + jitter to ensure we are definitely in the next window 111 + wait := maxWait + 100*time.Millisecond + addJitter(100*time.Millisecond) 112 + if err := l.sleep(ctx, wait); err != nil { 113 + return now, err 114 + } 115 + continue 116 + } 91 117 92 - wKey, gKey := l.getKeys() 93 - wCost := n * WriteOnlyCost 94 - gCost := n * WriteGlobalCost 118 + // Charge quota while holding the lock 119 + err = l.charge(wKeys, gKeys, wCost, gCost) 120 + l.mu.Unlock() 95 121 96 - if currW, err := l.kv.Get(wKey); err == nil { 97 - l.kv.Set(wKey, max(0, currW-wCost)) 98 - } 99 - if currG, err := l.kv.Get(gKey); err == nil { 100 - l.kv.Set(gKey, max(0, currG-gCost)) 122 + if err != nil { 123 + return now, err 124 + } 125 + return now, nil 101 126 } 102 127 } 103 128 104 - func (l *quotaLimiter) wait(ctx context.Context, wKey, gKey string, wCost, gCost, wLimit, gLimit int) error { 105 - l.mu.Lock() 106 - defer l.mu.Unlock() 129 + func (l *quotaLimiter) AllowRead(ctx context.Context) (time.Time, error) { 130 + gCost := ReadGlobalCost 107 131 108 132 for { 109 - elapsed := l.getElapsedSinceMidnight() 110 - currW, currG := l.getCurrentConsumption(wKey, gKey) 133 + now := l.clock.Now() 134 + _, gKeys := l.getAllKeys(now) 111 135 112 - waitW := l.computeTargetWait(currW, wCost, wLimit, elapsed) 113 - waitG := l.computeTargetWait(currG, gCost, gLimit, elapsed) 114 - maxWait := max(waitG, waitW) 115 - 116 - if maxWait <= 0 { 117 - l.updateConsumption(wKey, gKey, wCost, gCost, currW, currG) 118 - return nil 136 + l.mu.Lock() 137 + maxWait, err := l.checkQuota(now, nil, gKeys, 0, gCost) 138 + if err != nil { 139 + l.mu.Unlock() 140 + return now, err 119 141 } 120 142 121 - if maxWait > 1*time.Minute { 122 - slog.Info("Rate limit reached, sleeping", slog.Duration("wait", maxWait.Round(time.Second))) 143 + if maxWait > 0 { 144 + l.mu.Unlock() 145 + slog.Info("Rate limit reached, sleeping until next window", slog.Duration("wait", maxWait.Round(time.Second))) 146 + wait := maxWait + 100*time.Millisecond + addJitter(100*time.Millisecond) 147 + if err := l.sleep(ctx, wait); err != nil { 148 + return now, err 149 + } 150 + continue 123 151 } 124 152 153 + // Charge quota while holding the lock 154 + err = l.charge(nil, gKeys, 0, gCost) 125 155 l.mu.Unlock() 126 - err := l.sleep(ctx, maxWait) 127 - l.mu.Lock() 156 + 128 157 if err != nil { 129 - return err 158 + return now, err 130 159 } 160 + return now, nil 131 161 } 132 162 } 133 163 134 - func (l *quotaLimiter) getElapsedSinceMidnight() float64 { 135 - now := l.clock.Now() 136 - midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) 137 - return now.Sub(midnight).Seconds() 164 + func (l *quotaLimiter) getAllKeys(t time.Time) ([]string, []string) { 165 + wd, gd, wh, gh, wm, gm := l.getKeys(t) 166 + return []string{wm, wh, wd}, []string{gm, gh, gd} 138 167 } 139 168 140 - func (l *quotaLimiter) getCurrentConsumption(wKey, gKey string) (int, int) { 141 - currW := 0 142 - if wKey != "" { 143 - currW, _ = l.kv.Get(wKey) 169 + func (l *quotaLimiter) RefundBulkWrite(ctx context.Context, n int, chargedAt time.Time) { 170 + wKeys, gKeys := l.getAllKeys(chargedAt) 171 + wCost := n * WriteOnlyCost 172 + gCost := n * WriteGlobalCost 173 + 174 + l.mu.Lock() 175 + defer l.mu.Unlock() 176 + 177 + updates := make(map[string]int, len(wKeys)+len(gKeys)) 178 + for _, k := range wKeys { 179 + updates[k] = -wCost 180 + } 181 + for _, k := range gKeys { 182 + updates[k] = -gCost 144 183 } 145 - currG, _ := l.kv.Get(gKey) 146 - return currW, currG 184 + 185 + if err := l.kv.IncrByMulti(updates); err != nil { 186 + slog.Error("failed to refund write quota", logutil.Error(err)) 187 + } 147 188 } 148 189 149 - func (l *quotaLimiter) computeTargetWait(curr, cost, limit int, elapsed float64) time.Duration { 150 - if limit <= 0 { 151 - return 0 190 + func (l *quotaLimiter) RefundRead(ctx context.Context, chargedAt time.Time) { 191 + _, gKeys := l.getAllKeys(chargedAt) 192 + gCost := ReadGlobalCost 193 + 194 + l.mu.Lock() 195 + defer l.mu.Unlock() 196 + 197 + updates := make(map[string]int, len(gKeys)) 198 + for _, k := range gKeys { 199 + updates[k] = -gCost 152 200 } 153 - const maxCreditSeconds = 60.0 154 - target := float64(curr+cost) * secondsPerDay / float64(limit) 201 + 202 + if err := l.kv.IncrByMulti(updates); err != nil { 203 + slog.Error("failed to refund global quota (read)", logutil.Error(err)) 204 + } 205 + } 206 + 207 + // checkQuota checks if the proposed cost fits within the limits. 208 + // Returns the wait duration if over limit (0 if OK), or error. 209 + // Must be called with lock held. 210 + func (l *quotaLimiter) checkQuota(now time.Time, wKeys, gKeys []string, wCost, gCost int) (time.Duration, error) { 211 + wLimits := []int{WriteLimitMinute, WriteLimitHour, WriteLimitDay} 212 + gLimits := []int{GlobalLimitMinute, GlobalLimitHour, GlobalLimitDay} 213 + 214 + allKeys := make([]string, 0, len(wKeys)+len(gKeys)) 215 + allKeys = append(allKeys, wKeys...) 216 + allKeys = append(allKeys, gKeys...) 155 217 156 - // If the target time (when this consumption is 'earned') is within the 157 - // burst window (now + maxCreditSeconds), we don't need to wait. 158 - if target <= elapsed+maxCreditSeconds { 159 - return 0 218 + if len(allKeys) == 0 { 219 + return 0, nil 160 220 } 161 221 162 - // Otherwise, we wait until we are at the edge of the burst window. 163 - // We cap the wait at maxCreditSeconds to avoid overly long sleeps in a single loop, 164 - // allowing for periodic re-checks of the clock and KV store. 165 - wait := target - (elapsed + maxCreditSeconds) 166 - return time.Duration(min(wait, maxCreditSeconds) * float64(time.Second)) 222 + values, err := l.kv.GetMulti(allKeys) 223 + if err != nil { 224 + return 0, fmt.Errorf("failed to check quota: %w", err) 225 + } 226 + 227 + maxWait := time.Duration(0) 228 + 229 + for i, k := range wKeys { 230 + curr := values[k] 231 + if curr+wCost > wLimits[i] { 232 + wait := l.untilNextWindow(now, i) 233 + if wait > maxWait { 234 + maxWait = wait 235 + } 236 + } 237 + } 238 + 239 + for i, k := range gKeys { 240 + curr := values[k] 241 + if curr+gCost > gLimits[i] { 242 + wait := l.untilNextWindow(now, i) 243 + if wait > maxWait { 244 + maxWait = wait 245 + } 246 + } 247 + } 248 + return maxWait, nil 167 249 } 168 250 169 - func (l *quotaLimiter) updateConsumption(wKey, gKey string, wCost, gCost, currW, currG int) { 170 - if wKey != "" { 171 - l.kv.Set(wKey, currW+wCost) 251 + // charge applies the cost to the keys. 252 + // Must be called with lock held. 253 + func (l *quotaLimiter) charge(wKeys, gKeys []string, wCost, gCost int) error { 254 + updates := make(map[string]int, len(wKeys)+len(gKeys)) 255 + for _, k := range wKeys { 256 + updates[k] = wCost 172 257 } 173 - l.kv.Set(gKey, currG+gCost) 258 + for _, k := range gKeys { 259 + updates[k] = gCost 260 + } 261 + 262 + if len(updates) == 0 { 263 + return nil 264 + } 265 + 266 + if err := l.kv.IncrByMulti(updates); err != nil { 267 + return fmt.Errorf("failed to charge quota: %w", err) 268 + } 269 + return nil 270 + } 271 + 272 + func (l *quotaLimiter) untilNextWindow(now time.Time, tier int) time.Duration { 273 + switch tier { 274 + case 0: // Minute 275 + return now.Truncate(time.Minute).Add(time.Minute).Sub(now) 276 + case 1: // Hour 277 + return now.Truncate(time.Hour).Add(time.Hour).Sub(now) 278 + case 2: // Day 279 + return now.Truncate(24 * time.Hour).Add(24 * time.Hour).Sub(now) 280 + default: 281 + // Safety fallback: if unknown tier, wait a reasonable amount (e.g. 1 minute) 282 + // to prevent busy loops or bypassing limits. 283 + slog.Warn("unknown rate limit tier encountered", slog.Int("tier", tier)) 284 + return time.Minute 285 + } 286 + } 287 + 288 + func addJitter(d time.Duration) time.Duration { 289 + var b [8]byte 290 + _, _ = rand.Read(b[:]) 291 + n := binary.LittleEndian.Uint64(b[:]) 292 + // Add 0-20% jitter 293 + jitter := float64(d) * 0.2 * (float64(n) / math.MaxUint64) 294 + return d + time.Duration(jitter) 174 295 } 175 296 176 297 func (l *quotaLimiter) sleep(ctx context.Context, d time.Duration) error { 298 + if d <= 0 { 299 + return nil 300 + } 177 301 timer := time.NewTimer(d) 178 302 defer timer.Stop() 179 303 select {
+136 -24
sync/rate_test.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "encoding/json" 6 + "strings" 5 7 "testing" 6 8 "time" 9 + 10 + "github.com/bluesky-social/indigo/atproto/atclient" 7 11 ) 8 12 9 13 type mockKV struct { 10 14 data map[string]int 11 15 } 12 16 17 + func (m *mockKV) GetMulti(keys []string) (map[string]int, error) { 18 + out := make(map[string]int) 19 + for _, k := range keys { 20 + out[k] = m.data[k] 21 + } 22 + return out, nil 23 + } 24 + 25 + func (m *mockKV) IncrByMulti(counts map[string]int) error { 26 + for k, v := range counts { 27 + m.data[k] += v 28 + } 29 + return nil 30 + } 31 + 32 + // Helper for tests that inspect internal state directly 13 33 func (m *mockKV) Get(key string) (int, error) { 14 34 return m.data[key], nil 15 35 } 16 36 17 - func (m *mockKV) Set(key string, val int) error { 18 - m.data[key] = val 19 - return nil 37 + func TestRateLimiter_Refunds(t *testing.T) { 38 + kv := &mockKV{data: make(map[string]int)} 39 + clock := &mockClock{now: time.Date(2026, 1, 22, 12, 0, 0, 0, time.UTC)} 40 + limiter := &quotaLimiter{ 41 + kv: kv, 42 + prefix: "quota", 43 + clock: clock, 44 + } 45 + ctx := context.Background() 46 + 47 + // Test Bulk Write Refund 48 + chargedAt, _ := limiter.AllowBulkWrite(ctx, 10) 49 + limiter.RefundBulkWrite(ctx, 10, chargedAt) 50 + 51 + w, g, _ := limiter.Stats() 52 + if w != 0 || g != 0 { 53 + t.Errorf("BulkWrite refund failed: w=%d, g=%d", w, g) 54 + } 55 + 56 + // Test Read Refund 57 + chargedAt, _ = limiter.AllowRead(ctx) 58 + limiter.RefundRead(ctx, chargedAt) 59 + 60 + _, g, _ = limiter.Stats() 61 + if g != 0 { 62 + t.Errorf("Read refund failed: g=%d", g) 63 + } 20 64 } 21 65 22 66 func TestRateLimiter_Weighting(t *testing.T) { ··· 25 69 ctx := context.Background() 26 70 27 71 // 1 Read = 1 Global 28 - err := limiter.AllowRead(ctx) 72 + _, err := limiter.AllowRead(ctx) 29 73 if err != nil { 30 74 t.Fatal(err) 31 75 } 32 - _, g := limiter.Stats() 76 + _, g, err := limiter.Stats() 77 + if err != nil { 78 + t.Fatal(err) 79 + } 33 80 if g != 1 { 34 81 t.Errorf("expected 1 global unit, got %d", g) 35 82 } 36 83 37 84 // 1 Write = 1 Write-Only + 3 Global 38 - err = limiter.AllowBulkWrite(ctx, 1) 85 + _, err = limiter.AllowBulkWrite(ctx, 1) 39 86 if err != nil { 40 87 t.Fatal(err) 41 88 } 42 - w, g := limiter.Stats() 89 + w, g, err := limiter.Stats() 90 + if err != nil { 91 + t.Fatal(err) 92 + } 43 93 if w != 1 { 44 94 t.Errorf("expected 1 write unit, got %d", w) 45 95 } ··· 48 98 } 49 99 50 100 // Bulk Write (10 elements) = 10 Write-Only + 30 Global 51 - err = limiter.AllowBulkWrite(ctx, 10) 101 + _, err = limiter.AllowBulkWrite(ctx, 10) 52 102 if err != nil { 53 103 t.Fatal(err) 54 104 } 55 - w, g = limiter.Stats() 105 + w, g, err = limiter.Stats() 106 + if err != nil { 107 + t.Fatal(err) 108 + } 56 109 if w != 11 { 57 110 t.Errorf("expected 11 write units, got %d", w) 58 111 } ··· 63 116 64 117 func TestRateLimiter_Smoothing(t *testing.T) { 65 118 kv := &mockKV{data: make(map[string]int)} 119 + // Ensure we are at the very beginning of the minute to avoid window edge issues in test 66 120 clock := &mockClock{now: time.Date(2026, 1, 22, 1, 0, 0, 0, time.UTC)} 67 121 limiter := &quotaLimiter{ 68 122 kv: kv, ··· 70 124 clock: clock, 71 125 } 72 126 73 - // 1 hour since midnight = 3600s 74 - // allowance = (9000/86400) * 3600 = 375 75 - // burst allowance = (9000/86400) * (3600 + 60) = 381.25 127 + wd, gd, wh, gh, wm, gm := limiter.getKeys(clock.now) 128 + kv.data[wm] = WriteLimitMinute 129 + kv.data[wh] = 0 130 + kv.data[wd] = 0 131 + kv.data[gm] = 0 132 + kv.data[gh] = 0 133 + kv.data[gd] = 0 76 134 77 - _ = kv.Set("quota:writes:2026-01-22", 400) // Well over the limit + burst 135 + // Use a context that is already cancelled to simulate what happens 136 + // when we hit the rate limit and can't proceed. 137 + // Ensure the store thinks we are over the limit 138 + kv.data[wm] = WriteLimitMinute + 1 78 139 79 - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) 80 - defer cancel() 140 + ctx, cancel := context.WithCancel(context.Background()) 141 + cancel() 81 142 82 - err := limiter.AllowBulkWrite(ctx, 1) 83 - if err != context.DeadlineExceeded { 84 - t.Errorf("expected DeadlineExceeded, got %v", err) 143 + _, err := limiter.AllowBulkWrite(ctx, 1) 144 + if err == nil { 145 + t.Errorf("expected error, got nil") 146 + } 147 + } 148 + 149 + func TestRetryExhaustionMarkFailed(t *testing.T) { 150 + ctx := context.Background() 151 + did := "did:example:123" 152 + storage := newMockStorage() 153 + rec1, _ := json.Marshal(PlayRecord{TrackName: "Song 1"}) 154 + storage.SaveRecords(did, map[string][]byte{"k1": rec1}) 155 + 156 + client := &mockATProtoClient{ 157 + applyWritesFunc: func(ctx context.Context, collection string, records []PlayRecord) error { 158 + return &atclient.APIError{StatusCode: 503} 159 + }, 160 + } 161 + 162 + oldBase := BaseRetryDelay 163 + BaseRetryDelay = time.Nanosecond 164 + defer func() { BaseRetryDelay = oldBase }() 165 + 166 + res := Publish(ctx, &mockAuthClient{did: did}, PublishOptions{ 167 + BatchSize: 1, 168 + ATProtoClient: client, 169 + Storage: storage, 170 + }) 171 + 172 + if res.SuccessCount != 0 { 173 + t.Errorf("expected 0 successes, got %d", res.SuccessCount) 174 + } 175 + if res.ErrorCount != 1 { 176 + t.Errorf("expected 1 error, got %d", res.ErrorCount) 177 + } 178 + 179 + storage.mu.Lock() 180 + errStr, ok := storage.failed["k1"] 181 + storage.mu.Unlock() 182 + 183 + if !ok { 184 + t.Error("expected record k1 to be marked as failed") 185 + } 186 + if !strings.Contains(errStr, "503") { 187 + t.Errorf("expected error string to contain 503, got %q", errStr) 85 188 } 86 189 } 87 190 ··· 102 205 ctx := context.Background() 103 206 104 207 // 1. Consume some quota on day 1 105 - err := limiter.AllowBulkWrite(ctx, 10) 208 + _, err := limiter.AllowBulkWrite(ctx, 10) 106 209 if err != nil { 107 210 t.Fatal(err) 108 211 } 109 212 110 - w1, g1 := limiter.Stats() 213 + w1, g1, err := limiter.Stats() 214 + if err != nil { 215 + t.Fatal(err) 216 + } 111 217 if w1 != 10 || g1 != 30 { 112 218 t.Errorf("expected w=10, g=30 on day 1, got w=%d, g=%d", w1, g1) 113 219 } ··· 116 222 clock.now = clock.now.Add(2 * time.Second) // 00:00:01 on 2026-01-23 117 223 118 224 // 3. Stats should now reflect day 2 (0) 119 - w2, g2 := limiter.Stats() 225 + w2, g2, err := limiter.Stats() 226 + if err != nil { 227 + t.Fatal(err) 228 + } 120 229 if w2 != 0 || g2 != 0 { 121 230 t.Errorf("expected w=0, g=0 on day 2, got w=%d, g=%d", w2, g2) 122 231 } 123 232 124 233 // 4. Consumption on day 2 should not affect day 1 125 - err = limiter.AllowBulkWrite(ctx, 5) 234 + _, err = limiter.AllowBulkWrite(ctx, 5) 126 235 if err != nil { 127 236 t.Fatal(err) 128 237 } 129 238 130 - w2, g2 = limiter.Stats() 239 + w2, g2, err = limiter.Stats() 240 + if err != nil { 241 + t.Fatal(err) 242 + } 131 243 if w2 != 5 || g2 != 15 { 132 244 t.Errorf("expected w=5, g=15 on day 2, got w=%d, g=%d", w2, g2) 133 245 } 134 246 135 247 // Verify day 1 keys are still there but not accessed by Stats() 136 - day1WKey := "quota:writes:2026-01-22" 248 + day1WKey := "quota:writes:d:2026-01-22" 137 249 if val, _ := kv.Get(day1WKey); val != 10 { 138 250 t.Errorf("expected day 1 write key to still be 10, got %d", val) 139 251 }