diff --git a/src/main.rs b/src/main.rs index 5f1f5ea..1cf4603 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,13 @@ mod message; mod queue; +mod worker; use std::sync::mpsc::{channel, Sender}; use std::thread; use std::time; use message::Message; -use queue::Controller; +use queue::QueueController; struct Dispatch { ch_sender: Sender, @@ -50,7 +51,7 @@ fn main() { .unwrap(); dispatch.send(Message::command(1, "...")).unwrap(); - let controller = Controller::new(); + let controller = QueueController::new(); controller.launch(); controller.put_message(Message::command(1, "q1")); diff --git a/src/queue/mod.rs b/src/queue/mod.rs index a0be5d6..6126895 100644 --- a/src/queue/mod.rs +++ b/src/queue/mod.rs @@ -1,3 +1,3 @@ mod queue; -pub use queue::Controller; +pub use queue::{QueueController, QueueStatus}; diff --git a/src/queue/queue.rs b/src/queue/queue.rs index dfaaa9f..15a4c64 100644 --- a/src/queue/queue.rs +++ b/src/queue/queue.rs @@ -13,7 +13,7 @@ lazy_static! { } #[derive(Copy, Clone, PartialEq)] -enum QueueStatus { +pub enum QueueStatus { Pending, Running, Stopped, @@ -37,9 +37,9 @@ impl Queue { } } -pub struct Controller {} +pub struct QueueController {} -impl Controller { +impl QueueController { pub fn new() -> Self { Self {} } @@ -149,7 +149,9 @@ impl Controller { #[test] fn test_queue() { - let controller = Controller::new(); + use std::time; + use std::time::Duration; + let controller = QueueController::new(); controller.launch(); controller.put_message(Message::command(1, "hello world!")); diff --git a/src/worker/mod.rs b/src/worker/mod.rs new file mode 100644 index 0000000..afda139 --- /dev/null +++ b/src/worker/mod.rs @@ -0,0 +1 @@ +mod worker; diff --git a/src/worker/worker.rs b/src/worker/worker.rs new file mode 100644 index 0000000..5826666 --- /dev/null +++ b/src/worker/worker.rs @@ -0,0 +1,192 @@ +use std::collections::VecDeque; +use std::sync::{Arc, Mutex, Condvar}; +use std::thread; +use std::time; +use std::time::Duration; + +use crate::message::Message; +use crate::queue::QueueStatus; + +struct Queue { + status: Mutex, + content: Mutex>, + not_empty: Condvar, +} + +impl Queue { + fn new() -> Self { + Self { + status: Mutex::new(QueueStatus::Pending), + content: Mutex::new(VecDeque::new()), + not_empty: Condvar::new(), + } + } + + pub fn set_status(&mut self, status: QueueStatus) { + let mut s = self.status.lock().unwrap(); + *s = status; + } + + pub fn add_job(&self, job: Job) { + let mut c = self.content.lock().unwrap(); + c.push_back(job); + + self.not_empty.notify_one(); + } +} + +#[derive(Copy, Clone)] +pub enum JobAction { + MegaportDeploy, + MegaportUndeploy, + MegaportCheckDeploy, + MegaportCheckUndeploy +} + +pub struct Job { + id: u32, + action: JobAction, + data: String, +} + +impl Job { + fn new(id: u32, action: JobAction, data: &str) -> Self { + Self { + id: id, + action: action, + data: data.to_string() + } + } + + fn run(&self) { + let id = self.id.clone(); + match self.action { + JobAction::MegaportDeploy => { + println!("[job({})] deploying megaport...", id); + }, + JobAction::MegaportUndeploy => { + println!("[job({})] undeploying megaport...", id); + }, + JobAction::MegaportCheckDeploy => { + println!("[job({})] ckecking megaport deployment...", id); + } + JobAction::MegaportCheckUndeploy => { + println!("[job({})] ckecking megaport undeployment...", id); + } + } + let wait = time::Duration::from_millis(1); + thread::sleep(wait); + } +} + +#[derive(Copy, Clone, PartialEq)] +pub enum WorkerStatus { + Pending, + Running, + Failed, +} + +struct Worker { + status: WorkerStatus, +} + +impl Worker { + fn new() -> Self { + Self { + status: WorkerStatus::Pending, + } + } + + fn set_status(&mut self, status: WorkerStatus) { + self.status = status; + } + + fn get_status(&self) -> WorkerStatus { + self.status + } +} + +pub struct Manager { + name: String, + workers: Vec>>, +} + +impl Manager { + fn new(name :&str) -> Self { + Self { name: name.to_string(), workers: vec![] } + } + + fn launch_workers(&mut self, nb_workers: u32, shared_queue: Arc) { + for i in 0..nb_workers { + let shared_worker = Arc::new(Mutex::new(Worker::new())); + + self.workers.push(shared_worker.clone()); + + let worker = shared_worker.clone(); + let queue = shared_queue.clone(); + let name = self.name.clone(); + + thread::spawn(move || { + loop { + let mut c = queue.content.lock().unwrap(); + + let job = loop { + if let Some(job) = c.pop_front() { + break job; + } else { + c = queue.not_empty.wait(c).unwrap(); + } + }; + + drop(c); + + let mut guard = worker.lock().unwrap(); + guard.set_status(WorkerStatus::Running); + drop(guard); + + println!("[worker({} - {})] launching job...", name, i); + job.run(); + + let mut guard = worker.lock().unwrap(); + guard.set_status(WorkerStatus::Pending); + drop(guard); + } + }); + } + } + + fn healthcheck(&self, target: WorkerStatus) -> bool { + for w in &self.workers { + if w.lock().unwrap().get_status() != target { + return false + } + } + true + } +} + +#[test] +fn test_manager() { + use std::time; + use std::time::Duration; + + let mut m = Manager::new("deploy"); + let queue_deploy = Arc::new(Queue::new()); + let queue_check = Arc::new(Queue::new()); + + m.launch_workers(5, queue_deploy.clone()); + + assert_eq!(5, m.workers.len()); + assert!(!m.healthcheck(WorkerStatus::Failed)); + + for i in 0..500 { + let j = Job::new(i, JobAction::MegaportDeploy, ""); + + queue_deploy.add_job(j); + } + + let wait = time::Duration::from_millis(200); + thread::sleep(wait); + + assert!(m.healthcheck(WorkerStatus::Pending)); +}