flora is a fast and secure runtime that lets you write discord bots for your servers, with a rich TypeScript SDK, without worrying about running infrastructure. [mirror]
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix(runtime): keep bounded worker queue with backpressure

Session-Id: 870271fb-28d3-4bac-a600-6157407d68b4

+38 -13
+1 -1
BUCK2
··· 17 17 filegroup( 18 18 name = "rust_runtime_embed_inputs", 19 19 srcs = [ 20 - "example/src", 20 + "examples/basic/src", 21 21 "runtime-dist", 22 22 ], 23 23 visibility = ["PUBLIC"],
+12 -2
apps/runtime/src/runtime/mod.rs
··· 48 48 config: RuntimeConfig, 49 49 ) -> Self { 50 50 let num_workers = config.max_workers.clamp(1, MAX_WORKERS_LIMIT); 51 - info!(target: "flora:runtime", num_workers, "spawning worker pool"); 51 + let queue_capacity = config.worker_queue_capacity.max(1); 52 + info!(target: "flora:runtime", num_workers, queue_capacity, "spawning worker pool"); 52 53 let limits = RuntimeLimits::from_config(&config); 53 54 54 55 let workers: Vec<Worker> = (0..num_workers) 55 - .map(|id| spawn_worker(id, http.clone(), kv.clone(), secrets.clone(), limits)) 56 + .map(|id| { 57 + spawn_worker( 58 + id, 59 + http.clone(), 60 + kv.clone(), 61 + secrets.clone(), 62 + limits, 63 + queue_capacity, 64 + ) 65 + }) 56 66 .collect(); 57 67 58 68 Self {
+18 -8
apps/runtime/src/runtime/types.rs
··· 72 72 /// A worker thread that owns multiple guild isolates. 73 73 pub(super) struct Worker { 74 74 pub(super) id: usize, 75 - pub(super) sender: mpsc::UnboundedSender<WorkerCommand>, 75 + pub(super) sender: mpsc::Sender<WorkerCommand>, 76 76 pub(super) handle: Option<thread::JoinHandle<()>>, 77 77 pub(super) backlog: Arc<AtomicUsize>, 78 78 } ··· 80 80 impl Worker { 81 81 fn send_cmd(&self, cmd: WorkerCommand) -> Result<(), AnyError> { 82 82 self.backlog.fetch_add(1, Ordering::Relaxed); 83 - if self.sender.send(cmd).is_err() { 84 - self.backlog.fetch_sub(1, Ordering::Relaxed); 85 - return Err(AnyError::msg("worker unavailable")); 83 + match self.sender.try_send(cmd) { 84 + Ok(()) => Ok(()), 85 + Err(mpsc::error::TrySendError::Closed(_)) => { 86 + self.backlog.fetch_sub(1, Ordering::Relaxed); 87 + Err(AnyError::msg("worker unavailable")) 88 + } 89 + Err(mpsc::error::TrySendError::Full(_)) => { 90 + self.backlog.fetch_sub(1, Ordering::Relaxed); 91 + Err(AnyError::msg("worker queue full")) 92 + } 86 93 } 87 - Ok(()) 88 94 } 89 95 90 96 pub(super) fn send_shutdown(&self) { 91 97 self.backlog.fetch_add(1, Ordering::Relaxed); 92 - if self.sender.send(WorkerCommand::Shutdown).is_err() { 98 + if self.sender.try_send(WorkerCommand::Shutdown).is_err() { 93 99 self.backlog.fetch_sub(1, Ordering::Relaxed); 94 100 } 95 101 } ··· 182 188 runtime, 183 189 respond_to: tx, 184 190 }; 185 - if let Err(err) = self.sender.send(cmd) { 191 + if let Err(err) = self.sender.try_send(cmd) { 186 192 self.backlog.fetch_sub(1, Ordering::Relaxed); 187 - let WorkerCommand::MigrateIn { runtime, .. } = err.0 else { 193 + let cmd = match err { 194 + mpsc::error::TrySendError::Closed(cmd) => cmd, 195 + mpsc::error::TrySendError::Full(cmd) => cmd, 196 + }; 197 + let WorkerCommand::MigrateIn { runtime, .. } = cmd else { 188 198 return Err(MigrationInFailure::new( 189 199 AnyError::msg("worker unavailable"), 190 200 None,
+3 -2
apps/runtime/src/runtime/worker.rs
··· 47 47 kv: KvService, 48 48 secrets: SecretService, 49 49 limits: RuntimeLimits, 50 + queue_capacity: usize, 50 51 ) -> Worker { 51 - let (tx, rx) = mpsc::unbounded_channel(); 52 + let (tx, rx) = mpsc::channel(queue_capacity); 52 53 let backlog = Arc::new(AtomicUsize::new(0)); 53 54 let backlog_handle = Arc::clone(&backlog); 54 55 ··· 83 84 84 85 fn worker_thread( 85 86 worker_id: usize, 86 - mut receiver: mpsc::UnboundedReceiver<WorkerCommand>, 87 + mut receiver: mpsc::Receiver<WorkerCommand>, 87 88 http: Arc<Http>, 88 89 kv: KvService, 89 90 secrets: SecretService,
+4
crates/flora_config/src/lib.rs
··· 84 84 /// Max: 64 85 85 #[config(env = "RUNTIME_MAX_WORKERS", default = 4)] 86 86 pub max_workers: usize, 87 + /// Command queue capacity per worker. 88 + /// Default: 1024 89 + #[config(env = "RUNTIME_WORKER_QUEUE_CAPACITY", default = 1024)] 90 + pub worker_queue_capacity: usize, 87 91 /// Timeout in seconds for runtime bootstrap (0 disables). 88 92 #[config(env = "RUNTIME_BOOT_TIMEOUT_SECS", default = 5)] 89 93 pub boot_timeout_secs: u64,