# atdata: a loose federation of distributed, typed datasets
1"""Live ATProto network integration tests.
2
3These tests connect to the real ATProto network (Bluesky) and verify
4that atdata's atmosphere functionality works correctly with actual API calls.
5
6Requirements:
7 - testing.env file with ATDATA_TEST_HANDLE and ATDATA_TEST_APP_PASSWORD
8 - Network connectivity to bsky.social
9
10Run with:
11 source testing.env && uv run pytest tests/test_integration_atmosphere_live.py -v
12
13These tests are marked with @pytest.mark.network and @pytest.mark.slow.
14To skip in CI: pytest -m "not network"
15"""
16
17import os
18import uuid
19import pytest
20from datetime import datetime
21
22from numpy.typing import NDArray
23
24import atdata
25from atdata.atmosphere import (
26 AtmosphereClient,
27 AtmosphereIndex,
28 SchemaPublisher,
29 SchemaLoader,
30 DatasetPublisher,
31 DatasetLoader,
32)
33from atdata.atmosphere._types import LEXICON_NAMESPACE
34
35
36##
37# Test Configuration
38
39
# Prefix embedded in every record name created by these tests. The cleanup
# helpers match on this substring, so stray records left by a failed run can
# still be swept from the live PDS on the next run.
TEST_PREFIX = "atdata-live-test"
42
43
def get_test_credentials():
    """Read the live-test handle and app password from the environment.

    Returns:
        A ``(handle, password)`` tuple; either element is ``None`` when the
        corresponding environment variable is unset.
    """
    env = os.environ
    return env.get("ATDATA_TEST_HANDLE"), env.get("ATDATA_TEST_APP_PASSWORD")
49
50
def skip_if_no_credentials():
    """Skip the current test when live credentials are not configured.

    Treats empty strings the same as unset variables.
    """
    credentials = get_test_credentials()
    if not all(credentials):
        pytest.skip(
            "Live test credentials not configured (set ATDATA_TEST_HANDLE and ATDATA_TEST_APP_PASSWORD)"
        )
58
59
60##
61# Test Sample Types
62
63
@atdata.packable
class LiveTestSample:
    """Simple sample for live tests.

    Declares one string field and one int field; used where any minimal
    packable type will do (e.g. the unauthenticated-publish error test).
    """

    name: str
    value: int
70
71
@atdata.packable
class LiveTestArraySample:
    """Sample with NDArray for live tests.

    NOTE(review): currently unused by the tests below, but kept as a
    module-level example of an array-bearing packable type.
    """

    label: str
    data: NDArray
78
79
80##
81# Fixtures
82
83
@pytest.fixture(scope="module")
def live_client():
    """Create authenticated client for live tests.

    This fixture is module-scoped so we reuse the same session
    across all tests in this file, reducing API calls.

    Yields:
        AtmosphereClient: an authenticated client, or skips the module's
        tests when credentials are not configured.
    """
    handle, password = get_test_credentials()
    if not handle or not password:
        pytest.skip("Live test credentials not configured")

    client = AtmosphereClient()
    client.login(handle, password)

    yield client

    # Cleanup: delete test records created during this session.
    # This runs after all tests in the module complete. Best-effort only:
    # teardown must never fail the run (TestZZZCleanup also sweeps leftovers).
    try:
        cleanup_test_schemas(client)
        cleanup_test_datasets(client)
    except Exception:
        pass
102
103
@pytest.fixture(scope="module")
def live_index(live_client):
    """Create an AtmosphereIndex for live tests.

    Module-scoped (inherited from ``live_client``) so all index tests share
    one authenticated session.
    """
    return AtmosphereIndex(live_client)
108
109
@pytest.fixture
def unique_name():
    """Generate a collision-resistant, prefix-tagged name for test records."""
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    suffix = uuid.uuid4().hex[:8]
    # Keep TEST_PREFIX first so cleanup helpers can match on it.
    return "-".join((TEST_PREFIX, stamp, suffix))
116
117
118##
119# Cleanup Utilities
120
121
def cleanup_test_schemas(client: AtmosphereClient) -> int:
    """Delete every schema record whose name contains ``TEST_PREFIX``.

    Args:
        client: Authenticated client with permission to delete its records.

    Returns:
        int: number of records successfully deleted. Individual delete
        failures are swallowed so the sweep always visits every record.
    """
    loader = SchemaLoader(client)
    deleted = 0

    for record in loader.list_all():
        name = record.get("value", {}).get("name", "")
        if TEST_PREFIX not in name:
            continue
        uri = record.get("uri", "")
        if not uri:
            continue
        try:
            client.delete_record(uri)
            deleted += 1
        except Exception:
            # Continue cleanup even if individual deletes fail
            pass

    return deleted
140
141
def cleanup_test_datasets(client: AtmosphereClient) -> int:
    """Delete every dataset record whose name contains ``TEST_PREFIX``.

    Args:
        client: Authenticated client with permission to delete its records.

    Returns:
        int: number of records successfully deleted. Individual delete
        failures are swallowed so the sweep always visits every record.
    """
    loader = DatasetLoader(client)
    deleted = 0

    for record in loader.list_all():
        name = record.get("value", {}).get("name", "")
        if TEST_PREFIX not in name:
            continue
        uri = record.get("uri", "")
        if not uri:
            continue
        try:
            client.delete_record(uri)
            deleted += 1
        except Exception:
            # Continue cleanup even if individual deletes fail
            pass

    return deleted
160
161
162##
163# Authentication Tests
164
165
@pytest.mark.network
@pytest.mark.slow
class TestLiveAuthentication:
    """Live tests for the login / session lifecycle against the real PDS."""

    def test_login_succeeds(self):
        """Valid credentials should yield an authenticated client with a PLC DID."""
        handle, password = get_test_credentials()
        skip_if_no_credentials()

        client = AtmosphereClient()
        client.login(handle, password)

        assert client.is_authenticated
        resolved_did = client.did
        assert resolved_did is not None
        assert resolved_did.startswith("did:plc:")

    def test_session_export_import(self):
        """A session exported from one client should restore into another."""
        handle, password = get_test_credentials()
        skip_if_no_credentials()

        # Authenticate once and snapshot the session string.
        source = AtmosphereClient()
        source.login(handle, password)
        exported = source.export_session()

        assert exported is not None
        assert len(exported) > 0

        # A fresh client restored from the snapshot acts as the same identity.
        restored = AtmosphereClient()
        restored.login_with_session(exported)

        assert restored.is_authenticated
        assert restored.did == source.did

    def test_invalid_credentials_raises(self):
        """Bogus credentials should surface an error from the API."""
        client = AtmosphereClient()

        with pytest.raises(Exception):
            client.login("invalid.handle.test", "wrong-password")
209
210
211##
212# Schema Operation Tests
213
214
@pytest.mark.network
@pytest.mark.slow
class TestLiveSchemaOperations:
    """Live tests for schema publishing and retrieval.

    Each test defines its own ``@atdata.packable`` sample type and rewrites
    its ``__module__`` to include ``TEST_PREFIX`` so the cleanup helpers can
    find and delete the published records afterwards.
    """

    def test_publish_schema(self, live_client, unique_name):
        """Should publish a schema to ATProto."""

        # Create a unique sample type for this test
        @atdata.packable
        class UniqueTestSample:
            name: str
            count: int

        # Monkey-patch the module name to include test prefix
        UniqueTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        publisher = SchemaPublisher(live_client)
        uri = publisher.publish(UniqueTestSample, version="1.0.0")

        # Published record URI should be an at:// URI in the atdata namespace.
        assert uri is not None
        assert "at://" in str(uri)
        assert LEXICON_NAMESPACE in str(uri)

    def test_list_schemas(self, live_client):
        """Should list published schemas."""
        loader = SchemaLoader(live_client)
        schemas = loader.list_all()

        # Should return a list (may be empty if no schemas published)
        assert isinstance(schemas, list)

    def test_publish_and_retrieve_schema(self, live_client, unique_name):
        """Should publish then retrieve a schema by URI."""

        @atdata.packable
        class RetrievableTestSample:
            field1: str
            field2: int

        RetrievableTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        publisher = SchemaPublisher(live_client)
        uri = publisher.publish(RetrievableTestSample, version="1.0.0")

        loader = SchemaLoader(live_client)
        schema = loader.get(str(uri))

        assert schema is not None
        # Schema name defaults to just the class name (not full module path)
        assert schema["name"] == "RetrievableTestSample"
        assert schema["version"] == "1.0.0"

        # Both declared fields should round-trip through the published schema.
        field_names = {f["name"] for f in schema["fields"]}
        assert "field1" in field_names
        assert "field2" in field_names

    def test_schema_with_ndarray_field(self, live_client, unique_name):
        """Should publish schema with NDArray field type."""

        @atdata.packable
        class ArrayTestSample:
            label: str
            embedding: NDArray

        ArrayTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        publisher = SchemaPublisher(live_client)
        uri = publisher.publish(ArrayTestSample, version="1.0.0")

        loader = SchemaLoader(live_client)
        schema = loader.get(str(uri))

        # Find the embedding field
        embedding_field = next(f for f in schema["fields"] if f["name"] == "embedding")
        assert embedding_field is not None
        # Field type should indicate ndarray
        assert "ndarray" in embedding_field["fieldType"]["$type"].lower()
293
294
295##
296# Dataset Operation Tests
297
298
@pytest.mark.network
@pytest.mark.slow
class TestLiveDatasetOperations:
    """Live tests for dataset publishing and retrieval.

    Covers URL-backed datasets, a file://-backed full E2E iteration, and the
    blob-storage round trip. Sample types are tagged with ``TEST_PREFIX`` via
    ``__module__`` so cleanup can find the records.
    """

    def test_publish_dataset_with_urls(self, live_client, unique_name):
        """Should publish a dataset record with external URLs."""

        # First publish a schema
        @atdata.packable
        class DatasetTestSample:
            id: int

        DatasetTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        schema_pub = SchemaPublisher(live_client)
        schema_uri = schema_pub.publish(DatasetTestSample, version="1.0.0")

        # Now publish dataset with external URLs
        dataset_pub = DatasetPublisher(live_client)
        dataset_uri = dataset_pub.publish_with_urls(
            urls=["https://example.com/test-shard-000000.tar"],
            schema_uri=str(schema_uri),
            name=unique_name,
            description="Test dataset for live integration tests",
        )

        assert dataset_uri is not None
        assert "at://" in str(dataset_uri)

    def test_list_datasets(self, live_client):
        """Should list published datasets."""
        loader = DatasetLoader(live_client)
        datasets = loader.list_all()

        # May be empty on a fresh account; only the type is asserted.
        assert isinstance(datasets, list)

    def test_publish_and_retrieve_dataset(self, live_client, unique_name):
        """Should publish then retrieve a dataset."""

        @atdata.packable
        class RetrievableDatasetSample:
            value: int

        RetrievableDatasetSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        # Publish schema
        schema_pub = SchemaPublisher(live_client)
        schema_uri = schema_pub.publish(RetrievableDatasetSample, version="1.0.0")

        # Publish dataset
        test_urls = [
            "https://example.com/shard-000000.tar",
            "https://example.com/shard-000001.tar",
        ]

        dataset_pub = DatasetPublisher(live_client)
        dataset_uri = dataset_pub.publish_with_urls(
            urls=test_urls,
            schema_uri=str(schema_uri),
            name=unique_name,
            description="Retrievable test dataset",
            tags=["test", "integration"],
        )

        # Retrieve dataset
        loader = DatasetLoader(live_client)
        dataset = loader.get(str(dataset_uri))

        assert dataset is not None
        assert dataset["name"] == unique_name
        assert dataset["description"] == "Retrievable test dataset"

    def test_to_dataset_with_fake_urls_fails_on_iteration(
        self, live_client, unique_name
    ):
        """Attempting to iterate a dataset with fake URLs should fail.

        This test documents a known limitation: we can publish and retrieve
        dataset *metadata* with fake URLs, but actual data iteration fails.
        For true E2E tests, we need either:
        1. Real external URLs (e.g., S3 with test data)
        2. ATProto blob storage support (not yet implemented)
        """

        @atdata.packable
        class IterationTestSample:
            value: int

        IterationTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        # Publish schema
        schema_pub = SchemaPublisher(live_client)
        schema_uri = schema_pub.publish(IterationTestSample, version="1.0.0")

        # Publish dataset with fake URL
        dataset_pub = DatasetPublisher(live_client)
        dataset_uri = dataset_pub.publish_with_urls(
            urls=["https://example.com/fake-shard-000000.tar"],
            schema_uri=str(schema_uri),
            name=unique_name,
            description="Dataset with fake URLs",
        )

        # Can retrieve metadata just fine
        loader = DatasetLoader(live_client)
        urls = loader.get_urls(str(dataset_uri))
        assert urls == ["https://example.com/fake-shard-000000.tar"]

        # But creating a Dataset and iterating should fail
        # (the URL doesn't actually exist)
        with pytest.raises(Exception):
            ds = loader.to_dataset(str(dataset_uri), IterationTestSample)
            # Attempt to iterate - this should fail when trying to fetch data
            # Consume the iterator to trigger the network request
            list(ds.ordered())

    def test_full_e2e_with_local_fixture(self, live_client, unique_name):
        """Full E2E: publish schema + dataset, retrieve, iterate over real data.

        This test uses a local file:// URL to test the complete flow:
        1. Publish schema to ATProto
        2. Publish dataset record with local file URL
        3. Retrieve dataset record
        4. Load data via to_dataset() and iterate
        5. Verify we get the expected samples
        """
        from pathlib import Path

        # Define sample type matching the fixture
        @atdata.packable
        class FixtureSample:
            id: int
            name: str
            value: int

        FixtureSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        # Get absolute path to test fixture
        fixture_path = Path(__file__).parent / "fixtures" / "test_samples.tar"
        if not fixture_path.exists():
            pytest.skip("Test fixture not found")
        fixture_url = f"file://{fixture_path.absolute()}"

        # 1. Publish schema
        schema_pub = SchemaPublisher(live_client)
        schema_uri = schema_pub.publish(FixtureSample, version="1.0.0")

        # 2. Publish dataset with local file URL
        dataset_pub = DatasetPublisher(live_client)
        dataset_uri = dataset_pub.publish_with_urls(
            urls=[fixture_url],
            schema_uri=str(schema_uri),
            name=unique_name,
            description="E2E test with real data",
        )

        # 3. Retrieve dataset record
        loader = DatasetLoader(live_client)
        record = loader.get(str(dataset_uri))
        assert record["name"] == unique_name

        # 4. Load and iterate
        ds = loader.to_dataset(str(dataset_uri), FixtureSample)
        samples = list(ds.ordered())

        # 5. Verify data (3 samples in fixture)
        assert len(samples) == 3

        # Check sample values (batched as lists)
        assert samples[0].id == [0]
        assert samples[0].name == ["test_sample_0"]
        assert samples[0].value == [0]

        assert samples[2].id == [2]
        assert samples[2].name == ["test_sample_2"]
        assert samples[2].value == [20]

    def test_blob_storage_roundtrip(self, live_client, unique_name):
        """Full E2E: upload blob, publish dataset, retrieve and iterate.

        This tests the complete blob storage workflow:
        1. Create a WebDataset tar in memory using as_wds
        2. Upload as blob to PDS
        3. Publish dataset record with blob storage
        4. Retrieve record and get blob URLs
        5. Load data via to_dataset() and iterate
        6. Verify samples match original data
        """
        import io
        import webdataset as wds

        # Define sample type
        @atdata.packable
        class BlobTestSample:
            id: int
            message: str

        BlobTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        # 1. Create WebDataset tar in memory using proper as_wds pattern
        expected_samples = [
            BlobTestSample(id=0, message="hello from blob"),
            BlobTestSample(id=1, message="blob storage works"),
            BlobTestSample(id=2, message="atproto is cool"),
        ]

        tar_buffer = io.BytesIO()
        with wds.writer.TarWriter(tar_buffer) as sink:
            for sample in expected_samples:
                sink.write(sample.as_wds)

        tar_data = tar_buffer.getvalue()

        # 2. Publish schema
        schema_pub = SchemaPublisher(live_client)
        schema_uri = schema_pub.publish(BlobTestSample, version="1.0.0")

        # 3. Publish dataset with blob storage
        dataset_pub = DatasetPublisher(live_client)
        dataset_uri = dataset_pub.publish_with_blobs(
            blobs=[tar_data],
            schema_uri=str(schema_uri),
            name=unique_name,
            description="E2E blob storage test",
        )

        assert dataset_uri is not None
        assert "at://" in str(dataset_uri)

        # 4. Retrieve and verify storage type
        loader = DatasetLoader(live_client)
        storage_type = loader.get_storage_type(str(dataset_uri))
        assert storage_type == "blobs"

        # 5. Get blob URLs
        blob_urls = loader.get_blob_urls(str(dataset_uri))
        assert len(blob_urls) == 1
        assert "getBlob" in blob_urls[0]

        # 6. Load and iterate
        ds = loader.to_dataset(str(dataset_uri), BlobTestSample)
        samples = list(ds.ordered())

        # 7. Verify data (3 samples)
        assert len(samples) == 3

        # Check sample values (batched as lists)
        assert samples[0].id == [0]
        assert samples[0].message == ["hello from blob"]

        assert samples[1].id == [1]
        assert samples[1].message == ["blob storage works"]

        assert samples[2].id == [2]
        assert samples[2].message == ["atproto is cool"]
555
556
557##
558# AtmosphereIndex Tests
559
560
@pytest.mark.network
@pytest.mark.slow
class TestLiveAtmosphereIndex:
    """Live tests for AtmosphereIndex wrapper.

    Exercises the same publish/get operations as the lower-level tests, but
    through the higher-level ``AtmosphereIndex`` facade.
    """

    def test_index_list_datasets(self, live_index):
        """Should list datasets via AtmosphereIndex."""
        datasets = list(live_index.list_datasets())

        # Should return iterable of entries
        assert isinstance(datasets, list)

    def test_index_publish_schema(self, live_index, unique_name):
        """Should publish schema via AtmosphereIndex."""

        @atdata.packable
        class IndexTestSample:
            data: str

        # Tag with TEST_PREFIX so cleanup can find the published record.
        IndexTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        schema_ref = live_index.publish_schema(IndexTestSample, version="1.0.0")

        assert schema_ref is not None
        assert "at://" in str(schema_ref)

    def test_index_get_schema(self, live_index, unique_name):
        """Should retrieve schema via AtmosphereIndex."""

        @atdata.packable
        class GetSchemaTestSample:
            field: int

        GetSchemaTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}"

        schema_ref = live_index.publish_schema(GetSchemaTestSample, version="1.0.0")
        schema = live_index.get_schema(str(schema_ref))

        # Schema name defaults to just the class name (not full module path)
        assert schema["name"] == "GetSchemaTestSample"
601
602
603##
604# Error Handling Tests
605
606
@pytest.mark.network
@pytest.mark.slow
class TestLiveErrorHandling:
    """Live tests for error handling with real API."""

    def test_get_nonexistent_record(self, live_client):
        """Fetching a record that was never created should raise."""
        loader = SchemaLoader(live_client)

        # Well-formed at:// URI pointing at a record key that does not exist.
        missing_uri = (
            f"at://{live_client.did}/{LEXICON_NAMESPACE}.sampleSchema/nonexistent12345"
        )

        with pytest.raises(Exception):
            loader.get(missing_uri)

    def test_publish_without_auth_raises(self):
        """Publishing through a client that never logged in should raise."""
        unauthenticated = AtmosphereClient()
        publisher = SchemaPublisher(unauthenticated)

        with pytest.raises(ValueError, match="authenticated"):
            publisher.publish(LiveTestSample, version="1.0.0")
632
633
634##
635# Cleanup Test (runs last)
636
637
@pytest.mark.network
@pytest.mark.slow
class TestZZZCleanup:
    """Cleanup test records. Named ZZZ to run last."""

    def test_cleanup_test_records(self, live_client):
        """Sweep all TEST_PREFIX-tagged records created during this run."""
        n_schemas = cleanup_test_schemas(live_client)
        n_datasets = cleanup_test_datasets(live_client)

        print(f"\nCleanup: deleted {n_schemas} schemas, {n_datasets} datasets")

        # Just verify cleanup ran without error
        assert True