diff --git a/src/error/message.rs b/src/error/message.rs new file mode 100644 index 0000000..78f2dfb --- /dev/null +++ b/src/error/message.rs @@ -0,0 +1,11 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum MessageError { + #[error("error while parsing message")] + ParseError, + #[error("ID needed")] + IDNeededError, + #[error("ID parsing")] + IDParsingError, +} diff --git a/src/error/mod.rs b/src/error/mod.rs index 2a3dab4..7164ba3 100644 --- a/src/error/mod.rs +++ b/src/error/mod.rs @@ -1,7 +1,9 @@ mod handler; +mod message; mod runner; mod storer; pub use handler::HandlerError; +pub use message::MessageError; pub use runner::RunnerError; pub use storer::StorerError; diff --git a/src/main.rs b/src/main.rs index ad9050c..a1d651d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,6 +10,7 @@ use std::time; use chrono::prelude::*; use database::init_database_pool; +use message::{Message, Subject}; use model::{Job, JobAction}; use worker::{CheckHandler, DeployHandler, Manager, WorkerStatus}; @@ -17,21 +18,22 @@ fn main() { let now: DateTime = Utc::now(); init_database_pool(); - let (sender, receiver) = channel(); - let mut deployer: Manager = Manager::new("deploy"); + // launch deployment workers + let mut deployer: Manager = Manager::new("deploy"); let deploy_handler = DeployHandler::new(Arc::new(Mutex::new(sender))); - deployer.launch_workers(5, Arc::new(deploy_handler)); + deployer.launch_workers::(5, Arc::new(deploy_handler)); + // launch checker workers let check_handler = CheckHandler::new(); - let mut checker: Manager = Manager::new("checker"); - checker.launch_workers(5, Arc::new(check_handler)); + let mut checker: Manager = Manager::new("checker"); + checker.launch_workers::(5, Arc::new(check_handler)); checker.subscribe(receiver); for i in 0..5000 { - let job = Job::new(i, JobAction::MegaportDeploy); - deployer.add_runner(job); + let message = Message::new(i, Subject::Action(JobAction::Deploy)); + deployer.put_message(message); } let wait = time::Duration::from_millis(100); diff --git a/src/message/message.rs b/src/message/message.rs index 728317d..62d0195 100644 --- a/src/message/message.rs +++ b/src/message/message.rs @@ -1,29 +1,68 @@ -#[allow(dead_code)] -#[derive(Debug, Clone)] -pub enum Message { - CheckDeploy(MessageBody), +use crate::model::JobAction; + +#[derive(Debug, Clone, Copy)] +pub enum Subject { + Action(JobAction), StopManager, + Empty, } -#[allow(dead_code)] -#[derive(Debug, Clone)] -pub struct MessageBody { +impl Subject { + pub fn new_action(action: JobAction) -> Self { + Subject::Action(action) + } +} + +#[derive(Debug)] +pub struct Body { id: u32, - content: String, } -impl MessageBody { - pub fn get_id(&self) -> u32 { +impl Body { + fn new(id: u32) -> Self { + Self { id: id } + } + + fn get_id(&self) -> u32 { self.id } } +pub struct Message { + subject: Subject, + body: Option, +} + impl Message { - pub fn check_deploy(id: u32, content: &str) -> Self { - let body = MessageBody { - id: id, - content: content.to_string(), - }; - Message::CheckDeploy(body) + pub fn new(id: u32, subject: Subject) -> Self { + Self { + subject: subject, + body: Some(Body::new(id)), + } + } + + pub fn empty() -> Self { + Self { + subject: Subject::Empty, + body: None, + } + } + + pub fn stop() -> Self { + Self { + subject: Subject::StopManager, + body: None, + } + } + + pub fn get_subject(&self) -> Subject { + self.subject + } + + pub fn get_body_id(&self) -> Option { + match self.body { + Some(ref b) => Some(b.get_id()), + None => None, + } } } diff --git a/src/message/mod.rs b/src/message/mod.rs index 977f05e..48277e3 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -1,3 +1,3 @@ mod message; -pub use message::Message; +pub use message::{Message, Subject}; diff --git a/src/model/job.rs b/src/model/job.rs index 114d268..c045e3b 100644 --- a/src/model/job.rs +++ b/src/model/job.rs @@ -1,16 +1,13 @@ use std::{thread, time}; use super::{Runner, RunnerStatus, Storer}; -use crate::error::{RunnerError, StorerError}; -use crate::message::Message; +use crate::error::{MessageError, RunnerError, StorerError}; +use crate::message::{Message, Subject}; -#[derive(Debug)] -#[allow(dead_code)] +#[derive(Debug, Clone, Copy)] pub enum JobAction { - MegaportDeploy, - MegaportUndeploy, - MegaportCheckDeploy, - MegaportCheckUndeploy, + Deploy, + Check, } #[derive(Debug)] @@ -29,42 +26,37 @@ impl Job { } } - fn deploy_megaport(&mut self) -> Result { + fn format_job(&self) -> String { + format!("[job({} - {:?} - {:?})]", self.id, self.action, self.status) + } + + fn deploy(&mut self) -> Result { self.set_status(RunnerStatus::Running); - println!( - "[job({} - {:?} - {:?})] deploying megaport...", - self.id, self.action, self.status - ); + println!("{} {}", self.format_job(), "deploying..."); let wait = time::Duration::from_millis(5); thread::sleep(wait); self.set_status(RunnerStatus::Success); - println!( - "[job({} - {:?} - {:?})] megaport deployed", - self.id, self.action, self.status - ); + println!("{} {}", self.format_job(), "deploy done"); - Ok(Message::check_deploy(self.id.try_into().unwrap(), "")) + Ok(Message::new( + self.id.try_into().unwrap(), + Subject::new_action(JobAction::Check), + )) } - fn check_megaport(&mut self) -> Result { + fn check(&mut self) -> Result { self.set_status(RunnerStatus::Running); - println!( - "[job({} - {:?} - {:?})] checking megaport...", - self.id, self.action, self.status - ); + println!("{} {}", self.format_job(), "checking..."); let wait = time::Duration::from_millis(50); thread::sleep(wait); self.set_status(RunnerStatus::Success); - println!( - "[job({} - {:?} - {:?})] checking megaport done", - self.id, self.action, self.status - ); + println!("{} {}", self.format_job(), "checking done"); - Ok(Message::check_deploy(self.id.try_into().unwrap(), "")) + Ok(Message::empty()) } fn set_status(&mut self, status: RunnerStatus) { @@ -75,10 +67,26 @@ impl Job { impl Runner for Job { fn run(&mut self) -> Result { match self.action { - JobAction::MegaportDeploy => self.deploy_megaport(), - JobAction::MegaportUndeploy => todo!(), - JobAction::MegaportCheckDeploy => self.check_megaport(), - JobAction::MegaportCheckUndeploy => todo!(), + JobAction::Deploy => self.deploy(), + JobAction::Check => self.check(), + } + } + + fn try_from(message: Message) -> Result { + let id: i32 = match message.get_body_id() { + Some(id) => match id.try_into() { + Ok(id) => id, + Err(_) => return Err(MessageError::IDParsingError), + }, + None => return Err(MessageError::IDNeededError), + }; + + match message.get_subject() { + Subject::Action(action) => match action { + JobAction::Deploy => Ok(Job::new(id, JobAction::Deploy)), + JobAction::Check => Ok(Job::new(id, JobAction::Check)), + }, + _ => Err(MessageError::ParseError), } } } diff --git a/src/model/traits.rs b/src/model/traits.rs index 9538486..03c2d08 100644 --- a/src/model/traits.rs +++ b/src/model/traits.rs @@ -1,4 +1,6 @@ -use crate::error::{RunnerError, StorerError}; +use std::marker::Send; + +use crate::error::{MessageError, RunnerError, StorerError}; use crate::message::Message; #[derive(Debug)] @@ -10,8 +12,11 @@ pub enum RunnerStatus { Success, } -pub trait Runner { +pub trait Runner: Send + 'static { fn run(&mut self) -> Result; + fn try_from(message: Message) -> Result + where + Self: Sized; } pub trait Storer { diff --git a/src/worker/handler.rs b/src/worker/handler.rs index c20bc3d..5273a84 100644 --- a/src/worker/handler.rs +++ b/src/worker/handler.rs @@ -1,11 +1,11 @@ -use std::sync::mpsc::Sender; -use std::sync::{Arc, Mutex}; +use std::marker::{Send, Sync}; +use std::sync::{mpsc::Sender, Arc, Mutex}; use crate::error::{HandlerError, RunnerError}; -use crate::message::Message; -use crate::model::{Job, JobAction}; +use crate::message::{Message, Subject}; +use crate::model::JobAction; -pub trait Handler { +pub trait Handler: Sync + Send + 'static { fn handle(&self, res: Result) -> Result<(), HandlerError>; } @@ -17,9 +17,9 @@ impl CheckHandler { } fn manage_message(&self, message: Message) -> Result<(), HandlerError> { - match message { - Message::CheckDeploy(_body) => Ok(()), - Message::StopManager => Err(HandlerError::Unknown), + match message.get_subject() { + Subject::Action(_) | Subject::Empty => Ok(()), + Subject::StopManager => Err(HandlerError::Unknown), } } } @@ -34,27 +34,28 @@ impl Handler for CheckHandler { } pub struct DeployHandler { - sender: Arc>>, + sender: Arc>>, } impl DeployHandler { - pub fn new(sender: Arc>>) -> Self { + pub fn new(sender: Arc>>) -> Self { Self { sender } } fn manage_message(&self, message: Message) -> Result<(), HandlerError> { - match message { - Message::CheckDeploy(body) => { - let guard = self.sender.lock().unwrap(); - let id: i32 = body.get_id().try_into().unwrap(); - - if let Err(e) = guard.send(Job::new(id, JobAction::MegaportCheckDeploy)) { - println!("[handler(deploy)] error: {}", e); + match message.get_subject() { + Subject::Action(action) => match action { + JobAction::Check => { + let guard = self.sender.lock().unwrap(); + if let Err(e) = guard.send(message) { + println!("[handler(deploy)] error: {}", e); + } + Ok(()) } - - Ok(()) - } - Message::StopManager => Err(HandlerError::Unknown), + JobAction::Deploy => Ok(()), + }, + Subject::StopManager => Err(HandlerError::Unknown), + Subject::Empty => Ok(()), } } } diff --git a/src/worker/worker.rs b/src/worker/worker.rs index 795aefe..ddf28a4 100644 --- a/src/worker/worker.rs +++ b/src/worker/worker.rs @@ -1,10 +1,10 @@ use std::collections::VecDeque; -use std::marker::{Send, Sync}; use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex}; use std::thread; use super::handler::Handler; use crate::error::HandlerError; +use crate::message::Message; use crate::model::*; // Queue is a simple queue data structure. @@ -60,24 +60,22 @@ impl Worker { } // Manager is a pool of workers and holds a queue containing `Job` to run. -// -// TODO: to handle more message, replace the trait with a `Message` trait. -pub struct Manager { +pub struct Manager { name: String, workers: Vec>>, - queue: Arc>, + queue: Arc>, } -impl Manager { +impl Manager { pub fn new(name: &str) -> Self { Self { name: name.to_string(), workers: vec![], - queue: Arc::new(Queue::::new()), + queue: Arc::new(Queue::new()), } } - pub fn launch_workers( + pub fn launch_workers( &mut self, nb_workers: u32, shared_handler: Arc, @@ -93,9 +91,9 @@ impl Manager { thread::spawn(move || loop { let mut guard = queue.content.lock().unwrap(); - let mut runner = loop { - if let Some(r) = guard.pop_front() { - break r; + let message = loop { + if let Some(m) = guard.pop_front() { + break m; } else { guard = queue.not_empty.wait(guard).unwrap(); } @@ -103,23 +101,31 @@ impl Manager { drop(guard); - Manager::::set_worker_status(&worker, WorkerStatus::Running); + let mut runner = match T::try_from(message) { + Ok(r) => r, + Err(_e) => { + eprintln!("unable to parse the incoming message into a runner"); + continue; + } + }; + + Manager::set_worker_status(&worker, WorkerStatus::Running); if let Err(e) = handler.handle(runner.run()) { match e { HandlerError::Unknown => { - Manager::::set_worker_status(&worker, WorkerStatus::Failed); + Manager::set_worker_status(&worker, WorkerStatus::Failed); break; } _ => (), } } - Manager::::set_worker_status(&worker, WorkerStatus::Pending); + Manager::set_worker_status(&worker, WorkerStatus::Pending); }); } } // subscribe subscribes to a `Receiver` channel. - pub fn subscribe(&self, receiver: Receiver) { + pub fn subscribe(&self, receiver: Receiver) { let queue = self.queue.clone(); thread::spawn(move || loop { match receiver.recv() { @@ -138,9 +144,9 @@ impl Manager { }); } - pub fn add_runner(&self, runner: T) { + pub fn put_message(&self, message: Message) { let mut q = self.queue.content.lock().unwrap(); - q.push_back(runner); + q.push_back(message); self.queue.not_empty.notify_one(); }