this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

:construction: creating loop subscriber

+56 -17
+40
src/processor.gleam
··· 1 1 import birl 2 + import gleam/dynamic 3 + import gleam/dynamic/decode 2 4 import gleam/erlang/process 5 + import gleam/json 3 6 import gleam/otp/actor 7 + import gleam/result 4 8 import integrations/default 5 9 import integrations/fallback 6 10 import model.{type PaymentRequest, PaymentRequest} 11 + import redis 12 + import valkyrie 7 13 8 14 pub type Message { 9 15 Process(element: PaymentRequest) ··· 23 29 } 24 30 25 31 fn handle_message(state: PaymentRequest, message: Message) { 32 + echo "inside on message" 26 33 case message { 27 34 Process(data) -> { 28 35 let body_to_send = data |> default.create_body ··· 43 50 } 44 51 } 45 52 } 53 + 54 + pub fn loop_worker(subject: process.Subject(Message), conn: valkyrie.Connection) { 55 + echo "inside pool" 56 + case redis.read_queue_payments(conn) { 57 + "" -> Nil 58 + data -> { 59 + let assert Ok(message) = parse_data_redis(data) 60 + process.send(subject, Process(message)) 61 + } 62 + } 63 + 64 + process.sleep(2000) 65 + loop_worker(subject, conn) 66 + } 67 + 68 + fn parse_data_redis(message: String) { 69 + echo message 70 + let parse = { 71 + use amount <- decode.field("amount", decode.float) 72 + use correlation_id <- decode.field("correlationId", decode.string) 73 + use requested_at <- decode.field("requestedAt", decode.string) 74 + 75 + decode.success(PaymentRequest( 76 + amount: amount, 77 + correlation_id: correlation_id, 78 + requested_at: requested_at 79 + |> birl.parse 80 + |> result.unwrap(birl.now()), 81 + )) 82 + } 83 + 84 + json.parse(message, parse) 85 + }
+5 -7
src/rinha_2025.gleam
··· 12 12 let valky = valkyrie.named_connection(valkey_pool_name) 13 13 let worker_pool = processor.create_worker_to_read_messages() 14 14 15 - let assert Ok(a) = worker_pool 16 - 17 - let ctx = server.Context(valkye_conn: valky, worker_subject: a.data) 15 + let ctx = server.Context(valkye_conn: valky) 18 16 19 17 let assert Ok(_) = 20 18 supervisor.new(supervisor.OneForOne) 21 19 |> supervisor.add(valkey_pool) 22 20 |> supervisor.add(web.create_server_supervised(ctx)) 23 - |> supervisor.add( 24 - supervision.worker(fn() { worker_pool }) 25 - |> supervision.timeout(1000), 26 - ) 21 + |> supervisor.add(supervision.worker(fn() { worker_pool })) 27 22 |> supervisor.start 23 + 24 + let assert Ok(pool) = worker_pool 25 + processor.loop_worker(pool.data, valky) 28 26 29 27 process.sleep_forever() 30 28 }
+10 -9
src/web/controllers/payment_controller.gleam
··· 35 35 use json <- wisp.require_json(req) 36 36 use body <- decode_payment_body(json) 37 37 38 - // let assert Ok(_) = 39 - // echo json.object([ 40 - // #("correlationId", json.string(body.correlation_id)), 41 - // #("amount", json.float(body.amount)), 42 - // ]) 43 - // |> json.to_string 44 - // |> list.wrap 45 - // |> redis.enqueue_payments(ctx.valkye_conn) 38 + let assert Ok(_) = 39 + echo json.object([ 40 + #("correlationId", json.string(body.correlation_id)), 41 + #("amount", json.float(body.amount)), 42 + #("requestedAt", json.string(birl.now() |> birl.to_iso8601)), 43 + ]) 44 + |> json.to_string 45 + |> list.wrap 46 + |> redis.enqueue_payments(ctx.valkye_conn) 46 47 47 - actor.send(ctx.worker_subject, processor.Process(body)) 48 + // actor.send(ctx.worker_subject, processor.Process(body)) 48 49 49 50 wisp.no_content() 50 51 }
+1 -1
src/web/server.gleam
··· 5 5 pub type Context { 6 6 Context( 7 7 valkye_conn: valkyrie.Connection, 8 - worker_subject: process.Subject(processor.Message), 8 + // worker_subject: process.Subject(processor.Message), 9 9 ) 10 10 }