like malachite (atproto-lastfm-importer) but in go and bluer
go spotify tealfm lastfm atproto
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor: storage layer for concurrent read-only access

- Replace global storage instance with local initialization in CLI commands
- Implement read-only mode using bbolt.Options{ReadOnly: true}
- Ensure deterministic resource cleanup with defer Close() in all actions
- Fix cursor early exit bug in BoltStorage iteration logic
- Optimize memory allocations in batch record processing
- Add comprehensive table-driven tests for storage and publishing

This refactor allows CLI status commands to run concurrently with the
main sync process by opening the database in read-only mode.

karitham a438862a 1376811e

+875 -416
-11
atproto/client.go
··· 209 209 return c.client 210 210 } 211 211 212 - func (c *Client) Close() error { 213 - c.mu.Lock() 214 - defer c.mu.Unlock() 215 - if c.client != nil && c.client.Auth != nil { 216 - if logout, ok := c.client.Auth.(*atclient.PasswordAuth); ok { 217 - return logout.Logout(context.Background(), c.client.Client) 218 - } 219 - } 220 - return nil 221 - } 222 - 223 212 func (c *Client) HasClient() bool { 224 213 c.mu.Lock() 225 214 defer c.mu.Unlock()
+27 -150
atproto/client_test.go
··· 6 6 "net/http" 7 7 "net/http/httptest" 8 8 "testing" 9 + "testing/synctest" 9 10 "time" 10 11 11 12 "github.com/bluesky-social/indigo/atproto/atclient" ··· 97 98 })) 98 99 defer server.Close() 99 100 100 - ctx := context.Background() 101 + ctx := t.Context() 101 102 102 103 opts := tt.opts 103 104 if opts == nil { ··· 144 145 opts := NewClientOptions() 145 146 opts.ResolverURL = server.URL 146 147 147 - ctx := context.Background() 148 + ctx := t.Context() 148 149 did, pds, _, err := ResolveMiniDoc(ctx, "test.user", opts) 149 150 if err != nil { 150 151 t.Fatalf("ResolveMiniDoc failed: %v", err) ··· 176 177 opts.ResolverURL = server.URL 177 178 opts.UserAgent = "test-agent/1.0" 178 179 179 - ctx := context.Background() 180 + ctx := t.Context() 180 181 did, _, _, err := ResolveMiniDoc(ctx, "test.user", opts) 181 182 if err != nil { 182 183 t.Fatalf("ResolveMiniDoc failed: %v", err) ··· 188 189 } 189 190 190 191 func TestResolveMiniDoc_ContextCancelled(t *testing.T) { 191 - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 192 - select { 193 - case <-time.After(100 * time.Millisecond): 194 - w.WriteHeader(http.StatusOK) 195 - case <-r.Context().Done(): 196 - return 197 - } 198 - })) 199 - defer server.Close() 192 + synctest.Test(t, func(t *testing.T) { 193 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 194 + select { 195 + case <-time.After(100 * time.Millisecond): 196 + w.WriteHeader(http.StatusOK) 197 + case <-r.Context().Done(): 198 + return 199 + } 200 + })) 201 + defer server.Close() 200 202 201 - ctx, cancel := context.WithCancel(context.Background()) 202 - cancel() 203 + ctx, cancel := context.WithCancel(t.Context()) 204 + cancel() 203 205 204 - opts := NewClientOptions() 205 - opts.ResolverURL = server.URL 206 - opts.HTTPClient = server.Client() 206 + opts := NewClientOptions() 207 + opts.ResolverURL = server.URL 208 + opts.HTTPClient = server.Client() 207 209 208 - _, _, _, err := ResolveMiniDoc(ctx, "test.user", opts) 210 + _, _, _, err := ResolveMiniDoc(ctx, "test.user", opts) 209 211 210 - if err == nil { 211 - t.Error("expected error for cancelled context") 212 - } 212 + if err == nil { 213 + t.Error("expected error for cancelled context") 214 + } 215 + }) 213 216 } 214 217 215 218 func TestResolveMiniDoc_InvalidURL(t *testing.T) { 216 - ctx := context.Background() 219 + ctx := t.Context() 217 220 218 221 opts := NewClientOptions() 219 222 opts.ResolverURL = "://invalid-url" ··· 281 284 t.Parallel() 282 285 283 286 if tt.handle == "" { 284 - ctx := context.Background() 287 + ctx := t.Context() 285 288 _, err := ResolveIdentity(ctx, "", tt.opts) 286 289 if err == nil { 287 290 t.Error("expected error for empty handle") ··· 295 298 })) 296 299 defer server.Close() 297 300 298 - ctx := context.Background() 301 + ctx := t.Context() 299 302 300 303 opts := tt.opts 301 304 if opts == nil { ··· 321 324 if tt.checkFunc != nil { 322 325 tt.checkFunc(t, identity, err) 323 326 } 324 - }) 325 - } 326 - } 327 - 328 - func TestClient_Getters(t *testing.T) { 329 - t.Parallel() 330 - 331 - tests := []struct { 332 - name string 333 - setup func() *Client 334 - check func(t *testing.T, c *Client) 335 - }{ 336 - { 337 - name: "HasClient true", 338 - setup: func() *Client { 339 - return &Client{ 340 - client: &atclient.APIClient{}, 341 - resolvedIdentity: resolvedIdentity{ 342 - DID: "did:plc:test", 343 - Handle: "test.bsky.social", 344 - PDS: "https://pds.example.com", 345 - }, 346 - } 347 - }, 348 - check: func(t *testing.T, c *Client) { 349 - if !c.HasClient() { 350 - t.Error("HasClient() = false, want true") 351 - } 352 - }, 353 - }, 354 - { 355 - name: "HasClient false", 356 - setup: func() *Client { 357 - return &Client{} 358 - }, 359 - check: func(t *testing.T, c *Client) { 360 - if c.HasClient() { 361 - t.Error("HasClient() = true, want false") 362 - } 363 - }, 364 - }, 365 - { 366 - name: "DID", 367 - setup: func() *Client { 368 - return &Client{ 369 - resolvedIdentity: resolvedIdentity{DID: "did:plc:test123"}, 370 - } 371 - }, 372 - check: func(t *testing.T, c *Client) { 373 - if got := c.DID(); got != "did:plc:test123" { 374 - t.Errorf("DID() = %s, want did:plc:test123", got) 375 - } 376 - }, 377 - }, 378 - { 379 - name: "PDS", 380 - setup: func() *Client { 381 - return &Client{ 382 - resolvedIdentity: resolvedIdentity{PDS: "https://pds.example.com"}, 383 - } 384 - }, 385 - check: func(t *testing.T, c *Client) { 386 - if got := c.PDS(); got != "https://pds.example.com" { 387 - t.Errorf("PDS() = %s, want https://pds.example.com", got) 388 - } 389 - }, 390 - }, 391 - { 392 - name: "Handle", 393 - setup: func() *Client { 394 - return &Client{ 395 - resolvedIdentity: resolvedIdentity{Handle: "test.bsky.social"}, 396 - } 397 - }, 398 - check: func(t *testing.T, c *Client) { 399 - if got := c.Handle(); got != "test.bsky.social" { 400 - t.Errorf("Handle() = %s, want test.bsky.social", got) 401 - } 402 - }, 403 - }, 404 - { 405 - name: "SigningKey", 406 - setup: func() *Client { 407 - return &Client{ 408 - resolvedIdentity: resolvedIdentity{SigningKey: "-----BEGIN PUBLIC KEY-----\nabc\n-----END PUBLIC KEY-----"}, 409 - } 410 - }, 411 - check: func(t *testing.T, c *Client) { 412 - if got := c.SigningKey(); got != "-----BEGIN PUBLIC KEY-----\nabc\n-----END PUBLIC KEY-----" { 413 - t.Errorf("SigningKey() = %s, want expected key", got) 414 - } 415 - }, 416 - }, 417 - { 418 - name: "APIClient", 419 - setup: func() *Client { 420 - expectedClient := &atclient.APIClient{} 421 - return &Client{ 422 - client: expectedClient, 423 - resolvedIdentity: resolvedIdentity{DID: "did:plc:test"}, 424 - } 425 - }, 426 - check: func(t *testing.T, c *Client) { 427 - if got := c.APIClient(); got == nil { 428 - t.Error("APIClient() = nil, want non-nil") 429 - } 430 - }, 431 - }, 432 - { 433 - name: "APIClient nil", 434 - setup: func() *Client { 435 - return &Client{} 436 - }, 437 - check: func(t *testing.T, c *Client) { 438 - if got := c.APIClient(); got != nil { 439 - t.Errorf("APIClient() = %v, want nil", got) 440 - } 441 - }, 442 - }, 443 - } 444 - 445 - for _, tt := range tests { 446 - t.Run(tt.name, func(t *testing.T) { 447 - t.Parallel() 448 - c := tt.setup() 449 - tt.check(t, c) 450 327 }) 451 328 } 452 329 }
+7 -7
atproto/rate_test.go
··· 53 53 rlQuota: 1.0, 54 54 } 55 55 56 - ctx := context.Background() 56 + ctx := t.Context() 57 57 chargedAt, err := rl.AllowRead(ctx) 58 58 if err != nil { 59 59 t.Fatalf("AllowRead failed: %v", err) ··· 98 98 rlQuota: 1.0, 99 99 } 100 100 101 - ctx := context.Background() 101 + ctx := t.Context() 102 102 _, err := rl.AllowBulkWrite(ctx, tt.n) 103 103 104 104 if tt.wantErr && err == nil { ··· 167 167 rlQuota: 1.0, 168 168 } 169 169 170 - ctx := context.Background() 170 + ctx := t.Context() 171 171 tt.refundFunc(rl, ctx) 172 172 173 173 if len(kv.incrs) != tt.wantIncrCalls { ··· 548 548 { 549 549 name: "AllowBulkWrite error", 550 550 testFunc: func(t *testing.T, rl *quotaLimiter) { 551 - ctx := context.Background() 551 + ctx := t.Context() 552 552 _, err := rl.AllowBulkWrite(ctx, 1) 553 553 if err == nil { 554 554 t.Error("expected error from KV store") ··· 558 558 { 559 559 name: "AllowRead error", 560 560 testFunc: func(t *testing.T, rl *quotaLimiter) { 561 - ctx := context.Background() 561 + ctx := t.Context() 562 562 _, err := rl.AllowRead(ctx) 563 563 if err == nil { 564 564 t.Error("expected error from KV store") ··· 651 651 rlQuota: 1.0, 652 652 } 653 653 654 - ctx := context.Background() 654 + ctx := t.Context() 655 655 tt.refundFn(rl, ctx) 656 656 }) 657 657 } ··· 668 668 rlQuota: 1.0, 669 669 } 670 670 671 - ctx := context.Background() 671 + ctx := t.Context() 672 672 done := make(chan bool) 673 673 errors := make(chan error, 10) 674 674
+12 -12
atproto/repo_test.go
··· 169 169 } 170 170 171 171 rateClient := NewRateClient[map[string]any](tt.client, "did:plc:test", tt.limiter) 172 - ctx := context.Background() 172 + ctx := t.Context() 173 173 174 174 records, cursor, err := rateClient.ListRecords(ctx, tt.collection, tt.limit, tt.cursor) 175 175 ··· 207 207 } 208 208 209 209 rateClient := NewRateClient[map[string]any](&client, "did:plc:test", nil) 210 - ctx := context.Background() 210 + ctx := t.Context() 211 211 212 212 _, _, err := rateClient.ListRecords(ctx, "app.bsky.feed.post", 10, "") 213 213 ··· 233 233 } 234 234 235 235 rateClient := NewRateClient[map[string]any](&client, "did:plc:test", nil) 236 - ctx, cancel := context.WithCancel(context.Background()) 236 + ctx, cancel := context.WithCancel(t.Context()) 237 237 cancel() 238 238 239 239 _, _, err := rateClient.ListRecords(ctx, "app.bsky.feed.post", 10, "") ··· 277 277 } 278 278 279 279 rateClient := NewRateClient[map[string]any](&client, "did:plc:test", nil) 280 - ctx := context.Background() 280 + ctx := t.Context() 281 281 282 282 records, cursor, err := rateClient.ListRecords(ctx, "app.bsky.feed.post", 10, "") 283 283 if err != nil { ··· 394 394 } 395 395 396 396 rateClient := NewRateClient[map[string]any](tt.client, "did:plc:test", nil) 397 - ctx := context.Background() 397 + ctx := t.Context() 398 398 399 399 err := rateClient.ApplyWrites(ctx, "app.bsky.feed.post", tt.records) 400 400 ··· 421 421 } 422 422 423 423 rateClient := NewRateClient[map[string]any](&client, "did:plc:test", nil) 424 - ctx := context.Background() 424 + ctx := t.Context() 425 425 426 426 records := []map[string]any{ 427 427 {"$type": "app.bsky.feed.post", "text": "Hello"}, ··· 449 449 limiter := NewRateLimiter(mockKV, 1.0) 450 450 451 451 rateClient := NewRateClient[map[string]any](&client, "did:plc:test", limiter) 452 - ctx := context.Background() 452 + ctx := t.Context() 453 453 454 454 records := []map[string]any{ 455 455 {"$type": "app.bsky.feed.post", "text": "Hello"}, ··· 544 544 } 545 545 546 546 rateClient := NewRateClient[map[string]any](tt.client, "did:plc:test", nil) 547 - ctx := context.Background() 547 + ctx := t.Context() 548 548 549 549 err := rateClient.DeleteRecord(ctx, "app.bsky.feed.post", "3k5x3x2x1") 550 550 ··· 571 571 } 572 572 573 573 rateClient := NewRateClient[map[string]any](&client, "did:plc:test", nil) 574 - ctx := context.Background() 574 + ctx := t.Context() 575 575 576 576 err := rateClient.DeleteRecord(ctx, "app.bsky.feed.post", "3k5x3x2x1") 577 577 ··· 634 634 for _, tt := range tests { 635 635 t.Run(tt.name, func(t *testing.T) { 636 636 t.Parallel() 637 - ctx := context.Background() 637 + ctx := t.Context() 638 638 639 639 switch tt.name { 640 640 case "ListRecords": ··· 752 752 } 753 753 754 754 func TestApplyWrites_Empty(t *testing.T) { 755 - err := applyWrites(context.Background(), nil, "did:plc:test", "app.bsky.feed.post", []map[string]any{}) 755 + err := applyWrites(t.Context(), nil, "did:plc:test", "app.bsky.feed.post", []map[string]any{}) 756 756 if err != nil { 757 757 t.Errorf("applyWrites with empty records should not fail: %v", err) 758 758 } ··· 937 937 } 938 938 939 939 rateClient := NewRateClient[testRecord](&client, "did:plc:test", nil) 940 - ctx := context.Background() 940 + ctx := t.Context() 941 941 942 942 records, _, err := rateClient.ListRecords(ctx, "app.bsky.feed.post", 10, "") 943 943 if err != nil {
+100 -87
cache/bbolt.go
··· 1 1 package cache 2 2 3 3 import ( 4 + "bytes" 4 5 "encoding/json" 5 6 "errors" 6 7 "fmt" 8 + "iter" 7 9 "os" 8 10 "path/filepath" 9 11 "strings" ··· 22 24 path string 23 25 } 24 26 25 - func NewBoltStorage() (*BoltStorage, error) { 27 + func NewBoltStorage(readOnly bool) (*BoltStorage, error) { 26 28 dir, err := cacheDir() 27 29 if err != nil { 28 30 return nil, err ··· 31 33 return nil, err 32 34 } 33 35 db, err := bbolt.Open(filepath.Join(dir, CacheFile), 0o644, &bbolt.Options{ 34 - Timeout: time.Second, 36 + Timeout: time.Second, 37 + ReadOnly: readOnly, 35 38 }) 36 39 if err != nil { 37 40 return nil, err ··· 66 69 }) 67 70 } 68 71 69 - func (s *BoltStorage) IterateUnpublished(did string, fn func(key string, rec []byte) error) error { 70 - published, err := s.GetPublished(did) 71 - if err != nil { 72 - return err 73 - } 74 - 75 - return s.db.View(func(tx *bbolt.Tx) error { 76 - b := tx.Bucket([]byte(recordsBucket(did))) 77 - if b == nil { 78 - return nil 72 + func (s *BoltStorage) IterateUnpublished(did string, reverse bool) iter.Seq2[string, []byte] { 73 + return func(yield func(key string, rec []byte) bool) { 74 + published, err := s.GetPublished(did) 75 + if err != nil { 76 + return 79 77 } 80 78 81 - return b.ForEach(func(k, v []byte) error { 82 - key := string(k) 83 - if published[key] { 79 + _ = s.db.View(func(tx *bbolt.Tx) error { 80 + b := tx.Bucket([]byte(recordsBucket(did))) 81 + if b == nil { 84 82 return nil 85 83 } 86 - return fn(key, v) 87 - }) 88 - }) 89 - } 90 84 91 - func (s *BoltStorage) IteratePublished(did string, fn func(key string, rec []byte) error) error { 92 - published, err := s.GetPublished(did) 93 - if err != nil { 94 - return err 95 - } 85 + if reverse { 86 + cursor := b.Cursor() 96 87 97 - return s.db.View(func(tx *bbolt.Tx) error { 98 - b := tx.Bucket([]byte(recordsBucket(did))) 99 - if b == nil { 88 + for k, v := cursor.Last(); k != nil; k, v = cursor.Prev() { 89 + key := string(k) 90 + if !published[key] { 91 + // Clone key and value to avoid holding read locks 92 + if !yield(key, bytes.Clone(v)) { 93 + break // Stop iteration when yield returns false 94 + } 95 + } 96 + } 97 + } else { 98 + return b.ForEach(func(k, v []byte) error { 99 + key := string(k) 100 + if published[key] { 101 + return nil 102 + } 103 + // Clone key and value to avoid holding read locks 104 + if !yield(key, bytes.Clone(v)) { 105 + return errors.New("stop iteration") 106 + } 107 + return nil 108 + }) 109 + } 100 110 return nil 111 + }) 112 + } 113 + } 114 + 115 + func (s *BoltStorage) IteratePublished(did string, reverse bool) iter.Seq2[string, []byte] { 116 + return func(yield func(key string, rec []byte) bool) { 117 + published, err := s.GetPublished(did) 118 + if err != nil { 119 + return 101 120 } 102 121 103 - return b.ForEach(func(k, v []byte) error { 104 - key := string(k) 105 - if !published[key] { 122 + _ = s.db.View(func(tx *bbolt.Tx) error { 123 + b := tx.Bucket([]byte(recordsBucket(did))) 124 + if b == nil { 106 125 return nil 107 126 } 108 - return fn(key, v) 127 + 128 + if reverse { 129 + cursor := b.Cursor() 130 + k, v := cursor.Last() 131 + for k != nil { 132 + key := string(k) 133 + if published[key] { 134 + // Clone key and value to avoid holding read locks 135 + if !yield(key, bytes.Clone(v)) { 136 + break // Stop iteration when yield returns false 137 + } 138 + } 139 + k, v = cursor.Prev() 140 + } 141 + } else { 142 + return b.ForEach(func(k, v []byte) error { 143 + key := string(k) 144 + if !published[key] { 145 + return nil 146 + } 147 + // Clone key and value to avoid holding read locks 148 + if !yield(key, bytes.Clone(v)) { 149 + return errors.New("stop iteration") 150 + } 151 + return nil 152 + }) 153 + } 154 + return nil 109 155 }) 110 - }) 156 + } 111 157 } 112 158 113 - func (s *BoltStorage) IterateFailed(did string, fn func(key string, rec []byte, errMsg string) error) error { 114 - return s.db.View(func(tx *bbolt.Tx) error { 115 - fb := tx.Bucket([]byte(failedBucket(did))) 116 - if fb == nil { 117 - return nil 118 - } 119 - rb := tx.Bucket([]byte(recordsBucket(did))) 120 - if rb == nil { 121 - return nil 122 - } 159 + func (s *BoltStorage) IterateFailed(did string) func(yield func(key string, rec []byte, errMsg string) bool) { 160 + return func(yield func(key string, rec []byte, errMsg string) bool) { 161 + _ = s.db.View(func(tx *bbolt.Tx) error { 162 + fb := tx.Bucket([]byte(failedBucket(did))) 163 + if fb == nil { 164 + return nil 165 + } 166 + rb := tx.Bucket([]byte(recordsBucket(did))) 167 + if rb == nil { 168 + return nil 169 + } 123 170 124 - return fb.ForEach(func(k, v []byte) error { 125 - key := string(k) 126 - errMsg := string(v) 127 - rec := rb.Get(k) 128 - return fn(key, rec, errMsg) 171 + return fb.ForEach(func(k, v []byte) error { 172 + key := string(k) 173 + errMsg := string(v) 174 + rec := rb.Get(k) 175 + // Clone to avoid holding read lock 176 + if !yield(key, bytes.Clone(rec), errMsg) { 177 + return errors.New("stop iteration") 178 + } 179 + return nil 180 + }) 129 181 }) 130 - }) 182 + } 131 183 } 132 184 133 185 func (s *BoltStorage) MarkPublished(did string, keys ...string) error { ··· 300 352 301 353 return nil 302 354 }) 303 - } 304 - 305 - func (s *BoltStorage) Get(key string) (int, error) { 306 - var val int 307 - err := s.db.View(func(tx *bbolt.Tx) error { 308 - b := tx.Bucket([]byte("quota")) 309 - if b == nil { 310 - return nil 311 - } 312 - v := b.Get([]byte(key)) 313 - if v == nil { 314 - return nil 315 - } 316 - return json.Unmarshal(v, &val) 317 - }) 318 - return val, err 319 - } 320 - 321 - func (s *BoltStorage) IncrBy(key string, n int) (int, error) { 322 - var val int 323 - err := s.db.Update(func(tx *bbolt.Tx) error { 324 - b, err := tx.CreateBucketIfNotExists([]byte("quota")) 325 - if err != nil { 326 - return err 327 - } 328 - v := b.Get([]byte(key)) 329 - if v != nil { 330 - if err := json.Unmarshal(v, &val); err != nil { 331 - return err 332 - } 333 - } 334 - val += n 335 - newV, err := json.Marshal(val) 336 - if err != nil { 337 - return err 338 - } 339 - return b.Put([]byte(key), newV) 340 - }) 341 - return val, err 342 355 } 343 356 344 357 func (s *BoltStorage) GetMulti(keys []string) (map[string]int, error) {
+422 -16
cache/cache_test.go
··· 1 1 package cache 2 2 3 3 import ( 4 + "slices" 4 5 "testing" 5 6 ) 6 7 7 8 func newTestStorage(t *testing.T) *BoltStorage { 8 9 t.Helper() 9 - storage, err := NewBoltStorage() 10 + storage, err := NewBoltStorage(false) 10 11 if err != nil { 11 12 t.Fatalf("NewBoltStorage failed: %v", err) 12 13 } 13 - t.Cleanup(func() { storage.Close() }) 14 + t.Cleanup(func() { 15 + storage.Close() 16 + }) 14 17 return storage 15 18 } 16 19 20 + // testDID generates a unique DID for each test to ensure test isolation. 21 + func testDID(t *testing.T, suffix string) string { 22 + return "did:plc:test/" + t.Name() + "/" + suffix 23 + } 24 + 25 + func TestReadOnlyMode(t *testing.T) { 26 + tests := []struct { 27 + name string 28 + readOnly bool 29 + shouldWrite bool 30 + expectError bool 31 + }{ 32 + { 33 + name: "read-write mode allows writes", 34 + readOnly: false, 35 + shouldWrite: true, 36 + expectError: false, 37 + }, 38 + { 39 + name: "read-only mode prevents writes", 40 + readOnly: true, 41 + shouldWrite: true, 42 + expectError: true, 43 + }, 44 + { 45 + name: "read-only mode allows iteration", 46 + readOnly: true, 47 + shouldWrite: false, 48 + expectError: false, 49 + }, 50 + } 51 + 52 + for _, tt := range tests { 53 + t.Run(tt.name, func(t *testing.T) { 54 + storage, err := NewBoltStorage(tt.readOnly) 55 + if err != nil { 56 + t.Fatalf("NewBoltStorage failed: %v", err) 57 + } 58 + t.Cleanup(func() { 59 + storage.Close() 60 + }) 61 + 62 + did := "did:plc:testro" + t.Name() 63 + records := map[string][]byte{ 64 + "key1": []byte(`{"trackName":"track1"}`), 65 + } 66 + 67 + if tt.shouldWrite { 68 + err = storage.SaveRecords(did, records) 69 + if tt.expectError && err == nil { 70 + t.Error("expected error when writing to read-only storage") 71 + } else if !tt.expectError && err != nil { 72 + t.Errorf("unexpected error when writing: %v", err) 73 + } 74 + } else { 75 + // Just test that iteration works on read-only storage 76 + var count int 77 + for range storage.IterateUnpublished(did, false) { 78 + count++ 79 + } 80 + if count != 0 { 81 + t.Errorf("expected 0 records, got %d", count) 82 + } 83 + } 84 + }) 85 + } 86 + } 87 + 17 88 func TestSaveIterateRoundtrip(t *testing.T) { 18 89 storage := newTestStorage(t) 19 - did := "did:plc:test" 90 + did := testDID(t, "roundtrip") 20 91 21 92 records := map[string][]byte{ 22 93 "key1": []byte(`{"trackName":"track1"}`), ··· 28 99 } 29 100 30 101 count := 0 31 - err := storage.IterateUnpublished(did, func(key string, data []byte) error { 102 + for range storage.IterateUnpublished(did, false) { 32 103 count++ 33 - return nil 34 - }) 35 - if err != nil { 36 - t.Fatalf("IterateUnpublished failed: %v", err) 37 104 } 38 105 if count != 2 { 39 106 t.Errorf("expected 2 records, got %d", count) ··· 42 109 43 110 func TestMarkPublished(t *testing.T) { 44 111 storage := newTestStorage(t) 45 - did := "did:plc:test" 112 + did := testDID(t, "mark") 46 113 47 114 records := map[string][]byte{ 48 115 "key1": []byte(`{"trackName":"track1"}`), ··· 55 122 } 56 123 57 124 count := 0 58 - storage.IterateUnpublished(did, func(key string, data []byte) error { 59 - if key == "key1" { 60 - t.Error("key1 should have been filtered out") 61 - } 125 + for range storage.IterateUnpublished(did, false) { 62 126 count++ 63 - return nil 64 - }) 127 + } 65 128 if count != 1 { 66 129 t.Errorf("expected 1 unpublished record, got %d", count) 67 130 } ··· 69 132 70 133 func TestClear(t *testing.T) { 71 134 storage := newTestStorage(t) 72 - did := "did:plc:test" 135 + did := testDID(t, "clear") 73 136 storage.SaveRecords(did, map[string][]byte{"key1": []byte(`{}`)}) 74 137 75 138 if !storage.IsValid(did) { ··· 82 145 t.Error("cache should be invalid") 83 146 } 84 147 } 148 + 149 + func TestIterateUnpublishedReverse(t *testing.T) { 150 + tests := []struct { 151 + name string 152 + did string 153 + records map[string][]byte 154 + reverse bool 155 + expectedKeys []string 156 + }{ 157 + { 158 + name: "basic reverse iteration", 159 + records: map[string][]byte{"aaa": []byte(`{"trackName":"a"}`), "bbb": []byte(`{"trackName":"b"}`), "ccc": []byte(`{"trackName":"c"}`)}, 160 + reverse: true, 161 + expectedKeys: []string{"ccc", "bbb", "aaa"}, 162 + }, 163 + { 164 + name: "forward iteration", 165 + records: map[string][]byte{"aaa": []byte(`{"trackName":"a"}`), "bbb": []byte(`{"trackName":"b"}`), "ccc": []byte(`{"trackName":"c"}`)}, 166 + reverse: false, 167 + expectedKeys: []string{"aaa", "bbb", "ccc"}, 168 + }, 169 + { 170 + name: "single record reverse", 171 + records: map[string][]byte{"only": []byte(`{"trackName":"one"}`)}, 172 + reverse: true, 173 + expectedKeys: []string{"only"}, 174 + }, 175 + } 176 + 177 + for _, tt := range tests { 178 + t.Run(tt.name, func(t *testing.T) { 179 + storage := newTestStorage(t) 180 + did := testDID(t, "unpublished") 181 + 182 + if err := storage.SaveRecords(did, tt.records); err != nil { 183 + t.Fatalf("SaveRecords failed: %v", err) 184 + } 185 + 186 + var keys []string 187 + for key, rec := range storage.IterateUnpublished(did, tt.reverse) { 188 + keys = append(keys, key) 189 + _ = rec 190 + } 191 + 192 + if !slices.Equal(keys, tt.expectedKeys) { 193 + t.Errorf("expected keys %v, got %v", tt.expectedKeys, keys) 194 + } 195 + }) 196 + } 197 + } 198 + 199 + func TestIteratePublishedReverse(t *testing.T) { 200 + tests := []struct { 201 + name string 202 + records map[string][]byte 203 + published []string 204 + reverse bool 205 + expectedKeys []string 206 + }{ 207 + { 208 + name: "basic published reverse", 209 + records: map[string][]byte{"aaa": []byte(`{"trackName":"a"}`), "bbb": []byte(`{"trackName":"b"}`), "ccc": []byte(`{"trackName":"c"}`)}, 210 + published: []string{"aaa", "ccc"}, reverse: true, expectedKeys: []string{"ccc", "aaa"}, 211 + }, 212 + { 213 + name: "forward published", 214 + records: map[string][]byte{"aaa": []byte(`{"trackName":"a"}`), "bbb": []byte(`{"trackName":"b"}`), "ccc": []byte(`{"trackName":"c"}`)}, 215 + published: []string{"aaa", "ccc"}, reverse: false, expectedKeys: []string{"aaa", "ccc"}, 216 + }, 217 + } 218 + 219 + for _, tt := range tests { 220 + t.Run(tt.name, func(t *testing.T) { 221 + storage := newTestStorage(t) 222 + did := testDID(t, "published") 223 + 224 + if err := storage.SaveRecords(did, tt.records); err != nil { 225 + t.Fatalf("SaveRecords failed: %v", err) 226 + } 227 + 228 + if err := storage.MarkPublished(did, tt.published...); err != nil { 229 + t.Fatalf("MarkPublished failed: %v", err) 230 + } 231 + 232 + var keys []string 233 + for key, rec := range storage.IteratePublished(did, tt.reverse) { 234 + keys = append(keys, key) 235 + _ = rec 236 + } 237 + 238 + if !slices.Equal(keys, tt.expectedKeys) { 239 + t.Errorf("expected keys %v, got %v", tt.expectedKeys, keys) 240 + } 241 + }) 242 + } 243 + } 244 + 245 + func TestIterateReverseEmptyBucket(t *testing.T) { 246 + tests := []struct { 247 + name string 248 + reverse bool 249 + expectedLen int 250 + }{ 251 + { 252 + name: "empty unpublished reverse", 253 + reverse: true, 254 + expectedLen: 0, 255 + }, 256 + { 257 + name: "empty published reverse", 258 + reverse: true, 259 + expectedLen: 0, 260 + }, 261 + { 262 + name: "empty unpublished forward", 263 + reverse: false, 264 + expectedLen: 0, 265 + }, 266 + } 267 + 268 + for _, tt := range tests { 269 + t.Run(tt.name, func(t *testing.T) { 270 + storage := newTestStorage(t) 271 + did := testDID(t, "empty") 272 + 273 + count := 0 274 + for range storage.IterateUnpublished(did, tt.reverse) { 275 + count++ 276 + } 277 + 278 + if count != tt.expectedLen { 279 + t.Errorf("expected %d records, got %d", tt.expectedLen, count) 280 + } 281 + }) 282 + } 283 + } 284 + 285 + func TestIterateReverseWithEarlyExit(t *testing.T) { 286 + tests := []struct { 287 + name string 288 + records map[string][]byte 289 + reverse bool 290 + breakAfter int 291 + expectedKeys []string 292 + }{ 293 + { 294 + name: "exit after first record reverse", 295 + records: map[string][]byte{"aaa": []byte(`{"trackName":"a"}`), "bbb": []byte(`{"trackName":"b"}`), "ccc": []byte(`{"trackName":"c"}`)}, 296 + reverse: true, 297 + breakAfter: 1, 298 + expectedKeys: []string{"ccc"}, 299 + }, 300 + { 301 + name: "exit after two records reverse", 302 + records: map[string][]byte{"aaa": []byte(`{"trackName":"a"}`), "bbb": []byte(`{"trackName":"b"}`), "ccc": []byte(`{"trackName":"c"}`)}, 303 + reverse: true, 304 + breakAfter: 2, 305 + expectedKeys: []string{"ccc", "bbb"}, 306 + }, 307 + } 308 + 309 + for _, tt := range tests { 310 + t.Run(tt.name, func(t *testing.T) { 311 + storage := newTestStorage(t) 312 + did := testDID(t, "exit") 313 + 314 + if err := storage.SaveRecords(did, tt.records); err != nil { 315 + t.Fatalf("SaveRecords failed: %v", err) 316 + } 317 + 318 + var keys []string 319 + for key, rec := range storage.IterateUnpublished(did, tt.reverse) { 320 + keys = append(keys, key) 321 + _ = rec 322 + if len(keys) >= tt.breakAfter { 323 + break 324 + } 325 + } 326 + 327 + if !slices.Equal(keys, tt.expectedKeys) { 328 + t.Errorf("expected keys %v, got %v", tt.expectedKeys, keys) 329 + } 330 + }) 331 + } 332 + } 333 + 334 + func TestIterateFailed(t *testing.T) { 335 + tests := []struct { 336 + name string 337 + setupStorage func(*BoltStorage, string) error 338 + wantCount int 339 + wantKeys []string 340 + }{ 341 + { 342 + name: "returns failed records", 343 + setupStorage: func(s *BoltStorage, did string) error { 344 + records := map[string][]byte{ 345 + "key1": []byte(`{"trackName":"a"}`), 346 + "key2": []byte(`{"trackName":"b"}`), 347 + "key3": []byte(`{"trackName":"c"}`), 348 + } 349 + if err := s.SaveRecords(did, records); err != nil { 350 + return err 351 + } 352 + return s.MarkFailed(did, []string{"key1", "key3"}, "timeout error") 353 + }, 354 + wantCount: 2, 355 + wantKeys: []string{"key1", "key3"}, 356 + }, 357 + { 358 + name: "handles empty failed set", 359 + setupStorage: func(s *BoltStorage, did string) error { 360 + records := map[string][]byte{ 361 + "key1": []byte(`{"trackName":"a"}`), 362 + } 363 + return s.SaveRecords(did, records) 364 + }, 365 + wantCount: 0, 366 + wantKeys: nil, 367 + }, 368 + { 369 + name: "handles non-existent DID", 370 + setupStorage: func(s *BoltStorage, did string) error { 371 + return nil 372 + }, 373 + wantCount: 0, 374 + wantKeys: nil, 375 + }, 376 + { 377 + name: "returns error messages", 378 + setupStorage: func(s *BoltStorage, did string) error { 379 + records := map[string][]byte{ 380 + "key1": []byte(`{"trackName":"a"}`), 381 + } 382 + if err := s.SaveRecords(did, records); err != nil { 383 + return err 384 + } 385 + return s.MarkFailed(did, []string{"key1"}, "custom error message") 386 + }, 387 + wantCount: 1, 388 + wantKeys: []string{"key1"}, 389 + }, 390 + } 391 + 392 + for _, tt := range tests { 393 + t.Run(tt.name, func(t *testing.T) { 394 + storage := newTestStorage(t) 395 + did := testDID(t, "failed") 396 + 397 + if err := tt.setupStorage(storage, did); err != nil { 398 + t.Fatalf("setupStorage failed: %v", err) 399 + } 400 + 401 + var count int 402 + var keys []string 403 + iterateFailed := storage.IterateFailed(did) 404 + iterateFailed(func(key string, rec []byte, errMsg string) bool { 405 + count++ 406 + keys = append(keys, key) 407 + return true 408 + }) 409 + 410 + if count != tt.wantCount { 411 + t.Errorf("IterateFailed() returned %d records, want %d", count, tt.wantCount) 412 + } 413 + 414 + if len(keys) != len(tt.wantKeys) { 415 + t.Errorf("IterateFailed() returned %d keys, want %d", len(keys), len(tt.wantKeys)) 416 + return 417 + } 418 + 419 + for i, key := range keys { 420 + if key != tt.wantKeys[i] { 421 + t.Errorf("IterateFailed() key[%d] = %s, want %s", i, key, tt.wantKeys[i]) 422 + } 423 + } 424 + }) 425 + } 426 + } 427 + 428 + func TestIterateFailedWithEarlyExit(t *testing.T) { 429 + storage := newTestStorage(t) 430 + did := testDID(t, "failed-exit") 431 + 432 + records := map[string][]byte{ 433 + "key1": []byte(`{"trackName":"a"}`), 434 + "key2": []byte(`{"trackName":"b"}`), 435 + "key3": []byte(`{"trackName":"c"}`), 436 + } 437 + if err := storage.SaveRecords(did, records); err != nil { 438 + t.Fatalf("SaveRecords failed: %v", err) 439 + } 440 + if err := storage.MarkFailed(did, []string{"key1", "key2", "key3"}, "error"); err != nil { 441 + t.Fatalf("MarkFailed failed: %v", err) 442 + } 443 + 444 + // Exit after 2 records 445 + var count int 446 + iterateFailed := storage.IterateFailed(did) 447 + iterateFailed(func(key string, rec []byte, errMsg string) bool { 448 + count++ 449 + return count < 2 // Exit after 2 records 450 + }) 451 + 452 + if count != 2 { 453 + t.Errorf("IterateFailed() returned %d records with early exit, want 2", count) 454 + } 455 + } 456 + 457 + func TestIterateFailedPreservesRecordData(t *testing.T) { 458 + storage := newTestStorage(t) 459 + did := testDID(t, "failed-data") 460 + 461 + records := map[string][]byte{ 462 + "key1": []byte(`{"trackName":"Test Track","artist":"Test Artist"}`), 463 + } 464 + if err := storage.SaveRecords(did, records); err != nil { 465 + t.Fatalf("SaveRecords failed: %v", err) 466 + } 467 + if err := storage.MarkFailed(did, []string{"key1"}, "network error"); err != nil { 468 + t.Fatalf("MarkFailed failed: %v", err) 469 + } 470 + 471 + var gotRecord []byte 472 + var gotErrMsg string 473 + iterateFailed := storage.IterateFailed(did) 474 + iterateFailed(func(key string, rec []byte, errMsg string) bool { 475 + if key == "key1" { 476 + gotRecord = rec 477 + gotErrMsg = errMsg 478 + } 479 + return true 480 + }) 481 + 482 + expectedRecord := []byte(`{"trackName":"Test Track","artist":"Test Artist"}`) 483 + if string(gotRecord) != string(expectedRecord) { 484 + t.Errorf("IterateFailed() record = %s, want %s", string(gotRecord), string(expectedRecord)) 485 + } 486 + 487 + if gotErrMsg != "network error" { 488 + t.Errorf("IterateFailed() errMsg = %s, want 'network error'", gotErrMsg) 489 + } 490 + }
+37 -12
cache/storage.go
··· 1 1 package cache 2 2 3 3 import ( 4 + "iter" 4 5 "time" 5 6 ) 6 7 7 - type Storage interface { 8 + // RecordStore handles record persistence operations. 9 + // Iterator methods use iter.Seq2 for iteration, with keys and values 10 + // cloned to memory to avoid holding read locks during iteration. 11 + type RecordStore interface { 12 + // SaveRecords stores records for a given DID. 8 13 SaveRecords(did string, records map[string][]byte) error 9 - IterateUnpublished(did string, fn func(key string, rec []byte) error) error 10 - IteratePublished(did string, fn func(key string, rec []byte) error) error 11 - IterateFailed(did string, fn func(key string, rec []byte, errMsg string) error) error 14 + 15 + // IterateUnpublished iterates over unpublished records. 16 + // The iterator yields (key, record) pairs. 17 + // If reverse is true, iterates in reverse order using Cursor.Last/Cursor.Prev. 18 + IterateUnpublished(did string, reverse bool) iter.Seq2[string, []byte] 19 + 20 + // IteratePublished iterates over published records. 21 + // The iterator yields (key, record) pairs. 22 + // If reverse is true, iterates in reverse order using Cursor.Last/Cursor.Prev. 23 + IteratePublished(did string, reverse bool) iter.Seq2[string, []byte] 24 + 25 + // IterateFailed iterates over failed records. 26 + // Returns a function that yields (key, record, errorMessage) triples. 27 + IterateFailed(did string) func(yield func(key string, rec []byte, errMsg string) bool) 28 + 12 29 MarkPublished(did string, keys ...string) error 13 30 MarkFailed(did string, keys []string, err string) error 14 31 RemoveFailed(did string, keys ...string) error 15 32 GetPublished(did string) (map[string]bool, error) 16 33 34 + Clear(did string) error 35 + Close() error 36 + } 37 + 38 + // QuotaStore handles rate limit quota tracking. 39 + type QuotaStore interface { 40 + GetMulti(keys []string) (map[string]int, error) 41 + IncrByMulti(counts map[string]int) error 42 + } 43 + 44 + // Storage combines RecordStore and QuotaStore interfaces. 45 + // Provided for backwards compatibility. 46 + type Storage interface { 47 + RecordStore 48 + QuotaStore 49 + 17 50 IsValid(did string) bool 18 51 Timestamp(did string) (time.Time, error) 19 52 20 - Clear(did string) error 21 53 ClearAll() error 22 - Close() error 23 54 24 55 // Stats returns database statistics 25 56 Stats() (DBStats, error) 26 - 27 - // KVStore implementation 28 - Get(key string) (int, error) 29 - IncrBy(key string, n int) (int, error) 30 - GetMulti(keys []string) (map[string]int, error) 31 - IncrByMulti(counts map[string]int) error 32 57 } 33 58 34 59 type DBStats struct {
+9 -22
flake.lock
··· 2 2 "nodes": { 3 3 "flake-parts": { 4 4 "inputs": { 5 - "nixpkgs-lib": "nixpkgs-lib" 5 + "nixpkgs-lib": [ 6 + "nixpkgs" 7 + ] 6 8 }, 7 9 "locked": { 8 - "lastModified": 1768135262, 9 - "narHash": "sha256-PVvu7OqHBGWN16zSi6tEmPwwHQ4rLPU9Plvs8/1TUBY=", 10 + "lastModified": 1769996383, 11 + "narHash": "sha256-AnYjnFWgS49RlqX7LrC4uA+sCCDBj0Ry/WOJ5XWAsa0=", 10 12 "owner": "hercules-ci", 11 13 "repo": "flake-parts", 12 - "rev": "80daad04eddbbf5a4d883996a73f3f542fa437ac", 14 + "rev": "57928607ea566b5db3ad13af0e57e921e6b12381", 13 15 "type": "github" 14 16 }, 15 17 "original": { ··· 20 22 }, 21 23 "nixpkgs": { 22 24 "locked": { 23 - "lastModified": 1768564909, 24 - "narHash": "sha256-Kell/SpJYVkHWMvnhqJz/8DqQg2b6PguxVWOuadbHCc=", 25 + "lastModified": 1770115704, 26 + "narHash": "sha256-KHFT9UWOF2yRPlAnSXQJh6uVcgNcWlFqqiAZ7OVlHNc=", 25 27 "owner": "NixOS", 26 28 "repo": "nixpkgs", 27 - "rev": "e4bae1bd10c9c57b2cf517953ab70060a828ee6f", 29 + "rev": "e6eae2ee2110f3d31110d5c222cd395303343b08", 28 30 "type": "github" 29 31 }, 30 32 "original": { 31 33 "owner": "NixOS", 32 34 "ref": "nixos-unstable", 33 35 "repo": "nixpkgs", 34 - "type": "github" 35 - } 36 - }, 37 - "nixpkgs-lib": { 38 - "locked": { 39 - "lastModified": 1765674936, 40 - "narHash": "sha256-k00uTP4JNfmejrCLJOwdObYC9jHRrr/5M/a/8L2EIdo=", 41 - "owner": "nix-community", 42 - "repo": "nixpkgs.lib", 43 - "rev": "2075416fcb47225d9b68ac469a5c4801a9c4dd85", 44 - "type": "github" 45 - }, 46 - "original": { 47 - "owner": "nix-community", 48 - "repo": "nixpkgs.lib", 49 36 "type": "github" 50 37 } 51 38 },
+3 -2
flake.nix
··· 1 1 { 2 2 inputs = { 3 3 flake-parts.url = "github:hercules-ci/flake-parts"; 4 + flake-parts.inputs.nixpkgs-lib.follows = "nixpkgs"; 4 5 nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable"; 5 6 }; 6 7 outputs = ··· 17 18 let 18 19 lazuli = pkgs.buildGoModule rec { 19 20 name = "lazuli"; 20 - version = "v0.2.0"; 21 + version = "v0.2.1"; 21 22 src = pkgs.nix-gitignore.gitignoreSource [ "*.csv" "*.zip" "*.json" ] ./.; 22 - vendorHash = "sha256-KnWoZ5UK8eigYw5uMSsLu4DIhzkSXmVHaE51Mr6hFmA="; 23 + vendorHash = "sha256-emSCQ/WULVZ8qQ630C1bEV8hVCj2FPidws7zhDTERP4="; 23 24 ldflags = [ 24 25 "-X" 25 26 "main.Version=${version}"
+1 -1
go.mod
··· 3 3 go 1.25.5 4 4 5 5 require ( 6 - github.com/bluesky-social/indigo v0.0.0-20260122235001-7f2e6b43efbb 6 + github.com/bluesky-social/indigo v0.0.0-20260202181658-ea3d39eec464 7 7 github.com/failsafe-go/failsafe-go v0.9.5 8 8 github.com/urfave/cli/v3 v3.6.2 9 9 go.etcd.io/bbolt v1.4.3
+3
go.sum
··· 4 4 github.com/bits-and-blooms/bitset v1.24.4/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= 5 5 github.com/bluesky-social/indigo v0.0.0-20260122235001-7f2e6b43efbb h1:3FvzRkxe85/HsnQubXgdg8Vf38J5d1Sk9XmOkm2TCvY= 6 6 github.com/bluesky-social/indigo v0.0.0-20260122235001-7f2e6b43efbb/go.mod h1:KIy0FgNQacp4uv2Z7xhNkV3qZiUSGuRky97s7Pa4v+o= 7 + github.com/bluesky-social/indigo v0.0.0-20260202181658-ea3d39eec464 h1:jL6cPOk1CZ8H06sEn+WFGWufHmqkawsGyDRl+BJhQjs= 8 + github.com/bluesky-social/indigo v0.0.0-20260202181658-ea3d39eec464/go.mod h1:VG/LeqLGNI3Ew7lsYixajnZGFfWPv144qbUddh+Oyag= 7 9 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 8 10 github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 9 11 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= ··· 14 16 github.com/failsafe-go/failsafe-go v0.9.5/go.mod h1:IeRpglkcwzKagjDMh90ZhN2l4Ovt3+jemQBUbThag54= 15 17 github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= 16 18 github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= 19 + github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= 17 20 github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= 18 21 github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= 19 22 github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY=
+65 -58
main.go
··· 10 10 "io/fs" 11 11 "log/slog" 12 12 "os" 13 - "os/signal" 14 - "runtime/pprof" 15 13 "slices" 16 14 "strings" 17 15 "time" ··· 53 51 type App struct { 54 52 log *slog.Logger 55 53 outputFormat string 56 - storage cache.Storage 57 54 } 58 55 59 56 func main() { ··· 66 63 } 67 64 68 65 func run() error { 69 - storage, err := cache.NewBoltStorage() 70 - if err != nil { 71 - return fmt.Errorf("open cache: %w", err) 72 - } 73 - 74 - cpuFile, _ := os.Create("cpu.prof") 75 - _ = pprof.StartCPUProfile(cpuFile) 76 - defer pprof.StopCPUProfile() // Ensures profile is written when main exits 77 - 78 - c := make(chan os.Signal, 1) 79 - signal.Notify(c, os.Interrupt) 80 - 81 - go func() { 82 - <-c 83 - fmt.Println("\nInterrupt received, saving profile and exiting...") 84 - pprof.StopCPUProfile() 85 - _ = cpuFile.Close() 86 - os.Exit(0) 87 - }() 88 - 89 - app := &App{storage: storage} 90 - 66 + app := &App{} 91 67 cmd := &cli.Command{ 92 68 Name: "lazuli", 93 69 Usage: "Import Last.fm and Spotify listening history to Bluesky", ··· 101 77 app.dedupeCommand(), 102 78 app.debugCommand(), 103 79 app.versionCommand(), 104 - }, 105 - After: func(ctx context.Context, cmd *cli.Command) error { 106 - return storage.Close() 107 80 }, 108 81 } 109 82 ··· 235 208 } 236 209 237 210 func (a *App) runStats(ctx context.Context, cmd *cli.Command) error { 238 - stats, err := a.storage.Stats() 211 + // Use read-only storage for stats to allow viewing while main process has it open 212 + statsStorage, err := cache.NewBoltStorage(true) 213 + if err != nil { 214 + return fmt.Errorf("open read-only cache: %w", err) 215 + } 216 + defer statsStorage.Close() 217 + 218 + stats, err := statsStorage.Stats() 239 219 if err != nil { 240 220 return fmt.Errorf("failed to get database stats: %w", err) 241 221 } 242 222 243 - limiter := sync.NewRateLimiter(a.storage, 1) 223 + limiter := sync.NewRateLimiter(statsStorage, 1) 244 224 writes, global, err := limiter.Stats() 245 225 if err != nil { 246 226 return fmt.Errorf("failed to get rate limit stats: %w", err) ··· 289 269 } 290 270 291 271 func (a *App) runRetry(ctx context.Context, cmd *cli.Command) error { 272 + storage, err := cache.NewBoltStorage(false) 273 + if err != nil { 274 + return fmt.Errorf("open cache: %w", err) 275 + } 276 + defer storage.Close() 277 + 292 278 authClient, err := a.prepareAuth(ctx, cmd) 293 279 if err != nil { 294 280 return err ··· 296 282 did := authClient.DID() 297 283 dryRun := cmd.Bool("dry-run") 298 284 299 - limiter := sync.NewRateLimiter(a.storage, 0.9) 285 + limiter := sync.NewRateLimiter(storage, 0.9) 300 286 repoClient := sync.NewRateClient(authClient.APIClient(), did, limiter) 301 287 302 288 var failedRecords []struct { ··· 304 290 rec sync.PlayRecord 305 291 } 306 292 307 - err = a.storage.IterateFailed(did, func(key string, rec []byte, errMsg string) error { 293 + iterateFailed := storage.IterateFailed(did) 294 + iterateFailed(func(key string, rec []byte, errMsg string) bool { 308 295 var playRec sync.PlayRecord 309 296 if err := json.Unmarshal(rec, &playRec); err != nil { 310 - return nil 297 + return true // continue 311 298 } 312 299 failedRecords = append(failedRecords, struct { 313 300 key string 314 301 rec sync.PlayRecord 315 302 }{key, playRec}) 316 - return nil 303 + return true // continue 317 304 }) 318 - if err != nil { 319 - return fmt.Errorf("failed to load failed records: %w", err) 320 - } 321 305 322 306 if len(failedRecords) == 0 { 323 307 fmt.Println("No failed records to retry.") ··· 336 320 continue 337 321 } 338 322 339 - res := sync.PublishBatch(ctx, repoClient, did, []*sync.PlayRecord{&fr.rec}, a.storage, sync.DefaultClientAgent) 323 + res := sync.PublishBatch(ctx, repoClient, did, []*sync.PlayRecord{&fr.rec}, storage, sync.DefaultClientAgent) 340 324 341 325 if res == nil { 342 326 fmt.Printf("Successfully retried: %s - %s\n", fr.rec.ArtistName(), fr.rec.TrackName) 343 - if err := a.storage.MarkPublished(did, fr.key); err != nil { 327 + if err := storage.MarkPublished(did, fr.key); err != nil { 344 328 a.log.Error("Failed to mark record as published", sync.ErrorAttr(err), slog.String("key", fr.key)) 345 329 } 346 - if err := a.storage.RemoveFailed(did, fr.key); err != nil { 330 + if err := storage.RemoveFailed(did, fr.key); err != nil { 347 331 a.log.Error("Failed to remove record from failed list", sync.ErrorAttr(err), slog.String("key", fr.key)) 348 332 } 349 333 successCount++ ··· 358 342 } 359 343 360 344 func (a *App) runFailed(ctx context.Context, cmd *cli.Command) error { 345 + storage, err := cache.NewBoltStorage(true) 346 + if err != nil { 347 + return fmt.Errorf("open read-only cache: %w", err) 348 + } 349 + defer storage.Close() 350 + 361 351 authClient, err := a.prepareAuth(ctx, cmd) 362 352 if err != nil { 363 353 return err ··· 371 361 } 372 362 373 363 var failed []FailedRecord 374 - err = a.storage.IterateFailed(did, func(key string, rec []byte, errMsg string) error { 364 + iterateFailed := storage.IterateFailed(did) 365 + iterateFailed(func(key string, rec []byte, errMsg string) bool { 375 366 var playRec sync.PlayRecord 376 367 _ = json.Unmarshal(rec, &playRec) 377 368 failed = append(failed, FailedRecord{ ··· 379 370 Error: errMsg, 380 371 Record: playRec, 381 372 }) 382 - return nil 373 + return true // continue 383 374 }) 384 - if err != nil { 385 - return fmt.Errorf("failed to iterate failed records: %w", err) 386 - } 387 375 388 376 if a.outputFormat == "json" { 389 377 data, _ := json.MarshalIndent(failed, "", " ") ··· 508 496 } 509 497 510 498 func (a *App) runImport(ctx context.Context, cmd *cli.Command) error { 499 + storage, err := cache.NewBoltStorage(false) 500 + if err != nil { 501 + return fmt.Errorf("open cache: %w", err) 502 + } 503 + defer storage.Close() 504 + 511 505 handle, password, err := a.getCredentials(cmd) 512 506 if err != nil { 513 507 return err ··· 525 519 tolerance := cmd.Duration("tolerance") 526 520 527 521 if clearCache { 528 - if err := a.storage.ClearAll(); err != nil { 522 + if err := storage.ClearAll(); err != nil { 529 523 a.log.Error("Failed to clear cache", sync.ErrorAttr(err)) 530 524 } else { 531 525 a.log.Info("Cache cleared") ··· 549 543 } 550 544 a.log.Info("Authenticated", sync.DIDAttr(authClient.DID()), slog.String("pds", authClient.PDS())) 551 545 552 - limiter := sync.NewRateLimiter(a.storage, 0.9) 546 + limiter := sync.NewRateLimiter(storage, 0.9) 553 547 repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), limiter) 554 548 555 - existingRecords, err := sync.FetchExisting(ctx, repoClient, authClient.DID(), a.storage, fresh) 549 + existingRecords, err := sync.FetchExisting(ctx, repoClient, authClient.DID(), storage, fresh) 556 550 if err != nil { 557 551 return fmt.Errorf("fetch existing records: %w", err) 558 552 } 559 553 a.log.Info("Fetched existing records", slog.Int("count", len(existingRecords))) 560 554 561 - published, _ := a.storage.GetPublished(authClient.DID()) 555 + published, _ := storage.GetPublished(authClient.DID()) 562 556 newRecords := sync.FilterNew(records, existingRecords, published) 563 557 skippedCount := len(records) - len(newRecords) 564 558 a.log.Info("Filtered to new records", ··· 582 576 value, _ := json.Marshal(rec) 583 577 newEntries[key] = value 584 578 } 585 - if err := a.storage.SaveRecords(authClient.DID(), newEntries); err != nil { 579 + if err := storage.SaveRecords(authClient.DID(), newEntries); err != nil { 586 580 return fmt.Errorf("save new records to storage: %w", err) 587 581 } 588 582 } ··· 592 586 publishOpts := sync.PublishOptions{ 593 587 BatchSize: batchSize, 594 588 DryRun: dryRun, 589 + Reverse: reverse, 595 590 ATProtoClient: repoClient, 596 591 ProgressLog: progressLog, 597 592 ClientAgent: fmt.Sprintf("lazuli/%s", Version), 598 - Storage: a.storage, 593 + Storage: storage, 599 594 Limiter: limiter, 600 595 } 601 596 ··· 653 648 } 654 649 655 650 func (a *App) runSync(ctx context.Context, cmd *cli.Command) error { 651 + storage, err := cache.NewBoltStorage(false) 652 + if err != nil { 653 + return fmt.Errorf("open cache: %w", err) 654 + } 655 + defer storage.Close() 656 + 656 657 authClient, err := a.prepareAuth(ctx, cmd) 657 658 if err != nil { 658 659 return err ··· 661 662 fresh := cmd.Bool("fresh") 662 663 a.log.Info("Starting sync operation", sync.DIDAttr(authClient.DID()), slog.Bool("fresh", fresh)) 663 664 664 - limiter := sync.NewRateLimiter(a.storage, 0.85) 665 + limiter := sync.NewRateLimiter(storage, 0.85) 665 666 repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), limiter) 666 667 667 668 if fresh { 668 - if err := a.storage.Clear(authClient.DID()); err != nil { 669 + if err := storage.Clear(authClient.DID()); err != nil { 669 670 a.log.Error("Failed to clear cache", sync.ErrorAttr(err)) 670 671 } else { 671 672 a.log.Info("Cache cleared") 672 673 } 673 674 } 674 675 675 - existingRecords, err := sync.FetchExisting(ctx, repoClient, authClient.DID(), a.storage, fresh) 676 + existingRecords, err := sync.FetchExisting(ctx, repoClient, authClient.DID(), storage, fresh) 676 677 if err != nil { 677 678 return fmt.Errorf("fetch existing records: %w", err) 678 679 } ··· 683 684 } 684 685 685 686 func (a *App) runDedupe(ctx context.Context, cmd *cli.Command) error { 687 + storage, err := cache.NewBoltStorage(false) 688 + if err != nil { 689 + return fmt.Errorf("open cache: %w", err) 690 + } 691 + defer storage.Close() 692 + 686 693 authClient, err := a.prepareAuth(ctx, cmd) 687 694 if err != nil { 688 695 return fmt.Errorf("authentication failed: %w", err) ··· 696 703 slog.Bool("dry_run", dryRun), 697 704 slog.Bool("fresh", fresh)) 698 705 699 - limiter := sync.NewRateLimiter(a.storage, 0.9) 706 + limiter := sync.NewRateLimiter(storage, 0.9) 700 707 repoClient := sync.NewRateClient(authClient.APIClient(), authClient.DID(), limiter) 701 708 702 709 if fresh { 703 - if err := a.storage.Clear(authClient.DID()); err != nil { 710 + if err := storage.Clear(authClient.DID()); err != nil { 704 711 a.log.Error("Failed to clear cache", sync.ErrorAttr(err)) 705 712 } else { 706 713 a.log.Info("Cache cleared") 707 714 } 708 715 } 709 716 710 - existingRecords, err := sync.FetchExisting(ctx, repoClient, authClient.DID(), a.storage, fresh) 717 + existingRecords, err := sync.FetchExisting(ctx, repoClient, authClient.DID(), storage, fresh) 711 718 if err != nil { 712 719 return fmt.Errorf("failed to fetch existing records: %w", err) 713 720 } ··· 787 794 } 788 795 } 789 796 790 - if err := a.storage.Clear(authClient.DID()); err != nil { 797 + if err := storage.Clear(authClient.DID()); err != nil { 791 798 a.log.Error("Failed to clear cache", "err", err) 792 799 } 793 800
+23 -20
sync/publish.go
··· 59 59 PublishOptions struct { 60 60 BatchSize int 61 61 DryRun bool 62 + Reverse bool 62 63 ATProtoClient ATProtoClient 63 64 ProgressLog func(ProgressReport) 64 - Storage cache.Storage 65 + Storage cache.RecordStore 65 66 Limiter RateLimiter 66 67 ClientAgent string 67 68 RetryDelay time.Duration ··· 85 86 86 87 batchProcessor struct { 87 88 Client ATProtoClient 88 - Storage cache.Storage 89 + Storage cache.RecordStore 89 90 DID string 90 91 ClientAgent string 91 92 DryRun bool ··· 116 117 } 117 118 118 119 // batchRecords iterates through storage and builds record batches 119 - func batchRecords(ctx context.Context, storage cache.Storage, did string, batchSize int) ([]recordBatch, error) { 120 + func batchRecords(ctx context.Context, storage cache.RecordStore, did string, batchSize int, reverse bool) ([]recordBatch, error) { 120 121 var batches []recordBatch 121 122 var currentBatch recordBatch 122 123 123 - err := storage.IterateUnpublished(did, func(key string, rec []byte) error { 124 + for key, rec := range storage.IterateUnpublished(did, reverse) { 124 125 select { 125 126 case <-ctx.Done(): 126 - return ctx.Err() 127 + return nil, ctx.Err() 127 128 default: 128 129 } 129 130 ··· 133 134 if storage != nil { 134 135 _ = storage.MarkFailed(did, []string{key}, "malformed record") 135 136 } 136 - return nil // Skip malformed records 137 + continue // Skip malformed records 137 138 } 138 139 139 140 currentBatch.Records = append(currentBatch.Records, &record) 140 141 currentBatch.Keys = append(currentBatch.Keys, key) 141 142 142 143 if len(currentBatch.Records) >= batchSize { 144 + records := make([]*PlayRecord, len(currentBatch.Records)) 145 + copy(records, currentBatch.Records) 146 + keys := make([]string, len(currentBatch.Keys)) 147 + copy(keys, currentBatch.Keys) 143 148 batches = append(batches, recordBatch{ 144 - Records: append([]*PlayRecord{}, currentBatch.Records...), 145 - Keys: append([]string{}, currentBatch.Keys...), 149 + Records: records, 150 + Keys: keys, 146 151 }) 147 152 currentBatch = recordBatch{} 148 153 } 149 - return nil 150 - }) 151 - if err != nil { 152 - return nil, err 153 154 } 154 155 155 156 // Add the last partial batch if it has records ··· 179 180 } 180 181 } 181 182 182 - err := failsafe.With(DefaultRetryPolicy).WithContext(ctx).Run(func() error { 183 + policy := DefaultRetryPolicy 184 + 185 + err := failsafe.With(policy).WithContext(ctx).Run(func() error { 183 186 return PublishBatch(ctx, processor.Client, processor.DID, batch.Records, processor.Storage, processor.ClientAgent) 184 187 }) 185 188 if err != nil { ··· 229 232 return errorResult(startTime) 230 233 } 231 234 232 - batches, err := batchRecords(ctx, opts.Storage, client.DID(), batchSize) 235 + batches, err := batchRecords(ctx, opts.Storage, client.DID(), batchSize, opts.Reverse) 233 236 if err != nil { 234 237 cancelled := ctx.Err() != nil 235 238 return newPublishResult(0, 0, 0, startTime, cancelled) ··· 339 342 slog.String("rate", formatRate(ratePerMinute(success, time.Since(startTime))))) 340 343 } 341 344 342 - func PublishBatch(ctx context.Context, client ATProtoClient, did string, batch []*PlayRecord, storage cache.Storage, clientAgent string) error { 345 + func PublishBatch(ctx context.Context, client ATProtoClient, did string, batch []*PlayRecord, storage cache.RecordStore, clientAgent string) error { 343 346 if len(batch) == 0 { 344 347 return nil 345 348 } ··· 394 397 published, err := storage.GetPublished(did) 395 398 if err == nil && len(published) > 0 && storage.IsValid(did) { 396 399 records := make([]ExistingRecord, 0, len(published)) 397 - err := storage.IteratePublished(did, func(key string, data []byte) error { 400 + for _, data := range storage.IteratePublished(did, false) { 398 401 var value PlayRecord 399 402 if err := json.Unmarshal(data, &value); err != nil { 400 - return nil 403 + slog.Debug("failed to unmarshal cached record", ErrorAttr(err)) 404 + continue 401 405 } 402 406 records = append(records, ExistingRecord{ 403 407 URI: generateRecordURI(did, &value), 404 408 Value: &value, 405 409 }) 406 - return nil 407 - }) 408 - if err == nil { 410 + } 411 + if len(records) > 0 { 409 412 slog.Debug("loaded from cache", slog.Int("count", len(records))) 410 413 return records, nil 411 414 }
+166 -18
sync/publish_test.go
··· 5 5 "encoding/json" 6 6 "errors" 7 7 "fmt" 8 + "iter" 8 9 "maps" 9 10 "sync" 10 11 "testing" ··· 19 20 20 21 // Mock Storage 21 22 type mockStorage struct { 22 - cache.Storage 23 + cache.RecordStore 23 24 unpublished map[string][]byte 24 25 published map[string]bool 25 26 failed map[string]string ··· 43 44 return nil 44 45 } 45 46 46 - func (m *mockStorage) IterateUnpublished(did string, fn func(key string, rec []byte) error) error { 47 - m.mu.Lock() 48 - keys := make([]string, 0, len(m.unpublished)) 49 - for k := range m.unpublished { 50 - keys = append(keys, k) 47 + func (m *mockStorage) IterateUnpublished(did string, reverse bool) iter.Seq2[string, []byte] { 48 + return func(yield func(key string, rec []byte) bool) { 49 + m.mu.Lock() 50 + keys := make([]string, 0, len(m.unpublished)) 51 + for k := range m.unpublished { 52 + keys = append(keys, k) 53 + } 54 + m.mu.Unlock() 55 + 56 + for _, k := range keys { 57 + m.mu.Lock() 58 + rec, ok := m.unpublished[k] 59 + m.mu.Unlock() 60 + if ok { 61 + if !yield(k, rec) { 62 + return 63 + } 64 + } 65 + } 51 66 } 52 - m.mu.Unlock() 67 + } 53 68 54 - for _, k := range keys { 69 + func (m *mockStorage) IteratePublished(did string, reverse bool) iter.Seq2[string, []byte] { 70 + return func(yield func(key string, rec []byte) bool) { 55 71 m.mu.Lock() 56 - rec, ok := m.unpublished[k] 72 + keys := make([]string, 0, len(m.published)) 73 + for k := range m.published { 74 + keys = append(keys, k) 75 + } 57 76 m.mu.Unlock() 58 - if ok { 59 - if err := fn(k, rec); err != nil { 60 - return err 77 + 78 + for _, k := range keys { 79 + m.mu.Lock() 80 + rec, ok := m.unpublished[k] // Get record data from unpublished map 81 + m.mu.Unlock() 82 + if ok { 83 + if !yield(k, rec) { 84 + return 85 + } 61 86 } 62 87 } 63 88 } 64 - return nil 65 89 } 66 90 67 91 func (m *mockStorage) MarkPublished(did string, keys ...string) error { ··· 228 252 t.Run(tt.name, func(t *testing.T) { 229 253 t.Parallel() 230 254 231 - ctx := context.Background() 255 + ctx := t.Context() 232 256 if tt.ctxCancel { 233 257 var cancel context.CancelFunc 234 258 ctx, cancel = context.WithCancel(ctx) ··· 247 271 storage.unpublished[fmt.Sprintf("key%d", i)] = data 248 272 } 249 273 250 - batches, err := batchRecords(ctx, storage, did, tt.batchSize) 274 + batches, err := batchRecords(ctx, storage, did, tt.batchSize, false) 251 275 252 276 if (err != nil) != tt.wantErr { 253 277 t.Errorf("BuildRecordBatches() error = %v, wantErr %v", err, tt.wantErr) ··· 277 301 } 278 302 } 279 303 304 + func TestIteratePublished(t *testing.T) { 305 + tests := []struct { 306 + name string 307 + setupStorage func() *mockStorage 308 + reverse bool 309 + wantKeys []string 310 + }{ 311 + { 312 + name: "returns only published records", 313 + setupStorage: func() *mockStorage { 314 + s := newMockStorage() 315 + s.unpublished["key1"] = []byte(`{"trackName":"a"}`) 316 + s.unpublished["key2"] = []byte(`{"trackName":"b"}`) 317 + s.unpublished["key3"] = []byte(`{"trackName":"c"}`) 318 + s.published["key1"] = true 319 + s.published["key3"] = true 320 + return s 321 + }, 322 + reverse: false, 323 + wantKeys: []string{"key1", "key3"}, 324 + }, 325 + { 326 + name: "handles empty published set", 327 + setupStorage: func() *mockStorage { 328 + s := newMockStorage() 329 + s.unpublished["key1"] = []byte(`{"trackName":"a"}`) 330 + return s 331 + }, 332 + reverse: false, 333 + wantKeys: nil, 334 + }, 335 + } 336 + 337 + for _, tt := range tests { 338 + t.Run(tt.name, func(t *testing.T) { 339 + t.Parallel() 340 + 341 + storage := tt.setupStorage() 342 + 343 + var gotKeys []string 344 + for key, rec := range storage.IteratePublished("did:test", tt.reverse) { 345 + gotKeys = append(gotKeys, key) 346 + _ = rec 347 + } 348 + 349 + if len(gotKeys) != len(tt.wantKeys) { 350 + t.Errorf("IteratePublished() returned %d keys, want %d", len(gotKeys), len(tt.wantKeys)) 351 + return 352 + } 353 + 354 + for i, key := range gotKeys { 355 + if key != tt.wantKeys[i] { 356 + t.Errorf("IteratePublished() key[%d] = %s, want %s", i, key, tt.wantKeys[i]) 357 + } 358 + } 359 + }) 360 + } 361 + } 362 + 280 363 func TestProcessBatch(t *testing.T) { 281 364 tests := []struct { 282 365 name string ··· 379 462 t.Run(tt.name, func(t *testing.T) { 380 463 t.Parallel() 381 464 382 - ctx := context.Background() 465 + ctx := t.Context() 383 466 result := processBatch(ctx, tt.batch, tt.processor) 384 467 385 468 if result.SuccessCount != tt.wantSuccess { ··· 547 630 wantSuccess: 0, 548 631 wantErrors: 2, 549 632 }, 633 + { 634 + name: "publish with reverse iteration", 635 + opts: PublishOptions{ 636 + BatchSize: 2, 637 + Reverse: true, 638 + Storage: newMockStorage(), 639 + ClientAgent: "test-agent", 640 + }, 641 + records: []PlayRecord{ 642 + {TrackName: "Song 1"}, 643 + {TrackName: "Song 2"}, 644 + {TrackName: "Song 3"}, 645 + }, 646 + wantSuccess: 3, 647 + wantErrors: 0, 648 + }, 649 + { 650 + name: "publish with empty records", 651 + opts: PublishOptions{ 652 + BatchSize: 2, 653 + Storage: newMockStorage(), 654 + ClientAgent: "test-agent", 655 + }, 656 + records: []PlayRecord{}, 657 + wantSuccess: 0, 658 + wantErrors: 0, 659 + }, 660 + { 661 + name: "publish with transient errors and retry", 662 + opts: PublishOptions{ 663 + BatchSize: 1, 664 + Storage: newMockStorage(), 665 + ClientAgent: "test-agent", 666 + }, 667 + records: []PlayRecord{ 668 + {TrackName: "Song 1"}, 669 + }, 670 + setupClient: func() *mockATProtoClient { 671 + return &mockATProtoClient{ 672 + applyWritesFunc: func(ctx context.Context, collection string, records []*PlayRecord) error { 673 + return &atclient.APIError{StatusCode: 500} // Transient error 674 + }, 675 + } 676 + }, 677 + wantSuccess: 0, 678 + wantErrors: 1, 679 + }, 680 + { 681 + name: "publish with storage failure", 682 + opts: PublishOptions{ 683 + BatchSize: 1, 684 + Storage: newFailingStorage(), 685 + ClientAgent: "test-agent", 686 + }, 687 + records: []PlayRecord{ 688 + {TrackName: "Song 1"}, 689 + }, 690 + wantSuccess: 0, 691 + wantErrors: 1, 692 + }, 550 693 } 551 694 552 695 for _, tt := range tests { 553 696 t.Run(tt.name, func(t *testing.T) { 554 697 synctest.Test(t, func(t *testing.T) { 555 - ctx := context.Background() 698 + ctx := t.Context() 556 699 did := "did:example:123" 557 700 558 - storage := tt.opts.Storage.(*mockStorage) 701 + var storage *mockStorage 702 + if fs, ok := tt.opts.Storage.(*failingStorage); ok { 703 + storage = fs.mockStorage 704 + } else { 705 + storage = tt.opts.Storage.(*mockStorage) 706 + } 559 707 // Add records to storage 560 708 for i, record := range tt.records { 561 709 data, _ := json.Marshal(record)