A loose federation of distributed, typed datasets
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor(planning): reorganize planning docs and add v0.3 roadmap

- Move ATProto integration docs to .planning/setup/ for historical reference
- Add v0.3 roadmap with codebase review and synthesis documents
- Update CHANGELOG.md with v0.3 roadmap entry (#373)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+1191
.chainlink/issues.db

This is a binary file and will not be displayed.

.planning/01_overview.md .planning/setup/01_overview.md
.planning/02_lexicon_design.md .planning/setup/02_lexicon_design.md
.planning/03_python_client.md .planning/setup/03_python_client.md
.planning/04_appview.md .planning/setup/04_appview.md
.planning/05_codegen.md .planning/setup/05_codegen.md
.planning/README.md .planning/setup/README.md
.planning/atproto_integration.md .planning/setup/atproto_integration.md
.planning/decisions/01_schema_representation_format.md .planning/setup/decisions/01_schema_representation_format.md
.planning/decisions/02_lens_code_storage.md .planning/setup/decisions/02_lens_code_storage.md
.planning/decisions/03_webdataset_storage.md .planning/setup/decisions/03_webdataset_storage.md
.planning/decisions/04_schema_evolution.md .planning/setup/decisions/04_schema_evolution.md
.planning/decisions/05_lexicon_namespace.md .planning/setup/decisions/05_lexicon_namespace.md
.planning/decisions/06_lexicon_validation.md .planning/setup/decisions/06_lexicon_validation.md
.planning/decisions/README.md .planning/setup/decisions/README.md
.planning/decisions/assessment.md .planning/setup/decisions/assessment.md
.planning/decisions/record_lexicon_assessment.md .planning/setup/decisions/record_lexicon_assessment.md
.planning/decisions/sampleSchema_design_questions.md .planning/setup/decisions/sampleSchema_design_questions.md
.planning/examples/code/ndarray_roundtrip.py .planning/setup/examples/code/ndarray_roundtrip.py
.planning/examples/code/validate_ndarray_shim.py .planning/setup/examples/code/validate_ndarray_shim.py
.planning/examples/dataset_blob_storage.json .planning/setup/examples/dataset_blob_storage.json
.planning/examples/dataset_external_storage.json .planning/setup/examples/dataset_external_storage.json
.planning/examples/lens_example.json .planning/setup/examples/lens_example.json
.planning/examples/sampleSchema_example.json .planning/setup/examples/sampleSchema_example.json
.planning/lexicons/README.md .planning/setup/lexicons/README.md
.planning/lexicons/README_ARRAY_FORMATS.md .planning/setup/lexicons/README_ARRAY_FORMATS.md
.planning/lexicons/README_SCHEMA_TYPES.md .planning/setup/lexicons/README_SCHEMA_TYPES.md
.planning/lexicons/ac.foundation.dataset.arrayFormat.json .planning/setup/lexicons/ac.foundation.dataset.arrayFormat.json
.planning/lexicons/ac.foundation.dataset.getLatestSchema.json .planning/setup/lexicons/ac.foundation.dataset.getLatestSchema.json
.planning/lexicons/ac.foundation.dataset.lens.json .planning/setup/lexicons/ac.foundation.dataset.lens.json
.planning/lexicons/ac.foundation.dataset.record.json .planning/setup/lexicons/ac.foundation.dataset.record.json
.planning/lexicons/ac.foundation.dataset.sampleSchema.json .planning/setup/lexicons/ac.foundation.dataset.sampleSchema.json
.planning/lexicons/ac.foundation.dataset.schemaType.json .planning/setup/lexicons/ac.foundation.dataset.schemaType.json
.planning/lexicons/ac.foundation.dataset.storageBlobs.json .planning/setup/lexicons/ac.foundation.dataset.storageBlobs.json
.planning/lexicons/ac.foundation.dataset.storageExternal.json .planning/setup/lexicons/ac.foundation.dataset.storageExternal.json
.planning/lexicons/ndarray_shim.json .planning/setup/lexicons/ndarray_shim.json
.planning/ndarray_shim_spec.md .planning/setup/ndarray_shim_spec.md
+175
.planning/roadmap/v0.3/01_codebase-review.md
··· 1 + # Codebase Review & Feature Assessment 2 + 3 + **Document Type**: Initial technical review 4 + **Reviewer**: Claude (automated analysis) 5 + **Date**: 2026-01-26 6 + 7 + --- 8 + 9 + ## Executive Summary 10 + 11 + `atdata` is a well-architected library for typed, distributed datasets built on WebDataset. The codebase demonstrates strong foundations in: 12 + 13 + 1. **Type-safe samples** via `@packable` decorator and `PackableSample` base class 14 + 2. **Lens transformations** for schema-preserving type conversions 15 + 3. **Dual storage backends**: local (Redis + S3) and atmosphere (ATProto) 16 + 4. **Protocol-based abstractions** enabling backend interchangeability 17 + 18 + The current state suggests v0.2.x maturity with solid core functionality. The v0.3 roadmap should focus on **operational maturity** and **developer experience** rather than new fundamental abstractions. 19 + 20 + --- 21 + 22 + ## Current Architecture Assessment 23 + 24 + ### Strengths 25 + 26 + 1. **Clean Protocol Hierarchy** 27 + - `Packable`, `IndexEntry`, `AbstractIndex`, `AbstractDataStore`, `DataSource` 28 + - These enable swapping backends without changing application code 29 + - The local → atmosphere promotion workflow validates this design 30 + 31 + 2. **Elegant Type System** 32 + - `@packable` decorator elegantly wraps dataclasses with serialization 33 + - Auto-registration of `DictSample → T` lenses enables gradual typing 34 + - `NDArray` handling is transparent and well-integrated 35 + 36 + 3. **HuggingFace-Style API** 37 + - `load_dataset()` with split detection mirrors familiar patterns 38 + - `@handle/dataset` notation for index-based lookup is intuitive 39 + - `DatasetDict` provides expected container semantics 40 + 41 + 4. **Modular ATProto Integration** 42 + - `atmosphere/` module is cleanly isolated 43 + - Lexicon design is thorough and well-documented 44 + - Promotion workflow (`promote.py`) demonstrates backend portability 45 + 46 + ### Areas for Improvement 47 + 48 + 1. **Query/Filter Capabilities** 49 + - No built-in filtering beyond iterating all samples 50 + - Manifest/index files not yet implemented (per architecture-doc.md) 51 + - Cross-shard queries require full dataset scan 52 + 53 + 2. **Batch Processing Integration** 54 + - No Modal/Ray/Dask integration patterns 55 + - Worker coordination for parallel shard processing undocumented 56 + - No built-in retry/checkpoint semantics 57 + 58 + 3. **Developer Experience Gaps** 59 + - Schema evolution story unclear (how to migrate between versions) 60 + - CLI limited to `local up/down/status` and `diagnose` 61 + - No dataset inspection/preview utilities 62 + 63 + 4. **Observability** 64 + - No structured logging 65 + - No metrics collection hooks 66 + - Error messages could be more diagnostic 67 + 68 + --- 69 + 70 + ## Open Chainlink Issues Analysis 71 + 72 + ### High Priority (Blocking Progress) 73 + 74 + | Issue | Summary | Assessment | 75 + |-------|---------|------------| 76 + | #363 | Fix Google docstring Example sections | Documentation quality - complete | 77 + | #362 | Plan auto-generated API docs | Documentation infrastructure | 78 + | #76 | Validate record Lexicon definitions | Blocking atmosphere stability | 79 + | #44 | Planning for ATProto Integration | Meta-planning, mostly complete | 80 + 81 + ### Feature Gaps (Medium Priority) 82 + 83 + | Issue | Summary | Assessment | 84 + |-------|---------|------------| 85 + | #246 | `Dataset.from_index_entry()` static method | Convenience API | 86 + | #244 | Implement PDSBlobStore | Alternative storage backend | 87 + | #293 | S3 URI scheme issues | URL handling edge cases | 88 + | #200-205 | Live ATProto network tests | Integration testing infrastructure | 89 + 90 + ### Long-term (Lower Priority) 91 + 92 + | Issue | Summary | Assessment | 93 + |-------|---------|------------| 94 + | #17-21 | ATProto phases 1-5 | Multi-phase roadmap items | 95 + | #32-43 | AppView, codegen, performance | Future enhancements | 96 + 97 + --- 98 + 99 + ## Recommended v0.3 Focus Areas 100 + 101 + Based on codebase analysis and issue backlog: 102 + 103 + ### 1. Manifest System (High Impact) 104 + 105 + The architecture-doc.md outlines a sophisticated manifest system. This addresses the critical gap of query/filter without full dataset scans. 106 + 107 + **Suggested scope:** 108 + - Per-shard manifest generation during write 109 + - Shard-level aggregate summaries (categorical counts, numeric bounds) 110 + - DuckDB/Polars-friendly Parquet manifest format 111 + - Query executor with shard pruning 112 + 113 + ### 2. Processing Backend Integration (High Impact) 114 + 115 + The Modal + R2 + WebDataset architecture in architecture-doc.md is compelling. Key additions: 116 + 117 + **Suggested scope:** 118 + - `@atdata.processor` decorator for Modal functions 119 + - `Dataset.map()` method dispatching to Modal workers 120 + - Built-in shard-to-worker assignment 121 + - Result manifest collection 122 + 123 + ### 3. Developer Experience (Medium Impact) 124 + 125 + **Suggested scope:** 126 + - `atdata inspect <dataset>` CLI command 127 + - `atdata schema show/diff` for schema inspection 128 + - `Dataset.head(n)` for quick preview 129 + - Better error messages with suggestions 130 + 131 + ### 4. Documentation & Testing (Foundation) 132 + 133 + **Suggested scope:** 134 + - Complete docstring Example sections (issue #363) 135 + - End-to-end tutorial: local → S3 → atmosphere 136 + - Integration test suite for live ATProto 137 + - Performance benchmarks 138 + 139 + --- 140 + 141 + ## Technical Debt Observations 142 + 143 + 1. **Deprecation Notices**: `Repo` class is deprecated but still present 144 + 2. **TODO Comments**: Several in-code TODOs about quartodoc formatting 145 + 3. **Test Coverage**: Good unit tests but integration tests need work 146 + 4. **Type Annotations**: Generally good but some `Any` escapes 147 + 148 + --- 149 + 150 + ## Dependencies & External Factors 151 + 152 + ### Current Dependencies 153 + - `webdataset`: Core streaming infrastructure 154 + - `msgpack`/`ormsgpack`: Serialization 155 + - `redis`: Local index storage 156 + - `s3fs`/`boto3`: S3 storage 157 + - `atproto`: ATProto client 158 + 159 + ### Potential New Dependencies for v0.3 160 + - `modal`: Serverless compute 161 + - `duckdb`: Manifest querying 162 + - `polars`/`pyarrow`: Efficient manifest I/O 163 + - `cloudflare-workers` (JS): R2 event bridge 164 + 165 + --- 166 + 167 + ## Conclusion 168 + 169 + The atdata codebase is well-positioned for v0.3. The core abstractions are sound. The next release should focus on: 170 + 171 + 1. **Operational capabilities**: Manifests, processing, observability 172 + 2. **Developer experience**: CLI, inspection, documentation 173 + 3. **Production readiness**: Testing, error handling, performance 174 + 175 + The architecture-doc.md provides excellent direction for the processing backend. A synthesis with these findings follows in the roadmap document.
+503
.planning/roadmap/v0.3/02_synthesis-roadmap.md
··· 1 + # v0.3 Synthesis Roadmap 2 + 3 + **Synthesized from:** 4 + - `01_codebase-review.md` - Technical assessment 5 + - `architecture-doc.md` - Data processing backend vision 6 + - Chainlink issues backlog 7 + 8 + --- 9 + 10 + ## Vision Statement 11 + 12 + v0.3 transforms atdata from a **dataset library** into a **dataset operations platform** by adding: 13 + 14 + 1. **Queryable manifests** - Filter/aggregate without loading data 15 + 2. **Serverless processing** - Modal-native parallel transforms 16 + 3. **Event-driven pipelines** - R2 notifications trigger processing 17 + 4. **Production tooling** - CLI, observability, error handling 18 + 19 + --- 20 + 21 + ## Feature Domains 22 + 23 + ### Domain 1: Manifest System 24 + 25 + **Source**: architecture-doc.md sections on "Per-Shard Manifest Design" and "Indexing and Query Strategy" 26 + 27 + **Goal**: Enable efficient queries over large datasets without full scans. 28 + 29 + #### 1.1 Manifest Generation 30 + 31 + ```python 32 + # During shard write 33 + with ShardWriter(..., manifest=True) as sink: 34 + for sample in ds.ordered(): 35 + sink.write(sample.as_wds) 36 + # Automatically writes: 37 + # shard-00042.tar 38 + # shard-00042.manifest.json (header + aggregates) 39 + # shard-00042.manifest.parquet (per-sample metadata) 40 + ``` 41 + 42 + **Implementation pieces:** 43 + - `ManifestBuilder` class tracking samples during write 44 + - `ManifestField` definition in schema (which fields are queryable) 45 + - YAML/JSON header format for shard metadata 46 + - Parquet writer for per-sample queryable fields 47 + 48 + #### 1.2 Manifest Schema Integration 49 + 50 + ```python 51 + @packable 52 + class ImageSample: 53 + image: NDArray # Not in manifest (too large) 54 + label: str # manifest: categorical 55 + confidence: float # manifest: numeric 56 + tags: list[str] # manifest: set 57 + ``` 58 + 59 + **Implementation pieces:** 60 + - `@manifest_field` decorator or field annotation 61 + - Auto-derive manifest fields from schema 62 + - Aggregate type inference (categorical/numeric/set) 63 + 64 + #### 1.3 Query Executor 65 + 66 + ```python 67 + # Find samples with confidence > 0.9 and label in ['dog', 'cat'] 68 + results = ds.query( 69 + where=lambda m: m.confidence > 0.9 and m.label in ['dog', 'cat'] 70 + ) 71 + 72 + # Returns SampleLocations (shard + offset) for direct access 73 + for loc in results: 74 + sample = ds.get_sample(loc) 75 + ``` 76 + 77 + **Implementation pieces:** 78 + - `ManifestLoader` for reading across shards 79 + - `QueryPlan` with shard pruning via aggregates 80 + - `SampleLocation` type for sample addressing 81 + - Optional DuckDB backend for SQL queries 82 + 83 + #### Dependencies: None (foundational) 84 + 85 + --- 86 + 87 + ### Domain 2: Processing Backend (Modal) 88 + 89 + **Source**: architecture-doc.md sections on "Core Stack", "System Architecture", "Modal Code Structure" 90 + 91 + **Goal**: Enable scalable map-reduce over datasets using Modal. 92 + 93 + #### 2.1 Processor Decorator 94 + 95 + ```python 96 + import atdata 97 + from modal import App 98 + 99 + app = App("my-processor") 100 + 101 + @atdata.processor(app, cpu=2, memory=4096) 102 + def embed_images(sample: ImageSample) -> EmbeddingSample: 103 + embedding = model.encode(sample.image) 104 + return EmbeddingSample( 105 + embedding=embedding, 106 + label=sample.label, 107 + ) 108 + ``` 109 + 110 + **Implementation pieces:** 111 + - `@atdata.processor` wrapping Modal function 112 + - Automatic shard distribution via `.map()` 113 + - Schema type extraction from function signature 114 + - Output shard writing with manifest 115 + 116 + #### 2.2 Dataset.map() Method 117 + 118 + ```python 119 + # Transform dataset using Modal workers 120 + output_ds = input_ds.map( 121 + embed_images, 122 + output_prefix="s3://bucket/embedded/v1", 123 + workers_per_shard=1, 124 + ) 125 + ``` 126 + 127 + **Implementation pieces:** 128 + - Shard enumeration and dispatch 129 + - Worker result collection 130 + - Output dataset registration 131 + - Progress tracking 132 + 133 + #### 2.3 Dispatcher Pattern 134 + 135 + ```python 136 + @app.function() 137 + @modal.web_endpoint(method="POST") 138 + async def dispatch(request: Request): 139 + """Central dispatcher for job coordination.""" 140 + body = await request.json() 141 + shards = list_shards(body["input_prefix"]) 142 + 143 + # Fan out to workers 144 + results = process_shard.map(shards) 145 + return {"status": "started", "num_shards": len(shards)} 146 + ``` 147 + 148 + **Implementation pieces:** 149 + - `atdata.modal.Dispatcher` class 150 + - Job tracking and result aggregation 151 + - Error handling with partial results 152 + - Retry semantics 153 + 154 + #### Dependencies: Domain 1 (manifests for output) 155 + 156 + --- 157 + 158 + ### Domain 3: Event-Driven Pipeline (R2 Notifications) 159 + 160 + **Source**: architecture-doc.md section on "R2 Event Notifications → Modal" 161 + 162 + **Goal**: Trigger processing automatically when data arrives. 163 + 164 + #### 3.1 Cloudflare Worker Bridge 165 + 166 + ```javascript 167 + // workers/r2-to-modal.js 168 + export default { 169 + async queue(batch, env) { 170 + for (const msg of batch.messages) { 171 + await fetch(`${env.MODAL_ENDPOINT}/ingest`, { 172 + method: "POST", 173 + body: JSON.stringify({ 174 + bucket: msg.body.bucket, 175 + object: msg.body.object.key, 176 + action: msg.body.action, 177 + }), 178 + }); 179 + msg.ack(); 180 + } 181 + } 182 + }; 183 + ``` 184 + 185 + **Implementation pieces:** 186 + - Reference Cloudflare Worker template 187 + - Environment variable configuration guide 188 + - Modal endpoint for receiving events 189 + - Event filtering logic (ignore non-.tar files) 190 + 191 + #### 3.2 Pipeline Definition 192 + 193 + ```python 194 + @atdata.pipeline( 195 + trigger="r2://bucket/incoming", 196 + output="r2://bucket/processed", 197 + ) 198 + def process_uploads(shard_url: str): 199 + ds = Dataset[RawSample](shard_url) 200 + return ds.map(transform_sample) 201 + ``` 202 + 203 + **Implementation pieces:** 204 + - Pipeline registration system 205 + - Trigger pattern matching 206 + - Output destination configuration 207 + - Idempotency/deduplication 208 + 209 + #### Dependencies: Domain 2 (Modal processing) 210 + 211 + --- 212 + 213 + ### Domain 4: Developer Experience 214 + 215 + **Source**: Codebase review findings 216 + 217 + **Goal**: Make atdata pleasant to use day-to-day. 218 + 219 + #### 4.1 CLI Enhancements 220 + 221 + ```bash 222 + # Inspect dataset 223 + atdata inspect s3://bucket/data.tar 224 + # Output: sample count, schema, shard info 225 + 226 + # Show schema 227 + atdata schema show @local/my-dataset 228 + # Output: field types, versions, description 229 + 230 + # Compare schemas 231 + atdata schema diff v1.0.0 v1.1.0 232 + 233 + # Preview samples 234 + atdata preview s3://bucket/data.tar --limit 5 235 + # Output: rendered samples with truncated arrays 236 + ``` 237 + 238 + **Implementation pieces:** 239 + - CLI module extension (`cli/inspect.py`, `cli/schema.py`, `cli/preview.py`) 240 + - Rich terminal output formatting 241 + - S3/local/atmosphere URL resolution 242 + 243 + #### 4.2 Dataset Convenience Methods 244 + 245 + ```python 246 + # Quick preview 247 + samples = ds.head(10) 248 + 249 + # Sample by key 250 + sample = ds.get("sample_00042") 251 + 252 + # Schema access 253 + fields = ds.schema.fields 254 + version = ds.schema.version 255 + 256 + # Statistics (from manifest) 257 + stats = ds.describe() 258 + ``` 259 + 260 + **Implementation pieces:** 261 + - `Dataset.head()` method 262 + - `Dataset.get()` for keyed access 263 + - `Dataset.schema` property 264 + - `Dataset.describe()` using manifest 265 + 266 + #### 4.3 Error Messages 267 + 268 + Current: 269 + ``` 270 + ValueError: No registered lens from source <class '__main__.A'> to view <class '__main__.B'> 271 + ``` 272 + 273 + Improved: 274 + ``` 275 + LensNotFoundError: No lens transforms A → B 276 + 277 + Available lenses from A: 278 + - A → C (via name_lens) 279 + 280 + Did you mean to define: 281 + @lens 282 + def a_to_b(a: A) -> B: 283 + return B(...) 284 + ``` 285 + 286 + **Implementation pieces:** 287 + - Custom exception hierarchy 288 + - Suggestion generation 289 + - Contextual help in errors 290 + 291 + #### Dependencies: None (can proceed in parallel) 292 + 293 + --- 294 + 295 + ### Domain 5: Production Hardening 296 + 297 + **Source**: Codebase review findings + architecture-doc.md reliability concerns 298 + 299 + **Goal**: Make atdata suitable for production workloads. 300 + 301 + #### 5.1 Observability 302 + 303 + ```python 304 + # Structured logging 305 + import structlog 306 + atdata.configure_logging(structlog.get_logger()) 307 + 308 + # Metrics 309 + atdata.metrics.samples_processed.inc() 310 + atdata.metrics.shard_processing_time.observe(duration) 311 + 312 + # Tracing 313 + with atdata.trace("process_shard", shard_id=shard): 314 + ... 315 + ``` 316 + 317 + **Implementation pieces:** 318 + - Pluggable logging interface 319 + - Optional metrics (prometheus-client compatible) 320 + - OpenTelemetry trace integration 321 + 322 + #### 5.2 Error Handling 323 + 324 + ```python 325 + # Partial failures 326 + try: 327 + results = ds.map(processor) 328 + except PartialFailureError as e: 329 + succeeded = e.succeeded_shards 330 + failed = e.failed_shards 331 + # Retry only failed shards 332 + retry_results = ds.map(processor, shards=failed, retry=True) 333 + ``` 334 + 335 + **Implementation pieces:** 336 + - `PartialFailureError` with detailed info 337 + - Checkpoint/resume support 338 + - Dead-letter queue for persistent failures 339 + 340 + #### 5.3 Testing Infrastructure 341 + 342 + ```python 343 + # Mock atmosphere client for tests 344 + @pytest.fixture 345 + def mock_atmosphere(): 346 + with atdata.testing.mock_atmosphere() as client: 347 + yield client 348 + 349 + # Test dataset fixtures 350 + def test_processing(tmp_dataset): 351 + result = tmp_dataset.map(my_processor) 352 + assert len(result.head(10)) == 10 353 + ``` 354 + 355 + **Implementation pieces:** 356 + - `atdata.testing` module 357 + - Mock clients and indices 358 + - Dataset fixtures 359 + - Integration test patterns 360 + 361 + #### Dependencies: None (can proceed in parallel) 362 + 363 + --- 364 + 365 + ## Phased Implementation 366 + 367 + ### Phase A: Foundation (Weeks 1-3) 368 + 369 + **Focus**: Manifest system + Dev experience basics 370 + 371 + | Task | Domain | Complexity | Dependencies | 372 + |------|--------|------------|--------------| 373 + | ManifestBuilder class | 1 | Medium | None | 374 + | Manifest field annotations | 1 | Low | ManifestBuilder | 375 + | Dataset.head() method | 4 | Low | None | 376 + | CLI inspect command | 4 | Medium | None | 377 + | Custom exception hierarchy | 4 | Low | None | 378 + 379 + **Exit criteria:** 380 + - Can write datasets with manifests 381 + - Can inspect datasets via CLI 382 + - Better error messages in common cases 383 + 384 + ### Phase B: Query + Processing (Weeks 4-6) 385 + 386 + **Focus**: Manifest queries + Modal integration 387 + 388 + | Task | Domain | Complexity | Dependencies | 389 + |------|--------|------------|--------------| 390 + | ManifestLoader | 1 | Medium | Phase A | 391 + | Query executor with pruning | 1 | High | ManifestLoader | 392 + | @atdata.processor decorator | 2 | Medium | None | 393 + | Dataset.map() method | 2 | High | @processor | 394 + | Modal dispatcher | 2 | Medium | Dataset.map() | 395 + 396 + **Exit criteria:** 397 + - Can query datasets without loading all data 398 + - Can run parallel transforms on Modal 399 + - End-to-end: ingest → transform → query 400 + 401 + ### Phase C: Pipelines + Production (Weeks 7-9) 402 + 403 + **Focus**: Event-driven + production hardening 404 + 405 + | Task | Domain | Complexity | Dependencies | 406 + |------|--------|------------|--------------| 407 + | R2 Worker template | 3 | Low | None | 408 + | Pipeline definition API | 3 | Medium | Modal | 409 + | Observability hooks | 5 | Medium | None | 410 + | Error handling improvements | 5 | Medium | None | 411 + | Testing infrastructure | 5 | Medium | None | 412 + 413 + **Exit criteria:** 414 + - Can trigger processing on R2 upload 415 + - Structured logging and metrics available 416 + - Integration test suite passing 417 + 418 + ### Phase D: Polish (Weeks 10-11) 419 + 420 + **Focus**: Documentation, edge cases, performance 421 + 422 + | Task | Domain | Complexity | Dependencies | 423 + |------|--------|------------|--------------| 424 + | End-to-end tutorial | - | Medium | All phases | 425 + | API reference completion | - | Medium | Phase C | 426 + | Performance benchmarks | 5 | Medium | Phase B | 427 + | Edge case fixes | - | Variable | All phases | 428 + 429 + **Exit criteria:** 430 + - Documentation complete 431 + - Benchmark results documented 432 + - Release candidate ready 433 + 434 + --- 435 + 436 + ## Risk Assessment 437 + 438 + ### Technical Risks 439 + 440 + | Risk | Likelihood | Impact | Mitigation | 441 + |------|------------|--------|------------| 442 + | Modal API changes | Medium | High | Pin versions, abstract interface | 443 + | Manifest format lock-in | Low | Medium | Version field, migration tools | 444 + | R2 notification delays | Low | Low | Polling fallback | 445 + | Parquet compatibility | Low | Low | Use pyarrow directly | 446 + 447 + ### Resource Risks 448 + 449 + | Risk | Likelihood | Impact | Mitigation | 450 + |------|------------|--------|------------| 451 + | Scope creep | High | Medium | Strict phase gates | 452 + | External dependency delays | Medium | Medium | Stub interfaces early | 453 + | Testing infrastructure gaps | Medium | High | Invest in Phase A | 454 + 455 + --- 456 + 457 + ## Success Metrics 458 + 459 + ### Functional Metrics 460 + - [ ] Manifest-based query completes 10x faster than full scan 461 + - [ ] Modal processing achieves >80% worker utilization 462 + - [ ] R2 → processing latency <30s (push mode) 463 + - [ ] CLI commands execute in <1s for local datasets 464 + 465 + ### Quality Metrics 466 + - [ ] Test coverage >80% for new code 467 + - [ ] All public APIs documented with examples 468 + - [ ] Zero known security vulnerabilities 469 + - [ ] Error messages include actionable suggestions 470 + 471 + ### Adoption Metrics 472 + - [ ] Tutorial completion rate tracked 473 + - [ ] GitHub issues for UX friction points 474 + - [ ] Community dataset publications via atmosphere 475 + 476 + --- 477 + 478 + ## Appendix: Architecture-doc.md Key Quotes 479 + 480 + > "Scientists and data producers interact via thin clients (CLI, notebooks, simple UI) without managing infra day-to-day" 481 + 482 + This drives Domain 4 (Developer Experience). 483 + 484 + > "Natural parallelization unit: One shard = one worker, no coordination needed" 485 + 486 + This validates the Modal shard-per-worker model in Domain 2. 487 + 488 + > "Per-shard manifests ... enables shard skipping without reading sample details" 489 + 490 + This is the core insight for Domain 1. 491 + 492 + > "Emit a manifest alongside each shard during processing" 493 + 494 + This suggests manifest generation should be automatic during writes. 495 + 496 + --- 497 + 498 + ## Next Steps 499 + 500 + 1. Review and approve this roadmap 501 + 2. Create chainlink issues for Phase A tasks 502 + 3. Begin implementation of ManifestBuilder 503 + 4. Set up Modal project for processing experiments
+512
.planning/roadmap/v0.3/architecture-doc.md
··· 1 + # Data Processing Backend Architecture 2 + 3 + This document outlines the architecture for a robust, serverless data processing backend designed to handle large-ish datasets while keeping scientists and data producers unencumbered by infrastructure concerns. 4 + 5 + --- 6 + 7 + ## Design Goals 8 + 9 + 1. **Invisible infrastructure**: Scientists and data producers interact via thin clients (CLI, notebooks, simple UI) without managing infra day-to-day 10 + 2. **Parallelizable processing**: Support map-reduce style workflows over sharded datasets 11 + 3. **Fire-and-forget execution**: Jobs complete asynchronously; results are queried from output storage 12 + 4. **Event-driven and manual triggers**: Support both automatic processing on data arrival and ad-hoc runs 13 + 5. **Flexible job granularity**: Different jobs can chunk work differently based on their needs 14 + 15 + --- 16 + 17 + ## Core Stack 18 + 19 + | Component | Choice | Rationale | 20 + |-----------|--------|-----------| 21 + | **Object Storage** | Cloudflare R2 | S3-compatible, zero egress fees, event notifications via Queues | 22 + | **Compute** | Modal | Serverless Python functions, native `.map()` for parallelism, GPU support | 23 + | **Data Format** | WebDataset (via atdata) | Sharded tar files, streaming-friendly, natural parallelization unit | 24 + | **Job Queue** | None (Modal-native) | Modal's `.spawn()` and `.map()` provide sufficient queuing semantics | 25 + 26 + ### Why not an external queue (e.g., QStash)? 27 + 28 + For this workload, Modal provides: 29 + - `.spawn()` for fire-and-forget async calls 30 + - `.map()` / `.starmap()` for parallel fan-out 31 + - `@web_endpoint` to receive HTTP triggers 32 + - Built-in retries at the function level 33 + 34 + An external queue would add value for guaranteed delivery during Modal outages or complex rate limiting, but adds complexity. Since the source of truth is "what's in S3," failed jobs can be retried by re-triggering. The data remains in the bucket. 35 + 36 + --- 37 + 38 + ## System Architecture 39 + 40 + ``` 41 + ┌─────────────────┐ ┌─────────────────┐ 42 + │ Scientist CLI │ │ R2 Event │ 43 + │ / Notebook │ │ Notification │ 44 + └────────┬────────┘ └────────┬────────┘ 45 + │ │ 46 + │ HTTP POST │ Cloudflare Queue → Worker 47 + ▼ ▼ 48 + ┌─────────────────────────────────────────────┐ 49 + │ Modal @web_endpoint │ 50 + │ (job dispatcher / router) │ 51 + └────────────────────┬────────────────────────┘ 52 + 53 + │ .map() / .starmap() 54 + 55 + ┌─────────────────────────────────────────────┐ 56 + │ Modal workers (CPU or GPU) │ 57 + │ read from R2 → process → write to R2 │ 58 + └────────────────────┬────────────────────────┘ 59 + 60 + │ write alongside output shards 61 + 62 + ┌─────────────────────────────────────────────┐ 63 + │ Per-shard manifests + output │ 64 + │ (enables indexing and queries) │ 65 + └─────────────────────────────────────────────┘ 66 + ``` 67 + 68 + --- 69 + 70 + ## R2 Event Notifications → Modal 71 + 72 + R2 event notifications are GA and send messages to Cloudflare Queues when bucket data changes. Two consumption patterns are available: 73 + 74 + ### Option A: Push via Cloudflare Worker (Recommended) 75 + 76 + ``` 77 + R2 bucket event → Cloudflare Queue → Consumer Worker → HTTP POST to Modal 78 + ``` 79 + 80 + A minimal Cloudflare Worker (~20 lines) forwards events to Modal's `@web_endpoint`. This provides: 81 + - **Batching**: Aggregate multiple events before calling Modal 82 + - **Filtering**: Ignore events you don't care about 83 + - **Retry logic**: At the Cloudflare layer 84 + 85 + Example Worker: 86 + 87 + ```javascript 88 + export default { 89 + async queue(batch, env) { 90 + for (const msg of batch.messages) { 91 + const event = msg.body; 92 + await fetch("https://your-org--dispatcher.modal.run/ingest", { 93 + method: "POST", 94 + headers: { 95 + "Content-Type": "application/json", 96 + "Authorization": `Bearer ${env.MODAL_SECRET}` 97 + }, 98 + body: JSON.stringify({ 99 + bucket: event.bucket, 100 + object: event.object.key, 101 + action: event.action, 102 + timestamp: new Date().toISOString() 103 + }) 104 + }); 105 + msg.ack(); 106 + } 107 + } 108 + }; 109 + ``` 110 + 111 + ### Option B: Pull-based Consumer 112 + 113 + A Modal scheduled function polls the queue periodically. Simpler (no Worker to maintain) but adds latency. 114 + 115 + --- 116 + 117 + ## Data Format: WebDataset with atdata Schemas 118 + 119 + ### Why WebDataset? 120 + 121 + WebDataset stores data as sharded tar files where each sample consists of files sharing a common basename: 122 + 123 + ``` 124 + shard-00042.tar 125 + ├── sample_00000.jpg 126 + ├── sample_00000.json 127 + ├── sample_00001.jpg 128 + ├── sample_00001.json 129 + └── ... 130 + ``` 131 + 132 + Benefits for this architecture: 133 + - **Natural parallelization unit**: One shard = one worker, no coordination needed 134 + - **Streaming-friendly**: Workers process tar files sequentially without loading into memory 135 + - **S3-optimal**: Sequential reads, no random access penalty 136 + - **Append is trivial**: New data = new shards, never rewrite existing data 137 + 138 + ### atdata Layer 139 + 140 + atdata adds: 141 + 1. **Sample type schemas**: Each sample conforms to a known schema, enabling structured serialization/deserialization 142 + 2. **External schema management**: Schema is separate from data, allowing evolution 143 + 144 + This gives excellent append semantics (just create new tar files) while maintaining type safety. 145 + 146 + --- 147 + 148 + ## Modal Code Structure 149 + 150 + ### Dispatcher 151 + 152 + ```python 153 + @app.function() 154 + @modal.web_endpoint(method="POST") 155 + async def dispatch(request: Request): 156 + body = await request.json() 157 + input_prefix = body["input_prefix"] 158 + output_prefix = body["output_prefix"] 159 + job_type = body["job_type"] 160 + 161 + shards = list_shards(input_prefix) # S3 list operation 162 + 163 + # Fan out to workers 164 + results = worker.map( 165 + [(s, output_prefix, job_type) for s in shards] 166 + ) 167 + return {"status": "started", "num_shards": len(shards)} 168 + ``` 169 + 170 + ### Worker 171 + 172 + ```python 173 + @app.function(cpu=2, memory=4096, timeout=600) 174 + def worker(shard_url: str, output_prefix: str, job_type: str): 175 + schema = get_schema_for_job(job_type) 176 + 177 + # Stream from R2, process, write output shard with manifest 178 + with ShardWriter(output_shard) as sink: 179 + for sample in stream_shard(shard_url, schema): 180 + processed = process_sample(sample, job_type) 181 + sink.write(processed) 182 + 183 + # Write manifest alongside shard 184 + write_manifest(output_shard, manifest) 185 + ``` 186 + 187 + ### Shard Size Guidelines 188 + 189 + Target shards that take 30 seconds to 5 minutes to process: 190 + - **Too small**: Per-task overhead dominates (cold starts, container spin-up) 191 + - **Too large**: Memory pressure, coarse retry granularity 192 + - **Just right**: Good parallelism, efficient resource use 193 + 194 + --- 195 + 196 + ## Indexing and Query Strategy 197 + 198 + The core tension: append-friendly storage (WebDataset) vs. queryable storage. The solution is a layered approach using per-shard manifests. 199 + 200 + ### The Indexing Spectrum 201 + 202 + ``` 203 + Less infrastructure More infrastructure 204 + │ │ 205 + ▼ ▼ 206 + ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ 207 + │ Partitioned │ │ Per-shard │ │ Centralized │ │ Materialized│ 208 + │ bucket paths│ │ manifests │ │ metadata DB │ │ query views │ 209 + └──────────────┘ └──────────────┘ └──────────────┘ └──────────────┘ 210 + ``` 211 + 212 + ### Recommended Phased Approach 213 + 214 + **Phase 1: Per-shard manifests** (Start here) 215 + - Emit a manifest alongside each shard during processing 216 + - Simple query executor scans manifests and returns matching sample locations 217 + - No external systems, manifests live with data 218 + 219 + **Phase 2: Add aggregates for shard-level filtering** 220 + - Include statistical summaries in manifests 221 + - Skip entire shards that can't match a query (e.g., max_score < query_min_score) 222 + 223 + **Phase 3: Materialized query views** 224 + - Common queries produce new WebDatasets 225 + - Results cached by query hash, reused across training runs 226 + 227 + **Phase 4: Centralized DB (maybe never)** 228 + - Only if manifest scanning becomes a bottleneck at millions of shards 229 + - Or if you need complex queries (joins, full-text search) 230 + 231 + --- 232 + 233 + ## Per-Shard Manifest Design 234 + 235 + Each shard gets a companion manifest with three layers of information: 236 + 237 + ### Layer 1: Shard-Level Metadata 238 + 239 + Tiny header for coarse filtering—always read this first. 240 + 241 + ```yaml 242 + shard_id: "dataset-v2/train/shard-00042" 243 + schema_type: "image_classification" 244 + schema_version: "2.3.0" 245 + 246 + # Provenance 247 + created_at: "2025-01-22T14:32:00Z" 248 + source_job_id: "modal-job-abc123" 249 + parent_shards: ["raw/shard-00010"] 250 + pipeline_version: "1.4.2" 251 + 252 + # Physical properties 253 + num_samples: 1000 254 + size_bytes: 524288000 255 + checksum_sha256: "a1b2c3..." 256 + 257 + # Temporal bounds 258 + time_min: "2025-01-01T00:00:00Z" 259 + time_max: "2025-01-15T23:59:59Z" 260 + ``` 261 + 262 + ### Layer 2: Statistical Aggregates 263 + 264 + Enable shard skipping without reading sample details. 265 + 266 + **Categorical fields:** 267 + ```yaml 268 + aggregates: 269 + label: 270 + type: categorical 271 + cardinality: 3 272 + values: {dog: 423, cat: 512, bird: 65} 273 + ``` 274 + 275 + **Numeric fields:** 276 + ```yaml 277 + aggregates: 278 + confidence_score: 279 + type: numeric 280 + min: 0.12 281 + max: 0.98 282 + mean: 0.72 283 + histogram: 284 + buckets: [0.0, 0.2, 0.4, 0.6, 0.8, 1.0] 285 + counts: [12, 45, 189, 402, 352] 286 + ``` 287 + 288 + **Set/tag fields:** 289 + ```yaml 290 + aggregates: 291 + tags: 292 + type: set 293 + all_values: [outdoor, indoor, night, day, urban, rural] 294 + bloom_filter_base64: "..." # For high-cardinality sets 295 + ``` 296 + 297 + ### Layer 3: Per-Sample Metadata 298 + 299 + Queryable fields for each sample in the shard. 300 + 301 + ```yaml 302 + samples: 303 + - __key__: "sample_00000" 304 + __offset__: 0 # Byte offset for direct access 305 + __size__: 52480 306 + label: "dog" 307 + confidence: 0.92 308 + tags: [outdoor, day] 309 + 310 + - __key__: "sample_00001" 311 + __offset__: 52480 312 + __size__: 48192 313 + label: "cat" 314 + confidence: 0.87 315 + tags: [indoor] 316 + ``` 317 + 318 + **Field selection guidelines:** 319 + - **Include**: Fields you'll filter on, aggregate on, or use for joins 320 + - **Exclude**: Large blobs (images, embeddings), unique-per-sample values with no query utility 321 + 322 + --- 323 + 324 + ## Manifest Format Recommendations 325 + 326 + ### Hybrid Approach (Recommended) 327 + 328 + ``` 329 + shard-00042.tar # The data 330 + shard-00042.manifest.json # Header + aggregates (tiny, human-readable) 331 + shard-00042.manifest.parquet # Per-sample metadata (columnar, compressed) 332 + ``` 333 + 334 + ### Why Parquet for Sample Metadata? 335 + 336 + - **Columnar**: Read only the columns you need 337 + - **Compressed**: 5-10x smaller than JSON 338 + - **Ecosystem**: DuckDB, Polars, pandas read it natively 339 + - **Aggregation**: Query across all manifests without loading into memory 340 + 341 + ```python 342 + import duckdb 343 + 344 + # Query across all manifests 345 + result = duckdb.query(""" 346 + SELECT label, COUNT(*) as count 347 + FROM 's3://bucket/dataset/**/*.manifest.parquet' 348 + WHERE confidence > 0.9 349 + GROUP BY label 350 + """) 351 + ``` 352 + 353 + ### Simple Alternative 354 + 355 + For smaller scale (<10K samples per shard), plain JSON is fine: 356 + 357 + ```python 358 + def write_manifest(manifest, path): 359 + if manifest.num_samples < 10_000: 360 + write_json(path + ".manifest.json", manifest) 361 + else: 362 + # Split for larger shards 363 + write_json(path + ".manifest.json", manifest.header_and_aggregates()) 364 + write_jsonl(path + ".manifest.samples.jsonl", manifest.samples) 365 + ``` 366 + 367 + --- 368 + 369 + ## Schema Integration 370 + 371 + Derive manifest fields from atdata schemas: 372 + 373 + ```python 374 + @dataclass 375 + class ImageClassificationSchema: 376 + """atdata schema for image classification samples.""" 377 + image: bytes 378 + label: str 379 + confidence: float 380 + tags: List[str] 381 + metadata: dict 382 + 383 + def manifest_fields(schema: Type) -> List[ManifestField]: 384 + """Define which fields are queryable.""" 385 + return [ 386 + ManifestField("label", indexed=True, aggregate="categorical"), 387 + ManifestField("confidence", indexed=True, aggregate="numeric"), 388 + ManifestField("tags", indexed=True, aggregate="set"), 389 + # 'image' excluded - too large 390 + # 'metadata' excluded - unstructured 391 + ] 392 + ``` 393 + 394 + When you define a new schema type, you also define what's queryable. The manifest writer automatically extracts those fields. 395 + 396 + --- 397 + 398 + ## Write Path Integration 399 + 400 + ```python 401 + @app.function() 402 + def process_shard(input_shard: str, output_prefix: str, job_id: str): 403 + schema = get_schema_for_shard(input_shard) 404 + manifest_fields = schema.manifest_fields() 405 + 406 + manifest = ManifestBuilder( 407 + shard_id=output_shard_id, 408 + schema_type=schema.name, 409 + schema_version=schema.version, 410 + source_job_id=job_id, 411 + parent_shards=[input_shard], 412 + ) 413 + 414 + output_shard = f"{output_prefix}/{output_shard_id}.tar" 415 + 416 + with ShardWriter(output_shard) as sink: 417 + for sample in stream_shard(input_shard, schema): 418 + processed = process_sample(sample) 419 + sink.write(processed) 420 + 421 + manifest.add_sample( 422 + key=processed["__key__"], 423 + offset=sink.current_offset, 424 + fields={f: processed[f] for f in manifest_fields} 425 + ) 426 + 427 + manifest.finalize_aggregates() 428 + manifest.write(output_shard.replace(".tar", "")) 429 + ``` 430 + 431 + --- 432 + 433 + ## Query Execution 434 + 435 + ```python 436 + def execute_query(bucket_prefix: str, predicate: Callable) -> List[SampleLocation]: 437 + manifests = list_manifests(bucket_prefix) 438 + 439 + results = [] 440 + for manifest in manifests: 441 + # Shard-level filtering using aggregates 442 + if not shard_might_match(manifest.aggregates, predicate): 443 + continue 444 + 445 + # Sample-level filtering 446 + for sample_meta in manifest.samples: 447 + if predicate(sample_meta): 448 + results.append(SampleLocation( 449 + shard=manifest.shard_id, 450 + key=sample_meta["__key__"], 451 + offset=sample_meta.get("__offset__") 452 + )) 453 + 454 + return results 455 + 456 + def shard_might_match(aggregates, predicate): 457 + """Quick check using aggregates before scanning samples.""" 458 + if predicate.requires_label("dog") and "dog" not in aggregates["label"]["values"]: 459 + return False 460 + if predicate.min_score and aggregates["confidence_score"]["max"] < predicate.min_score: 461 + return False 462 + return True 463 + ``` 464 + 465 + --- 466 + 467 + ## Open Design Questions 468 + 469 + 1. **Schema evolution in manifests**: When adding new queryable fields, how to handle old manifests? 470 + - Backfill (expensive but clean) 471 + - Nullable (queries skip old shards) 472 + - Versioned readers (handle missing fields gracefully) 473 + 474 + 2. **Manifest update story**: If manifests are immutable (written once with shard), life is simple. If updates are needed (e.g., adding derived fields later), need atomicity and versioning strategy. 475 + 476 + 3. **Sample ID structure**: If "find sample by ID" needs to be fast, consider structured IDs (e.g., `{shard_id}/{seq}`) so location is derivable, or maintain a separate key→location index. 477 + 478 + --- 479 + 480 + ## Future Considerations 481 + 482 + ### Materialized Query Views 483 + 484 + For frequently-used queries, materialize results as new WebDatasets: 485 + 486 + ```python 487 + @app.function() 488 + def materialize_query(query: Query, output_prefix: str): 489 + matching_samples = execute_query(query) 490 + 491 + with ShardWriter(output_prefix, maxcount=10000) as sink: 492 + for sample in matching_samples: 493 + sink.write(sample) 494 + 495 + return {"output": output_prefix, "num_samples": sink.count} 496 + ``` 497 + 498 + Cache by query hash; reuse across training runs. 499 + 500 + ### Centralized Database 501 + 502 + If manifest scanning becomes a bottleneck (millions of shards) or complex queries are needed (joins, full-text search), consider: 503 + - **SQLite in S3**: Viable up to ~10M samples 504 + - **DuckDB**: Better for analytical queries, can query Parquet directly 505 + - **Postgres (Neon/Supabase)**: For concurrent writers, transactions, real-time updates 506 + 507 + ### Vector Search 508 + 509 + If queries like "samples similar to this embedding" are needed, consider: 510 + - Storing embedding vectors in manifests (if small enough) 511 + - External vector DB (Pinecone, Qdrant, pgvector) 512 + - Approximate nearest neighbor indexes alongside shards
+1
CHANGELOG.md
··· 25 25 - **Comprehensive integration test suite**: 593 tests covering E2E flows, error handling, edge cases 26 26 27 27 ### Changed 28 + - Create v0.3 roadmap synthesis document (#373) 28 29 - Document justfile in CLAUDE.md (#372) 29 30 - Make docs script work from any directory (#371) 30 31 - Add uv script shortcut 'docs' for documentation build (#370)