like malachite (atproto-lastfm-importer) but in go and bluer
go spotify tealfm lastfm atproto
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

cleanup: remove some more dead code

karitham 47416f66 26ea91c9

+11 -80
+6 -6
cache/bbolt.go
··· 339 339 return tx.ForEach(func(name []byte, b *bbolt.Bucket) error { 340 340 bucketName := string(name) 341 341 342 - if strings.HasPrefix(bucketName, "records:") { 343 - did := strings.TrimPrefix(bucketName, "records:") 342 + if after, ok := strings.CutPrefix(bucketName, "records:"); ok { 343 + did := after 344 344 count := 0 345 345 b.ForEach(func(k, v []byte) error { 346 346 count++ ··· 357 357 stats.UserStats[did] = m 358 358 } 359 359 360 - if strings.HasPrefix(bucketName, "processed:") { 361 - did := strings.TrimPrefix(bucketName, "processed:") 360 + if after, ok := strings.CutPrefix(bucketName, "processed:"); ok { 361 + did := after 362 362 count := 0 363 363 b.ForEach(func(k, v []byte) error { 364 364 count++ ··· 375 375 stats.UserStats[did] = m 376 376 } 377 377 378 - if strings.HasPrefix(bucketName, "failed:") { 379 - did := strings.TrimPrefix(bucketName, "failed:") 378 + if after, ok := strings.CutPrefix(bucketName, "failed:"); ok { 379 + did := after 380 380 count := 0 381 381 b.ForEach(func(k, v []byte) error { 382 382 count++
+1 -10
flags.go
··· 1 1 package main 2 2 3 3 import ( 4 - "time" 5 - 6 4 "github.com/urfave/cli/v3" 7 5 8 6 "tangled.org/karitham.dev/lazuli/sync" 9 7 ) 10 8 11 9 const ( 12 - DefaultBatchSize = 20 13 - DefaultBatchDelay = 2000 * time.Millisecond 10 + DefaultBatchSize = 20 14 11 ) 15 12 16 13 const ( ··· 130 127 Usage: "Records per batch (default: 20)", 131 128 Value: DefaultBatchSize, 132 129 Sources: cli.EnvVars("LAZULI_BATCH_SIZE"), 133 - }, 134 - &cli.IntFlag{ 135 - Name: "batch-delay", 136 - Usage: "MS between batches (default: 2000)", 137 - Value: int(DefaultBatchDelay.Milliseconds()), 138 - Sources: cli.EnvVars("LAZULI_BATCH_DELAY"), 139 130 }, 140 131 &cli.DurationFlag{ 141 132 Name: "tolerance",
-43
main.go
··· 505 505 fresh := cmd.Bool("fresh") 506 506 clearCache := cmd.Bool("clear-cache") 507 507 batchSize := int(cmd.Int("batch-size")) 508 - batchDelay := int(cmd.Int("batch-delay")) 509 508 tolerance := cmd.Duration("tolerance") 510 509 511 510 if clearCache { ··· 592 591 593 592 cfg := sync.DefaultConfig 594 593 cfg.BatchSize = batchSize 595 - cfg.BatchDelay = time.Duration(batchDelay) * time.Millisecond 596 594 597 595 progressLog := a.createProgressLogger() 598 596 599 597 publishOpts := sync.PublishOptions{ 600 598 BatchSize: cfg.BatchSize, 601 - BatchDelay: cfg.BatchDelay, 602 599 DryRun: dryRun, 603 600 ATProtoClient: repoClient, 604 601 ProgressLog: progressLog, ··· 775 772 } 776 773 777 774 return nil 778 - } 779 - 780 - func (a *App) loadRecordsForImport(ctx context.Context, lastfmPath, spotifyPath string, mode sync.ImportMode, tolerance time.Duration) ([]sync.PlayRecord, int, error) { 781 - var lastfmRecords, spotifyRecords []sync.PlayRecord 782 - var err error 783 - 784 - if mode == sync.ImportModeLastFM || mode == sync.ImportModeCombined { 785 - lastfmRecords, err = sync.ParseInput(ctx, lastfmPath, lastfm.Parser{}) 786 - if err != nil { 787 - return nil, 0, fmt.Errorf("parse lastfm: %w", err) 788 - } 789 - } 790 - 791 - if mode == sync.ImportModeSpotify || mode == sync.ImportModeCombined { 792 - spotifyRecords, err = sync.ParseInput(ctx, spotifyPath, spotify.Parser{}) 793 - if err != nil { 794 - return nil, 0, fmt.Errorf("parse spotify: %w", err) 795 - } 796 - } 797 - 798 - totalInput := len(lastfmRecords) + len(spotifyRecords) 799 - 800 - var ( 801 - mergedRecords []sync.PlayRecord 802 - stats sync.MergeStats 803 - ) 804 - 805 - switch mode { 806 - case sync.ImportModeCombined: 807 - mergedRecords, stats = sync.MergeRecords(lastfmRecords, spotifyRecords, tolerance) 808 - a.log.Debug("Merged records", 809 - slog.Int("merged_total", stats.MergedTotal), 810 - slog.Int("duplicates_removed", stats.DuplicatesRemoved)) 811 - case sync.ImportModeLastFM: 812 - mergedRecords = lastfmRecords 813 - default: 814 - mergedRecords = spotifyRecords 815 - } 816 - 817 - return mergedRecords, totalInput, nil 818 775 } 819 776 820 777 func (a *App) outputRecords(records []sync.PlayRecord, outputPath string) error {
+1 -1
sync/atproto_auth.go
··· 77 77 } 78 78 defer resp.Body.Close() 79 79 80 - if !(resp.StatusCode >= 200 && resp.StatusCode < 300) { 80 + if resp.StatusCode < 200 || resp.StatusCode >= 300 { 81 81 var eb atclient.ErrorBody 82 82 if err := json.NewDecoder(resp.Body).Decode(&eb); err != nil { 83 83 return &atclient.APIError{StatusCode: resp.StatusCode}
+3 -2
sync/atproto_auth_test.go
··· 79 79 // We call the library's method directly 80 80 _ = pa.Refresh(context.Background(), http.DefaultClient, "old-refresh") 81 81 82 - if methodUsed == http.MethodGet { 82 + switch methodUsed { 83 + case http.MethodGet: 83 84 t.Log("Confirmed: Library uses GET for refreshSession (Buggy)") 84 - } else if methodUsed == http.MethodPost { 85 + case http.MethodPost: 85 86 t.Log("Library uses POST for refreshSession") 86 87 } 87 88 }
-4
sync/config.go
··· 7 7 const ( 8 8 RecordType = "fm.teal.alpha.feed.play" 9 9 DefaultBatchSize = 20 10 - DefaultBatchDelay = 2000 * time.Millisecond 11 - MinBatchDelay = 1000 * time.Millisecond 12 10 DefaultCrossSourceTolerance = 5 * time.Minute 13 11 CrossSourceTolerance = DefaultCrossSourceTolerance 14 12 CacheTTL = 24 * time.Hour ··· 32 30 RecordType string `json:"recordType"` 33 31 ClientAgent string `json:"clientAgent"` 34 32 BatchSize int `json:"batchSize"` 35 - BatchDelay time.Duration `json:"batchDelay"` 36 33 CrossSourceTolerance time.Duration `json:"crossSourceTolerance"` 37 34 CacheTTL time.Duration `json:"cacheTTL"` 38 35 CacheVersion int `json:"cacheVersion"` ··· 45 42 RecordType: RecordType, 46 43 ClientAgent: ClientAgent, 47 44 BatchSize: DefaultBatchSize, 48 - BatchDelay: DefaultBatchDelay, 49 45 CrossSourceTolerance: CrossSourceTolerance, 50 46 CacheTTL: CacheTTL, 51 47 CacheVersion: CacheVersion,
-14
sync/publish.go
··· 18 18 19 19 type PublishOptions struct { 20 20 BatchSize int 21 - BatchDelay time.Duration 22 21 DryRun bool 23 22 ATProtoClient ATProtoClient 24 23 ProgressLog func(ProgressReport) ··· 29 28 startTime := time.Now() 30 29 31 30 batchSize := defaultBatchSize(opts.BatchSize) 32 - batchDelay := defaultBatchDelay(opts.BatchDelay) 33 31 34 32 atprotoClient, err := buildClient(client, opts.ATProtoClient) 35 33 if err != nil { ··· 57 55 slog.Info("starting iterative import", 58 56 slog.Int("total_records", totalRecords), 59 57 slog.Int("batch_size", batchSize), 60 - slog.Duration("batch_delay", batchDelay), 61 58 slog.Int("daily_write_limit", WriteLimitDay), 62 59 slog.Int("daily_token_limit", GlobalLimitDay), 63 60 slog.String("rate_limit", fmt.Sprintf("1 write per %.1fs", 86400.0/WriteLimitDay))) ··· 243 240 return DefaultBatchSize 244 241 } 245 242 246 - func defaultBatchDelay(delay time.Duration) time.Duration { 247 - if delay > 0 { 248 - return delay 249 - } 250 - return DefaultBatchDelay 251 - } 252 - 253 243 func buildClient(client AuthClient, customClient ATProtoClient) (ATProtoClient, error) { 254 244 if customClient != nil { 255 245 return customClient, nil ··· 273 263 TotalRecords: total, 274 264 RecordsPerMinute: ratePerMinute(success, time.Since(start)), 275 265 } 276 - } 277 - 278 - func makeRecordKeys(records []PlayRecord) []string { 279 - return CreateRecordKeys(records) 280 266 } 281 267 282 268 func logResult(success, errors int, startTime time.Time) {