Mirror of https://github.com/roostorg/osprey
github.com/roostorg/osprey
1#![cfg(feature = "test_server")]
2
3use std::convert::TryInto;
4use std::ffi::OsStr;
5use std::future::Future;
6use std::net::{Ipv4Addr, SocketAddr};
7use std::path::Path;
8use std::process::Stdio;
9use std::time::Duration;
10
11use etcd::Client as InnerClient;
12use nix::sys::signal::{self, Signal};
13use nix::unistd::Pid;
14use tempfile::TempDir;
15use tokio::net::TcpSocket;
16use tokio::process::{Child, Command};
17
18use crate::{Client, EtcdError};
19
20/// Builder for [`Server`].
21#[derive(Clone, Debug)]
22pub struct ServerBuilder<'a> {
23 exe: &'a OsStr,
24 data_dir: Option<&'a Path>,
25}
26
27impl<'a> Default for ServerBuilder<'a> {
28 fn default() -> Self {
29 ServerBuilder {
30 exe: OsStr::new("etcd"),
31 data_dir: None,
32 }
33 }
34}
35
36impl<'a> ServerBuilder<'a> {
37 /// Sets the etcd binary to use.
38 /// By default, the etcd from `PATH` will be used.
39 pub fn executable_path(&'a mut self, path: &'a (impl AsRef<Path> + ?Sized)) -> &'a mut Self {
40 self.exe = path.as_ref().as_os_str();
41 self
42 }
43
44 /// Sets the path to the data directory to use.
45 /// By default, a temporary directory will be created for the process
46 /// and deleted when the [`Server`] is `Drop`ped.
47 pub fn data_directory(&'a mut self, path: &'a (impl AsRef<Path> + ?Sized)) -> &'a mut Self {
48 self.data_dir = Some(path.as_ref());
49 self
50 }
51
52 /// Starts a new etcd process, waits for it to be ready,
53 /// then returns its [`Server`] handle.
54 pub fn start(&self) -> impl Future<Output = Result<Server, EtcdError>> + Send + 'static {
55 let mut command = Command::new(self.exe);
56 command.stdin(Stdio::null());
57 command.stdout(Stdio::null());
58 command.kill_on_drop(true);
59
60 let result = (|| -> Result<Server, EtcdError> {
61 let port = pick_unused_port()?;
62 let peer_port = pick_unused_port()?;
63 let endpoint_url = format!("http://localhost:{}", port);
64 command.arg(format!("--listen-client-urls={}", &endpoint_url));
65 command.arg(format!("--listen-peer-urls=http://localhost:{}", peer_port));
66 command.arg(format!(
67 "--advertise-client-urls=http://localhost:{}",
68 peer_port
69 ));
70 let data_dir = if let Some(data_dir) = self.data_dir {
71 command.arg(format!("--data-dir={}", data_dir.display()));
72 None
73 } else {
74 let dir = tempfile::tempdir()?;
75 command.arg(format!("--data-dir={}", dir.path().display()));
76 Some(dir)
77 };
78 Ok(Server {
79 endpoint_url,
80 child: command.spawn()?,
81 _data_dir: data_dir,
82 })
83 })();
84
85 async move {
86 let mut server = result?;
87
88 let client = InnerClient::new(&[server.endpoint_url()]);
89 let wait_future = server.child.wait();
90 let healthy_future = wait_for_etcd_healthy(&client);
91
92 tokio::select! {
93 wait_result = wait_future => {
94 match wait_result {
95 Ok(status) => Err(EtcdError::IoError { error: std::io::Error::new(std::io::ErrorKind::Other, format!("etcd exited with {}", status)) }),
96 Err(error) => Err(EtcdError::IoError { error })
97 }
98 },
99 _ = healthy_future => {
100 Ok(server)
101 },
102 }
103 }
104 }
105}
106
107/// A running etcd server.
108///
109/// Dropping the `Server` will send a kill signal to the subprocess
110/// but not wait on it to exit.
111/// You should try to call [`Server::stop`] whenever possible.
112#[derive(Debug)]
113pub struct Server {
114 endpoint_url: String,
115 child: Child,
116 _data_dir: Option<TempDir>, // deleted on Drop
117}
118
119impl Server {
120 /// Creates a new builder.
121 #[inline]
122 pub fn builder<'a>() -> ServerBuilder<'a> {
123 ServerBuilder::default()
124 }
125
126 /// Sends an interrupt signal to the etcd process,
127 /// then returns a [`Future`] that resolves when the subprocess has exited.
128 ///
129 /// # Errors
130 ///
131 /// If signalling the subprocess fails, then an error will be returned.
132 pub fn stop(mut self) -> impl Future<Output = Result<(), EtcdError>> + Send + 'static {
133 let pid = Pid::from_raw(
134 self.child
135 .id()
136 .expect("child already stopped")
137 .try_into()
138 .expect("invalid pid"),
139 );
140 let kill_result = signal::kill(pid, Signal::SIGTERM);
141 async move {
142 if let Err(errno) = kill_result {
143 Err(std::io::Error::from(errno).into())
144 } else {
145 self.child.wait().await.ok();
146 Ok(())
147 }
148 }
149 }
150
151 /// Returns the server's etcd endpoint URL.
152 #[inline]
153 pub fn endpoint_url(&self) -> &str {
154 &self.endpoint_url
155 }
156
157 /// Create a new client with the server as its sole endpoint.
158 #[inline]
159 pub fn new_client(&self) -> anyhow::Result<Client> {
160 Client::new(&[&self.endpoint_url])
161 }
162}
163
164async fn wait_for_etcd_healthy(client: &InnerClient) {
165 loop {
166 if client.health().await.iter().all(Result::is_ok) {
167 return;
168 }
169 tokio::time::sleep(Duration::from_millis(100)).await;
170 }
171}
172
173fn pick_unused_port() -> std::io::Result<u16> {
174 let socket = TcpSocket::new_v4()?;
175 socket.set_reuseaddr(true)?;
176 socket.bind(SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 0))?;
177 Ok(socket.local_addr()?.port())
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183
184 #[tokio::test]
185 async fn startup() {
186 if let Err(err) = which::which("etcd") {
187 eprintln!("etcd not found (skipping): {}", err);
188 return;
189 }
190 let server = Server::builder().start().await.unwrap();
191 server.stop().await.unwrap();
192 }
193
194 #[tokio::test]
195 async fn basic_read() {
196 if let Err(err) = which::which("etcd") {
197 eprintln!("etcd not found (skipping): {}", err);
198 return;
199 }
200 let server = Server::builder().start().await.unwrap();
201 // Use raw client to avoid conflating with potential client bugs.
202 let client = InnerClient::new(&[server.endpoint_url()]);
203 etcd::kv::create(&client, "/foo", "bar", None)
204 .await
205 .unwrap();
206 let got = etcd::kv::get(&client, "/foo", Default::default())
207 .await
208 .unwrap();
209 assert_eq!(
210 got.data.node.value.as_ref().map(String::as_str),
211 Some("bar")
212 );
213 server.stop().await.unwrap();
214 }
215}