An educational pure functional programming library in TypeScript
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

Add educational comments to task-queue example

+249 -70
+150 -45
examples/task-queue/with-purus.ts
··· 1 1 /** 2 - * Task Queue - with purus 2 + * Task Queue Example - Concurrency and Dependency Injection 3 + * ========================================================== 4 + * 5 + * This example teaches three advanced purus concepts: 6 + * 7 + * 1. REAL CANCELLATION - When timeout() fires, the job actually stops 8 + * Promise.race just ignores the loser - it keeps running in the background. 9 + * purus calls the cleanup function, which clears the timer and sets cancelled=true. 10 + * 11 + * 2. DEPENDENCY INJECTION - provide() and accessEff() for testability 12 + * The QueueEnv type defines what dependencies the effect needs. 13 + * provide(prodEnv) supplies production logger, provide(testEnv) supplies silent one. 14 + * No DI framework needed - it's just functions and types. 3 15 * 4 - * Same job processor, but: 5 - * - timeout() actually cancels the job 6 - * - Dependencies are injected (easy to test) 7 - * - Errors are typed, not unknown 16 + * 3. TYPED ERRORS THROUGH THE PIPELINE - No error information lost 17 + * JobError is a union type. Every handler knows exactly what can fail. 18 + * Compare with without-purus.ts where errors become `unknown`. 19 + * 20 + * Compare with without-purus.ts to see: 21 + * - Promise.race that doesn't actually cancel (line 41-48) 22 + * - Hardcoded logger that's awkward to mock (line 19-22) 23 + * - Lost error types through the pipeline 24 + * 25 + * Prerequisites: http-client and workflow-engine examples 26 + * This is the most advanced example - it combines effects, DI, and concurrency. 8 27 */ 9 28 10 29 import { ··· 25 44 runPromise, 26 45 } from "../../src/index" 27 46 28 - // ----------------------------------------------------------------------------- 29 - // Job types 30 - // ----------------------------------------------------------------------------- 47 + // ============================================================================= 48 + // SECTION 1: Types 49 + // ============================================================================= 50 + // 51 + // Job: The unit of work to process 52 + // JobError: What can go wrong (typed, not `unknown`) 53 + // QueueEnv: Dependencies that the effect requires (injected at runtime) 54 + // ============================================================================= 31 55 32 56 type Job = { 33 57 id: string ··· 35 59 payload: Record<string, unknown> 36 60 } 37 61 62 + // Typed errors - we know exactly what can fail 38 63 type JobError = 39 64 | { readonly _tag: "TransientError"; readonly jobId: string; readonly message: string } 40 65 | { readonly _tag: "TimeoutError"; readonly jobId: string; readonly ms: number } ··· 46 71 ({ _tag: "TimeoutError", jobId, ms }), 47 72 } 48 73 49 - // Environment - swap this out in tests 74 + // ============================================================================= 75 + // SECTION 2: Dependency Injection with Environments 76 + // ============================================================================= 77 + // 78 + // THE PROBLEM WITH HARDCODED DEPENDENCIES: 79 + // In without-purus.ts, the logger is a module-level constant. 80 + // To test silently, you'd need to mock the module or set NODE_ENV. 81 + // 82 + // THE SOLUTION - ENVIRONMENT TYPES: 83 + // 1. Define an interface (QueueEnv) that describes what the effect needs 84 + // 2. Use accessEff() to read from the environment 85 + // 3. Use provide() to supply the actual implementation 86 + // 87 + // HOW IT WORKS: 88 + // - Eff<A, E, R> has an R type parameter - the "requirements" 89 + // - processJob returns Eff<void, never, QueueEnv> - it REQUIRES QueueEnv 90 + // - provide(prodEnv) supplies QueueEnv, changing R from QueueEnv to unknown 91 + // 92 + // TESTING: Just swap provide(prodEnv) with provide(testEnv) in tests. 93 + // ============================================================================= 94 + 95 + // Environment type - what the effect requires 50 96 type QueueEnv = { 51 97 logger: { 52 98 info: (msg: string) => void ··· 54 100 } 55 101 } 56 102 57 - // ----------------------------------------------------------------------------- 58 - // Job execution - with real cancellation 59 - // ----------------------------------------------------------------------------- 103 + // Production environment - logs to console 104 + const prodEnv: QueueEnv = { 105 + logger: { 106 + info: (msg) => console.log(`[INFO] ${msg}`), 107 + error: (msg) => console.log(`[ERROR] ${msg}`), 108 + }, 109 + } 110 + 111 + // Test environment - silent logging for unit tests 112 + // Just swap provide(prodEnv) with provide(testEnv) in tests 113 + const _testEnv: QueueEnv = { 114 + logger: { 115 + info: () => {}, 116 + error: () => {}, 117 + }, 118 + } 119 + 120 + // ============================================================================= 121 + // SECTION 3: Real Cancellation with async() 122 + // ============================================================================= 123 + // 124 + // THE PROBLEM WITH Promise.race: 125 + // In without-purus.ts (line 41-48), Promise.race returns when one settles. 126 + // But THE LOSING PROMISE KEEPS RUNNING! If the job takes 10 seconds and 127 + // timeout is 5 seconds, the job runs for 10 seconds anyway - we just ignore it. 128 + // 129 + // THE SOLUTION - CLEANUP FUNCTIONS: 130 + // async() takes a register function that returns a cleanup function. 131 + // When timeout() fires (or manual interrupt), the cleanup function runs. 132 + // 133 + // HOW IT WORKS: 134 + // 1. Register function starts async work 135 + // 2. Returns cleanup function (clears timer, sets cancelled flag) 136 + // 3. On timeout: cleanup runs -> timer cleared -> no resume called 137 + // 4. The job actually STOPS, not just ignored 138 + // 139 + // GOTCHA: Check the cancelled flag before calling resume. 140 + // If cleanup ran, the fiber is done - calling resume causes problems. 141 + // ============================================================================= 60 142 61 143 const executeJob = (job: Job): Eff<void, JobError, QueueEnv> => 62 144 async((resume) => { 145 + // IMPORTANT: This flag tracks whether we've been cancelled 146 + // If true, we must NOT call resume - the fiber is already done 63 147 let cancelled = false 148 + 64 149 const delay = Math.random() * 200 + 50 65 150 66 151 const timeoutId = setTimeout(() => { 67 - if (cancelled) return // Already cancelled, don't resume 152 + // CHECK BEFORE RESUME: If cancelled, the fiber is done 153 + // Calling resume after cancellation causes undefined behavior 154 + if (cancelled) return 68 155 156 + // 30% chance of failure for demo purposes 69 157 if (Math.random() < 0.3) { 70 158 resume(Exit.fail(JobError.transient(job.id, "transient error"))) 71 159 } else { ··· 73 161 } 74 162 }, delay) 75 163 76 - // Return cleanup - called on timeout or interrupt 164 + // CLEANUP FUNCTION: Called on timeout, interrupt, or race-loser 165 + // This is the key difference from Promise.race - we actually clean up 77 166 return () => { 78 - cancelled = true 79 - clearTimeout(timeoutId) 167 + cancelled = true // Signal that we're done 168 + clearTimeout(timeoutId) // Cancel the pending timer 80 169 } 81 170 }) 82 171 83 - // ----------------------------------------------------------------------------- 84 - // Job pipeline 85 - // ----------------------------------------------------------------------------- 172 + // ============================================================================= 173 + // SECTION 4: Job Pipeline 174 + // ============================================================================= 175 + // 176 + // COMPOSABILITY IN ACTION: 177 + // The pipeline is built from simple, reusable combinators: 178 + // - accessEff() reads the environment for logging 179 + // - retry() wraps with retry logic 180 + // - timeout() adds cancellation timeout 181 + // - catchAll() recovers from errors 182 + // 183 + // Each piece is independent. Want more retries? Change one number. 184 + // Want a longer timeout? Change one number. No restructuring needed. 185 + // ============================================================================= 86 186 87 187 const TIMEOUT_MS = 5000 88 188 const MAX_RETRIES = 3 89 189 90 190 const processJob = (job: Job): Eff<void, never, QueueEnv> => 91 191 pipe( 192 + // accessEff gets the environment and runs an effect with it 92 193 accessEff((env: QueueEnv) => 93 194 pipe( 94 195 succeed(undefined), ··· 98 199 }) 99 200 ) 100 201 ), 202 + 203 + // Retry up to MAX_RETRIES times on failure 101 204 retry(MAX_RETRIES), 205 + 206 + // Timeout after TIMEOUT_MS - ACTUALLY CANCELS (unlike Promise.race) 102 207 timeout(TIMEOUT_MS), 208 + 209 + // Convert timeout (null) to typed error 103 210 flatMap((result) => 104 211 result === null 105 212 ? fail(JobError.timeout(job.id, TIMEOUT_MS)) 106 213 : succeed(result) 107 214 ), 215 + 216 + // Error recovery - log and continue 217 + // Note: error is JobError, not unknown - we know exactly what failed 108 218 catchAll((error: JobError) => 109 219 accessEff((env: QueueEnv) => { 110 220 const msg = error._tag === "TimeoutError" ··· 114 224 return succeed(undefined) 115 225 }) 116 226 ), 227 + 228 + // Final log 117 229 flatMap(() => 118 230 accessEff((env: QueueEnv) => { 119 231 env.logger.info(`Job ${job.id} completed`) ··· 122 234 ) 123 235 ) 124 236 125 - // ----------------------------------------------------------------------------- 126 - // Queue processing 127 - // ----------------------------------------------------------------------------- 237 + // ============================================================================= 238 + // SECTION 5: Queue Processing 239 + // ============================================================================= 240 + // 241 + // allSequential processes jobs one at a time. 242 + // For parallel processing, use `all()` instead. 243 + // ============================================================================= 128 244 129 245 const processQueue = (jobs: Job[]): Eff<void, never, QueueEnv> => 130 246 pipe( ··· 132 248 mapEff(() => undefined) 133 249 ) 134 250 135 - // ----------------------------------------------------------------------------- 136 - // Environments 137 - // ----------------------------------------------------------------------------- 138 - 139 - const prodEnv: QueueEnv = { 140 - logger: { 141 - info: (msg) => console.log(`[INFO] ${msg}`), 142 - error: (msg) => console.log(`[ERROR] ${msg}`), 143 - }, 144 - } 145 - 146 - // For tests - just swap provide(prodEnv) with provide(testEnv) 147 - const _testEnv: QueueEnv = { 148 - logger: { 149 - info: () => {}, 150 - error: () => {}, 151 - }, 152 - } 153 - 154 - // ----------------------------------------------------------------------------- 155 - // Demo 156 - // ----------------------------------------------------------------------------- 251 + // ============================================================================= 252 + // SECTION 6: Demo 253 + // ============================================================================= 254 + // 255 + // NOTICE: 256 + // - provide(prodEnv) injects production logger into the entire pipeline 257 + // - For tests, you'd use provide(testEnv) for silent logging 258 + // - No DI framework, no mocking library - just functions and types 259 + // ============================================================================= 157 260 158 261 const main = async () => { 159 262 console.log("=== Task Queue (with purus) ===\n") ··· 164 267 { id: "job-3", type: "sync", payload: { source: "db", target: "cache" } }, 165 268 ] 166 269 270 + // provide() injects the environment into the effect 271 + // Swap prodEnv with testEnv for unit tests - no code changes needed 167 272 const program = pipe( 168 273 processQueue(jobs), 169 - provide(prodEnv) 274 + provide(prodEnv) // <- Change to testEnv in tests 170 275 ) 171 276 172 277 await runPromise(program)
+99 -25
examples/task-queue/without-purus.ts
··· 1 1 /** 2 - * Task Queue - vanilla TypeScript 2 + * Task Queue - Vanilla TypeScript (Comparison) 3 + * ============================================= 4 + * 5 + * This is the "before" version. Compare with with-purus.ts to see how purus 6 + * improves concurrency and testability. 7 + * 8 + * PROBLEMS THIS APPROACH HAS: 9 + * 10 + * 1. Promise.race DOESN'T CANCEL - It just ignores the loser 11 + * See executeWithTimeout() on line 48. When timeout wins, the job 12 + * KEEPS RUNNING in the background. We're not cancelling, just ignoring. 13 + * This wastes resources and can cause side effects after "timeout". 14 + * 15 + * 2. HARDCODED LOGGER - Awkward to mock in tests 16 + * The logger is a module-level constant (line 21-24). 17 + * To test silently, you'd need jest.mock() or process.env checks. 18 + * Compare with-purus.ts where you just swap provide(prodEnv) for provide(testEnv). 19 + * 20 + * 3. ERRORS BECOME UNKNOWN - Type information is lost 21 + * lastError is `unknown`. We throw it, catch it, and lose all structure. 22 + * In with-purus.ts, JobError flows through typed - no guessing. 23 + * 24 + * THE KEY INSIGHT: Promise.race is not cancellation. 25 + * Run both examples and watch the logs - see how purus actually stops work. 3 26 * 4 - * A simple background job processor. Note how Promise.race 5 - * doesn't actually cancel the losing promise. 27 + * Prerequisites: http-client and workflow-engine examples 28 + * Next: Compare with with-purus.ts 6 29 */ 7 30 8 - // ----------------------------------------------------------------------------- 9 - // Job types 10 - // ----------------------------------------------------------------------------- 11 - 12 31 type Job = { 13 32 id: string 14 33 type: "email" | "image" | "sync" 15 34 payload: Record<string, unknown> 16 35 } 17 36 18 - // Hardcoded logger - awkward to mock in tests 37 + // ============================================================================= 38 + // SECTION 1: Hardcoded Logger (The Problem) 39 + // ============================================================================= 40 + // 41 + // This logger is a module-level constant. 42 + // In tests, you'd typically need: 43 + // - jest.mock() to replace it 44 + // - process.env.NODE_ENV checks to disable logging 45 + // - A mocking library 46 + // 47 + // Compare with-purus.ts where you just swap provide(prodEnv) with provide(testEnv). 48 + // ============================================================================= 49 + 19 50 const logger = { 20 51 info: (msg: string) => console.log(`[INFO] ${msg}`), 21 52 error: (msg: string) => console.log(`[ERROR] ${msg}`), 22 53 } 23 54 24 - // ----------------------------------------------------------------------------- 25 - // Job execution 26 - // ----------------------------------------------------------------------------- 55 + // ============================================================================= 56 + // SECTION 2: Job Execution 57 + // ============================================================================= 58 + // 59 + // A simple async function that simulates work. 60 + // No cleanup mechanism - when this starts, it runs to completion. 61 + // ============================================================================= 27 62 28 63 const executeJob = async (job: Job): Promise<void> => { 29 64 const delay = Math.random() * 200 + 50 ··· 37 72 logger.info(`Job ${job.id} (${job.type}) completed`) 38 73 } 39 74 75 + // ============================================================================= 76 + // SECTION 3: Promise.race Timeout (The Problem) 77 + // ============================================================================= 78 + // 79 + // !!! THIS IS THE KEY PROBLEM !!! 80 + // 81 + // Promise.race() returns when ONE promise settles. 82 + // BUT THE OTHER PROMISE KEEPS RUNNING! 83 + // 84 + // If executeJob takes 10 seconds and timeout is 5 seconds: 85 + // - Promise.race returns after 5 seconds with "Timeout" 86 + // - executeJob continues running for 5 more seconds 87 + // - Any side effects from executeJob still happen 88 + // - We've used resources for work we're "ignoring" 89 + // 90 + // In with-purus.ts, timeout() calls the cleanup function, which sets 91 + // cancelled=true and clearTimeout(). The job actually STOPS. 92 + // ============================================================================= 93 + 40 94 const executeWithTimeout = async (job: Job, timeoutMs: number): Promise<void> => { 41 - // Promise.race returns when one settles, but the loser keeps running! 42 - // We're just ignoring the result, not actually cancelling the job. 95 + // Promise.race: Returns when first promise settles, but... 96 + // THE LOSER KEEPS RUNNING IN THE BACKGROUND! 97 + // 98 + // If executeJob takes 10s and timeout is 5s: 99 + // - We get "Timeout" after 5s 100 + // - But executeJob runs for the full 10s anyway 101 + // - Any side effects still happen 43 102 const result = await Promise.race([ 44 103 executeJob(job), 45 104 new Promise<never>((_, reject) => ··· 49 108 return result 50 109 } 51 110 52 - // ----------------------------------------------------------------------------- 53 - // Retry logic 54 - // ----------------------------------------------------------------------------- 111 + // ============================================================================= 112 + // SECTION 4: Retry Logic 113 + // ============================================================================= 114 + // 115 + // Standard retry loop with error accumulation. 116 + // 117 + // NOTICE: lastError is `unknown`. We've lost all type information about 118 + // what went wrong. The caller has to guess and instanceof-check. 119 + // ============================================================================= 55 120 56 121 const executeWithRetry = async ( 57 122 job: Job, 58 123 maxRetries: number, 59 124 timeoutMs: number 60 125 ): Promise<void> => { 61 - let lastError: unknown 126 + let lastError: unknown // <- All error type information is lost here 62 127 63 128 for (let attempt = 1; attempt <= maxRetries; attempt++) { 64 129 try { ··· 66 131 await executeWithTimeout(job, timeoutMs) 67 132 return 68 133 } catch (e) { 69 - lastError = e 134 + lastError = e // <- e is unknown, we've lost all structure 70 135 logger.error(`Attempt ${attempt} failed: ${e instanceof Error ? e.message : e}`) 71 136 } 72 137 } 73 138 74 - throw lastError // Caller gets unknown 139 + throw lastError // <- Caller gets unknown, not a typed error 75 140 } 76 141 77 - // ----------------------------------------------------------------------------- 78 - // Queue processing 79 - // ----------------------------------------------------------------------------- 142 + // ============================================================================= 143 + // SECTION 5: Queue Processing 144 + // ============================================================================= 145 + // 146 + // Processes jobs sequentially with error recovery. 147 + // Note how we track success/failure but can't distinguish error types. 148 + // ============================================================================= 80 149 81 150 const processQueue = async (jobs: Job[]): Promise<void> => { 82 151 const results: Array<{ job: Job; success: boolean; error?: unknown }> = [] ··· 86 155 await executeWithRetry(job, 3, 5000) 87 156 results.push({ job, success: true }) 88 157 } catch (e) { 158 + // e is unknown - we can't distinguish timeout vs transient vs permanent 89 159 results.push({ job, success: false, error: e }) 90 160 } 91 161 } ··· 95 165 logger.info(`Queue complete: ${succeeded} succeeded, ${failed} failed`) 96 166 } 97 167 98 - // ----------------------------------------------------------------------------- 99 - // Demo 100 - // ----------------------------------------------------------------------------- 168 + // ============================================================================= 169 + // SECTION 6: Demo 170 + // ============================================================================= 171 + // 172 + // Run this and compare with with-purus.ts. 173 + // Notice how there's no way to swap out the logger for testing. 174 + // ============================================================================= 101 175 102 176 const main = async () => { 103 177 console.log("=== Task Queue (without purus) ===\n")