diff --git a/src/error/handler.rs b/src/error/handler.rs new file mode 100644 index 0000000..dc1dbe3 --- /dev/null +++ b/src/error/handler.rs @@ -0,0 +1,10 @@ +use thiserror::Error; + +#[allow(dead_code)] +#[derive(Error, Debug)] +pub enum HandlerError { + #[error("error while handling message")] + MessageError, + #[error("unknown handler error")] + Unknown, +} diff --git a/src/error/mod.rs b/src/error/mod.rs index 77ad282..2a3dab4 100644 --- a/src/error/mod.rs +++ b/src/error/mod.rs @@ -1,3 +1,7 @@ +mod handler; +mod runner; mod storer; +pub use handler::HandlerError; +pub use runner::RunnerError; pub use storer::StorerError; diff --git a/src/error/runner.rs b/src/error/runner.rs new file mode 100644 index 0000000..23571cb --- /dev/null +++ b/src/error/runner.rs @@ -0,0 +1,12 @@ +use thiserror::Error; + +#[allow(dead_code)] +#[derive(Error, Debug)] +pub enum RunnerError { + #[error("error while deploying")] + DeployError, + #[error("error while checking")] + CheckError, + #[error("unknown handler error")] + Unknown, +} diff --git a/src/main.rs b/src/main.rs index 62bbf6e..ad9050c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -35,8 +35,7 @@ fn main() { } let wait = time::Duration::from_millis(100); - - while !deployer.healthcheck(WorkerStatus::Pending) { + while !deployer.healthcheck(WorkerStatus::Stopped) { thread::sleep(wait); } diff --git a/src/model/job.rs b/src/model/job.rs index ee56147..114d268 100644 --- a/src/model/job.rs +++ b/src/model/job.rs @@ -1,7 +1,7 @@ use std::{thread, time}; use super::{Runner, RunnerStatus, Storer}; -use crate::error::StorerError; +use crate::error::{RunnerError, StorerError}; use crate::message::Message; #[derive(Debug)] @@ -29,7 +29,7 @@ impl Job { } } - fn deploy_megaport(&mut self) -> Result { + fn deploy_megaport(&mut self) -> Result { self.set_status(RunnerStatus::Running); println!( "[job({} - {:?} - {:?})] deploying megaport...", @@ -48,7 +48,7 @@ impl Job { Ok(Message::check_deploy(self.id.try_into().unwrap(), "")) } - fn check_megaport(&mut self) -> Result { + fn check_megaport(&mut self) -> Result { self.set_status(RunnerStatus::Running); println!( "[job({} - {:?} - {:?})] checking megaport...", @@ -73,7 +73,7 @@ impl Job { } impl Runner for Job { - fn run(&mut self) -> Result { + fn run(&mut self) -> Result { match self.action { JobAction::MegaportDeploy => self.deploy_megaport(), JobAction::MegaportUndeploy => todo!(), diff --git a/src/model/traits.rs b/src/model/traits.rs index 2bc112f..9538486 100644 --- a/src/model/traits.rs +++ b/src/model/traits.rs @@ -1,4 +1,4 @@ -use crate::error::StorerError; +use crate::error::{RunnerError, StorerError}; use crate::message::Message; #[derive(Debug)] @@ -11,7 +11,7 @@ pub enum RunnerStatus { } pub trait Runner { - fn run(&mut self) -> Result; + fn run(&mut self) -> Result; } pub trait Storer { diff --git a/src/worker/handler.rs b/src/worker/handler.rs index 993597e..c20bc3d 100644 --- a/src/worker/handler.rs +++ b/src/worker/handler.rs @@ -1,11 +1,12 @@ use std::sync::mpsc::Sender; use std::sync::{Arc, Mutex}; +use crate::error::{HandlerError, RunnerError}; use crate::message::Message; use crate::model::{Job, JobAction}; pub trait Handler { - fn handle(&self, res: Result); + fn handle(&self, res: Result) -> Result<(), HandlerError>; } pub struct CheckHandler {} @@ -14,12 +15,19 @@ impl CheckHandler { pub fn new() -> Self { Self {} } + + fn manage_message(&self, message: Message) -> Result<(), HandlerError> { + match message { + Message::CheckDeploy(_body) => Ok(()), + Message::StopManager => Err(HandlerError::Unknown), + } + } } impl Handler for CheckHandler { - fn handle(&self, res: Result) { + fn handle(&self, res: Result) -> Result<(), HandlerError> { match res { - Ok(_m) => (), + Ok(m) => self.manage_message(m), Err(_e) => todo!(), } } @@ -34,7 +42,7 @@ impl DeployHandler { Self { sender } } - fn manage_message(&self, message: Message) { + fn manage_message(&self, message: Message) -> Result<(), HandlerError> { match message { Message::CheckDeploy(body) => { let guard = self.sender.lock().unwrap(); @@ -43,14 +51,16 @@ impl DeployHandler { if let Err(e) = guard.send(Job::new(id, JobAction::MegaportCheckDeploy)) { println!("[handler(deploy)] error: {}", e); } + + Ok(()) } - Message::StopManager => todo!(), + Message::StopManager => Err(HandlerError::Unknown), } } } impl Handler for DeployHandler { - fn handle(&self, res: Result) { + fn handle(&self, res: Result) -> Result<(), HandlerError> { match res { Ok(m) => self.manage_message(m), Err(_e) => todo!(), diff --git a/src/worker/worker.rs b/src/worker/worker.rs index 8850d05..3a4f516 100644 --- a/src/worker/worker.rs +++ b/src/worker/worker.rs @@ -4,6 +4,7 @@ use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex}; use std::thread; use super::handler::Handler; +use crate::error::HandlerError; use crate::model::*; pub struct Queue { @@ -26,6 +27,7 @@ pub enum WorkerStatus { Pending, Running, Failed, + Stopped, } struct Worker { @@ -95,7 +97,15 @@ impl Manager { drop(guard); Manager::::set_worker_status(&worker, WorkerStatus::Running); - handler.handle(runner.run()); + if let Err(e) = handler.handle(runner.run()) { + match e { + HandlerError::Unknown => { + Manager::::set_worker_status(&worker, WorkerStatus::Failed); + break; + } + _ => (), + } + } Manager::::set_worker_status(&worker, WorkerStatus::Pending); }); }