summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2021-12-19 09:09:25 +0000
committerLars Wirzenius <liw@liw.fi>2021-12-19 09:09:25 +0000
commit9f170606eb4fc24086274b446bb77cc1195280f0 (patch)
tree241a868b1018433f6bf17f8f85fead56e96b1b54
parente5a830248d8a5d68a701377e0935fc9a8e3785a1 (diff)
parentbdf6e5306aab0a7183d36d6a45d91e307fb9a4b7 (diff)
downloadobnam-benchmark-9f170606eb4fc24086274b446bb77cc1195280f0.tar.gz
Merge branch 'obnam-server' into 'main'
feat: add abstraction for daemons and managing them See merge request obnam/obnam-benchmark!2
-rw-r--r--Cargo.lock15
-rw-r--r--Cargo.toml2
-rw-r--r--src/daemon.rs157
-rw-r--r--src/lib.rs3
-rw-r--r--src/server.rs122
-rw-r--r--src/suite.rs19
-rw-r--r--src/tlsgen.rs138
7 files changed, 453 insertions, 3 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ec7a287..e98169b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -562,6 +562,19 @@ dependencies = [
]
[[package]]
+name = "nix"
+version = "0.23.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f305c2c2e4c39a82f7bf0bf65fb557f9070ce06781d4f2454295cc34b1c43188"
+dependencies = [
+ "bitflags",
+ "cc",
+ "cfg-if",
+ "libc",
+ "memoffset",
+]
+
+[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -605,7 +618,9 @@ dependencies = [
"glob",
"lazy_static",
"log",
+ "nix",
"pretty_env_logger",
+ "rand",
"serde",
"serde_json",
"serde_yaml",
diff --git a/Cargo.toml b/Cargo.toml
index 07fe5e1..c5f7c9d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,7 +10,9 @@ edition = "2018"
anyhow = "1.0.51"
lazy_static = "1.4.0"
log = "0.4.14"
+nix = "0.23.0"
pretty_env_logger = "0.4.0"
+rand = "0.8.4"
serde = { version = "1.0.101", features = ["derive"] }
serde_json = "1.0.72"
serde_yaml = "0.8.21"
diff --git a/src/daemon.rs b/src/daemon.rs
new file mode 100644
index 0000000..fa8f287
--- /dev/null
+++ b/src/daemon.rs
@@ -0,0 +1,157 @@
+use nix::sys::signal::kill;
+use nix::sys::signal::Signal;
+use nix::unistd::Pid;
+use std::fs::read;
+use std::process::Command;
+use std::thread::sleep;
+use std::time::{Duration, Instant};
+use tempfile::NamedTempFile;
+
+/// Possible errors from starting and stopping daemons.
+#[derive(Debug, thiserror::Error)]
+pub enum DaemonError {
+ /// The daemon took too long to start. The timeout can be
+ /// configured with [DaemonManager::timeout].
+ #[error("daemon took longer than {0} ms to start: {1}\n{2}")]
+ Timeout(u128, String, String),
+
+ /// Something went wrong, when handling temporary files.
+ #[error(transparent)]
+ TempFile(#[from] std::io::Error),
+
+ /// Something went wrong read error output of daemon.
+ #[error("failed to read daemon stderr: {0}")]
+ Stderr(std::io::Error),
+
+ /// Failed to kill a daemon.
+ #[error("failed to kill process {0}: {1}")]
+ Kill(i32, nix::Error),
+}
+
+/// Manage daemons.
+///
+/// A daemon is a process running in the background, doing useful
+/// things. For Obnam benchmarks, it's the Obnam server, but this is a
+/// generic manager. This version requires the `daemonize` helper
+/// program to be available on $PATH.
+pub struct DaemonManager {
+ timeout: Duration,
+}
+
+impl Default for DaemonManager {
+ fn default() -> Self {
+ Self {
+ timeout: Duration::from_millis(1000),
+ }
+ }
+}
+
+impl DaemonManager {
+ /// Create a new manager instance, with default settings.
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ /// Set the timeout for waiting on a daemon to start, in
+ /// milliseconds.
+ pub fn timeout(mut self, millis: u64) -> Self {
+ self.timeout = Duration::from_millis(millis);
+ self
+ }
+
+ /// Start a daemon.
+ ///
+ /// The daemon is considered started if its process id (PID) is
+ /// known. The daemon may take longer to actually be in a useful
+ /// state, and it may fail after the PID is known, for example if
+ /// it reads a configuration file and that has errors. This
+ /// function won't wait for that to happen: it only cares about
+ /// the PID.
+ pub fn start(&self, argv: &[&str]) -> Result<Daemon, DaemonError> {
+ let stdout = NamedTempFile::new()?;
+ let stderr = NamedTempFile::new()?;
+ let pid = NamedTempFile::new()?;
+ let output = Command::new("daemonize")
+ .args(&[
+ "-c",
+ "/",
+ "-e",
+ &stderr.path().display().to_string(),
+ "-o",
+ &stdout.path().display().to_string(),
+ "-p",
+ &pid.path().display().to_string(),
+ ])
+ .args(argv)
+ .output()
+ .unwrap();
+ if output.status.code() != Some(0) {
+ eprintln!("{}", String::from_utf8_lossy(&output.stdout));
+ eprintln!("{}", String::from_utf8_lossy(&output.stderr));
+ std::process::exit(1);
+ }
+
+ let time = Instant::now();
+ while time.elapsed() < self.timeout {
+ // Do we have the pid file?
+ if let Ok(pid) = std::fs::read(pid.path()) {
+ // Parse it as a string. We don't mind if it's not purely UTF8.
+ let pid = String::from_utf8_lossy(&pid).into_owned();
+ // Strip newline, if any.
+ if let Some(pid) = pid.strip_suffix('\n') {
+ // Parse as an integer, if possible.
+ if let Ok(pid) = pid.parse() {
+ // We have a pid, stop waiting.
+ return Ok(Daemon::new(pid));
+ }
+ }
+ sleep_briefly();
+ } else {
+ sleep_briefly();
+ }
+ }
+
+ let cmd = argv.join(" ");
+ let err = read(stderr.path()).map_err(DaemonError::Stderr)?;
+ let err = String::from_utf8_lossy(&err).into_owned();
+ Err(DaemonError::Timeout(self.timeout.as_millis(), cmd, err))
+ }
+}
+
+/// A running daemon.
+///
+/// The daemon process is killed, when the `Daemon` struct is dropped.
+#[derive(Debug)]
+pub struct Daemon {
+ pid: Option<i32>,
+}
+
+impl Daemon {
+ fn new(pid: i32) -> Self {
+ Self { pid: Some(pid) }
+ }
+
+ /// Explicitly stop a daemon.
+ ///
+ /// Calling this function is only useful if you want to handle
+ /// errors. It can only be called once.
+ pub fn stop(&mut self) -> Result<(), DaemonError> {
+ if let Some(raw_pid) = self.pid.take() {
+ let pid = Pid::from_raw(raw_pid);
+ kill(pid, Some(Signal::SIGKILL)).map_err(|e| DaemonError::Kill(raw_pid, e))?;
+ }
+ Ok(())
+ }
+}
+
+impl Drop for Daemon {
+ fn drop(&mut self) {
+ if self.stop().is_err() {
+ // Do nothing.
+ }
+ }
+}
+
+fn sleep_briefly() {
+ sleep(Duration::from_millis(100));
+}
diff --git a/src/lib.rs b/src/lib.rs
index db53aac..82e735b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -28,8 +28,11 @@
//! This crate only collects data from a set of benchmarks. It does
//! not analyze the data. The data can be stored for later analysis.
+pub mod daemon;
pub mod obnam;
pub mod result;
+pub mod server;
pub mod specification;
pub mod step;
pub mod suite;
+pub mod tlsgen;
diff --git a/src/server.rs b/src/server.rs
new file mode 100644
index 0000000..9a809a5
--- /dev/null
+++ b/src/server.rs
@@ -0,0 +1,122 @@
+use crate::daemon::{Daemon, DaemonError, DaemonManager};
+use crate::tlsgen::{Tls, TlsError};
+use log::debug;
+use rand::random;
+use serde::Serialize;
+use std::ops::Range;
+use std::path::{Path, PathBuf};
+use std::time::Instant;
+use tempfile::{tempdir, TempDir};
+
+const PORT_RANGE: Range<PortNumber> = 2048..32000;
+const TIMEOUT_MS: u128 = 10_000;
+
+type PortNumber = u16;
+
+#[derive(Debug, thiserror::Error)]
+pub enum ObnamServerError {
+ #[error("took too long to pick a random port for server")]
+ Port,
+
+ #[error("failed to create temporary directory for server: {0}")]
+ TempDir(std::io::Error),
+
+ #[error("failed to write server configuration to {0}: {1}")]
+ WriteConfig(PathBuf, std::io::Error),
+
+ #[error("failed to create TLS certificate: {0}")]
+ Tls(TlsError),
+
+ #[error("failed to write TLS certificate: {0}")]
+ WriteTls(PathBuf, std::io::Error),
+
+ #[error("failed to start Obnam server: {0}")]
+ Daemon(DaemonError),
+}
+
+#[derive(Debug)]
+pub struct ObnamServer {
+ #[allow(dead_code)]
+ tempdir: TempDir,
+ chunks: PathBuf,
+ daemon: Option<Daemon>,
+}
+
+impl ObnamServer {
+ pub fn new(manager: &DaemonManager) -> Result<Self, ObnamServerError> {
+ debug!("creating ObamServer");
+ let tempdir = tempdir().map_err(ObnamServerError::TempDir)?;
+ let config_filename = tempdir.path().join("server.yaml");
+ let chunks = tempdir.path().join("chunks");
+ let tls_key = tempdir.path().join("tls_key");
+ let tls_cert = tempdir.path().join("tls_cert");
+
+ let tls = Tls::new().map_err(ObnamServerError::Tls)?;
+ write(&tls_key, tls.key())?;
+ write(&tls_cert, tls.cert())?;
+
+ let port = pick_port()?;
+
+ let config = ServerConfig::new(port, chunks.clone(), tls_key, tls_cert);
+ config.write(&config_filename)?;
+
+ let daemon = manager
+ .start(&["/bin/sleep", "1000"])
+ .map_err(ObnamServerError::Daemon)?;
+
+ Ok(Self {
+ tempdir,
+ chunks,
+ daemon: Some(daemon),
+ })
+ }
+
+ pub fn stop(&mut self) {
+ self.daemon.take();
+ }
+
+ pub fn chunks(&self) -> &Path {
+ &self.chunks
+ }
+}
+
+fn write(filename: &Path, data: &[u8]) -> Result<(), ObnamServerError> {
+ std::fs::write(filename, data)
+ .map_err(|err| ObnamServerError::WriteTls(filename.to_path_buf(), err))
+}
+
+fn pick_port() -> Result<PortNumber, ObnamServerError> {
+ let started = Instant::now();
+ while started.elapsed().as_millis() < TIMEOUT_MS {
+ let port: PortNumber = random();
+ if PORT_RANGE.contains(&port) {
+ return Ok(port);
+ }
+ }
+ Err(ObnamServerError::Port)
+}
+
+#[derive(Debug, Serialize)]
+pub struct ServerConfig {
+ address: String,
+ chunks: PathBuf,
+ tls_key: PathBuf,
+ tls_cert: PathBuf,
+}
+
+impl ServerConfig {
+ fn new(port: u16, chunks: PathBuf, tls_key: PathBuf, tls_cert: PathBuf) -> Self {
+ Self {
+ address: format!("localhost:{}", port),
+ chunks,
+ tls_key,
+ tls_cert,
+ }
+ }
+
+ fn write(&self, filename: &Path) -> Result<(), ObnamServerError> {
+ std::fs::write(filename, serde_yaml::to_string(self).unwrap())
+ .map_err(|err| ObnamServerError::WriteConfig(filename.to_path_buf(), err))?;
+ Ok(())
+ }
+}
diff --git a/src/suite.rs b/src/suite.rs
index 435c476..0f49994 100644
--- a/src/suite.rs
+++ b/src/suite.rs
@@ -1,5 +1,7 @@
+use crate::daemon::DaemonManager;
use crate::obnam::{Obnam, ObnamError};
use crate::result::{Measurement, OpMeasurements, Operation};
+use crate::server::{ObnamServer, ObnamServerError};
use crate::specification::{Create, FileCount};
use crate::step::Step;
use log::{debug, info};
@@ -13,6 +15,7 @@ use walkdir::WalkDir;
/// This manages temporary data created for the benchmarks, and
/// executes individual steps in the suite.
pub struct Suite {
+ manager: DaemonManager,
benchmark: Option<Benchmark>,
}
@@ -38,11 +41,18 @@ pub enum SuiteError {
/// Error managing an Obnam system.
#[error(transparent)]
Obnam(#[from] ObnamError),
+
+ /// Error managing an Obnam server.
+ #[error(transparent)]
+ Server(#[from] ObnamServerError),
}
impl Suite {
pub fn new() -> Result<Self, SuiteError> {
- Ok(Self { benchmark: None })
+ Ok(Self {
+ manager: DaemonManager::new(),
+ benchmark: None,
+ })
}
/// Execute one step in the benchmark suite.
@@ -54,7 +64,7 @@ impl Suite {
let mut om = match step {
Step::Start(name) => {
assert!(self.benchmark.is_none());
- let mut benchmark = Benchmark::new(name)?;
+ let mut benchmark = Benchmark::new(name, &self.manager)?;
let om = benchmark.start()?;
self.benchmark = Some(benchmark);
om
@@ -104,13 +114,15 @@ struct Benchmark {
// Obnam, and thereby delete any temporary files. We want to do
// that intentionally, so that it can be measured.
obnam: Option<Obnam>,
+ server: Option<ObnamServer>,
}
impl Benchmark {
- fn new(name: &str) -> Result<Self, SuiteError> {
+ fn new(name: &str, manager: &DaemonManager) -> Result<Self, SuiteError> {
Ok(Self {
name: name.to_string(),
obnam: Some(Obnam::new()?),
+ server: Some(ObnamServer::new(manager)?),
})
}
@@ -132,6 +144,7 @@ impl Benchmark {
info!("ending benchmark {}", self.name);
self.obnam().stop_server()?;
self.obnam.take().unwrap(); // This destroys the Obnam
+ self.server.as_mut().unwrap().stop();
Ok(OpMeasurements::new(self.name(), Operation::Stop))
}
diff --git a/src/tlsgen.rs b/src/tlsgen.rs
new file mode 100644
index 0000000..03b58ad
--- /dev/null
+++ b/src/tlsgen.rs
@@ -0,0 +1,138 @@
+use std::fs::read;
+use std::path::Path;
+use std::process::{Command, Stdio};
+use tempfile::NamedTempFile;
+
+#[derive(Debug, thiserror::Error)]
+pub enum TlsError {
+ #[error("failed to create temporary file: {0}")]
+ TempFile(std::io::Error),
+
+ #[error("failed to read temporary file: {0}")]
+ ReadTemp(std::io::Error),
+
+ #[error("failed to run openssl {0}: {1}")]
+ RunOpenSsl(String, std::io::Error),
+
+ #[error("openssl {0} failed: {1}")]
+ OpenSsl(String, String),
+}
+
+#[derive(Debug)]
+pub struct Tls {
+ key: Vec<u8>,
+ cert: Vec<u8>,
+}
+
+impl Tls {
+ pub fn new() -> Result<Self, TlsError> {
+ let (key, cert) = generate()?;
+ Ok(Self { key, cert })
+ }
+
+ pub fn key(&self) -> &[u8] {
+ &self.key
+ }
+
+ pub fn cert(&self) -> &[u8] {
+ &self.cert
+ }
+}
+
+fn generate() -> Result<(Vec<u8>, Vec<u8>), TlsError> {
+ let key = NamedTempFile::new().map_err(TlsError::TempFile)?;
+ let csr = NamedTempFile::new().map_err(TlsError::TempFile)?;
+ genrsa(key.path())?;
+ let key_data = rsa(key.path())?;
+ req(key.path(), csr.path())?;
+ let cert_data = x509(key.path(), csr.path())?;
+ Ok((key_data, cert_data))
+}
+
+fn openssl() -> Command {
+ let mut command = Command::new("openssl");
+ command
+ .stdin(Stdio::null())
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped());
+ command
+}
+
+fn genrsa(filename: &Path) -> Result<(), TlsError> {
+ let output = openssl()
+ .arg("genrsa")
+ .arg("-out")
+ .arg(filename)
+ .arg("2048")
+ .output()
+ .map_err(|err| TlsError::RunOpenSsl("genrsa".to_string(), err))?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
+ return Err(TlsError::OpenSsl("genrsa".to_string(), stderr));
+ }
+
+ Ok(())
+}
+
+fn rsa(filename: &Path) -> Result<Vec<u8>, TlsError> {
+ let output = openssl()
+ .arg("rsa")
+ .arg("-in")
+ .arg(filename)
+ .arg("-out")
+ .arg(filename)
+ .output()
+ .map_err(|err| TlsError::RunOpenSsl("rsa".to_string(), err))?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
+ return Err(TlsError::OpenSsl("rsa".to_string(), stderr));
+ }
+
+ read(filename).map_err(TlsError::ReadTemp)
+}
+
+fn req(key: &Path, csr: &Path) -> Result<(), TlsError> {
+ let output = openssl()
+ .arg("req")
+ .arg("-sha256")
+ .arg("-new")
+ .arg("-key")
+ .arg(key)
+ .arg("-out")
+ .arg(csr)
+ .arg("-subj")
+ .arg("/CN=localhost")
+ .output()
+ .map_err(|err| TlsError::RunOpenSsl("req".to_string(), err))?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
+ return Err(TlsError::OpenSsl("req".to_string(), stderr));
+ }
+
+ Ok(())
+}
+
+fn x509(key: &Path, csr: &Path) -> Result<Vec<u8>, TlsError> {
+ let output = openssl()
+ .arg("x509")
+ .arg("-req")
+ .arg("-sha256")
+ .arg("-days")
+ .arg("1")
+ .arg("-in")
+ .arg(csr)
+ .arg("-signkey")
+ .arg(key)
+ .output()
+ .map_err(|err| TlsError::RunOpenSsl("req".to_string(), err))?;
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr).into_owned();
+ return Err(TlsError::OpenSsl("req".to_string(), stderr));
+ }
+
+ Ok(output.stdout)
+}