like malachite (atproto-lastfm-importer) but in go and bluer
go spotify tealfm lastfm atproto
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

sync/rate: fix midnight rollover bug

karitham b142047f 7849f8fe

+125 -84
+1 -1
flake.nix
··· 17 17 let 18 18 lazuli = pkgs.buildGoModule rec { 19 19 name = "lazuli"; 20 - version = "0.1.0"; 20 + version = "0.1.1"; 21 21 src = pkgs.nix-gitignore.gitignoreSource [ "*.csv" "*.zip" "*.json" ] ./.; 22 22 vendorHash = "sha256-Zr9gGytJARMbf/7120HYkKsfzpeW47MkwdMODD9QTKc="; 23 23 ldflags = [
+73 -46
sync/rate.go
··· 37 37 Set(key string, val int) error 38 38 } 39 39 40 + type Clock interface { 41 + Now() time.Time 42 + } 43 + 44 + type realClock struct{} 45 + 46 + func (realClock) Now() time.Time { return time.Now().UTC() } 47 + 40 48 type quotaLimiter struct { 41 49 mu sync.Mutex 42 50 kv KVStore 43 51 prefix string 52 + clock Clock 44 53 } 45 54 46 55 func (l *quotaLimiter) Stats() (int, int) { ··· 54 63 return &quotaLimiter{ 55 64 kv: kv, 56 65 prefix: "quota", 66 + clock: realClock{}, 57 67 } 58 68 } 59 69 60 70 func (l *quotaLimiter) getKeys() (string, string) { 61 - day := time.Now().UTC().Format("2006-01-02") 71 + day := l.clock.Now().Format("2006-01-02") 62 72 return fmt.Sprintf("%s:writes:%s", l.prefix, day), fmt.Sprintf("%s:global:%s", l.prefix, day) 63 73 } 64 74 ··· 96 106 defer l.mu.Unlock() 97 107 98 108 for { 99 - now := time.Now().UTC() 100 - midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) 101 - elapsed := now.Sub(midnight).Seconds() 102 - 103 - currW := 0 104 - if wKey != "" { 105 - currW, _ = l.kv.Get(wKey) 106 - } 107 - currG, _ := l.kv.Get(gKey) 108 - 109 - var waitW, waitG time.Duration 110 - 111 - // Clamp the 'effective' elapsed time to prevent late-day bursts. 112 - // We allow a maximum credit of 1 minute worth of rate limit (burst window). 113 - const maxCreditSeconds = 60.0 114 - 115 - if wLimit > 0 { 116 - targetW := float64(currW+wCost) * secondsPerDay / float64(wLimit) 117 - // effectively: we can only be 'ahead' by maxCreditSeconds 118 - effectiveElapsed := max(elapsed, targetW-maxCreditSeconds) 119 - if targetW > effectiveElapsed { 120 - waitW = time.Duration((targetW - effectiveElapsed) * float64(time.Second)) 121 - } 122 - } 123 - 124 - if gLimit > 0 { 125 - targetG := float64(currG+gCost) * secondsPerDay / float64(gLimit) 126 - effectiveElapsed := max(elapsed, targetG-maxCreditSeconds) 127 - if targetG > effectiveElapsed { 128 - waitG = time.Duration((targetG - effectiveElapsed) * float64(time.Second)) 129 - } 130 - } 109 + elapsed := l.getElapsedSinceMidnight() 110 + currW, currG := l.getCurrentConsumption(wKey, gKey) 131 111 112 + waitW := l.computeTargetWait(currW, wCost, wLimit, elapsed) 113 + waitG := l.computeTargetWait(currG, gCost, gLimit, elapsed) 132 114 maxWait := max(waitG, waitW) 133 115 134 116 if maxWait <= 0 { 135 - if wKey != "" { 136 - l.kv.Set(wKey, currW+wCost) 137 - } 138 - l.kv.Set(gKey, currG+gCost) 117 + l.updateConsumption(wKey, gKey, wCost, gCost, currW, currG) 139 118 return nil 140 119 } 141 120 ··· 144 123 } 145 124 146 125 l.mu.Unlock() 147 - timer := time.NewTimer(maxWait) 148 - select { 149 - case <-ctx.Done(): 150 - timer.Stop() 151 - l.mu.Lock() 152 - return ctx.Err() 153 - case <-timer.C: 154 - l.mu.Lock() 155 - continue 126 + err := l.sleep(ctx, maxWait) 127 + l.mu.Lock() 128 + if err != nil { 129 + return err 156 130 } 157 131 } 158 132 } 133 + 134 + func (l *quotaLimiter) getElapsedSinceMidnight() float64 { 135 + now := l.clock.Now() 136 + midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) 137 + return now.Sub(midnight).Seconds() 138 + } 139 + 140 + func (l *quotaLimiter) getCurrentConsumption(wKey, gKey string) (int, int) { 141 + currW := 0 142 + if wKey != "" { 143 + currW, _ = l.kv.Get(wKey) 144 + } 145 + currG, _ := l.kv.Get(gKey) 146 + return currW, currG 147 + } 148 + 149 + func (l *quotaLimiter) computeTargetWait(curr, cost, limit int, elapsed float64) time.Duration { 150 + if limit <= 0 { 151 + return 0 152 + } 153 + const maxCreditSeconds = 60.0 154 + target := float64(curr+cost) * secondsPerDay / float64(limit) 155 + 156 + // If the target time (when this consumption is 'earned') is within the 157 + // burst window (now + maxCreditSeconds), we don't need to wait. 158 + if target <= elapsed+maxCreditSeconds { 159 + return 0 160 + } 161 + 162 + // Otherwise, we wait until we are at the edge of the burst window. 163 + // We cap the wait at maxCreditSeconds to avoid overly long sleeps in a single loop, 164 + // allowing for periodic re-checks of the clock and KV store. 165 + wait := target - (elapsed + maxCreditSeconds) 166 + return time.Duration(min(wait, maxCreditSeconds) * float64(time.Second)) 167 + } 168 + 169 + func (l *quotaLimiter) updateConsumption(wKey, gKey string, wCost, gCost, currW, currG int) { 170 + if wKey != "" { 171 + l.kv.Set(wKey, currW+wCost) 172 + } 173 + l.kv.Set(gKey, currG+gCost) 174 + } 175 + 176 + func (l *quotaLimiter) sleep(ctx context.Context, d time.Duration) error { 177 + timer := time.NewTimer(d) 178 + defer timer.Stop() 179 + select { 180 + case <-ctx.Done(): 181 + return ctx.Err() 182 + case <-timer.C: 183 + return nil 184 + } 185 + }
+51 -37
sync/rate_test.go
··· 63 63 64 64 func TestRateLimiter_Smoothing(t *testing.T) { 65 65 kv := &mockKV{data: make(map[string]int)} 66 - limiter := NewRateLimiter(kv) 67 - 68 - // Set consumption to just below the allowed threshold for "now" 69 - // Let's say 1 hour passed since midnight. 70 - // Allowed = (9000 / 86400) * 3600 = 375 71 - 72 - now := time.Now().UTC() 73 - midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) 74 - elapsed := now.Sub(midnight).Seconds() 66 + clock := &mockClock{now: time.Date(2026, 1, 22, 1, 0, 0, 0, time.UTC)} 67 + limiter := &quotaLimiter{ 68 + kv: kv, 69 + prefix: "quota", 70 + clock: clock, 71 + } 75 72 76 - allowedW := int((float64(WriteLimitDay) / secondsPerDay) * elapsed) 73 + // 1 hour since midnight = 3600s 74 + // allowance = (9000/86400) * 3600 = 375 75 + // burst allowance = (9000/86400) * (3600 + 60) = 381.25 77 76 78 - wKey, _ := limiter.(*quotaLimiter).getKeys() 79 - _ = kv.Set(wKey, allowedW) 77 + _ = kv.Set("quota:writes:2026-01-22", 400) // Well over the limit + burst 80 78 81 - // Trying to allow 100 more should block. 82 - // We'll use a short timeout context to verify it blocks. 83 79 ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) 84 80 defer cancel() 85 81 86 - err := limiter.AllowBulkWrite(ctx, 100) 82 + err := limiter.AllowBulkWrite(ctx, 1) 87 83 if err != context.DeadlineExceeded { 88 84 t.Errorf("expected DeadlineExceeded, got %v", err) 89 85 } 90 86 } 91 87 92 - func TestRateLimiter_Refund(t *testing.T) { 93 - kv := &mockKV{data: make(map[string]int)} 94 - limiter := NewRateLimiter(kv) 95 - ctx := context.Background() 88 + type mockClock struct { 89 + now time.Time 90 + } 96 91 97 - // Initial state 98 - w, g := limiter.Stats() 99 - if w != 0 || g != 0 { 100 - t.Errorf("expected clean start, got w=%d, g=%d", w, g) 92 + func (m *mockClock) Now() time.Time { return m.now } 93 + 94 + func TestRateLimiter_MidnightRollover(t *testing.T) { 95 + kv := &mockKV{data: make(map[string]int)} 96 + clock := &mockClock{now: time.Date(2026, 1, 22, 23, 59, 59, 0, time.UTC)} 97 + limiter := &quotaLimiter{ 98 + kv: kv, 99 + prefix: "quota", 100 + clock: clock, 101 101 } 102 + ctx := context.Background() 102 103 103 - // Consume 104 + // 1. Consume some quota on day 1 104 105 err := limiter.AllowBulkWrite(ctx, 10) 105 106 if err != nil { 106 107 t.Fatal(err) 107 108 } 108 - w, g = limiter.Stats() 109 - if w != 10 || g != 30 { 110 - t.Errorf("expected w=10, g=30, got w=%d, g=%d", w, g) 109 + 110 + w1, g1 := limiter.Stats() 111 + if w1 != 10 || g1 != 30 { 112 + t.Errorf("expected w=10, g=30 on day 1, got w=%d, g=%d", w1, g1) 111 113 } 112 114 113 - // Refund half 114 - limiter.RefundBulkWrite(5) 115 - w, g = limiter.Stats() 116 - if w != 5 || g != 15 { 117 - t.Errorf("expected w=5, g=15, got w=%d, g=%d", w, g) 115 + // 2. Advance time to next day 116 + clock.now = clock.now.Add(2 * time.Second) // 00:00:01 on 2026-01-23 117 + 118 + // 3. Stats should now reflect day 2 (0) 119 + w2, g2 := limiter.Stats() 120 + if w2 != 0 || g2 != 0 { 121 + t.Errorf("expected w=0, g=0 on day 2, got w=%d, g=%d", w2, g2) 118 122 } 119 123 120 - // Refund more than exists (should floor at 0) 121 - limiter.RefundBulkWrite(10) 122 - w, g = limiter.Stats() 123 - if w != 0 || g != 0 { 124 - t.Errorf("expected floor at 0, got w=%d, g=%d", w, g) 124 + // 4. Consumption on day 2 should not affect day 1 125 + err = limiter.AllowBulkWrite(ctx, 5) 126 + if err != nil { 127 + t.Fatal(err) 128 + } 129 + 130 + w2, g2 = limiter.Stats() 131 + if w2 != 5 || g2 != 15 { 132 + t.Errorf("expected w=5, g=15 on day 2, got w=%d, g=%d", w2, g2) 133 + } 134 + 135 + // Verify day 1 keys are still there but not accessed by Stats() 136 + day1WKey := "quota:writes:2026-01-22" 137 + if val, _ := kv.Get(day1WKey); val != 10 { 138 + t.Errorf("expected day 1 write key to still be 10, got %d", val) 125 139 } 126 140 }