this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

:construction: using actor to send requests

+183 -21
+3 -1
gleam.toml
··· 20 20 gleam_http = ">= 4.1.1 and < 5.0.0" 21 21 gleam_json = ">= 3.0.2 and < 4.0.0" 22 22 gleam_erlang = ">= 1.2.0 and < 2.0.0" 23 - valkyrie = ">= 3.0.0 and < 4.0.0" 24 23 gleam_otp = ">= 1.0.0 and < 2.0.0" 24 + valkyrie = ">= 3.0.0 and < 4.0.0" 25 + birl = ">= 1.8.0 and < 2.0.0" 26 + gleam_httpc = ">= 5.0.0 and < 6.0.0" 25 27 26 28 [dev-dependencies] 27 29 gleeunit = ">= 1.0.0 and < 2.0.0"
+6
manifest.toml
··· 3 3 4 4 packages = [ 5 5 { name = "bath", version = "5.0.0", build_tools = ["gleam"], requirements = ["gleam_deque", "gleam_erlang", "gleam_otp", "gleam_stdlib", "logging"], otp_app = "bath", source = "hex", outer_checksum = "BB7A25E5177BC80E4106BC691E026F756A606E9752350F938F2284DB81D8A2C9" }, 6 + { name = "birl", version = "1.8.0", build_tools = ["gleam"], requirements = ["gleam_regexp", "gleam_stdlib", "ranger"], otp_app = "birl", source = "hex", outer_checksum = "2AC7BA26F998E3DFADDB657148BD5DDFE966958AD4D6D6957DD0D22E5B56C400" }, 6 7 { name = "directories", version = "1.2.0", build_tools = ["gleam"], requirements = ["envoy", "gleam_stdlib", "platform", "simplifile"], otp_app = "directories", source = "hex", outer_checksum = "D13090CFCDF6759B87217E8DDD73A75903A700148A82C1D33799F333E249BF9E" }, 7 8 { name = "envoy", version = "1.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "envoy", source = "hex", outer_checksum = "95FD059345AA982E89A0B6E2A3BF1CF43E17A7048DCD85B5B65D3B9E4E39D359" }, 8 9 { name = "exception", version = "2.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "exception", source = "hex", outer_checksum = "329D269D5C2A314F7364BD2711372B6F2C58FA6F39981572E5CA68624D291F8C" }, ··· 11 12 { name = "gleam_deque", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_deque", source = "hex", outer_checksum = "64D77068931338CF0D0CB5D37522C3E3CCA7CB7D6C5BACB41648B519CC0133C7" }, 12 13 { name = "gleam_erlang", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_erlang", source = "hex", outer_checksum = "F91CE62A2D011FA13341F3723DB7DB118541AAA5FE7311BD2716D018F01EF9E3" }, 13 14 { name = "gleam_http", version = "4.1.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_http", source = "hex", outer_checksum = "DD0271B32C356FB684EC7E9F48B1E835D0480168848581F68983C0CC371405D4" }, 15 + { name = "gleam_httpc", version = "5.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_http", "gleam_stdlib"], otp_app = "gleam_httpc", source = "hex", outer_checksum = "C545172618D07811494E97AAA4A0FB34DA6F6D0061FDC8041C2F8E3BE2B2E48F" }, 14 16 { name = "gleam_json", version = "3.0.2", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_json", source = "hex", outer_checksum = "874FA3C3BB6E22DD2BB111966BD40B3759E9094E05257899A7C08F5DE77EC049" }, 15 17 { name = "gleam_otp", version = "1.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "7020E652D18F9ABAC9C877270B14160519FA0856EE80126231C505D719AD68DA" }, 18 + { name = "gleam_regexp", version = "1.1.1", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_regexp", source = "hex", outer_checksum = "9C215C6CA84A5B35BB934A9B61A9A306EC743153BE2B0425A0D032E477B062A9" }, 16 19 { name = "gleam_stdlib", version = "0.62.0", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "DC8872BC0B8550F6E22F0F698CFE7F1E4BDA7312FDEB40D6C3F44C5B706C8310" }, 17 20 { name = "gleam_time", version = "1.4.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_time", source = "hex", outer_checksum = "DCDDC040CE97DA3D2A925CDBBA08D8A78681139745754A83998641C8A3F6587E" }, 18 21 { name = "gleam_yielder", version = "1.1.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleam_yielder", source = "hex", outer_checksum = "8E4E4ECFA7982859F430C57F549200C7749823C106759F4A19A78AEA6687717A" }, ··· 26 29 { name = "mist", version = "5.0.2", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_http", "gleam_otp", "gleam_stdlib", "gleam_yielder", "glisten", "gramps", "hpack_erl", "logging"], otp_app = "mist", source = "hex", outer_checksum = "0716CE491EA13E1AA1EFEC4B427593F8EB2B953B6EBDEBE41F15BE3D06A22918" }, 27 30 { name = "mug", version = "3.0.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "mug", source = "hex", outer_checksum = "49AD2B71690C6A615453D272300951C4BDE19FBF55B167D9C951F5CD89FEC820" }, 28 31 { name = "platform", version = "1.0.0", build_tools = ["gleam"], requirements = [], otp_app = "platform", source = "hex", outer_checksum = "8339420A95AD89AAC0F82F4C3DB8DD401041742D6C3F46132A8739F6AEB75391" }, 32 + { name = "ranger", version = "1.4.0", build_tools = ["gleam"], requirements = ["gleam_stdlib", "gleam_yielder"], otp_app = "ranger", source = "hex", outer_checksum = "C8988E8F8CDBD3E7F4D8F2E663EF76490390899C2B2885A6432E942495B3E854" }, 29 33 { name = "simplifile", version = "2.3.0", build_tools = ["gleam"], requirements = ["filepath", "gleam_stdlib"], otp_app = "simplifile", source = "hex", outer_checksum = "0A868DAC6063D9E983477981839810DC2E553285AB4588B87E3E9C96A7FB4CB4" }, 30 34 { name = "telemetry", version = "1.3.0", build_tools = ["rebar3"], requirements = [], otp_app = "telemetry", source = "hex", outer_checksum = "7015FC8919DBE63764F4B4B87A95B7C0996BD539E0D499BE6EC9D7F3875B79E6" }, 31 35 { name = "valkyrie", version = "3.0.0", build_tools = ["gleam"], requirements = ["bath", "gleam_erlang", "gleam_otp", "gleam_stdlib", "gleam_time", "mug"], otp_app = "valkyrie", source = "hex", outer_checksum = "E418ADDD0CB7A51A17F70D5219A213D17717D82D1B75B3F6919AE0AA40361F3E" }, ··· 33 37 ] 34 38 35 39 [requirements] 40 + birl = { version = ">= 1.8.0 and < 2.0.0" } 36 41 envoy = { version = ">= 1.0.2 and < 2.0.0" } 37 42 gleam_erlang = { version = ">= 1.2.0 and < 2.0.0" } 38 43 gleam_http = { version = ">= 4.1.1 and < 5.0.0" } 44 + gleam_httpc = { version = ">= 5.0.0 and < 6.0.0" } 39 45 gleam_json = { version = ">= 3.0.2 and < 4.0.0" } 40 46 gleam_otp = { version = ">= 1.0.0 and < 2.0.0" } 41 47 gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" }
+1 -1
request.hurl
··· 1 1 POST http://localhost:8000/payments 2 2 { 3 3 "amount": 19.90, 4 - "correlationId": "" 4 + "correlationId": "4a7901b8-7d26-4d9d-aa19-4dc1c7cf60b3" 5 5 }
+5
set-delay.hurl
··· 1 + PUT http://localhost:8001/admin/configurations/delay 2 + X-Rinha-Token: 123 3 + { 4 + "delay": 2000 5 + }
+5
set-fail.hurl
··· 1 + PUT http://localhost:8001/admin/configurations/failure 2 + X-Rinha-Token: 123 3 + { 4 + "failure": true 5 + }
+40
src/integrations/default.gleam
··· 1 + import birl 2 + import gleam/http 3 + import gleam/http/request 4 + import gleam/http/response 5 + import gleam/httpc 6 + import gleam/json 7 + import gleam/result 8 + import model 9 + 10 + pub fn create_body(body: model.PaymentRequest) -> String { 11 + json.object([ 12 + #("correlationId", json.string(body.correlation_id)), 13 + #("amount", json.float(body.amount)), 14 + #("requestedAt", json.string(birl.to_iso8601(body.requested_at))), 15 + ]) 16 + |> json.to_string 17 + } 18 + 19 + pub fn default_provider_send_request(body: String) { 20 + let assert Ok(request) = request.to("http://localhost:8001/payments") 21 + 22 + use response <- result.try( 23 + request 24 + |> request.set_header("content-type", "application/json") 25 + |> request.set_body(body) 26 + |> request.set_method(http.Post) 27 + |> httpc.send, 28 + ) 29 + 30 + parse_http_response(response) 31 + } 32 + 33 + pub fn parse_http_response( 34 + data: response.Response(String), 35 + ) -> Result(String, httpc.HttpError) { 36 + case data.status { 37 + status if status >= 200 && status < 300 -> Ok(data.body) 38 + _ -> Error(httpc.ResponseTimeout) 39 + } 40 + }
+19
src/integrations/fallback.gleam
··· 1 + import gleam/http 2 + import gleam/http/request 3 + import gleam/httpc 4 + import gleam/result 5 + import integrations/default 6 + 7 + pub fn fallback_provider_send_request(body: String) { 8 + let assert Ok(request) = request.to("http://localhost:8002/payments") 9 + 10 + use response <- result.try( 11 + request 12 + |> request.set_header("content-type", "application/json") 13 + |> request.set_body(body) 14 + |> request.set_method(http.Post) 15 + |> httpc.send, 16 + ) 17 + 18 + default.parse_http_response(response) 19 + }
+5
src/model.gleam
··· 1 + import birl 2 + 3 + pub type PaymentRequest { 4 + PaymentRequest(correlation_id: String, amount: Float, requested_at: birl.Time) 5 + }
+45
src/processor.gleam
··· 1 + import birl 2 + import gleam/erlang/process 3 + import gleam/otp/actor 4 + import integrations/default 5 + import integrations/fallback 6 + import model.{type PaymentRequest, PaymentRequest} 7 + 8 + pub type Message { 9 + Process(element: PaymentRequest) 10 + } 11 + 12 + pub fn create_worker_to_read_messages() { 13 + let name = process.new_name("worker_process") 14 + 15 + actor.new(PaymentRequest( 16 + amount: 0.0, 17 + correlation_id: "", 18 + requested_at: birl.now(), 19 + )) 20 + |> actor.named(name) 21 + |> actor.on_message(handle_message) 22 + |> actor.start 23 + } 24 + 25 + fn handle_message(state: PaymentRequest, message: Message) { 26 + case message { 27 + Process(data) -> { 28 + let body_to_send = data |> default.create_body 29 + 30 + case default.default_provider_send_request(body_to_send) { 31 + Ok(_) -> actor.continue(data) 32 + Error(_) -> { 33 + echo "failed to make request" 34 + case fallback.fallback_provider_send_request(body_to_send) { 35 + Ok(_) -> actor.continue(data) 36 + Error(_) -> { 37 + echo "failed to make request" 38 + actor.continue(data) 39 + } 40 + } 41 + } 42 + } 43 + } 44 + } 45 + }
+17 -3
src/redis.gleam
··· 2 2 import gleam/option 3 3 import valkyrie 4 4 5 + const default_timeout = 1000 6 + 5 7 pub fn create_supervised_pool() { 6 8 let name = process.new_name("connection_pool") 7 9 ··· 9 11 name, 10 12 valkyrie.default_config() 11 13 |> valkyrie.supervised_pool( 12 - size: 10, 14 + size: 10_000, 13 15 name: option.Some(name), 14 - timeout: 1000, 16 + timeout: default_timeout, 15 17 ), 16 18 ) 17 19 } 18 20 19 21 pub fn enqueue_payments(body: List(String), conn: valkyrie.Connection) { 20 - valkyrie.lpush(conn, "payments_created", body, 1000) 22 + valkyrie.lpush(conn, "payments_created", body, default_timeout) 23 + } 24 + 25 + pub fn read_queue_payments(conn: valkyrie.Connection) -> String { 26 + case valkyrie.exists(conn, ["payments_created"], default_timeout) { 27 + Ok(qtt) -> { 28 + case valkyrie.rpop(conn, "payments_created", qtt, default_timeout) { 29 + Ok(data) -> data 30 + Error(_) -> "" 31 + } 32 + } 33 + Error(_) -> "" 34 + } 21 35 }
+11 -2
src/rinha_2025.gleam
··· 1 1 import gleam/erlang/process 2 2 import gleam/otp/static_supervisor as supervisor 3 + import gleam/otp/supervision 4 + import processor 3 5 import redis 4 6 import valkyrie 5 7 import web/server ··· 7 9 8 10 pub fn main() -> Nil { 9 11 let #(valkey_pool_name, valkey_pool) = redis.create_supervised_pool() 12 + let valky = valkyrie.named_connection(valkey_pool_name) 13 + let worker_pool = processor.create_worker_to_read_messages() 10 14 11 - let ctx = 12 - server.Context(valkye_conn: valkyrie.named_connection(valkey_pool_name)) 15 + let assert Ok(a) = worker_pool 16 + 17 + let ctx = server.Context(valkye_conn: valky, worker_subject: a.data) 13 18 14 19 let assert Ok(_) = 15 20 supervisor.new(supervisor.OneForOne) 16 21 |> supervisor.add(valkey_pool) 17 22 |> supervisor.add(web.create_server_supervised(ctx)) 23 + |> supervisor.add( 24 + supervision.worker(fn() { worker_pool }) 25 + |> supervision.timeout(1000), 26 + ) 18 27 |> supervisor.start 19 28 20 29 process.sleep_forever()
+20 -13
src/web/controllers/payment_controller.gleam
··· 1 + import birl 1 2 import gleam/dynamic 2 3 import gleam/dynamic/decode 4 + import gleam/erlang/process 3 5 import gleam/json 4 6 import gleam/list 7 + import gleam/otp/actor 8 + import model.{type PaymentRequest, PaymentRequest} 9 + import processor 5 10 import redis 6 11 import web/server 7 12 import wisp 8 - 9 - type PaymentRequest { 10 - PaymentRequest(correlation_id: String, amount: Float) 11 - } 12 13 13 14 fn decode_payment_body( 14 15 body: dynamic.Dynamic, ··· 18 19 use correlation <- decode.field("correlationId", decode.string) 19 20 use amount <- decode.field("amount", decode.float) 20 21 21 - decode.success(PaymentRequest(correlation_id: correlation, amount: amount)) 22 + decode.success(PaymentRequest( 23 + correlation_id: correlation, 24 + amount: amount, 25 + requested_at: birl.now(), 26 + )) 22 27 } 23 28 case decode.run(body, decoder) { 24 29 Ok(body) -> cb(body) ··· 30 35 use json <- wisp.require_json(req) 31 36 use body <- decode_payment_body(json) 32 37 33 - let assert Ok(_) = 34 - echo json.object([ 35 - #("correlationId", json.string(body.correlation_id)), 36 - #("amount", json.float(body.amount)), 37 - ]) 38 - |> json.to_string 39 - |> list.wrap 40 - |> redis.enqueue_payments(ctx.valkye_conn) 38 + // let assert Ok(_) = 39 + // echo json.object([ 40 + // #("correlationId", json.string(body.correlation_id)), 41 + // #("amount", json.float(body.amount)), 42 + // ]) 43 + // |> json.to_string 44 + // |> list.wrap 45 + // |> redis.enqueue_payments(ctx.valkye_conn) 46 + 47 + actor.send(ctx.worker_subject, processor.Process(body)) 41 48 42 49 wisp.no_content() 43 50 }
+6 -1
src/web/server.gleam
··· 1 + import gleam/erlang/process 2 + import processor 1 3 import valkyrie 2 4 3 5 pub type Context { 4 - Context(valkye_conn: valkyrie.Connection) 6 + Context( 7 + valkye_conn: valkyrie.Connection, 8 + worker_subject: process.Subject(processor.Message), 9 + ) 5 10 }