summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2024-03-06 17:49:24 +0200
committerLars Wirzenius <liw@liw.fi>2024-03-06 19:36:26 +0200
commit23be5b2133a904b4cde3fe8c1f3a147cbbce6b6c (patch)
tree2dcb5d040800b7696d5fc1a93519e381d2ffd9b2
parent173231d46c14625013660db8420d7cdf224bb521 (diff)
downloadradicle-ci-broker-23be5b2133a904b4cde3fe8c1f3a147cbbce6b6c.tar.gz
feat! make list of CI runs in broker be persistent
This means they won't vanish when the broker is restarted. BREAKING CHANGE: the configuration files MUST now have a field "db" that is a path to the database file to use. The file will be created if needed. Signed-off-by: Lars Wirzenius <liw@liw.fi>
-rw-r--r--Cargo.lock1
-rw-r--r--Cargo.toml1
-rw-r--r--src/bin/ci-broker.rs8
-rw-r--r--src/bin/list_runs.rs30
-rw-r--r--src/broker.rs63
-rw-r--r--src/config.rs5
-rw-r--r--src/db.rs148
-rw-r--r--src/error.rs5
-rw-r--r--src/lib.rs1
-rw-r--r--src/pages.rs15
10 files changed, 264 insertions, 13 deletions
diff --git a/Cargo.lock b/Cargo.lock
index a8f6e2d..10f1636 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1169,6 +1169,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
+ "sqlite",
"tempfile",
"thiserror",
"time",
diff --git a/Cargo.toml b/Cargo.toml
index c67944b..8100c78 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,6 +16,7 @@ radicle-surf = { version = "0.18.0", default-features = false, features = ["serd
uuid = { version = "1.7.0", features = ["v4"] }
time = { version = "0.3.34", features = ["formatting", "macros"] }
html-page = "0.1.0"
+sqlite = "0.32.0"
[dependencies.radicle]
git = "https://seed.radicle.xyz/z3gqcJUoA1n9HaHKufZs5FCSGazv5.git"
diff --git a/src/bin/ci-broker.rs b/src/bin/ci-broker.rs
index d8cf766..f8f4da0 100644
--- a/src/bin/ci-broker.rs
+++ b/src/bin/ci-broker.rs
@@ -43,8 +43,11 @@ fn fallible_main() -> Result<(), BrokerError> {
let config = Config::load(&filename)?;
debug!("loaded configuration: {:#?}", config);
- let mut broker = Broker::default();
- debug!("created broker");
+ let mut broker = Broker::new(config.db())?;
+ debug!(
+ "created broker, db has {} CI runs",
+ broker.all_runs()?.len()
+ );
// FIXME: this is broken. the config file only lists how to invoke
// each adapter, not what adapter to use for each repo.
@@ -80,6 +83,7 @@ fn fallible_main() -> Result<(), BrokerError> {
// Spawn a thread that updates the status pages.
let mut page = PageBuilder::default()
.node_alias(&profile.config.node.alias)
+ .runs(broker.all_runs()?)
.build()?;
let page2 = page.clone();
let report_dir = if let Some(dir) = &config.report_dir {
diff --git a/src/bin/list_runs.rs b/src/bin/list_runs.rs
new file mode 100644
index 0000000..cd39ff2
--- /dev/null
+++ b/src/bin/list_runs.rs
@@ -0,0 +1,30 @@
+use std::{error::Error, path::PathBuf};
+
+use radicle_ci_broker::{db::Db, error::BrokerError};
+
+fn main() {
+ if let Err(e) = fallible_main() {
+ eprintln!("ERROR: {}", e);
+ let mut e = e.source();
+ while let Some(source) = e {
+ eprintln!("caused by: {}", source);
+ e = source.source();
+ }
+ }
+}
+
+fn fallible_main() -> Result<(), BrokerError> {
+ let mut args = std::env::args().skip(1);
+ let filename: PathBuf = if let Some(filename) = args.next() {
+ PathBuf::from(filename)
+ } else {
+ return Err(BrokerError::Usage);
+ };
+
+ let mut db = Db::new(&filename)?;
+ for run in db.all_runs()? {
+ println!("{run:#?}");
+ }
+
+ Ok(())
+}
diff --git a/src/broker.rs b/src/broker.rs
index 05cbd1c..4b4381d 100644
--- a/src/broker.rs
+++ b/src/broker.rs
@@ -3,14 +3,16 @@
//! This is type and module of its own to facilitate automated
//! testing.
-use std::collections::HashMap;
+use std::{collections::HashMap, path::Path};
+use log::debug;
use time::{macros::format_description, OffsetDateTime};
use radicle::prelude::RepoId;
use crate::{
adapter::Adapter,
+ db::Db,
error::BrokerError,
msg::{PatchEvent, PushEvent, Request},
pages::StatusPage,
@@ -22,13 +24,29 @@ use crate::{
/// The broker gets repository change events from the local Radicle
/// node, and executes the appropriate adapter to run CI on the
/// repository.
-#[derive(Debug, Default)]
+#[derive(Debug)]
pub struct Broker {
default_adapter: Option<Adapter>,
adapters: HashMap<RepoId, Adapter>,
+ db: Db,
}
impl Broker {
+ #[allow(clippy::result_large_err)]
+ pub fn new(db_filename: &Path) -> Result<Self, BrokerError> {
+ debug!("broker database in {}", db_filename.display());
+ Ok(Self {
+ default_adapter: None,
+ adapters: HashMap::new(),
+ db: Db::new(db_filename)?,
+ })
+ }
+
+ #[allow(clippy::result_large_err)]
+ pub fn all_runs(&mut self) -> Result<Vec<Run>, BrokerError> {
+ self.db.all_runs().map_err(BrokerError::Db)
+ }
+
pub fn set_default_adapter(&mut self, adapter: &Adapter) {
self.default_adapter = Some(adapter.clone());
}
@@ -47,7 +65,7 @@ impl Broker {
#[allow(clippy::result_large_err)]
pub fn execute_ci(
- &self,
+ &mut self,
trigger: &Request,
status: &mut StatusPage,
) -> Result<Run, BrokerError> {
@@ -82,6 +100,7 @@ impl Broker {
}
}
};
+ self.db.push_run(&run)?;
Ok(run)
}
@@ -94,6 +113,7 @@ fn now() -> String {
#[cfg(test)]
mod test {
+ use std::path::Path;
use tempfile::tempdir;
use super::{Adapter, Broker, RepoId};
@@ -104,6 +124,10 @@ mod test {
test::{mock_adapter, trigger_request, TestResult},
};
+ fn broker(filename: &Path) -> Broker {
+ Broker::new(filename).unwrap()
+ }
+
fn rid() -> RepoId {
const RID: &str = "rad:zwTxygwuz5LDGBq255RA2CbNGrz8";
RepoId::from_urn(RID).unwrap()
@@ -123,7 +147,9 @@ mod test {
#[test]
fn has_no_adapters_initially() -> TestResult<()> {
- let broker = Broker::default();
+ let tmp = tempdir().unwrap();
+ let db = tmp.path().join("db.db");
+ let broker = broker(&db);
let rid = rid();
assert_eq!(broker.adapter(&rid), None);
Ok(())
@@ -131,7 +157,10 @@ mod test {
#[test]
fn adds_adapter() -> TestResult<()> {
- let mut broker = Broker::default();
+ let tmp = tempdir().unwrap();
+ let db = tmp.path().join("db.db");
+ let mut broker = broker(&db);
+
let adapter = Adapter::default();
let rid = rid();
broker.set_repository_adapter(&rid, &adapter);
@@ -141,7 +170,10 @@ mod test {
#[test]
fn does_not_find_unknown_repo() -> TestResult<()> {
- let mut broker = Broker::default();
+ let tmp = tempdir().unwrap();
+ let db = tmp.path().join("db.db");
+ let mut broker = broker(&db);
+
let adapter = Adapter::default();
let rid = rid();
let rid2 = rid2();
@@ -152,14 +184,20 @@ mod test {
#[test]
fn does_not_have_a_default_adapter_initially() -> TestResult<()> {
- let broker = Broker::default();
+ let tmp = tempdir().unwrap();
+ let db = tmp.path().join("db.db");
+ let broker = broker(&db);
+
assert_eq!(broker.default_adapter(), None);
Ok(())
}
#[test]
fn sets_a_default_adapter_initially() -> TestResult<()> {
- let mut broker = Broker::default();
+ let tmp = tempdir().unwrap();
+ let db = tmp.path().join("db.db");
+ let mut broker = broker(&db);
+
let adapter = Adapter::default();
broker.set_default_adapter(&adapter);
assert_eq!(broker.default_adapter(), Some(&adapter));
@@ -168,7 +206,10 @@ mod test {
#[test]
fn finds_default_adapter_for_unknown_repo() -> TestResult<()> {
- let mut broker = Broker::default();
+ let tmp = tempdir().unwrap();
+ let db = tmp.path().join("db.db");
+ let mut broker = broker(&db);
+
let adapter = Adapter::default();
broker.set_default_adapter(&adapter);
@@ -188,7 +229,9 @@ echo '{"response":"finished","result":"success"}'
let bin = tmp.path().join("adapter.sh");
let adapter = mock_adapter(&bin, ADAPTER)?;
- let mut broker = Broker::default();
+ let tmp = tempdir().unwrap();
+ let db = tmp.path().join("db.db");
+ let mut broker = broker(&db);
broker.set_default_adapter(&adapter);
let trigger = trigger_request()?;
diff --git a/src/config.rs b/src/config.rs
index 3823a9b..25d0bd7 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -18,6 +18,7 @@ pub struct Config {
pub filters: Vec<EventFilter>,
pub report_dir: Option<PathBuf>,
pub status_update_interval_seconds: Option<u64>,
+ pub db: PathBuf,
}
impl Config {
@@ -35,6 +36,10 @@ impl Config {
self.status_update_interval_seconds
.unwrap_or(DEFAULT_STATUS_PAGE_UPDATE_INTERVAL)
}
+
+ pub fn db(&self) -> &Path {
+ &self.db
+ }
}
#[derive(Debug, Serialize, Deserialize)]
diff --git a/src/db.rs b/src/db.rs
new file mode 100644
index 0000000..b190828
--- /dev/null
+++ b/src/db.rs
@@ -0,0 +1,148 @@
+//! Persistent database for CI run information.
+
+use std::{
+ fmt,
+ path::{Path, PathBuf},
+};
+
+use sqlite::{Connection, State};
+
+use crate::run::Run;
+
+const CREATE_TABLES: &str =
+ "CREATE TABLE IF NOT EXISTS ci_runs (run_id TEXT PRIMARY KEY, json TEXT)";
+
+const INSERT_ROW: &str = "INSERT OR REPLACE INTO ci_runs (run_id, json) VALUES (:id, :json)";
+
+const ALL_RUNS: &str = "SELECT json FROM ci_runs";
+
+pub struct Db {
+ filename: PathBuf,
+ conn: Connection,
+}
+
+impl fmt::Debug for Db {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
+ write!(f, "<Db:{}>", self.filename.display())
+ }
+}
+
+impl Db {
+ pub fn new(filename: &Path) -> Result<Self, DbError> {
+ eprintln!("open {}", filename.display());
+ let conn = sqlite::open(filename).map_err(|e| DbError::open(filename, e))?;
+
+ eprintln!("create tables");
+ conn.execute(CREATE_TABLES)
+ .map_err(|e| DbError::create_tables(CREATE_TABLES, e))?;
+
+ Ok(Db {
+ conn,
+ filename: filename.into(),
+ })
+ }
+
+ pub fn push_run(&mut self, run: &Run) -> Result<(), DbError> {
+ let json = serde_json::to_string(&run).map_err(DbError::to_json)?;
+
+ let mut stmt = self
+ .conn
+ .prepare(INSERT_ROW)
+ .map_err(|e| DbError::prepare(INSERT_ROW, e))?;
+
+ let run_id = format!("{}", run.adapter_run_id().unwrap());
+ stmt.bind((":id", run_id.as_str()))
+ .map_err(|e| DbError::bind(":id", e))?;
+ stmt.bind((":json", json.as_str()))
+ .map_err(|e| DbError::bind(":json", e))?;
+ stmt.next().map_err(DbError::insert_run)?;
+
+ Ok(())
+ }
+
+ pub fn all_runs(&mut self) -> Result<Vec<Run>, DbError> {
+ let mut stmt = self
+ .conn
+ .prepare(ALL_RUNS)
+ .map_err(|e| DbError::prepare(ALL_RUNS, e))?;
+
+ let mut runs = vec![];
+ while let Ok(State::Row) = stmt.next() {
+ let json: String = stmt.read("json").map_err(DbError::get_run)?;
+ let run: Run = serde_json::from_str(&json).map_err(DbError::from_json)?;
+ runs.push(run);
+ }
+
+ Ok(runs)
+ }
+}
+
+/// All errors from this module.
+#[derive(Debug, thiserror::Error)]
+pub enum DbError {
+ /// Error opening a database file.
+ #[error("failed to open SQLite database {0}")]
+ Open(PathBuf, #[source] sqlite::Error),
+
+ /// Error creating tables.
+ #[error("failed to create tables: {0}")]
+ CreateTables(&'static str, #[source] sqlite::Error),
+
+ /// Error preparing an SQL statement.
+ #[error("failed to prepare SQL statement {0}")]
+ Prepare(&'static str, #[source] sqlite::Error),
+
+ /// Error binding a value to an SQL statement placeholder.
+ #[error("failed to bind a value to SQL statement placeholder {0}")]
+ Bind(&'static str, #[source] sqlite::Error),
+
+ /// Error inserting or updating a run in SQL database.
+ #[error("failed to insert or update a run in SQL database")]
+ InsertRun(#[source] sqlite::Error),
+
+ /// Error getting a run from SQL query.
+ #[error("failed to get CI run from SQL query result")]
+ GetRun(#[source] sqlite::Error),
+
+ /// Error serializing a [`Run`]` into a string.
+ #[error("failed to serialize a CI run into JSON")]
+ ToJson(#[source] serde_json::Error),
+
+ /// Error deserializing a [`Run`]` from a string.
+ #[error("failed to parse JSON as a CI run")]
+ FromJson(#[source] serde_json::Error),
+}
+
+impl DbError {
+ fn open(filename: &Path, e: sqlite::Error) -> Self {
+ Self::Open(filename.into(), e)
+ }
+
+ fn create_tables(query: &'static str, e: sqlite::Error) -> Self {
+ Self::CreateTables(query, e)
+ }
+
+ fn prepare(stmt: &'static str, e: sqlite::Error) -> Self {
+ Self::Prepare(stmt, e)
+ }
+
+ fn bind(placeholder: &'static str, e: sqlite::Error) -> Self {
+ Self::Bind(placeholder, e)
+ }
+
+ fn insert_run(e: sqlite::Error) -> Self {
+ Self::InsertRun(e)
+ }
+
+ fn to_json(e: serde_json::Error) -> Self {
+ Self::ToJson(e)
+ }
+
+ fn from_json(e: serde_json::Error) -> Self {
+ Self::FromJson(e)
+ }
+
+ fn get_run(e: sqlite::Error) -> Self {
+ Self::GetRun(e)
+ }
+}
diff --git a/src/error.rs b/src/error.rs
index 2c4c1a1..fee19e0 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -11,6 +11,7 @@ use radicle::prelude::RepoId;
use crate::{
adapter::AdapterError,
config::ConfigError,
+ db::DbError,
msg::{MessageError, Request},
pages::PageError,
};
@@ -66,4 +67,8 @@ pub enum BrokerError {
/// Status page error.
#[error(transparent)]
StatusPage(#[from] PageError),
+
+ /// Database error.
+ #[error(transparent)]
+ Db(#[from] DbError),
}
diff --git a/src/lib.rs b/src/lib.rs
index 2855d5a..2a27927 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -8,6 +8,7 @@
pub mod adapter;
pub mod broker;
pub mod config;
+pub mod db;
pub mod error;
pub mod event;
pub mod msg;
diff --git a/src/pages.rs b/src/pages.rs
index ed998d2..cd3906e 100644
--- a/src/pages.rs
+++ b/src/pages.rs
@@ -15,6 +15,7 @@ use std::{
};
use html_page::{Document, Element, Tag};
+use log::info;
use serde::Serialize;
use time::{macros::format_description, OffsetDateTime};
@@ -46,6 +47,7 @@ pub enum PageError {
#[derive(Default)]
pub struct PageBuilder {
node_alias: Option<String>,
+ runs: Vec<Run>,
}
impl PageBuilder {
@@ -54,13 +56,24 @@ impl PageBuilder {
self
}
+ pub fn runs(mut self, runs: Vec<Run>) -> Self {
+ self.runs = runs;
+ self
+ }
+
pub fn build(self) -> Result<StatusPage, PageError> {
+ let mut runs = HashMap::new();
+ for run in self.runs.iter() {
+ runs.insert(run.adapter_run_id().unwrap().clone(), run.clone());
+ }
+ info!("broker database had {} CI runs", runs.len());
+
Ok(StatusPage::new(PageData {
timestamp: now(),
ci_broker_version: env!("CARGO_PKG_VERSION"),
ci_broker_git_commit: env!("GIT_HEAD"),
node_alias: self.node_alias.ok_or(PageError::NoAlias)?,
- runs: HashMap::new(),
+ runs,
broker_event_counter: 0,
latest_broker_event: None,
latest_ci_run: None,