🏗️ Elegant & Highly Performant Async Gemini Server Framework for the Modern Age
async framework gemini-protocol protocol gemini rust
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

perf(router): Reduce per-connection overhead with shared RequestHandler

Fuwn 4a803481 5165fd07

+278 -283
+278 -283
src/router.rs
··· 114 114 listener_address: String, 115 115 } 116 116 117 + struct RequestHandler { 118 + routes: matchit::Router<Arc<AsyncMutex<Box<dyn RouteResponse>>>>, 119 + error_handler: Arc<AsyncMutex<Box<dyn ErrorResponse>>>, 120 + headers: Arc<Mutex<Vec<Box<dyn Partial>>>>, 121 + footers: Arc<Mutex<Vec<Box<dyn Partial>>>>, 122 + pre_route_callback: Arc<Mutex<Box<dyn PreRouteHook>>>, 123 + post_route_callback: Arc<Mutex<Box<dyn PostRouteHook>>>, 124 + character_set: String, 125 + languages: Vec<String>, 126 + async_modules: Arc<AsyncMutex<Vec<Box<dyn AsyncModule + Send>>>>, 127 + modules: Arc<Mutex<Vec<Box<dyn Module + Send>>>>, 128 + options: HashSet<RouterOption>, 129 + } 130 + 131 + impl RequestHandler { 132 + #[allow( 133 + clippy::too_many_lines, 134 + clippy::significant_drop_in_scrutinee, 135 + clippy::cognitive_complexity 136 + )] 137 + async fn handle(&self, stream: &mut Stream) -> Result<(), Box<dyn Error>> { 138 + let mut buffer = [0u8; 1024]; 139 + let mut url = Url::parse("gemini://fuwn.me/")?; 140 + let mut footer = String::new(); 141 + let mut header = String::new(); 142 + 143 + while let Ok(size) = stream.read(&mut buffer).await { 144 + let request = or_error!( 145 + stream, 146 + std::str::from_utf8(&buffer[0..size]).map(ToString::to_string), 147 + "59 The server (Windmark) received a bad request: {}" 148 + ); 149 + let request_trimmed = request 150 + .find("\r\n") 151 + .map_or(&request[..], |pos| &request[..pos]); 152 + 153 + url = or_error!( 154 + stream, 155 + Url::parse(request_trimmed), 156 + "59 The server (Windmark) received a bad request: {}" 157 + ); 158 + 159 + if request.contains("\r\n") { 160 + break; 161 + } 162 + } 163 + 164 + if url.path().is_empty() { 165 + url.set_path("/"); 166 + } 167 + 168 + let mut path = url.path().to_string(); 169 + 170 + if self 171 + .options 172 + .contains(&RouterOption::AllowCaseInsensitiveLookup) 173 + { 174 + path = path.to_lowercase(); 175 + } 176 + 177 + let mut route = self.routes.at(&path); 178 + 179 + if route.is_err() { 180 + if self 181 + .options 182 + .contains(&RouterOption::RemoveExtraTrailingSlash) 183 + && path.ends_with('/') 184 + && path != "/" 185 + { 186 + let trimmed = path.trim_end_matches('/'); 187 + 188 + if trimmed != path { 189 + path = trimmed.to_string(); 190 + route = self.routes.at(&path); 191 + } 192 + } else if self 193 + .options 194 + .contains(&RouterOption::AddMissingTrailingSlash) 195 + && !path.ends_with('/') 196 + { 197 + let mut path_with_slash = String::with_capacity(path.len() + 1); 198 + 199 + path_with_slash.push_str(&path); 200 + path_with_slash.push('/'); 201 + 202 + if self.routes.at(&path_with_slash).is_ok() { 203 + path = path_with_slash; 204 + route = self.routes.at(&path); 205 + } 206 + } 207 + } 208 + 209 + let peer_certificate = stream.ssl().peer_certificate(); 210 + let url_clone = url.clone(); 211 + let hook_context = HookContext::new( 212 + stream.get_ref().peer_addr(), 213 + url_clone.clone(), 214 + route.as_ref().ok().map(|route| route.params.clone()), 215 + peer_certificate.clone(), 216 + ); 217 + let hook_context_clone = hook_context.clone(); 218 + 219 + for module in &mut *self.async_modules.lock().await { 220 + module.on_pre_route(hook_context_clone.clone()).await; 221 + } 222 + 223 + let hook_context_clone = hook_context.clone(); 224 + 225 + if let Ok(mut modules) = self.modules.lock() { 226 + for module in &mut *modules { 227 + module.on_pre_route(hook_context_clone.clone()); 228 + } 229 + } 230 + 231 + if let Ok(mut callback) = self.pre_route_callback.lock() { 232 + callback.call(hook_context.clone()); 233 + } 234 + 235 + let mut content = if let Ok(ref route) = route { 236 + let route_context = RouteContext::new( 237 + stream.get_ref().peer_addr(), 238 + url_clone, 239 + &route.params, 240 + peer_certificate, 241 + ); 242 + 243 + { 244 + let mut headers = self.headers.lock().expect("headers lock poisoned"); 245 + 246 + for partial_header in &mut *headers { 247 + writeln!( 248 + &mut header, 249 + "{}", 250 + partial_header.call(route_context.clone()), 251 + ) 252 + .expect("failed to write header"); 253 + } 254 + } 255 + 256 + { 257 + let mut footers = self.footers.lock().expect("footers lock poisoned"); 258 + let length = footers.len(); 259 + 260 + for (i, partial_footer) in footers.iter_mut().enumerate() { 261 + let _ = write!( 262 + &mut footer, 263 + "{}{}", 264 + partial_footer.call(route_context.clone()), 265 + if length > 1 && i != length - 1 { 266 + "\n" 267 + } else { 268 + "" 269 + }, 270 + ); 271 + } 272 + } 273 + 274 + let mut lock = (*route.value).lock().await; 275 + let handler = lock.call(route_context); 276 + 277 + handler.await 278 + } else { 279 + (*self.error_handler) 280 + .lock() 281 + .await 282 + .call(ErrorContext::new( 283 + stream.get_ref().peer_addr(), 284 + url_clone, 285 + peer_certificate, 286 + )) 287 + .await 288 + }; 289 + 290 + let hook_context_clone = hook_context.clone(); 291 + 292 + for module in &mut *self.async_modules.lock().await { 293 + module.on_post_route(hook_context_clone.clone()).await; 294 + } 295 + 296 + let hook_context_clone = hook_context.clone(); 297 + 298 + if let Ok(mut modules) = self.modules.lock() { 299 + for module in &mut *modules { 300 + module.on_post_route(hook_context_clone.clone()); 301 + } 302 + } 303 + 304 + if let Ok(mut callback) = self.post_route_callback.lock() { 305 + callback.call(hook_context, &mut content); 306 + } 307 + 308 + let status_code = 309 + if content.status == 21 || content.status == 22 || content.status == 23 { 310 + 20 311 + } else { 312 + content.status 313 + }; 314 + let status_line = match content.status { 315 + 20 => { 316 + let mime = content.mime.as_deref().unwrap_or("text/gemini"); 317 + let charset = content 318 + .character_set 319 + .as_deref() 320 + .unwrap_or(&self.character_set); 321 + let lang = content 322 + .languages 323 + .as_ref() 324 + .map_or_else(|| self.languages.join(","), |l| l.join(",")); 325 + 326 + format!("{status_code} {mime}; charset={charset}; lang={lang}") 327 + } 328 + 21 => { 329 + format!( 330 + "{} {}", 331 + status_code, 332 + content.mime.as_deref().unwrap_or_default() 333 + ) 334 + } 335 + #[cfg(feature = "auto-deduce-mime")] 336 + 22 => { 337 + format!( 338 + "{} {}", 339 + status_code, 340 + content.mime.as_deref().unwrap_or_default() 341 + ) 342 + } 343 + _ => { 344 + format!("{} {}", status_code, content.content) 345 + } 346 + }; 347 + let body = match content.status { 348 + 20 => { 349 + let mut body = String::with_capacity( 350 + header.len() + content.content.len() + footer.len() + 1, 351 + ); 352 + 353 + body.push_str(&header); 354 + body.push_str(&content.content); 355 + body.push('\n'); 356 + body.push_str(&footer); 357 + 358 + body 359 + } 360 + 21 | 22 => content.content, 361 + _ => String::new(), 362 + }; 363 + let mut response = 364 + String::with_capacity(status_line.len() + body.len() + 2); 365 + 366 + response.push_str(&status_line); 367 + response.push_str("\r\n"); 368 + response.push_str(&body); 369 + stream.write_all(response.as_bytes()).await?; 370 + #[cfg(feature = "tokio")] 371 + stream.shutdown().await?; 372 + #[cfg(feature = "async-std")] 373 + stream.get_mut().shutdown(std::net::Shutdown::Both)?; 374 + 375 + Ok(()) 376 + } 377 + } 378 + 117 379 impl Router { 118 380 /// Create a new `Router` 119 381 /// ··· 343 605 #[cfg(feature = "logger")] 344 606 info!("windmark is listening for connections"); 345 607 608 + let handler = Arc::new(RequestHandler { 609 + routes: self.routes.clone(), 610 + error_handler: self.error_handler.clone(), 611 + headers: self.headers.clone(), 612 + footers: self.footers.clone(), 613 + pre_route_callback: self.pre_route_callback.clone(), 614 + post_route_callback: self.post_route_callback.clone(), 615 + character_set: self.character_set.clone(), 616 + languages: self.languages.clone(), 617 + async_modules: self.async_modules.clone(), 618 + modules: self.modules.clone(), 619 + options: self.options.clone(), 620 + }); 621 + 346 622 loop { 347 623 match listener.accept().await { 348 624 Ok((stream, _)) => { 349 - let routes = self.routes.clone(); 350 - let error_handler = self.error_handler.clone(); 351 - let headers = self.headers.clone(); 352 - let footers = self.footers.clone(); 353 - let async_modules = self.async_modules.clone(); 354 - let modules = self.modules.clone(); 355 - let pre_route_callback = self.pre_route_callback.clone(); 356 - let post_route_callback = self.post_route_callback.clone(); 357 - let character_set = self.character_set.clone(); 358 - let languages = self.languages.clone(); 359 - let options = self.options.clone(); 625 + let handler = Arc::clone(&handler); 360 626 let acceptor = self.ssl_acceptor.clone(); 361 627 #[cfg(feature = "tokio")] 362 628 let spawner = tokio::spawn; ··· 384 650 warn!("stream accept error: {e:?}"); 385 651 } 386 652 387 - let router_instance = Self { 388 - routes, 389 - error_handler, 390 - private_key_file_name: String::new(), 391 - private_key_content: None, 392 - certificate_file_name: String::new(), 393 - certificate_content: None, 394 - headers, 395 - footers, 396 - ssl_acceptor: acceptor, 397 - #[cfg(feature = "logger")] 398 - default_logger: false, 399 - #[cfg(feature = "logger")] 400 - log_filter: String::new(), 401 - pre_route_callback, 402 - post_route_callback, 403 - character_set, 404 - languages, 405 - port: 0, 406 - async_modules, 407 - modules, 408 - options, 409 - listener_address: String::new(), 410 - }; 411 - 412 - if let Err(e) = router_instance.handle(&mut stream).await { 653 + if let Err(e) = handler.handle(&mut stream).await { 413 654 error!("handle error: {e}"); 414 655 } 415 656 } ··· 420 661 Err(e) => error!("tcp stream error: {e:?}"), 421 662 } 422 663 } 423 - } 424 - 425 - #[allow( 426 - clippy::too_many_lines, 427 - clippy::significant_drop_in_scrutinee, 428 - clippy::cognitive_complexity 429 - )] 430 - async fn handle(&self, stream: &mut Stream) -> Result<(), Box<dyn Error>> { 431 - let mut buffer = [0u8; 1024]; 432 - let mut url = Url::parse("gemini://fuwn.me/")?; 433 - let mut footer = String::new(); 434 - let mut header = String::new(); 435 - 436 - while let Ok(size) = stream.read(&mut buffer).await { 437 - let request = or_error!( 438 - stream, 439 - std::str::from_utf8(&buffer[0..size]).map(ToString::to_string), 440 - "59 The server (Windmark) received a bad request: {}" 441 - ); 442 - let request_trimmed = request 443 - .find("\r\n") 444 - .map_or(&request[..], |pos| &request[..pos]); 445 - 446 - url = or_error!( 447 - stream, 448 - Url::parse(request_trimmed), 449 - "59 The server (Windmark) received a bad request: {}" 450 - ); 451 - 452 - if request.contains("\r\n") { 453 - break; 454 - } 455 - } 456 - 457 - if url.path().is_empty() { 458 - url.set_path("/"); 459 - } 460 - 461 - let mut path = url.path().to_string(); 462 - 463 - if self 464 - .options 465 - .contains(&RouterOption::AllowCaseInsensitiveLookup) 466 - { 467 - path = path.to_lowercase(); 468 - } 469 - 470 - let mut route = self.routes.at(&path); 471 - 472 - if route.is_err() { 473 - if self 474 - .options 475 - .contains(&RouterOption::RemoveExtraTrailingSlash) 476 - && path.ends_with('/') 477 - && path != "/" 478 - { 479 - let trimmed = path.trim_end_matches('/'); 480 - 481 - if trimmed != path { 482 - path = trimmed.to_string(); 483 - route = self.routes.at(&path); 484 - } 485 - } else if self 486 - .options 487 - .contains(&RouterOption::AddMissingTrailingSlash) 488 - && !path.ends_with('/') 489 - { 490 - let mut path_with_slash = String::with_capacity(path.len() + 1); 491 - 492 - path_with_slash.push_str(&path); 493 - path_with_slash.push('/'); 494 - 495 - if self.routes.at(&path_with_slash).is_ok() { 496 - path = path_with_slash; 497 - route = self.routes.at(&path); 498 - } 499 - } 500 - } 501 - 502 - let peer_certificate = stream.ssl().peer_certificate(); 503 - let url_clone = url.clone(); 504 - let hook_context = HookContext::new( 505 - stream.get_ref().peer_addr(), 506 - url_clone.clone(), 507 - route.as_ref().ok().map(|route| route.params.clone()), 508 - peer_certificate.clone(), 509 - ); 510 - let hook_context_clone = hook_context.clone(); 511 - 512 - for module in &mut *self.async_modules.lock().await { 513 - module.on_pre_route(hook_context_clone.clone()).await; 514 - } 515 - 516 - let hook_context_clone = hook_context.clone(); 517 - 518 - if let Ok(mut modules) = self.modules.lock() { 519 - for module in &mut *modules { 520 - module.on_pre_route(hook_context_clone.clone()); 521 - } 522 - } 523 - 524 - if let Ok(mut callback) = self.pre_route_callback.lock() { 525 - callback.call(hook_context.clone()); 526 - } 527 - 528 - let mut content = if let Ok(ref route) = route { 529 - let route_context = RouteContext::new( 530 - stream.get_ref().peer_addr(), 531 - url_clone, 532 - &route.params, 533 - peer_certificate, 534 - ); 535 - 536 - { 537 - let mut headers = self.headers.lock().expect("headers lock poisoned"); 538 - 539 - for partial_header in &mut *headers { 540 - writeln!( 541 - &mut header, 542 - "{}", 543 - partial_header.call(route_context.clone()), 544 - ) 545 - .expect("failed to write header"); 546 - } 547 - } 548 - 549 - { 550 - let mut footers = self.footers.lock().expect("footers lock poisoned"); 551 - let length = footers.len(); 552 - 553 - for (i, partial_footer) in footers.iter_mut().enumerate() { 554 - let _ = write!( 555 - &mut footer, 556 - "{}{}", 557 - partial_footer.call(route_context.clone()), 558 - if length > 1 && i != length - 1 { 559 - "\n" 560 - } else { 561 - "" 562 - }, 563 - ); 564 - } 565 - } 566 - 567 - let mut lock = (*route.value).lock().await; 568 - let handler = lock.call(route_context); 569 - 570 - handler.await 571 - } else { 572 - (*self.error_handler) 573 - .lock() 574 - .await 575 - .call(ErrorContext::new( 576 - stream.get_ref().peer_addr(), 577 - url_clone, 578 - peer_certificate, 579 - )) 580 - .await 581 - }; 582 - 583 - let hook_context_clone = hook_context.clone(); 584 - 585 - for module in &mut *self.async_modules.lock().await { 586 - module.on_post_route(hook_context_clone.clone()).await; 587 - } 588 - 589 - let hook_context_clone = hook_context.clone(); 590 - 591 - if let Ok(mut modules) = self.modules.lock() { 592 - for module in &mut *modules { 593 - module.on_post_route(hook_context_clone.clone()); 594 - } 595 - } 596 - 597 - if let Ok(mut callback) = self.post_route_callback.lock() { 598 - callback.call(hook_context, &mut content); 599 - } 600 - 601 - let status_code = 602 - if content.status == 21 || content.status == 22 || content.status == 23 { 603 - 20 604 - } else { 605 - content.status 606 - }; 607 - let status_line = match content.status { 608 - 20 => { 609 - let mime = content.mime.as_deref().unwrap_or("text/gemini"); 610 - let charset = content 611 - .character_set 612 - .as_deref() 613 - .unwrap_or(&self.character_set); 614 - let lang = content 615 - .languages 616 - .as_ref() 617 - .map_or_else(|| self.languages.join(","), |l| l.join(",")); 618 - 619 - format!("{status_code} {mime}; charset={charset}; lang={lang}") 620 - } 621 - 21 => { 622 - format!( 623 - "{} {}", 624 - status_code, 625 - content.mime.as_deref().unwrap_or_default() 626 - ) 627 - } 628 - #[cfg(feature = "auto-deduce-mime")] 629 - 22 => { 630 - format!( 631 - "{} {}", 632 - status_code, 633 - content.mime.as_deref().unwrap_or_default() 634 - ) 635 - } 636 - _ => { 637 - format!("{} {}", status_code, content.content) 638 - } 639 - }; 640 - let body = match content.status { 641 - 20 => { 642 - let mut body = String::with_capacity( 643 - header.len() + content.content.len() + footer.len() + 1, 644 - ); 645 - 646 - body.push_str(&header); 647 - body.push_str(&content.content); 648 - body.push('\n'); 649 - body.push_str(&footer); 650 - 651 - body 652 - } 653 - 21 | 22 => content.content, 654 - _ => String::new(), 655 - }; 656 - let mut response = 657 - String::with_capacity(status_line.len() + body.len() + 2); 658 - 659 - response.push_str(&status_line); 660 - response.push_str("\r\n"); 661 - response.push_str(&body); 662 - stream.write_all(response.as_bytes()).await?; 663 - #[cfg(feature = "tokio")] 664 - stream.shutdown().await?; 665 - #[cfg(feature = "async-std")] 666 - stream.get_mut().shutdown(std::net::Shutdown::Both)?; 667 - 668 - Ok(()) 669 664 } 670 665 671 666 fn create_acceptor(&mut self) -> Result<(), Box<dyn Error>> {