diff options
author | Lars Wirzenius <liw@liw.fi> | 2024-02-02 17:32:57 +0200 |
---|---|---|
committer | Lars Wirzenius <liw@liw.fi> | 2024-02-02 17:44:15 +0200 |
commit | 9886cfd48b7e53b95120dc00dce4225b05df99e4 (patch) | |
tree | b4e128c7f5589d2b1d0435495ec50c7939457927 | |
parent | f298cc3ebd8bcb72ce530558b0d112c6da856e93 (diff) | |
download | radicle-ci-broker-9886cfd48b7e53b95120dc00dce4225b05df99e4.tar.gz |
refactor(src/bin/ci-broker.rs): use new modules in main program
Signed-off-by: Lars Wirzenius <liw@liw.fi>
-rw-r--r-- | src/bin/ci-broker.rs | 114 |
1 files changed, 50 insertions, 64 deletions
diff --git a/src/bin/ci-broker.rs b/src/bin/ci-broker.rs index b52ec69..d3300fc 100644 --- a/src/bin/ci-broker.rs +++ b/src/bin/ci-broker.rs @@ -1,18 +1,16 @@ -use std::{ - error::Error, - io::BufReader, - path::PathBuf, - process::{Command, Stdio}, -}; +use std::{error::Error, path::PathBuf}; use log::{debug, info}; use radicle::prelude::Profile; use radicle_ci_broker::{ - config::{Adapter, Config}, + adapter::Adapter, + broker::Broker, + config::Config, error::BrokerError, event::{BrokerEvent, NodeEventSource}, - msg::{Request, Response, RunResult}, + msg::Request, + run::Run, }; fn main() { @@ -40,82 +38,70 @@ fn fallible_main() -> Result<(), BrokerError> { let config = Config::load(&filename)?; debug!("loaded configuration: {:#?}", config); + let mut broker = Broker::default(); + debug!("created broker"); + + // FIXME: this is broken. the config file only lists how to invoke + // each adapter, not what adapter to use for each repo. + // for (rid, spec) in config.adapters.iter() { + // debug!("setting adapter for {rid:?} to {spec:#?}"); + // let rid = RepoId::from_urn(rid).map_err(|e| BrokerError::BadRepoId(rid.into(), e))?; + // let adapter = Adapter::new(&spec.command).with_environment(spec.envs()); + // broker.set_repository_adapter(&rid, &adapter); + // } + // debug!("set per-repository adapters"); + + let spec = + config + .adapter(&config.default_adapter) + .ok_or(BrokerError::UnknownDefaultAdapter( + config.default_adapter.clone(), + ))?; + let adapter = Adapter::new(&spec.command).with_environment(spec.envs()); + broker.set_default_adapter(&adapter); + debug!("set default adapter"); + let profile = Profile::load()?; + debug!("loaded profile"); + let mut source = NodeEventSource::new(&profile)?; + debug!("created node event source"); + for filter in config.filters.iter() { source.allow(filter.clone()); } - - let default = &config.default_adapter; - let adapter = config - .adapter(default) - .ok_or(BrokerError::UnknownDefaultAdapter(default.clone()))?; + debug!("added filters to node event source"); // This loop ends when there's an error, e.g., failure to read an // event from the node. let mut counter = 0; loop { + debug!("waiting for event from node"); for e in source.event()? { counter += 1; - let req = Request::trigger(&profile, &e)?; - let ret = match run(adapter, req)? { - None => "unknown".into(), - Some(RunResult::Error(e)) => format!("ERROR: {}", e), - Some(RunResult::Failure) => "FAILED".into(), - Some(RunResult::Success) => "OK".into(), - Some(x) => format!("UNKNOWN: {:#?}", x), - }; + debug!("broker event {e:#?}"); let BrokerEvent::RefChanged { rid, oid, name: _, old: _, } = e; - println!("Run CI run #{}: {}, {} -> {}", counter, rid, oid, ret,); - } - } -} -fn run(adapter: &Adapter, req: Request) -> Result<Option<RunResult>, BrokerError> { - info!("Trigger on {}", req); - - debug!("Spawning child process"); - let mut child = Command::new(&adapter.command) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .envs(adapter.envs()) - .spawn() - .map_err(|e| BrokerError::SpawnAdapter(adapter.command.clone(), e))?; - - { - let stdin = child.stdin.take().expect("get stdin"); - req.to_writer(stdin)?; - } - - let mut id = None; - let stdout = child.stdout.take().expect("get stdout"); - let mut stdout = BufReader::new(stdout); - let mut ret = None; - while let Some(resp) = Response::from_reader(&mut stdout)? { - debug!("received from adapter: {:#?}", resp); - match resp { - Response::Triggered { run_id } => { - info!("CI run triggered, id is {}", run_id); - id = Some(run_id); - } - Response::Finished { result } => { - if let Some(id) = &id { - info!("CI run {} finished: {}", id, result); - ret = Some(result); - } else { - info!("Unknown CI run finished: {}", result); - ret = Some(result); - } + let req = Request::trigger(&profile, &e)?; + let mut run = Run::default(); + if let Some(adapter) = broker.adapter(&rid) { + adapter.run(&req, &mut run)?; + } else { + return Err(BrokerError::NoAdapter(rid)); } + + println!( + "Run CI run #{}: {}, {} -> {}", + counter, + rid, + oid, + run.result().unwrap() + ); } } - - child.wait().expect("wait for child"); - - Ok(ret) } |