summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLars Wirzenius <liw@liw.fi>2024-02-02 17:32:57 +0200
committerLars Wirzenius <liw@liw.fi>2024-02-02 17:44:15 +0200
commit9886cfd48b7e53b95120dc00dce4225b05df99e4 (patch)
treeb4e128c7f5589d2b1d0435495ec50c7939457927
parentf298cc3ebd8bcb72ce530558b0d112c6da856e93 (diff)
downloadradicle-ci-broker-9886cfd48b7e53b95120dc00dce4225b05df99e4.tar.gz
refactor(src/bin/ci-broker.rs): use new modules in main program
Signed-off-by: Lars Wirzenius <liw@liw.fi>
-rw-r--r--src/bin/ci-broker.rs114
1 files changed, 50 insertions, 64 deletions
diff --git a/src/bin/ci-broker.rs b/src/bin/ci-broker.rs
index b52ec69..d3300fc 100644
--- a/src/bin/ci-broker.rs
+++ b/src/bin/ci-broker.rs
@@ -1,18 +1,16 @@
-use std::{
- error::Error,
- io::BufReader,
- path::PathBuf,
- process::{Command, Stdio},
-};
+use std::{error::Error, path::PathBuf};
use log::{debug, info};
use radicle::prelude::Profile;
use radicle_ci_broker::{
- config::{Adapter, Config},
+ adapter::Adapter,
+ broker::Broker,
+ config::Config,
error::BrokerError,
event::{BrokerEvent, NodeEventSource},
- msg::{Request, Response, RunResult},
+ msg::Request,
+ run::Run,
};
fn main() {
@@ -40,82 +38,70 @@ fn fallible_main() -> Result<(), BrokerError> {
let config = Config::load(&filename)?;
debug!("loaded configuration: {:#?}", config);
+ let mut broker = Broker::default();
+ debug!("created broker");
+
+ // FIXME: this is broken. the config file only lists how to invoke
+ // each adapter, not what adapter to use for each repo.
+ // for (rid, spec) in config.adapters.iter() {
+ // debug!("setting adapter for {rid:?} to {spec:#?}");
+ // let rid = RepoId::from_urn(rid).map_err(|e| BrokerError::BadRepoId(rid.into(), e))?;
+ // let adapter = Adapter::new(&spec.command).with_environment(spec.envs());
+ // broker.set_repository_adapter(&rid, &adapter);
+ // }
+ // debug!("set per-repository adapters");
+
+ let spec =
+ config
+ .adapter(&config.default_adapter)
+ .ok_or(BrokerError::UnknownDefaultAdapter(
+ config.default_adapter.clone(),
+ ))?;
+ let adapter = Adapter::new(&spec.command).with_environment(spec.envs());
+ broker.set_default_adapter(&adapter);
+ debug!("set default adapter");
+
let profile = Profile::load()?;
+ debug!("loaded profile");
+
let mut source = NodeEventSource::new(&profile)?;
+ debug!("created node event source");
+
for filter in config.filters.iter() {
source.allow(filter.clone());
}
-
- let default = &config.default_adapter;
- let adapter = config
- .adapter(default)
- .ok_or(BrokerError::UnknownDefaultAdapter(default.clone()))?;
+ debug!("added filters to node event source");
// This loop ends when there's an error, e.g., failure to read an
// event from the node.
let mut counter = 0;
loop {
+ debug!("waiting for event from node");
for e in source.event()? {
counter += 1;
- let req = Request::trigger(&profile, &e)?;
- let ret = match run(adapter, req)? {
- None => "unknown".into(),
- Some(RunResult::Error(e)) => format!("ERROR: {}", e),
- Some(RunResult::Failure) => "FAILED".into(),
- Some(RunResult::Success) => "OK".into(),
- Some(x) => format!("UNKNOWN: {:#?}", x),
- };
+ debug!("broker event {e:#?}");
let BrokerEvent::RefChanged {
rid,
oid,
name: _,
old: _,
} = e;
- println!("Run CI run #{}: {}, {} -> {}", counter, rid, oid, ret,);
- }
- }
-}
-fn run(adapter: &Adapter, req: Request) -> Result<Option<RunResult>, BrokerError> {
- info!("Trigger on {}", req);
-
- debug!("Spawning child process");
- let mut child = Command::new(&adapter.command)
- .stdin(Stdio::piped())
- .stdout(Stdio::piped())
- .envs(adapter.envs())
- .spawn()
- .map_err(|e| BrokerError::SpawnAdapter(adapter.command.clone(), e))?;
-
- {
- let stdin = child.stdin.take().expect("get stdin");
- req.to_writer(stdin)?;
- }
-
- let mut id = None;
- let stdout = child.stdout.take().expect("get stdout");
- let mut stdout = BufReader::new(stdout);
- let mut ret = None;
- while let Some(resp) = Response::from_reader(&mut stdout)? {
- debug!("received from adapter: {:#?}", resp);
- match resp {
- Response::Triggered { run_id } => {
- info!("CI run triggered, id is {}", run_id);
- id = Some(run_id);
- }
- Response::Finished { result } => {
- if let Some(id) = &id {
- info!("CI run {} finished: {}", id, result);
- ret = Some(result);
- } else {
- info!("Unknown CI run finished: {}", result);
- ret = Some(result);
- }
+ let req = Request::trigger(&profile, &e)?;
+ let mut run = Run::default();
+ if let Some(adapter) = broker.adapter(&rid) {
+ adapter.run(&req, &mut run)?;
+ } else {
+ return Err(BrokerError::NoAdapter(rid));
}
+
+ println!(
+ "Run CI run #{}: {}, {} -> {}",
+ counter,
+ rid,
+ oid,
+ run.result().unwrap()
+ );
}
}
-
- child.wait().expect("wait for child");
-
- Ok(ret)
}