A loose federation of distributed, typed datasets
1"""Protocol compliance tests for atdata abstractions. 2 3These tests verify that concrete implementations satisfy their protocol 4definitions, ensuring interoperability between local and atmosphere backends. 5""" 6 7from unittest.mock import Mock 8 9from atdata._protocols import ( 10 IndexEntry, 11) 12from atdata.local import LocalDatasetEntry, Index as LocalIndex, S3DataStore 13from atdata.atmosphere import AtmosphereIndex, AtmosphereIndexEntry 14 15 16class TestIndexEntryProtocol: 17 """Tests for IndexEntry protocol compliance.""" 18 19 def test_local_dataset_entry_is_index_entry(self): 20 """LocalDatasetEntry should satisfy IndexEntry protocol.""" 21 entry = LocalDatasetEntry( 22 name="test-dataset", 23 schema_ref="local://schemas/test@1.0.0", 24 data_urls=["s3://bucket/data.tar"], 25 metadata={"key": "value"}, 26 ) 27 28 # Protocol compliance via isinstance (runtime_checkable) 29 assert isinstance(entry, IndexEntry) 30 31 # Verify required properties exist and work 32 assert entry.name == "test-dataset" 33 assert entry.schema_ref == "local://schemas/test@1.0.0" 34 assert entry.data_urls == ["s3://bucket/data.tar"] 35 assert entry.metadata == {"key": "value"} 36 37 def test_atmosphere_index_entry_is_index_entry(self): 38 """AtmosphereIndexEntry should satisfy IndexEntry protocol.""" 39 record = { 40 "name": "atmo-dataset", 41 "schemaRef": "at://did:plc:test/schema/abc", 42 "storage": { 43 "$type": "ac.foundation.dataset.storageExternal", 44 "urls": ["s3://bucket/data.tar"], 45 }, 46 } 47 entry = AtmosphereIndexEntry("at://did:plc:test/record/xyz", record) 48 49 # Protocol compliance 50 assert isinstance(entry, IndexEntry) 51 52 # Verify properties 53 assert entry.name == "atmo-dataset" 54 assert entry.schema_ref == "at://did:plc:test/schema/abc" 55 assert entry.data_urls == ["s3://bucket/data.tar"] 56 57 def test_index_entry_with_none_metadata(self): 58 """IndexEntry should handle None metadata.""" 59 entry = LocalDatasetEntry( 60 name="no-meta", 61 schema_ref="local://schemas/test@1.0.0", 62 data_urls=["s3://bucket/data.tar"], 63 metadata=None, 64 ) 65 66 assert entry.metadata is None 67 68 69class TestAbstractIndexProtocol: 70 """Tests for AbstractIndex protocol compliance.""" 71 72 def test_local_index_has_required_methods(self): 73 """LocalIndex should have all AbstractIndex methods.""" 74 # Can't use isinstance with non-runtime_checkable Protocol 75 # So we verify methods exist 76 index = LocalIndex() 77 78 assert hasattr(index, "insert_dataset") 79 assert hasattr(index, "get_dataset") 80 assert hasattr(index, "list_datasets") 81 assert hasattr(index, "publish_schema") 82 assert hasattr(index, "get_schema") 83 assert hasattr(index, "list_schemas") 84 assert hasattr(index, "decode_schema") 85 86 # Verify methods are callable 87 assert callable(index.insert_dataset) 88 assert callable(index.get_dataset) 89 assert callable(index.list_datasets) 90 assert callable(index.publish_schema) 91 assert callable(index.get_schema) 92 assert callable(index.list_schemas) 93 assert callable(index.decode_schema) 94 95 def test_atmosphere_index_has_required_methods(self): 96 """AtmosphereIndex should have all AbstractIndex methods.""" 97 mock_client = Mock() 98 mock_client.did = "did:plc:test" 99 index = AtmosphereIndex(mock_client) 100 101 assert hasattr(index, "insert_dataset") 102 assert hasattr(index, "get_dataset") 103 assert hasattr(index, "list_datasets") 104 assert hasattr(index, "publish_schema") 105 assert hasattr(index, "get_schema") 106 assert hasattr(index, "list_schemas") 107 assert 
hasattr(index, "decode_schema") 108 109 assert callable(index.insert_dataset) 110 assert callable(index.get_dataset) 111 assert callable(index.list_datasets) 112 assert callable(index.publish_schema) 113 assert callable(index.get_schema) 114 assert callable(index.list_schemas) 115 assert callable(index.decode_schema) 116 117 118class TestAbstractDataStoreProtocol: 119 """Tests for AbstractDataStore protocol compliance.""" 120 121 def test_s3_datastore_has_required_methods(self): 122 """S3DataStore should have all AbstractDataStore methods.""" 123 # Create with mock credentials 124 mock_creds = { 125 "AWS_ENDPOINT": "http://localhost:9000", 126 "AWS_ACCESS_KEY_ID": "test", 127 "AWS_SECRET_ACCESS_KEY": "test", 128 } 129 130 store = S3DataStore(mock_creds, bucket="test-bucket") 131 132 assert hasattr(store, "write_shards") 133 assert hasattr(store, "read_url") 134 assert hasattr(store, "supports_streaming") 135 136 assert callable(store.write_shards) 137 assert callable(store.read_url) 138 assert callable(store.supports_streaming) 139 140 def test_s3_datastore_supports_streaming(self): 141 """S3DataStore should report streaming support.""" 142 mock_creds = { 143 "AWS_ENDPOINT": "http://localhost:9000", 144 "AWS_ACCESS_KEY_ID": "test", 145 "AWS_SECRET_ACCESS_KEY": "test", 146 } 147 148 store = S3DataStore(mock_creds, bucket="test-bucket") 149 assert store.supports_streaming() is True 150 151 def test_s3_datastore_read_url_passthrough(self): 152 """S3DataStore.read_url should return URL unchanged without custom endpoint.""" 153 mock_creds = { 154 "AWS_ACCESS_KEY_ID": "test", 155 "AWS_SECRET_ACCESS_KEY": "test", 156 } 157 158 store = S3DataStore(mock_creds, bucket="test-bucket") 159 url = "s3://bucket/path/data.tar" 160 assert store.read_url(url) == url 161 162 def test_s3_datastore_read_url_transforms_with_endpoint(self): 163 """S3DataStore.read_url should transform s3:// to https:// with custom endpoint.""" 164 mock_creds = { 165 "AWS_ENDPOINT": "http://localhost:9000", 166 "AWS_ACCESS_KEY_ID": "test", 167 "AWS_SECRET_ACCESS_KEY": "test", 168 } 169 170 store = S3DataStore(mock_creds, bucket="test-bucket") 171 url = "s3://bucket/path/data.tar" 172 # URL should be transformed to use the custom endpoint 173 assert store.read_url(url) == "http://localhost:9000/bucket/path/data.tar" 174 175 176class TestProtocolInteroperability: 177 """Tests verifying different implementations can be used interchangeably.""" 178 179 def test_function_accepts_any_index_entry(self): 180 """Functions typed with IndexEntry should accept any implementation.""" 181 182 def get_dataset_name(entry: IndexEntry) -> str: 183 return entry.name 184 185 # LocalDatasetEntry 186 local_entry = LocalDatasetEntry( 187 name="local-data", 188 schema_ref="local://schemas/test@1.0.0", 189 data_urls=["s3://bucket/data.tar"], 190 ) 191 assert get_dataset_name(local_entry) == "local-data" 192 193 # AtmosphereIndexEntry 194 atmo_entry = AtmosphereIndexEntry( 195 "at://did:plc:test/record/xyz", 196 {"name": "atmo-data", "schemaRef": "at://schema", "storage": {}}, 197 ) 198 assert get_dataset_name(atmo_entry) == "atmo-data" 199 200 def test_function_accepts_any_index(self): 201 """Functions typed with AbstractIndex should accept any implementation.""" 202 203 def count_datasets(index) -> int: 204 """Count datasets in an index.""" 205 return sum(1 for _ in index.list_datasets()) 206 207 # LocalIndex with mock redis 208 local_index = LocalIndex() 209 # Empty index returns 0 210 assert count_datasets(local_index) == 0 211 212 def 
test_index_entry_properties_consistent(self): 213 """All IndexEntry implementations should have consistent property types.""" 214 local_entry = LocalDatasetEntry( 215 name="test", 216 schema_ref="local://schemas/test@1.0.0", 217 data_urls=["url1", "url2"], 218 metadata={"k": "v"}, 219 ) 220 221 atmo_entry = AtmosphereIndexEntry( 222 "at://test", 223 { 224 "name": "test", 225 "schemaRef": "at://schema", 226 "storage": { 227 "$type": "ac.foundation.dataset.storageExternal", 228 "urls": ["url1", "url2"], 229 }, 230 }, 231 ) 232 233 # Both should return str for name 234 assert isinstance(local_entry.name, str) 235 assert isinstance(atmo_entry.name, str) 236 237 # Both should return str for schema_ref 238 assert isinstance(local_entry.schema_ref, str) 239 assert isinstance(atmo_entry.schema_ref, str) 240 241 # Both should return list[str] for data_urls 242 assert isinstance(local_entry.data_urls, list) 243 assert isinstance(atmo_entry.data_urls, list) 244 assert all(isinstance(u, str) for u in local_entry.data_urls) 245 assert all(isinstance(u, str) for u in atmo_entry.data_urls) 246 247 248class TestPolymorphicBehavior: 249 """Tests that verify actual polymorphic usage patterns work correctly.""" 250 251 def test_process_entries_polymorphically(self): 252 """Process a mixed list of IndexEntry implementations uniformly.""" 253 entries: list[IndexEntry] = [ 254 LocalDatasetEntry( 255 name="local-1", 256 schema_ref="local://schemas/A@1.0.0", 257 data_urls=["s3://bucket/local1.tar"], 258 metadata={"source": "local"}, 259 ), 260 AtmosphereIndexEntry( 261 "at://did:plc:test/record/1", 262 { 263 "name": "atmo-1", 264 "schemaRef": "at://did:plc:test/schema/A", 265 "storage": { 266 "$type": "ac.foundation.dataset.storageExternal", 267 "urls": ["s3://bucket/atmo1.tar"], 268 }, 269 }, 270 ), 271 LocalDatasetEntry( 272 name="local-2", 273 schema_ref="local://schemas/B@1.0.0", 274 data_urls=["s3://bucket/local2.tar", "s3://bucket/local2-001.tar"], 275 ), 276 ] 277 278 # Extract all names uniformly 279 names = [e.name for e in entries] 280 assert names == ["local-1", "atmo-1", "local-2"] 281 282 # Extract all schema refs 283 schema_refs = [e.schema_ref for e in entries] 284 assert schema_refs == [ 285 "local://schemas/A@1.0.0", 286 "at://did:plc:test/schema/A", 287 "local://schemas/B@1.0.0", 288 ] 289 290 # Count total shards across all entries 291 total_urls = sum(len(e.data_urls) for e in entries) 292 assert total_urls == 4 293 294 # Filter by metadata presence 295 with_metadata = [e for e in entries if e.metadata is not None] 296 assert len(with_metadata) == 1 297 assert with_metadata[0].name == "local-1" 298 299 def test_index_entry_in_dict_key(self): 300 """IndexEntry.name can be used to build lookup structures.""" 301 entries: list[IndexEntry] = [ 302 LocalDatasetEntry( 303 name="dataset-a", 304 schema_ref="local://schemas/A@1.0.0", 305 data_urls=["url1"], 306 ), 307 AtmosphereIndexEntry( 308 "at://test", 309 { 310 "name": "dataset-b", 311 "schemaRef": "at://schema", 312 "storage": { 313 "$type": "ac.foundation.dataset.storageExternal", 314 "urls": ["url2"], 315 }, 316 }, 317 ), 318 ] 319 320 # Build a lookup by name 321 by_name: dict[str, IndexEntry] = {e.name: e for e in entries} 322 323 assert "dataset-a" in by_name 324 assert "dataset-b" in by_name 325 assert by_name["dataset-a"].data_urls == ["url1"] 326 assert by_name["dataset-b"].data_urls == ["url2"] 327 328 def test_generic_url_collector(self): 329 """A generic function can collect URLs from any IndexEntry.""" 330 331 def 
collect_all_urls(entries: list[IndexEntry]) -> list[str]: 332 """Collect all data URLs from a list of entries.""" 333 all_urls = [] 334 for entry in entries: 335 all_urls.extend(entry.data_urls) 336 return all_urls 337 338 mixed_entries: list[IndexEntry] = [ 339 LocalDatasetEntry( 340 name="ds1", 341 schema_ref="local://test@1.0.0", 342 data_urls=["s3://a/1.tar", "s3://a/2.tar"], 343 ), 344 AtmosphereIndexEntry( 345 "at://x", 346 { 347 "name": "ds2", 348 "schemaRef": "at://s", 349 "storage": { 350 "$type": "ac.foundation.dataset.storageExternal", 351 "urls": ["s3://b/1.tar"], 352 }, 353 }, 354 ), 355 ] 356 357 urls = collect_all_urls(mixed_entries) 358 assert urls == ["s3://a/1.tar", "s3://a/2.tar", "s3://b/1.tar"]