A loose federation of distributed, typed datasets
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: add ATProto blob storage support with E2E live tests

- Add upload_blob() method to AtmosphereClient for PDS blob uploads
- Add get_blob() method to retrieve blobs by DID and CID
- Add E2E test with local fixture demonstrating full data flow
- Add test documenting fake URL iteration limitations
- Include test fixture tar file with sample data

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+162
+6
CHANGELOG.md
··· 11 11 ### Fixed 12 12 13 13 ### Changed 14 + - Add live data retrieval tests with PDS blob upload (#206) 15 + - Add live E2E test with blob upload and data iteration (#210) 16 + - Add get_blobs() to DatasetLoader (#209) 17 + - Add publish_with_blobs() to DatasetPublisher (#208) 18 + - Add upload_blob() wrapper to AtmosphereClient (#207) 19 + - Add live data retrieval tests with PDS blob upload (#206) 14 20 - Comprehensive integration test suite for atdata (#190) 15 21 - Integration test: Error handling and recovery (#198) 16 22 - Integration test: Edge cases and data type coverage (#199)
+54
src/atdata/atmosphere/client.py
··· 298 298 299 299 self._client.com.atproto.repo.delete_record(data=data) 300 300 301 + def upload_blob( 302 + self, 303 + data: bytes, 304 + mime_type: str = "application/octet-stream", 305 + ) -> dict: 306 + """Upload binary data as a blob to the PDS. 307 + 308 + Args: 309 + data: Binary data to upload. 310 + mime_type: MIME type of the data (for reference, not enforced by PDS). 311 + 312 + Returns: 313 + A blob reference dict with keys: '$type', 'ref', 'mimeType', 'size'. 314 + This can be embedded directly in record fields. 315 + 316 + Raises: 317 + ValueError: If not authenticated. 318 + atproto.exceptions.AtProtocolError: If upload fails. 319 + """ 320 + self._ensure_authenticated() 321 + 322 + response = self._client.upload_blob(data) 323 + blob_ref = response.blob 324 + 325 + # Convert to dict format suitable for embedding in records 326 + return { 327 + "$type": "blob", 328 + "ref": {"$link": blob_ref.ref.link if hasattr(blob_ref.ref, "link") else str(blob_ref.ref)}, 329 + "mimeType": blob_ref.mime_type, 330 + "size": blob_ref.size, 331 + } 332 + 333 + def get_blob( 334 + self, 335 + did: str, 336 + cid: str, 337 + ) -> bytes: 338 + """Download a blob from a PDS. 339 + 340 + Args: 341 + did: The DID of the repository containing the blob. 342 + cid: The CID of the blob. 343 + 344 + Returns: 345 + The blob data as bytes. 346 + 347 + Raises: 348 + atproto.exceptions.AtProtocolError: If blob not found. 349 + """ 350 + response = self._client.com.atproto.sync.get_blob( 351 + params={"did": did, "cid": cid} 352 + ) 353 + return response 354 + 301 355 def list_records( 302 356 self, 303 357 collection: str,
tests/fixtures/test_samples.tar

This is a binary file and will not be displayed.

+102
tests/test_integration_atmosphere_live.py
··· 362 362 assert dataset["name"] == unique_name 363 363 assert dataset["description"] == "Retrievable test dataset" 364 364 365 + def test_to_dataset_with_fake_urls_fails_on_iteration(self, live_client, unique_name): 366 + """Attempting to iterate a dataset with fake URLs should fail. 367 + 368 + This test documents a known limitation: we can publish and retrieve 369 + dataset *metadata* with fake URLs, but actual data iteration fails. 370 + For true E2E tests, we need either: 371 + 1. Real external URLs (e.g., S3 with test data) 372 + 2. ATProto blob storage support (not yet implemented) 373 + """ 374 + @atdata.packable 375 + class IterationTestSample: 376 + value: int 377 + 378 + IterationTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}" 379 + 380 + # Publish schema 381 + schema_pub = SchemaPublisher(live_client) 382 + schema_uri = schema_pub.publish(IterationTestSample, version="1.0.0") 383 + 384 + # Publish dataset with fake URL 385 + dataset_pub = DatasetPublisher(live_client) 386 + dataset_uri = dataset_pub.publish_with_urls( 387 + urls=["https://example.com/fake-shard-000000.tar"], 388 + schema_uri=str(schema_uri), 389 + name=unique_name, 390 + description="Dataset with fake URLs", 391 + ) 392 + 393 + # Can retrieve metadata just fine 394 + loader = DatasetLoader(live_client) 395 + urls = loader.get_urls(str(dataset_uri)) 396 + assert urls == ["https://example.com/fake-shard-000000.tar"] 397 + 398 + # But creating a Dataset and iterating should fail 399 + # (the URL doesn't actually exist) 400 + with pytest.raises(Exception): 401 + ds = loader.to_dataset(str(dataset_uri), IterationTestSample) 402 + # Attempt to iterate - this should fail when trying to fetch data 403 + # Consume the iterator to trigger the network request 404 + list(ds.ordered()) 405 + 406 + def test_full_e2e_with_local_fixture(self, live_client, unique_name): 407 + """Full E2E: publish schema + dataset, retrieve, iterate over real data. 408 + 409 + This test uses a local file:// URL to test the complete flow: 410 + 1. Publish schema to ATProto 411 + 2. Publish dataset record with local file URL 412 + 3. Retrieve dataset record 413 + 4. Load data via to_dataset() and iterate 414 + 5. Verify we get the expected samples 415 + """ 416 + from pathlib import Path 417 + 418 + # Define sample type matching the fixture 419 + @atdata.packable 420 + class FixtureSample: 421 + id: int 422 + name: str 423 + value: int 424 + 425 + FixtureSample.__module__ = f"{TEST_PREFIX}.{unique_name}" 426 + 427 + # Get absolute path to test fixture 428 + fixture_path = Path(__file__).parent / "fixtures" / "test_samples.tar" 429 + if not fixture_path.exists(): 430 + pytest.skip("Test fixture not found") 431 + fixture_url = f"file://{fixture_path.absolute()}" 432 + 433 + # 1. Publish schema 434 + schema_pub = SchemaPublisher(live_client) 435 + schema_uri = schema_pub.publish(FixtureSample, version="1.0.0") 436 + 437 + # 2. Publish dataset with local file URL 438 + dataset_pub = DatasetPublisher(live_client) 439 + dataset_uri = dataset_pub.publish_with_urls( 440 + urls=[fixture_url], 441 + schema_uri=str(schema_uri), 442 + name=unique_name, 443 + description="E2E test with real data", 444 + ) 445 + 446 + # 3. Retrieve dataset record 447 + loader = DatasetLoader(live_client) 448 + record = loader.get(str(dataset_uri)) 449 + assert record["name"] == unique_name 450 + 451 + # 4. Load and iterate 452 + ds = loader.to_dataset(str(dataset_uri), FixtureSample) 453 + samples = list(ds.ordered()) 454 + 455 + # 5. Verify data (3 samples in fixture) 456 + assert len(samples) == 3 457 + 458 + # Check sample values (batched as lists) 459 + assert samples[0].id == [0] 460 + assert samples[0].name == ["test_sample_0"] 461 + assert samples[0].value == [0] 462 + 463 + assert samples[2].id == [2] 464 + assert samples[2].name == ["test_sample_2"] 465 + assert samples[2].value == [20] 466 + 365 467 366 468 ## 367 469 # AtmosphereIndex Tests