like malachite (atproto-lastfm-importer) but in go and bluer
go spotify tealfm lastfm atproto
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: rate limit quotas per period

karitham 78a7519d 32899bd0

+45 -44
+1 -1
flake.nix
··· 17 17 let 18 18 lazuli = pkgs.buildGoModule rec { 19 19 name = "lazuli"; 20 - version = "0.1.2"; 20 + version = "0.1.3"; 21 21 src = pkgs.nix-gitignore.gitignoreSource [ "*.csv" "*.zip" "*.json" ] ./.; 22 22 vendorHash = "sha256-MfBPv/L7wHuUGXx4BDd+DFq0RB11KuMHCzPjFv6FMgs="; 23 23 ldflags = [
+11 -11
main.go
··· 160 160 return fmt.Errorf("failed to get database stats: %w", err) 161 161 } 162 162 163 - limiter := sync.NewRateLimiter(a.storage) 163 + limiter := sync.NewRateLimiter(a.storage, 1) 164 164 writes, global, err := limiter.Stats() 165 165 if err != nil { 166 166 return fmt.Errorf("failed to get rate limit stats: %w", err) ··· 247 247 did := authClient.GetDID() 248 248 dryRun := cmd.Bool("dry-run") 249 249 250 - limiter := sync.NewRateLimiter(a.storage) 250 + limiter := sync.NewRateLimiter(a.storage, 0.9) 251 251 repoClient := sync.NewRateClient(authClient.GetAPIClient(), did, limiter) 252 252 253 253 var failedRecords []struct { ··· 383 383 func (a *App) runDebugFetch(ctx context.Context, cmd *cli.Command) error { 384 384 authClient, err := a.prepareAuth(ctx, cmd) 385 385 if err != nil { 386 - return fmt.Errorf("authentication failed: %w\nHint: Make sure your credentials are correct and you have network access.", err) 386 + return fmt.Errorf("authentication failed: %w", err) 387 387 } 388 388 389 389 repoClient := sync.NewRateClient(authClient.GetAPIClient(), authClient.GetDID(), nil) 390 390 391 391 records, _, err := repoClient.ListRecords(ctx, sync.RecordType, 10, "") 392 392 if err != nil { 393 - return fmt.Errorf("failed to fetch records from Bluesky: %w\nHint: Check your network connection and try again.", err) 393 + return fmt.Errorf("failed to fetch records from Bluesky: %w", err) 394 394 } 395 395 396 396 enc := json.NewEncoder(os.Stdout) ··· 435 435 password := cmd.String("password") 436 436 437 437 if handle == "" { 438 - return "", "", fmt.Errorf("Bluesky handle is required (set --handle or set the LAZULI_HANDLE environment variable)") 438 + return "", "", fmt.Errorf("bluesky handle is required (set --handle or set the LAZULI_HANDLE environment variable)") 439 439 } 440 440 if password == "" { 441 441 return "", "", fmt.Errorf("app password is required (set --password or set the LAZULI_PASSWORD environment variable)") ··· 551 551 } 552 552 a.log.Info("Authenticated", logutil.DID(authClient.GetDID()), slog.String("pds", authClient.GetPDS())) 553 553 554 - limiter := sync.NewRateLimiter(a.storage) 554 + limiter := sync.NewRateLimiter(a.storage, 0.9) 555 555 repoClient := sync.NewRateClient(authClient.GetAPIClient(), authClient.GetDID(), limiter) 556 556 557 557 existingRecords, err := sync.FetchExisting(ctx, repoClient, authClient.GetDID(), a.storage, fresh) ··· 663 663 fresh := cmd.Bool("fresh") 664 664 a.log.Info("Starting sync operation", logutil.DID(authClient.GetDID()), slog.Bool("fresh", fresh)) 665 665 666 - limiter := sync.NewRateLimiter(a.storage) 666 + limiter := sync.NewRateLimiter(a.storage, 0.85) 667 667 repoClient := sync.NewRateClient(authClient.GetAPIClient(), authClient.GetDID(), limiter) 668 668 669 669 if fresh { ··· 687 687 func (a *App) runDedupe(ctx context.Context, cmd *cli.Command) error { 688 688 authClient, err := a.prepareAuth(ctx, cmd) 689 689 if err != nil { 690 - return fmt.Errorf("authentication failed: %w\nHint: Make sure your credentials are correct.", err) 690 + return fmt.Errorf("authentication failed: %w", err) 691 691 } 692 692 693 693 dryRun := cmd.Bool("dry-run") ··· 698 698 slog.Bool("dry_run", dryRun), 699 699 slog.Bool("fresh", fresh)) 700 700 701 - limiter := sync.NewRateLimiter(a.storage) 701 + limiter := sync.NewRateLimiter(a.storage, 0.9) 702 702 repoClient := sync.NewRateClient(authClient.GetAPIClient(), authClient.GetDID(), limiter) 703 703 704 704 if fresh { ··· 711 711 712 712 existingRecords, err := sync.FetchExisting(ctx, repoClient, authClient.GetDID(), a.storage, fresh) 713 713 if err != nil { 714 - return fmt.Errorf("failed to fetch existing records: %w\nHint: Check your network connection and try again.", err) 714 + return fmt.Errorf("failed to fetch existing records: %w", err) 715 715 } 716 716 717 717 duplicates := sync.FindDuplicates(existingRecords) ··· 777 777 }). 778 778 Build() 779 779 780 - err := failsafe.With[any](retryPolicy).WithContext(ctx).Run(func() error { 780 + err := failsafe.With(retryPolicy).WithContext(ctx).Run(func() error { 781 781 return repoClient.DeleteRecord(ctx, sync.RecordType, rkey) 782 782 }) 783 783
+2 -1
sync/import_test.go
··· 10 10 "time" 11 11 12 12 "github.com/bluesky-social/indigo/atproto/atclient" 13 + 13 14 "tangled.org/karitham.dev/lazuli/cache" 14 15 "tangled.org/karitham.dev/lazuli/sync" 15 16 ) ··· 162 163 163 164 // 7. Publish 164 165 kv := &mockKV{data: make(map[string]int)} 165 - limiter := sync.NewRateLimiter(kv) 166 + limiter := sync.NewRateLimiter(kv, 1) 166 167 publishOpts := sync.PublishOptions{ 167 168 BatchSize: 10, 168 169 ATProtoClient: mockRepo,
+18 -21
sync/rate.go
··· 17 17 // Limits 18 18 WriteLimitMinute = 100 19 19 WriteLimitHour = 1000 20 - WriteLimitDay = 9000 20 + WriteLimitDay = 10000 21 21 22 22 GlobalLimitMinute = 300 23 23 GlobalLimitHour = 3000 ··· 57 57 func (realClock) Now() time.Time { return time.Now().UTC() } 58 58 59 59 type quotaLimiter struct { 60 - kv KVStore 61 - prefix string 62 - clock Clock 63 - mu sync.Mutex 60 + mu sync.Mutex 61 + kv KVStore 62 + prefix string 63 + clock Clock 64 + rlQuota float32 64 65 } 65 66 66 67 func (l *quotaLimiter) Stats() (int, int, error) { ··· 72 73 return vals[wd], vals[gd], nil 73 74 } 74 75 75 - func NewRateLimiter(kv KVStore) RateLimiter { 76 + func NewRateLimiter(kv KVStore, maxPercent float32) RateLimiter { 76 77 return &quotaLimiter{ 77 - kv: kv, 78 - prefix: "quota", 79 - clock: realClock{}, 78 + kv: kv, 79 + prefix: "quota", 80 + clock: realClock{}, 81 + rlQuota: maxPercent, 80 82 } 81 83 } 82 84 ··· 228 230 229 231 for i, k := range wKeys { 230 232 curr := values[k] 231 - if curr+wCost > wLimits[i] { 232 - wait := l.untilNextWindow(now, i) 233 - if wait > maxWait { 234 - maxWait = wait 235 - } 233 + if curr+wCost > int(float32(wLimits[i])*l.rlQuota) { 234 + maxWait = max(l.untilNextWindow(now, i), maxWait) 236 235 } 237 236 } 238 237 239 238 for i, k := range gKeys { 240 239 curr := values[k] 241 - if curr+gCost > gLimits[i] { 242 - wait := l.untilNextWindow(now, i) 243 - if wait > maxWait { 244 - maxWait = wait 245 - } 240 + if curr+gCost > int(float32(gLimits[i])*l.rlQuota) { 241 + maxWait = max(l.untilNextWindow(now, i), maxWait) 246 242 } 247 243 } 244 + 248 245 return maxWait, nil 249 246 } 250 247 ··· 289 286 var b [8]byte 290 287 _, _ = rand.Read(b[:]) 291 288 n := binary.LittleEndian.Uint64(b[:]) 292 - // Add 0-20% jitter 293 - jitter := float64(d) * 0.2 * (float64(n) / math.MaxUint64) 289 + // Add 0-10% jitter 290 + jitter := float64(d) * 0.1 * (float64(n) / math.MaxUint64) 294 291 return d + time.Duration(jitter) 295 292 } 296 293
+13 -10
sync/rate_test.go
··· 38 38 kv := &mockKV{data: make(map[string]int)} 39 39 clock := &mockClock{now: time.Date(2026, 1, 22, 12, 0, 0, 0, time.UTC)} 40 40 limiter := &quotaLimiter{ 41 - kv: kv, 42 - prefix: "quota", 43 - clock: clock, 41 + kv: kv, 42 + prefix: "quota", 43 + clock: clock, 44 + rlQuota: 1, 44 45 } 45 46 ctx := context.Background() 46 47 ··· 65 66 66 67 func TestRateLimiter_Weighting(t *testing.T) { 67 68 kv := &mockKV{data: make(map[string]int)} 68 - limiter := NewRateLimiter(kv) 69 + limiter := NewRateLimiter(kv, 1) 69 70 ctx := context.Background() 70 71 71 72 // 1 Read = 1 Global ··· 119 120 // Ensure we are at the very beginning of the minute to avoid window edge issues in test 120 121 clock := &mockClock{now: time.Date(2026, 1, 22, 1, 0, 0, 0, time.UTC)} 121 122 limiter := &quotaLimiter{ 122 - kv: kv, 123 - prefix: "quota", 124 - clock: clock, 123 + kv: kv, 124 + prefix: "quota", 125 + clock: clock, 126 + rlQuota: 1, 125 127 } 126 128 127 129 wd, gd, wh, gh, wm, gm := limiter.getKeys(clock.now) ··· 198 200 kv := &mockKV{data: make(map[string]int)} 199 201 clock := &mockClock{now: time.Date(2026, 1, 22, 23, 59, 59, 0, time.UTC)} 200 202 limiter := &quotaLimiter{ 201 - kv: kv, 202 - prefix: "quota", 203 - clock: clock, 203 + kv: kv, 204 + prefix: "quota", 205 + clock: clock, 206 + rlQuota: 1, 204 207 } 205 208 ctx := context.Background() 206 209