diff options
-rw-r--r-- | Cargo.lock | 32 | ||||
-rw-r--r-- | Cargo.toml | 7 | ||||
-rwxr-xr-x | setup-other-node.sh | 27 | ||||
-rw-r--r-- | src/lib.rs | 345 | ||||
-rw-r--r-- | src/main.rs | 148 |
5 files changed, 487 insertions, 72 deletions
@@ -303,8 +303,13 @@ dependencies = [ "log", "pretty_env_logger", "radicle", + "radicle-git-ext", "radicle-term", "reqwest", + "serde", + "serde_json", + "serde_yaml", + "thiserror", "tokio", ] @@ -1906,18 +1911,18 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.192" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001" +checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.192" +version = "1.0.193" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" +checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", @@ -1959,6 +1964,19 @@ dependencies = [ ] [[package]] +name = "serde_yaml" +version = "0.9.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cc7a1570e38322cfe4154732e5110f887ea57e22b76f4bfd32b5bdd3368666c" +dependencies = [ + "indexmap 2.1.0", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + +[[package]] name = "sha2" version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2433,6 +2451,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85" [[package]] +name = "unsafe-libyaml" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28467d3e1d3c6586d8f25fa243f544f5800fec42d97032474e17222c2b75cfa" + +[[package]] name = "url" version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -10,9 +10,14 @@ axum = "0.6.20" tokio = { version = "1.34.0", features = ["full"] } radicle = { path = "/home/liw/radicle/heartwood/radicle" } anyhow = "1.0.75" -reqwest = { version = "0.11.22", features = ["blocking"] } +reqwest = { version = "0.11.22", features = ["blocking", "json"] } log = "0.4.20" pretty_env_logger = "0.5.0" +radicle-git-ext = "0.6.0" +thiserror = "1.0.50" +serde = { version = "1.0.193", features = ["derive"] } +serde_yaml = "0.9.27" +serde_json = "1.0.108" [dependencies.radicle-term] version = "0" diff --git a/setup-other-node.sh b/setup-other-node.sh new file mode 100755 index 0000000..c4fd3a4 --- /dev/null +++ b/setup-other-node.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +set -euo pipefail + +if ! pgrep -f ssh-agent >/dev/null; then + echo "Start ssh-agent first:" + echo 'eval $(ssh-agent) && rad auth --alias liw-other' + exit 1 +fi + +rad node start -- --listen 0.0.0.0:8776 + +git init liw-test2 +cd liw-test2 +echo foo >foo +git add foo +git commit -m foo +rad init --name=liw-test2 \ + --no-confirm \ + --description=test \ + --public \ + --scope=all \ + --default-branch=main \ + --no-confirm \ + . + +rad . diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..efe294b --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,345 @@ +//! Help a Radicle node and a CI engine to communicate. An +//! implementation of the broker engine interface for native CI. + +use std::{ + fs::read, + path::{Path, PathBuf}, + time, +}; + +use log::{info, trace}; +use reqwest::StatusCode; +use serde::{Deserialize, Serialize}; + +use radicle_git_ext::{ref_format::RefString, Oid}; + +use radicle::{ + identity::Id, + node::{Event, Handle}, + storage::{git::Repository, RefUpdate}, + Profile, +}; + +/// Status of a CI run. +pub enum RunStatus { + /// A CI run has been triggered, but has not yet started running. + Triggered, + + /// CI is running. + Running, + + /// CI has finished. + Finished(FinishReason), +} + +/// Why did the CI run finish? +pub enum FinishReason { + Succeeded, + Failed, +} + +/// Interface for CI broker to access a CI engine. This needs to be +/// implemented for each type of CI engine that we want to support. +/// All engines will be accessed via the interface defined by this +/// trait. However, each CI engine value is created in a way specific +/// to that engine, and creation is not part of the trait interface. +pub trait CiEngine { + /// The type of errors returned for this CI engine. + type Error: std::fmt::Display; + + /// An identifier for a CI run on this engine. + type RunId: std::fmt::Display + std::fmt::Debug; + + /// Add a project to the engine. + fn add_repository(&mut self, repo: &Repository) -> Result<Id, Self::Error>; + + /// Trigger a run on the engine, on a given commit, on a + /// repository that has already been added to the CI engine. + fn trigger_run(&mut self, repo_id: &Id, commit: Oid) -> Result<Self::RunId, Self::Error>; + + /// Retrieve the result of a CI run. + fn run_result(&mut self, run_id: &Self::RunId) -> Result<RunStatus, Self::Error>; +} + +/// The kinds of CI engines supported by the broker. +pub enum EngineKind { + Native, +} + +/// The native CI engine of Radicle. It runs as a separate process, +/// and is controlled over HTTP using a base URL. +pub struct NativeCi { + url: String, +} + +impl NativeCi { + /// Create a new native CI engine value, for a given base URL. + pub fn new(url: &str) -> Self { + Self { url: url.into() } + } +} + +/// Possible errors for accessing the native CI engine. +#[derive(Debug, thiserror::Error)] +pub enum NativeError { + #[error("failed to parse JSON response from CI")] + Json(#[source] reqwest::Error), + #[error("failed to trigger CI run")] + Trigger, + #[error("failed to make HTTP request to CI")] + Http(#[source] reqwest::Error), + #[error("not implemented")] + Nope, +} + +impl CiEngine for NativeCi { + type Error = NativeError; + type RunId = String; + + fn add_repository(&mut self, repo: &Repository) -> Result<Id, Self::Error> { + println!("NativeCi::add_repository: repo={}", repo.id); + Err(NativeError::Nope) + } + + fn trigger_run(&mut self, repo_id: &Id, commit: Oid) -> Result<Self::RunId, Self::Error> { + println!("NativeCi::trigger_run: repo_id={repo_id:?} commit={commit:?}"); + let url = format!("{}/trigger/{}", self.url, commit); + let result = reqwest::blocking::get(&url); + match result { + Ok(response) => { + if response.status() == StatusCode::CREATED { + let run: Run = response.json().map_err(NativeError::Json)?; + info!("triggered CI run {}: {}", url, run.id); + Ok(run.id) + } else { + info!("failed to trigger CI run {}: {}", url, response.status()); + Err(NativeError::Trigger) + } + } + Err(e) => Err(NativeError::Http(e)), + } + } + + fn run_result(&mut self, run_id: &Self::RunId) -> Result<RunStatus, Self::Error> { + println!("NativeCi::run_result: run_id={run_id:?})"); + Ok(RunStatus::Finished(FinishReason::Succeeded)) + } +} + +/// FIXME. +#[derive(Deserialize)] +#[allow(dead_code)] +struct Run { + id: String, + commit: String, +} + +/// Possible errors from accessing the local Radicle node. +#[derive(Debug, thiserror::Error)] +pub enum NodeEventError { + #[error(transparent)] + Node(#[from] radicle::node::Error), + #[error(transparent)] + Id(#[from] radicle::identity::IdError), + #[error(transparent)] + Io(#[from] std::io::Error), + #[error(transparent)] + RefFormat(#[from] radicle_git_ext::ref_format::Error), + #[error("failed to read filter file: {0}")] + ReadFilterFile(PathBuf, #[source] std::io::Error), + #[error("failed to parser filters file: {0}")] + FiltersJson(PathBuf, #[source] serde_json::Error), +} + +/// Source of events from the local Radicle node. +pub struct NodeEventSource { + events: Box<dyn Iterator<Item = Result<Event, std::io::Error>>>, + allowed: Vec<EventFilter>, +} + +impl NodeEventSource { + /// Create a new source of node events, for a given Radicle + /// profile. + pub fn new(profile: Profile) -> Result<Self, NodeEventError> { + info!("subscribing to local node events"); + let node = radicle::Node::new(profile.socket()); + let events = node.subscribe(time::Duration::MAX)?; + trace!("subscribed OK"); + Ok(Self { + events, + allowed: vec![], + }) + } + + /// Add an event filter for allowed events for this event source. + pub fn allow(&mut self, filter: EventFilter) { + self.allowed.push(filter); + } + + fn allowed(&self, event: &BrokerEvent) -> bool { + for filter in self.allowed.iter() { + if !event.allowed(filter) { + return false; + } + } + true + } + + /// Get the allowed next event from an event source. This will + /// block until there is an allowed event, or until there will be + /// no more events from this source, or there's an error. + pub fn event(&mut self) -> Result<Vec<BrokerEvent>, NodeEventError> { + trace!("getting next event from local node"); + loop { + let next = self.events.next(); + if next.is_none() { + info!("no more events from node"); + return Ok(vec![]); + } + + let event = next.unwrap()?; + let mut result = vec![]; + if let Some(broker_events) = BrokerEvent::from_event(&event) { + for e in broker_events { + if self.allowed(&e) { + result.push(e); + } + } + return Ok(result); + } + + trace!("got event, but it was not allowed by filter, next event"); + } + } +} + +fn _event_summary(event: &Event) -> Option<String> { + match event { + Event::RefsFetched { .. } => Some("RefsFetched".into()), + _ => None, + } +} + +/// An event filter for allowing events. Or an "AND" combination of events. +/// +/// NOTE: Adding "OR" and "NOT" would be easy, too. +#[derive(Debug, Deserialize, Serialize)] +pub enum EventFilter { + /// Event concerns a specific repository. + Repository(Id), + + /// Event concerns a git ref that ends with a given string. FIXME: + /// this should really be a glob pattern. + Glob(String), + + /// Combine two filters that both must allow the events. FIXME: Is + /// two enough? Should this be a vector that all have to allow? + And(Box<Self>, Box<Self>), +} + +impl EventFilter { + /// Create a filter for a repository. + pub fn repository(rid: &str) -> Result<Self, NodeEventError> { + Ok(Self::Repository(Id::from_urn(rid)?)) + } + + /// Create a filter for a git ref that ends with a string. + pub fn glob(pattern: &str) -> Result<Self, NodeEventError> { + Ok(Self::Glob(pattern.into())) + } + + /// Create a filter combining two other filters. + pub fn and(a: Self, b: Self) -> Self { + Self::And(Box::new(a), Box::new(b)) + } + + /// Read filters from a JSON file. + /// + /// A file might look like this: + /// + /// ```json + /// { + /// "filters": [ + /// { + /// "And": [ + /// { + /// "Repository": "rad:z3bBRYgzcYYBNjipFdDTwPgHaihPX" + /// }, + /// { + /// "Glob": "refs/heads/main" + /// } + /// ] + /// } + /// ] + /// } + /// ``` + pub fn from_file(filename: &Path) -> Result<Vec<Self>, NodeEventError> { + let filters = + read(filename).map_err(|e| NodeEventError::ReadFilterFile(filename.into(), e))?; + let filters: Filters = serde_json::from_slice(&filters) + .map_err(|e| NodeEventError::FiltersJson(filename.into(), e))?; + eprintln!("FILTERS: {:#?}", filters); + Ok(filters.filters) + } +} + +// This is a helper type for reading filters from a JSON file. It +// represents the structure of the JSON, which is not just a list of +// filters. The top level "filters" field is added, to allow adding +// more fields later. +#[derive(Debug, Deserialize, Serialize)] +struct Filters { + filters: Vec<EventFilter>, +} + +/// A node event transmogrified into a form that's more easily +/// filtered in the broker. For now, we only care about events about +/// refs changing. +#[derive(Debug)] +pub enum BrokerEvent { + RefChanged { rid: Id, name: RefString, oid: Oid }, +} + +impl BrokerEvent { + fn new(rid: &Id, name: &RefString, oid: &Oid) -> Self { + Self::RefChanged { + rid: *rid, + name: name.clone(), + oid: *oid, + } + } + + fn from_event(e: &Event) -> Option<Vec<Self>> { + if let Event::RefsFetched { + remote: _, + rid, + updated, + } = e + { + let mut events = vec![]; + for up in updated { + match up { + RefUpdate::Created { name, oid } => { + events.push(Self::new(rid, name, oid)); + } + RefUpdate::Updated { name, old: _, new } => { + events.push(Self::new(rid, name, new)); + } + _ => (), + } + } + Some(events) + } else { + None + } + } + + fn allowed(&self, filter: &EventFilter) -> bool { + let Self::RefChanged { rid, name, oid: _ } = self; + match filter { + EventFilter::Repository(wanted) => rid == wanted, + EventFilter::Glob(wanted) => name.ends_with(wanted), + EventFilter::And(a, b) => self.allowed(a) && self.allowed(b), + } + } +} diff --git a/src/main.rs b/src/main.rs index 3b9eab2..c32d39c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,26 +1,53 @@ -// This is a simulation of the Radicle CI broker. It listens on node -// CI runs, and accepts requests when simulated runs have finished. -// events, and makes HTTP requests to the native CI simulator to start - -use std::{thread::spawn, time}; +// This is a prototype for a CI broker for Radicle. It listens on +// events on its local node, reacts to changes git refs that it's +// configured to accept. The reaction is to trigger a CI run on the +// native CI instance the broker is configured to use. It also awaits +// for the CI engine to tell, via an HTTP call, if the CI run +// succeeded or failed. This is meant to work with my native CI +// simulator, which pretends to run CI jobs, but only really randomly +// reports success or failure. +// +// To run: optionally give the `--url URL` option, to set the native +// CI engine base URL, and then zero or more JSON files that describe +// filters for events to allow. Note that if no filters are given, +// nothing is allowed. +// +// Run with `RUST_LOG=info` or `RUST_LOG=debug` to get useful output. +// +// ```sh +// RUST_LOG=debug cargo run -q -- filters.json +// ``` + +use std::{path::PathBuf, thread::spawn}; use axum::{extract::Path, http::StatusCode, response::IntoResponse, routing::get, Router}; use log::{debug, info}; -use radicle::node::{Event, Handle}; -use radicle::storage::RefUpdate; use radicle::Profile; -// const START: &str = "http://localhost:8000/runs"; -const RID: &str = "rad:z3cRFMy4yiiUFHJxco865TijwUaDk"; +use ci_broker_prototype::{BrokerEvent, CiEngine, EventFilter, NativeCi, NodeEventSource}; + +const DEFAULT_URL: &str = "http://localhost:8000"; #[tokio::main] async fn main() -> anyhow::Result<()> { pretty_env_logger::init(); info!("ci-broker-prototype starts"); + let args = Args::parse(); + debug!("command line args: {:#?}", args); + + let mut filters = vec![]; + for filename in args.filenames { + let mut more = EventFilter::from_file(std::path::Path::new(&filename))?; + filters.append(&mut more); + } + info!("node event filters: {:#?}", filters); + + let engine = NativeCi::new(&args.url); + let profile = Profile::load()?; - spawn(move || subscribe_to_node_events(profile).expect("node events")); + spawn(move || handle_node_events(profile, filters, engine).expect("node events")); debug!("started thread for receiving events from node"); let app = Router::new() @@ -36,6 +63,32 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +#[derive(Debug)] +struct Args { + url: String, + filenames: Vec<PathBuf>, +} + +impl Args { + fn parse() -> Args { + let mut args = Args { + url: DEFAULT_URL.into(), + filenames: vec![], + }; + + let mut argv: Vec<String> = std::env::args().skip(1).collect(); + if argv.len() >= 2 && argv[0] == "--url" { + args.url = argv[1].clone(); + argv.remove(0); // "--url" option + argv.remove(0); // option value + } + + args.filenames = argv.iter().map(PathBuf::from).collect(); + + args + } +} + async fn go_away() -> (StatusCode, impl IntoResponse) { info!("bad request"); (StatusCode::FORBIDDEN, ()) @@ -51,67 +104,28 @@ async fn failed(Path(id): Path<String>) -> (StatusCode, impl IntoResponse) { (StatusCode::OK, ()) } -fn subscribe_to_node_events(profile: Profile) -> anyhow::Result<()> { - let node = radicle::Node::new(profile.socket()); - let events = node.subscribe(time::Duration::MAX)?; - - for event in events { - let event = event?; - debug!("{:#?}", event); - - if let Event::RefsFetched { - remote: _, - rid, - updated, - } = event - { - debug!("repository {} updated refs", rid); - let rid = format!("{}", rid); - debug!("rid: {:?}", rid); - debug!("RID: {:?}", RID); - if rid == RID { - info!("repository is interesting; ref changes:"); - for u in updated { - match u { - RefUpdate::Created { name, oid } => { - info!("- new ref: {}", name); - info!(" {}", oid); - } - RefUpdate::Updated { name, old, new } => { - info!("- updated: {}", name); - info!(" {} -> {}", old, new); - } - RefUpdate::Deleted { name, oid: _ } => { - info!("- deleted: {}", name) - } - RefUpdate::Skipped { name, oid: _ } => { - info!("- up to date: {}", name) +fn handle_node_events( + profile: Profile, + filters: Vec<EventFilter>, + mut engine: impl CiEngine, +) -> anyhow::Result<()> { + let mut source = NodeEventSource::new(profile)?; + for filter in filters { + source.allow(filter); + } + + loop { + for e in source.event()? { + match e { + BrokerEvent::RefChanged { rid, name: _, oid } => { + match engine.trigger_run(&rid, oid) { + Err(e) => info!("failed to trigger CI run: {}", e), + Ok(run_id) => { + info!("triggered CI run: {}", run_id); } } } } } - - // #[allow(clippy::single_match)] - // match event { - // Event::RefsFetched { - // remote: _, - // rid: _, - // updated, - // } => { - // for refs in updated { - // match refs { - // RefUpdate::Updated { name, .. } | RefUpdate::Created { name, .. } => { - // println!("refs update: {}", name); - // let url = format!("{}/trigger/{}", START, "dummy-commit"); - // reqwest::blocking::get(&url).expect("trigger new run"); - // } - // _ => (), - // } - // } - // } - // _ => (), - // } } - Ok(()) } |