A loose federation of distributed, typed datasets
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: use as_wds pattern for WebDataset tar creation in blob examples

Replace manual tarfile/msgpack writing with proper WebDataset
TarWriter and PackableSample.as_wds serialization in:
- examples/atmosphere_demo.py blob storage demo
- tests/test_integration_atmosphere_live.py blob roundtrip test

This ensures correct sample serialization with __key__ field
required for WebDataset iteration.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+20 -26
+1
CHANGELOG.md
··· 11 11 ### Fixed 12 12 13 13 ### Changed 14 + - Review and fix tar writing in examples to use as_wds pattern (#217) 14 15 - Add blob storage demo to atmosphere_demo.py example (#216) 15 16 - Implement full blob storage support for atmosphere datasets (#211) 16 17 - Add E2E blob roundtrip test (#215)
+10 -13
examples/atmosphere_demo.py
··· 287 287 password: App-specific password 288 288 """ 289 289 import io 290 - import tarfile 291 - import msgpack 290 + import webdataset as wds 292 291 293 292 print("\n" + "=" * 60) 294 293 print("Blob Storage Demo") ··· 306 305 id: int 307 306 text: str 308 307 309 - # Create a small WebDataset tar in memory 310 - print("\nCreating small dataset in memory...") 308 + # Create sample instances using the @packable type 311 309 samples = [ 312 - {"id": 0, "text": "Hello from blob storage!"}, 313 - {"id": 1, "text": "ATProto is decentralized."}, 314 - {"id": 2, "text": "atdata makes ML data easy."}, 310 + DemoSample(id=0, text="Hello from blob storage!"), 311 + DemoSample(id=1, text="ATProto is decentralized."), 312 + DemoSample(id=2, text="atdata makes ML data easy."), 315 313 ] 316 314 315 + # Create a WebDataset tar in memory using proper as_wds serialization 316 + print("\nCreating small dataset in memory...") 317 317 tar_buffer = io.BytesIO() 318 - with tarfile.open(fileobj=tar_buffer, mode="w") as tar: 319 - for i, sample in enumerate(samples): 320 - packed = msgpack.packb(sample) 321 - info = tarfile.TarInfo(name=f"sample_{i:06d}.msgpack") 322 - info.size = len(packed) 323 - tar.addfile(info, io.BytesIO(packed)) 318 + with wds.writer.TarWriter(tar_buffer) as sink: 319 + for sample in samples: 320 + sink.write(sample.as_wds) 324 321 325 322 tar_data = tar_buffer.getvalue() 326 323 print(f" Created tar with {len(samples)} samples ({len(tar_data):,} bytes)")
+9 -13
tests/test_integration_atmosphere_live.py
··· 466 466 """Full E2E: upload blob, publish dataset, retrieve and iterate. 467 467 468 468 This tests the complete blob storage workflow: 469 - 1. Create a WebDataset tar in memory 469 + 1. Create a WebDataset tar in memory using as_wds 470 470 2. Upload as blob to PDS 471 471 3. Publish dataset record with blob storage 472 472 4. Retrieve record and get blob URLs 473 473 5. Load data via to_dataset() and iterate 474 474 6. Verify samples match original data 475 475 """ 476 - import tarfile 477 476 import io 478 - import msgpack 477 + import webdataset as wds 479 478 480 479 # Define sample type 481 480 @atdata.packable ··· 485 484 486 485 BlobTestSample.__module__ = f"{TEST_PREFIX}.{unique_name}" 487 486 488 - # 1. Create WebDataset tar in memory 487 + # 1. Create WebDataset tar in memory using proper as_wds pattern 489 488 expected_samples = [ 490 - {"id": 0, "message": "hello from blob"}, 491 - {"id": 1, "message": "blob storage works"}, 492 - {"id": 2, "message": "atproto is cool"}, 489 + BlobTestSample(id=0, message="hello from blob"), 490 + BlobTestSample(id=1, message="blob storage works"), 491 + BlobTestSample(id=2, message="atproto is cool"), 493 492 ] 494 493 495 494 tar_buffer = io.BytesIO() 496 - with tarfile.open(fileobj=tar_buffer, mode="w") as tar: 497 - for i, sample in enumerate(expected_samples): 498 - packed = msgpack.packb(sample) 499 - info = tarfile.TarInfo(name=f"sample_{i:06d}.msgpack") 500 - info.size = len(packed) 501 - tar.addfile(info, io.BytesIO(packed)) 495 + with wds.writer.TarWriter(tar_buffer) as sink: 496 + for sample in expected_samples: 497 + sink.write(sample.as_wds) 502 498 503 499 tar_data = tar_buffer.getvalue() 504 500