replace job->message in queue

This commit is contained in:
rmanach 2023-04-19 11:30:12 +02:00
parent 452cfc81b9
commit d5b3affe5d
9 changed files with 170 additions and 96 deletions

11
src/error/message.rs Normal file
View File

@ -0,0 +1,11 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum MessageError {
#[error("error while parsing message")]
ParseError,
#[error("ID needed")]
IDNeededError,
#[error("ID parsing")]
IDParsingError,
}

View File

@ -1,7 +1,9 @@
mod handler;
mod message;
mod runner;
mod storer;
pub use handler::HandlerError;
pub use message::MessageError;
pub use runner::RunnerError;
pub use storer::StorerError;

View File

@ -10,6 +10,7 @@ use std::time;
use chrono::prelude::*;
use database::init_database_pool;
use message::{Message, Subject};
use model::{Job, JobAction};
use worker::{CheckHandler, DeployHandler, Manager, WorkerStatus};
@ -17,21 +18,22 @@ fn main() {
let now: DateTime<Utc> = Utc::now();
init_database_pool();
let (sender, receiver) = channel();
let mut deployer: Manager<Job> = Manager::new("deploy");
// launch deployment workers
let mut deployer: Manager = Manager::new("deploy");
let deploy_handler = DeployHandler::new(Arc::new(Mutex::new(sender)));
deployer.launch_workers(5, Arc::new(deploy_handler));
deployer.launch_workers::<Job, DeployHandler>(5, Arc::new(deploy_handler));
// launch checker workers
let check_handler = CheckHandler::new();
let mut checker: Manager<Job> = Manager::new("checker");
checker.launch_workers(5, Arc::new(check_handler));
let mut checker: Manager = Manager::new("checker");
checker.launch_workers::<Job, CheckHandler>(5, Arc::new(check_handler));
checker.subscribe(receiver);
for i in 0..5000 {
let job = Job::new(i, JobAction::MegaportDeploy);
deployer.add_runner(job);
let message = Message::new(i, Subject::Action(JobAction::Deploy));
deployer.put_message(message);
}
let wait = time::Duration::from_millis(100);

View File

@ -1,29 +1,68 @@
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum Message {
CheckDeploy(MessageBody),
use crate::model::JobAction;
#[derive(Debug, Clone, Copy)]
pub enum Subject {
Action(JobAction),
StopManager,
Empty,
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct MessageBody {
impl Subject {
pub fn new_action(action: JobAction) -> Self {
Subject::Action(action)
}
}
#[derive(Debug)]
pub struct Body {
id: u32,
content: String,
}
impl MessageBody {
pub fn get_id(&self) -> u32 {
impl Body {
fn new(id: u32) -> Self {
Self { id: id }
}
fn get_id(&self) -> u32 {
self.id
}
}
pub struct Message {
subject: Subject,
body: Option<Body>,
}
impl Message {
pub fn check_deploy(id: u32, content: &str) -> Self {
let body = MessageBody {
id: id,
content: content.to_string(),
};
Message::CheckDeploy(body)
pub fn new(id: u32, subject: Subject) -> Self {
Self {
subject: subject,
body: Some(Body::new(id)),
}
}
pub fn empty() -> Self {
Self {
subject: Subject::Empty,
body: None,
}
}
pub fn stop() -> Self {
Self {
subject: Subject::StopManager,
body: None,
}
}
pub fn get_subject(&self) -> Subject {
self.subject
}
pub fn get_body_id(&self) -> Option<u32> {
match self.body {
Some(ref b) => Some(b.get_id()),
None => None,
}
}
}

View File

@ -1,3 +1,3 @@
mod message;
pub use message::Message;
pub use message::{Message, Subject};

View File

@ -1,16 +1,13 @@
use std::{thread, time};
use super::{Runner, RunnerStatus, Storer};
use crate::error::{RunnerError, StorerError};
use crate::message::Message;
use crate::error::{MessageError, RunnerError, StorerError};
use crate::message::{Message, Subject};
#[derive(Debug)]
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
pub enum JobAction {
MegaportDeploy,
MegaportUndeploy,
MegaportCheckDeploy,
MegaportCheckUndeploy,
Deploy,
Check,
}
#[derive(Debug)]
@ -29,42 +26,37 @@ impl Job {
}
}
fn deploy_megaport(&mut self) -> Result<Message, RunnerError> {
fn format_job(&self) -> String {
format!("[job({} - {:?} - {:?})]", self.id, self.action, self.status)
}
fn deploy(&mut self) -> Result<Message, RunnerError> {
self.set_status(RunnerStatus::Running);
println!(
"[job({} - {:?} - {:?})] deploying megaport...",
self.id, self.action, self.status
);
println!("{} {}", self.format_job(), "deploying...");
let wait = time::Duration::from_millis(5);
thread::sleep(wait);
self.set_status(RunnerStatus::Success);
println!(
"[job({} - {:?} - {:?})] megaport deployed",
self.id, self.action, self.status
);
println!("{} {}", self.format_job(), "deploy done");
Ok(Message::check_deploy(self.id.try_into().unwrap(), ""))
Ok(Message::new(
self.id.try_into().unwrap(),
Subject::new_action(JobAction::Check),
))
}
fn check_megaport(&mut self) -> Result<Message, RunnerError> {
fn check(&mut self) -> Result<Message, RunnerError> {
self.set_status(RunnerStatus::Running);
println!(
"[job({} - {:?} - {:?})] checking megaport...",
self.id, self.action, self.status
);
println!("{} {}", self.format_job(), "checking...");
let wait = time::Duration::from_millis(50);
thread::sleep(wait);
self.set_status(RunnerStatus::Success);
println!(
"[job({} - {:?} - {:?})] checking megaport done",
self.id, self.action, self.status
);
println!("{} {}", self.format_job(), "checking done");
Ok(Message::check_deploy(self.id.try_into().unwrap(), ""))
Ok(Message::empty())
}
fn set_status(&mut self, status: RunnerStatus) {
@ -75,10 +67,26 @@ impl Job {
impl Runner for Job {
fn run(&mut self) -> Result<Message, RunnerError> {
match self.action {
JobAction::MegaportDeploy => self.deploy_megaport(),
JobAction::MegaportUndeploy => todo!(),
JobAction::MegaportCheckDeploy => self.check_megaport(),
JobAction::MegaportCheckUndeploy => todo!(),
JobAction::Deploy => self.deploy(),
JobAction::Check => self.check(),
}
}
fn try_from(message: Message) -> Result<Self, MessageError> {
let id: i32 = match message.get_body_id() {
Some(id) => match id.try_into() {
Ok(id) => id,
Err(_) => return Err(MessageError::IDParsingError),
},
None => return Err(MessageError::IDNeededError),
};
match message.get_subject() {
Subject::Action(action) => match action {
JobAction::Deploy => Ok(Job::new(id, JobAction::Deploy)),
JobAction::Check => Ok(Job::new(id, JobAction::Check)),
},
_ => Err(MessageError::ParseError),
}
}
}

View File

@ -1,4 +1,6 @@
use crate::error::{RunnerError, StorerError};
use std::marker::Send;
use crate::error::{MessageError, RunnerError, StorerError};
use crate::message::Message;
#[derive(Debug)]
@ -10,8 +12,11 @@ pub enum RunnerStatus {
Success,
}
pub trait Runner {
pub trait Runner: Send + 'static {
fn run(&mut self) -> Result<Message, RunnerError>;
fn try_from(message: Message) -> Result<Self, MessageError>
where
Self: Sized;
}
pub trait Storer {

View File

@ -1,11 +1,11 @@
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::marker::{Send, Sync};
use std::sync::{mpsc::Sender, Arc, Mutex};
use crate::error::{HandlerError, RunnerError};
use crate::message::Message;
use crate::model::{Job, JobAction};
use crate::message::{Message, Subject};
use crate::model::JobAction;
pub trait Handler {
pub trait Handler: Sync + Send + 'static {
fn handle(&self, res: Result<Message, RunnerError>) -> Result<(), HandlerError>;
}
@ -17,9 +17,9 @@ impl CheckHandler {
}
fn manage_message(&self, message: Message) -> Result<(), HandlerError> {
match message {
Message::CheckDeploy(_body) => Ok(()),
Message::StopManager => Err(HandlerError::Unknown),
match message.get_subject() {
Subject::Action(_) | Subject::Empty => Ok(()),
Subject::StopManager => Err(HandlerError::Unknown),
}
}
}
@ -34,27 +34,28 @@ impl Handler for CheckHandler {
}
pub struct DeployHandler {
sender: Arc<Mutex<Sender<Job>>>,
sender: Arc<Mutex<Sender<Message>>>,
}
impl DeployHandler {
pub fn new(sender: Arc<Mutex<Sender<Job>>>) -> Self {
pub fn new(sender: Arc<Mutex<Sender<Message>>>) -> Self {
Self { sender }
}
fn manage_message(&self, message: Message) -> Result<(), HandlerError> {
match message {
Message::CheckDeploy(body) => {
let guard = self.sender.lock().unwrap();
let id: i32 = body.get_id().try_into().unwrap();
if let Err(e) = guard.send(Job::new(id, JobAction::MegaportCheckDeploy)) {
println!("[handler(deploy)] error: {}", e);
match message.get_subject() {
Subject::Action(action) => match action {
JobAction::Check => {
let guard = self.sender.lock().unwrap();
if let Err(e) = guard.send(message) {
println!("[handler(deploy)] error: {}", e);
}
Ok(())
}
Ok(())
}
Message::StopManager => Err(HandlerError::Unknown),
JobAction::Deploy => Ok(()),
},
Subject::StopManager => Err(HandlerError::Unknown),
Subject::Empty => Ok(()),
}
}
}

View File

@ -1,10 +1,10 @@
use std::collections::VecDeque;
use std::marker::{Send, Sync};
use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex};
use std::thread;
use super::handler::Handler;
use crate::error::HandlerError;
use crate::message::Message;
use crate::model::*;
// Queue is a simple queue data structure.
@ -60,24 +60,22 @@ impl Worker {
}
// Manager is a pool of workers and holds a queue containing `Job` to run.
//
// TODO: to handle more message, replace the trait with a `Message` trait.
pub struct Manager<T: Runner> {
pub struct Manager {
name: String,
workers: Vec<Arc<Mutex<Worker>>>,
queue: Arc<Queue<T>>,
queue: Arc<Queue<Message>>,
}
impl<T: Runner + Send + 'static> Manager<T> {
impl Manager {
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
workers: vec![],
queue: Arc::new(Queue::<T>::new()),
queue: Arc::new(Queue::new()),
}
}
pub fn launch_workers<U: Handler + Sync + Send + 'static>(
pub fn launch_workers<T: Runner, U: Handler>(
&mut self,
nb_workers: u32,
shared_handler: Arc<U>,
@ -93,9 +91,9 @@ impl<T: Runner + Send + 'static> Manager<T> {
thread::spawn(move || loop {
let mut guard = queue.content.lock().unwrap();
let mut runner = loop {
if let Some(r) = guard.pop_front() {
break r;
let message = loop {
if let Some(m) = guard.pop_front() {
break m;
} else {
guard = queue.not_empty.wait(guard).unwrap();
}
@ -103,23 +101,31 @@ impl<T: Runner + Send + 'static> Manager<T> {
drop(guard);
Manager::<T>::set_worker_status(&worker, WorkerStatus::Running);
let mut runner = match T::try_from(message) {
Ok(r) => r,
Err(_e) => {
eprintln!("unable to parse the incoming message into a runner");
continue;
}
};
Manager::set_worker_status(&worker, WorkerStatus::Running);
if let Err(e) = handler.handle(runner.run()) {
match e {
HandlerError::Unknown => {
Manager::<T>::set_worker_status(&worker, WorkerStatus::Failed);
Manager::set_worker_status(&worker, WorkerStatus::Failed);
break;
}
_ => (),
}
}
Manager::<T>::set_worker_status(&worker, WorkerStatus::Pending);
Manager::set_worker_status(&worker, WorkerStatus::Pending);
});
}
}
// subscribe subscribes to a `Receiver` channel.
pub fn subscribe(&self, receiver: Receiver<T>) {
pub fn subscribe(&self, receiver: Receiver<Message>) {
let queue = self.queue.clone();
thread::spawn(move || loop {
match receiver.recv() {
@ -138,9 +144,9 @@ impl<T: Runner + Send + 'static> Manager<T> {
});
}
pub fn add_runner(&self, runner: T) {
pub fn put_message(&self, message: Message) {
let mut q = self.queue.content.lock().unwrap();
q.push_back(runner);
q.push_back(message);
self.queue.not_empty.notify_one();
}