like malachite (atproto-lastfm-importer) but in go and bluer
go spotify tealfm lastfm atproto
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

log upgrades

karitham 941398d7 78a7519d

+183 -153
+3 -1
main.go
··· 649 649 slog.String("rate", pr.Rate), 650 650 slog.Int("errors", pr.Errors), 651 651 slog.Int("writes", pr.WritesConsumed), 652 - slog.Int("global", pr.GlobalConsumed)) 652 + slog.Int("global", pr.GlobalConsumed), 653 + slog.String("limited_rate", pr.ConstrainedRate), 654 + slog.String("reset_in", pr.TimeUntilReset)) 653 655 } 654 656 } 655 657 }
-32
sync/adapter.go
··· 484 484 func generateRecordURI(did string, record PlayRecord) string { 485 485 return fmt.Sprintf("at://%s/%s/%s", did, RecordType, CreateRecordKey(record)) 486 486 } 487 - 488 - func AddToCache(did string, records []ExistingRecord, storage cache.Storage) error { 489 - if storage == nil { 490 - return nil 491 - } 492 - newEntries := make(map[string][]byte) 493 - for _, rec := range records { 494 - key := CreateRecordKey(rec.Value) 495 - value, _ := json.Marshal(rec.Value) 496 - newEntries[key] = value 497 - } 498 - return storage.SaveRecords(did, newEntries) 499 - } 500 - 501 - func AddToCacheWithKeys(did string, records []PlayRecord, keys []string, storage cache.Storage) error { 502 - if storage == nil { 503 - return nil 504 - } 505 - if did == "" { 506 - return fmt.Errorf("did cannot be empty") 507 - } 508 - if len(records) != len(keys) { 509 - return fmt.Errorf("records and keys length mismatch: %d vs %d", len(records), len(keys)) 510 - } 511 - newEntries := make(map[string][]byte) 512 - for i, rec := range records { 513 - key := keys[i] 514 - value, _ := json.Marshal(rec) 515 - newEntries[key] = value 516 - } 517 - return storage.SaveRecords(did, newEntries) 518 - }
+8
sync/batch_test.go
··· 128 128 return 0, 0, nil 129 129 } 130 130 131 + func (m *mockLimiter) EstimatedWriteTime(n int) time.Duration { 132 + return 0 133 + } 134 + 135 + func (m *mockLimiter) RemainingQuota() (int, int, time.Duration) { 136 + return 10000, 35000, time.Hour 137 + } 138 + 131 139 // Mock ATProtoClient 132 140 type mockATProtoClient struct { 133 141 applyWritesFunc func(ctx context.Context, collection string, records []PlayRecord) error
-4
sync/config.go
··· 64 64 func (r *PublishResult) Errored() bool { 65 65 return r.ErrorCount > 0 66 66 } 67 - 68 - func (r *PublishResult) AllSuccessful() bool { 69 - return r.ErrorCount == 0 && !r.Cancelled 70 - }
+48 -114
sync/progress.go
··· 1 1 package sync 2 2 3 3 import ( 4 - "encoding/json" 5 4 "fmt" 6 - "os" 7 - "path/filepath" 8 5 "sync" 9 6 "time" 10 7 ) ··· 21 18 FailedRecords int `json:"failedRecords"` 22 19 } 23 20 24 - func stateDir() (string, error) { 25 - dir := os.Getenv("LAZULI_STATE_DIR") 26 - if dir != "" { 27 - return dir, nil 28 - } 29 - home, err := os.UserHomeDir() 30 - if err != nil { 31 - return "", err 32 - } 33 - return filepath.Join(home, ".lazuli", "state"), nil 34 - } 35 - 36 - func statePath(did string) (string, error) { 37 - sanitized := did 38 - sanitized = filepath.FromSlash(sanitized) 39 - sanitized = filepath.Clean(sanitized) 40 - return sanitized + ".json", nil 41 - } 42 - 43 - func LoadSyncState(did string) (*SyncState, error) { 44 - dir, err := stateDir() 45 - if err != nil { 46 - return nil, err 47 - } 48 - 49 - path, err := statePath(did) 50 - if err != nil { 51 - return nil, err 52 - } 53 - 54 - fullPath := filepath.Join(dir, path) 55 - data, err := os.ReadFile(fullPath) 56 - if err != nil { 57 - if os.IsNotExist(err) { 58 - return &SyncState{StartedAt: time.Now()}, nil 59 - } 60 - return nil, err 61 - } 62 - 63 - var state SyncState 64 - if err := json.Unmarshal(data, &state); err != nil { 65 - return nil, err 66 - } 67 - 68 - return &state, nil 69 - } 70 - 71 - func SaveSyncState(did string, state *SyncState) error { 72 - dir, err := stateDir() 73 - if err != nil { 74 - return err 75 - } 76 - 77 - if err := os.MkdirAll(dir, 0o755); err != nil { 78 - return err 79 - } 80 - 81 - path, err := statePath(did) 82 - if err != nil { 83 - return err 84 - } 85 - 86 - data, err := json.MarshalIndent(state, "", " ") 87 - if err != nil { 88 - return err 89 - } 90 - 91 - return os.WriteFile(filepath.Join(dir, path), data, 0o644) 92 - } 93 - 94 - func ClearSyncState(did string) error { 95 - dir, err := stateDir() 96 - if err != nil { 97 - return err 98 - } 99 - 100 - path, err := statePath(did) 101 - if err != nil { 102 - return err 103 - } 104 - 105 - fullPath := filepath.Join(dir, path) 106 - if _, err := os.Stat(fullPath); os.IsNotExist(err) { 107 - return nil 108 - } 109 - 110 - return os.Remove(fullPath) 111 - } 112 - 113 21 type ProgressTracker struct { 114 22 Total int 115 23 Completed int ··· 152 60 153 61 elapsed = time.Since(t.StartTime) 154 62 if t.Completed == 0 { 155 - return 0, 0, elapsed, "0 rec/min" 63 + return 0, 0, elapsed, "0/min" 156 64 } 157 65 158 - perMin := float64(t.Completed) / elapsed.Minutes() 159 - rate = formatRate(perMin) 66 + observedRate := float64(t.Completed) / elapsed.Minutes() 67 + rate = formatRate(observedRate) 160 68 161 69 if t.Total == 0 { 162 70 return 100, 0, elapsed, rate ··· 168 76 } 169 77 170 78 remainingRecords := float64(t.Total - t.Completed) 171 - remainingMinutes := remainingRecords / perMin 79 + remainingMinutes := remainingRecords / observedRate 172 80 eta = time.Duration(remainingMinutes*60) * time.Second 173 81 return percent, eta, elapsed, rate 174 82 } ··· 193 101 } 194 102 195 103 type ProgressReport struct { 196 - Total int `json:"total"` 197 - Completed int `json:"completed"` 198 - Percent float64 `json:"percent"` 199 - Errors int `json:"errors"` 200 - Elapsed string `json:"elapsed"` 201 - ETA string `json:"eta,omitempty"` 202 - Rate string `json:"rate"` 203 - WritesConsumed int `json:"writesConsumed,omitempty"` 204 - GlobalConsumed int `json:"globalConsumed,omitempty"` 104 + Total int `json:"total"` 105 + Completed int `json:"completed"` 106 + Percent float64 `json:"percent"` 107 + Errors int `json:"errors"` 108 + Elapsed string `json:"elapsed"` 109 + ETA string `json:"eta,omitempty"` 110 + Rate string `json:"rate"` 111 + WritesConsumed int `json:"writesConsumed,omitempty"` 112 + GlobalConsumed int `json:"globalConsumed,omitempty"` 113 + WritesRemaining int `json:"writesRemaining,omitempty"` 114 + GlobalRemaining int `json:"globalRemaining,omitempty"` 115 + TimeUntilReset string `json:"timeUntilReset,omitempty"` 116 + ConstrainedRate string `json:"constrainedRate,omitempty"` 205 117 } 206 118 207 119 func (t *ProgressTracker) Report() ProgressReport { ··· 212 124 } 213 125 214 126 var w, g int 127 + var writesRemaining, globalRemaining int 128 + var timeUntilReset time.Duration 129 + var constrainedRate string 130 + 215 131 if t.limiter != nil { 216 132 w, g, _ = t.limiter.Stats() 133 + writesRemaining, globalRemaining, timeUntilReset = t.limiter.RemainingQuota() 134 + if writesRemaining > 0 && timeUntilReset > 0 { 135 + constrainedPerMin := float64(writesRemaining) / (timeUntilReset.Minutes() + 0.001) 136 + if constrainedPerMin >= 1000 { 137 + constrainedRate = fmt.Sprintf("%.1fk/min", constrainedPerMin/1000) 138 + } else { 139 + constrainedRate = fmt.Sprintf("%.0f/min", constrainedPerMin) 140 + } 141 + } 142 + } 143 + 144 + resetStr := "" 145 + if timeUntilReset > 0 { 146 + resetStr = FormatDuration(timeUntilReset) 217 147 } 218 148 219 149 return ProgressReport{ 220 - Total: t.Total, 221 - Completed: t.Completed, 222 - Percent: percent, 223 - Errors: t.Errors, 224 - Elapsed: elapsed.Round(time.Second).String(), 225 - ETA: etaStr, 226 - Rate: rate, 227 - WritesConsumed: w, 228 - GlobalConsumed: g, 150 + Total: t.Total, 151 + Completed: t.Completed, 152 + Percent: percent, 153 + Errors: t.Errors, 154 + Elapsed: elapsed.Round(time.Second).String(), 155 + ETA: etaStr, 156 + Rate: rate, 157 + WritesConsumed: w, 158 + GlobalConsumed: g, 159 + WritesRemaining: writesRemaining, 160 + GlobalRemaining: globalRemaining, 161 + TimeUntilReset: resetStr, 162 + ConstrainedRate: constrainedRate, 229 163 } 230 164 } 231 165
+124 -2
sync/rate.go
··· 41 41 RefundRead(ctx context.Context, chargedAt time.Time) 42 42 // Stats returns current consumption (writes, global) 43 43 Stats() (int, int, error) 44 + // EstimatedWriteTime returns how long it would take to write n records 45 + // given current consumption and rate limits. 46 + EstimatedWriteTime(n int) time.Duration 47 + // RemainingQuota returns remaining write and global quota at each tier, 48 + // plus the time until the next tier reset (the maximum wait). 49 + RemainingQuota() (writesRemaining, globalRemaining int, timeUntilReset time.Duration) 44 50 } 45 51 46 52 type KVStore interface { ··· 108 114 109 115 if maxWait > 0 { 110 116 l.mu.Unlock() 111 - slog.Info("Rate limit reached, sleeping until next window", slog.Duration("wait", maxWait.Round(time.Second))) 117 + slog.Debug("Rate limit reached, sleeping until next window", slog.Duration("wait", maxWait.Round(time.Second))) 112 118 // Add a tiny bit of buffer + jitter to ensure we are definitely in the next window 113 119 wait := maxWait + 100*time.Millisecond + addJitter(100*time.Millisecond) 114 120 if err := l.sleep(ctx, wait); err != nil { ··· 144 150 145 151 if maxWait > 0 { 146 152 l.mu.Unlock() 147 - slog.Info("Rate limit reached, sleeping until next window", slog.Duration("wait", maxWait.Round(time.Second))) 153 + slog.Debug("Rate limit reached, sleeping until next window", slog.Duration("wait", maxWait.Round(time.Second))) 148 154 wait := maxWait + 100*time.Millisecond + addJitter(100*time.Millisecond) 149 155 if err := l.sleep(ctx, wait); err != nil { 150 156 return now, err ··· 304 310 return nil 305 311 } 306 312 } 313 + 314 + func (l *quotaLimiter) EstimatedWriteTime(n int) time.Duration { 315 + if n <= 0 { 316 + return 0 317 + } 318 + 319 + now := l.clock.Now() 320 + wKeys, gKeys := l.getAllKeys(now) 321 + 322 + wCost := n * WriteOnlyCost 323 + gCost := n * WriteGlobalCost 324 + 325 + allKeys := make([]string, 0, len(wKeys)+len(gKeys)) 326 + allKeys = append(allKeys, wKeys...) 327 + allKeys = append(allKeys, gKeys...) 328 + 329 + values, err := l.kv.GetMulti(allKeys) 330 + if err != nil { 331 + return time.Minute 332 + } 333 + 334 + maxWait := time.Duration(0) 335 + 336 + for i, k := range wKeys { 337 + curr := values[k] 338 + limit := int(float32(WriteLimitMinute) * l.rlQuota) 339 + if i == 1 { 340 + limit = int(float32(WriteLimitHour) * l.rlQuota) 341 + } else if i == 2 { 342 + limit = int(float32(WriteLimitDay) * l.rlQuota) 343 + } 344 + if curr+wCost > limit { 345 + maxWait = max(l.untilNextWindow(now, i), maxWait) 346 + } 347 + } 348 + 349 + for i, k := range gKeys { 350 + curr := values[k] 351 + limit := int(float32(GlobalLimitMinute) * l.rlQuota) 352 + if i == 1 { 353 + limit = int(float32(GlobalLimitHour) * l.rlQuota) 354 + } else if i == 2 { 355 + limit = int(float32(GlobalLimitDay) * l.rlQuota) 356 + } 357 + if curr+gCost > limit { 358 + maxWait = max(l.untilNextWindow(now, i), maxWait) 359 + } 360 + } 361 + 362 + return maxWait 363 + } 364 + 365 + func (l *quotaLimiter) RemainingQuota() (writesRemaining, globalRemaining int, timeUntilReset time.Duration) { 366 + now := l.clock.Now() 367 + wKeys, gKeys := l.getAllKeys(now) 368 + 369 + allKeys := make([]string, 0, len(wKeys)+len(gKeys)) 370 + allKeys = append(allKeys, wKeys...) 371 + allKeys = append(allKeys, gKeys...) 372 + 373 + values, err := l.kv.GetMulti(allKeys) 374 + if err != nil { 375 + return 0, 0, time.Minute 376 + } 377 + 378 + minWritesRemaining := int(float32(WriteLimitDay) * l.rlQuota) 379 + minGlobalRemaining := int(float32(GlobalLimitDay) * l.rlQuota) 380 + maxResetTime := time.Duration(0) 381 + 382 + for i, k := range wKeys { 383 + curr := values[k] 384 + var limit int 385 + switch i { 386 + case 0: 387 + limit = int(float32(WriteLimitMinute) * l.rlQuota) 388 + case 1: 389 + limit = int(float32(WriteLimitHour) * l.rlQuota) 390 + case 2: 391 + limit = int(float32(WriteLimitDay) * l.rlQuota) 392 + } 393 + remaining := limit - curr 394 + if remaining < minWritesRemaining { 395 + minWritesRemaining = remaining 396 + } 397 + resetTime := l.untilNextWindow(now, i) 398 + if resetTime > maxResetTime { 399 + maxResetTime = resetTime 400 + } 401 + } 402 + 403 + for i, k := range gKeys { 404 + curr := values[k] 405 + var limit int 406 + switch i { 407 + case 0: 408 + limit = int(float32(GlobalLimitMinute) * l.rlQuota) 409 + case 1: 410 + limit = int(float32(GlobalLimitHour) * l.rlQuota) 411 + case 2: 412 + limit = int(float32(GlobalLimitDay) * l.rlQuota) 413 + } 414 + remaining := limit - curr 415 + if remaining < minGlobalRemaining { 416 + minGlobalRemaining = remaining 417 + } 418 + } 419 + 420 + if minWritesRemaining < 0 { 421 + minWritesRemaining = 0 422 + } 423 + if minGlobalRemaining < 0 { 424 + minGlobalRemaining = 0 425 + } 426 + 427 + return minWritesRemaining, minGlobalRemaining, maxResetTime 428 + }