this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

:sparkles: last changes

+178 -66
+7 -4
docker-compose.yml
··· 9 9 deploy: 10 10 resources: 11 11 limits: 12 - cpus: "0.15" 12 + cpus: "0.1" 13 13 memory: "30MB" 14 14 volumes: 15 15 - redis-data:/data ··· 25 25 environment: 26 26 REDIS_CONN: valkey 27 27 PROVIDERS: "http://payment-processor-default:8080,http://payment-processor-fallback:8080" 28 + PROCESSOR_QTT: 1 29 + PROCESSOR_TIME: 50 28 30 depends_on: 29 31 - valkey 30 32 deploy: 31 33 resources: 32 34 limits: 33 - cpus: "0.9" 35 + cpus: "1" 34 36 memory: "255MB" 35 37 36 38 api2: 37 39 <<: *app 40 + container_name: api2 38 41 environment: 39 42 REDIS_CONN: valkey 40 43 PROVIDERS: 41 - container_name: api2 42 44 deploy: 43 45 resources: 44 46 limits: ··· 59 61 deploy: 60 62 resources: 61 63 limits: 62 - cpus: "0.15" 64 + cpus: "0.1" 63 65 memory: "10MB" 64 66 65 67 volumes: ··· 67 69 68 70 networks: 69 71 backend: 72 + driver: bridge 70 73 payment-processor: 71 74 external: true
+42 -1
src/integrations/provider.gleam
··· 1 1 import birl 2 + import gleam/dynamic/decode 2 3 import gleam/http 3 4 import gleam/http/request 4 5 import gleam/http/response ··· 8 9 import models/payment_request.{type PaymentRequest} 9 10 10 11 pub type ProviderConfig { 11 - ProviderConfig(url: String) 12 + ProviderConfig(url: String, min_response_time: Int, name: String) 12 13 } 13 14 14 15 pub fn create_body(body: PaymentRequest) -> String { ··· 42 43 _ -> Error(httpc.ResponseTimeout) 43 44 } 44 45 } 46 + 47 + pub type HealthcheckResponse { 48 + HealthcheckResponse(failing: Bool, min_response_time: Int) 49 + } 50 + 51 + fn health_check_req(req: request.Request(String)) { 52 + use response <- result.try( 53 + req 54 + |> request.set_header("content-type", "application/json") 55 + |> request.set_method(http.Get) 56 + |> httpc.send, 57 + ) 58 + 59 + Ok(response.body) 60 + } 61 + 62 + pub fn health_check( 63 + provider: ProviderConfig, 64 + ) -> Result(HealthcheckResponse, httpc.HttpError) { 65 + let assert Ok(request) = 66 + request.to(provider.url <> "/payments/service-health") 67 + 68 + case health_check_req(request) { 69 + Error(e) -> Error(e) 70 + Ok(body) -> { 71 + let parser = { 72 + use failing <- decode.field("failing", decode.bool) 73 + use min_response_time <- decode.field("minResponseTime", decode.int) 74 + 75 + decode.success(HealthcheckResponse( 76 + failing: failing, 77 + min_response_time: min_response_time, 78 + )) 79 + } 80 + 81 + let assert Ok(data) = json.parse(body, parser) 82 + Ok(data) 83 + } 84 + } 85 + }
+96 -52
src/processor.gleam
··· 1 + import gleam/bool 1 2 import gleam/erlang/process 2 3 import gleam/option 3 4 import gleam/otp/actor 4 5 import gleam/otp/supervision 5 - import gleam/string 6 6 import integrations/provider 7 7 import models/payment_request 8 8 import redis ··· 12 12 Processor( 13 13 redis_conn: valkyrie.Connection, 14 14 name: option.Option(process.Name(Message)), 15 - connections: Int, 16 15 providers: List(provider.ProviderConfig), 16 + selected_provider: provider.ProviderConfig, 17 17 ) 18 18 } 19 19 20 20 pub type Message { 21 21 ServerTick 22 + HealthCheck 22 23 } 23 24 24 25 pub fn new(conn: valkyrie.Connection) -> Processor { 25 - Processor(redis_conn: conn, name: option.None, connections: 1, providers: []) 26 + Processor( 27 + redis_conn: conn, 28 + name: option.None, 29 + providers: [], 30 + selected_provider: provider.ProviderConfig( 31 + url: "", 32 + min_response_time: -1, 33 + name: "", 34 + ), 35 + ) 26 36 } 27 37 28 38 pub fn named(processor: Processor, name: process.Name(Message)) -> Processor { 29 39 Processor(..processor, name: option.Some(name)) 30 40 } 31 41 32 - pub fn connections(processor: Processor, connections: Int) -> Processor { 33 - Processor(..processor, connections: connections) 34 - } 35 - 36 42 pub fn providers( 37 43 processor: Processor, 38 44 providers: List(provider.ProviderConfig), 39 45 ) -> Processor { 40 46 Processor(..processor, providers: providers) 47 + } 48 + 49 + fn selected_provider( 50 + processor: Processor, 51 + provider: provider.ProviderConfig, 52 + ) -> Processor { 53 + Processor(..processor, selected_provider: provider) 41 54 } 42 55 43 56 pub fn start(processor: Processor) { ··· 60 73 fn handle_message(state: Processor, message: Message) { 61 74 case message { 62 75 ServerTick -> { 63 - case redis.read_queue_payments(state.redis_conn) { 64 - "" -> actor.continue(state) 65 - data -> { 66 - process.spawn(fn() { integrate_data(state, data) }) 67 - actor.continue(state) 76 + process.spawn(fn() { 77 + case redis.read_queue_payments(state.redis_conn) { 78 + "" -> Nil 79 + data -> { 80 + let _ = case integrate_data(state, data) { 81 + Error(_) -> 82 + state.redis_conn 83 + |> redis.enqueue_payments([data]) 84 + Ok(message) -> 85 + state.redis_conn 86 + |> redis.save_data(message) 87 + } 88 + Nil 89 + } 68 90 } 69 - } 91 + }) 92 + 93 + actor.continue(state) 94 + } 95 + HealthCheck -> { 96 + echo "Healthcheck" 97 + let provider = 98 + get_faster_healthcheck(state.providers, state.selected_provider) 99 + 100 + state 101 + |> selected_provider(provider) 102 + |> actor.continue 70 103 } 71 104 } 72 105 } 73 106 74 - pub fn loop_worker(subject: process.Subject(Message)) { 107 + pub fn loop_worker(subject: process.Subject(Message), processor_time: Int) { 75 108 process.send(subject, ServerTick) 76 - process.sleep(200) 77 - loop_worker(subject) 109 + process.sleep(processor_time) 110 + loop_worker(subject, processor_time) 78 111 } 79 112 80 - fn integrations( 81 - providers: List(provider.ProviderConfig), 82 - body: String, 83 - ) -> Result(provider.ProviderConfig, Bool) { 84 - case providers { 85 - [] -> Error(False) 86 - [provider, ..rest] -> { 87 - echo "Trying provider -> " <> provider.url 88 - case provider.send_request(provider, body) { 89 - Ok(_) -> Ok(provider) 90 - _ -> integrations(rest, body) 91 - } 92 - } 93 - } 113 + pub fn loop_healthcheck(subject: process.Subject(Message)) { 114 + process.send(subject, HealthCheck) 115 + process.sleep(5000) 116 + loop_healthcheck(subject) 94 117 } 95 118 96 119 fn integrate_data(processor: Processor, data: String) { 97 - echo "Trying to integrate " <> data 98 120 let assert Ok(message) = payment_request.from_json_string(data) 99 121 100 122 let body_to_send = message |> provider.create_body 101 123 102 - case integrations(processor.providers, body_to_send) { 103 - Error(_) -> { 104 - echo "Failed, reenqueuing it" 105 - 106 - processor.redis_conn 107 - |> redis.enqueue_payments([data]) 124 + echo "Sending request to " <> processor.selected_provider.url 125 + case provider.send_request(processor.selected_provider, body_to_send) { 126 + Ok(_) -> { 127 + message 128 + |> payment_request.set_provider(processor.selected_provider.name) 129 + |> payment_request.to_dict 130 + |> Ok 108 131 } 109 - Ok(provider_processed) -> { 110 - echo "Success, saving it" 132 + _ -> Error(Nil) 133 + } 134 + } 111 135 112 - let save_provider = case 113 - string.contains(provider_processed.url, contain: "default") 114 - { 115 - True -> "default" 116 - _ -> "fallback" 117 - } 136 + fn get_faster_healthcheck( 137 + providers: List(provider.ProviderConfig), 138 + acc: provider.ProviderConfig, 139 + ) -> provider.ProviderConfig { 140 + case providers { 141 + [] -> acc 142 + [provider, ..rest] -> 143 + case provider.health_check(provider) { 144 + Error(_) -> acc 145 + Ok(data) -> { 146 + use <- bool.lazy_guard(when: acc.url == "", return: fn() { 147 + get_faster_healthcheck( 148 + rest, 149 + provider.ProviderConfig( 150 + ..provider, 151 + min_response_time: data.min_response_time, 152 + ), 153 + ) 154 + }) 118 155 119 - processor.redis_conn 120 - |> redis.save_data( 121 - message 122 - |> payment_request.set_provider(save_provider) 123 - |> payment_request.to_dict, 124 - ) 125 - } 156 + use <- bool.lazy_guard( 157 + when: acc.min_response_time <= data.min_response_time, 158 + return: fn() { get_faster_healthcheck(rest, acc) }, 159 + ) 160 + 161 + get_faster_healthcheck( 162 + rest, 163 + provider.ProviderConfig( 164 + ..provider, 165 + min_response_time: data.min_response_time, 166 + ), 167 + ) 168 + } 169 + } 126 170 } 127 171 }
+1 -1
src/redis.gleam
··· 15 15 valkyrie.default_config() 16 16 |> valkyrie.host(host) 17 17 |> valkyrie.supervised_pool( 18 - size: 10_000, 18 + size: 100_000, 19 19 name: option.Some(name), 20 20 timeout: default_timeout, 21 21 ),
+32 -4
src/rinha_2025.gleam
··· 1 1 import gleam/bool 2 2 import gleam/erlang/process 3 + import gleam/int 3 4 import gleam/list 4 5 import gleam/otp/static_supervisor as supervisor 5 6 import gleam/otp/supervision ··· 15 16 pub fn main() -> Nil { 16 17 let redis_host = util.get_env_var("REDIS_CONN", "localhost") 17 18 let providers_env = util.get_env_var("PROVIDERS", "") 19 + let assert Ok(processor_time) = 20 + util.get_env_var("PROCESSOR_TIME", "100") 21 + |> int.parse 22 + let assert Ok(processor_qtt) = 23 + util.get_env_var("PROCESSOR_QTT", "0") 24 + |> int.parse 25 + 18 26 let providers = 19 27 providers_env 20 28 |> string.split(",") 21 29 |> list.filter(fn(url) { "" != url }) 22 - |> list.map(fn(url) { provider.ProviderConfig(url: url) }) 30 + |> list.map(fn(url) { 31 + let provider_name = case string.contains(url, contain: "default") { 32 + True -> "default" 33 + _ -> "fallback" 34 + } 35 + provider.ProviderConfig( 36 + url: url, 37 + min_response_time: -1, 38 + name: provider_name, 39 + ) 40 + }) 23 41 let has_providers = list.length(providers) > 0 24 42 25 43 let #(valkey_pool_name, valkey_pool) = ··· 42 60 |> create_supervisor_with_processor(worker_pool_supervised, has_providers) 43 61 |> supervisor.start 44 62 45 - process.spawn(fn() { 46 - use <- bool.guard(when: !has_providers, return: Ok(Nil)) 63 + use <- bool.lazy_guard(when: !has_providers, return: process.sleep_forever) 47 64 65 + list.repeat(Nil, processor_qtt) 66 + |> list.map(fn(a) { 67 + process.spawn(fn() { 68 + worker_name 69 + |> process.named_subject 70 + |> processor.loop_worker(processor_time) 71 + }) 72 + a 73 + }) 74 + 75 + process.spawn(fn() { 48 76 worker_name 49 77 |> process.named_subject 50 - |> processor.loop_worker 78 + |> processor.loop_healthcheck 51 79 }) 52 80 53 81 process.sleep_forever()
-1
src/web/middleware.gleam
··· 6 6 ) -> wisp.Response { 7 7 let req = wisp.method_override(req) 8 8 9 - use <- wisp.log_request(req) 10 9 use <- wisp.rescue_crashes 11 10 use req <- wisp.handle_head(req) 12 11
-3
src/web/web.gleam
··· 1 1 import mist 2 2 import web/router 3 3 import web/server 4 - import wisp 5 4 import wisp/wisp_mist 6 5 7 6 pub fn create_server_supervised(ctx: server.Context) { 8 - wisp.configure_logger() 9 - 10 7 router.handle_request(_, ctx) 11 8 |> wisp_mist.handler("secret") 12 9 |> mist.new