A loose federation of distributed, typed datasets
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: complete blob storage implementation for atmosphere datasets

DatasetPublisher:
- Add publish_with_blobs() for uploading data as ATProto blobs

DatasetLoader:
- Add get_storage_type() to detect external vs blob storage
- Add get_blobs() to retrieve blob references from records
- Add get_blob_urls() to generate fetchable URLs from blob refs
- Update to_dataset() to support both external and blob storage

AtmosphereClient:
- Fix get_blob() to resolve PDS endpoint from DID document
- Add _resolve_pds_endpoint() for DID → PDS resolution via plc.directory
- Add get_blob_url() for generating direct blob fetch URLs

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+317 -9
+5
CHANGELOG.md
··· 11 11 ### Fixed 12 12 13 13 ### Changed 14 + - Implement full blob storage support for atmosphere datasets (#211) 15 + - Add E2E blob roundtrip test (#215) 16 + - Implement get_blobs() and blob URL generation in DatasetLoader (#214) 17 + - Implement publish_with_blobs() in DatasetPublisher (#213) 18 + - Fix get_blob() - investigate 500 error and fix (#212) 14 19 - Add live data retrieval tests with PDS blob upload (#206) 15 20 - Add live E2E test with blob upload and data iteration (#210) 16 21 - Add get_blobs() to DatasetLoader (#209)
+64 -5
src/atdata/atmosphere/client.py
··· 337 337 ) -> bytes: 338 338 """Download a blob from a PDS. 339 339 340 + This resolves the PDS endpoint from the DID document and fetches 341 + the blob directly from the PDS. 342 + 340 343 Args: 341 344 did: The DID of the repository containing the blob. 342 345 cid: The CID of the blob. ··· 345 348 The blob data as bytes. 346 349 347 350 Raises: 348 - atproto.exceptions.AtProtocolError: If blob not found. 351 + ValueError: If PDS endpoint cannot be resolved. 352 + requests.HTTPError: If blob fetch fails. 353 + """ 354 + import requests 355 + 356 + # Resolve PDS endpoint from DID document 357 + pds_endpoint = self._resolve_pds_endpoint(did) 358 + if not pds_endpoint: 359 + raise ValueError(f"Could not resolve PDS endpoint for {did}") 360 + 361 + # Fetch blob from PDS 362 + url = f"{pds_endpoint}/xrpc/com.atproto.sync.getBlob" 363 + response = requests.get(url, params={"did": did, "cid": cid}) 364 + response.raise_for_status() 365 + return response.content 366 + 367 + def _resolve_pds_endpoint(self, did: str) -> Optional[str]: 368 + """Resolve the PDS endpoint for a DID. 369 + 370 + Args: 371 + did: The DID to resolve. 372 + 373 + Returns: 374 + The PDS service endpoint URL, or None if not found. 375 + """ 376 + import requests 377 + 378 + # For did:plc, query the PLC directory 379 + if did.startswith("did:plc:"): 380 + try: 381 + response = requests.get(f"https://plc.directory/{did}") 382 + response.raise_for_status() 383 + did_doc = response.json() 384 + 385 + for service in did_doc.get("service", []): 386 + if service.get("type") == "AtprotoPersonalDataServer": 387 + return service.get("serviceEndpoint") 388 + except requests.RequestException: 389 + return None 390 + 391 + # For did:web, would need different resolution (not implemented) 392 + return None 393 + 394 + def get_blob_url(self, did: str, cid: str) -> str: 395 + """Get the direct URL for fetching a blob. 396 + 397 + This is useful for passing to WebDataset or other HTTP clients. 398 + 399 + Args: 400 + did: The DID of the repository containing the blob. 401 + cid: The CID of the blob. 402 + 403 + Returns: 404 + The full URL for fetching the blob. 405 + 406 + Raises: 407 + ValueError: If PDS endpoint cannot be resolved. 349 408 """ 350 - response = self._client.com.atproto.sync.get_blob( 351 - params={"did": did, "cid": cid} 352 - ) 353 - return response 409 + pds_endpoint = self._resolve_pds_endpoint(did) 410 + if not pds_endpoint: 411 + raise ValueError(f"Could not resolve PDS endpoint for {did}") 412 + return f"{pds_endpoint}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}" 354 413 355 414 def list_records( 356 415 self,
+165 -4
src/atdata/atmosphere/records.py
··· 187 187 validate=False, 188 188 ) 189 189 190 + def publish_with_blobs( 191 + self, 192 + blobs: list[bytes], 193 + schema_uri: str, 194 + *, 195 + name: str, 196 + description: Optional[str] = None, 197 + tags: Optional[list[str]] = None, 198 + license: Optional[str] = None, 199 + metadata: Optional[dict] = None, 200 + mime_type: str = "application/x-tar", 201 + rkey: Optional[str] = None, 202 + ) -> AtUri: 203 + """Publish a dataset with data stored as ATProto blobs. 204 + 205 + This method uploads the provided data as blobs to the PDS and creates 206 + a dataset record referencing them. Suitable for smaller datasets that 207 + fit within blob size limits (typically 50MB per blob, configurable). 208 + 209 + Args: 210 + blobs: List of binary data (e.g., tar shards) to upload as blobs. 211 + schema_uri: AT URI of the schema record. 212 + name: Human-readable dataset name. 213 + description: Human-readable description. 214 + tags: Searchable tags for discovery. 215 + license: SPDX license identifier. 216 + metadata: Arbitrary metadata dictionary. 217 + mime_type: MIME type for the blobs (default: application/x-tar). 218 + rkey: Optional explicit record key. 219 + 220 + Returns: 221 + The AT URI of the created dataset record. 222 + 223 + Note: 224 + Blobs are only retained by the PDS when referenced in a committed 225 + record. This method handles that automatically. 226 + """ 227 + # Upload all blobs 228 + blob_refs = [] 229 + for blob_data in blobs: 230 + blob_ref = self.client.upload_blob(blob_data, mime_type=mime_type) 231 + blob_refs.append(blob_ref) 232 + 233 + # Create storage location with blob references 234 + storage = StorageLocation( 235 + kind="blobs", 236 + blob_refs=blob_refs, 237 + ) 238 + 239 + metadata_bytes: Optional[bytes] = None 240 + if metadata is not None: 241 + metadata_bytes = msgpack.packb(metadata) 242 + 243 + dataset_record = DatasetRecord( 244 + name=name, 245 + schema_ref=schema_uri, 246 + storage=storage, 247 + description=description, 248 + tags=tags or [], 249 + license=license, 250 + metadata=metadata_bytes, 251 + ) 252 + 253 + return self.client.create_record( 254 + collection=f"{LEXICON_NAMESPACE}.record", 255 + record=dataset_record.to_record(), 256 + rkey=rkey, 257 + validate=False, 258 + ) 259 + 190 260 191 261 class DatasetLoader: 192 262 """Loads dataset records from ATProto. ··· 255 325 """ 256 326 return self.client.list_datasets(repo=repo, limit=limit) 257 327 328 + def get_storage_type(self, uri: str | AtUri) -> str: 329 + """Get the storage type of a dataset record. 330 + 331 + Args: 332 + uri: The AT URI of the dataset record. 333 + 334 + Returns: 335 + Either "external" or "blobs". 336 + 337 + Raises: 338 + ValueError: If storage type is unknown. 339 + """ 340 + record = self.get(uri) 341 + storage = record.get("storage", {}) 342 + storage_type = storage.get("$type", "") 343 + 344 + if "storageExternal" in storage_type: 345 + return "external" 346 + elif "storageBlobs" in storage_type: 347 + return "blobs" 348 + else: 349 + raise ValueError(f"Unknown storage type: {storage_type}") 350 + 258 351 def get_urls(self, uri: str | AtUri) -> list[str]: 259 352 """Get the WebDataset URLs from a dataset record. 260 353 ··· 276 369 elif "storageBlobs" in storage_type: 277 370 raise ValueError( 278 371 "Dataset uses blob storage, not external URLs. " 279 - "Use get_blobs() instead." 372 + "Use get_blob_urls() instead." 280 373 ) 281 374 else: 282 375 raise ValueError(f"Unknown storage type: {storage_type}") 283 376 377 + def get_blobs(self, uri: str | AtUri) -> list[dict]: 378 + """Get the blob references from a dataset record. 379 + 380 + Args: 381 + uri: The AT URI of the dataset record. 382 + 383 + Returns: 384 + List of blob reference dicts with keys: $type, ref, mimeType, size. 385 + 386 + Raises: 387 + ValueError: If the storage type is not blobs. 388 + """ 389 + record = self.get(uri) 390 + storage = record.get("storage", {}) 391 + 392 + storage_type = storage.get("$type", "") 393 + if "storageBlobs" in storage_type: 394 + return storage.get("blobs", []) 395 + elif "storageExternal" in storage_type: 396 + raise ValueError( 397 + "Dataset uses external URL storage, not blobs. " 398 + "Use get_urls() instead." 399 + ) 400 + else: 401 + raise ValueError(f"Unknown storage type: {storage_type}") 402 + 403 + def get_blob_urls(self, uri: str | AtUri) -> list[str]: 404 + """Get fetchable URLs for blob-stored dataset shards. 405 + 406 + This resolves the PDS endpoint and constructs URLs that can be 407 + used to fetch the blob data directly. 408 + 409 + Args: 410 + uri: The AT URI of the dataset record. 411 + 412 + Returns: 413 + List of URLs for fetching the blob data. 414 + 415 + Raises: 416 + ValueError: If storage type is not blobs or PDS cannot be resolved. 417 + """ 418 + if isinstance(uri, str): 419 + parsed_uri = AtUri.parse(uri) 420 + else: 421 + parsed_uri = uri 422 + 423 + blobs = self.get_blobs(uri) 424 + did = parsed_uri.authority 425 + 426 + urls = [] 427 + for blob in blobs: 428 + # Extract CID from blob reference 429 + ref = blob.get("ref", {}) 430 + cid = ref.get("$link") if isinstance(ref, dict) else str(ref) 431 + if cid: 432 + url = self.client.get_blob_url(did, cid) 433 + urls.append(url) 434 + 435 + return urls 436 + 284 437 def get_metadata(self, uri: str | AtUri) -> Optional[dict]: 285 438 """Get the metadata from a dataset record. 286 439 ··· 309 462 You must provide the sample type class, which should match the 310 463 schema referenced by the record. 311 464 465 + Supports both external URL storage and ATProto blob storage. 466 + 312 467 Args: 313 468 uri: The AT URI of the dataset record. 314 469 sample_type: The Python class for the sample type. ··· 317 472 A Dataset instance configured from the record. 318 473 319 474 Raises: 320 - ValueError: If the storage type is not external URLs. 475 + ValueError: If no storage URLs can be resolved. 321 476 322 477 Example: 323 478 >>> loader = DatasetLoader(client) ··· 328 483 # Import here to avoid circular import 329 484 from ..dataset import Dataset 330 485 331 - urls = self.get_urls(uri) 486 + storage_type = self.get_storage_type(uri) 487 + 488 + if storage_type == "external": 489 + urls = self.get_urls(uri) 490 + else: 491 + urls = self.get_blob_urls(uri) 492 + 332 493 if not urls: 333 - raise ValueError("Dataset record has no URLs") 494 + raise ValueError("Dataset record has no storage URLs") 334 495 335 496 # Use the first URL (multi-URL support could be added later) 336 497 url = urls[0]
+83
tests/test_integration_atmosphere_live.py
··· 464 464 assert samples[2].name == ["test_sample_2"] 465 465 assert samples[2].value == [20] 466 466 467 + def test_blob_storage_roundtrip(self, live_client, unique_name): 468 + """Full E2E: upload blob, publish dataset, retrieve and iterate. 469 + 470 + This tests the complete blob storage workflow: 471 + 1. Create a WebDataset tar in memory 472 + 2. Upload as blob to PDS 473 + 3. Publish dataset record with blob storage 474 + 4. Retrieve record and get blob URLs 475 + 5. Load data via to_dataset() and iterate 476 + 6. Verify samples match original data 477 + """ 478 + import tarfile 479 + import io 480 + import msgpack 481 + 482 + # Define sample type 483 + @atdata.packable 484 + class BlobTestSample: 485 + id: int 486 + message: str 487 + 488 + BlobTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}" 489 + 490 + # 1. Create WebDataset tar in memory 491 + expected_samples = [ 492 + {"id": 0, "message": "hello from blob"}, 493 + {"id": 1, "message": "blob storage works"}, 494 + {"id": 2, "message": "atproto is cool"}, 495 + ] 496 + 497 + tar_buffer = io.BytesIO() 498 + with tarfile.open(fileobj=tar_buffer, mode="w") as tar: 499 + for i, sample in enumerate(expected_samples): 500 + packed = msgpack.packb(sample) 501 + info = tarfile.TarInfo(name=f"sample_{i:06d}.msgpack") 502 + info.size = len(packed) 503 + tar.addfile(info, io.BytesIO(packed)) 504 + 505 + tar_data = tar_buffer.getvalue() 506 + 507 + # 2. Publish schema 508 + schema_pub = SchemaPublisher(live_client) 509 + schema_uri = schema_pub.publish(BlobTestSample, version="1.0.0") 510 + 511 + # 3. Publish dataset with blob storage 512 + dataset_pub = DatasetPublisher(live_client) 513 + dataset_uri = dataset_pub.publish_with_blobs( 514 + blobs=[tar_data], 515 + schema_uri=str(schema_uri), 516 + name=unique_name, 517 + description="E2E blob storage test", 518 + ) 519 + 520 + assert dataset_uri is not None 521 + assert "at://" in str(dataset_uri) 522 + 523 + # 4. Retrieve and verify storage type 524 + loader = DatasetLoader(live_client) 525 + storage_type = loader.get_storage_type(str(dataset_uri)) 526 + assert storage_type == "blobs" 527 + 528 + # 5. Get blob URLs 529 + blob_urls = loader.get_blob_urls(str(dataset_uri)) 530 + assert len(blob_urls) == 1 531 + assert "getBlob" in blob_urls[0] 532 + 533 + # 6. Load and iterate 534 + ds = loader.to_dataset(str(dataset_uri), BlobTestSample) 535 + samples = list(ds.ordered()) 536 + 537 + # 7. Verify data (3 samples) 538 + assert len(samples) == 3 539 + 540 + # Check sample values (batched as lists) 541 + assert samples[0].id == [0] 542 + assert samples[0].message == ["hello from blob"] 543 + 544 + assert samples[1].id == [1] 545 + assert samples[1].message == ["blob storage works"] 546 + 547 + assert samples[2].id == [2] 548 + assert samples[2].message == ["atproto is cool"] 549 + 467 550 468 551 ## 469 552 # AtmosphereIndex Tests