summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock32
-rw-r--r--Cargo.toml7
-rwxr-xr-xsetup-other-node.sh27
-rw-r--r--src/lib.rs345
-rw-r--r--src/main.rs148
5 files changed, 487 insertions, 72 deletions
diff --git a/Cargo.lock b/Cargo.lock
index dcbdc9a..76caa7c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -303,8 +303,13 @@ dependencies = [
"log",
"pretty_env_logger",
"radicle",
+ "radicle-git-ext",
"radicle-term",
"reqwest",
+ "serde",
+ "serde_json",
+ "serde_yaml",
+ "thiserror",
"tokio",
]
@@ -1906,18 +1911,18 @@ dependencies = [
[[package]]
name = "serde"
-version = "1.0.192"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001"
+checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
-version = "1.0.192"
+version = "1.0.193"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1"
+checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3"
dependencies = [
"proc-macro2",
"quote",
@@ -1959,6 +1964,19 @@ dependencies = [
]
[[package]]
+name = "serde_yaml"
+version = "0.9.27"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3cc7a1570e38322cfe4154732e5110f887ea57e22b76f4bfd32b5bdd3368666c"
+dependencies = [
+ "indexmap 2.1.0",
+ "itoa",
+ "ryu",
+ "serde",
+ "unsafe-libyaml",
+]
+
+[[package]]
name = "sha2"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2433,6 +2451,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85"
[[package]]
+name = "unsafe-libyaml"
+version = "0.2.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f28467d3e1d3c6586d8f25fa243f544f5800fec42d97032474e17222c2b75cfa"
+
+[[package]]
name = "url"
version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index dbbd440..670d119 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,9 +10,14 @@ axum = "0.6.20"
tokio = { version = "1.34.0", features = ["full"] }
radicle = { path = "/home/liw/radicle/heartwood/radicle" }
anyhow = "1.0.75"
-reqwest = { version = "0.11.22", features = ["blocking"] }
+reqwest = { version = "0.11.22", features = ["blocking", "json"] }
log = "0.4.20"
pretty_env_logger = "0.5.0"
+radicle-git-ext = "0.6.0"
+thiserror = "1.0.50"
+serde = { version = "1.0.193", features = ["derive"] }
+serde_yaml = "0.9.27"
+serde_json = "1.0.108"
[dependencies.radicle-term]
version = "0"
diff --git a/setup-other-node.sh b/setup-other-node.sh
new file mode 100755
index 0000000..c4fd3a4
--- /dev/null
+++ b/setup-other-node.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+
+set -euo pipefail
+
+if ! pgrep -f ssh-agent >/dev/null; then
+ echo "Start ssh-agent first:"
+ echo 'eval $(ssh-agent) && rad auth --alias liw-other'
+ exit 1
+fi
+
+rad node start -- --listen 0.0.0.0:8776
+
+git init liw-test2
+cd liw-test2
+echo foo >foo
+git add foo
+git commit -m foo
+rad init --name=liw-test2 \
+ --no-confirm \
+ --description=test \
+ --public \
+ --scope=all \
+ --default-branch=main \
+ --no-confirm \
+ .
+
+rad .
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..efe294b
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,345 @@
+//! Help a Radicle node and a CI engine to communicate. An
+//! implementation of the broker engine interface for native CI.
+
+use std::{
+ fs::read,
+ path::{Path, PathBuf},
+ time,
+};
+
+use log::{info, trace};
+use reqwest::StatusCode;
+use serde::{Deserialize, Serialize};
+
+use radicle_git_ext::{ref_format::RefString, Oid};
+
+use radicle::{
+ identity::Id,
+ node::{Event, Handle},
+ storage::{git::Repository, RefUpdate},
+ Profile,
+};
+
+/// Status of a CI run.
+pub enum RunStatus {
+ /// A CI run has been triggered, but has not yet started running.
+ Triggered,
+
+ /// CI is running.
+ Running,
+
+ /// CI has finished.
+ Finished(FinishReason),
+}
+
+/// Why did the CI run finish?
+pub enum FinishReason {
+ Succeeded,
+ Failed,
+}
+
+/// Interface for CI broker to access a CI engine. This needs to be
+/// implemented for each type of CI engine that we want to support.
+/// All engines will be accessed via the interface defined by this
+/// trait. However, each CI engine value is created in a way specific
+/// to that engine, and creation is not part of the trait interface.
+pub trait CiEngine {
+ /// The type of errors returned for this CI engine.
+ type Error: std::fmt::Display;
+
+ /// An identifier for a CI run on this engine.
+ type RunId: std::fmt::Display + std::fmt::Debug;
+
+ /// Add a project to the engine.
+ fn add_repository(&mut self, repo: &Repository) -> Result<Id, Self::Error>;
+
+ /// Trigger a run on the engine, on a given commit, on a
+ /// repository that has already been added to the CI engine.
+ fn trigger_run(&mut self, repo_id: &Id, commit: Oid) -> Result<Self::RunId, Self::Error>;
+
+ /// Retrieve the result of a CI run.
+ fn run_result(&mut self, run_id: &Self::RunId) -> Result<RunStatus, Self::Error>;
+}
+
+/// The kinds of CI engines supported by the broker.
+pub enum EngineKind {
+ Native,
+}
+
+/// The native CI engine of Radicle. It runs as a separate process,
+/// and is controlled over HTTP using a base URL.
+pub struct NativeCi {
+ url: String,
+}
+
+impl NativeCi {
+ /// Create a new native CI engine value, for a given base URL.
+ pub fn new(url: &str) -> Self {
+ Self { url: url.into() }
+ }
+}
+
+/// Possible errors for accessing the native CI engine.
+#[derive(Debug, thiserror::Error)]
+pub enum NativeError {
+ #[error("failed to parse JSON response from CI")]
+ Json(#[source] reqwest::Error),
+ #[error("failed to trigger CI run")]
+ Trigger,
+ #[error("failed to make HTTP request to CI")]
+ Http(#[source] reqwest::Error),
+ #[error("not implemented")]
+ Nope,
+}
+
+impl CiEngine for NativeCi {
+ type Error = NativeError;
+ type RunId = String;
+
+ fn add_repository(&mut self, repo: &Repository) -> Result<Id, Self::Error> {
+ println!("NativeCi::add_repository: repo={}", repo.id);
+ Err(NativeError::Nope)
+ }
+
+ fn trigger_run(&mut self, repo_id: &Id, commit: Oid) -> Result<Self::RunId, Self::Error> {
+ println!("NativeCi::trigger_run: repo_id={repo_id:?} commit={commit:?}");
+ let url = format!("{}/trigger/{}", self.url, commit);
+ let result = reqwest::blocking::get(&url);
+ match result {
+ Ok(response) => {
+ if response.status() == StatusCode::CREATED {
+ let run: Run = response.json().map_err(NativeError::Json)?;
+ info!("triggered CI run {}: {}", url, run.id);
+ Ok(run.id)
+ } else {
+ info!("failed to trigger CI run {}: {}", url, response.status());
+ Err(NativeError::Trigger)
+ }
+ }
+ Err(e) => Err(NativeError::Http(e)),
+ }
+ }
+
+ fn run_result(&mut self, run_id: &Self::RunId) -> Result<RunStatus, Self::Error> {
+ println!("NativeCi::run_result: run_id={run_id:?})");
+ Ok(RunStatus::Finished(FinishReason::Succeeded))
+ }
+}
+
+/// FIXME.
+#[derive(Deserialize)]
+#[allow(dead_code)]
+struct Run {
+ id: String,
+ commit: String,
+}
+
+/// Possible errors from accessing the local Radicle node.
+#[derive(Debug, thiserror::Error)]
+pub enum NodeEventError {
+ #[error(transparent)]
+ Node(#[from] radicle::node::Error),
+ #[error(transparent)]
+ Id(#[from] radicle::identity::IdError),
+ #[error(transparent)]
+ Io(#[from] std::io::Error),
+ #[error(transparent)]
+ RefFormat(#[from] radicle_git_ext::ref_format::Error),
+ #[error("failed to read filter file: {0}")]
+ ReadFilterFile(PathBuf, #[source] std::io::Error),
+ #[error("failed to parser filters file: {0}")]
+ FiltersJson(PathBuf, #[source] serde_json::Error),
+}
+
+/// Source of events from the local Radicle node.
+pub struct NodeEventSource {
+ events: Box<dyn Iterator<Item = Result<Event, std::io::Error>>>,
+ allowed: Vec<EventFilter>,
+}
+
+impl NodeEventSource {
+ /// Create a new source of node events, for a given Radicle
+ /// profile.
+ pub fn new(profile: Profile) -> Result<Self, NodeEventError> {
+ info!("subscribing to local node events");
+ let node = radicle::Node::new(profile.socket());
+ let events = node.subscribe(time::Duration::MAX)?;
+ trace!("subscribed OK");
+ Ok(Self {
+ events,
+ allowed: vec![],
+ })
+ }
+
+ /// Add an event filter for allowed events for this event source.
+ pub fn allow(&mut self, filter: EventFilter) {
+ self.allowed.push(filter);
+ }
+
+ fn allowed(&self, event: &BrokerEvent) -> bool {
+ for filter in self.allowed.iter() {
+ if !event.allowed(filter) {
+ return false;
+ }
+ }
+ true
+ }
+
+ /// Get the allowed next event from an event source. This will
+ /// block until there is an allowed event, or until there will be
+ /// no more events from this source, or there's an error.
+ pub fn event(&mut self) -> Result<Vec<BrokerEvent>, NodeEventError> {
+ trace!("getting next event from local node");
+ loop {
+ let next = self.events.next();
+ if next.is_none() {
+ info!("no more events from node");
+ return Ok(vec![]);
+ }
+
+ let event = next.unwrap()?;
+ let mut result = vec![];
+ if let Some(broker_events) = BrokerEvent::from_event(&event) {
+ for e in broker_events {
+ if self.allowed(&e) {
+ result.push(e);
+ }
+ }
+ return Ok(result);
+ }
+
+ trace!("got event, but it was not allowed by filter, next event");
+ }
+ }
+}
+
+fn _event_summary(event: &Event) -> Option<String> {
+ match event {
+ Event::RefsFetched { .. } => Some("RefsFetched".into()),
+ _ => None,
+ }
+}
+
+/// An event filter for allowing events. Or an "AND" combination of events.
+///
+/// NOTE: Adding "OR" and "NOT" would be easy, too.
+#[derive(Debug, Deserialize, Serialize)]
+pub enum EventFilter {
+ /// Event concerns a specific repository.
+ Repository(Id),
+
+ /// Event concerns a git ref that ends with a given string. FIXME:
+ /// this should really be a glob pattern.
+ Glob(String),
+
+ /// Combine two filters that both must allow the events. FIXME: Is
+ /// two enough? Should this be a vector that all have to allow?
+ And(Box<Self>, Box<Self>),
+}
+
+impl EventFilter {
+ /// Create a filter for a repository.
+ pub fn repository(rid: &str) -> Result<Self, NodeEventError> {
+ Ok(Self::Repository(Id::from_urn(rid)?))
+ }
+
+ /// Create a filter for a git ref that ends with a string.
+ pub fn glob(pattern: &str) -> Result<Self, NodeEventError> {
+ Ok(Self::Glob(pattern.into()))
+ }
+
+ /// Create a filter combining two other filters.
+ pub fn and(a: Self, b: Self) -> Self {
+ Self::And(Box::new(a), Box::new(b))
+ }
+
+ /// Read filters from a JSON file.
+ ///
+ /// A file might look like this:
+ ///
+ /// ```json
+ /// {
+ /// "filters": [
+ /// {
+ /// "And": [
+ /// {
+ /// "Repository": "rad:z3bBRYgzcYYBNjipFdDTwPgHaihPX"
+ /// },
+ /// {
+ /// "Glob": "refs/heads/main"
+ /// }
+ /// ]
+ /// }
+ /// ]
+ /// }
+ /// ```
+ pub fn from_file(filename: &Path) -> Result<Vec<Self>, NodeEventError> {
+ let filters =
+ read(filename).map_err(|e| NodeEventError::ReadFilterFile(filename.into(), e))?;
+ let filters: Filters = serde_json::from_slice(&filters)
+ .map_err(|e| NodeEventError::FiltersJson(filename.into(), e))?;
+ eprintln!("FILTERS: {:#?}", filters);
+ Ok(filters.filters)
+ }
+}
+
+// This is a helper type for reading filters from a JSON file. It
+// represents the structure of the JSON, which is not just a list of
+// filters. The top level "filters" field is added, to allow adding
+// more fields later.
+#[derive(Debug, Deserialize, Serialize)]
+struct Filters {
+ filters: Vec<EventFilter>,
+}
+
+/// A node event transmogrified into a form that's more easily
+/// filtered in the broker. For now, we only care about events about
+/// refs changing.
+#[derive(Debug)]
+pub enum BrokerEvent {
+ RefChanged { rid: Id, name: RefString, oid: Oid },
+}
+
+impl BrokerEvent {
+ fn new(rid: &Id, name: &RefString, oid: &Oid) -> Self {
+ Self::RefChanged {
+ rid: *rid,
+ name: name.clone(),
+ oid: *oid,
+ }
+ }
+
+ fn from_event(e: &Event) -> Option<Vec<Self>> {
+ if let Event::RefsFetched {
+ remote: _,
+ rid,
+ updated,
+ } = e
+ {
+ let mut events = vec![];
+ for up in updated {
+ match up {
+ RefUpdate::Created { name, oid } => {
+ events.push(Self::new(rid, name, oid));
+ }
+ RefUpdate::Updated { name, old: _, new } => {
+ events.push(Self::new(rid, name, new));
+ }
+ _ => (),
+ }
+ }
+ Some(events)
+ } else {
+ None
+ }
+ }
+
+ fn allowed(&self, filter: &EventFilter) -> bool {
+ let Self::RefChanged { rid, name, oid: _ } = self;
+ match filter {
+ EventFilter::Repository(wanted) => rid == wanted,
+ EventFilter::Glob(wanted) => name.ends_with(wanted),
+ EventFilter::And(a, b) => self.allowed(a) && self.allowed(b),
+ }
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index 3b9eab2..c32d39c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,26 +1,53 @@
-// This is a simulation of the Radicle CI broker. It listens on node
-// CI runs, and accepts requests when simulated runs have finished.
-// events, and makes HTTP requests to the native CI simulator to start
-
-use std::{thread::spawn, time};
+// This is a prototype for a CI broker for Radicle. It listens on
+// events on its local node, reacts to changes git refs that it's
+// configured to accept. The reaction is to trigger a CI run on the
+// native CI instance the broker is configured to use. It also awaits
+// for the CI engine to tell, via an HTTP call, if the CI run
+// succeeded or failed. This is meant to work with my native CI
+// simulator, which pretends to run CI jobs, but only really randomly
+// reports success or failure.
+//
+// To run: optionally give the `--url URL` option, to set the native
+// CI engine base URL, and then zero or more JSON files that describe
+// filters for events to allow. Note that if no filters are given,
+// nothing is allowed.
+//
+// Run with `RUST_LOG=info` or `RUST_LOG=debug` to get useful output.
+//
+// ```sh
+// RUST_LOG=debug cargo run -q -- filters.json
+// ```
+
+use std::{path::PathBuf, thread::spawn};
use axum::{extract::Path, http::StatusCode, response::IntoResponse, routing::get, Router};
use log::{debug, info};
-use radicle::node::{Event, Handle};
-use radicle::storage::RefUpdate;
use radicle::Profile;
-// const START: &str = "http://localhost:8000/runs";
-const RID: &str = "rad:z3cRFMy4yiiUFHJxco865TijwUaDk";
+use ci_broker_prototype::{BrokerEvent, CiEngine, EventFilter, NativeCi, NodeEventSource};
+
+const DEFAULT_URL: &str = "http://localhost:8000";
#[tokio::main]
async fn main() -> anyhow::Result<()> {
pretty_env_logger::init();
info!("ci-broker-prototype starts");
+ let args = Args::parse();
+ debug!("command line args: {:#?}", args);
+
+ let mut filters = vec![];
+ for filename in args.filenames {
+ let mut more = EventFilter::from_file(std::path::Path::new(&filename))?;
+ filters.append(&mut more);
+ }
+ info!("node event filters: {:#?}", filters);
+
+ let engine = NativeCi::new(&args.url);
+
let profile = Profile::load()?;
- spawn(move || subscribe_to_node_events(profile).expect("node events"));
+ spawn(move || handle_node_events(profile, filters, engine).expect("node events"));
debug!("started thread for receiving events from node");
let app = Router::new()
@@ -36,6 +63,32 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
+#[derive(Debug)]
+struct Args {
+ url: String,
+ filenames: Vec<PathBuf>,
+}
+
+impl Args {
+ fn parse() -> Args {
+ let mut args = Args {
+ url: DEFAULT_URL.into(),
+ filenames: vec![],
+ };
+
+ let mut argv: Vec<String> = std::env::args().skip(1).collect();
+ if argv.len() >= 2 && argv[0] == "--url" {
+ args.url = argv[1].clone();
+ argv.remove(0); // "--url" option
+ argv.remove(0); // option value
+ }
+
+ args.filenames = argv.iter().map(PathBuf::from).collect();
+
+ args
+ }
+}
+
async fn go_away() -> (StatusCode, impl IntoResponse) {
info!("bad request");
(StatusCode::FORBIDDEN, ())
@@ -51,67 +104,28 @@ async fn failed(Path(id): Path<String>) -> (StatusCode, impl IntoResponse) {
(StatusCode::OK, ())
}
-fn subscribe_to_node_events(profile: Profile) -> anyhow::Result<()> {
- let node = radicle::Node::new(profile.socket());
- let events = node.subscribe(time::Duration::MAX)?;
-
- for event in events {
- let event = event?;
- debug!("{:#?}", event);
-
- if let Event::RefsFetched {
- remote: _,
- rid,
- updated,
- } = event
- {
- debug!("repository {} updated refs", rid);
- let rid = format!("{}", rid);
- debug!("rid: {:?}", rid);
- debug!("RID: {:?}", RID);
- if rid == RID {
- info!("repository is interesting; ref changes:");
- for u in updated {
- match u {
- RefUpdate::Created { name, oid } => {
- info!("- new ref: {}", name);
- info!(" {}", oid);
- }
- RefUpdate::Updated { name, old, new } => {
- info!("- updated: {}", name);
- info!(" {} -> {}", old, new);
- }
- RefUpdate::Deleted { name, oid: _ } => {
- info!("- deleted: {}", name)
- }
- RefUpdate::Skipped { name, oid: _ } => {
- info!("- up to date: {}", name)
+fn handle_node_events(
+ profile: Profile,
+ filters: Vec<EventFilter>,
+ mut engine: impl CiEngine,
+) -> anyhow::Result<()> {
+ let mut source = NodeEventSource::new(profile)?;
+ for filter in filters {
+ source.allow(filter);
+ }
+
+ loop {
+ for e in source.event()? {
+ match e {
+ BrokerEvent::RefChanged { rid, name: _, oid } => {
+ match engine.trigger_run(&rid, oid) {
+ Err(e) => info!("failed to trigger CI run: {}", e),
+ Ok(run_id) => {
+ info!("triggered CI run: {}", run_id);
}
}
}
}
}
-
- // #[allow(clippy::single_match)]
- // match event {
- // Event::RefsFetched {
- // remote: _,
- // rid: _,
- // updated,
- // } => {
- // for refs in updated {
- // match refs {
- // RefUpdate::Updated { name, .. } | RefUpdate::Created { name, .. } => {
- // println!("refs update: {}", name);
- // let url = format!("{}/trigger/{}", START, "dummy-commit");
- // reqwest::blocking::get(&url).expect("trigger new run");
- // }
- // _ => (),
- // }
- // }
- // }
- // _ => (),
- // }
}
- Ok(())
}