A loose federation of distributed, typed datasets
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

docs: add comprehensive docstrings to atdata.local module

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

+158 -22
+1
.vscode/settings.json
··· 1 1 { 2 2 "cSpell.words": [ 3 3 "atdata", 4 + "atproto", 4 5 "creds", 5 6 "getattr", 6 7 "hgetall",
+157 -22
src/atdata/local.py
··· 1 - """TODO""" 1 + """Local repository storage for atdata datasets. 2 + 3 + This module provides a local storage backend for atdata datasets using: 4 + - S3-compatible object storage for dataset tar files and metadata 5 + - Redis for indexing and tracking datasets 6 + 7 + The main classes are: 8 + - Repo: Manages dataset storage in S3 with Redis indexing 9 + - Index: Redis-backed index for tracking dataset metadata 10 + - BasicIndexEntry: Index entry representing a stored dataset 11 + 12 + This is intended for development and small-scale deployment before 13 + migrating to the full atproto PDS infrastructure. 14 + """ 2 15 3 16 ## 4 17 # Imports ··· 55 68 # Helpers 56 69 57 70 def _kind_str_for_sample_type( st: Type[PackableSample] ) -> str: 58 - """TODO""" 71 + """Convert a sample type to a fully-qualified string identifier. 72 + 73 + Args: 74 + st: The sample type class. 75 + 76 + Returns: 77 + A string in the format 'module.name' identifying the sample type. 78 + """ 59 79 return f'{st.__module__}.{st.__name__}' 60 80 61 81 def _decode_bytes_dict( d: dict[bytes, bytes] ) -> dict[str, str]: 62 - """TODO""" 82 + """Decode a dictionary with byte keys and values to strings. 83 + 84 + Redis returns dictionaries with bytes keys/values, this converts them to strings. 85 + 86 + Args: 87 + d: Dictionary with bytes keys and values. 88 + 89 + Returns: 90 + Dictionary with UTF-8 decoded string keys and values. 91 + """ 63 92 return { 64 93 k.decode('utf-8'): v.decode('utf-8') 65 94 for k, v in d.items() ··· 71 100 72 101 @dataclass 73 102 class BasicIndexEntry: 74 - """TODO""" 103 + """Index entry for a dataset stored in the repository. 104 + 105 + Tracks metadata about a dataset stored in S3, including its location, 106 + type, and unique identifier. 107 + """ 75 108 ## 76 109 77 110 wds_url: str 78 - """TODO""" 111 + """WebDataset URL for the dataset tar files, for use with atdata.Dataset.""" 112 + 79 113 sample_kind: str 80 - """TODO""" 114 + """Fully-qualified sample type name (e.g., 'module.ClassName').""" 81 115 82 116 metadata_url: str | None 83 - """TODO""" 117 + """S3 URL to the dataset's metadata msgpack file, if any.""" 84 118 85 119 uuid: str | None = field( default_factory = lambda: str( uuid4() ) ) 86 - """TODO""" 120 + """Unique identifier for this dataset entry. Defaults to a new UUID if not provided.""" 87 121 88 122 def write_to( self, redis: Redis ): 89 - """TODO""" 123 + """Persist this index entry to Redis. 124 + 125 + Stores the entry as a Redis hash with key 'BasicIndexEntry:{uuid}'. 126 + 127 + Args: 128 + redis: Redis connection to write to. 129 + """ 90 130 save_key = f'BasicIndexEntry:{self.uuid}' 91 131 # TODO figure out how to get linting to work correctly here 92 132 redis.hset( save_key, mapping = asdict( self ) ) 93 133 94 134 def _s3_env( credentials_path: str | Path ) -> dict[str, Any]: 95 - """TODO""" 135 + """Load S3 credentials from a .env file. 136 + 137 + Args: 138 + credentials_path: Path to .env file containing S3 credentials. 139 + 140 + Returns: 141 + Dictionary with AWS_ENDPOINT, AWS_ACCESS_KEY_ID, and AWS_SECRET_ACCESS_KEY. 142 + 143 + Raises: 144 + AssertionError: If required credentials are missing from the file. 145 + """ 96 146 ## 97 147 credentials_path = Path( credentials_path ) 98 148 env_values = dotenv_values( credentials_path ) 99 149 assert 'AWS_ENDPOINT' in env_values 100 150 assert 'AWS_ACCESS_KEY_ID' in env_values 101 151 assert 'AWS_SECRET_ACCESS_KEY' in env_values 102 - 152 + 103 153 return { 104 154 k: env_values[k] 105 155 for k in ( ··· 110 160 } 111 161 112 162 def _s3_from_credentials( creds: str | Path | dict ) -> S3FileSystem: 113 - """TODO""" 163 + """Create an S3FileSystem from credentials. 164 + 165 + Args: 166 + creds: Either a path to a .env file with credentials, or a dict 167 + containing AWS_ENDPOINT, AWS_ACCESS_KEY_ID, and AWS_SECRET_ACCESS_KEY. 168 + 169 + Returns: 170 + Configured S3FileSystem instance. 171 + """ 114 172 ## 115 173 if not isinstance( creds, dict ): 116 174 creds = _s3_env( creds ) 117 - 175 + 118 176 return S3FileSystem( 119 177 endpoint_url = creds['AWS_ENDPOINT'], 120 178 key = creds['AWS_ACCESS_KEY_ID'], ··· 126 184 # Classes 127 185 128 186 class Repo: 129 - """TODO""" 187 + """Repository for storing and managing atdata datasets. 188 + 189 + Provides storage of datasets in S3-compatible object storage with Redis-based 190 + indexing. Datasets are stored as WebDataset tar files with optional metadata. 191 + 192 + Attributes: 193 + s3_credentials: S3 credentials dictionary or None. 194 + bucket_fs: S3FileSystem instance or None. 195 + hive_path: Path within S3 bucket for storing datasets. 196 + hive_bucket: Name of the S3 bucket. 197 + index: Index instance for tracking datasets. 198 + """ 130 199 131 200 ## 132 201 ··· 139 208 # 140 209 **kwargs 141 210 ) -> None: 142 - """TODO""" 211 + """Initialize a repository. 212 + 213 + Args: 214 + s3_credentials: Path to .env file with S3 credentials, or dict with 215 + AWS_ENDPOINT, AWS_ACCESS_KEY_ID, and AWS_SECRET_ACCESS_KEY. 216 + If None, S3 functionality will be disabled. 217 + hive_path: Path within the S3 bucket to store datasets. 218 + Required if s3_credentials is provided. 219 + redis: Redis connection for indexing. If None, creates a new connection. 220 + **kwargs: Additional arguments (reserved for future use). 221 + 222 + Raises: 223 + ValueError: If hive_path is not provided when s3_credentials is set. 224 + """ 143 225 144 226 if s3_credentials is None: 145 227 self.s3_credentials = None ··· 174 256 # 175 257 **kwargs 176 258 ) -> tuple[BasicIndexEntry, Dataset[T]]: 177 - """TODO""" 259 + """Insert a dataset into the repository. 260 + 261 + Writes the dataset to S3 as WebDataset tar files, stores metadata, 262 + and creates an index entry in Redis. 263 + 264 + Args: 265 + ds: The dataset to insert. 266 + cache_local: If True, write to local temporary storage first, then 267 + copy to S3. This can be faster for some workloads. 268 + **kwargs: Additional arguments passed to wds.ShardWriter. 269 + 270 + Returns: 271 + A tuple of (index_entry, new_dataset) where: 272 + - index_entry: BasicIndexEntry for the stored dataset 273 + - new_dataset: Dataset object pointing to the stored copy 274 + 275 + Raises: 276 + AssertionError: If S3 credentials or hive_path are not configured. 277 + RuntimeError: If no shards were written. 278 + """ 178 279 179 280 assert self.s3_credentials is not None 180 281 assert self.hive_bucket is not None ··· 318 419 319 420 320 421 class Index: 321 - """TODO""" 422 + """Redis-backed index for tracking datasets in a repository. 423 + 424 + Maintains a registry of BasicIndexEntry objects in Redis, allowing 425 + enumeration and lookup of stored datasets. 426 + 427 + Attributes: 428 + _redis: Redis connection for index storage. 429 + """ 322 430 323 431 ## 324 432 ··· 326 434 redis: Redis | None = None, 327 435 **kwargs 328 436 ) -> None: 329 - """TODO""" 437 + """Initialize an index. 438 + 439 + Args: 440 + redis: Redis connection to use. If None, creates a new connection 441 + using the provided kwargs. 442 + **kwargs: Additional arguments passed to Redis() constructor if 443 + redis is None. 444 + """ 330 445 ## 331 446 332 447 if redis is not None: ··· 340 455 341 456 @property 342 457 def all_entries( self ) -> list[BasicIndexEntry]: 343 - """TODO""" 458 + """Get all index entries as a list. 459 + 460 + Returns: 461 + List of all BasicIndexEntry objects in the index. 462 + """ 344 463 return list( self.entries ) 345 464 346 465 @property 347 466 def entries( self ) -> Generator[BasicIndexEntry, None, None]: 348 - """TODO""" 467 + """Iterate over all index entries. 468 + 469 + Scans Redis for all BasicIndexEntry keys and yields them one at a time. 470 + 471 + Yields: 472 + BasicIndexEntry objects from the index. 473 + """ 349 474 ## 350 475 for key in self._redis.scan_iter( match = 'BasicIndexEntry:*' ): 351 476 # TODO typing issue for `redis` 352 477 cur_entry_data = _decode_bytes_dict( self._redis.hgetall( key ) ) 353 478 cur_entry = BasicIndexEntry( **cur_entry_data ) 354 479 yield cur_entry 355 - 480 + 356 481 return 357 482 358 483 def add_entry( self, ds: Dataset, 359 484 uuid: str | None = None, 360 485 ) -> BasicIndexEntry: 361 - """TODO""" 486 + """Add a dataset to the index. 487 + 488 + Creates a BasicIndexEntry for the dataset and persists it to Redis. 489 + 490 + Args: 491 + ds: The dataset to add to the index. 492 + uuid: Optional UUID for the entry. If None, a new UUID is generated. 493 + 494 + Returns: 495 + The created BasicIndexEntry object. 496 + """ 362 497 ## 363 498 temp_sample_kind = _kind_str_for_sample_type( ds.sample_type ) 364 499