like malachite (atproto-lastfm-importer) but in go and bluer
go spotify tealfm lastfm atproto
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: sync package

karitham 8fdef6b6 55ad5d37

+154 -131
+1 -3
atproto/client_test.go
··· 493 493 t.Parallel() 494 494 495 495 identity := &mockAuthClient{apiClient: tt.apiClient, did: tt.did} 496 - result, err := BuildClient[map[string]any](identity, tt.customClient) 496 + result, err := BuildClient(identity, tt.customClient) 497 497 498 498 if tt.wantErr { 499 499 if err == nil { ··· 646 646 647 647 ctx := context.Background() 648 648 err := auth.Refresh(ctx, httpClient, "refresh-token") 649 - 650 649 if err != nil { 651 650 t.Fatalf("Refresh failed: %v", err) 652 651 } ··· 699 698 fixedAuth := &FixedPasswordAuth{PasswordAuth: pa} 700 699 701 700 err := fixedAuth.Refresh(context.Background(), server.Client(), "refresh-token") 702 - 703 701 if err != nil { 704 702 t.Fatalf("FixedPasswordAuth.Refresh failed: %v", err) 705 703 }
+1 -1
atproto/rate_test.go
··· 59 59 t.Fatalf("AllowRead failed: %v", err) 60 60 } 61 61 62 - if kv.incrs == nil || len(kv.incrs) == 0 { 62 + if len(kv.incrs) == 0 { 63 63 t.Error("Expected kv.IncrByMulti to be called") 64 64 } 65 65 if !chargedAt.IsZero() {
+1 -1
atproto/repo_test.go
··· 745 745 } 746 746 747 747 func TestApplyWrites_Empty(t *testing.T) { 748 - err := applyWrites[map[string]any](context.Background(), nil, "did:plc:test", "app.bsky.feed.post", []map[string]any{}) 748 + err := applyWrites(context.Background(), nil, "did:plc:test", "app.bsky.feed.post", []map[string]any{}) 749 749 if err != nil { 750 750 t.Errorf("applyWrites with empty records should not fail: %v", err) 751 751 }
-1
atproto/testmock.go
··· 47 47 } 48 48 49 49 type mockKVStoreWithErr struct { 50 - mockKVStore 51 50 Data map[string]int 52 51 Err error 53 52 }
+1 -1
flake.nix
··· 17 17 let 18 18 lazuli = pkgs.buildGoModule rec { 19 19 name = "lazuli"; 20 - version = "0.1.3"; 20 + version = "0.1.4"; 21 21 src = pkgs.nix-gitignore.gitignoreSource [ "*.csv" "*.zip" "*.json" ] ./.; 22 22 vendorHash = "sha256-MfBPv/L7wHuUGXx4BDd+DFq0RB11KuMHCzPjFv6FMgs="; 23 23 ldflags = [
+1 -3
main.go
··· 60 60 } 61 61 62 62 func run() error { 63 - sync.ClientAgent = "lazuli/" + Version 64 - 65 63 storage, err := cache.NewBoltStorage() 66 64 if err != nil { 67 65 return fmt.Errorf("open cache: %w", err) ··· 317 315 continue 318 316 } 319 317 320 - res := sync.PublishBatch(ctx, repoClient, did, []sync.PlayRecord{fr.rec}, a.storage) 318 + res := sync.PublishBatch(ctx, repoClient, did, []sync.PlayRecord{fr.rec}, a.storage, sync.DefaultClientAgent) 321 319 322 320 if res == nil { 323 321 fmt.Printf("Successfully retried: %s - %s\n", fr.rec.ArtistName(), fr.rec.TrackName)
+1 -1
sources/lastfm/lastfm.go
··· 129 129 TrackName: r.Track, 130 130 Artists: []sync.PlayRecordArtist{{ArtistName: r.Artist, ArtistMbId: r.ArtistMbid}}, 131 131 PlayedTime: sync.Timestamp{Time: time.Unix(utsSec, 0).UTC()}, 132 - SubmissionClientAgent: sync.ClientAgent, 132 + SubmissionClientAgent: sync.DefaultClientAgent, 133 133 MusicServiceBaseDomain: sync.MusicServiceLastFM, 134 134 ReleaseName: r.Album, 135 135 ReleaseMbId: r.AlbumMbid,
+1 -1
sources/spotify/spotify.go
··· 64 64 TrackName: trackName, 65 65 Artists: []sync.PlayRecordArtist{{ArtistName: artistName}}, 66 66 PlayedTime: r.Ts, 67 - SubmissionClientAgent: sync.ClientAgent, 67 + SubmissionClientAgent: sync.DefaultClientAgent, 68 68 MusicServiceBaseDomain: sync.MusicServiceSpotify, 69 69 ReleaseName: releaseName, 70 70 OriginUrl: originUrl,
+91 -61
sync/batch_test.go
··· 197 197 198 198 for _, tt := range tests { 199 199 t.Run(tt.name, func(t *testing.T) { 200 + t.Parallel() 200 201 if got := isTransientError(tt.err); got != tt.want { 201 202 t.Errorf("isTransientError() = %v, want %v", got, tt.want) 202 203 } ··· 206 207 207 208 func TestApplyWrites_RateClient(t *testing.T) { 208 209 ctx := context.Background() 210 + clientAgent := "test-agent" 209 211 210 - t.Run("Empty anyRecords(records)", func(t *testing.T) { 211 - limiter := &mockLimiter{} 212 - client := atproto.NewRateClient[any](nil, "did:example:123", limiter) 213 - err := client.ApplyWrites(ctx, "test", nil) 214 - if err != nil { 215 - t.Errorf("ApplyWrites(nil) error = %v", err) 216 - } 217 - }) 218 - 219 - t.Run("Too many records", func(t *testing.T) { 220 - limiter := &mockLimiter{} 221 - client := atproto.NewRateClient[any](&atclient.APIClient{}, "did:example:123", limiter) 222 - recs := make([]any, 201) 223 - err := client.ApplyWrites(ctx, "test", recs) 224 - if err == nil { 225 - t.Fatal("expected error for > 200 records") 226 - } 227 - expected := "too many records in one ApplyWrites call: 201 (max 200)" 228 - if err.Error() != expected { 229 - t.Errorf("expected error %q, got %q", expected, err.Error()) 230 - } 231 - }) 212 + tests := []struct { 213 + name string 214 + setupClient func() (*atproto.RateClient[PlayRecord], *mockLimiter) 215 + records []PlayRecord 216 + wantErr bool 217 + wantErrMsg string 218 + wantRefunds int32 219 + }{ 220 + { 221 + name: "empty records succeeds", 222 + setupClient: func() (*atproto.RateClient[PlayRecord], *mockLimiter) { 223 + limiter := &mockLimiter{} 224 + return atproto.NewRateClient[PlayRecord](nil, "did:example:123", limiter), limiter 225 + }, 226 + records: nil, 227 + wantErr: false, 228 + wantRefunds: 0, 229 + }, 230 + { 231 + name: "too many records fails", 232 + setupClient: func() (*atproto.RateClient[PlayRecord], *mockLimiter) { 233 + limiter := &mockLimiter{} 234 + return atproto.NewRateClient[PlayRecord](&atclient.APIClient{}, "did:example:123", limiter), limiter 235 + }, 236 + records: make([]PlayRecord, 201), 237 + wantErr: true, 238 + wantErrMsg: "too many records in one ApplyWrites call: 201 (max 200)", 239 + wantRefunds: 0, 240 + }, 241 + { 242 + name: "transient error refunds tokens", 243 + setupClient: func() (*atproto.RateClient[PlayRecord], *mockLimiter) { 244 + limiter := &mockLimiter{} 245 + apiClient := atclient.NewAPIClient("https://example.com") 246 + apiClient.Client.Transport = mockRoundTripper(func(req *http.Request) (*http.Response, error) { 247 + return &http.Response{ 248 + StatusCode: 503, 249 + Body: http.NoBody, 250 + }, nil 251 + }) 252 + return atproto.NewRateClient[PlayRecord](apiClient, "did:example:123", limiter), limiter 253 + }, 254 + records: []PlayRecord{{TrackName: "Song 1"}}, 255 + wantErr: true, 256 + wantRefunds: 1, 257 + }, 258 + { 259 + name: "non-transient error does NOT refund", 260 + setupClient: func() (*atproto.RateClient[PlayRecord], *mockLimiter) { 261 + limiter := &mockLimiter{} 262 + apiClient := atclient.NewAPIClient("https://example.com") 263 + apiClient.Client.Transport = mockRoundTripper(func(req *http.Request) (*http.Response, error) { 264 + return &http.Response{ 265 + StatusCode: 400, 266 + Body: http.NoBody, 267 + }, nil 268 + }) 269 + return atproto.NewRateClient[PlayRecord](apiClient, "did:example:123", limiter), limiter 270 + }, 271 + records: []PlayRecord{{TrackName: "Song 1"}}, 272 + wantErr: true, 273 + wantRefunds: 0, 274 + }, 275 + } 232 276 233 - t.Run("Transient error refunds tokens", func(t *testing.T) { 234 - limiter := &mockLimiter{} 235 - apiClient := atclient.NewAPIClient("https://example.com") 236 - apiClient.Client.Transport = mockRoundTripper(func(req *http.Request) (*http.Response, error) { 237 - return &http.Response{ 238 - StatusCode: 503, 239 - Body: http.NoBody, 240 - }, nil 241 - }) 242 - client := atproto.NewRateClient[PlayRecord](apiClient, "did:example:123", limiter) 277 + for _, tt := range tests { 278 + t.Run(tt.name, func(t *testing.T) { 279 + client, limiter := tt.setupClient() 280 + err := client.ApplyWrites(ctx, "test", tt.records) 243 281 244 - err := client.ApplyWrites(ctx, "test", []PlayRecord{{TrackName: "Song 1"}}) 245 - if err == nil { 246 - t.Fatal("expected error") 247 - } 248 - if atomic.LoadInt32(&limiter.refunds) != 1 { 249 - t.Errorf("expected 1 refund, got %d", limiter.refunds) 250 - } 251 - }) 282 + if tt.wantErr { 283 + if err == nil { 284 + t.Error("expected error, got nil") 285 + } else if tt.wantErrMsg != "" && err.Error() != tt.wantErrMsg { 286 + t.Errorf("error msg = %q, want %q", err.Error(), tt.wantErrMsg) 287 + } 288 + } else if err != nil { 289 + t.Errorf("unexpected error: %v", err) 290 + } 252 291 253 - t.Run("Non-transient error does NOT refund", func(t *testing.T) { 254 - limiter := &mockLimiter{} 255 - apiClient := atclient.NewAPIClient("https://example.com") 256 - apiClient.Client.Transport = mockRoundTripper(func(req *http.Request) (*http.Response, error) { 257 - return &http.Response{ 258 - StatusCode: 400, 259 - Body: http.NoBody, 260 - }, nil 292 + if got := atomic.LoadInt32(&limiter.refunds); got != tt.wantRefunds { 293 + t.Errorf("refunds = %d, want %d", got, tt.wantRefunds) 294 + } 261 295 }) 262 - client := atproto.NewRateClient[PlayRecord](apiClient, "did:example:123", limiter) 263 - 264 - err := client.ApplyWrites(ctx, "test", []PlayRecord{{TrackName: "Song 1"}}) 265 - if err == nil { 266 - t.Fatal("expected error") 267 - } 268 - if atomic.LoadInt32(&limiter.refunds) != 0 { 269 - t.Errorf("expected 0 refunds, got %d", limiter.refunds) 270 - } 271 - }) 296 + } 297 + _ = clientAgent 272 298 } 273 299 274 300 func TestPublishBatch(t *testing.T) { 275 301 ctx := context.Background() 276 302 did := "did:example:123" 277 303 batch := []PlayRecord{{TrackName: "Song 1"}} 304 + clientAgent := "test-agent" 278 305 279 306 t.Run("Success", func(t *testing.T) { 280 307 storage := newMockStorage() 281 308 client := &mockATProtoClient{} 282 - err := PublishBatch(ctx, client, did, batch, storage) 309 + err := PublishBatch(ctx, client, did, batch, storage, clientAgent) 283 310 if err != nil { 284 311 t.Fatal(err) 285 312 } ··· 296 323 return expectedErr 297 324 }, 298 325 } 299 - err := PublishBatch(ctx, client, did, batch, storage) 326 + err := PublishBatch(ctx, client, did, batch, storage, clientAgent) 300 327 if !errors.Is(err, expectedErr) { 301 328 t.Errorf("expected error %v, got %v", expectedErr, err) 302 329 } ··· 308 335 t.Run("Storage failure after ApplyWrites success", func(t *testing.T) { 309 336 storage := &failingStorage{} 310 337 client := &mockATProtoClient{} 311 - err := PublishBatch(ctx, client, did, batch, storage) 338 + err := PublishBatch(ctx, client, did, batch, storage, clientAgent) 312 339 if err == nil || !strings.Contains(err.Error(), "failed to save records") { 313 340 t.Errorf("expected storage save error, got %v", err) 314 341 } ··· 326 353 func TestPublish_Iterative(t *testing.T) { 327 354 ctx := context.Background() 328 355 did := "did:example:123" 356 + clientAgent := "test-agent" 329 357 330 358 rec1, _ := json.Marshal(PlayRecord{TrackName: "Song 1"}) 331 359 rec2, _ := json.Marshal(PlayRecord{TrackName: "Song 2"}) ··· 352 380 BatchSize: 1, 353 381 ATProtoClient: client, 354 382 Storage: storage, 383 + ClientAgent: clientAgent, 355 384 }) 356 385 357 386 if res.SuccessCount != 2 { ··· 376 405 BatchSize: 1, 377 406 ATProtoClient: client, 378 407 Storage: storage, 408 + ClientAgent: clientAgent, 379 409 }) 380 410 381 411 if res.SuccessCount != 0 {
+1 -1
sync/config.go
··· 41 41 42 42 var DefaultConfig = Config{ 43 43 RecordType: RecordType, 44 - ClientAgent: ClientAgent, 44 + ClientAgent: DefaultClientAgent, 45 45 BatchSize: DefaultBatchSize, 46 46 CrossSourceTolerance: CrossSourceTolerance, 47 47 CacheTTL: CacheTTL,
+1
sync/import_test.go
··· 169 169 ATProtoClient: mockRepo, 170 170 Storage: storage, 171 171 Limiter: limiter, 172 + ClientAgent: sync.DefaultClientAgent, 172 173 } 173 174 174 175 auth := &mockAuthClient{did: did}
+4 -20
sync/progress.go
··· 37 37 StartTime: time.Now(), 38 38 LastLogTime: time.Now(), 39 39 limiter: limiter, 40 - LogInterval: 30 * time.Second, 41 - LogRecordsMetric: 1000, 40 + LogInterval: 5 * time.Second, 41 + LogRecordsMetric: 100, 42 42 } 43 43 } 44 44 ··· 120 120 percent, eta, elapsed, rate := t.Progress() 121 121 etaStr := "" 122 122 if eta > 0 { 123 - etaStr = FormatDuration(eta) 123 + etaStr = eta.String() 124 124 } 125 125 126 126 var w, g int ··· 143 143 144 144 resetStr := "" 145 145 if timeUntilReset > 0 { 146 - resetStr = FormatDuration(timeUntilReset) 146 + resetStr = timeUntilReset.String() 147 147 } 148 148 149 149 return ProgressReport{ ··· 169 169 } 170 170 return fmt.Sprintf("%.0f/min", perMin) 171 171 } 172 - 173 - func FormatDuration(d time.Duration) string { 174 - if d == 0 { 175 - return "done" 176 - } 177 - hours := int(d.Hours()) 178 - minutes := int(d.Minutes()) % 60 179 - seconds := int(d.Seconds()) % 60 180 - if hours > 0 { 181 - return fmt.Sprintf("%dh %dm", hours, minutes) 182 - } 183 - if minutes > 0 { 184 - return fmt.Sprintf("%dm %ds", minutes, seconds) 185 - } 186 - return fmt.Sprintf("%ds", seconds) 187 - }
+19 -5
sync/publish.go
··· 57 57 ProgressLog func(ProgressReport) 58 58 Storage cache.Storage 59 59 Limiter RateLimiter 60 + ClientAgent string 60 61 } 61 62 62 63 func Publish(ctx context.Context, client AuthClient, opts PublishOptions) PublishResult { ··· 116 117 } 117 118 totalSuccess += len(batch) 118 119 tracker.Increment(len(batch)) 120 + slog.Debug("batch dry run completed", 121 + slog.Int("count", len(batch)), 122 + slog.Int("completed", tracker.Completed)) 119 123 batch = batch[:0] 120 124 batchKeys = batchKeys[:0] 121 125 return nil 122 126 } 123 127 128 + slog.Debug("processing batch", 129 + slog.Int("count", len(batch)), 130 + slog.Int("completed", tracker.Completed), 131 + slog.Int("total", tracker.Total)) 132 + 124 133 did := client.DID() 125 134 retryPolicy := retrypolicy.NewBuilder[any](). 126 135 WithMaxRetries(10). ··· 138 147 Build() 139 148 140 149 err := failsafe.With(retryPolicy).WithContext(ctx).Run(func() error { 141 - return PublishBatch(ctx, atprotoClient, did, batch, opts.Storage) 150 + return PublishBatch(ctx, atprotoClient, did, batch, opts.Storage, opts.ClientAgent) 142 151 }) 143 152 if err != nil { 144 153 slog.Error("batch failed after retries", ··· 161 170 162 171 totalSuccess += len(batch) 163 172 tracker.Increment(len(batch)) 173 + slog.Debug("batch published", 174 + slog.Int("count", len(batch)), 175 + slog.Int("completed", tracker.Completed), 176 + slog.Int("total", tracker.Total)) 164 177 165 178 if tracker.ShouldLog() { 166 179 progressLog(tracker.Report()) ··· 266 279 slog.String("rate", formatRate(ratePerMinute(success, time.Since(startTime))))) 267 280 } 268 281 269 - func PublishBatch(ctx context.Context, client ATProtoClient, did string, batch []PlayRecord, storage cache.Storage) error { 282 + func PublishBatch(ctx context.Context, client ATProtoClient, did string, batch []PlayRecord, storage cache.Storage, clientAgent string) error { 270 283 if len(batch) == 0 { 271 284 return nil 272 285 } 273 286 274 - err := client.ApplyWrites(ctx, RecordType, batch) 287 + atprotoRecords := prepareRecords(batch, clientAgent) 288 + err := client.ApplyWrites(ctx, RecordType, atprotoRecords) 275 289 if err != nil { 276 290 slog.Error("batch publish failed", Error(err)) 277 291 return err ··· 298 312 return nil 299 313 } 300 314 301 - func prepareRecords(batch []PlayRecord) []PlayRecord { 315 + func prepareRecords(batch []PlayRecord, clientAgent string) []PlayRecord { 302 316 atprotoRecords := make([]PlayRecord, 0, len(batch)) 303 317 for _, record := range batch { 304 318 record.Type = RecordType 305 - record.SubmissionClientAgent = ClientAgent 319 + record.SubmissionClientAgent = clientAgent 306 320 atprotoRecords = append(atprotoRecords, record) 307 321 } 308 322 return atprotoRecords
+29 -30
sync/rate_test.go
··· 8 8 "time" 9 9 10 10 "github.com/bluesky-social/indigo/atproto/atclient" 11 + 11 12 "tangled.org/karitham.dev/lazuli/atproto" 12 13 ) 13 14 ··· 84 85 wCost := n * atproto.WriteOnlyCost 85 86 gCost := n * atproto.WriteGlobalCost 86 87 87 - for { 88 - now := l.clock.now 89 - wKeys, gKeys := l.getAllKeys(now) 88 + now := l.clock.now 89 + wKeys, gKeys := l.getAllKeys(now) 90 90 91 - maxWait, err := l.checkQuota(now, wKeys, gKeys, wCost, gCost) 92 - if err != nil { 93 - return now, err 94 - } 91 + maxWait, err := l.checkQuota(now, wKeys, gKeys, wCost, gCost) 92 + if err != nil { 93 + return now, err 94 + } 95 95 96 - if maxWait > 0 { 97 - return now, context.DeadlineExceeded 98 - } 96 + if maxWait > 0 { 97 + return now, context.DeadlineExceeded 98 + } 99 99 100 - err = l.charge(wKeys, gKeys, wCost, gCost) 101 - if err != nil { 102 - return now, err 103 - } 104 - return now, nil 100 + err = l.charge(wKeys, gKeys, wCost, gCost) 101 + if err != nil { 102 + return now, err 105 103 } 104 + return now, nil 106 105 } 107 106 108 107 func (l *testRateLimiter) AllowRead(ctx context.Context) (time.Time, error) { 109 108 gCost := atproto.ReadGlobalCost 110 109 111 - for { 112 - now := l.clock.now 113 - _, gKeys := l.getAllKeys(now) 110 + now := l.clock.now 111 + _, gKeys := l.getAllKeys(now) 114 112 115 - maxWait, err := l.checkQuota(now, nil, gKeys, 0, gCost) 116 - if err != nil { 117 - return now, err 118 - } 113 + maxWait, err := l.checkQuota(now, nil, gKeys, 0, gCost) 114 + if err != nil { 115 + return now, err 116 + } 119 117 120 - if maxWait > 0 { 121 - return now, context.DeadlineExceeded 122 - } 118 + if maxWait > 0 { 119 + return now, context.DeadlineExceeded 120 + } 123 121 124 - err = l.charge(nil, gKeys, 0, gCost) 125 - if err != nil { 126 - return now, err 127 - } 128 - return now, nil 122 + err = l.charge(nil, gKeys, 0, gCost) 123 + if err != nil { 124 + return now, err 129 125 } 126 + return now, nil 130 127 } 131 128 132 129 func (l *testRateLimiter) RefundBulkWrite(ctx context.Context, n int, chargedAt time.Time) { ··· 287 284 func TestRetryExhaustionMarkFailed(t *testing.T) { 288 285 ctx := context.Background() 289 286 did := "did:example:123" 287 + clientAgent := "test-agent" 290 288 storage := newMockStorage() 291 289 rec1, _ := json.Marshal(PlayRecord{TrackName: "Song 1"}) 292 290 storage.SaveRecords(did, map[string][]byte{"k1": rec1}) ··· 305 303 BatchSize: 1, 306 304 ATProtoClient: client, 307 305 Storage: storage, 306 + ClientAgent: clientAgent, 308 307 }) 309 308 310 309 if res.SuccessCount != 0 {
+2 -2
sync/record.go
··· 126 126 127 127 TimeBucketSize = 30 * time.Second 128 128 MinListenDuration = 30 * time.Second 129 - ) 130 129 131 - var ClientAgent = "lazuli/dev" 130 + DefaultClientAgent = "lazuli/dev" 131 + ) 132 132 133 133 func CreateRecordKey(record PlayRecord) string { 134 134 return string(syntax.NewTIDFromTime(record.PlayedTime.Time, 0))