diff options
author | Lars Wirzenius <liw@liw.fi> | 2024-02-19 17:52:34 +0200 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2024-02-19 20:54:47 +0200 |
commit | 5d3aa44a92be306481612d8c8f10c85e9c9feefa (patch) | |
tree | 8494fed67bb8b93b33184562c047fa933ded4770 | |
parent | 2dafe88a84dc031485a71b405f3cda43fa8e00fa (diff) | |
download | radicle-ci-broker-5d3aa44a92be306481612d8c8f10c85e9c9feefa.tar.gz |
feat: update status page in a background thread
Previously, we updated the status page only when we got a new broker
event, or after a CI run finished. This made it unsuitable as a "is
the broker alive?" service.
Signed-off-by: Lars Wirzenius <liw@liw.fi>
-rw-r--r-- | src/adapter.rs | 63 | ||||
-rw-r--r-- | src/bin/ci-broker.rs | 56 | ||||
-rw-r--r-- | src/bin/status.rs | 2 | ||||
-rw-r--r-- | src/broker.rs | 12 | ||||
-rw-r--r-- | src/run.rs | 19 | ||||
-rw-r--r-- | src/status.rs | 85 |
6 files changed, 145 insertions, 92 deletions
diff --git a/src/adapter.rs b/src/adapter.rs index d9a37d0..9d479fb 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -19,6 +19,7 @@ use std::{ use crate::{ msg::{MessageError, Request, Response}, run::{Run, RunState}, + status::Status, }; /// An external executable that runs CI on request. @@ -47,14 +48,26 @@ impl Adapter { self.env.iter().map(|(k, v)| (k.as_ref(), v.as_ref())) } - pub fn run(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> { - run.set_state(RunState::Running); - let x = self.run_helper(trigger, run); + pub fn run( + &self, + trigger: &Request, + run: &mut Run, + status: &mut Status, + ) -> Result<(), AdapterError> { + run.set_state(RunState::Triggered); + status.ci_run(run); + let x = self.run_helper(trigger, run, status); run.set_state(RunState::Finished); + status.ci_run(run); x } - fn run_helper(&self, trigger: &Request, run: &mut Run) -> Result<(), AdapterError> { + fn run_helper( + &self, + trigger: &Request, + run: &mut Run, + status: &mut Status, + ) -> Result<(), AdapterError> { assert!(matches!(trigger, Request::Trigger { .. })); // Spawn the adapter sub-process. @@ -84,7 +97,9 @@ impl Adapter { let resp = Response::from_str(&line)?; match resp { Response::Triggered { run_id } => { + run.set_state(RunState::Running); run.set_adapter_run_id(run_id); + status.ci_run(run); } _ => return Err(AdapterError::NotTriggered(resp)), } @@ -96,6 +111,7 @@ impl Adapter { match resp { Response::Finished { result } => { run.set_result(result); + status.ci_run(run); } _ => return Err(AdapterError::NotTriggered(resp)), } @@ -157,7 +173,7 @@ pub enum AdapterError { #[cfg(test)] mod test { - use std::{fs::write, io::ErrorKind}; + use std::{fs::write, io::ErrorKind, path::Path}; use tempfile::tempdir; @@ -165,6 +181,7 @@ mod test { use crate::{ adapter::AdapterError, msg::{MessageError, Response, RunResult}, + status::Status, test::{mock_adapter, trigger_request, TestResult}, }; @@ -180,7 +197,8 @@ echo '{"response":"finished","result":"success"}' mock_adapter(&bin, ADAPTER)?; let mut run = Run::default(); - Adapter::new(&bin).run(&trigger_request()?, &mut run)?; + let mut status = Status::new(Path::new("/dev/null")); + Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status)?; assert_eq!(run.result(), Some(&RunResult::Success)); Ok(()) @@ -198,7 +216,8 @@ echo '{"response":"finished","result":"failure"}' mock_adapter(&bin, ADAPTER)?; let mut run = Run::default(); - Adapter::new(&bin).run(&trigger_request()?, &mut run)?; + let mut status = Status::new(Path::new("/dev/null")); + Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status)?; assert_eq!(run.result(), Some(&RunResult::Failure)); Ok(()) @@ -216,7 +235,8 @@ echo '{"response":"finished","result":{"error":"error message\nsecond line"}}' mock_adapter(&bin, ADAPTER)?; let mut run = Run::default(); - Adapter::new(&bin).run(&trigger_request()?, &mut run)?; + let mut status = Status::new(Path::new("/dev/null")); + Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status)?; assert_eq!( run.result(), Some(&RunResult::Error("error message\nsecond line".into())) @@ -236,7 +256,8 @@ kill -9 $BASHPID mock_adapter(&bin, ADAPTER)?; let mut run = Run::default(); - let x = Adapter::new(&bin).run(&trigger_request()?, &mut run); + let mut status = Status::new(Path::new("/dev/null")); + let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status); assert!(matches!(x, Err(AdapterError::Failed(_)))); Ok(()) @@ -254,7 +275,8 @@ kill -9 $BASHPID mock_adapter(&bin, ADAPTER)?; let mut run = Run::default(); - let x = Adapter::new(&bin).run(&trigger_request()?, &mut run); + let mut status = Status::new(Path::new("/dev/null")); + let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status); assert!(matches!(x, Err(AdapterError::Failed(_)))); Ok(()) @@ -273,7 +295,8 @@ kill -9 $BASHPID mock_adapter(&bin, ADAPTER)?; let mut run = Run::default(); - let x = Adapter::new(&bin).run(&trigger_request()?, &mut run); + let mut status = Status::new(Path::new("/dev/null")); + let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status); assert!(matches!(x, Err(AdapterError::Failed(_)))); Ok(()) @@ -291,7 +314,8 @@ echo '{"response":"finished","result":"success","bad":"field"}' mock_adapter(&bin, ADAPTER)?; let mut run = Run::default(); - let x = Adapter::new(&bin).run(&trigger_request()?, &mut run); + let mut status = Status::new(Path::new("/dev/null")); + let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status); assert!(matches!( x, Err(AdapterError::Message(MessageError::DeserializeResponse(_))) @@ -311,7 +335,8 @@ echo '{"response":"finished","result":"success"}' mock_adapter(&bin, ADAPTER)?; let mut run = Run::default(); - let x = Adapter::new(&bin).run(&trigger_request()?, &mut run); + let mut status = Status::new(Path::new("/dev/null")); + let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status); assert!(matches!( x, Err(AdapterError::NotTriggered(Response::Finished { @@ -335,7 +360,8 @@ echo '{"response":"finished","result":"success"}' mock_adapter(&bin, ADAPTER)?; let mut run = Run::default(); - let x = Adapter::new(&bin).run(&trigger_request()?, &mut run); + let mut status = Status::new(Path::new("/dev/null")); + let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status); assert!(matches!( x, Err(AdapterError::TooMany(Response::Finished { @@ -352,7 +378,8 @@ echo '{"response":"finished","result":"success"}' let bin = tmp.path().join("adapter.sh"); let mut run = Run::default(); - let x = Adapter::new(&bin).run(&trigger_request()?, &mut run); + let mut status = Status::new(Path::new("/dev/null")); + let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status); match x { Err(AdapterError::SpawnAdapter(filename, e)) => { assert_eq!(bin, filename); @@ -376,7 +403,8 @@ echo '{"response":"finished","result":"success"}' write(&bin, ADAPTER)?; let mut run = Run::default(); - let x = Adapter::new(&bin).run(&trigger_request()?, &mut run); + let mut status = Status::new(Path::new("/dev/null")); + let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status); match x { Err(AdapterError::SpawnAdapter(filename, e)) => { assert_eq!(bin, filename); @@ -404,7 +432,8 @@ echo '{"response":"finished","result":"success"}' mock_adapter(&bin, ADAPTER)?; let mut run = Run::default(); - let x = Adapter::new(&bin).run(&trigger_request()?, &mut run); + let mut status = Status::new(Path::new("/dev/null")); + let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &mut status); match x { Err(AdapterError::SpawnAdapter(filename, e)) => { assert_eq!(bin, filename); diff --git a/src/bin/ci-broker.rs b/src/bin/ci-broker.rs index efa6ce1..d25cde6 100644 --- a/src/bin/ci-broker.rs +++ b/src/bin/ci-broker.rs @@ -1,22 +1,20 @@ use std::{ error::Error, path::{Path, PathBuf}, + thread::{sleep, spawn}, + time::Duration, }; use log::{debug, info}; use radicle::prelude::Profile; use radicle_ci_broker::{ - adapter::Adapter, - broker::Broker, - config::Config, - error::BrokerError, - event::{BrokerEvent, NodeEventSource}, - msg::Request, - run::Run, - status::StatusBuilder, + adapter::Adapter, broker::Broker, config::Config, error::BrokerError, event::NodeEventSource, + msg::Request, status::Status, }; +const STATUS_UPDATE_DELAY: Duration = Duration::from_millis(1000); + fn main() { if let Err(e) = fallible_main() { eprintln!("ERROR: {}", e); @@ -76,41 +74,29 @@ fn fallible_main() -> Result<(), BrokerError> { } debug!("added filters to node event source"); + // Spawn a thread that updates the status page. + let mut status = Status::new(config.status_page().unwrap_or(Path::new("/dev/null"))); + let s2 = status.clone(); + let _status_thread = spawn(move || status_updater(s2)); + // This loop ends when there's an error, e.g., failure to read an // event from the node. - let mut counter = 0; - let mut status = StatusBuilder::new(config.status_page().unwrap_or(Path::new("/dev/null"))); loop { - status.write()?; debug!("waiting for event from node"); for e in source.event()? { status.broker_event(&e); - status.write()?; - - counter += 1; debug!("broker event {e:#?}"); - let BrokerEvent::RefChanged { - rid, - oid, - name: _, - old: _, - } = e; - let req = Request::trigger(&profile, &e)?; - let mut run = Run::default(); - if let Some(adapter) = broker.adapter(&rid) { - adapter.run(&req, &mut run)?; - status.ci_run(run.adapter_run_id().unwrap(), run.result().unwrap()); - println!( - "Run CI run #{}: {}, {} -> {}", - counter, - rid, - oid, - run.result().unwrap() - ); - } else { - eprintln!("ERROR: no adapter available for repository {rid}: not running CI"); - } + broker.execute_ci(&req, &mut status)?; + } + } +} + +fn status_updater(mut status: Status) { + loop { + if let Err(e) = status.write() { + eprintln!("ERROR: failed to update status page: {e}"); } + sleep(STATUS_UPDATE_DELAY); } } diff --git a/src/bin/status.rs b/src/bin/status.rs index 646291b..bcf592c 100644 --- a/src/bin/status.rs +++ b/src/bin/status.rs @@ -14,6 +14,6 @@ fn main() { } fn fallible_main() -> Result<(), StatusError> { - StatusBuilder::new(Path::new("status.json")).write()?; + Status::new(Path::new("status.json")).write()?; Ok(()) } diff --git a/src/broker.rs b/src/broker.rs index 3676f2e..b7669c4 100644 --- a/src/broker.rs +++ b/src/broker.rs @@ -7,7 +7,7 @@ use std::collections::HashMap; use radicle::prelude::RepoId; -use crate::{adapter::Adapter, error::BrokerError, msg::Request, run::Run}; +use crate::{adapter::Adapter, error::BrokerError, msg::Request, run::Run, status::Status}; /// A CI broker. /// @@ -38,7 +38,7 @@ impl Broker { } #[allow(clippy::result_large_err)] - pub fn execute_ci(&self, trigger: &Request) -> Result<Run, BrokerError> { + pub fn execute_ci(&self, trigger: &Request, status: &mut Status) -> Result<Run, BrokerError> { let adapter = match trigger { Request::Trigger { common, @@ -55,7 +55,7 @@ impl Broker { }; let mut run = Run::default(); - adapter.run(trigger, &mut run)?; + adapter.run(trigger, &mut run, status)?; Ok(run) } @@ -63,12 +63,15 @@ impl Broker { #[cfg(test)] mod test { + use std::path::Path; + use tempfile::tempdir; use super::{Adapter, Broker, RepoId}; use crate::{ msg::{RunId, RunResult}, run::RunState, + status::Status, test::{mock_adapter, trigger_request, TestResult}, }; @@ -154,7 +157,8 @@ echo '{"response":"finished","result":"success"}' let trigger = trigger_request()?; - let x = broker.execute_ci(&trigger); + let mut status = Status::new(Path::new("/dev/null")); + let x = broker.execute_ci(&trigger, &mut status); assert!(x.is_ok()); let run = x.unwrap(); assert_eq!(run.adapter_run_id(), Some(&RunId::from("xyzzy"))); @@ -63,7 +63,6 @@ impl Run { /// State of CI run. #[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize)] -#[serde(untagged)] #[serde(rename_all = "snake_case")] pub enum RunState { /// Run has been triggered, but has not yet been started. @@ -78,6 +77,22 @@ pub enum RunState { impl fmt::Display for RunState { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "{}", serde_json::to_string(self).unwrap()) + let s = match self { + Self::Finished => "finished", + Self::Running => "running", + Self::Triggered => "triggered", + }; + write!(f, "{}", s) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn serialize_run_state() { + let s = serde_json::to_string(&RunState::Finished).unwrap(); + assert_eq!(s, r#""finished""#); } } diff --git a/src/status.rs b/src/status.rs index 00bf0c4..17445c8 100644 --- a/src/status.rs +++ b/src/status.rs @@ -1,25 +1,38 @@ -use std::path::{Path, PathBuf}; +use std::{ + path::{Path, PathBuf}, + sync::{Arc, Mutex, MutexGuard}, +}; use serde::Serialize; use time::{macros::format_description, OffsetDateTime}; -use crate::{ - event::BrokerEvent, - msg::{RunId, RunResult}, -}; +use crate::{event::BrokerEvent, run::Run}; -#[derive(Serialize)] -pub struct Status { +#[derive(Debug, Serialize)] +struct StatusData { timestamp: String, + broker_event_counter: usize, ci_broker_version: &'static str, ci_broker_git_commit: &'static str, latest_broker_event: Option<BrokerEvent>, - latest_ci_run: Option<RunId>, - latest_ci_run_result: Option<RunResult>, + latest_ci_run: Option<Run>, } -impl Status { - pub fn write(&self, filename: &Path) -> Result<(), StatusError> { +impl Default for StatusData { + fn default() -> Self { + Self { + timestamp: "".into(), + broker_event_counter: 0, + ci_broker_version: env!("CARGO_PKG_VERSION"), + ci_broker_git_commit: env!("GIT_HEAD"), + latest_broker_event: None, + latest_ci_run: None, + } + } +} + +impl StatusData { + fn write(&self, filename: &Path) -> Result<(), StatusError> { let s = serde_json::to_string_pretty(&self).map_err(StatusError::serialize)?; std::fs::write(filename, s.as_bytes()) .map_err(|e| StatusError::status_write(filename, e))?; @@ -27,42 +40,39 @@ impl Status { } } -pub struct StatusBuilder { +pub struct Status { filename: PathBuf, - latest_broker_event: Option<BrokerEvent>, - latest_ci_run: Option<RunId>, - latest_ci_run_result: Option<RunResult>, + status: Arc<Mutex<StatusData>>, } -impl StatusBuilder { +impl Status { pub fn new(filename: &Path) -> Self { Self { filename: filename.into(), - latest_ci_run: None, - latest_ci_run_result: None, - latest_broker_event: None, + status: Arc::new(Mutex::new(StatusData::default())), } } + fn lock(&mut self) -> MutexGuard<StatusData> { + self.status.lock().expect("lock StatusGuard::status") + } + pub fn broker_event(&mut self, event: &BrokerEvent) { - self.latest_broker_event = Some(event.clone()); + let mut status = self.lock(); + status.latest_broker_event = Some(event.clone()); + status.broker_event_counter += 1; } - pub fn ci_run(&mut self, run_id: &RunId, result: &RunResult) { - self.latest_ci_run = Some(run_id.clone()); - self.latest_ci_run_result = Some(result.clone()); + pub fn ci_run(&mut self, run: &Run) { + let mut status = self.lock(); + status.latest_ci_run = Some(run.clone()); } - pub fn write(&self) -> Result<(), StatusError> { - let status = Status { - timestamp: Self::now()?, - ci_broker_version: env!("CARGO_PKG_VERSION"), - ci_broker_git_commit: env!("GIT_HEAD"), - latest_broker_event: self.latest_broker_event.clone(), - latest_ci_run: self.latest_ci_run.clone(), - latest_ci_run_result: self.latest_ci_run_result.clone(), - }; - status.write(&self.filename)?; + pub fn write(&mut self) -> Result<(), StatusError> { + let filename = self.filename.clone(); + let mut status = self.lock(); + status.timestamp = Self::now()?; + status.write(&filename)?; Ok(()) } @@ -74,6 +84,15 @@ impl StatusBuilder { } } +impl Clone for Status { + fn clone(&self) -> Self { + Self { + filename: self.filename.clone(), + status: Arc::clone(&self.status), + } + } +} + #[derive(Debug, thiserror::Error)] pub enum StatusError { #[error("failed to format current time stamp")] |