like malachite (atproto-lastfm-importer) but in go and bluer
go spotify tealfm lastfm atproto
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove dead code

karitham 32899bd0 575b439c

+3 -80
-47
sync/adapter.go
··· 6 6 "errors" 7 7 "fmt" 8 8 "log/slog" 9 - "math/rand" 10 9 "net" 11 10 "strings" 12 11 "time" ··· 178 177 return atprotoRecords 179 178 } 180 179 181 - func waitForRetry(ctx context.Context, delay time.Duration) bool { 182 - timer := time.NewTimer(delay) 183 - defer timer.Stop() 184 - 185 - select { 186 - case <-timer.C: 187 - return true 188 - case <-ctx.Done(): 189 - slog.Debug("retry cancelled due to context done") 190 - return false 191 - } 192 - } 193 - 194 180 func defaultProgressLog(f func(ProgressReport)) func(ProgressReport) { 195 181 if f != nil { 196 182 return f ··· 253 239 slog.String("rate", formatRate(ratePerMinute(success, time.Since(startTime))))) 254 240 } 255 241 256 - func backoff(attempt int) time.Duration { 257 - if attempt <= 0 { 258 - return BaseRetryDelay 259 - } 260 - 261 - // Calculate exponential delay: BaseRetryDelay * 2^(attempt-1) 262 - // We use uint(attempt-1) because 1<<0 is 1 (for first retry) 263 - exp := min(attempt-1, 31) 264 - 265 - delay := BaseRetryDelay * time.Duration(1<<uint(exp)) 266 - 267 - // Cap the delay before adding jitter 268 - if delay > MaxRetryDelay || delay <= 0 { 269 - delay = MaxRetryDelay 270 - } 271 - 272 - // Add up to 25% jitter 273 - var jitter time.Duration 274 - if delay > 4 { 275 - jitter = time.Duration(rand.Int63n(int64(delay / 4))) 276 - } 277 - 278 - return delay + jitter 279 - } 280 - 281 242 func PublishBatch(ctx context.Context, client ATProtoClient, did string, batch []PlayRecord, storage cache.Storage) error { 282 243 if len(batch) == 0 { 283 244 return nil ··· 330 291 331 292 func IsTransientError(err error) bool { 332 293 return isTransientError(err) 333 - } 334 - 335 - func Backoff(attempt int) time.Duration { 336 - return backoff(attempt) 337 - } 338 - 339 - func WaitForRetry(ctx context.Context, delay time.Duration) bool { 340 - return waitForRetry(ctx, delay) 341 294 } 342 295 343 296 func isTransientError(err error) bool {
+2 -3
sync/batch_test.go
··· 4 4 "context" 5 5 "encoding/json" 6 6 "errors" 7 + "maps" 7 8 "net/http" 8 9 "strings" 9 10 "sync" ··· 44 45 func (m *mockStorage) SaveRecords(did string, records map[string][]byte) error { 45 46 m.mu.Lock() 46 47 defer m.mu.Unlock() 47 - for k, v := range records { 48 - m.unpublished[k] = v 49 - } 48 + maps.Copy(m.unpublished, records) 50 49 return nil 51 50 } 52 51
+1 -1
sync/publish.go
··· 102 102 }). 103 103 Build() 104 104 105 - err := failsafe.With[any](retryPolicy).WithContext(ctx).Run(func() error { 105 + err := failsafe.With(retryPolicy).WithContext(ctx).Run(func() error { 106 106 return PublishBatch(ctx, atprotoClient, did, batch, opts.Storage) 107 107 }) 108 108 if err != nil {
-29
sync/sync_test.go
··· 1 1 package sync 2 2 3 3 import ( 4 - "fmt" 5 4 "testing" 6 5 "time" 7 6 ) ··· 298 297 }) 299 298 } 300 299 } 301 - 302 - func TestBackoff(t *testing.T) { 303 - tests := []struct { 304 - attempt int 305 - min time.Duration 306 - max time.Duration 307 - }{ 308 - {0, BaseRetryDelay, BaseRetryDelay + (BaseRetryDelay / 4)}, 309 - {1, BaseRetryDelay, BaseRetryDelay + (BaseRetryDelay / 4)}, 310 - {2, BaseRetryDelay * 2, BaseRetryDelay*2 + (BaseRetryDelay * 2 / 4)}, 311 - {3, BaseRetryDelay * 4, BaseRetryDelay*4 + (BaseRetryDelay * 4 / 4)}, 312 - {10, MaxRetryDelay, MaxRetryDelay + (MaxRetryDelay / 4)}, 313 - {100, MaxRetryDelay, MaxRetryDelay + (MaxRetryDelay / 4)}, 314 - } 315 - 316 - for _, tt := range tests { 317 - t.Run(fmt.Sprintf("attempt %d", tt.attempt), func(t *testing.T) { 318 - d := backoff(tt.attempt) 319 - if d < tt.min { 320 - t.Errorf("backoff(%d) = %v, want >= %v", tt.attempt, d, tt.min) 321 - } 322 - // Allow for jitter in the max check 323 - if d > tt.max { 324 - t.Errorf("backoff(%d) = %v, want <= %v", tt.attempt, d, tt.max) 325 - } 326 - }) 327 - } 328 - }