diff options
author | Lars Wirzenius <liw@liw.fi> | 2024-03-06 17:49:24 +0200 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2024-03-06 19:36:26 +0200 |
commit | 23be5b2133a904b4cde3fe8c1f3a147cbbce6b6c (patch) | |
tree | 2dcb5d040800b7696d5fc1a93519e381d2ffd9b2 | |
parent | 173231d46c14625013660db8420d7cdf224bb521 (diff) | |
download | radicle-ci-broker-23be5b2133a904b4cde3fe8c1f3a147cbbce6b6c.tar.gz |
feat! make list of CI runs in broker be persistent
This means they won't vanish when the broker is restarted.
BREAKING CHANGE: the configuration files MUST now have a field "db"
that is a path to the database file to use. The file will be created
if needed.
Signed-off-by: Lars Wirzenius <liw@liw.fi>
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/bin/ci-broker.rs | 8 | ||||
-rw-r--r-- | src/bin/list_runs.rs | 30 | ||||
-rw-r--r-- | src/broker.rs | 63 | ||||
-rw-r--r-- | src/config.rs | 5 | ||||
-rw-r--r-- | src/db.rs | 148 | ||||
-rw-r--r-- | src/error.rs | 5 | ||||
-rw-r--r-- | src/lib.rs | 1 | ||||
-rw-r--r-- | src/pages.rs | 15 |
10 files changed, 264 insertions, 13 deletions
@@ -1169,6 +1169,7 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "sqlite", "tempfile", "thiserror", "time", @@ -16,6 +16,7 @@ radicle-surf = { version = "0.18.0", default-features = false, features = ["serd uuid = { version = "1.7.0", features = ["v4"] } time = { version = "0.3.34", features = ["formatting", "macros"] } html-page = "0.1.0" +sqlite = "0.32.0" [dependencies.radicle] git = "https://seed.radicle.xyz/z3gqcJUoA1n9HaHKufZs5FCSGazv5.git" diff --git a/src/bin/ci-broker.rs b/src/bin/ci-broker.rs index d8cf766..f8f4da0 100644 --- a/src/bin/ci-broker.rs +++ b/src/bin/ci-broker.rs @@ -43,8 +43,11 @@ fn fallible_main() -> Result<(), BrokerError> { let config = Config::load(&filename)?; debug!("loaded configuration: {:#?}", config); - let mut broker = Broker::default(); - debug!("created broker"); + let mut broker = Broker::new(config.db())?; + debug!( + "created broker, db has {} CI runs", + broker.all_runs()?.len() + ); // FIXME: this is broken. the config file only lists how to invoke // each adapter, not what adapter to use for each repo. @@ -80,6 +83,7 @@ fn fallible_main() -> Result<(), BrokerError> { // Spawn a thread that updates the status pages. let mut page = PageBuilder::default() .node_alias(&profile.config.node.alias) + .runs(broker.all_runs()?) .build()?; let page2 = page.clone(); let report_dir = if let Some(dir) = &config.report_dir { diff --git a/src/bin/list_runs.rs b/src/bin/list_runs.rs new file mode 100644 index 0000000..cd39ff2 --- /dev/null +++ b/src/bin/list_runs.rs @@ -0,0 +1,30 @@ +use std::{error::Error, path::PathBuf}; + +use radicle_ci_broker::{db::Db, error::BrokerError}; + +fn main() { + if let Err(e) = fallible_main() { + eprintln!("ERROR: {}", e); + let mut e = e.source(); + while let Some(source) = e { + eprintln!("caused by: {}", source); + e = source.source(); + } + } +} + +fn fallible_main() -> Result<(), BrokerError> { + let mut args = std::env::args().skip(1); + let filename: PathBuf = if let Some(filename) = args.next() { + PathBuf::from(filename) + } else { + return Err(BrokerError::Usage); + }; + + let mut db = Db::new(&filename)?; + for run in db.all_runs()? { + println!("{run:#?}"); + } + + Ok(()) +} diff --git a/src/broker.rs b/src/broker.rs index 05cbd1c..4b4381d 100644 --- a/src/broker.rs +++ b/src/broker.rs @@ -3,14 +3,16 @@ //! This is type and module of its own to facilitate automated //! testing. -use std::collections::HashMap; +use std::{collections::HashMap, path::Path}; +use log::debug; use time::{macros::format_description, OffsetDateTime}; use radicle::prelude::RepoId; use crate::{ adapter::Adapter, + db::Db, error::BrokerError, msg::{PatchEvent, PushEvent, Request}, pages::StatusPage, @@ -22,13 +24,29 @@ use crate::{ /// The broker gets repository change events from the local Radicle /// node, and executes the appropriate adapter to run CI on the /// repository. -#[derive(Debug, Default)] +#[derive(Debug)] pub struct Broker { default_adapter: Option<Adapter>, adapters: HashMap<RepoId, Adapter>, + db: Db, } impl Broker { + #[allow(clippy::result_large_err)] + pub fn new(db_filename: &Path) -> Result<Self, BrokerError> { + debug!("broker database in {}", db_filename.display()); + Ok(Self { + default_adapter: None, + adapters: HashMap::new(), + db: Db::new(db_filename)?, + }) + } + + #[allow(clippy::result_large_err)] + pub fn all_runs(&mut self) -> Result<Vec<Run>, BrokerError> { + self.db.all_runs().map_err(BrokerError::Db) + } + pub fn set_default_adapter(&mut self, adapter: &Adapter) { self.default_adapter = Some(adapter.clone()); } @@ -47,7 +65,7 @@ impl Broker { #[allow(clippy::result_large_err)] pub fn execute_ci( - &self, + &mut self, trigger: &Request, status: &mut StatusPage, ) -> Result<Run, BrokerError> { @@ -82,6 +100,7 @@ impl Broker { } } }; + self.db.push_run(&run)?; Ok(run) } @@ -94,6 +113,7 @@ fn now() -> String { #[cfg(test)] mod test { + use std::path::Path; use tempfile::tempdir; use super::{Adapter, Broker, RepoId}; @@ -104,6 +124,10 @@ mod test { test::{mock_adapter, trigger_request, TestResult}, }; + fn broker(filename: &Path) -> Broker { + Broker::new(filename).unwrap() + } + fn rid() -> RepoId { const RID: &str = "rad:zwTxygwuz5LDGBq255RA2CbNGrz8"; RepoId::from_urn(RID).unwrap() @@ -123,7 +147,9 @@ mod test { #[test] fn has_no_adapters_initially() -> TestResult<()> { - let broker = Broker::default(); + let tmp = tempdir().unwrap(); + let db = tmp.path().join("db.db"); + let broker = broker(&db); let rid = rid(); assert_eq!(broker.adapter(&rid), None); Ok(()) @@ -131,7 +157,10 @@ mod test { #[test] fn adds_adapter() -> TestResult<()> { - let mut broker = Broker::default(); + let tmp = tempdir().unwrap(); + let db = tmp.path().join("db.db"); + let mut broker = broker(&db); + let adapter = Adapter::default(); let rid = rid(); broker.set_repository_adapter(&rid, &adapter); @@ -141,7 +170,10 @@ mod test { #[test] fn does_not_find_unknown_repo() -> TestResult<()> { - let mut broker = Broker::default(); + let tmp = tempdir().unwrap(); + let db = tmp.path().join("db.db"); + let mut broker = broker(&db); + let adapter = Adapter::default(); let rid = rid(); let rid2 = rid2(); @@ -152,14 +184,20 @@ mod test { #[test] fn does_not_have_a_default_adapter_initially() -> TestResult<()> { - let broker = Broker::default(); + let tmp = tempdir().unwrap(); + let db = tmp.path().join("db.db"); + let broker = broker(&db); + assert_eq!(broker.default_adapter(), None); Ok(()) } #[test] fn sets_a_default_adapter_initially() -> TestResult<()> { - let mut broker = Broker::default(); + let tmp = tempdir().unwrap(); + let db = tmp.path().join("db.db"); + let mut broker = broker(&db); + let adapter = Adapter::default(); broker.set_default_adapter(&adapter); assert_eq!(broker.default_adapter(), Some(&adapter)); @@ -168,7 +206,10 @@ mod test { #[test] fn finds_default_adapter_for_unknown_repo() -> TestResult<()> { - let mut broker = Broker::default(); + let tmp = tempdir().unwrap(); + let db = tmp.path().join("db.db"); + let mut broker = broker(&db); + let adapter = Adapter::default(); broker.set_default_adapter(&adapter); @@ -188,7 +229,9 @@ echo '{"response":"finished","result":"success"}' let bin = tmp.path().join("adapter.sh"); let adapter = mock_adapter(&bin, ADAPTER)?; - let mut broker = Broker::default(); + let tmp = tempdir().unwrap(); + let db = tmp.path().join("db.db"); + let mut broker = broker(&db); broker.set_default_adapter(&adapter); let trigger = trigger_request()?; diff --git a/src/config.rs b/src/config.rs index 3823a9b..25d0bd7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -18,6 +18,7 @@ pub struct Config { pub filters: Vec<EventFilter>, pub report_dir: Option<PathBuf>, pub status_update_interval_seconds: Option<u64>, + pub db: PathBuf, } impl Config { @@ -35,6 +36,10 @@ impl Config { self.status_update_interval_seconds .unwrap_or(DEFAULT_STATUS_PAGE_UPDATE_INTERVAL) } + + pub fn db(&self) -> &Path { + &self.db + } } #[derive(Debug, Serialize, Deserialize)] diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..b190828 --- /dev/null +++ b/src/db.rs @@ -0,0 +1,148 @@ +//! Persistent database for CI run information. + +use std::{ + fmt, + path::{Path, PathBuf}, +}; + +use sqlite::{Connection, State}; + +use crate::run::Run; + +const CREATE_TABLES: &str = + "CREATE TABLE IF NOT EXISTS ci_runs (run_id TEXT PRIMARY KEY, json TEXT)"; + +const INSERT_ROW: &str = "INSERT OR REPLACE INTO ci_runs (run_id, json) VALUES (:id, :json)"; + +const ALL_RUNS: &str = "SELECT json FROM ci_runs"; + +pub struct Db { + filename: PathBuf, + conn: Connection, +} + +impl fmt::Debug for Db { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + write!(f, "<Db:{}>", self.filename.display()) + } +} + +impl Db { + pub fn new(filename: &Path) -> Result<Self, DbError> { + eprintln!("open {}", filename.display()); + let conn = sqlite::open(filename).map_err(|e| DbError::open(filename, e))?; + + eprintln!("create tables"); + conn.execute(CREATE_TABLES) + .map_err(|e| DbError::create_tables(CREATE_TABLES, e))?; + + Ok(Db { + conn, + filename: filename.into(), + }) + } + + pub fn push_run(&mut self, run: &Run) -> Result<(), DbError> { + let json = serde_json::to_string(&run).map_err(DbError::to_json)?; + + let mut stmt = self + .conn + .prepare(INSERT_ROW) + .map_err(|e| DbError::prepare(INSERT_ROW, e))?; + + let run_id = format!("{}", run.adapter_run_id().unwrap()); + stmt.bind((":id", run_id.as_str())) + .map_err(|e| DbError::bind(":id", e))?; + stmt.bind((":json", json.as_str())) + .map_err(|e| DbError::bind(":json", e))?; + stmt.next().map_err(DbError::insert_run)?; + + Ok(()) + } + + pub fn all_runs(&mut self) -> Result<Vec<Run>, DbError> { + let mut stmt = self + .conn + .prepare(ALL_RUNS) + .map_err(|e| DbError::prepare(ALL_RUNS, e))?; + + let mut runs = vec![]; + while let Ok(State::Row) = stmt.next() { + let json: String = stmt.read("json").map_err(DbError::get_run)?; + let run: Run = serde_json::from_str(&json).map_err(DbError::from_json)?; + runs.push(run); + } + + Ok(runs) + } +} + +/// All errors from this module. +#[derive(Debug, thiserror::Error)] +pub enum DbError { + /// Error opening a database file. + #[error("failed to open SQLite database {0}")] + Open(PathBuf, #[source] sqlite::Error), + + /// Error creating tables. + #[error("failed to create tables: {0}")] + CreateTables(&'static str, #[source] sqlite::Error), + + /// Error preparing an SQL statement. + #[error("failed to prepare SQL statement {0}")] + Prepare(&'static str, #[source] sqlite::Error), + + /// Error binding a value to an SQL statement placeholder. + #[error("failed to bind a value to SQL statement placeholder {0}")] + Bind(&'static str, #[source] sqlite::Error), + + /// Error inserting or updating a run in SQL database. + #[error("failed to insert or update a run in SQL database")] + InsertRun(#[source] sqlite::Error), + + /// Error getting a run from SQL query. + #[error("failed to get CI run from SQL query result")] + GetRun(#[source] sqlite::Error), + + /// Error serializing a [`Run`]` into a string. + #[error("failed to serialize a CI run into JSON")] + ToJson(#[source] serde_json::Error), + + /// Error deserializing a [`Run`]` from a string. + #[error("failed to parse JSON as a CI run")] + FromJson(#[source] serde_json::Error), +} + +impl DbError { + fn open(filename: &Path, e: sqlite::Error) -> Self { + Self::Open(filename.into(), e) + } + + fn create_tables(query: &'static str, e: sqlite::Error) -> Self { + Self::CreateTables(query, e) + } + + fn prepare(stmt: &'static str, e: sqlite::Error) -> Self { + Self::Prepare(stmt, e) + } + + fn bind(placeholder: &'static str, e: sqlite::Error) -> Self { + Self::Bind(placeholder, e) + } + + fn insert_run(e: sqlite::Error) -> Self { + Self::InsertRun(e) + } + + fn to_json(e: serde_json::Error) -> Self { + Self::ToJson(e) + } + + fn from_json(e: serde_json::Error) -> Self { + Self::FromJson(e) + } + + fn get_run(e: sqlite::Error) -> Self { + Self::GetRun(e) + } +} diff --git a/src/error.rs b/src/error.rs index 2c4c1a1..fee19e0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -11,6 +11,7 @@ use radicle::prelude::RepoId; use crate::{ adapter::AdapterError, config::ConfigError, + db::DbError, msg::{MessageError, Request}, pages::PageError, }; @@ -66,4 +67,8 @@ pub enum BrokerError { /// Status page error. #[error(transparent)] StatusPage(#[from] PageError), + + /// Database error. + #[error(transparent)] + Db(#[from] DbError), } @@ -8,6 +8,7 @@ pub mod adapter; pub mod broker; pub mod config; +pub mod db; pub mod error; pub mod event; pub mod msg; diff --git a/src/pages.rs b/src/pages.rs index ed998d2..cd3906e 100644 --- a/src/pages.rs +++ b/src/pages.rs @@ -15,6 +15,7 @@ use std::{ }; use html_page::{Document, Element, Tag}; +use log::info; use serde::Serialize; use time::{macros::format_description, OffsetDateTime}; @@ -46,6 +47,7 @@ pub enum PageError { #[derive(Default)] pub struct PageBuilder { node_alias: Option<String>, + runs: Vec<Run>, } impl PageBuilder { @@ -54,13 +56,24 @@ impl PageBuilder { self } + pub fn runs(mut self, runs: Vec<Run>) -> Self { + self.runs = runs; + self + } + pub fn build(self) -> Result<StatusPage, PageError> { + let mut runs = HashMap::new(); + for run in self.runs.iter() { + runs.insert(run.adapter_run_id().unwrap().clone(), run.clone()); + } + info!("broker database had {} CI runs", runs.len()); + Ok(StatusPage::new(PageData { timestamp: now(), ci_broker_version: env!("CARGO_PKG_VERSION"), ci_broker_git_commit: env!("GIT_HEAD"), node_alias: self.node_alias.ok_or(PageError::NoAlias)?, - runs: HashMap::new(), + runs, broker_event_counter: 0, latest_broker_event: None, latest_ci_run: None, |