A loose federation of distributed, typed datasets
feat(api): propagate S3 credentials through load_dataset and add shards property

- load_dataset now creates S3Source with credentials when index has S3DataStore
- Add Dataset.shards property (lazy iterator) following xs/list_xs() convention
- Deprecate Dataset.shard_list in favor of list_shards()
- Update AbstractIndex.data_store to be a @property in the protocol
- Loosen publish_schema signature to accept any type with runtime Packable validation
- Document xs/list_xs() naming convention in CLAUDE.md
- Add test for S3 credential propagation in load_dataset

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+736 -30
.chainlink/issues.db

This is a binary file and will not be displayed.

+526
.planning/roadmap/v0.3/03_human-review-assessment.md
# Human Review Assessment & Implementation Plan

**Source**: `.review/human-review.md`
**Chainlink**: #374 (parent), #375-379 (subissues)
**Date**: 2026-01-26

---

## Issue 1: PackableSample → Packable Protocol Migration

**Chainlink**: #375

### Problem Statement

The `@packable` decorator creates a class that inherits from `PackableSample`, but type checkers don't recognize this inheritance at static analysis time. This causes linting errors when passing `@packable`-decorated classes to functions expecting `Type[Packable]`, such as `Index.publish_schema()`.

### Current State Analysis

```python
# Current: @packable creates a new class inheriting from PackableSample
@packable
class MyData:
    name: str

# Type checker sees: Type[MyData] (original class)
# Runtime sees: Type[MyData] where MyData inherits from PackableSample
```

The `Packable` protocol in `_protocols.py` is correctly defined as `@runtime_checkable`, and runtime checks work:

```python
>>> isinstance(MyData(name='x'), Packable)
True
```

But static type checkers don't see the transformed class structure.

### Pros and Cons of Migration

#### Option A: Keep `PackableSample` base class

**Pros:**
- Explicit inheritance visible in code
- IDE autocomplete works without additional configuration
- `__post_init__` hooks work naturally

**Cons:**
- `@packable` decorator doesn't play well with type checkers
- Dual patterns (`@packable` vs explicit inheritance) cause confusion
- Protocol-based signatures (`Type[Packable]`) don't lint cleanly

#### Option B: Pure Protocol-based approach

**Pros:**
- Structural typing is more Pythonic
- No inheritance needed - just implement the interface
- Works with any class that has the right methods/properties

**Cons:**
- Need to manually implement `from_data`, `from_bytes`, `packed`, `as_wds`
- Lose automatic `__post_init__` behavior
- More boilerplate for users

#### Option C: Hybrid with `@dataclass_transform` (Recommended)

**Pros:**
- Type checkers understand the transformation
- Keeps `@packable` as primary API
- Protocol checks work at both runtime and static analysis

**Cons:**
- Requires Python 3.11+ for full `@dataclass_transform` support
- May need plugin for older type checkers

### Implementation Plan

1. **Add `@dataclass_transform()` to `@packable`** (already partially done at line 935)
   - Verify it's working correctly with pyright/mypy

2. **Fix return type annotation** on `@packable`. Python has no intersection type operator, so the annotation below is aspirational; see the sketch after this list for a workable compromise:
   ```python
   @dataclass_transform()
   def packable(cls: type[_T]) -> type[_T & Packable]:  # Intersection type (not valid Python today)
       ...
   ```

3. **Add Protocol verification** in tests:
   ```python
   def test_packable_satisfies_protocol():
       @packable
       class TestSample:
           x: int

       # Static type check (via reveal_type in comments)
       sample_type: Type[Packable] = TestSample  # Should not error
   ```

4. **Update `publish_schema` signature** to be more permissive:
   ```python
   def publish_schema(
       self,
       sample_type: Type[Packable],  # Keep this
       # OR use a TypeVar bound to Packable
       ...
   )
   ```
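For step 2, a minimal sketch of the compromise (an assumption, not the real decorator body; requires Python 3.11+ for `typing.dataclass_transform`, PEP 681): return `type[_T]` so checkers see the dataclass fields, and leave Packable-ness to runtime validation as in step 4.

```python
# Sketch only: dataclass_transform tells type checkers that @packable produces
# a dataclass; the Packable side is still validated at runtime (publish_schema).
from dataclasses import dataclass
from typing import TypeVar, dataclass_transform

_T = TypeVar("_T")

@dataclass_transform()
def packable(cls: type[_T]) -> type[_T]:
    new_cls = dataclass(cls)
    # ... attach from_data / from_bytes / packed / as_wds here (omitted) ...
    return new_cls
```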
### Estimated Effort: 4-6 hours

---

## Issue 2: Redis Index Entry Expiration

**Chainlink**: #376

### Problem Statement

Redis is inconsistently removing index entries over time, even though no explicit TTL is set.

### Current State Analysis

Looking at the code in `local.py`:

```python
# Line 655 - Writing dataset entries
redis.hset(save_key, mapping=data)  # No TTL

# Line 1387 - Writing schemas
self._redis.set(redis_key, schema_json)  # No TTL
```

Redis configuration check shows:
```
maxmemory-policy: noeviction
maxmemory: 0 (unlimited)
```

With `noeviction` policy and unlimited memory, Redis should NOT automatically remove keys.

### Potential Causes

1. **External process clearing keys**: Another application or script may be running `FLUSHDB`/`FLUSHALL`

2. **Redis persistence configuration**:
   - If `RDB` or `AOF` persistence is disabled, data is lost on Redis restart
   - Check with: `redis-cli CONFIG GET save` and `redis-cli CONFIG GET appendonly`

3. **Different Redis instances**:
   - Development vs production Redis may have different configurations
   - Docker containers may be recreated with fresh data

4. **Key name collisions**:
   - If the key prefix changed between versions, old keys appear "missing"

5. **Memory pressure** (unlikely with noeviction):
   - Even with noeviction, if Redis runs out of memory, writes fail

### Investigation Steps

```bash
# Check persistence settings
redis-cli CONFIG GET save
redis-cli CONFIG GET appendonly

# Check if keys exist (KEYS blocks the server; fine for a one-off check)
redis-cli KEYS "LocalDatasetEntry:*"
redis-cli KEYS "LocalSchema:*"

# Check memory
redis-cli INFO memory

# Monitor for FLUSHDB/DEL commands
redis-cli MONITOR
```

One additional check, sketched after the implementation plan below: whether any index keys carry a TTL at all.

### Implementation Plan

1. **Add Redis health check to CLI**:
   ```python
   # atdata diagnose redis
   def diagnose_redis(redis: Redis) -> dict:
       return {
           "persistence": {
               "rdb_enabled": redis.config_get("save"),
               "aof_enabled": redis.config_get("appendonly"),
           },
           "memory": redis.info("memory"),
           "dataset_count": len(list(redis.scan_iter("LocalDatasetEntry:*"))),
           "schema_count": len(list(redis.scan_iter("LocalSchema:*"))),
       }
   ```

2. **Add logging on write/read**:
   ```python
   def write_to(self, redis: Redis):
       logger.debug(f"Writing entry {self.cid} to Redis key {save_key}")
       redis.hset(save_key, mapping=data)
       logger.debug(f"Verified entry exists: {redis.exists(save_key)}")
   ```

3. **Document Redis requirements**:
   - Add to README: "Requires Redis with persistence enabled"
   - Provide example `redis.conf` for production use

4. **Consider backup/restore utilities**:
   ```python
   # atdata local backup --output index-backup.json
   # atdata local restore --input index-backup.json
   ```
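The TTL check mentioned above, as a minimal sketch (assumes the redis-py client and the key prefixes shown earlier):

```python
# Sketch: flag index keys that unexpectedly carry an expiry.
from redis import Redis

def find_expiring_keys(redis: Redis, pattern: str = "LocalDatasetEntry:*") -> dict[str, int]:
    """Return {key: remaining_ttl_seconds} for keys with a TTL set."""
    expiring: dict[str, int] = {}
    for key in redis.scan_iter(pattern):
        ttl = redis.ttl(key)  # -1 means no TTL; -2 means the key vanished mid-scan
        if ttl >= 0:
            name = key.decode() if isinstance(key, bytes) else key
            expiring[name] = ttl
    return expiring
```

If this returns anything, some write path is setting an expiry and the mystery is solved; if it is empty, persistence or instance identity becomes the prime suspect.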
### Estimated Effort: 3-4 hours

---

## Issue 3: `xs` Property vs `list_xs()` Convention Audit

**Chainlink**: #377

### Convention Definition

```python
class Foo:
    @property
    def xs(self) -> Iterator[X]:
        """Lazy iteration over X items."""
        for x in self._get_xs():
            yield x

    def list_xs(self) -> list[X]:
        """Fully evaluated list of X items."""
        return list(self.xs)
```

### Current State Audit

| Class | Property | Method | Status |
|-------|----------|--------|--------|
| `Index` | `entries` (Generator) | `list_entries()` | ✅ Correct |
| `Index` | `datasets` (Generator) | `list_datasets()` | ✅ Correct |
| `Index` | `schemas` (Generator) | `list_schemas()` | ✅ Correct |
| `AtmosphereIndex` | `datasets` (Iterator) | `list_datasets()` | ✅ Correct |
| `AtmosphereIndex` | `schemas` (Iterator) | `list_schemas()` | ✅ Correct |
| `Dataset` | - | `list_shards()` | ⚠️ Missing `shards` property |
| `URLSource` | `shards` (Iterator) | `list_shards()` | ✅ Correct |
| `S3Source` | `shards` (Iterator) | `list_shards()` | ✅ Correct |
| `DataSource` (Protocol) | `shards` (Iterator) | `list_shards()` | ✅ Correct |

### Issues Found

1. **`Dataset` class** has `list_shards()` but no lazy `shards` property
2. **Legacy `shard_list` property** exists but is marked deprecated - should route to `list_shards()`
3. **`DatasetDict.num_shards`** uses `shard_list` internally - should use `list_shards()`

### Implementation Plan

(A regression-test sketch for this convention follows the plan below.)

1. **Add `Dataset.shards` property**:
   ```python
   @property
   def shards(self) -> Iterator[str]:
       """Lazily iterate over shard identifiers."""
       return iter(self._source.list_shards())
   ```

2. **Update `DatasetDict.num_shards`**:
   ```python
   @property
   def num_shards(self) -> dict[str, int]:
       return {name: len(ds.list_shards()) for name, ds in self.items()}
   ```

3. **Add deprecation warnings** to legacy properties:
   ```python
   @property
   def shard_list(self) -> list[str]:
       warnings.warn("shard_list is deprecated, use list_shards()", DeprecationWarning)
       return self.list_shards()
   ```

4. **Document convention** in CLAUDE.md:
   ```markdown
   ## Naming Conventions

   - `foo.xs` - @property returning Iterator/Generator (lazy)
   - `foo.list_xs()` - method returning list (eager)
   ```
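To keep the audit table from going stale, the convention could be pinned down with a parametrized test. A hedged sketch (assumes pytest; the `(cls, name)` pairs are illustrative and would be wired up to the real exports):

```python
# Sketch: parametrized guard for the xs/list_xs() pairing.
import inspect

import pytest

from atdata.dataset import Dataset  # assumed import path

@pytest.mark.parametrize("cls, name", [(Dataset, "shards")])
def test_lazy_property_has_eager_counterpart(cls, name):
    # The lazy side must be a property...
    assert isinstance(inspect.getattr_static(cls, name), property)
    # ...and the eager side a plain callable named list_<name>().
    assert callable(inspect.getattr_static(cls, f"list_{name}"))
```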
### Estimated Effort: 2-3 hours

---

## Issue 4: `load_dataset` Source Credentials

**Chainlink**: #378

### Problem Statement

When `load_dataset` loads from an index with an S3 data store, the returned `Dataset` doesn't use the S3 credentials. Similarly, atproto-based loading should use the appropriate storage mechanism.

### Current State Analysis

In `_hf_api.py:620-627`:
```python
data_urls, schema_ref = _resolve_indexed_path(path, index)
# ...
url = _shards_to_wds_url(data_urls)
ds = Dataset[resolved_type](url)  # Uses URLSource (no credentials!)
```

The `_resolve_indexed_path` function does transform URLs through `data_store.read_url()`, but this only handles URL rewriting (s3:// → https://), not credential injection.

### Required Behavior

1. **Local index with S3DataStore**:
   - Extract credentials from `index.data_store`
   - Create `S3Source` with those credentials
   - Pass `S3Source` to `Dataset`

2. **AtmosphereIndex with blob storage**:
   - Resolve blob references to AT URIs
   - Create appropriate source (future `BlobSource`)
   - Pass source to `Dataset`

3. **Plain URLs** (no index, or index without data_store):
   - Current behavior is correct (use `URLSource`)

### Implementation Plan

(A usage sketch showing the intended call pattern follows this plan.)

1. **Extend `_resolve_indexed_path` to return a source**:
   ```python
   def _resolve_indexed_path(
       path: str,
       index: "AbstractIndex",
   ) -> tuple[DataSource, str]:
       """Resolve @handle/dataset path to DataSource and schema_ref."""
       handle_or_did, dataset_name = _parse_indexed_path(path)
       entry = index.get_dataset(dataset_name)

       # Build appropriate DataSource
       data_urls = entry.data_urls

       if hasattr(index, 'data_store') and index.data_store is not None:
           store = index.data_store
           if isinstance(store, S3DataStore):
               # Extract S3 credentials and create S3Source
               source = S3Source.from_urls(
                   data_urls,
                   endpoint=store.credentials.get('AWS_ENDPOINT'),
                   access_key=store.credentials.get('AWS_ACCESS_KEY_ID'),
                   secret_key=store.credentials.get('AWS_SECRET_ACCESS_KEY'),
               )
               return source, entry.schema_ref

       # Default: URL-based source
       url = _shards_to_wds_url(data_urls)
       source = URLSource(url)
       return source, entry.schema_ref
   ```

2. **Update `load_dataset` to use DataSource**:
   ```python
   if _is_indexed_path(path):
       # ...
       source, schema_ref = _resolve_indexed_path(path, index)
       resolved_type = sample_type if sample_type is not None else index.decode_schema(schema_ref)
       ds = Dataset[resolved_type](source)  # Pass source, not URL
       # ...
   ```

3. **Add `AbstractDataStore.create_source()` method** (optional but cleaner):
   ```python
   class AbstractDataStore(Protocol):
       def create_source(self, urls: list[str]) -> DataSource:
           """Create a DataSource for reading these URLs."""
           ...
   ```

4. **Future: AtmosphereIndex blob support**:
   ```python
   # In AtmosphereIndex or AtmosphereDataStore
   def create_source(self, urls: list[str]) -> DataSource:
       if all(url.startswith("at://") for url in urls):
           return BlobSource(urls, client=self.client)
       return URLSource(_shards_to_wds_url(urls))
   ```
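For orientation, the intended call pattern once this lands (a sketch, not the final API; the `S3DataStore()` defaults and the `Index(data_store=...)` argument are assumptions):

```python
# Hypothetical end-to-end usage after the fix.
from atdata import load_dataset
from atdata.local import Index, S3DataStore

store = S3DataStore()  # assumption: credentials resolved from its own config/env
index = Index(data_store=store)  # assumption: Index accepts a data_store argument
ds = load_dataset("@local/my-dataset", index=index, split="train")
# ds streams via an S3Source carrying the store's credentials instead of a
# bare URLSource, so private buckets work without extra call-site setup.
```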
### Estimated Effort: 4-6 hours

---

## Issue 5: `load_dataset` Overload Type Hints

**Chainlink**: #379

### Problem Statement

Calls like:
```python
ds = load_dataset("@local/data", TextSample, split='train', index=index)
```
produce linting errors because the `AbstractIndex` protocol doesn't align with `local.Index` for type checking.

### Current State Analysis

The overloads in `_hf_api.py:481-529` use:
```python
index: Optional["AbstractIndex"] = None
```

But `AbstractIndex` is a Protocol, and `local.Index` is a concrete class. The issue is likely one of:

1. **Protocol compatibility**: `Index` doesn't fully satisfy `AbstractIndex`
2. **Import issues**: `AbstractIndex` may not be recognized correctly
3. **Optional handling**: The `Optional[...]` wrapping may cause issues

### Investigation

Check if `Index` satisfies `AbstractIndex`:

```python
# AbstractIndex requires:
# - data_store: Optional[AbstractDataStore]
# - insert_dataset(...)
# - get_dataset(...)
# - datasets (property)
# - list_datasets()
# - publish_schema(...)
# - get_schema(...)
# - schemas (property)
# - list_schemas()
# - decode_schema(...)
```

Looking at `local.Index`, it has all of these methods/properties. The issue is likely the `data_store` attribute type.

### Root Cause Hypothesis

In `_protocols.py:191`:
```python
data_store: Optional["AbstractDataStore"]
```

In `local.py:1006`:
```python
def data_store(self) -> AbstractDataStore | None:
```

The Protocol declares a plain class attribute, but `Index` exposes a property. Type checkers treat a bare Protocol attribute as mutable, so a read-only property on the implementation fails to satisfy it (a minimal repro appears after the plan below).

### Implementation Plan

1. **Fix Protocol `data_store` to be a property**:
   ```python
   class AbstractIndex(Protocol):
       @property
       def data_store(self) -> Optional["AbstractDataStore"]:
           """Optional data store for writing shards."""
           ...
   ```

2. **Verify all Protocol members match the implementation**:
   - Run `pyright --verifytypes atdata` to check
   - Ensure return types match exactly

3. **Add explicit Protocol inheritance** (alternative approach):
   ```python
   # If structural typing continues to fail, inherit from the Protocol directly
   # in local.py:
   #
   #     class Index(AbstractIndex):
   #         ...
   #
   # (Runtime-only registration via AbstractIndex.register(Index) would not
   # help here: it affects isinstance() checks, not static analysis.)
   ```

4. **Simplify overloads** if needed:
   ```python
   # Consider using @overload with Union types instead of Optional
   @overload
   def load_dataset(
       path: str,
       sample_type: Type[ST],
       *,
       split: str,
       index: AbstractIndex | None = None,  # Union instead of Optional
   ) -> Dataset[ST]: ...
   ```

5. **Add a type test file**:
   ```python
   # tests/test_types.py (for pyright --verifytypes)
   from atdata import load_dataset, Dataset
   from atdata.local import Index
   from atdata._protocols import AbstractIndex

   def check_index_protocol(index: AbstractIndex) -> None:
       pass

   def test_index_satisfies_protocol():
       index = Index()
       check_index_protocol(index)  # Should not error
   ```
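The minimal repro referenced above (a sketch; this is the behavior pyright reports, and mypy behaves similarly):

```python
# A bare Protocol attribute is implicitly readable AND writable, so an
# implementation exposing a read-only property does not satisfy it.
from typing import Optional, Protocol

class Store: ...

class HasStore(Protocol):
    data_store: Optional[Store]  # bare attribute => assumed settable

class Impl:
    @property
    def data_store(self) -> Optional[Store]:  # read-only property
        return None

def use(x: HasStore) -> None: ...

use(Impl())  # type error until HasStore declares data_store as a @property
```

Declaring the Protocol member as a `@property` (plan step 1) makes the read-only contract explicit on both sides.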
### Estimated Effort: 3-4 hours

---

## Summary & Prioritization

| Issue | Priority | Effort | Dependencies |
|-------|----------|--------|--------------|
| #378 load_dataset credentials | High | 4-6h | None |
| #379 Type hint fixes | High | 3-4h | None |
| #375 Packable protocol | Medium | 4-6h | None |
| #377 Naming convention | Low | 2-3h | None |
| #376 Redis investigation | Medium | 3-4h | User environment |

**Recommended order**: #378 → #379 → #375 → #377 → #376

The credentials issue (#378) is the most impactful for users. Type hints (#379) affect developer experience. The Redis issue (#376) requires investigation in the user's environment first.

---

## Next Steps

1. Approve this assessment
2. Implement #378 (load_dataset credentials)
3. Implement #379 (type hints)
4. Revisit #375-377 based on priority
+4
.review/human-review.md
···
  * We had talked previously about potentially moving `PackableSample` to a `Packable` protocol to simplify some type hints / etc. Let's go over the pros and cons of this. This shows up for the linting / typing of `local.Index.publish_schema`, where the way that @packable is working right now doesn't properly get the PackableSample superclass to register for this signature.
+
  * We have an interesting persistent issue with Redis removing old records; can you think through why it seems like Redis resets our index entries somewhat inconsistently over time? Is there a Redis setting that might be responsible for this?
+
  * We want to make sure that we keep to the pattern that `foo.xs` is an @property that gives a (lazy) iterable for `x`, while `foo.list_xs` is a fully evaluated list for all of the `x`s that uses `foo.xs` under the hood. We should go through the full codebase to evaluate for following this convention.
+
  * `load_dataset` has a couple issues:
      * We updated how `Dataset` is initialized to be able to accommodate a number of different underlying sources of `wds`-compatible data, and we should make it so that the overloads for `load_dataset` properly connect up with this; for example:
          * If `load_dataset` is coming from a specified local index with an S3 store, we should use the S3 credentials there as the source for the `Dataset` returned by `load_dataset`
          * If it's using an atproto location (like `'@maxine.science/mnist'`), the returned `Dataset` should use whatever is the storage mechanism referenced in that atproto record from the network (for example, blobs as at-uris that are wrapped in a file-like interface for passing to `webdataset`).
+
      * Calls like
        ```python
        ds = load_dataset( "@local/proto-text-samples-3", TextSample,
+6
CHANGELOG.md
···
  - **Comprehensive integration test suite**: 593 tests covering E2E flows, error handling, edge cases

  ### Changed
+ - Investigate Redis index entry expiration/reset issue (#376)
+ - Audit codebase for xs/@property vs list_xs() convention (#377)
+ - Evaluate PackableSample → Packable protocol migration (#375)
+ - Fix load_dataset overload type hints for AbstractIndex (#379)
+ - Fix load_dataset to use source-appropriate credentials (#378)
+ - Review and plan human-review.md feedback items (#374)
  - Create v0.3 roadmap synthesis document (#373)
  - Document justfile in CLAUDE.md (#372)
  - Make docs script work from any directory (#371)
+17
CLAUDE.md
···
  - Single shard: `path/to/file-000000.tar`
  - Multiple shards: `path/to/file-{000000..000009}.tar`

+ ### Naming Conventions
+
+ **Property vs Method Pattern for Collections**
+
+ When exposing collections of items, follow this convention:
+
+ - `foo.xs` - `@property` returning `Iterator[X]` (lazy iteration)
+ - `foo.list_xs()` - method returning `list[X]` (eager, fully evaluated)
+
+ Examples:
+ - `index.datasets` / `index.list_datasets()`
+ - `index.schemas` / `index.list_schemas()`
+ - `dataset.shards` / `dataset.list_shards()`
+
+ The lazy property enables memory-efficient iteration over large collections,
+ while the method provides a concrete list when needed.
+
  ### Important Implementation Details

  **Type Parameters**
+7 -1
prototyping/human-review-atmosphere.ipynb
··· 6 6 "id": "87dec017", 7 7 "metadata": {}, 8 8 "outputs": [], 9 - "source": [] 9 + "source": [ 10 + "import numpy as np\n", 11 + "from numpy.typing import NDArray\n", 12 + "import atdata\n", 13 + "from atdata.local import LocalDatasetEntry, S3DataStore, Index\n", 14 + "import webdataset as wds" 15 + ] 10 16 } 11 17 ], 12 18 "metadata": {
+24 -3
prototyping/human-review-local.ipynb
···
  },
  {
   "cell_type": "code",
-  "execution_count": 34,
+  "execution_count": 2,
   "id": "1f7ea651",
   "metadata": {},
   "outputs": [],
···
    " text = 'Hello',\n",
    " category = 'test',\n",
    ")"
+  ]
+ },
+ {
+  "cell_type": "code",
+  "execution_count": 5,
+  "id": "2679d94c",
+  "metadata": {},
+  "outputs": [
+   {
+    "data": {
+     "text/plain": [
+      "b'\\x82\\xa4text\\xa5Hello\\xa8category\\xa4test'"
+     ]
+    },
+    "execution_count": 5,
+    "metadata": {},
+    "output_type": "execute_result"
+   }
+  ],
+  "source": [
+   "x.packed"
   ]
  },
  {
···
  },
  {
   "cell_type": "code",
-  "execution_count": null,
+  "execution_count": 43,
   "id": "e74d68f6",
   "metadata": {},
   "outputs": [
···
  },
  {
   "cell_type": "code",
-  "execution_count": null,
+  "execution_count": 48,
   "id": "4a2736f0",
   "metadata": {},
   "outputs": [
+37 -14
src/atdata/_hf_api.py
···
  )

  from .dataset import Dataset, PackableSample, DictSample
+ from ._sources import URLSource, S3Source
+ from ._protocols import DataSource

  if TYPE_CHECKING:
      from ._protocols import AbstractIndex
+     from .local import S3DataStore

  ##
  # Type variables
···
          This property accesses the shard list, which may trigger
          shard enumeration for remote datasets.
          """
-         return {name: len(ds.shard_list) for name, ds in self.items()}
+         return {name: len(ds.list_shards()) for name, ds in self.items()}


  ##
···
  def _resolve_indexed_path(
      path: str,
      index: "AbstractIndex",
- ) -> tuple[list[str], str]:
-     """Resolve @handle/dataset path to URLs and schema_ref via index lookup.
+ ) -> tuple[DataSource, str]:
+     """Resolve @handle/dataset path to DataSource and schema_ref via index lookup.

      Args:
          path: Path in @handle/dataset format.
          index: Index to use for lookup.

      Returns:
-         Tuple of (data_urls, schema_ref).
+         Tuple of (DataSource, schema_ref). The DataSource is configured with
+         appropriate credentials when the index has an S3DataStore.

      Raises:
          KeyError: If dataset not found in index.
···
      # For AtmosphereIndex, we need to resolve handle to DID first
      # For LocalIndex, the handle is ignored and we just look up by name
      entry = index.get_dataset(dataset_name)
-
      data_urls = entry.data_urls

-     # Transform URLs through data store if available.
-     # This handles S3-compatible endpoints (like Cloudflare R2, MinIO) that need
-     # URL transformation from s3:// to https:// for WebDataset streaming.
      if hasattr(index, 'data_store') and index.data_store is not None:
-         data_urls = [index.data_store.read_url(url) for url in data_urls]
+         store = index.data_store
+
+         # Import here to avoid circular imports at module level
+         from .local import S3DataStore
+
+         # For S3DataStore with S3 URLs, create S3Source with credentials
+         if isinstance(store, S3DataStore):
+             if data_urls and all(url.startswith("s3://") for url in data_urls):
+                 source = S3Source.from_urls(
+                     data_urls,
+                     endpoint=store.credentials.get("AWS_ENDPOINT"),
+                     access_key=store.credentials.get("AWS_ACCESS_KEY_ID"),
+                     secret_key=store.credentials.get("AWS_SECRET_ACCESS_KEY"),
+                     region=store.credentials.get("AWS_REGION"),
+                 )
+                 return source, entry.schema_ref
+
+         # For any data store, use read_url to transform URLs if needed
+         # (handles endpoint URL conversion for HTTPS access, etc.)
+         transformed_urls = [store.read_url(url) for url in data_urls]
+         url = _shards_to_wds_url(transformed_urls)
+         return URLSource(url), entry.schema_ref

-     return data_urls, entry.schema_ref
+     # Default: URL-based source without credentials
+     url = _shards_to_wds_url(data_urls)
+     return URLSource(url), entry.schema_ref


  ##
···
                  "Pass index=LocalIndex() or index=AtmosphereIndex(client)."
              )

-         data_urls, schema_ref = _resolve_indexed_path(path, index)
+         source, schema_ref = _resolve_indexed_path(path, index)

          # Resolve sample_type from schema if not provided
          resolved_type: Type = sample_type if sample_type is not None else index.decode_schema(schema_ref)

-         # For indexed datasets, we treat all URLs as a single "train" split
-         url = _shards_to_wds_url(data_urls)
-         ds = Dataset[resolved_type](url)
+         # Create dataset from the resolved source (includes credentials if S3)
+         ds = Dataset[resolved_type](source)

          if split is not None:
              # Indexed datasets are single-split by default
+23 -4
src/atdata/_protocols.py
···
          ...     print(f"{entry.name} -> {entry.schema_ref}")
          """

-     # Optional data store (not required by protocol, but supported by some implementations)
-     # Use hasattr() to check if available: if hasattr(index, 'data_store') and index.data_store
-     data_store: Optional["AbstractDataStore"]
+     @property
+     def data_store(self) -> Optional["AbstractDataStore"]:
+         """Optional data store for reading/writing shards.
+
+         If present, ``load_dataset`` will use it for credential resolution
+         (e.g., S3 credentials from S3DataStore).
+
+         Returns:
+             AbstractDataStore instance, or None if this index doesn't have
+             an associated data store.
+
+         Note:
+             Not all index implementations provide a data_store. Use
+             ``hasattr(index, 'data_store') and index.data_store is not None``
+             for safe access.
+         """
+         ...

      # Dataset operations

···
      def publish_schema(
          self,
-         sample_type: Type[Packable],
+         sample_type: type,
          *,
          version: str = "1.0.0",
          **kwargs,
      ) -> str:
          """Publish a schema for a sample type.

+         The sample_type is accepted as ``type`` rather than ``Type[Packable]`` to
+         support ``@packable``-decorated classes, which satisfy the Packable protocol
+         at runtime but cannot be statically verified by type checkers.
+
          Args:
              sample_type: A Packable type (PackableSample subclass or @packable-decorated).
+                 Validated at runtime via the @runtime_checkable Packable protocol.
              version: Semantic version string for the schema.
              **kwargs: Additional backend-specific options.

+33 -5
src/atdata/dataset.py
···
      Any,
      Optional,
      Dict,
+     Iterator,
      Sequence,
      Iterable,
      Callable,
···
          """
          if self._sample_type_cache is None:
              self._sample_type_cache = typing.get_args( self.__orig_class__)[0]
+             assert self._sample_type_cache is not None
          return self._sample_type_cache

      def __getattr__( self, name ):
···

  RT = TypeVar( 'RT', bound = PackableSample )


- class _ShardListStage(wds.PipelineStage):
+ class _ShardListStage(wds.utils.PipelineStage):
      """Pipeline stage that yields {url: shard_id} dicts from a DataSource.

      This is analogous to SimpleShardList but works with any DataSource.
···
              yield {"url": shard_id}


- class _StreamOpenerStage(wds.PipelineStage):
+ class _StreamOpenerStage(wds.utils.PipelineStage):
      """Pipeline stage that opens streams from a DataSource.

      Takes {url: shard_id} dicts and adds a stream using source.open_shard().
···
          """
          if self._sample_type_cache is None:
              self._sample_type_cache = typing.get_args( self.__orig_class__ )[0]
+             assert self._sample_type_cache is not None
          return self._sample_type_cache
      @property
      def batch_type( self ) -> Type:
···
          ret._output_lens = lenses.transform( self.sample_type, ret.sample_type )
          return ret

-     def list_shards( self ) -> list[str]:
+     @property
+     def shards(self) -> Iterator[str]:
+         """Lazily iterate over shard identifiers.
+
+         Yields:
+             Shard identifiers (e.g., 'train-000000.tar', 'train-000001.tar').
+
+         Example:
+             ::
+
+                 >>> for shard in ds.shards:
+                 ...     print(f"Processing {shard}")
+         """
+         return iter(self._source.list_shards())
+
+     def list_shards(self) -> list[str]:
          """Get list of individual dataset shards.

          Returns:
···

      # Legacy alias for backwards compatibility
      @property
-     def shard_list( self ) -> list[str]:
-         """List of individual dataset shards (deprecated, use list_shards())."""
+     def shard_list(self) -> list[str]:
+         """List of individual dataset shards (deprecated, use list_shards()).
+
+         .. deprecated::
+             Use :meth:`list_shards` instead.
+         """
+         import warnings
+         warnings.warn(
+             "shard_list is deprecated, use list_shards() instead",
+             DeprecationWarning,
+             stacklevel=2,
+         )
          return self.list_shards()

      @property
+21 -3
src/atdata/local.py
···

      def publish_schema(
          self,
-         sample_type: Type[Packable],
+         sample_type: type,
          *,
          version: str | None = None,
          description: str | None = None,
···
          """Publish a schema for a sample type to Redis.

          Args:
-             sample_type: The PackableSample subclass to publish.
+             sample_type: A Packable type (@packable-decorated or PackableSample subclass).
              version: Semantic version string (e.g., '1.0.0'). If None,
                  auto-increments from the latest published version (patch bump),
                  or starts at '1.0.0' if no previous version exists.
···

          Raises:
              ValueError: If sample_type is not a dataclass.
-             TypeError: If a field type is not supported.
+             TypeError: If sample_type doesn't satisfy the Packable protocol,
+                 or if a field type is not supported.
          """
+         # Validate that sample_type satisfies Packable protocol at runtime
+         # This catches non-packable types early with a clear error message
+         try:
+             # Check protocol compliance by verifying required methods exist
+             if not (hasattr(sample_type, 'from_data') and
+                     hasattr(sample_type, 'from_bytes') and
+                     callable(getattr(sample_type, 'from_data', None)) and
+                     callable(getattr(sample_type, 'from_bytes', None))):
+                 raise TypeError(
+                     f"{sample_type.__name__} does not satisfy the Packable protocol. "
+                     "Use @packable decorator or inherit from PackableSample."
+                 )
+         except AttributeError:
+             raise TypeError(
+                 f"sample_type must be a class, got {type(sample_type).__name__}"
+             )
+
          # Auto-increment version if not specified
          if version is None:
              latest = self._get_latest_schema_version(sample_type.__name__)
+38
tests/test_hf_api.py
···

          # URL should be unchanged
          assert ds.url == "s3://bucket/data.tar"
+
+     def test_indexed_path_creates_s3source_with_credentials(self):
+         """load_dataset creates S3Source with credentials when S3DataStore is available."""
+         from atdata.local import S3DataStore
+         from atdata._sources import S3Source
+
+         # Create a real S3DataStore with mock credentials
+         mock_credentials = {
+             "AWS_ACCESS_KEY_ID": "test-access-key",
+             "AWS_SECRET_ACCESS_KEY": "test-secret-key",
+             "AWS_ENDPOINT": "https://r2.example.com",
+         }
+
+         # Mock the S3DataStore
+         mock_store = Mock(spec=S3DataStore)
+         mock_store.credentials = mock_credentials
+
+         mock_index = Mock()
+         mock_index.data_store = mock_store
+         mock_entry = Mock()
+         mock_entry.data_urls = ["s3://my-bucket/train-000.tar", "s3://my-bucket/train-001.tar"]
+         mock_entry.schema_ref = "local://schemas/test@1.0.0"
+         mock_index.get_dataset.return_value = mock_entry
+
+         ds = load_dataset(
+             "@local/my-dataset",
+             SimpleTestSample,
+             index=mock_index,
+             split="train",
+         )
+
+         # Verify the dataset source is an S3Source with credentials
+         assert isinstance(ds.source, S3Source)
+         assert ds.source.bucket == "my-bucket"
+         assert ds.source.keys == ["train-000.tar", "train-001.tar"]
+         assert ds.source.endpoint == "https://r2.example.com"
+         assert ds.source.access_key == "test-access-key"
+         assert ds.source.secret_key == "test-secret-key"