Mirror of https://github.com/roostorg/osprey github.com/roostorg/osprey
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 215 lines 6.7 kB view raw
1#![cfg(feature = "test_server")] 2 3use std::convert::TryInto; 4use std::ffi::OsStr; 5use std::future::Future; 6use std::net::{Ipv4Addr, SocketAddr}; 7use std::path::Path; 8use std::process::Stdio; 9use std::time::Duration; 10 11use etcd::Client as InnerClient; 12use nix::sys::signal::{self, Signal}; 13use nix::unistd::Pid; 14use tempfile::TempDir; 15use tokio::net::TcpSocket; 16use tokio::process::{Child, Command}; 17 18use crate::{Client, EtcdError}; 19 20/// Builder for [`Server`]. 21#[derive(Clone, Debug)] 22pub struct ServerBuilder<'a> { 23 exe: &'a OsStr, 24 data_dir: Option<&'a Path>, 25} 26 27impl<'a> Default for ServerBuilder<'a> { 28 fn default() -> Self { 29 ServerBuilder { 30 exe: OsStr::new("etcd"), 31 data_dir: None, 32 } 33 } 34} 35 36impl<'a> ServerBuilder<'a> { 37 /// Sets the etcd binary to use. 38 /// By default, the etcd from `PATH` will be used. 39 pub fn executable_path(&'a mut self, path: &'a (impl AsRef<Path> + ?Sized)) -> &'a mut Self { 40 self.exe = path.as_ref().as_os_str(); 41 self 42 } 43 44 /// Sets the path to the data directory to use. 45 /// By default, a temporary directory will be created for the process 46 /// and deleted when the [`Server`] is `Drop`ped. 47 pub fn data_directory(&'a mut self, path: &'a (impl AsRef<Path> + ?Sized)) -> &'a mut Self { 48 self.data_dir = Some(path.as_ref()); 49 self 50 } 51 52 /// Starts a new etcd process, waits for it to be ready, 53 /// then returns its [`Server`] handle. 54 pub fn start(&self) -> impl Future<Output = Result<Server, EtcdError>> + Send + 'static { 55 let mut command = Command::new(self.exe); 56 command.stdin(Stdio::null()); 57 command.stdout(Stdio::null()); 58 command.kill_on_drop(true); 59 60 let result = (|| -> Result<Server, EtcdError> { 61 let port = pick_unused_port()?; 62 let peer_port = pick_unused_port()?; 63 let endpoint_url = format!("http://localhost:{}", port); 64 command.arg(format!("--listen-client-urls={}", &endpoint_url)); 65 command.arg(format!("--listen-peer-urls=http://localhost:{}", peer_port)); 66 command.arg(format!( 67 "--advertise-client-urls=http://localhost:{}", 68 peer_port 69 )); 70 let data_dir = if let Some(data_dir) = self.data_dir { 71 command.arg(format!("--data-dir={}", data_dir.display())); 72 None 73 } else { 74 let dir = tempfile::tempdir()?; 75 command.arg(format!("--data-dir={}", dir.path().display())); 76 Some(dir) 77 }; 78 Ok(Server { 79 endpoint_url, 80 child: command.spawn()?, 81 _data_dir: data_dir, 82 }) 83 })(); 84 85 async move { 86 let mut server = result?; 87 88 let client = InnerClient::new(&[server.endpoint_url()]); 89 let wait_future = server.child.wait(); 90 let healthy_future = wait_for_etcd_healthy(&client); 91 92 tokio::select! { 93 wait_result = wait_future => { 94 match wait_result { 95 Ok(status) => Err(EtcdError::IoError { error: std::io::Error::new(std::io::ErrorKind::Other, format!("etcd exited with {}", status)) }), 96 Err(error) => Err(EtcdError::IoError { error }) 97 } 98 }, 99 _ = healthy_future => { 100 Ok(server) 101 }, 102 } 103 } 104 } 105} 106 107/// A running etcd server. 108/// 109/// Dropping the `Server` will send a kill signal to the subprocess 110/// but not wait on it to exit. 111/// You should try to call [`Server::stop`] whenever possible. 112#[derive(Debug)] 113pub struct Server { 114 endpoint_url: String, 115 child: Child, 116 _data_dir: Option<TempDir>, // deleted on Drop 117} 118 119impl Server { 120 /// Creates a new builder. 121 #[inline] 122 pub fn builder<'a>() -> ServerBuilder<'a> { 123 ServerBuilder::default() 124 } 125 126 /// Sends an interrupt signal to the etcd process, 127 /// then returns a [`Future`] that resolves when the subprocess has exited. 128 /// 129 /// # Errors 130 /// 131 /// If signalling the subprocess fails, then an error will be returned. 132 pub fn stop(mut self) -> impl Future<Output = Result<(), EtcdError>> + Send + 'static { 133 let pid = Pid::from_raw( 134 self.child 135 .id() 136 .expect("child already stopped") 137 .try_into() 138 .expect("invalid pid"), 139 ); 140 let kill_result = signal::kill(pid, Signal::SIGTERM); 141 async move { 142 if let Err(errno) = kill_result { 143 Err(std::io::Error::from(errno).into()) 144 } else { 145 self.child.wait().await.ok(); 146 Ok(()) 147 } 148 } 149 } 150 151 /// Returns the server's etcd endpoint URL. 152 #[inline] 153 pub fn endpoint_url(&self) -> &str { 154 &self.endpoint_url 155 } 156 157 /// Create a new client with the server as its sole endpoint. 158 #[inline] 159 pub fn new_client(&self) -> anyhow::Result<Client> { 160 Client::new(&[&self.endpoint_url]) 161 } 162} 163 164async fn wait_for_etcd_healthy(client: &InnerClient) { 165 loop { 166 if client.health().await.iter().all(Result::is_ok) { 167 return; 168 } 169 tokio::time::sleep(Duration::from_millis(100)).await; 170 } 171} 172 173fn pick_unused_port() -> std::io::Result<u16> { 174 let socket = TcpSocket::new_v4()?; 175 socket.set_reuseaddr(true)?; 176 socket.bind(SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0))?; 177 Ok(socket.local_addr()?.port()) 178} 179 180#[cfg(test)] 181mod tests { 182 use super::*; 183 184 #[tokio::test] 185 async fn startup() { 186 if let Err(err) = which::which("etcd") { 187 eprintln!("etcd not found (skipping): {}", err); 188 return; 189 } 190 let server = Server::builder().start().await.unwrap(); 191 server.stop().await.unwrap(); 192 } 193 194 #[tokio::test] 195 async fn basic_read() { 196 if let Err(err) = which::which("etcd") { 197 eprintln!("etcd not found (skipping): {}", err); 198 return; 199 } 200 let server = Server::builder().start().await.unwrap(); 201 // Use raw client to avoid conflating with potential client bugs. 202 let client = InnerClient::new(&[server.endpoint_url()]); 203 etcd::kv::create(&client, "/foo", "bar", None) 204 .await 205 .unwrap(); 206 let got = etcd::kv::get(&client, "/foo", Default::default()) 207 .await 208 .unwrap(); 209 assert_eq!( 210 got.data.node.value.as_ref().map(String::as_str), 211 Some("bar") 212 ); 213 server.stop().await.unwrap(); 214 } 215}