From 4906b397bae6e8f070d54511814e94d21d9920c4 Mon Sep 17 00:00:00 2001 From: rmanach Date: Tue, 18 Apr 2023 12:57:43 +0200 Subject: [PATCH] move main engine in worker --- Cargo.lock | 26 +++++++-------- Cargo.toml | 2 +- Makefile | 2 +- src/database/client.rs | 3 ++ src/database/mod.rs | 2 +- src/error/storer.rs | 1 + src/main.rs | 74 ++++++------------------------------------ src/message/message.rs | 2 -- src/model/job.rs | 23 +++++++++---- src/model/traits.rs | 1 + src/queue/queue.rs | 31 ++++++++++++++++++ src/worker/mod.rs | 2 ++ src/worker/worker.rs | 73 +++++++++++++---------------------------- 13 files changed, 103 insertions(+), 139 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1648f73..e0be7dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -77,6 +77,19 @@ dependencies = [ "typenum", ] +[[package]] +name = "deployer" +version = "0.1.0" +dependencies = [ + "diesel", + "lazy_static", + "postgres", + "r2d2", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "diesel" version = "2.0.3" @@ -114,19 +127,6 @@ dependencies = [ "subtle", ] -[[package]] -name = "dispatcher" -version = "0.1.0" -dependencies = [ - "diesel", - "lazy_static", - "postgres", - "r2d2", - "serde", - "serde_json", - "thiserror", -] - [[package]] name = "fallible-iterator" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index fc7bf95..d844656 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "dispatcher" +name = "deployer" version = "0.1.0" edition = "2021" diff --git a/Makefile b/Makefile index 3fb4c02..cb031c7 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ format: cargo-fmt -run: format +run: format db-start cargo run build: format diff --git a/src/database/client.rs b/src/database/client.rs index a9b598f..7f9adca 100644 --- a/src/database/client.rs +++ b/src/database/client.rs @@ -5,6 +5,8 @@ use diesel::r2d2::ConnectionManager; use lazy_static::lazy_static; type Pool = r2d2::Pool>; + +#[allow(dead_code)] pub type DbConnection = r2d2::PooledConnection>; lazy_static! { @@ -19,6 +21,7 @@ pub fn init_database_pool() { lazy_static::initialize(&POOL); } +#[allow(dead_code)] pub fn get_connection() -> Result { match POOL.get() { Ok(conn) => Ok(conn), diff --git a/src/database/mod.rs b/src/database/mod.rs index a0762b8..c476fb8 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,3 +1,3 @@ mod client; -pub use client::{get_connection, init_database_pool}; +pub use client::init_database_pool; diff --git a/src/error/storer.rs b/src/error/storer.rs index 6bda9ad..45e4970 100644 --- a/src/error/storer.rs +++ b/src/error/storer.rs @@ -1,6 +1,7 @@ use thiserror::Error; #[derive(Error, Debug)] +#[allow(dead_code)] pub enum StorerError { #[error("unable to insert `{0}`")] InsertError(String), diff --git a/src/main.rs b/src/main.rs index eca4bee..6f4b115 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,80 +1,26 @@ mod database; mod error; -mod message; mod model; -mod queue; mod worker; -use std::sync::mpsc::{channel, Sender}; use std::thread; use std::time; -use database::{get_connection, init_database_pool}; -use message::Message; -use queue::QueueController; - -struct Dispatch { - ch_sender: Sender, -} - -impl Dispatch { - fn new() -> Self { - let (sender, receiver) = channel::(); - - thread::spawn(move || loop { - match receiver.recv() { - Ok(m) => { - println!("[dispatch] treating message: {:?}", m); - } - Err(e) => { - eprintln!("[dispatch] error occurred: {:?}", e); - break; - } - } - }); - - Self { ch_sender: sender } - } - - fn send(&self, message: T) -> Result<(), String> { - match self.ch_sender.send(message) { - Ok(_) => Ok(()), - Err(e) => Err(format!("[dispatch] unable to send the message: {}", e)), - } - } -} +use database::init_database_pool; +use model::{Job, JobAction}; +use worker::Manager; fn main() { init_database_pool(); - let dispatch: Dispatch = Dispatch::new(); + let mut m: Manager = Manager::new("deploy"); + m.launch_workers(5); + + for i in 0..50 { + let job = Job::new(i, JobAction::MegaportDeploy); + m.add_runner(job); + } let wait = time::Duration::from_secs(2); - - dispatch.send(Message::command(1, "hello world!")).unwrap(); - dispatch - .send(Message::command(1, "arf... wrong type!")) - .unwrap(); - dispatch.send(Message::command(1, "...")).unwrap(); - - let controller = QueueController::new(); - controller.launch(); - - controller.put_message(Message::command(1, "q1")); - controller.put_message(Message::command(1, "q1")); - controller.put_message(Message::command(1, "q1")); - controller.put_message(Message::command(1, "q1")); - controller.put_message(Message::command(1, "q1")); - - thread::sleep(wait); - - controller.put_message(Message::command(1, "q2")); - controller.put_message(Message::command(1, "q3")); - controller.put_message(Message::command(1, "q4")); - - thread::sleep(wait); - - controller.put_message(Message::QueueStop); - thread::sleep(wait); } diff --git a/src/message/message.rs b/src/message/message.rs index 06bd88c..29af5f8 100644 --- a/src/message/message.rs +++ b/src/message/message.rs @@ -1,11 +1,9 @@ -#[allow(dead_code)] #[derive(Debug, Clone)] pub enum Message { Command(MessageBody), QueueStop, } -#[allow(dead_code)] #[derive(Debug, Clone)] pub struct MessageBody { id: u32, diff --git a/src/model/job.rs b/src/model/job.rs index 03b811b..f68a4ec 100644 --- a/src/model/job.rs +++ b/src/model/job.rs @@ -1,11 +1,10 @@ -use crate::database::get_connection; -use std::time::Duration; use std::{thread, time}; use super::{Runner, RunnerStatus, Storer}; use crate::error::StorerError; #[derive(Debug)] +#[allow(dead_code)] pub enum JobAction { MegaportDeploy, MegaportUndeploy, @@ -40,16 +39,28 @@ impl Runner for Job { match self.action { JobAction::MegaportDeploy => { - println!("[job({:?})] deploying megaport...", self); + println!( + "[job({} - {:?} - {:?})] deploying megaport...", + self.id, self.action, self.status + ); } JobAction::MegaportUndeploy => { - println!("[job({:?})] undeploying megaport...", self); + println!( + "[job({} - {:?} - {:?})] undeploying megaport...", + self.id, self.action, self.status + ); } JobAction::MegaportCheckDeploy => { - println!("[job({:?})] ckecking megaport deployment...", self); + println!( + "[job({} - {:?} - {:?})] checking megaport deployement...", + self.id, self.action, self.status + ); } JobAction::MegaportCheckUndeploy => { - println!("[job({:?})] ckecking megaport undeployment...", self); + println!( + "[job({} - {:?} - {:?})] checking megaport undeployement...", + self.id, self.action, self.status + ); } } diff --git a/src/model/traits.rs b/src/model/traits.rs index 5b73864..e71fd2a 100644 --- a/src/model/traits.rs +++ b/src/model/traits.rs @@ -1,6 +1,7 @@ use crate::error::StorerError; #[derive(Debug)] +#[allow(dead_code)] pub enum RunnerStatus { Pending, Running, diff --git a/src/queue/queue.rs b/src/queue/queue.rs index 15a4c64..77ef764 100644 --- a/src/queue/queue.rs +++ b/src/queue/queue.rs @@ -12,6 +12,37 @@ lazy_static! { static ref QUEUE_NOT_EMPTY: Condvar = Condvar::new(); } +struct Dispatch { + ch_sender: Sender, +} + +impl Dispatch { + fn new() -> Self { + let (sender, receiver) = channel::(); + + thread::spawn(move || loop { + match receiver.recv() { + Ok(m) => { + println!("[dispatch] treating message: {:?}", m); + } + Err(e) => { + eprintln!("[dispatch] error occurred: {:?}", e); + break; + } + } + }); + + Self { ch_sender: sender } + } + + fn send(&self, message: T) -> Result<(), String> { + match self.ch_sender.send(message) { + Ok(_) => Ok(()), + Err(e) => Err(format!("[dispatch] unable to send the message: {}", e)), + } + } +} + #[derive(Copy, Clone, PartialEq)] pub enum QueueStatus { Pending, diff --git a/src/worker/mod.rs b/src/worker/mod.rs index afda139..560e02d 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -1 +1,3 @@ mod worker; + +pub use worker::{Manager, Queue}; diff --git a/src/worker/worker.rs b/src/worker/worker.rs index e1115f0..acf04ab 100644 --- a/src/worker/worker.rs +++ b/src/worker/worker.rs @@ -1,35 +1,25 @@ use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex}; use std::thread; -use std::time; -use std::time::Duration; -use crate::message::Message; use crate::model::*; -use crate::queue::QueueStatus; -struct Queue { +pub struct Queue { content: Mutex>, not_empty: Condvar, } -impl Queue { - fn new() -> Self { +impl Queue { + pub fn new() -> Self { Self { - content: Mutex::new(VecDeque::new()), + content: Mutex::new(VecDeque::::new()), not_empty: Condvar::new(), } } - - pub fn add_runner(&self, runner: T) { - let mut c = self.content.lock().unwrap(); - c.push_back(runner); - - self.not_empty.notify_one(); - } } #[derive(Copy, Clone, PartialEq)] +#[allow(dead_code)] pub enum WorkerStatus { Pending, Running, @@ -51,36 +41,34 @@ impl Worker { self.status = status; } + #[allow(dead_code)] fn get_status(&self) -> WorkerStatus { self.status } } -pub struct Manager { +pub struct Manager { name: String, workers: Vec>>, + queue: Arc>, } -impl Manager { - fn new(name: &str) -> Self { +impl Manager { + pub fn new(name: &str) -> Self { Self { name: name.to_string(), workers: vec![], + queue: Arc::new(Queue::::new()), } } - fn launch_workers( - &mut self, - nb_workers: u32, - shared_queue: Arc>, - ) { + pub fn launch_workers(&mut self, nb_workers: u32) { for i in 0..nb_workers { let shared_worker = Arc::new(Mutex::new(Worker::new())); - self.workers.push(shared_worker.clone()); let worker = shared_worker.clone(); - let queue = shared_queue.clone(); + let queue = self.queue.clone(); let name = self.name.clone(); thread::spawn(move || loop { @@ -102,6 +90,7 @@ impl Manager { println!("[worker({} - {})] launching job...", name, i); runner.run(); + println!("[worker({} - {})] job done", name, i); let mut guard = worker.lock().unwrap(); guard.set_status(WorkerStatus::Pending); @@ -110,6 +99,14 @@ impl Manager { } } + pub fn add_runner(&self, runner: T) { + let mut q = self.queue.content.lock().unwrap(); + q.push_back(runner); + + self.queue.not_empty.notify_one(); + } + + #[allow(dead_code)] fn healthcheck(&self, target: WorkerStatus) -> bool { for w in &self.workers { if w.lock().unwrap().get_status() != target { @@ -119,29 +116,3 @@ impl Manager { true } } - -#[test] -fn test_manager() { - use std::time; - use std::time::Duration; - - let mut m = Manager::new("deploy"); - let queue_deploy = Arc::new(Queue::::new()); - let queue_check = Arc::new(Queue::::new()); - - m.launch_workers(5, queue_deploy.clone()); - - assert_eq!(5, m.workers.len()); - assert!(!m.healthcheck(WorkerStatus::Failed)); - - for i in 0..500 { - let j = Job::new(i, JobAction::MegaportDeploy); - - queue_deploy.add_runner(j); - } - - let wait = time::Duration::from_millis(200); - thread::sleep(wait); - - assert!(m.healthcheck(WorkerStatus::Pending)); -}