A loose federation of distributed, typed datasets
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Additional feature work on local redis ipml

+251 -1
+4 -1
.vscode/settings.json
··· 2 2 "cSpell.words": [ 3 3 "atdata", 4 4 "getattr", 5 + "hgetall", 5 6 "msgpack", 6 7 "pypi", 7 8 "pyproject", 8 - "pytest" 9 + "pytest", 10 + "schemamodels", 11 + "unpackb" 9 12 ] 10 13 }
+4
pyproject.toml
··· 13 13 "numpy>=2.3.4", 14 14 "ormsgpack>=1.11.0", 15 15 "pandas>=2.3.3", 16 + "pydantic>=2.12.5", 17 + "redis-om>=0.3.5", 18 + "schemamodels>=0.9.1", 16 19 "tqdm>=4.67.1", 17 20 "webdataset>=1.0.2", 18 21 ] ··· 29 32 30 33 [dependency-groups] 31 34 dev = [ 35 + "jupyter>=1.1.1", 32 36 "pytest>=8.4.2", 33 37 "pytest-cov>=7.0.0", 34 38 ]
+101
src/atdata/_local_dev.py
··· 1 + """TODO""" 2 + 3 + ## 4 + # Imports 5 + 6 + from atdata.dataset import ( 7 + Dataset, 8 + ) 9 + 10 + from uuid import uuid4 11 + 12 + from redis_om import ( 13 + EmbeddedJsonModel, 14 + JsonModel, 15 + Field, 16 + Migrator, 17 + ) 18 + 19 + from typing import ( 20 + Any, 21 + Optional, 22 + Dict, 23 + ) 24 + 25 + 26 + ## 27 + # Redis object model 28 + 29 + class SampleSchema( EmbeddedJsonModel ): 30 + """TODO""" 31 + identifier: str = Field( index = True ) 32 + json_schema: Dict[str, Any] 33 + 34 + class MetadataSchema( EmbeddedJsonModel ): 35 + """TODO""" 36 + identifier: str = Field( index = True ) 37 + json_schema: Dict[str, Any] 38 + 39 + class IndexEntry( JsonModel ): 40 + """TODO""" 41 + wds_url: str 42 + sample_schema: SampleSchema 43 + metadata_schema: Optional[MetadataSchema] = None 44 + 45 + 46 + ## 47 + # Classes 48 + 49 + class Index: 50 + """TODO""" 51 + 52 + ## 53 + 54 + def __init__(self) -> None: 55 + """TODO""" 56 + ## 57 + 58 + # ... 59 + 60 + # Needed before we can do anything with redis-om queries 61 + Migrator().run() 62 + 63 + def list( self ): 64 + """TODO""" 65 + ## 66 + all_entries = IndexEntry.find().all() 67 + return all_entries 68 + 69 + def add( self, ds: Dataset ) -> IndexEntry: 70 + """TODO""" 71 + ## 72 + test_schema = { 73 + "$id": "https://schema.dev/fake-schema.schema.json", 74 + "$schema": "http://json-schema.org/draft-07/schema#", 75 + "title": "fake-schema", 76 + "description": "Blue Blah", 77 + "type": "object", 78 + "properties": { 79 + "property_a": { 80 + "default": 5, 81 + "type": "integer" 82 + }, 83 + "property_b": { 84 + "type": "string" 85 + } 86 + } 87 + } 88 + 89 + test_schema_entry = SampleSchema( 90 + identifier = str( uuid4() ), 91 + json_schema = test_schema 92 + ) 93 + 94 + new_index_entry = IndexEntry( 95 + wds_url = ds.url, 96 + sample_schema = test_schema_entry, 97 + ) 98 + 99 + return new_index_entry.save() 100 + 101 + #
+27
src/atdata/dataset.py
··· 48 48 from tqdm import tqdm 49 49 import numpy as np 50 50 import pandas as pd 51 + import requests 51 52 52 53 import typing 53 54 from typing import ( ··· 456 457 ... 457 458 >>> # Transform to a different view 458 459 >>> ds_view = ds.as_type(MyDataView) 460 + 461 + TODO Expand this to show information on the `metadata_url` field 459 462 """ 460 463 461 464 # sample_class: Type = get_parameters( ) ··· 501 504 """ 502 505 super().__init__() 503 506 self.url = url 507 + """WebDataset brace-notation URL pointing to tar files, e.g., 508 + ``"path/to/file-{000000..000009}.tar"`` for multiple shards or 509 + ``"path/to/file-000000.tar"`` for a single shard. 510 + """ 511 + 512 + self._metadata: dict[str, Any] | None = None 513 + 514 + self.metadata_url: str | None = None 515 + """TODO""" 504 516 505 517 # Allow addition of automatic transformation of raw underlying data 506 518 self._output_lens: Lens | None = None ··· 557 569 wds.filters.map( lambda x: x['url'] ) 558 570 ) 559 571 return list( pipe ) 572 + 573 + @property 574 + def metadata( self ) -> dict[str, Any] | None: 575 + """TODO""" 576 + 577 + if self.metadata_url is None: 578 + return None 579 + 580 + if self._metadata is None: 581 + with requests.get( self.metadata_url, stream = True ) as response: 582 + response.raise_for_status() 583 + self._metadata = msgpack.unpackb( response.content, raw = False ) 584 + 585 + # Use our cached values 586 + return self._metadata 560 587 561 588 def ordered( self, 562 589 batch_size: int | None = 1,
+115
src/atdata/local.py
··· 1 + """TODO""" 2 + 3 + ## 4 + # Imports 5 + 6 + from atdata import ( 7 + PackableSample, 8 + Dataset, 9 + ) 10 + 11 + from uuid import uuid4 12 + 13 + # from redis_om import ( 14 + # EmbeddedJsonModel, 15 + # JsonModel, 16 + # Field, 17 + # Migrator, 18 + # get_redis_connection, 19 + # ) 20 + from redis import ( 21 + Redis, 22 + ) 23 + 24 + from dataclasses import ( 25 + dataclass, 26 + asdict, 27 + field, 28 + ) 29 + from typing import ( 30 + Any, 31 + Optional, 32 + Dict, 33 + Type, 34 + ) 35 + 36 + 37 + ## 38 + # Heplers 39 + 40 + def _kind_str_for_sample_type( st: Type[PackableSample] ) -> str: 41 + """TODO""" 42 + return f'{st.__module__}.{st.__name__}' 43 + 44 + 45 + ## 46 + # Redis object model 47 + 48 + @dataclass 49 + class BasicIndexEntry: 50 + """TODO""" 51 + ## 52 + 53 + wds_url: str 54 + """TODO""" 55 + sample_kind: str 56 + """TODO""" 57 + 58 + metadata_url: str | None 59 + """TODO""" 60 + 61 + uuid: str | None = field( default_factory = lambda: str( uuid4() ) ) 62 + """TODO""" 63 + 64 + def save_to( self, redis: Redis ): 65 + """TODO""" 66 + save_key = f'BasicIndexEntry:{self.uuid}' 67 + # TODO figure out how to get linting to work correctly here 68 + redis.hset( save_key, mapping = asdict( self ) ) 69 + 70 + 71 + ## 72 + # Classes 73 + 74 + class Index: 75 + """TODO""" 76 + 77 + ## 78 + 79 + def __init__( self, 80 + redis: Redis | None = None, 81 + **kwargs 82 + ) -> None: 83 + """TODO""" 84 + ## 85 + 86 + if redis is not None: 87 + self._redis = redis 88 + else: 89 + self._redis = Redis() 90 + 91 + # needed before we can do anything with `redis` 92 + # TODO this only works / is necessary for `redis_om`` 93 + # Migrator().run() 94 + 95 + def list( self ): 96 + """TODO""" 97 + ## 98 + ret = [] 99 + for key in self._redis.scan_iter( match = 'BasicIndexEntry:*' ): 100 + ret.append( self._redis.hgetall( key ) ) 101 + return ret 102 + 103 + def add( self, ds: Dataset ) -> BasicIndexEntry: 104 + """TODO""" 105 + ## 106 + temp_sample_kind = _kind_str_for_sample_type( ds.sample_type ) 107 + 108 + ret_data = BasicIndexEntry( 109 + wds_url = ds.url, 110 + sample_kind = temp_sample_kind, 111 + metadata_url = ds.metadata_url, 112 + ) 113 + ret_data.save_to( self._redis ) 114 + 115 + return ret_data