this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

:sparkles: streamlined processor job and created endpoint to get summary

+201 -94
+2
purge-payments.hurl
··· 1 + POST http://localhost:8002/admin/purge-payments 2 + X-Rinha-Token: 123
+1
request-get-all.hurl
··· 1 + GET http://localhost:8000/payments-summary
+2 -2
set-fail.hurl
··· 1 - PUT http://localhost:8001/admin/configurations/failure 1 + PUT http://localhost:8002/admin/configurations/failure 2 2 X-Rinha-Token: 123 3 3 { 4 - "failure": true 4 + "failure": false 5 5 }
+6 -2
src/integrations/default.gleam src/integrations/provider.gleam
··· 7 7 import gleam/result 8 8 import model 9 9 10 + pub type ProviderConfig { 11 + ProviderConfig(url: String) 12 + } 13 + 10 14 pub fn create_body(body: model.PaymentRequest) -> String { 11 15 json.object([ 12 16 #("correlationId", json.string(body.correlation_id)), ··· 16 20 |> json.to_string 17 21 } 18 22 19 - pub fn default_provider_send_request(body: String) { 20 - let assert Ok(request) = request.to("http://localhost:8001/payments") 23 + pub fn send_request(provider: ProviderConfig, body: String) { 24 + let assert Ok(request) = request.to(provider.url <> "/payments") 21 25 22 26 use response <- result.try( 23 27 request
-19
src/integrations/fallback.gleam
··· 1 - import gleam/http 2 - import gleam/http/request 3 - import gleam/httpc 4 - import gleam/result 5 - import integrations/default 6 - 7 - pub fn fallback_provider_send_request(body: String) { 8 - let assert Ok(request) = request.to("http://localhost:8002/payments") 9 - 10 - use response <- result.try( 11 - request 12 - |> request.set_header("content-type", "application/json") 13 - |> request.set_body(body) 14 - |> request.set_method(http.Post) 15 - |> httpc.send, 16 - ) 17 - 18 - default.parse_http_response(response) 19 - }
+36
src/model.gleam
··· 1 1 import birl 2 + import gleam/dict 3 + import gleam/dynamic/decode 4 + import gleam/json 5 + import gleam/result 2 6 3 7 pub type PaymentRequest { 4 8 PaymentRequest(correlation_id: String, amount: Float, requested_at: birl.Time) 5 9 } 10 + 11 + pub fn to_dict(payment: PaymentRequest) -> dict.Dict(String, String) { 12 + dict.new() 13 + |> dict.insert(payment.correlation_id, payment |> to_json |> json.to_string) 14 + } 15 + 16 + pub fn to_json(payment: PaymentRequest) -> json.Json { 17 + [ 18 + #("correlationId", json.string(payment.correlation_id)), 19 + #("amount", json.float(payment.amount)), 20 + #("requestedAt", json.string(payment.requested_at |> birl.to_iso8601)), 21 + ] 22 + |> json.object 23 + } 24 + 25 + pub fn from_json_string(payment: String) { 26 + let parse = { 27 + use amount <- decode.field("amount", decode.float) 28 + use correlation_id <- decode.field("correlationId", decode.string) 29 + use requested_at <- decode.field("requestedAt", decode.string) 30 + 31 + decode.success(PaymentRequest( 32 + amount: amount, 33 + correlation_id: correlation_id, 34 + requested_at: requested_at 35 + |> birl.parse 36 + |> result.unwrap(birl.now()), 37 + )) 38 + } 39 + 40 + json.parse(payment, parse) 41 + }
+53 -45
src/processor.gleam
··· 1 - import birl 2 - import gleam/dynamic/decode 3 1 import gleam/erlang/process 4 - import gleam/json 5 2 import gleam/option 6 3 import gleam/otp/actor 7 4 import gleam/otp/supervision 8 - import gleam/result 9 - import integrations/default 10 - import integrations/fallback 11 - import model.{PaymentRequest} 5 + import integrations/provider 6 + import model 12 7 import redis 13 8 import valkyrie 14 9 ··· 16 11 Processor( 17 12 redis_conn: valkyrie.Connection, 18 13 name: option.Option(process.Name(Message)), 14 + connections: Int, 15 + providers: List(provider.ProviderConfig), 19 16 ) 20 17 } 21 18 ··· 23 20 ServerTick 24 21 } 25 22 26 - pub fn new(conn: valkyrie.Connection) { 27 - Processor(redis_conn: conn, name: option.None) 23 + pub fn new(conn: valkyrie.Connection) -> Processor { 24 + Processor(redis_conn: conn, name: option.None, connections: 1, providers: []) 28 25 } 29 26 30 - pub fn named(processor: Processor, name: process.Name(Message)) { 27 + pub fn named(processor: Processor, name: process.Name(Message)) -> Processor { 31 28 Processor(..processor, name: option.Some(name)) 32 29 } 33 30 31 + pub fn connections(processor: Processor, connections: Int) -> Processor { 32 + Processor(..processor, connections: connections) 33 + } 34 + 35 + pub fn providers( 36 + processor: Processor, 37 + providers: List(provider.ProviderConfig), 38 + ) -> Processor { 39 + Processor(..processor, providers: providers) 40 + } 41 + 34 42 pub fn start(processor: Processor) { 35 43 let ac = 36 44 processor ··· 49 57 } 50 58 51 59 fn handle_message(state: Processor, message: Message) { 52 - echo "inside on message" 53 60 case message { 54 61 ServerTick -> { 55 62 case redis.read_queue_payments(state.redis_conn) { 56 63 "" -> actor.continue(state) 57 64 data -> { 58 - let assert Ok(message) = parse_data_redis(data) 59 - 60 - let body_to_send = message |> default.create_body 61 - 62 - case default.default_provider_send_request(body_to_send) { 63 - Ok(_) -> actor.continue(state) 64 - Error(_) -> { 65 - echo "failed to make request" 66 - case fallback.fallback_provider_send_request(body_to_send) { 67 - Ok(_) -> actor.continue(state) 68 - Error(_) -> { 69 - echo "failed to make request" 70 - actor.continue(state) 71 - } 72 - } 73 - } 74 - } 75 - 65 + process.spawn(fn() { integrate_data(state, data) }) 76 66 actor.continue(state) 77 67 } 78 68 } ··· 81 71 } 82 72 83 73 pub fn loop_worker(subject: process.Subject(Message)) { 84 - echo "inside pool" 85 74 process.send(subject, ServerTick) 86 75 process.sleep(2000) 87 76 loop_worker(subject) 88 77 } 89 78 90 - fn parse_data_redis(message: String) { 91 - echo message 92 - let parse = { 93 - use amount <- decode.field("amount", decode.float) 94 - use correlation_id <- decode.field("correlationId", decode.string) 95 - use requested_at <- decode.field("requestedAt", decode.string) 79 + fn integrations( 80 + providers: List(provider.ProviderConfig), 81 + body: String, 82 + ) -> Result(Bool, Bool) { 83 + case providers { 84 + [] -> Error(False) 85 + [provider, ..rest] -> { 86 + echo "Trying provider -> " <> provider.url 87 + case provider.send_request(provider, body) { 88 + Ok(_) -> Ok(True) 89 + _ -> integrations(rest, body) 90 + } 91 + } 92 + } 93 + } 94 + 95 + fn integrate_data(processor: Processor, data: String) { 96 + echo "Trying to integrate " <> data 97 + let assert Ok(message) = model.from_json_string(data) 98 + 99 + let body_to_send = message |> provider.create_body 100 + 101 + case integrations(processor.providers, body_to_send) { 102 + Error(_) -> { 103 + echo "Failed, reenqueuing it" 96 104 97 - decode.success(PaymentRequest( 98 - amount: amount, 99 - correlation_id: correlation_id, 100 - requested_at: requested_at 101 - |> birl.parse 102 - |> result.unwrap(birl.now()), 103 - )) 104 - } 105 + processor.redis_conn 106 + |> redis.enqueue_payments([data]) 107 + } 108 + Ok(_) -> { 109 + echo "Success, saving it" 105 110 106 - json.parse(message, parse) 111 + processor.redis_conn 112 + |> redis.save_data(message |> model.to_dict) 113 + } 114 + } 107 115 }
+18 -3
src/redis.gleam
··· 1 + import gleam/dict 1 2 import gleam/erlang/process 2 3 import gleam/option 3 4 import valkyrie 4 5 5 6 const default_timeout = 1000 6 7 7 - pub fn create_supervised_pool() { 8 + const default_key = "payments" 9 + 10 + pub fn create_supervised_pool(host: String) { 8 11 let name = process.new_name("connection_pool") 9 12 10 13 #( 11 14 name, 12 15 valkyrie.default_config() 16 + |> valkyrie.host(host) 13 17 |> valkyrie.supervised_pool( 14 18 size: 10_000, 15 19 name: option.Some(name), ··· 18 22 ) 19 23 } 20 24 21 - pub fn enqueue_payments(body: List(String), conn: valkyrie.Connection) { 22 - valkyrie.lpush(conn, "payments_created", body, default_timeout) 25 + pub fn enqueue_payments(conn: valkyrie.Connection, body: List(String)) { 26 + conn 27 + |> valkyrie.lpush("payments_created", body, default_timeout) 23 28 } 24 29 25 30 pub fn read_queue_payments(conn: valkyrie.Connection) -> String { ··· 33 38 Error(_) -> "" 34 39 } 35 40 } 41 + 42 + pub fn save_data(conn: valkyrie.Connection, body: dict.Dict(String, String)) { 43 + conn 44 + |> valkyrie.hset(default_key, body, default_timeout) 45 + } 46 + 47 + pub fn get_all_saved_data(conn: valkyrie.Connection) { 48 + conn 49 + |> valkyrie.hgetall(default_key, default_timeout) 50 + }
+31 -2
src/rinha_2025.gleam
··· 1 + import gleam/bool 1 2 import gleam/erlang/process 3 + import gleam/list 2 4 import gleam/otp/static_supervisor as supervisor 3 5 import gleam/otp/supervision 6 + import gleam/string 7 + import integrations/provider 4 8 import processor 5 9 import redis 10 + import util 6 11 import valkyrie 7 12 import web/server 8 13 import web/web 9 14 10 15 pub fn main() -> Nil { 11 - let #(valkey_pool_name, valkey_pool) = redis.create_supervised_pool() 16 + let redis_host = util.get_env_var("REDIS_CONN", "localhost") 17 + let providers_env = util.get_env_var("PROVIDERS", "") 18 + let providers = 19 + providers_env 20 + |> string.split(",") 21 + |> list.filter(fn(url) { "" != url }) 22 + |> list.map(fn(url) { provider.ProviderConfig(url: url) }) 23 + let has_providers = list.length(providers) > 0 24 + 25 + let #(valkey_pool_name, valkey_pool) = 26 + redis.create_supervised_pool(redis_host) 12 27 let valky = valkyrie.named_connection(valkey_pool_name) 13 28 14 29 let worker_name = process.new_name("worker_pool") 15 30 let worker_pool_supervised = 16 31 processor.new(valky) 17 32 |> processor.named(worker_name) 33 + |> processor.providers(providers) 18 34 |> processor.supervised 19 35 20 36 let ctx = server.Context(valkye_conn: valky) ··· 23 39 supervisor.new(supervisor.OneForOne) 24 40 |> supervisor.add(valkey_pool) 25 41 |> supervisor.add(web.create_server_supervised(ctx)) 26 - |> supervisor.add(worker_pool_supervised) 42 + |> create_supervisor_with_processor(worker_pool_supervised, has_providers) 27 43 |> supervisor.start 28 44 29 45 process.spawn(fn() { 46 + use <- bool.guard(when: !has_providers, return: Ok(Nil)) 47 + 30 48 worker_name 31 49 |> process.named_subject 32 50 |> processor.loop_worker ··· 34 52 35 53 process.sleep_forever() 36 54 } 55 + 56 + fn create_supervisor_with_processor( 57 + manager: supervisor.Builder, 58 + processor: supervision.ChildSpecification(process.Subject(processor.Message)), 59 + has_processor: Bool, 60 + ) { 61 + use <- bool.guard(when: !has_processor, return: manager) 62 + 63 + manager 64 + |> supervisor.add(processor) 65 + }
+8
src/util.gleam
··· 1 + import envoy 2 + 3 + pub fn get_env_var(env: String, default: String) -> String { 4 + case envoy.get(env) { 5 + Error(_) -> default 6 + Ok(val) -> val 7 + } 8 + }
+42 -14
src/web/controllers/payment_controller.gleam
··· 1 1 import birl 2 + import gleam/dict 2 3 import gleam/dynamic 3 4 import gleam/dynamic/decode 4 - import gleam/erlang/process 5 + import gleam/http/response 5 6 import gleam/json 6 7 import gleam/list 7 - import gleam/otp/actor 8 + import gleam/string_tree 8 9 import model.{type PaymentRequest, PaymentRequest} 9 - import processor 10 10 import redis 11 + import valkyrie/resp 11 12 import web/server 12 13 import wisp 13 14 ··· 31 32 } 32 33 } 33 34 34 - pub fn handle_payment_post(req: wisp.Request, ctx: server.Context) { 35 + pub fn handle_payment_post( 36 + req: wisp.Request, 37 + ctx: server.Context, 38 + ) -> response.Response(wisp.Body) { 35 39 use json <- wisp.require_json(req) 36 40 use body <- decode_payment_body(json) 37 41 38 - let assert Ok(_) = 39 - echo json.object([ 40 - #("correlationId", json.string(body.correlation_id)), 41 - #("amount", json.float(body.amount)), 42 - #("requestedAt", json.string(birl.now() |> birl.to_iso8601)), 43 - ]) 44 - |> json.to_string 45 - |> list.wrap 46 - |> redis.enqueue_payments(ctx.valkye_conn) 42 + let data_to_insert = 43 + body 44 + |> model.to_json 45 + |> json.to_string 46 + |> list.wrap 47 47 48 - // actor.send(ctx.worker_subject, processor.Process(body)) 48 + let assert Ok(_) = 49 + echo ctx.valkye_conn 50 + |> redis.enqueue_payments(data_to_insert) 49 51 50 52 wisp.no_content() 51 53 } 54 + 55 + pub fn get_all_payments( 56 + _req: wisp.Request, 57 + ctx: server.Context, 58 + ) -> response.Response(wisp.Body) { 59 + case redis.get_all_saved_data(ctx.valkye_conn) { 60 + Ok(data) -> { 61 + let response = 62 + data 63 + |> dict.values 64 + |> list.map(fn(value) { 65 + let assert resp.BulkString(v) = value 66 + let assert Ok(json_data) = model.from_json_string(v) 67 + json_data 68 + }) 69 + |> json.array(of: model.to_json) 70 + |> json.to_string_tree 71 + 72 + wisp.ok() 73 + |> wisp.json_body(response) 74 + } 75 + Error(_) -> 76 + wisp.no_content() 77 + |> wisp.json_body(string_tree.from_string("[]")) 78 + } 79 + }
+1 -1
src/web/router.gleam
··· 14 14 } 15 15 ["payments-summary"] -> { 16 16 use <- wisp.require_method(req, http.Get) 17 - todo 17 + payment_controller.get_all_payments(req, ctx) 18 18 } 19 19 _ -> wisp.not_found() 20 20 }
+1 -6
src/web/server.gleam
··· 1 - import gleam/erlang/process 2 - import processor 3 1 import valkyrie 4 2 5 3 pub type Context { 6 - Context( 7 - valkye_conn: valkyrie.Connection, 8 - // worker_subject: process.Subject(processor.Message), 9 - ) 4 + Context(valkye_conn: valkyrie.Connection) 10 5 }