about things
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

remove projects folder - planning docs don't belong in notes

zzstoatzz 204eb7b7 a1548de4

-372
-372
projects/prefect-zig/plan.md
··· 1 - # prefect-zig: zig implementation of the prefect server 2 - 3 - ## overview 4 - 5 - reimplement the prefect orchestration server in zig. goals: 6 - - single binary deployment 7 - - minimal memory footprint 8 - - predictable latency 9 - - compatible with existing prefect python clients 10 - 11 - ## architecture mapping 12 - 13 - ### python prefect → zig prefect 14 - 15 - | python component | zig equivalent | 16 - |-----------------|----------------| 17 - | fastapi routes | std.http.Server + thread pool | 18 - | sqlalchemy ORM | zqlite.zig / raw sql | 19 - | pydantic models | comptime struct validation | 20 - | asyncio services | std.Thread + message passing | 21 - | memory messaging | channel-like queues | 22 - | redis messaging | (phase 2) | 23 - 24 - ## core components 25 - 26 - ### 1. http server (priority: immediate) 27 - 28 - pattern from leaflet-search: 29 - ```zig 30 - // main.zig 31 - var pool: Thread.Pool = undefined; 32 - try pool.init(.{ .allocator = allocator, .n_jobs = MAX_HTTP_WORKERS }); 33 - 34 - while (true) { 35 - const conn = listener.accept() catch continue; 36 - pool.spawn(server.handleConnection, .{conn}); 37 - } 38 - ``` 39 - 40 - routes to implement (mvp): 41 - - `POST /flow_runs` - create flow run 42 - - `GET /flow_runs/{id}` - read flow run 43 - - `POST /flow_runs/{id}/set_state` - state transitions (critical) 44 - - `POST /flow_runs/filter` - list/query 45 - - `POST /flows` - create/get-or-create flow 46 - - `GET /health` - readiness probe 47 - 48 - ### 2. 
database layer (priority: immediate) 49 - 50 - **decision: sqlite first, postgres interface later** 51 - 52 - rationale: 53 - - zqlite.zig already available 54 - - single-file deployment 55 - - prefect oss already supports sqlite 56 - - can add postgres dialect behind same interface 57 - 58 - schema from `orm_models.py`: 59 - ```sql 60 - -- core tables (mvp) 61 - CREATE TABLE flow ( 62 - id BLOB PRIMARY KEY DEFAULT (randomblob(16)), 63 - created TEXT DEFAULT (datetime('now')), 64 - updated TEXT DEFAULT (datetime('now')), 65 - name TEXT NOT NULL UNIQUE, 66 - tags TEXT DEFAULT '[]', 67 - labels TEXT 68 - ); 69 - 70 - CREATE TABLE flow_run ( 71 - id BLOB PRIMARY KEY DEFAULT (randomblob(16)), 72 - created TEXT DEFAULT (datetime('now')), 73 - updated TEXT DEFAULT (datetime('now')), 74 - flow_id BLOB REFERENCES flow(id), 75 - deployment_id BLOB REFERENCES deployment(id), 76 - name TEXT NOT NULL, 77 - state_type TEXT, 78 - state_name TEXT, 79 - state_timestamp TEXT, 80 - run_count INTEGER DEFAULT 0, 81 - expected_start_time TEXT, 82 - next_scheduled_start_time TEXT, 83 - start_time TEXT, 84 - end_time TEXT, 85 - total_run_time REAL DEFAULT 0, 86 - tags TEXT DEFAULT '[]', 87 - parameters TEXT DEFAULT '{}', 88 - idempotency_key TEXT, 89 - -- ... more fields 90 - UNIQUE(flow_id, idempotency_key) 91 - ); 92 - 93 - CREATE TABLE flow_run_state ( 94 - id BLOB PRIMARY KEY DEFAULT (randomblob(16)), 95 - created TEXT DEFAULT (datetime('now')), 96 - flow_run_id BLOB REFERENCES flow_run(id) ON DELETE CASCADE, 97 - type TEXT NOT NULL, -- PENDING, RUNNING, COMPLETED, FAILED, etc. 
98 - timestamp TEXT DEFAULT (datetime('now')), 99 - name TEXT NOT NULL, 100 - message TEXT, 101 - state_details TEXT DEFAULT '{}' 102 - ); 103 - 104 - -- indexes 105 - CREATE INDEX ix_flow_run__state_type ON flow_run(state_type); 106 - CREATE INDEX ix_flow_run__expected_start_time ON flow_run(expected_start_time); 107 - ``` 108 - 109 - database interface pattern: 110 - ```zig 111 - // db/interface.zig 112 - pub const Db = struct { 113 - conn: *zqlite.Conn, 114 - 115 - pub fn createFlowRun(self: *Db, flow_run: FlowRunCreate) !FlowRun { 116 - // ... 117 - } 118 - 119 - pub fn setFlowRunState(self: *Db, id: Uuid, state: State) !OrchestrationResult { 120 - // state transition logic 121 - } 122 - }; 123 - ``` 124 - 125 - ### 3. state machine / orchestration (priority: high) 126 - 127 - this is the heart of prefect. from `orchestration/rules.py`: 128 - 129 - state types: 130 - - SCHEDULED, PENDING, RUNNING, COMPLETED, FAILED, CANCELLED, CANCELLING, PAUSED, CRASHED 131 - 132 - valid transitions (simplified): 133 - ``` 134 - PENDING → RUNNING → COMPLETED 135 - RUNNING → FAILED 136 - RUNNING → CRASHED 137 - RUNNING → CANCELLED 138 - PENDING → CANCELLED 139 - SCHEDULED → PENDING 140 - SCHEDULED → RUNNING (direct) 141 - SCHEDULED → CANCELLED 142 - ``` 143 - 144 - orchestration rules: 145 - 1. **CacheInsertion** - check/insert cache keys 146 - 2. **RetryFailedFlows** - reschedule on failure (if retries remain) 147 - 3. **RenameReruns** - increment name suffix on retry 148 - 4. **CopyScheduledTime** - preserve expected_start_time 149 - 5. **HandlePauseReschedule** - manage pause/resume 150 - 6. 
**UpdateFlowRunTrackerOnTasks** - track task completions 151 - 152 - pattern: 153 - ```zig 154 - // orchestration/rules.zig 155 - pub const OrchestrationResult = struct { 156 - state: ?State, 157 - status: enum { accept, reject, abort, wait }, 158 - details: ?Details, 159 - }; 160 - 161 - pub fn applyRules( 162 - ctx: *OrchestrationContext, 163 - initial_state: ?State, 164 - proposed_state: State, 165 - ) !OrchestrationResult { 166 - // run through policy chain 167 - } 168 - ``` 169 - 170 - ### 4. messaging system (priority: medium) 171 - 172 - from `utilities/messaging/memory.py`: 173 - 174 - ```zig 175 - // messaging/memory.zig 176 - pub const Message = struct { 177 - data: []const u8, 178 - attributes: std.StringHashMap([]const u8), 179 - retry_count: u32 = 0, 180 - }; 181 - 182 - pub const Topic = struct { 183 - name: []const u8, 184 - subscriptions: std.ArrayList(*Subscription), 185 - 186 - pub fn publish(self: *Topic, msg: Message) !void { 187 - for (self.subscriptions.items) |sub| { 188 - try sub.deliver(msg); 189 - } 190 - } 191 - }; 192 - 193 - pub const Subscription = struct { 194 - queue: std.fifo.LinearFifo(Message, .Dynamic), 195 - retry_queue: std.fifo.LinearFifo(Message, .Dynamic), 196 - max_retries: u32 = 3, 197 - 198 - pub fn get(self: *Subscription) ?Message { 199 - return self.retry_queue.readItem() orelse self.queue.readItem(); 200 - } 201 - }; 202 - ``` 203 - 204 - ### 5. background services (priority: medium) 205 - 206 - from `services/base.py`: 207 - 208 - services to implement: 209 - 1. **Scheduler** - create flow runs from deployment schedules 210 - 2. **LateRuns** - mark late runs, update states 211 - 3. **Foreman** - track worker heartbeats 212 - 4. **TaskRunRecorder** - persist task run states from events 213 - 5. 
**EventPersister** - write events to db 214 - 215 - pattern: 216 - ```zig 217 - // services/base.zig 218 - pub const Service = struct { 219 - name: []const u8, 220 - interval_ms: u64, 221 - running: std.atomic.Value(bool), 222 - 223 - pub fn start(self: *Service) !void { 224 - self.running.store(true, .monotonic); 225 - while (self.running.load(.monotonic)) { 226 - try self.run(); 227 - std.time.sleep(self.interval_ms * std.time.ns_per_ms); 228 - } 229 - } 230 - 231 - pub fn stop(self: *Service) void { 232 - self.running.store(false, .monotonic); 233 - } 234 - }; 235 - 236 - // services/scheduler.zig 237 - pub const Scheduler = struct { 238 - base: Service, 239 - db: *Db, 240 - 241 - pub fn run(self: *Scheduler) !void { 242 - const due_deployments = try self.db.getDeploymentsWithDueSchedules(); 243 - for (due_deployments) |deployment| { 244 - try self.createScheduledRun(deployment); 245 - } 246 - } 247 - }; 248 - ``` 249 - 250 - ## phased implementation 251 - 252 - ### phase 1: minimal viable server (weeks 1-2) 253 - - [ ] http server skeleton (health endpoint) 254 - - [ ] sqlite database with flow, flow_run, flow_run_state tables 255 - - [ ] create/read flow run 256 - - [ ] set_flow_run_state with basic state machine 257 - - [ ] flow runs filter endpoint 258 - 259 - **milestone**: python client can submit and track a flow run 260 - 261 - ### phase 2: state orchestration (weeks 3-4) 262 - - [ ] full state transition rules 263 - - [ ] orchestration policies 264 - - [ ] idempotency handling 265 - - [ ] task runs (similar to flow runs) 266 - 267 - **milestone**: python client can run flows with tasks 268 - 269 - ### phase 3: background services (weeks 5-6) 270 - - [ ] in-memory messaging 271 - - [ ] event persistence 272 - - [ ] scheduler service (cron-triggered runs) 273 - - [ ] late runs detection 274 - 275 - **milestone**: scheduled deployments work 276 - 277 - ### phase 4: deployments & workers (weeks 7-8) 278 - - [ ] deployments table/api 279 - - [ ] work pools & 
queues 280 - - [ ] worker heartbeat tracking 281 - - [ ] task scheduling queue 282 - 283 - **milestone**: workers can pick up and execute runs 284 - 285 - ### phase 5: horizontal scaling (future) 286 - - [ ] redis messaging backend 287 - - [ ] postgres support 288 - - [ ] connection pooling 289 - - [ ] read replicas 290 - 291 - ## project structure 292 - 293 - ``` 294 - prefect-zig/ 295 - ├── build.zig 296 - ├── build.zig.zon 297 - ├── src/ 298 - │ ├── main.zig # entrypoint, server setup 299 - │ ├── server/ 300 - │ │ ├── http.zig # connection handling 301 - │ │ ├── router.zig # route dispatch 302 - │ │ └── middleware.zig # cors, logging, etc. 303 - │ ├── api/ 304 - │ │ ├── flow_runs.zig 305 - │ │ ├── flows.zig 306 - │ │ ├── deployments.zig 307 - │ │ ├── task_runs.zig 308 - │ │ └── health.zig 309 - │ ├── db/ 310 - │ │ ├── interface.zig # database abstraction 311 - │ │ ├── sqlite.zig # sqlite implementation 312 - │ │ ├── schema.zig # table definitions 313 - │ │ └── migrations/ 314 - │ ├── models/ 315 - │ │ ├── flow.zig 316 - │ │ ├── flow_run.zig 317 - │ │ ├── state.zig 318 - │ │ └── ... 319 - │ ├── orchestration/ 320 - │ │ ├── rules.zig 321 - │ │ ├── policies.zig 322 - │ │ └── state_machine.zig 323 - │ ├── messaging/ 324 - │ │ ├── interface.zig 325 - │ │ ├── memory.zig 326 - │ │ └── (redis.zig - future) 327 - │ └── services/ 328 - │ ├── base.zig 329 - │ ├── scheduler.zig 330 - │ ├── late_runs.zig 331 - │ └── ... 332 - ├── tests/ 333 - └── fly.toml 334 - ``` 335 - 336 - ## dependencies 337 - 338 - ```zig 339 - // build.zig.zon 340 - .dependencies = .{ 341 - .zqlite = .{ 342 - .url = "https://github.com/karlseguin/zqlite.zig/...", 343 - }, 344 - // json parsing - std.json may suffice 345 - // uuid - comptime generation or std.crypto 346 - }, 347 - ``` 348 - 349 - ## open questions 350 - 351 - 1. **json serialization**: use std.json or bring in a library? python uses pydantic, need efficient json for api responses. 352 - 353 - 2. 
**uuid handling**: sqlite stores as BLOB, api needs string representation. comptime uuid parsing? 354 - 355 - 3. **datetime handling**: sqlite stores as TEXT (iso8601), need parsing/formatting. std has basics. 356 - 357 - 4. **connection pooling**: for sqlite, single writer + multiple readers? or serialize writes? 358 - 359 - 5. **migrations**: hand-write sql? build-time generation? 360 - 361 - ## testing strategy 362 - 363 - 1. **unit tests**: orchestration rules, state transitions 364 - 2. **integration tests**: http endpoints against real sqlite 365 - 3. **compatibility tests**: run prefect python client against zig server 366 - 4. **load tests**: throughput, latency under concurrent flow runs 367 - 368 - ## references 369 - 370 - - prefect server: `/Users/nate/github.com/prefecthq/prefect/src/prefect/server/` 371 - - leaflet-search patterns: `/Users/nate/tangled.sh/@zzstoatzz.io/leaflet-search/backend/` 372 - - zig notes: `/Users/nate/tangled.sh/@zzstoatzz.io/notes/languages/ziglang/`