this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Implement kvstore in tarfiles store

+409 -6
+117 -6
cmd/butterfly/store/tarfiles.go
··· 348 348 return contents, nil 349 349 } 350 350 351 - // KvGet retrieves a value from general KV storage (not yet implemented) 351 + // KvGet retrieves a value from general KV storage 352 352 func (t *TarfilesStore) KvGet(namespace string, key string) (string, error) { 353 - return "", fmt.Errorf("KvGet not yet implemented for tarfiles store") 353 + // Sanitize namespace and key to prevent directory traversal 354 + namespace, key, err := sanitizeNamespaceAndKey(namespace, key) 355 + if err != nil { 356 + return "", err 357 + } 358 + 359 + // Build the file path 360 + filePath := filepath.Join(t.Dirpath, namespace, key+".json") 361 + 362 + // Read the file 363 + value, err := os.ReadFile(filePath) 364 + if err != nil { 365 + if os.IsNotExist(err) { 366 + return "", fmt.Errorf("key %q not found in namespace %q", key, namespace) 367 + } 368 + return "", fmt.Errorf("failed to read key file: %w", err) 369 + } 370 + 371 + return string(value), nil 354 372 } 355 373 356 - // KvPut stores a value in general KV storage (not yet implemented) 374 + // KvPut stores a value in general KV storage using atomic file operations 357 375 func (t *TarfilesStore) KvPut(namespace string, key string, value string) error { 358 - return fmt.Errorf("KvPut not yet implemented for tarfiles store") 376 + // Sanitize namespace and key to prevent directory traversal 377 + namespace, key, err := sanitizeNamespaceAndKey(namespace, key) 378 + if err != nil { 379 + return err 380 + } 381 + 382 + t.mu.Lock() 383 + defer t.mu.Unlock() 384 + 385 + // Create the namespace directory if it doesn't exist 386 + namespaceDir := filepath.Join(t.Dirpath, namespace) 387 + if err := os.MkdirAll(namespaceDir, 0755); err != nil { 388 + return fmt.Errorf("failed to create namespace directory: %w", err) 389 + } 390 + 391 + // Write to file 392 + file := filepath.Join(namespaceDir, key+".json") 393 + if err := os.WriteFile(file, []byte(value), 0644); err != nil { 394 + return fmt.Errorf("failed to write file: %w", err) 395 + } 396 + 397 + return nil 359 398 } 360 399 361 - // KvDel deletes a value from general KV storage (not yet implemented) 400 + // KvDel deletes a value from general KV storage 362 401 func (t *TarfilesStore) KvDel(namespace string, key string) error { 363 - return fmt.Errorf("KvDel not yet implemented for tarfiles store") 402 + // Sanitize namespace and key to prevent directory traversal 403 + namespace, key, err := sanitizeNamespaceAndKey(namespace, key) 404 + if err != nil { 405 + return err 406 + } 407 + 408 + t.mu.Lock() 409 + defer t.mu.Unlock() 410 + 411 + // Build the file path 412 + filePath := filepath.Join(t.Dirpath, namespace, key+".json") 413 + 414 + // Remove the file 415 + err = os.Remove(filePath) 416 + if err != nil && !os.IsNotExist(err) { 417 + return fmt.Errorf("failed to delete key file: %w", err) 418 + } 419 + 420 + return nil 421 + } 422 + 423 + // sanitizeNamespaceAndKey sanitizes namespace and key to make them safe for filesystem operations 424 + func sanitizeNamespaceAndKey(namespace, key string) (string, string, error) { 425 + if namespace == "" { 426 + return "", "", fmt.Errorf("namespace cannot be empty") 427 + } 428 + if key == "" { 429 + return "", "", fmt.Errorf("key cannot be empty") 430 + } 431 + 432 + // Replace problematic characters with safe alternatives 433 + replacer := strings.NewReplacer( 434 + "..", "_", // Replace parent directory references 435 + "/", "_", // Replace forward slashes 436 + "\\", "_", // Replace backslashes 437 + "\x00", "_", // Replace null characters 438 + ":", "_", // Replace colons (problematic on some filesystems) 439 + "*", "_", // Replace wildcards 440 + "?", "_", // Replace wildcards 441 + "\"", "_", // Replace quotes 442 + "<", "_", // Replace less than 443 + ">", "_", // Replace greater than 444 + "|", "_", // Replace pipe 445 + "\n", "_", // Replace newlines 446 + "\r", "_", // Replace carriage returns 447 + "\t", "_", // Replace tabs 448 + ) 449 + 450 + // Sanitize namespace and key 451 + namespace = replacer.Replace(namespace) 452 + key = replacer.Replace(key) 453 + 454 + // Ensure they don't start with a dot (hidden files) 455 + if strings.HasPrefix(namespace, ".") { 456 + namespace = "_" + namespace[1:] 457 + } 458 + if strings.HasPrefix(key, ".") { 459 + key = "_" + key[1:] 460 + } 461 + 462 + // Trim any leading/trailing spaces 463 + namespace = strings.TrimSpace(namespace) 464 + key = strings.TrimSpace(key) 465 + 466 + // Final check - make sure they're not empty after sanitization 467 + if namespace == "" { 468 + return "", "", fmt.Errorf("namespace became empty after sanitization") 469 + } 470 + if key == "" { 471 + return "", "", fmt.Errorf("key became empty after sanitization") 472 + } 473 + 474 + return namespace, key, nil 364 475 }
+292
cmd/butterfly/store/tarfiles_test.go
··· 3 3 import ( 4 4 "context" 5 5 "encoding/json" 6 + "fmt" 7 + "os" 6 8 "path/filepath" 7 9 "strings" 10 + "sync" 8 11 "testing" 9 12 "time" 10 13 ··· 396 399 assert.Contains(t, contents, "app.bsky.feed.post/valid.json") 397 400 assert.Contains(t, contents, "app.bsky.feed.post/valid2.json") 398 401 } 402 + 403 + func TestTarfilesStore_KvStore_BasicOperations(t *testing.T) { 404 + tmpDir := t.TempDir() 405 + store := NewTarfilesStore(tmpDir) 406 + 407 + ctx := context.Background() 408 + err := store.Setup(ctx) 409 + require.NoError(t, err) 410 + defer store.Close() 411 + 412 + // Test Put and Get 413 + err = store.KvPut("namespace1", "key1", "value1") 414 + require.NoError(t, err) 415 + 416 + value, err := store.KvGet("namespace1", "key1") 417 + require.NoError(t, err) 418 + assert.Equal(t, "value1", value) 419 + 420 + // Test updating existing key 421 + err = store.KvPut("namespace1", "key1", "updated_value1") 422 + require.NoError(t, err) 423 + 424 + value, err = store.KvGet("namespace1", "key1") 425 + require.NoError(t, err) 426 + assert.Equal(t, "updated_value1", value) 427 + 428 + // Test different namespace 429 + err = store.KvPut("namespace2", "key1", "value2") 430 + require.NoError(t, err) 431 + 432 + value, err = store.KvGet("namespace2", "key1") 433 + require.NoError(t, err) 434 + assert.Equal(t, "value2", value) 435 + 436 + // Verify namespace isolation 437 + value, err = store.KvGet("namespace1", "key1") 438 + require.NoError(t, err) 439 + assert.Equal(t, "updated_value1", value) 440 + 441 + // Verify files exist on disk 442 + file1 := filepath.Join(tmpDir, "namespace1", "key1.json") 443 + assert.FileExists(t, file1) 444 + file2 := filepath.Join(tmpDir, "namespace2", "key1.json") 445 + assert.FileExists(t, file2) 446 + } 447 + 448 + func TestTarfilesStore_KvStore_GetNonExistent(t *testing.T) { 449 + tmpDir := t.TempDir() 450 + store := NewTarfilesStore(tmpDir) 451 + 452 + ctx := context.Background() 453 + err := store.Setup(ctx) 454 + require.NoError(t, err) 455 + defer store.Close() 456 + 457 + // Test get non-existent key 458 + _, err = store.KvGet("namespace1", "nonexistent") 459 + require.Error(t, err) 460 + assert.Contains(t, err.Error(), "not found") 461 + 462 + // Test get from non-existent namespace 463 + _, err = store.KvGet("nonexistent_namespace", "key1") 464 + require.Error(t, err) 465 + assert.Contains(t, err.Error(), "not found") 466 + } 467 + 468 + func TestTarfilesStore_KvStore_Delete(t *testing.T) { 469 + tmpDir := t.TempDir() 470 + store := NewTarfilesStore(tmpDir) 471 + 472 + ctx := context.Background() 473 + err := store.Setup(ctx) 474 + require.NoError(t, err) 475 + defer store.Close() 476 + 477 + // Put a value 478 + err = store.KvPut("namespace1", "key1", "value1") 479 + require.NoError(t, err) 480 + 481 + // Verify it exists 482 + value, err := store.KvGet("namespace1", "key1") 483 + require.NoError(t, err) 484 + assert.Equal(t, "value1", value) 485 + 486 + // Verify file exists 487 + filePath := filepath.Join(tmpDir, "namespace1", "key1.json") 488 + assert.FileExists(t, filePath) 489 + 490 + // Delete it 491 + err = store.KvDel("namespace1", "key1") 492 + require.NoError(t, err) 493 + 494 + // Verify it's gone 495 + _, err = store.KvGet("namespace1", "key1") 496 + require.Error(t, err) 497 + assert.Contains(t, err.Error(), "not found") 498 + 499 + // Verify file is gone 500 + assert.NoFileExists(t, filePath) 501 + 502 + // Delete non-existent key should not error 503 + err = store.KvDel("namespace1", "nonexistent") 504 + require.NoError(t, err) 505 + 506 + // Delete from non-existent namespace should not error 507 + err = store.KvDel("nonexistent_namespace", "key1") 508 + require.NoError(t, err) 509 + } 510 + 511 + func TestTarfilesStore_KvStore_Sanitization(t *testing.T) { 512 + tmpDir := t.TempDir() 513 + store := NewTarfilesStore(tmpDir) 514 + 515 + ctx := context.Background() 516 + err := store.Setup(ctx) 517 + require.NoError(t, err) 518 + defer store.Close() 519 + 520 + // Test with problematic characters that should be sanitized 521 + testCases := []struct { 522 + namespace string 523 + key string 524 + value string 525 + }{ 526 + {"namespace/with/slashes", "key/with/slashes", "value1"}, 527 + {"namespace\\with\\backslashes", "key\\with\\backslashes", "value2"}, 528 + {"namespace:with:colons", "key:with:colons", "value3"}, 529 + {"namespace..with..dots", "key..with..dots", "value4"}, 530 + {".hidden_namespace", ".hidden_key", "value5"}, 531 + {"namespace*with?wildcards", "key*with?wildcards", "value6"}, 532 + {"namespace\nwith\nnewlines", "key\twith\ttabs", "value7"}, 533 + {"namespace with spaces", "key with spaces", "value8"}, 534 + } 535 + 536 + for _, tc := range testCases { 537 + err := store.KvPut(tc.namespace, tc.key, tc.value) 538 + require.NoError(t, err, "Failed to put %s/%s", tc.namespace, tc.key) 539 + 540 + value, err := store.KvGet(tc.namespace, tc.key) 541 + require.NoError(t, err, "Failed to get %s/%s", tc.namespace, tc.key) 542 + assert.Equal(t, tc.value, value) 543 + } 544 + 545 + // Verify sanitized filenames exist 546 + entries, err := os.ReadDir(tmpDir) 547 + require.NoError(t, err) 548 + 549 + // Should have multiple namespace directories (sanitized) 550 + assert.Greater(t, len(entries), 0) 551 + 552 + // Check that no directory contains problematic characters 553 + for _, entry := range entries { 554 + name := entry.Name() 555 + if name == ".tmp" { 556 + continue // system file, skip 557 + } 558 + assert.NotContains(t, name, "/") 559 + assert.NotContains(t, name, "\\") 560 + assert.NotContains(t, name, "..") 561 + assert.NotContains(t, name, ":") 562 + assert.NotContains(t, name, "*") 563 + assert.NotContains(t, name, "?") 564 + assert.NotContains(t, name, "\n") 565 + assert.NotContains(t, name, "\t") 566 + assert.False(t, strings.HasPrefix(name, ".")) 567 + } 568 + } 569 + 570 + func TestTarfilesStore_KvStore_EmptyValues(t *testing.T) { 571 + tmpDir := t.TempDir() 572 + store := NewTarfilesStore(tmpDir) 573 + 574 + ctx := context.Background() 575 + err := store.Setup(ctx) 576 + require.NoError(t, err) 577 + defer store.Close() 578 + 579 + // Test with empty string value 580 + err = store.KvPut("namespace1", "emptykey", "") 581 + require.NoError(t, err) 582 + 583 + value, err := store.KvGet("namespace1", "emptykey") 584 + require.NoError(t, err) 585 + assert.Equal(t, "", value) 586 + 587 + // Test empty namespace should error 588 + err = store.KvPut("", "key1", "value1") 589 + require.Error(t, err) 590 + assert.Contains(t, err.Error(), "namespace cannot be empty") 591 + 592 + // Test empty key should error 593 + err = store.KvPut("namespace1", "", "value1") 594 + require.Error(t, err) 595 + assert.Contains(t, err.Error(), "key cannot be empty") 596 + } 597 + 598 + func TestTarfilesStore_KvStore_LargeValues(t *testing.T) { 599 + tmpDir := t.TempDir() 600 + store := NewTarfilesStore(tmpDir) 601 + 602 + ctx := context.Background() 603 + err := store.Setup(ctx) 604 + require.NoError(t, err) 605 + defer store.Close() 606 + 607 + // Test with large value (1MB) 608 + largeValue := strings.Repeat("a", 1024*1024) 609 + 610 + err = store.KvPut("namespace1", "largekey", largeValue) 611 + require.NoError(t, err) 612 + 613 + value, err := store.KvGet("namespace1", "largekey") 614 + require.NoError(t, err) 615 + assert.Equal(t, largeValue, value) 616 + } 617 + 618 + func TestTarfilesStore_KvStore_Persistence(t *testing.T) { 619 + tmpDir := t.TempDir() 620 + 621 + // Create store and add data 622 + store1 := NewTarfilesStore(tmpDir) 623 + ctx := context.Background() 624 + err := store1.Setup(ctx) 625 + require.NoError(t, err) 626 + 627 + err = store1.KvPut("namespace1", "key1", "value1") 628 + require.NoError(t, err) 629 + err = store1.KvPut("namespace2", "key2", "value2") 630 + require.NoError(t, err) 631 + 632 + store1.Close() 633 + 634 + // Create new store instance with same directory 635 + store2 := NewTarfilesStore(tmpDir) 636 + err = store2.Setup(ctx) 637 + require.NoError(t, err) 638 + defer store2.Close() 639 + 640 + // Verify data persisted 641 + value, err := store2.KvGet("namespace1", "key1") 642 + require.NoError(t, err) 643 + assert.Equal(t, "value1", value) 644 + 645 + value, err = store2.KvGet("namespace2", "key2") 646 + require.NoError(t, err) 647 + assert.Equal(t, "value2", value) 648 + } 649 + 650 + func TestTarfilesStore_KvStore_ConcurrentAccess(t *testing.T) { 651 + tmpDir := t.TempDir() 652 + store := NewTarfilesStore(tmpDir) 653 + 654 + ctx := context.Background() 655 + err := store.Setup(ctx) 656 + require.NoError(t, err) 657 + defer store.Close() 658 + 659 + // Test concurrent writes to different keys 660 + var wg sync.WaitGroup 661 + numGoroutines := 10 662 + numOperations := 50 663 + 664 + for i := 0; i < numGoroutines; i++ { 665 + wg.Add(1) 666 + go func(goroutineID int) { 667 + defer wg.Done() 668 + for j := 0; j < numOperations; j++ { 669 + key := fmt.Sprintf("key_%d_%d", goroutineID, j) 670 + value := fmt.Sprintf("value_%d_%d", goroutineID, j) 671 + err := store.KvPut("concurrent_namespace", key, value) 672 + assert.NoError(t, err) 673 + } 674 + }(i) 675 + } 676 + 677 + wg.Wait() 678 + 679 + // Verify all values were written correctly 680 + for i := 0; i < numGoroutines; i++ { 681 + for j := 0; j < numOperations; j++ { 682 + key := fmt.Sprintf("key_%d_%d", i, j) 683 + expectedValue := fmt.Sprintf("value_%d_%d", i, j) 684 + 685 + value, err := store.KvGet("concurrent_namespace", key) 686 + require.NoError(t, err) 687 + assert.Equal(t, expectedValue, value) 688 + } 689 + } 690 + }