WIP. A little custom music server
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

try using streams

+172 -1
backend/benchmark-stream/.gitkeep

This is a binary file and will not be displayed.

+142
backend/src/benchmark-stream.ts
··· 1 + import { BunContext, BunRuntime } from "@effect/platform-bun"; 2 + import { Console, Duration, Effect, Layer, Stream } from "effect"; 3 + import { DatabaseLive } from "./db"; 4 + import { Env, EnvLive } from "./env"; 5 + import { OtelLive } from "./otel"; 6 + import { readDirectory, readDirectoryStream } from "./file-parser"; 7 + 8 + // Helpers 9 + function formatBytes(bytes: number): string { 10 + return `${(bytes / 1024 / 1024).toFixed(2)} MB`; 11 + } 12 + 13 + function printMemorySnapshot(timestamp: number, heapUsed: number, heapMax: number) { 14 + const maxWidth = 100; 15 + 16 + const bar = "=".repeat((heapUsed / heapMax) * maxWidth); 17 + 18 + return `${bar} ${timestamp}ms: ${formatBytes(heapUsed)}\n`; 19 + } 20 + 21 + function printMemorySnapshots( 22 + iteration: number, 23 + memorySnapshots: Array<{ timestamp: number; rss: number; heapUsed: number }>, 24 + ) { 25 + return Effect.gen(function* () { 26 + yield* Console.log(`=== MEMORY SNAPSHOTS BY ITERATION ${iteration} ===`); 27 + 28 + const fileName = `./benchmark/mem-snapshots-${iteration}-${Date.now()}.txt`; 29 + 30 + const file = Bun.file(fileName); 31 + const writer = file.writer(); 32 + 33 + const maxHeap = memorySnapshots.reduce((acc, cur) => Math.max(acc, cur.heapUsed), 0); 34 + 35 + for (const snapshot of memorySnapshots) { 36 + const line = printMemorySnapshot(snapshot.timestamp, snapshot.heapUsed, maxHeap); 37 + writer.write(line); 38 + } 39 + yield* Console.log("=========================="); 40 + }); 41 + } 42 + 43 + // ## 44 + 45 + const layers = Layer.mergeAll(BunContext.layer, EnvLive, DatabaseLive.Default); 46 + 47 + const iteration = (i: number) => 48 + Effect.gen(function* () { 49 + const env = yield* Env.pipe(Effect.flatMap((x) => x.getEnv)).pipe(Effect.withSpan("env")); 50 + 51 + const results = { 52 + elapsed: 0, 53 + files: 0, 54 + }; 55 + 56 + const memorySnapshots: Array<{ timestamp: number; rss: number; heapUsed: number }> = []; 57 + let peakMemory = 0; 58 + const startMem = process.memoryUsage(); 59 + const startTime = Date.now(); 60 + 61 + const interval = setInterval(() => { 62 + const mem = process.memoryUsage(); 63 + const elapsed = Date.now() - startTime; 64 + 65 + memorySnapshots.push({ 66 + timestamp: elapsed, 67 + rss: mem.rss, 68 + heapUsed: mem.heapUsed, 69 + }); 70 + 71 + peakMemory = Math.max(peakMemory, mem.heapUsed); 72 + }, 50); 73 + 74 + const task = readDirectoryStream(env.FOLDER_PATH, []).pipe( 75 + Effect.flatMap((stream) => Stream.runCollect(stream)), 76 + ); 77 + 78 + yield* task.pipe( 79 + Effect.timed, 80 + Effect.tap(([duration, result]) => 81 + Console.log(`Iteration ${i} took ${Duration.toMillis(duration)}ms and found ${result.length} files`), 82 + ), 83 + Effect.tap(([duration, result]) => { 84 + results.elapsed += Duration.toMillis(duration); 85 + results.files += result.length; 86 + }), 87 + Effect.map(([_, result]) => result), 88 + ); 89 + 90 + clearInterval(interval); 91 + const endMem = process.memoryUsage(); 92 + 93 + return { 94 + ...results, 95 + startTime, 96 + endTime: Date.now(), 97 + peakMemory, 98 + memorySnapshots, 99 + startMem, 100 + endMem, 101 + }; 102 + }); 103 + 104 + type IterationResult = Effect.Effect.Success<ReturnType<typeof iteration>>; 105 + 106 + const benchmark = Effect.gen(function* () { 107 + const results: Array<IterationResult> = []; 108 + for (let i = 0; i < 10; i++) { 109 + const iterationResults = yield* iteration(i); 110 + results.push(iterationResults); 111 + } 112 + 113 + yield* Console.dir(results); 114 + yield* Effect.tryPromise(() => Bun.write("./benchmark-stream/results.json", JSON.stringify(results, null, 2))); 115 + 116 + const avgTime = results.reduce((acc, cur) => acc + cur.elapsed, 0) / results.length; 117 + const avgPeak = results.reduce((acc, cur) => acc + cur.peakMemory, 0) / results.length; 118 + const avgFiles = results.reduce((acc, cur) => acc + cur.files, 0) / results.length; 119 + const maxFiles = results.reduce((acc, cur) => Math.max(acc, cur.files), 0); 120 + const fileErrorRate = avgFiles / maxFiles; 121 + 122 + const avgHeapDelta = 123 + results.reduce((acc, cur) => acc + cur.endMem.heapUsed - cur.startMem.heapUsed, 0) / results.length; 124 + 125 + const template = ` 126 + === BENCHMARK RESULTS === 127 + Average time: ${avgTime}ms 128 + Files found: ${maxFiles} error rate ${fileErrorRate} 129 + Average peak memory: ${formatBytes(avgPeak)} 130 + Average heap delta: ${formatBytes(avgHeapDelta)} 131 + ========================= 132 + `; 133 + 134 + yield* Console.log(template); 135 + 136 + for (const [i, result] of results.entries()) { 137 + yield* printMemorySnapshots(i, result.memorySnapshots); 138 + } 139 + }).pipe(Effect.provide(OtelLive), Effect.provide(layers)); 140 + 141 + // @ts-expect-error - BunRuntime.runMain is not typed 142 + BunRuntime.runMain(benchmark);
+30 -1
backend/src/file-parser.ts
··· 1 1 import * as flac from "./flac"; 2 - import { Effect, Option, Console, Duration, Data } from "effect"; 2 + import { Effect, Option, Console, Duration, Data, Stream } from "effect"; 3 3 import { FileSystem, Path } from "@effect/platform"; 4 4 5 5 const SUPPORTED_EXTENSIONS = ["flac"] as const; ··· 9 9 cause?: unknown; 10 10 message: string; 11 11 }> {} 12 + 13 + const supportedExtensions = [".flac"] as const; 12 14 13 15 export function parseFile(path: string) { 14 16 return Effect.gen(function* () { ··· 83 85 return yield* parseManyFiles(filtered); 84 86 }).pipe(Effect.withSpan("readDirectory")); 85 87 } 88 + 89 + export function readDirectoryStream(dirPath: string, skip: string[] = []) { 90 + return Effect.gen(function* () { 91 + const fs = yield* FileSystem.FileSystem; 92 + const path = yield* Path.Path; 93 + 94 + const files = yield* fs.readDirectory(dirPath, { 95 + recursive: true, 96 + }); 97 + 98 + const stream = Stream.fromIterable(files).pipe( 99 + Stream.map((file) => path.resolve(dirPath, file)), 100 + Stream.tap((relative) => Console.debug("RELATIVE", relative)), 101 + 102 + Stream.filter((file) => supportedExtensions.some((ext) => file.endsWith(ext)) === false), 103 + Stream.filter((file) => skip.includes(file) === false), 104 + Stream.tap((filtered) => Console.debug("FILTERED", filtered)), 105 + 106 + Stream.mapEffect((file) => parseFile(file), { 107 + concurrency: 10, 108 + }), 109 + Stream.catchAll((err) => Stream.empty), 110 + ); 111 + 112 + return stream; 113 + }).pipe(Effect.withSpan("readDirectoryStream")); 114 + }