A loose federation of distributed, typed datasets
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 653 lines 20 kB view raw
1"""Live ATProto network integration tests. 2 3These tests connect to the real ATProto network (Bluesky) and verify 4that atdata's atmosphere functionality works correctly with actual API calls. 5 6Requirements: 7 - testing.env file with ATDATA_TEST_HANDLE and ATDATA_TEST_APP_PASSWORD 8 - Network connectivity to bsky.social 9 10Run with: 11 source testing.env && uv run pytest tests/test_integration_atmosphere_live.py -v 12 13These tests are marked with @pytest.mark.network and @pytest.mark.slow. 14To skip in CI: pytest -m "not network" 15""" 16 17import os 18import uuid 19import pytest 20from datetime import datetime 21 22from numpy.typing import NDArray 23 24import atdata 25from atdata.atmosphere import ( 26 AtmosphereClient, 27 AtmosphereIndex, 28 SchemaPublisher, 29 SchemaLoader, 30 DatasetPublisher, 31 DatasetLoader, 32) 33from atdata.atmosphere._types import LEXICON_NAMESPACE 34 35 36## 37# Test Configuration 38 39 40# Prefix for all test records - makes cleanup easier 41TEST_PREFIX = "atdata-live-test" 42 43 44def get_test_credentials(): 45 """Get test credentials from environment.""" 46 handle = os.environ.get("ATDATA_TEST_HANDLE") 47 password = os.environ.get("ATDATA_TEST_APP_PASSWORD") 48 return handle, password 49 50 51def skip_if_no_credentials(): 52 """Skip test if credentials not available.""" 53 handle, password = get_test_credentials() 54 if not handle or not password: 55 pytest.skip( 56 "Live test credentials not configured (set ATDATA_TEST_HANDLE and ATDATA_TEST_APP_PASSWORD)" 57 ) 58 59 60## 61# Test Sample Types 62 63 64@atdata.packable 65class LiveTestSample: 66 """Simple sample for live tests.""" 67 68 name: str 69 value: int 70 71 72@atdata.packable 73class LiveTestArraySample: 74 """Sample with NDArray for live tests.""" 75 76 label: str 77 data: NDArray 78 79 80## 81# Fixtures 82 83 84@pytest.fixture(scope="module") 85def live_client(): 86 """Create authenticated client for live tests. 
87 88 This fixture is module-scoped so we reuse the same session 89 across all tests in this file, reducing API calls. 90 """ 91 handle, password = get_test_credentials() 92 if not handle or not password: 93 pytest.skip("Live test credentials not configured") 94 95 client = AtmosphereClient() 96 client.login(handle, password) 97 98 yield client 99 100 # Cleanup: delete test records created during this session 101 # This runs after all tests in the module complete 102 103 104@pytest.fixture(scope="module") 105def live_index(live_client): 106 """Create AtmosphereIndex for live tests.""" 107 return AtmosphereIndex(live_client) 108 109 110@pytest.fixture 111def unique_name(): 112 """Generate a unique name for test records.""" 113 timestamp = datetime.now().strftime("%Y%m%d-%H%M%S") 114 unique_id = uuid.uuid4().hex[:8] 115 return f"{TEST_PREFIX}-{timestamp}-{unique_id}" 116 117 118## 119# Cleanup Utilities 120 121 122def cleanup_test_schemas(client: AtmosphereClient): 123 """Delete all test schema records.""" 124 loader = SchemaLoader(client) 125 deleted = 0 126 failed = 0 127 128 for record in loader.list_all(): 129 name = record.get("value", {}).get("name", "") 130 if TEST_PREFIX in name: 131 uri = record.get("uri", "") 132 if uri: 133 try: 134 client.delete_record(uri) 135 deleted += 1 136 except Exception: 137 failed += 1 # Continue cleanup even if individual deletes fail 138 139 return deleted 140 141 142def cleanup_test_datasets(client: AtmosphereClient): 143 """Delete all test dataset records.""" 144 loader = DatasetLoader(client) 145 deleted = 0 146 failed = 0 147 148 for record in loader.list_all(): 149 name = record.get("value", {}).get("name", "") 150 if TEST_PREFIX in name: 151 uri = record.get("uri", "") 152 if uri: 153 try: 154 client.delete_record(uri) 155 deleted += 1 156 except Exception: 157 failed += 1 # Continue cleanup even if individual deletes fail 158 159 return deleted 160 161 162## 163# Authentication Tests 164 165 166@pytest.mark.network 
167@pytest.mark.slow 168class TestLiveAuthentication: 169 """Live tests for authentication flow.""" 170 171 def test_login_succeeds(self): 172 """Should successfully authenticate with valid credentials.""" 173 handle, password = get_test_credentials() 174 skip_if_no_credentials() 175 176 client = AtmosphereClient() 177 client.login(handle, password) 178 179 assert client.is_authenticated 180 assert client.did is not None 181 assert client.did.startswith("did:plc:") 182 183 def test_session_export_import(self): 184 """Should export and restore session.""" 185 handle, password = get_test_credentials() 186 skip_if_no_credentials() 187 188 # Create first client and login 189 client1 = AtmosphereClient() 190 client1.login(handle, password) 191 session_string = client1.export_session() 192 193 assert session_string is not None 194 assert len(session_string) > 0 195 196 # Create second client and restore session 197 client2 = AtmosphereClient() 198 client2.login_with_session(session_string) 199 200 assert client2.is_authenticated 201 assert client2.did == client1.did 202 203 def test_invalid_credentials_raises(self): 204 """Should raise on invalid credentials.""" 205 client = AtmosphereClient() 206 207 with pytest.raises(Exception): 208 client.login("invalid.handle.test", "wrong-password") 209 210 211## 212# Schema Operation Tests 213 214 215@pytest.mark.network 216@pytest.mark.slow 217class TestLiveSchemaOperations: 218 """Live tests for schema publishing and retrieval.""" 219 220 def test_publish_schema(self, live_client, unique_name): 221 """Should publish a schema to ATProto.""" 222 223 # Create a unique sample type for this test 224 @atdata.packable 225 class UniqueTestSample: 226 name: str 227 count: int 228 229 # Monkey-patch the module name to include test prefix 230 UniqueTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}" 231 232 publisher = SchemaPublisher(live_client) 233 uri = publisher.publish(UniqueTestSample, version="1.0.0") 234 235 assert uri is not 
None 236 assert "at://" in str(uri) 237 assert LEXICON_NAMESPACE in str(uri) 238 239 def test_list_schemas(self, live_client): 240 """Should list published schemas.""" 241 loader = SchemaLoader(live_client) 242 schemas = loader.list_all() 243 244 # Should return a list (may be empty if no schemas published) 245 assert isinstance(schemas, list) 246 247 def test_publish_and_retrieve_schema(self, live_client, unique_name): 248 """Should publish then retrieve a schema by URI.""" 249 250 @atdata.packable 251 class RetrievableTestSample: 252 field1: str 253 field2: int 254 255 RetrievableTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}" 256 257 publisher = SchemaPublisher(live_client) 258 uri = publisher.publish(RetrievableTestSample, version="1.0.0") 259 260 loader = SchemaLoader(live_client) 261 schema = loader.get(str(uri)) 262 263 assert schema is not None 264 # Schema name defaults to just the class name (not full module path) 265 assert schema["name"] == "RetrievableTestSample" 266 assert schema["version"] == "1.0.0" 267 268 field_names = {f["name"] for f in schema["fields"]} 269 assert "field1" in field_names 270 assert "field2" in field_names 271 272 def test_schema_with_ndarray_field(self, live_client, unique_name): 273 """Should publish schema with NDArray field type.""" 274 275 @atdata.packable 276 class ArrayTestSample: 277 label: str 278 embedding: NDArray 279 280 ArrayTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}" 281 282 publisher = SchemaPublisher(live_client) 283 uri = publisher.publish(ArrayTestSample, version="1.0.0") 284 285 loader = SchemaLoader(live_client) 286 schema = loader.get(str(uri)) 287 288 # Find the embedding field 289 embedding_field = next(f for f in schema["fields"] if f["name"] == "embedding") 290 assert embedding_field is not None 291 # Field type should indicate ndarray 292 assert "ndarray" in embedding_field["fieldType"]["$type"].lower() 293 294 295## 296# Dataset Operation Tests 297 298 299@pytest.mark.network 
@pytest.mark.slow
class TestLiveDatasetOperations:
    """Live tests for dataset publishing and retrieval."""

    def test_publish_dataset_with_urls(self, live_client, unique_name):
        """Should publish a dataset record with external URLs."""

        # First publish a schema
        @atdata.packable
        class DatasetTestSample:
            id: int

        # Tag the module path so cleanup can find this record later
        DatasetTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        schema_pub = SchemaPublisher(live_client)
        schema_uri = schema_pub.publish(DatasetTestSample, version="1.0.0")

        # Now publish dataset with external URLs
        dataset_pub = DatasetPublisher(live_client)
        dataset_uri = dataset_pub.publish_with_urls(
            urls=["https://example.com/test-shard-000000.tar"],
            schema_uri=str(schema_uri),
            name=unique_name,
            description="Test dataset for live integration tests",
        )

        assert dataset_uri is not None
        assert "at://" in str(dataset_uri)

    def test_list_datasets(self, live_client):
        """Should list published datasets."""
        loader = DatasetLoader(live_client)
        datasets = loader.list_all()

        # May be empty; we only assert the container type
        assert isinstance(datasets, list)

    def test_publish_and_retrieve_dataset(self, live_client, unique_name):
        """Should publish then retrieve a dataset."""

        @atdata.packable
        class RetrievableDatasetSample:
            value: int

        RetrievableDatasetSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        # Publish schema
        schema_pub = SchemaPublisher(live_client)
        schema_uri = schema_pub.publish(RetrievableDatasetSample, version="1.0.0")

        # Publish dataset
        test_urls = [
            "https://example.com/shard-000000.tar",
            "https://example.com/shard-000001.tar",
        ]

        dataset_pub = DatasetPublisher(live_client)
        dataset_uri = dataset_pub.publish_with_urls(
            urls=test_urls,
            schema_uri=str(schema_uri),
            name=unique_name,
            description="Retrievable test dataset",
            tags=["test", "integration"],
        )

        # Retrieve dataset
        loader = DatasetLoader(live_client)
        dataset = loader.get(str(dataset_uri))

        assert dataset is not None
        assert dataset["name"] == unique_name
        assert dataset["description"] == "Retrievable test dataset"

    def test_to_dataset_with_fake_urls_fails_on_iteration(
        self, live_client, unique_name
    ):
        """Attempting to iterate a dataset with fake URLs should fail.

        This test documents a known limitation: we can publish and retrieve
        dataset *metadata* with fake URLs, but actual data iteration fails.
        For true E2E tests, we need either:
        1. Real external URLs (e.g., S3 with test data)
        2. ATProto blob storage support (not yet implemented)
        """

        @atdata.packable
        class IterationTestSample:
            value: int

        IterationTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        # Publish schema
        schema_pub = SchemaPublisher(live_client)
        schema_uri = schema_pub.publish(IterationTestSample, version="1.0.0")

        # Publish dataset with fake URL
        dataset_pub = DatasetPublisher(live_client)
        dataset_uri = dataset_pub.publish_with_urls(
            urls=["https://example.com/fake-shard-000000.tar"],
            schema_uri=str(schema_uri),
            name=unique_name,
            description="Dataset with fake URLs",
        )

        # Can retrieve metadata just fine
        loader = DatasetLoader(live_client)
        urls = loader.get_urls(str(dataset_uri))
        assert urls == ["https://example.com/fake-shard-000000.tar"]

        # But creating a Dataset and iterating should fail
        # (the URL doesn't actually exist)
        with pytest.raises(Exception):
            ds = loader.to_dataset(str(dataset_uri), IterationTestSample)
            # Attempt to iterate - this should fail when trying to fetch data
            # Consume the iterator to trigger the network request
            list(ds.ordered())

    def test_full_e2e_with_local_fixture(self, live_client, unique_name):
        """Full E2E: publish schema + dataset, retrieve, iterate over real data.

        This test uses a local file:// URL to test the complete flow:
        1. Publish schema to ATProto
        2. Publish dataset record with local file URL
        3. Retrieve dataset record
        4. Load data via to_dataset() and iterate
        5. Verify we get the expected samples
        """
        from pathlib import Path

        # Define sample type matching the fixture
        @atdata.packable
        class FixtureSample:
            id: int
            name: str
            value: int

        FixtureSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        # Get absolute path to test fixture
        fixture_path = Path(__file__).parent / "fixtures" / "test_samples.tar"
        if not fixture_path.exists():
            pytest.skip("Test fixture not found")
        fixture_url = f"file://{fixture_path.absolute()}"

        # 1. Publish schema
        schema_pub = SchemaPublisher(live_client)
        schema_uri = schema_pub.publish(FixtureSample, version="1.0.0")

        # 2. Publish dataset with local file URL
        dataset_pub = DatasetPublisher(live_client)
        dataset_uri = dataset_pub.publish_with_urls(
            urls=[fixture_url],
            schema_uri=str(schema_uri),
            name=unique_name,
            description="E2E test with real data",
        )

        # 3. Retrieve dataset record
        loader = DatasetLoader(live_client)
        record = loader.get(str(dataset_uri))
        assert record["name"] == unique_name

        # 4. Load and iterate
        ds = loader.to_dataset(str(dataset_uri), FixtureSample)
        samples = list(ds.ordered())

        # 5. Verify data (3 samples in fixture)
        assert len(samples) == 3

        # Check sample values (batched as lists)
        assert samples[0].id == [0]
        assert samples[0].name == ["test_sample_0"]
        assert samples[0].value == [0]

        assert samples[2].id == [2]
        assert samples[2].name == ["test_sample_2"]
        assert samples[2].value == [20]

    def test_blob_storage_roundtrip(self, live_client, unique_name):
        """Full E2E: upload blob, publish dataset, retrieve and iterate.

        This tests the complete blob storage workflow:
        1. Create a WebDataset tar in memory using as_wds
        2. Upload as blob to PDS
        3. Publish dataset record with blob storage
        4. Retrieve record and get blob URLs
        5. Load data via to_dataset() and iterate
        6. Verify samples match original data
        """
        import io
        import webdataset as wds

        # Define sample type
        @atdata.packable
        class BlobTestSample:
            id: int
            message: str

        BlobTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        # 1. Create WebDataset tar in memory using proper as_wds pattern
        expected_samples = [
            BlobTestSample(id=0, message="hello from blob"),
            BlobTestSample(id=1, message="blob storage works"),
            BlobTestSample(id=2, message="atproto is cool"),
        ]

        tar_buffer = io.BytesIO()
        with wds.writer.TarWriter(tar_buffer) as sink:
            for sample in expected_samples:
                sink.write(sample.as_wds)

        tar_data = tar_buffer.getvalue()

        # 2. Publish schema
        schema_pub = SchemaPublisher(live_client)
        schema_uri = schema_pub.publish(BlobTestSample, version="1.0.0")

        # 3. Publish dataset with blob storage
        dataset_pub = DatasetPublisher(live_client)
        dataset_uri = dataset_pub.publish_with_blobs(
            blobs=[tar_data],
            schema_uri=str(schema_uri),
            name=unique_name,
            description="E2E blob storage test",
        )

        assert dataset_uri is not None
        assert "at://" in str(dataset_uri)

        # 4. Retrieve and verify storage type
        loader = DatasetLoader(live_client)
        storage_type = loader.get_storage_type(str(dataset_uri))
        assert storage_type == "blobs"

        # 5. Get blob URLs (served via the PDS getBlob endpoint)
        blob_urls = loader.get_blob_urls(str(dataset_uri))
        assert len(blob_urls) == 1
        assert "getBlob" in blob_urls[0]

        # 6. Load and iterate
        ds = loader.to_dataset(str(dataset_uri), BlobTestSample)
        samples = list(ds.ordered())

        # 7. Verify data (3 samples)
        assert len(samples) == 3

        # Check sample values (batched as lists)
        assert samples[0].id == [0]
        assert samples[0].message == ["hello from blob"]

        assert samples[1].id == [1]
        assert samples[1].message == ["blob storage works"]

        assert samples[2].id == [2]
        assert samples[2].message == ["atproto is cool"]


##
# AtmosphereIndex Tests


@pytest.mark.network
@pytest.mark.slow
class TestLiveAtmosphereIndex:
    """Live tests for AtmosphereIndex wrapper."""

    def test_index_list_datasets(self, live_index):
        """Should list datasets via AtmosphereIndex."""
        datasets = list(live_index.list_datasets())

        # Should return iterable of entries
        assert isinstance(datasets, list)

    def test_index_publish_schema(self, live_index, unique_name):
        """Should publish schema via AtmosphereIndex."""

        @atdata.packable
        class IndexTestSample:
            data: str

        IndexTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        schema_ref = live_index.publish_schema(IndexTestSample, version="1.0.0")

        assert schema_ref is not None
        assert "at://" in str(schema_ref)

    def test_index_get_schema(self, live_index, unique_name):
        """Should retrieve schema via AtmosphereIndex."""

        @atdata.packable
        class GetSchemaTestSample:
            field: int

        GetSchemaTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        schema_ref = live_index.publish_schema(GetSchemaTestSample, version="1.0.0")
        schema = live_index.get_schema(str(schema_ref))

        # Schema name defaults to just the class name (not full module path)
        assert schema["name"] == "GetSchemaTestSample"


##
# Error Handling Tests


@pytest.mark.network
@pytest.mark.slow
class TestLiveErrorHandling:
    """Live tests for error handling with real API."""

    def test_get_nonexistent_record(self, live_client):
        """Should raise on getting non-existent record."""
        loader = SchemaLoader(live_client)

        # Well-formed URI pointing at a record key that does not exist
        fake_uri = (
            f"at://{live_client.did}/{LEXICON_NAMESPACE}.sampleSchema/nonexistent12345"
        )

        with pytest.raises(Exception):
            loader.get(fake_uri)

    def test_publish_without_auth_raises(self):
        """Should raise when publishing without authentication."""
        client = AtmosphereClient()
        # Not logged in

        publisher = SchemaPublisher(client)

        with pytest.raises(ValueError, match="authenticated"):
            publisher.publish(LiveTestSample, version="1.0.0")


##
# Cleanup Test (runs last)


@pytest.mark.network
@pytest.mark.slow
class TestZZZCleanup:
    """Cleanup test records. Named ZZZ to run last."""

    def test_cleanup_test_records(self, live_client):
        """Clean up all test records created during this test run."""
        schemas_deleted = cleanup_test_schemas(live_client)
        datasets_deleted = cleanup_test_datasets(live_client)

        print(
            f"\nCleanup: deleted {schemas_deleted} schemas, {datasets_deleted} datasets"
        )

        # Just verify cleanup ran without error
        assert True