From c9c620bd44a7fd84e90e16ca632335fea20e94f7 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Sat, 18 Dec 2021 10:53:46 +0200 Subject: feat: add abstraction for daemons and managing them Sponsored-by: author --- src/daemon.rs | 156 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + 2 files changed, 157 insertions(+) create mode 100644 src/daemon.rs diff --git a/src/daemon.rs b/src/daemon.rs new file mode 100644 index 0000000..c6da5f5 --- /dev/null +++ b/src/daemon.rs @@ -0,0 +1,156 @@ +use nix::sys::signal::kill; +use nix::sys::signal::Signal; +use nix::unistd::Pid; +use std::fs::read; +use std::process::Command; +use std::thread::sleep; +use std::time::{Duration, Instant}; +use tempfile::NamedTempFile; + +/// Possible errors from starting and stopping daemons. +#[derive(Debug, thiserror::Error)] +pub enum DaemonError { + /// The daemon took too long to start. The timeout can be + /// configured with [DaemonManager::timeout]. + #[error("daemon took longer than {0} ms to start: {1}\n{2}")] + Timeout(u128, String, String), + + /// Something went wrong, when handling temporary files. + #[error(transparent)] + TempFile(#[from] std::io::Error), + + /// Something went wrong read error output of daemon. + #[error("failed to read daemon stderr: {0}")] + Stderr(std::io::Error), + + /// Failed to kill a daemon. + #[error("failed to kill process {0}: {1}")] + Kill(i32, nix::Error), +} + +/// Manage daemons. +/// +/// A daemon is a process running in the background, doing useful +/// things. For Obnam benchmarks, it's the Obnam server, but this is a +/// generic manager. This version requires the `daemonize` helper +/// program to be available on $PATH. +pub struct DaemonManager { + timeout: Duration, +} + +impl Default for DaemonManager { + fn default() -> Self { + Self { + timeout: Duration::from_millis(1000), + } + } +} + +impl DaemonManager { + /// Create a new manager instance, with default settings. + pub fn new() -> Self { + Self::default() + } + + /// Set the timeout for waiting on a daemon to start, in + /// milliseconds. + pub fn timeout(mut self, millis: u64) -> Self { + self.timeout = Duration::from_millis(millis); + self + } + + /// Start a daemon. + /// + /// The daemon is considered started if its process id (PID) is + /// known. The daemon may take longer to actually be in a useful + /// state, and it may fail after the PID is known, for example if + /// it reads a configuration file and that has errors. This + /// function won't wait for that to happen: it only cares about + /// the PID. + pub fn start(&self, argv: &[&str]) -> Result { + let stdout = NamedTempFile::new()?; + let stderr = NamedTempFile::new()?; + let pid = NamedTempFile::new()?; + let output = Command::new("daemonize") + .args(&[ + "-c", + "/", + "-e", + &stderr.path().display().to_string(), + "-o", + &stdout.path().display().to_string(), + "-p", + &pid.path().display().to_string(), + ]) + .args(argv) + .output() + .unwrap(); + if output.status.code() != Some(0) { + eprintln!("{}", String::from_utf8_lossy(&output.stdout)); + eprintln!("{}", String::from_utf8_lossy(&output.stderr)); + std::process::exit(1); + } + + let time = Instant::now(); + while time.elapsed() < self.timeout { + // Do we have the pid file? + if let Ok(pid) = std::fs::read(pid.path()) { + // Parse it as a string. We don't mind if it's not purely UTF8. + let pid = String::from_utf8_lossy(&pid).into_owned(); + // Strip newline, if any. + if let Some(pid) = pid.strip_suffix('\n') { + // Parse as an integer, if possible. + if let Ok(pid) = pid.parse() { + // We have a pid, stop waiting. + return Ok(Daemon::new(pid)); + } + } + sleep_briefly(); + } else { + sleep_briefly(); + } + } + + let cmd = argv.join(" "); + let err = read(stderr.path()).map_err(DaemonError::Stderr)?; + let err = String::from_utf8_lossy(&err).into_owned(); + Err(DaemonError::Timeout(self.timeout.as_millis(), cmd, err)) + } +} + +/// A running daemon. +/// +/// The daemon process is killed, when the `Daemon` struct is dropped. +pub struct Daemon { + pid: Option, +} + +impl Daemon { + fn new(pid: i32) -> Self { + Self { pid: Some(pid) } + } + + /// Explicitly stop a daemon. + /// + /// Calling this function is only useful if you want to handle + /// errors. It can only be called once. + pub fn stop(&mut self) -> Result<(), DaemonError> { + if let Some(raw_pid) = self.pid.take() { + let pid = Pid::from_raw(raw_pid); + kill(pid, Some(Signal::SIGKILL)).map_err(|e| DaemonError::Kill(raw_pid, e))?; + } + Ok(()) + } +} + +impl Drop for Daemon { + fn drop(&mut self) { + if self.stop().is_err() { + // Do nothing. + } + } +} + +fn sleep_briefly() { + sleep(Duration::from_millis(100)); +} diff --git a/src/lib.rs b/src/lib.rs index db53aac..87c57d1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,7 @@ //! This crate only collects data from a set of benchmarks. It does //! not analyze the data. The data can be stored for later analysis. +pub mod daemon; pub mod obnam; pub mod result; pub mod specification; -- cgit v1.2.1 From bdf6e5306aab0a7183d36d6a45d91e307fb9a4b7 Mon Sep 17 00:00:00 2001 From: Lars Wirzenius Date: Sat, 18 Dec 2021 10:54:15 +0200 Subject: feat: add an abstraction for managing an Obnam server Sponsored-by: author --- Cargo.lock | 15 +++++++ Cargo.toml | 2 + src/daemon.rs | 1 + src/lib.rs | 2 + src/server.rs | 122 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/suite.rs | 19 ++++++-- src/tlsgen.rs | 138 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 296 insertions(+), 3 deletions(-) create mode 100644 src/server.rs create mode 100644 src/tlsgen.rs diff --git a/Cargo.lock b/Cargo.lock index ec7a287..e98169b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -561,6 +561,19 @@ dependencies = [ "autocfg", ] +[[package]] +name = "nix" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188" +dependencies = [ + "bitflags", + "cc", + "cfg-if", + "libc", + "memoffset", +] + [[package]] name = "no-std-compat" version = "0.4.1" @@ -605,7 +618,9 @@ dependencies = [ "glob", "lazy_static", "log", + "nix", "pretty_env_logger", + "rand", "serde", "serde_json", "serde_yaml", diff --git a/Cargo.toml b/Cargo.toml index 07fe5e1..c5f7c9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,9 @@ edition = "2018" anyhow = "1.0.51" lazy_static = "1.4.0" log = "0.4.14" +nix = "0.23.0" pretty_env_logger = "0.4.0" +rand = "0.8.4" serde = { version = "1.0.101", features = ["derive"] } serde_json = "1.0.72" serde_yaml = "0.8.21" diff --git a/src/daemon.rs b/src/daemon.rs index c6da5f5..fa8f287 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -121,6 +121,7 @@ impl DaemonManager { /// A running daemon. /// /// The daemon process is killed, when the `Daemon` struct is dropped. +#[derive(Debug)] pub struct Daemon { pid: Option, } diff --git a/src/lib.rs b/src/lib.rs index 87c57d1..82e735b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,6 +31,8 @@ pub mod daemon; pub mod obnam; pub mod result; +pub mod server; pub mod specification; pub mod step; pub mod suite; +pub mod tlsgen; diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..9a809a5 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,122 @@ +use crate::daemon::{Daemon, DaemonError, DaemonManager}; +use crate::tlsgen::{Tls, TlsError}; +use log::debug; +use rand::random; +use serde::Serialize; +use std::ops::Range; +use std::path::{Path, PathBuf}; +use std::time::Instant; +use tempfile::{tempdir, TempDir}; + +const PORT_RANGE: Range = 2048..32000; +const TIMEOUT_MS: u128 = 10_000; + +type PortNumber = u16; + +#[derive(Debug, thiserror::Error)] +pub enum ObnamServerError { + #[error("took too long to pick a random port for server")] + Port, + + #[error("failed to create temporary directory for server: {0}")] + TempDir(std::io::Error), + + #[error("failed to write server configuration to {0}: {1}")] + WriteConfig(PathBuf, std::io::Error), + + #[error("failed to create TLS certificate: {0}")] + Tls(TlsError), + + #[error("failed to write TLS certificate: {0}")] + WriteTls(PathBuf, std::io::Error), + + #[error("failed to start Obnam server: {0}")] + Daemon(DaemonError), +} + +#[derive(Debug)] +pub struct ObnamServer { + #[allow(dead_code)] + tempdir: TempDir, + chunks: PathBuf, + daemon: Option, +} + +impl ObnamServer { + pub fn new(manager: &DaemonManager) -> Result { + debug!("creating ObamServer"); + let tempdir = tempdir().map_err(ObnamServerError::TempDir)?; + let config_filename = tempdir.path().join("server.yaml"); + let chunks = tempdir.path().join("chunks"); + let tls_key = tempdir.path().join("tls_key"); + let tls_cert = tempdir.path().join("tls_cert"); + + let tls = Tls::new().map_err(ObnamServerError::Tls)?; + write(&tls_key, tls.key())?; + write(&tls_cert, tls.cert())?; + + let port = pick_port()?; + + let config = ServerConfig::new(port, chunks.clone(), tls_key, tls_cert); + config.write(&config_filename)?; + + let daemon = manager + .start(&["/bin/sleep", "1000"]) + .map_err(ObnamServerError::Daemon)?; + + Ok(Self { + tempdir, + chunks, + daemon: Some(daemon), + }) + } + + pub fn stop(&mut self) { + self.daemon.take(); + } + + pub fn chunks(&self) -> &Path { + &self.chunks + } +} + +fn write(filename: &Path, data: &[u8]) -> Result<(), ObnamServerError> { + std::fs::write(filename, data) + .map_err(|err| ObnamServerError::WriteTls(filename.to_path_buf(), err)) +} + +fn pick_port() -> Result { + let started = Instant::now(); + while started.elapsed().as_millis() < TIMEOUT_MS { + let port: PortNumber = random(); + if PORT_RANGE.contains(&port) { + return Ok(port); + } + } + Err(ObnamServerError::Port) +} + +#[derive(Debug, Serialize)] +pub struct ServerConfig { + address: String, + chunks: PathBuf, + tls_key: PathBuf, + tls_cert: PathBuf, +} + +impl ServerConfig { + fn new(port: u16, chunks: PathBuf, tls_key: PathBuf, tls_cert: PathBuf) -> Self { + Self { + address: format!("localhost:{}", port), + chunks, + tls_key, + tls_cert, + } + } + + fn write(&self, filename: &Path) -> Result<(), ObnamServerError> { + std::fs::write(filename, serde_yaml::to_string(self).unwrap()) + .map_err(|err| ObnamServerError::WriteConfig(filename.to_path_buf(), err))?; + Ok(()) + } +} diff --git a/src/suite.rs b/src/suite.rs index 435c476..0f49994 100644 --- a/src/suite.rs +++ b/src/suite.rs @@ -1,5 +1,7 @@ +use crate::daemon::DaemonManager; use crate::obnam::{Obnam, ObnamError}; use crate::result::{Measurement, OpMeasurements, Operation}; +use crate::server::{ObnamServer, ObnamServerError}; use crate::specification::{Create, FileCount}; use crate::step::Step; use log::{debug, info}; @@ -13,6 +15,7 @@ use walkdir::WalkDir; /// This manages temporary data created for the benchmarks, and /// executes individual steps in the suite. pub struct Suite { + manager: DaemonManager, benchmark: Option, } @@ -38,11 +41,18 @@ pub enum SuiteError { /// Error managing an Obnam system. #[error(transparent)] Obnam(#[from] ObnamError), + + /// Error managing an Obnam server. + #[error(transparent)] + Server(#[from] ObnamServerError), } impl Suite { pub fn new() -> Result { - Ok(Self { benchmark: None }) + Ok(Self { + manager: DaemonManager::new(), + benchmark: None, + }) } /// Execute one step in the benchmark suite. @@ -54,7 +64,7 @@ impl Suite { let mut om = match step { Step::Start(name) => { assert!(self.benchmark.is_none()); - let mut benchmark = Benchmark::new(name)?; + let mut benchmark = Benchmark::new(name, &self.manager)?; let om = benchmark.start()?; self.benchmark = Some(benchmark); om @@ -104,13 +114,15 @@ struct Benchmark { // Obnam, and thereby delete any temporary files. We want to do // that intentionally, so that it can be measured. obnam: Option, + server: Option, } impl Benchmark { - fn new(name: &str) -> Result { + fn new(name: &str, manager: &DaemonManager) -> Result { Ok(Self { name: name.to_string(), obnam: Some(Obnam::new()?), + server: Some(ObnamServer::new(manager)?), }) } @@ -132,6 +144,7 @@ impl Benchmark { info!("ending benchmark {}", self.name); self.obnam().stop_server()?; self.obnam.take().unwrap(); // This destroys the Obnam + self.server.as_mut().unwrap().stop(); Ok(OpMeasurements::new(self.name(), Operation::Stop)) } diff --git a/src/tlsgen.rs b/src/tlsgen.rs new file mode 100644 index 0000000..03b58ad --- /dev/null +++ b/src/tlsgen.rs @@ -0,0 +1,138 @@ +use std::fs::read; +use std::path::Path; +use std::process::{Command, Stdio}; +use tempfile::NamedTempFile; + +#[derive(Debug, thiserror::Error)] +pub enum TlsError { + #[error("failed to create temporary file: {0}")] + TempFile(std::io::Error), + + #[error("failed to read temporary file: {0}")] + ReadTemp(std::io::Error), + + #[error("failed to run openssl {0}: {1}")] + RunOpenSsl(String, std::io::Error), + + #[error("openssl {0} failed: {1}")] + OpenSsl(String, String), +} + +#[derive(Debug)] +pub struct Tls { + key: Vec, + cert: Vec, +} + +impl Tls { + pub fn new() -> Result { + let (key, cert) = generate()?; + Ok(Self { key, cert }) + } + + pub fn key(&self) -> &[u8] { + &self.key + } + + pub fn cert(&self) -> &[u8] { + &self.cert + } +} + +fn generate() -> Result<(Vec, Vec), TlsError> { + let key = NamedTempFile::new().map_err(TlsError::TempFile)?; + let csr = NamedTempFile::new().map_err(TlsError::TempFile)?; + genrsa(key.path())?; + let key_data = rsa(key.path())?; + req(key.path(), csr.path())?; + let cert_data = x509(key.path(), csr.path())?; + Ok((key_data, cert_data)) +} + +fn openssl() -> Command { + let mut command = Command::new("openssl"); + command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()); + command +} + +fn genrsa(filename: &Path) -> Result<(), TlsError> { + let output = openssl() + .arg("genrsa") + .arg("-out") + .arg(filename) + .arg("2048") + .output() + .map_err(|err| TlsError::RunOpenSsl("genrsa".to_string(), err))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr).into_owned(); + return Err(TlsError::OpenSsl("genrsa".to_string(), stderr)); + } + + Ok(()) +} + +fn rsa(filename: &Path) -> Result, TlsError> { + let output = openssl() + .arg("rsa") + .arg("-in") + .arg(filename) + .arg("-out") + .arg(filename) + .output() + .map_err(|err| TlsError::RunOpenSsl("rsa".to_string(), err))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr).into_owned(); + return Err(TlsError::OpenSsl("rsa".to_string(), stderr)); + } + + read(filename).map_err(TlsError::ReadTemp) +} + +fn req(key: &Path, csr: &Path) -> Result<(), TlsError> { + let output = openssl() + .arg("req") + .arg("-sha256") + .arg("-new") + .arg("-key") + .arg(key) + .arg("-out") + .arg(csr) + .arg("-subj") + .arg("/CN=localhost") + .output() + .map_err(|err| TlsError::RunOpenSsl("req".to_string(), err))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr).into_owned(); + return Err(TlsError::OpenSsl("req".to_string(), stderr)); + } + + Ok(()) +} + +fn x509(key: &Path, csr: &Path) -> Result, TlsError> { + let output = openssl() + .arg("x509") + .arg("-req") + .arg("-sha256") + .arg("-days") + .arg("1") + .arg("-in") + .arg(csr) + .arg("-signkey") + .arg(key) + .output() + .map_err(|err| TlsError::RunOpenSsl("req".to_string(), err))?; + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr).into_owned(); + return Err(TlsError::OpenSsl("req".to_string(), stderr)); + } + + Ok(output.stdout) +} -- cgit v1.2.1