An educational pure functional programming library in TypeScript
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add task-queue example with side-by-side comparison

+374
+43
examples/task-queue/README.md
··· 1 + # Task Queue 2 + 3 + > Background job processing with retries and timeouts shouldn't require a framework. 4 + 5 + ## The Problem 6 + 7 + Background jobs are everywhere: sending emails, processing images, syncing data. Each job needs: 8 + - **Retry logic** for transient failures 9 + - **Timeouts** so jobs don't run forever 10 + - **Error handling** with context preserved 11 + - **Testability** - easy to mock dependencies 12 + 13 + In vanilla TypeScript, you end up with manual Promise tracking, scattered try/catch blocks, and code that's impossible to test without real dependencies. 14 + 15 + ## Run Both Versions 16 + 17 + ```bash 18 + bun run examples/task-queue/without-purus.ts 19 + bun run examples/task-queue/with-purus.ts 20 + ``` 21 + 22 + ## Without purus 23 + 24 + See `without-purus.ts` - realistic code showing: 25 + - Manual Promise tracking for workers 26 + - `Promise.race` for timeout (doesn't cancel the job!) 27 + - Lost error context after retries fail 28 + - Hard to test - dependencies are hardcoded 29 + 30 + ## With purus 31 + 32 + See `with-purus.ts` - same functionality with: 33 + - `fork`/`join` for fiber-based workers 34 + - `timeout(30000)` actually cancels the job 35 + - `catchAll` preserves error context 36 + - `provide(mockEnv)` makes testing trivial 37 + 38 + ## Key Takeaways 39 + 40 + - **Fibers > Promises** → Real cancellation, not just ignoring results 41 + - **Composable timeouts** → `timeout()` is just another combinator 42 + - **Error context preserved** → `catchAll` sees the original error 43 + - **Dependency injection** → `provide()` makes testing trivial
+206
examples/task-queue/with-purus.ts
··· 1 + /** 2 + * Task Queue - WITH purus 3 + * 4 + * Same functionality as without-purus.ts, but with: 5 + * - Fiber-based execution with real cancellation 6 + * - Composable retry/timeout combinators 7 + * - Error context preserved through the pipeline 8 + * - Dependency injection for easy testing 9 + */ 10 + 11 + import { 12 + // Types 13 + type Eff, 14 + Exit, 15 + 16 + // Constructors 17 + succeed, 18 + fail, 19 + async, 20 + 21 + // Transformations 22 + flatMap, 23 + mapEff, 24 + catchAll, 25 + 26 + // Environment 27 + accessEff, 28 + provide, 29 + 30 + // Combinators 31 + timeout, 32 + retry, 33 + allSequential, 34 + 35 + // Composition 36 + pipe, 37 + 38 + // Runners 39 + runPromise, 40 + } from "../../src/index" 41 + 42 + // ----------------------------------------------------------------------------- 43 + // Job Types 44 + // ----------------------------------------------------------------------------- 45 + 46 + type Job = { 47 + id: string 48 + type: "email" | "image" | "sync" 49 + payload: Record<string, unknown> 50 + } 51 + 52 + // SOLUTION: Typed error union - compiler knows all failure modes 53 + type JobError = 54 + | { readonly _tag: "TransientError"; readonly jobId: string; readonly message: string } 55 + | { readonly _tag: "TimeoutError"; readonly jobId: string; readonly ms: number } 56 + 57 + const JobError = { 58 + transient: (jobId: string, message: string): JobError => 59 + ({ _tag: "TransientError", jobId, message }), 60 + timeout: (jobId: string, ms: number): JobError => 61 + ({ _tag: "TimeoutError", jobId, ms }), 62 + } 63 + 64 + // SOLUTION: Environment type for dependency injection 65 + type QueueEnv = { 66 + logger: { 67 + info: (msg: string) => void 68 + error: (msg: string) => void 69 + } 70 + } 71 + 72 + // ----------------------------------------------------------------------------- 73 + // Job Execution 74 + // ----------------------------------------------------------------------------- 75 + 76 + // SOLUTION: executeJob returns an Eff with cleanup function for real cancellation 77 + const executeJob = (job: Job): Eff<void, JobError, QueueEnv> => 78 + async((resume) => { 79 + // SOLUTION: Cancellation token - cleanup function can abort this 80 + let cancelled = false 81 + const delay = Math.random() * 200 + 50 82 + 83 + const timeoutId = setTimeout(() => { 84 + if (cancelled) return // SOLUTION: Check cancellation before resuming 85 + 86 + // Simulate 30% failure rate 87 + if (Math.random() < 0.3) { 88 + resume(Exit.fail(JobError.transient(job.id, "transient error"))) 89 + } else { 90 + resume(Exit.succeed(undefined)) 91 + } 92 + }, delay) 93 + 94 + // SOLUTION: Return cleanup function - runtime calls this on timeout/interrupt 95 + return () => { 96 + cancelled = true 97 + clearTimeout(timeoutId) 98 + } 99 + }) 100 + 101 + // ----------------------------------------------------------------------------- 102 + // Job Processing Pipeline 103 + // ----------------------------------------------------------------------------- 104 + 105 + const TIMEOUT_MS = 5000 106 + const MAX_RETRIES = 3 107 + 108 + // SOLUTION: Composable pipeline - each combinator is just a function 109 + const processJob = (job: Job): Eff<void, never, QueueEnv> => 110 + pipe( 111 + // SOLUTION: accessEff to get logger from environment 112 + accessEff((env: QueueEnv) => 113 + pipe( 114 + succeed(undefined), 115 + flatMap(() => { 116 + env.logger.info(`Processing job ${job.id} (${job.type})`) 117 + return executeJob(job) 118 + }) 119 + ) 120 + ), 121 + // SOLUTION: retry(3) - composable, not scattered through code 122 + retry(MAX_RETRIES), 123 + // SOLUTION: timeout actually cancels the job via cleanup function! 124 + timeout(TIMEOUT_MS), 125 + flatMap((result) => 126 + result === null 127 + ? fail(JobError.timeout(job.id, TIMEOUT_MS)) 128 + : succeed(result) 129 + ), 130 + // SOLUTION: catchAll preserves typed error context 131 + catchAll((error: JobError) => 132 + accessEff((env: QueueEnv) => { 133 + // SOLUTION: error is JobError, not unknown - full type information! 134 + const msg = error._tag === "TimeoutError" 135 + ? `Job ${error.jobId} timed out after ${error.ms}ms` 136 + : `Job ${error.jobId} failed: ${error.message}` 137 + env.logger.error(msg) 138 + return succeed(undefined) 139 + }) 140 + ), 141 + // SOLUTION: Log success using environment 142 + flatMap(() => 143 + accessEff((env: QueueEnv) => { 144 + env.logger.info(`Job ${job.id} completed`) 145 + return succeed(undefined) 146 + }) 147 + ) 148 + ) 149 + 150 + // ----------------------------------------------------------------------------- 151 + // Queue Processing 152 + // ----------------------------------------------------------------------------- 153 + 154 + const processQueue = (jobs: Job[]): Eff<void, never, QueueEnv> => 155 + pipe( 156 + allSequential(jobs.map(processJob)), 157 + mapEff(() => undefined) 158 + ) 159 + 160 + // ----------------------------------------------------------------------------- 161 + // Environments 162 + // ----------------------------------------------------------------------------- 163 + 164 + // Production environment 165 + const prodEnv: QueueEnv = { 166 + logger: { 167 + info: (msg) => console.log(`[INFO] ${msg}`), 168 + error: (msg) => console.log(`[ERROR] ${msg}`), 169 + }, 170 + } 171 + 172 + // SOLUTION: Test environment - just provide a different object! 173 + const _testEnv: QueueEnv = { 174 + logger: { 175 + info: () => {}, // Silent for tests 176 + error: () => {}, 177 + }, 178 + } 179 + 180 + // ----------------------------------------------------------------------------- 181 + // Demo 182 + // ----------------------------------------------------------------------------- 183 + 184 + async function main() { 185 + console.log("=== Task Queue (with purus) ===\n") 186 + 187 + const jobs: Job[] = [ 188 + { id: "job-1", type: "email", payload: { to: "user@example.com" } }, 189 + { id: "job-2", type: "image", payload: { path: "/uploads/photo.jpg" } }, 190 + { id: "job-3", type: "sync", payload: { source: "db", target: "cache" } }, 191 + ] 192 + 193 + // SOLUTION: provide() injects dependencies - swap prodEnv for _testEnv in tests! 194 + const program = pipe( 195 + processQueue(jobs), 196 + provide(prodEnv) 197 + ) 198 + 199 + await runPromise(program) 200 + 201 + console.log("\n=== Done ===") 202 + } 203 + 204 + main() 205 + .catch(console.error) 206 + .finally(() => process.exit(0))
+125
examples/task-queue/without-purus.ts
··· 1 + /** 2 + * Task Queue - WITHOUT purus 3 + * 4 + * This is realistic vanilla TypeScript code for background job processing. 5 + * Notice the pain points marked with "PROBLEM:". 6 + */ 7 + 8 + // ----------------------------------------------------------------------------- 9 + // Job Types 10 + // ----------------------------------------------------------------------------- 11 + 12 + type Job = { 13 + id: string 14 + type: "email" | "image" | "sync" 15 + payload: Record<string, unknown> 16 + } 17 + 18 + // PROBLEM: Hardcoded logger - can't mock for testing without dependency injection framework 19 + const logger = { 20 + info: (msg: string) => console.log(`[INFO] ${msg}`), 21 + error: (msg: string) => console.log(`[ERROR] ${msg}`), 22 + } 23 + 24 + // ----------------------------------------------------------------------------- 25 + // Job Execution with Timeout 26 + // ----------------------------------------------------------------------------- 27 + 28 + async function executeJob(job: Job): Promise<void> { 29 + // Simulate job processing 30 + const delay = Math.random() * 200 + 50 31 + await new Promise(resolve => setTimeout(resolve, delay)) 32 + 33 + // Simulate 30% failure rate 34 + if (Math.random() < 0.3) { 35 + throw new Error(`Job ${job.id} failed: transient error`) 36 + } 37 + 38 + logger.info(`Job ${job.id} (${job.type}) completed`) 39 + } 40 + 41 + async function executeWithTimeout(job: Job, timeoutMs: number): Promise<void> { 42 + // PROBLEM: Promise.race doesn't cancel the losing promise! 43 + // The job keeps running even after timeout - we just ignore the result 44 + const result = await Promise.race([ 45 + executeJob(job), 46 + new Promise<never>((_, reject) => 47 + setTimeout(() => reject(new Error("Timeout")), timeoutMs) 48 + ), 49 + ]) 50 + return result 51 + } 52 + 53 + // ----------------------------------------------------------------------------- 54 + // Retry Logic 55 + // ----------------------------------------------------------------------------- 56 + 57 + async function executeWithRetry( 58 + job: Job, 59 + maxRetries: number, 60 + timeoutMs: number 61 + ): Promise<void> { 62 + let lastError: unknown 63 + 64 + for (let attempt = 1; attempt <= maxRetries; attempt++) { 65 + try { 66 + logger.info(`[Attempt ${attempt}/${maxRetries}] Processing job ${job.id}`) 67 + await executeWithTimeout(job, timeoutMs) 68 + return // Success! 69 + } catch (e) { 70 + // PROBLEM: We lose the original error context after multiple retries 71 + // Only the last error is preserved 72 + lastError = e 73 + logger.error(`Attempt ${attempt} failed: ${e instanceof Error ? e.message : e}`) 74 + } 75 + } 76 + 77 + // PROBLEM: Throwing unknown - caller has no type information about the error 78 + throw lastError 79 + } 80 + 81 + // ----------------------------------------------------------------------------- 82 + // Queue Processing 83 + // ----------------------------------------------------------------------------- 84 + 85 + async function processQueue(jobs: Job[]): Promise<void> { 86 + // PROBLEM: Manual promise tracking - easy to mess up 87 + const results: Array<{ job: Job; success: boolean; error?: unknown }> = [] 88 + 89 + for (const job of jobs) { 90 + try { 91 + await executeWithRetry(job, 3, 5000) 92 + results.push({ job, success: true }) 93 + } catch (e) { 94 + // PROBLEM: e is unknown - what kind of error was it? Timeout? Job failure? 95 + results.push({ job, success: false, error: e }) 96 + } 97 + } 98 + 99 + // Summary 100 + const succeeded = results.filter(r => r.success).length 101 + const failed = results.filter(r => !r.success).length 102 + logger.info(`Queue complete: ${succeeded} succeeded, ${failed} failed`) 103 + } 104 + 105 + // ----------------------------------------------------------------------------- 106 + // Demo 107 + // ----------------------------------------------------------------------------- 108 + 109 + async function main() { 110 + console.log("=== Task Queue (without purus) ===\n") 111 + 112 + const jobs: Job[] = [ 113 + { id: "job-1", type: "email", payload: { to: "user@example.com" } }, 114 + { id: "job-2", type: "image", payload: { path: "/uploads/photo.jpg" } }, 115 + { id: "job-3", type: "sync", payload: { source: "db", target: "cache" } }, 116 + ] 117 + 118 + await processQueue(jobs) 119 + 120 + console.log("\n=== Done ===") 121 + } 122 + 123 + main() 124 + .catch(console.error) 125 + .finally(() => process.exit(0))