# A loose federation of distributed, typed datasets
1"""Protocol compliance tests for atdata abstractions.
2
3These tests verify that concrete implementations satisfy their protocol
4definitions, ensuring interoperability between local and atmosphere backends.
5"""
6
7from unittest.mock import Mock
8
9from atdata._protocols import (
10 IndexEntry,
11)
12from atdata.local import LocalDatasetEntry, Index as LocalIndex, S3DataStore
13from atdata.atmosphere import AtmosphereIndex, AtmosphereIndexEntry
14
15
class TestIndexEntryProtocol:
    """Tests for IndexEntry protocol compliance."""

    def test_local_dataset_entry_is_index_entry(self):
        """LocalDatasetEntry should satisfy IndexEntry protocol."""
        entry = LocalDatasetEntry(
            name="test-dataset",
            schema_ref="local://schemas/test@1.0.0",
            data_urls=["s3://bucket/data.tar"],
            metadata={"key": "value"},
        )

        # IndexEntry is runtime_checkable, so isinstance works directly.
        assert isinstance(entry, IndexEntry)

        # Each protocol property should round-trip its constructor argument.
        expected = {
            "name": "test-dataset",
            "schema_ref": "local://schemas/test@1.0.0",
            "data_urls": ["s3://bucket/data.tar"],
            "metadata": {"key": "value"},
        }
        for prop, value in expected.items():
            assert getattr(entry, prop) == value

    def test_atmosphere_index_entry_is_index_entry(self):
        """AtmosphereIndexEntry should satisfy IndexEntry protocol."""
        entry = AtmosphereIndexEntry(
            "at://did:plc:test/record/xyz",
            {
                "name": "atmo-dataset",
                "schemaRef": "at://did:plc:test/schema/abc",
                "storage": {
                    "$type": "ac.foundation.dataset.storageExternal",
                    "urls": ["s3://bucket/data.tar"],
                },
            },
        )

        # Structural check via the runtime_checkable protocol.
        assert isinstance(entry, IndexEntry)

        # Properties are derived from the underlying record fields.
        assert entry.name == "atmo-dataset"
        assert entry.schema_ref == "at://did:plc:test/schema/abc"
        assert entry.data_urls == ["s3://bucket/data.tar"]

    def test_index_entry_with_none_metadata(self):
        """IndexEntry should handle None metadata."""
        no_meta_entry = LocalDatasetEntry(
            name="no-meta",
            schema_ref="local://schemas/test@1.0.0",
            data_urls=["s3://bucket/data.tar"],
            metadata=None,
        )
        assert no_meta_entry.metadata is None
67
68
class TestAbstractIndexProtocol:
    """Tests for AbstractIndex protocol compliance."""

    # Methods every AbstractIndex implementation must expose. Checked by
    # attribute lookup because the protocol is not runtime_checkable, so
    # isinstance() cannot be used.
    REQUIRED_METHODS = (
        "insert_dataset",
        "get_dataset",
        "list_datasets",
        "publish_schema",
        "get_schema",
        "list_schemas",
        "decode_schema",
    )

    def test_local_index_has_required_methods(self):
        """LocalIndex should have all AbstractIndex methods."""
        index = LocalIndex()

        # getattr with a None default covers both existence and callability:
        # a missing attribute yields None, which is not callable.
        for method_name in self.REQUIRED_METHODS:
            assert callable(getattr(index, method_name, None))

    def test_atmosphere_index_has_required_methods(self):
        """AtmosphereIndex should have all AbstractIndex methods."""
        mock_client = Mock()
        mock_client.did = "did:plc:test"
        index = AtmosphereIndex(mock_client)

        for method_name in self.REQUIRED_METHODS:
            assert callable(getattr(index, method_name, None))
116
117
class TestAbstractDataStoreProtocol:
    """Tests for AbstractDataStore protocol compliance."""

    @staticmethod
    def _make_store(endpoint="http://localhost:9000"):
        """Build an S3DataStore with dummy credentials.

        Pass ``endpoint=None`` to omit AWS_ENDPOINT and exercise the
        default (passthrough) URL behavior.
        """
        creds = {
            "AWS_ACCESS_KEY_ID": "test",
            "AWS_SECRET_ACCESS_KEY": "test",
        }
        if endpoint is not None:
            creds["AWS_ENDPOINT"] = endpoint
        return S3DataStore(creds, bucket="test-bucket")

    def test_s3_datastore_has_required_methods(self):
        """S3DataStore should have all AbstractDataStore methods."""
        store = self._make_store()

        # Existence and callability in one check per protocol method.
        for method_name in ("write_shards", "read_url", "supports_streaming"):
            assert callable(getattr(store, method_name, None))

    def test_s3_datastore_supports_streaming(self):
        """S3DataStore should report streaming support."""
        assert self._make_store().supports_streaming() is True

    def test_s3_datastore_read_url_passthrough(self):
        """S3DataStore.read_url should return URL unchanged without custom endpoint."""
        store = self._make_store(endpoint=None)
        url = "s3://bucket/path/data.tar"
        assert store.read_url(url) == url

    def test_s3_datastore_read_url_transforms_with_endpoint(self):
        """S3DataStore.read_url should transform s3:// to https:// with custom endpoint."""
        store = self._make_store()
        # With AWS_ENDPOINT configured, the s3:// URL is rewritten to the
        # endpoint's HTTP form.
        assert (
            store.read_url("s3://bucket/path/data.tar")
            == "http://localhost:9000/bucket/path/data.tar"
        )
174
175
class TestProtocolInteroperability:
    """Tests verifying different implementations can be used interchangeably."""

    def test_function_accepts_any_index_entry(self):
        """Functions typed with IndexEntry should accept any implementation."""

        def get_dataset_name(entry: IndexEntry) -> str:
            return entry.name

        # Both backends' entry types flow through the same typed function.
        local_entry = LocalDatasetEntry(
            name="local-data",
            schema_ref="local://schemas/test@1.0.0",
            data_urls=["s3://bucket/data.tar"],
        )
        atmo_entry = AtmosphereIndexEntry(
            "at://did:plc:test/record/xyz",
            {"name": "atmo-data", "schemaRef": "at://schema", "storage": {}},
        )
        assert get_dataset_name(local_entry) == "local-data"
        assert get_dataset_name(atmo_entry) == "atmo-data"

    def test_function_accepts_any_index(self):
        """Functions typed with AbstractIndex should accept any implementation."""

        def count_datasets(index) -> int:
            """Count datasets in an index."""
            return sum(1 for _ in index.list_datasets())

        # A freshly-created LocalIndex contains no datasets.
        assert count_datasets(LocalIndex()) == 0

    def test_index_entry_properties_consistent(self):
        """All IndexEntry implementations should have consistent property types."""
        local_entry = LocalDatasetEntry(
            name="test",
            schema_ref="local://schemas/test@1.0.0",
            data_urls=["url1", "url2"],
            metadata={"k": "v"},
        )
        atmo_entry = AtmosphereIndexEntry(
            "at://test",
            {
                "name": "test",
                "schemaRef": "at://schema",
                "storage": {
                    "$type": "ac.foundation.dataset.storageExternal",
                    "urls": ["url1", "url2"],
                },
            },
        )

        # Every implementation must expose the same property types:
        # str name, str schema_ref, and a list of str data_urls.
        for entry in (local_entry, atmo_entry):
            assert isinstance(entry.name, str)
            assert isinstance(entry.schema_ref, str)
            assert isinstance(entry.data_urls, list)
            assert all(isinstance(url, str) for url in entry.data_urls)
246
247
class TestPolymorphicBehavior:
    """Tests that verify actual polymorphic usage patterns work correctly."""

    def test_process_entries_polymorphically(self):
        """Process a mixed list of IndexEntry implementations uniformly."""
        entries: list[IndexEntry] = [
            LocalDatasetEntry(
                name="local-1",
                schema_ref="local://schemas/A@1.0.0",
                data_urls=["s3://bucket/local1.tar"],
                metadata={"source": "local"},
            ),
            AtmosphereIndexEntry(
                "at://did:plc:test/record/1",
                {
                    "name": "atmo-1",
                    "schemaRef": "at://did:plc:test/schema/A",
                    "storage": {
                        "$type": "ac.foundation.dataset.storageExternal",
                        "urls": ["s3://bucket/atmo1.tar"],
                    },
                },
            ),
            LocalDatasetEntry(
                name="local-2",
                schema_ref="local://schemas/B@1.0.0",
                data_urls=["s3://bucket/local2.tar", "s3://bucket/local2-001.tar"],
            ),
        ]

        # Names and schema refs extract uniformly regardless of backend.
        assert [entry.name for entry in entries] == ["local-1", "atmo-1", "local-2"]
        assert [entry.schema_ref for entry in entries] == [
            "local://schemas/A@1.0.0",
            "at://did:plc:test/schema/A",
            "local://schemas/B@1.0.0",
        ]

        # Shard counting works across implementations (1 + 1 + 2 URLs).
        assert sum(len(entry.data_urls) for entry in entries) == 4

        # Metadata-based filtering treats both backends alike; only the
        # first local entry carries metadata.
        with_metadata = [entry for entry in entries if entry.metadata is not None]
        assert [entry.name for entry in with_metadata] == ["local-1"]

    def test_index_entry_in_dict_key(self):
        """IndexEntry.name can be used to build lookup structures."""
        catalog: dict[str, IndexEntry] = {}
        for entry in (
            LocalDatasetEntry(
                name="dataset-a",
                schema_ref="local://schemas/A@1.0.0",
                data_urls=["url1"],
            ),
            AtmosphereIndexEntry(
                "at://test",
                {
                    "name": "dataset-b",
                    "schemaRef": "at://schema",
                    "storage": {
                        "$type": "ac.foundation.dataset.storageExternal",
                        "urls": ["url2"],
                    },
                },
            ),
        ):
            catalog[entry.name] = entry

        assert "dataset-a" in catalog
        assert "dataset-b" in catalog
        assert catalog["dataset-a"].data_urls == ["url1"]
        assert catalog["dataset-b"].data_urls == ["url2"]

    def test_generic_url_collector(self):
        """A generic function can collect URLs from any IndexEntry."""

        def collect_all_urls(entries: list[IndexEntry]) -> list[str]:
            """Collect all data URLs from a list of entries."""
            collected: list[str] = []
            for entry in entries:
                collected += entry.data_urls
            return collected

        mixed: list[IndexEntry] = [
            LocalDatasetEntry(
                name="ds1",
                schema_ref="local://test@1.0.0",
                data_urls=["s3://a/1.tar", "s3://a/2.tar"],
            ),
            AtmosphereIndexEntry(
                "at://x",
                {
                    "name": "ds2",
                    "schemaRef": "at://s",
                    "storage": {
                        "$type": "ac.foundation.dataset.storageExternal",
                        "urls": ["s3://b/1.tar"],
                    },
                },
            ),
        ]

        # URLs come back in entry order, flattened across backends.
        assert collect_all_urls(mixed) == [
            "s3://a/1.tar",
            "s3://a/2.tar",
            "s3://b/1.tar",
        ]