summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2024-02-19 17:52:34 +0200
committerLars Wirzenius <liw@liw.fi>2024-02-19 20:54:47 +0200
commit5d3aa44a92be306481612d8c8f10c85e9c9feefa (patch)
tree8494fed67bb8b93b33184562c047fa933ded4770
parent2dafe88a84dc031485a71b405f3cda43fa8e00fa (diff)
downloadradicle-ci-broker-5d3aa44a92be306481612d8c8f10c85e9c9feefa.tar.gz
feat: update status page in a background thread
Previously, we updated the status page only when we got a new broker event, or after a CI run finished. This made it unsuitable as a "is the broker alive?" service. Signed-off-by: Lars Wirzenius <liw@liw.fi>
-rw-r--r--src/adapter.rs63
-rw-r--r--src/bin/ci-broker.rs56
-rw-r--r--src/bin/status.rs2
-rw-r--r--src/broker.rs12
-rw-r--r--src/run.rs19
-rw-r--r--src/status.rs85
6 files changed, 145 insertions, 92 deletions
diff --git a/src/adapter.rs b/src/adapter.rs
index d9a37d0..9d479fb 100644
--- a/src/adapter.rs
+++ b/src/adapter.rs
@@ -19,6 +19,7 @@ use std::{
use crate::{
msg::{MessageError, Request, Response},
run::{Run, RunState},
+ status::Status,
};
/// An external executable that runs CI on request.
@@ -47,14 +48,26 @@ impl Adapter {
self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref()))
}
- pub fn run(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
- run.set_state(RunState::Running);
- let x = self.run_helper(trigger, run);
+ pub fn run(
+ &self,
+ trigger: &Request,
+ run: &mut Run,
+ status: &mut Status,
+ ) -> Result<(), AdapterError> {
+ run.set_state(RunState::Triggered);
+ status.ci_run(run);
+ let x = self.run_helper(trigger, run, status);
run.set_state(RunState::Finished);
+ status.ci_run(run);
x
}
- fn run_helper(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> {
+ fn run_helper(
+ &self,
+ trigger: &Request,
+ run: &mut Run,
+ status: &mut Status,
+ ) -> Result<(), AdapterError> {
assert!(matches!(trigger, Request::Trigger { .. }));
// Spawn the adapter sub-process.
@@ -84,7 +97,9 @@ impl Adapter {
let resp = Response::from_str(&line)?;
match resp {
Response::Triggered { run_id } => {
+ run.set_state(RunState::Running);
run.set_adapter_run_id(run_id);
+ status.ci_run(run);
}
_ => return Err(AdapterError::NotTriggered(resp)),
}
@@ -96,6 +111,7 @@ impl Adapter {
match resp {
Response::Finished { result } => {
run.set_result(result);
+ status.ci_run(run);
}
_ => return Err(AdapterError::NotTriggered(resp)),
}
@@ -157,7 +173,7 @@ pub enum AdapterError {
#[cfg(test)]
mod test {
- use std::{fs::write, io::ErrorKind};
+ use std::{fs::write, io::ErrorKind, path::Path};
use tempfile::tempdir;
@@ -165,6 +181,7 @@ mod test {
use crate::{
adapter::AdapterError,
msg::{MessageError, Response, RunResult},
+ status::Status,
test::{mock_adapter, trigger_request, TestResult},
};
@@ -180,7 +197,8 @@ echo '{"response":"finished","result":"success"}'
mock_adapter(&bin, ADAPTER)?;
let mut run = Run::default();
- Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
+ let mut status = Status::new(Path::new("/dev/null"));
+ Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status)?;
assert_eq!(run.result(), Some(&RunResult::Success));
Ok(())
@@ -198,7 +216,8 @@ echo '{"response":"finished","result":"failure"}'
mock_adapter(&bin, ADAPTER)?;
let mut run = Run::default();
- Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
+ let mut status = Status::new(Path::new("/dev/null"));
+ Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status)?;
assert_eq!(run.result(), Some(&RunResult::Failure));
Ok(())
@@ -216,7 +235,8 @@ echo '{"response":"finished","result":{"error":"error message\nsecond line"}}'
mock_adapter(&bin, ADAPTER)?;
let mut run = Run::default();
- Adapter::new(&bin).run(&trigger_request()?, &mut run)?;
+ let mut status = Status::new(Path::new("/dev/null"));
+ Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status)?;
assert_eq!(
run.result(),
Some(&RunResult::Error("error message\nsecond line".into()))
@@ -236,7 +256,8 @@ kill -9 $BASHPID
mock_adapter(&bin, ADAPTER)?;
let mut run = Run::default();
- let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+ let mut status = Status::new(Path::new("/dev/null"));
+ let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
assert!(matches!(x, Err(AdapterError::Failed(_))));
Ok(())
@@ -254,7 +275,8 @@ kill -9 $BASHPID
mock_adapter(&bin, ADAPTER)?;
let mut run = Run::default();
- let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+ let mut status = Status::new(Path::new("/dev/null"));
+ let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
assert!(matches!(x, Err(AdapterError::Failed(_))));
Ok(())
@@ -273,7 +295,8 @@ kill -9 $BASHPID
mock_adapter(&bin, ADAPTER)?;
let mut run = Run::default();
- let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+ let mut status = Status::new(Path::new("/dev/null"));
+ let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
assert!(matches!(x, Err(AdapterError::Failed(_))));
Ok(())
@@ -291,7 +314,8 @@ echo '{"response":"finished","result":"success","bad":"field"}'
mock_adapter(&bin, ADAPTER)?;
let mut run = Run::default();
- let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+ let mut status = Status::new(Path::new("/dev/null"));
+ let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
assert!(matches!(
x,
Err(AdapterError::Message(MessageError::DeserializeResponse(_)))
@@ -311,7 +335,8 @@ echo '{"response":"finished","result":"success"}'
mock_adapter(&bin, ADAPTER)?;
let mut run = Run::default();
- let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+ let mut status = Status::new(Path::new("/dev/null"));
+ let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
assert!(matches!(
x,
Err(AdapterError::NotTriggered(Response::Finished {
@@ -335,7 +360,8 @@ echo '{"response":"finished","result":"success"}'
mock_adapter(&bin, ADAPTER)?;
let mut run = Run::default();
- let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+ let mut status = Status::new(Path::new("/dev/null"));
+ let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
assert!(matches!(
x,
Err(AdapterError::TooMany(Response::Finished {
@@ -352,7 +378,8 @@ echo '{"response":"finished","result":"success"}'
let bin = tmp.path().join("adapter.sh");
let mut run = Run::default();
- let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+ let mut status = Status::new(Path::new("/dev/null"));
+ let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
match x {
Err(AdapterError::SpawnAdapter(filename, e)) => {
assert_eq!(bin, filename);
@@ -376,7 +403,8 @@ echo '{"response":"finished","result":"success"}'
write(&bin, ADAPTER)?;
let mut run = Run::default();
- let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+ let mut status = Status::new(Path::new("/dev/null"));
+ let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
match x {
Err(AdapterError::SpawnAdapter(filename, e)) => {
assert_eq!(bin, filename);
@@ -404,7 +432,8 @@ echo '{"response":"finished","result":"success"}'
mock_adapter(&bin, ADAPTER)?;
let mut run = Run::default();
- let x = Adapter::new(&bin).run(&trigger_request()?, &mut run);
+ let mut status = Status::new(Path::new("/dev/null"));
+ let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status);
match x {
Err(AdapterError::SpawnAdapter(filename, e)) => {
assert_eq!(bin, filename);
diff --git a/src/bin/ci-broker.rs b/src/bin/ci-broker.rs
index efa6ce1..d25cde6 100644
--- a/src/bin/ci-broker.rs
+++ b/src/bin/ci-broker.rs
@@ -1,22 +1,20 @@
use std::{
error::Error,
path::{Path, PathBuf},
+ thread::{sleep, spawn},
+ time::Duration,
};
use log::{debug, info};
use radicle::prelude::Profile;
use radicle_ci_broker::{
- adapter::Adapter,
- broker::Broker,
- config::Config,
- error::BrokerError,
- event::{BrokerEvent, NodeEventSource},
- msg::Request,
- run::Run,
- status::StatusBuilder,
+ adapter::Adapter, broker::Broker, config::Config, error::BrokerError, event::NodeEventSource,
+ msg::Request, status::Status,
};
+const STATUS_UPDATE_DELAY: Duration = Duration::from_millis(1000);
+
fn main() {
if let Err(e) = fallible_main() {
eprintln!("ERROR: {}", e);
@@ -76,41 +74,29 @@ fn fallible_main() -> Result<(), BrokerError> {
}
debug!("added filters to node event source");
+ // Spawn a thread that updates the status page.
+ let mut status = Status::new(config.status_page().unwrap_or(Path::new("/dev/null")));
+ let s2 = status.clone();
+ let _status_thread = spawn(move || status_updater(s2));
+
// This loop ends when there's an error, e.g., failure to read an
// event from the node.
- let mut counter = 0;
- let mut status = StatusBuilder::new(config.status_page().unwrap_or(Path::new("/dev/null")));
loop {
- status.write()?;
debug!("waiting for event from node");
for e in source.event()? {
status.broker_event(&e);
- status.write()?;
-
- counter += 1;
debug!("broker event {e:#?}");
- let BrokerEvent::RefChanged {
- rid,
- oid,
- name: _,
- old: _,
- } = e;
-
let req = Request::trigger(&profile, &e)?;
- let mut run = Run::default();
- if let Some(adapter) = broker.adapter(&rid) {
- adapter.run(&req, &mut run)?;
- status.ci_run(run.adapter_run_id().unwrap(), run.result().unwrap());
- println!(
- "Run CI run #{}: {}, {} -> {}",
- counter,
- rid,
- oid,
- run.result().unwrap()
- );
- } else {
- eprintln!("ERROR: no adapter available for repository {rid}: not running CI");
- }
+ broker.execute_ci(&req, &mut status)?;
+ }
+ }
+}
+
+fn status_updater(mut status: Status) {
+ loop {
+ if let Err(e) = status.write() {
+ eprintln!("ERROR: failed to update status page: {e}");
}
+ sleep(STATUS_UPDATE_DELAY);
}
}
diff --git a/src/bin/status.rs b/src/bin/status.rs
index 646291b..bcf592c 100644
--- a/src/bin/status.rs
+++ b/src/bin/status.rs
@@ -14,6 +14,6 @@ fn main() {
}
fn fallible_main() -> Result<(), StatusError> {
- StatusBuilder::new(Path::new("status.json")).write()?;
+ Status::new(Path::new("status.json")).write()?;
Ok(())
}
diff --git a/src/broker.rs b/src/broker.rs
index 3676f2e..b7669c4 100644
--- a/src/broker.rs
+++ b/src/broker.rs
@@ -7,7 +7,7 @@ use std::collections::HashMap;
use radicle::prelude::RepoId;
-use crate::{adapter::Adapter, error::BrokerError, msg::Request, run::Run};
+use crate::{adapter::Adapter, error::BrokerError, msg::Request, run::Run, status::Status};
/// A CI broker.
///
@@ -38,7 +38,7 @@ impl Broker {
}
#[allow(clippy::result_large_err)]
- pub fn execute_ci(&self, trigger: &Request) -> Result<Run, BrokerError> {
+ pub fn execute_ci(&self, trigger: &Request, status: &mut Status) -> Result<Run, BrokerError> {
let adapter = match trigger {
Request::Trigger {
common,
@@ -55,7 +55,7 @@ impl Broker {
};
let mut run = Run::default();
- adapter.run(trigger, &mut run)?;
+ adapter.run(trigger, &mut run, status)?;
Ok(run)
}
@@ -63,12 +63,15 @@ impl Broker {
#[cfg(test)]
mod test {
+ use std::path::Path;
+
use tempfile::tempdir;
use super::{Adapter, Broker, RepoId};
use crate::{
msg::{RunId, RunResult},
run::RunState,
+ status::Status,
test::{mock_adapter, trigger_request, TestResult},
};
@@ -154,7 +157,8 @@ echo '{"response":"finished","result":"success"}'
let trigger = trigger_request()?;
- let x = broker.execute_ci(&trigger);
+ let mut status = Status::new(Path::new("/dev/null"));
+ let x = broker.execute_ci(&trigger, &mut status);
assert!(x.is_ok());
let run = x.unwrap();
assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy")));
diff --git a/src/run.rs b/src/run.rs
index 8029c95..0b0ee58 100644
--- a/src/run.rs
+++ b/src/run.rs
@@ -63,7 +63,6 @@ impl Run {
/// State of CI run.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)]
-#[serde(untagged)]
#[serde(rename_all = "snake_case")]
pub enum RunState {
/// Run has been triggered, but has not yet been started.
@@ -78,6 +77,22 @@ pub enum RunState {
impl fmt::Display for RunState {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
- write!(f, "{}", serde_json::to_string(self).unwrap())
+ let s = match self {
+ Self::Finished => "finished",
+ Self::Running => "running",
+ Self::Triggered => "triggered",
+ };
+ write!(f, "{}", s)
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+
+ #[test]
+ fn serialize_run_state() {
+ let s = serde_json::to_string(&RunState::Finished).unwrap();
+ assert_eq!(s, r#""finished""#);
}
}
diff --git a/src/status.rs b/src/status.rs
index 00bf0c4..17445c8 100644
--- a/src/status.rs
+++ b/src/status.rs
@@ -1,25 +1,38 @@
-use std::path::{Path, PathBuf};
+use std::{
+ path::{Path, PathBuf},
+ sync::{Arc, Mutex, MutexGuard},
+};
use serde::Serialize;
use time::{macros::format_description, OffsetDateTime};
-use crate::{
- event::BrokerEvent,
- msg::{RunId, RunResult},
-};
+use crate::{event::BrokerEvent, run::Run};
-#[derive(Serialize)]
-pub struct Status {
+#[derive(Debug, Serialize)]
+struct StatusData {
timestamp: String,
+ broker_event_counter: usize,
ci_broker_version: &'static str,
ci_broker_git_commit: &'static str,
latest_broker_event: Option<BrokerEvent>,
- latest_ci_run: Option<RunId>,
- latest_ci_run_result: Option<RunResult>,
+ latest_ci_run: Option<Run>,
}
-impl Status {
- pub fn write(&self, filename: &Path) -> Result<(), StatusError> {
+impl Default for StatusData {
+ fn default() -> Self {
+ Self {
+ timestamp: "".into(),
+ broker_event_counter: 0,
+ ci_broker_version: env!("CARGO_PKG_VERSION"),
+ ci_broker_git_commit: env!("GIT_HEAD"),
+ latest_broker_event: None,
+ latest_ci_run: None,
+ }
+ }
+}
+
+impl StatusData {
+ fn write(&self, filename: &Path) -> Result<(), StatusError> {
let s = serde_json::to_string_pretty(&self).map_err(StatusError::serialize)?;
std::fs::write(filename, s.as_bytes())
.map_err(|e| StatusError::status_write(filename, e))?;
@@ -27,42 +40,39 @@ impl Status {
}
}
-pub struct StatusBuilder {
+pub struct Status {
filename: PathBuf,
- latest_broker_event: Option<BrokerEvent>,
- latest_ci_run: Option<RunId>,
- latest_ci_run_result: Option<RunResult>,
+ status: Arc<Mutex<StatusData>>,
}
-impl StatusBuilder {
+impl Status {
pub fn new(filename: &Path) -> Self {
Self {
filename: filename.into(),
- latest_ci_run: None,
- latest_ci_run_result: None,
- latest_broker_event: None,
+ status: Arc::new(Mutex::new(StatusData::default())),
}
}
+ fn lock(&mut self) -> MutexGuard<StatusData> {
+ self.status.lock().expect("lock StatusGuard::status")
+ }
+
pub fn broker_event(&mut self, event: &BrokerEvent) {
- self.latest_broker_event = Some(event.clone());
+ let mut status = self.lock();
+ status.latest_broker_event = Some(event.clone());
+ status.broker_event_counter += 1;
}
- pub fn ci_run(&mut self, run_id: &RunId, result: &RunResult) {
- self.latest_ci_run = Some(run_id.clone());
- self.latest_ci_run_result = Some(result.clone());
+ pub fn ci_run(&mut self, run: &Run) {
+ let mut status = self.lock();
+ status.latest_ci_run = Some(run.clone());
}
- pub fn write(&self) -> Result<(), StatusError> {
- let status = Status {
- timestamp: Self::now()?,
- ci_broker_version: env!("CARGO_PKG_VERSION"),
- ci_broker_git_commit: env!("GIT_HEAD"),
- latest_broker_event: self.latest_broker_event.clone(),
- latest_ci_run: self.latest_ci_run.clone(),
- latest_ci_run_result: self.latest_ci_run_result.clone(),
- };
- status.write(&self.filename)?;
+ pub fn write(&mut self) -> Result<(), StatusError> {
+ let filename = self.filename.clone();
+ let mut status = self.lock();
+ status.timestamp = Self::now()?;
+ status.write(&filename)?;
Ok(())
}
@@ -74,6 +84,15 @@ impl StatusBuilder {
}
}
+impl Clone for Status {
+ fn clone(&self) -> Self {
+ Self {
+ filename: self.filename.clone(),
+ status: Arc::clone(&self.status),
+ }
+ }
+}
+
#[derive(Debug, thiserror::Error)]
pub enum StatusError {
#[error("failed to format current time stamp")]