A loose federation of distributed, typed datasets
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

Initial copy-over

+555
+7
.gitignore
··· 1 + ## Custom 2 + 3 + # Don't commit `uv` lockfiles 4 + **/uv.lock 5 + 6 + ## 7 + 1 8 # Byte-compiled / optimized / DLL files 2 9 __pycache__/ 3 10 *.py[codz]
+1
.python-version
··· 1 + 3.12
+28
pyproject.toml
··· 1 + [project] 2 + name = "ekumen" 3 + version = "0.1.0" 4 + description = "A loose federation of distributed, typed datasets" 5 + readme = "README.md" 6 + authors = [ 7 + { name = "Maxine Levesque", email = "hello@maxine.science" } 8 + ] 9 + requires-python = ">=3.12" 10 + dependencies = [ 11 + "msgpack>=1.1.2", 12 + "numpy>=2.3.4", 13 + "ormsgpack>=1.11.0", 14 + "webdataset>=1.0.2", 15 + ] 16 + 17 + [project.scripts] 18 + ekumen = "ekumen:main" 19 + 20 + [build-system] 21 + requires = ["hatchling"] 22 + build-backend = "hatchling.build" 23 + 24 + [dependency-groups] 25 + dev = [ 26 + "pytest>=8.4.2", 27 + "pytest-cov>=7.0.0", 28 + ]
+2
src/ekumen/__init__.py
def main() -> None:
    """Console-script entry point: print a greeting."""
    greeting = "Hello from ekumen!"
    print(greeting)
+30
src/ekumen/_helpers.py
"""Assorted helper methods for `ekumen`"""

##
# Imports

from io import BytesIO
import ormsgpack as omp

import numpy as np


##
# msgpack (de)serialization

def pack_instance( x ) -> bytes:
    """Serialize `x` into msgpack bytes via ormsgpack."""
    return omp.packb( x )

def unpack( bs: bytes ):
    """Deserialize msgpack bytes `bs` back into Python data."""
    return omp.unpackb( bs )

##
# NDArray <-> bytes round-tripping

def array_to_bytes(x: np.ndarray) -> bytes:
    """Serialize array `x` into raw ``.npy``-format bytes."""
    buffer = BytesIO()
    # NOTE(review): allow_pickle=True supports object arrays but is unsafe on
    # untrusted input -- confirm data provenance before loading elsewhere.
    np.save(buffer, x, allow_pickle=True)
    return buffer.getvalue()

def bytes_to_array(b: bytes) -> np.ndarray:
    """Reconstruct an NDArray from bytes produced by `array_to_bytes`."""
    return np.load(BytesIO(b), allow_pickle=True)
+418
src/ekumen/dataset.py
"""Schematized WebDatasets"""

##
# Imports

import webdataset as wds

from dataclasses import dataclass
import uuid

import numpy as np

from abc import (
    ABC,
    abstractmethod,
)
from typing import (
    Any,
    Optional,
    Dict,
    Sequence,
    #
    Self,
    Generic,
    Type,
    TypeVar,
    TypeAlias,
)
from numpy.typing import (
    NDArray,
    ArrayLike,
)

#

# import ekumen.atmosphere as eat

import msgpack
import ormsgpack
from . import _helpers as eh


##
# Typing help

WDSRawSample: TypeAlias = Dict[str, Any]
WDSRawBatch: TypeAlias = Dict[str, Any]


##
# Main base classes

# TODO Check for best way to ensure this typevar is used as a dataclass type
DT = TypeVar( 'DT' )

MsgpackRawSample: TypeAlias = Dict[str, Any]

@dataclass
class ArrayBytes:
    """Annotates bytes that should be interpreted as the raw contents of a
    numpy NDArray"""

    # The raw bytes of the corresponding NDArray (``.npy`` format)
    raw_bytes: bytes

    def __init__( self,
            array: Optional[ArrayLike] = None,
            raw: Optional[bytes] = None,
    ):
        """Create from either an array-like or pre-serialized bytes

        Args:
            array: Array-like data to serialize into `raw_bytes`.
            raw: Already-serialized ``.npy`` bytes, used verbatim.

        Raises:
            ValueError: If neither `array` nor `raw` is provided.
        """

        if array is not None:
            array = np.array( array )
            self.raw_bytes = eh.array_to_bytes( array )

        elif raw is not None:
            self.raw_bytes = raw

        else:
            raise ValueError( 'Must provide either `array` or `raw` bytes' )

    @property
    def to_numpy( self ) -> NDArray:
        """Return the `raw_bytes` data as an NDArray"""
        return eh.bytes_to_array( self.raw_bytes )

def _make_packable( x ):
    """Coerce `x` into a msgpack-serializable value

    ArrayBytes and NDArray values become raw ``.npy`` bytes; everything else
    passes through unchanged.
    """
    if isinstance( x, ArrayBytes ):
        return x.raw_bytes
    if isinstance( x, np.ndarray ):
        return eh.array_to_bytes( x )
    return x

class PackableSample( ABC ):
    """A sample that can be packed and unpacked with msgpack

    Intended to be mixed into `@dataclass` sample types; `__post_init__`
    auto-converts fields annotated as `NDArray` from bytes/ArrayBytes form.
    """

    def __post_init__( self ):

        # Auto-convert known types when annotated.
        # NOTE: getattr with a default (rather than vars()[...]) avoids a
        # KeyError for subclasses that declare no annotations of their own.
        for var_name, var_type in getattr( self.__class__, '__annotations__', {} ).items():

            # Annotation for this variable is to be an NDArray
            if var_type == NDArray:
                # ... so, we'll always auto-convert to numpy

                var_cur_value = getattr( self, var_name )

                # Execute the appropriate conversion for intermediate data
                # based on what is provided

                if isinstance( var_cur_value, np.ndarray ):
                    # we're good!
                    pass

                elif isinstance( var_cur_value, ArrayBytes ):
                    setattr( self, var_name, var_cur_value.to_numpy )

                elif isinstance( var_cur_value, bytes ):
                    setattr( self, var_name, eh.bytes_to_array( var_cur_value ) )

    ##

    @classmethod
    def from_data( cls, data: MsgpackRawSample ) -> Self:
        """Create a sample instance from unpacked msgpack data"""
        return cls( **data )

    @classmethod
    def from_bytes( cls, bs: bytes ) -> Self:
        """Create a sample instance from raw msgpack bytes"""
        return cls.from_data( ormsgpack.unpackb( bs ) )

    @property
    def packed( self ) -> bytes:
        """Pack this sample's data into msgpack bytes

        Raises:
            RuntimeError: If msgpack cannot serialize the converted data.
        """

        # Make sure that all of our (possibly unpackable) data is in a packable
        # format
        o = {
            k: _make_packable( v )
            for k, v in vars( self ).items()
        }

        ret = msgpack.packb( o )

        if ret is None:
            raise RuntimeError( f'Failed to pack sample to bytes: {o}' )

        return ret

    # TODO Expand to allow for specifying explicit __key__
    @property
    def as_wds( self ) -> WDSRawSample:
        """Pack this sample's data for writing to webdataset"""
        return {
            # Generates a UUID that is timelike-sortable
            '__key__': str( uuid.uuid1( 0, 0 ) ),
            'msgpack': self.packed,
        }

def _batch_aggregate( xs: Sequence ):
    """Aggregate per-sample values `xs` into a single batch value

    Numpy values are stacked into one array with a leading batch dimension;
    anything else is collected into a plain list.
    """

    if not xs:
        # Empty sequence
        return []

    # Aggregate
    if isinstance( xs[0], np.ndarray ):
        return np.array( list( xs ) )

    return list( xs )

class SampleBatch( Generic[DT] ):
    """A batch of samples of type `DT` with lazy per-field aggregation

    Accessing a field named in `DT`'s annotations returns the aggregated
    values of that field across all samples (cached after first access).
    """

    def __init__( self, samples: Sequence[DT] ):
        """Wrap `samples` as a batch

        Args:
            samples: The individual sample instances in this batch.
        """
        self.samples = list( samples )
        self._aggregate_cache = dict()

    @property
    def sample_type( self ) -> Type:
        """The type of each sample in this batch"""
        return self.__orig_class__.__args__[0]

    def __getattr__( self, name ):
        # Guard: never route dunder lookups (e.g. `__orig_class__` before
        # typing assigns it) through `sample_type` -- that would recurse
        # infinitely back into this __getattr__
        if name.startswith( '__' ):
            raise AttributeError( name )

        # Aggregate named params of sample type
        if name in getattr( self.sample_type, '__annotations__', {} ):
            if name not in self._aggregate_cache:
                self._aggregate_cache[name] = _batch_aggregate(
                    [ getattr( x, name )
                      for x in self.samples ]
                )

            return self._aggregate_cache[name]

        raise AttributeError( f'No sample attribute named {name}' )

# Backward-compatible alias for the original (misspelled) class name
SamlpeBatch = SampleBatch


ST = TypeVar( 'ST', bound = PackableSample )

# TODO For python 3.13
# BT = TypeVar( 'BT', default = None )
# IT = TypeVar( 'IT', default = Any )

class Dataset( Generic[ST] ):
    """A dataset that ingests and formats raw samples from a WebDataset

    (Abstract base for subclassing)
    """

    @property
    def sample_type( self ) -> Type:
        """The type of each returned sample from this `Dataset`'s iterator"""
        return self.__orig_class__.__args__[0]

    @property
    def batch_type( self ) -> Type:
        """The type of a batch built from `sample_type`"""
        return SampleBatch[self.sample_type]

    def __init__( self, url: str ) -> None:
        """Create a dataset over the WebDataset at `url`

        Args:
            url: Brace-notation URL of the underlying full WebDataset.
        """
        super().__init__()
        self.url = url

    # Common functionality

    @property
    def shard_list( self ) -> list[str]:
        """List of individual dataset shards

        Returns:
            A full (non-lazy) list of the individual ``tar`` files within the
            source WebDataset.
        """
        pipe = wds.DataPipeline(
            wds.SimpleShardList( self.url ),
            wds.map( lambda x: x['url'] )
        )
        return list( pipe )

    def ordered( self,
            batch_size: int | None = 1,
    ) -> wds.DataPipeline:
        """Iterate over the dataset in order

        Args:
            batch_size (:obj:`int`, optional): The size of iterated batches.
                Default: 1. If ``None``, iterates over one sample at a time
                with no batch dimension.

        Returns:
            :obj:`webdataset.DataPipeline` A data pipeline that iterates over
            the dataset in its original sample order

        """

        # Shared pipeline prefix; only the tail differs by batching mode
        stages = [
            wds.SimpleShardList( self.url ),
            wds.split_by_worker,
            #
            wds.tarfile_to_samples(),
        ]

        if batch_size is None:
            stages.append( wds.map( self.wrap ) )
        else:
            stages.append( wds.batched( batch_size ) )
            stages.append( wds.map( self.wrap_batch ) )

        return wds.DataPipeline( *stages )

    def shuffled( self,
            buffer_shards: int = 100,
            buffer_samples: int = 10_000,
            batch_size: int | None = 1,
    ) -> wds.DataPipeline:
        """Iterate over the dataset in random order

        Args:
            buffer_shards (int): Size of the shard-level shuffle buffer.
            buffer_samples (int): Size of the sample-level shuffle buffer.
            batch_size (:obj:`int`, optional): The size of iterated batches.
                Default: 1. If ``None``, iterates over one sample at a time
                with no batch dimension.

        Returns:
            :obj:`webdataset.DataPipeline` A data pipeline that iterates over
            the dataset in shuffled order

        """

        # Shared pipeline prefix; only the tail differs by batching mode
        stages = [
            wds.SimpleShardList( self.url ),
            wds.shuffle( buffer_shards ),
            wds.split_by_worker,
            #
            wds.tarfile_to_samples(),
            wds.shuffle( buffer_samples ),
        ]

        if batch_size is None:
            stages.append( wds.map( self.wrap ) )
        else:
            stages.append( wds.batched( batch_size ) )
            stages.append( wds.map( self.wrap_batch ) )

        return wds.DataPipeline( *stages )

    # Implemented by specific subclasses

    def wrap( self, sample: MsgpackRawSample ) -> ST:
        """Wrap a `sample` into the appropriate dataset-specific type"""
        assert 'msgpack' in sample
        assert isinstance( sample['msgpack'], bytes )

        return self.sample_type.from_bytes( sample['msgpack'] )
        # NOTE: removed unreachable fallback code that followed this return
        # and referenced undefined names (`cls.sample_class`, `AnySample`)

    def wrap_batch( self, batch: WDSRawBatch ) -> SampleBatch[ST]:
        """Wrap a `batch` of samples into the appropriate dataset-specific type

        This default implementation simply creates a list one sample at a time
        """

        assert 'msgpack' in batch
        batch_unpacked = [ self.sample_type.from_bytes( bs )
                           for bs in batch['msgpack'] ]
        return SampleBatch[self.sample_type]( batch_unpacked )
+69
tests/test_dataset.py
"""Test dataset functionality."""

##

import pytest

from dataclasses import dataclass

import numpy as np

from numpy.typing import NDArray
from typing import (
    Type,
    Any,
)

import ekumen.dataset as ekd


## Sample test cases

@dataclass
class BasicTestSample( ekd.PackableSample ):
    name: str
    position: int
    value: float

@dataclass
class NumpyTestSample( ekd.PackableSample ):
    label: int
    image: NDArray

# (sample class, raw constructor data) pairs exercised by each test
test_sample_classes = [
    (
        BasicTestSample, {
            'name': 'Hello, world!',
            'position': 42,
            'value': 1024.768,
        }
    ),
    (
        NumpyTestSample, {
            'label': 9_001,
            'image': np.random.randn( 1024, 1024 ),
        }
    )
]


## Tests

@pytest.mark.parametrize( ('SampleType', 'sample_data'), test_sample_classes )
def test_create_sample(
    SampleType: Type[ekd.PackableSample],
    sample_data: ekd.MsgpackRawSample,
):
    """Verify that samples can be created from semi-structured data."""
    sample = SampleType.from_data( sample_data )
    assert isinstance( sample, SampleType ), f'Did not properly form sample for test type {SampleType}'

    # Every input field must round-trip onto the constructed sample
    for k, v in sample_data.items():
        stored = getattr( sample, k )
        if isinstance( v, np.ndarray ):
            cur_assertion = bool( np.all( stored == v ) )
        else:
            cur_assertion = stored == v
        assert cur_assertion, f'Did not properly incorporate property {k} of test type {SampleType}'