From 45a29cf3959844a45678dadcf0309b223bbebf34 Mon Sep 17 00:00:00 2001 From: rmanach Date: Wed, 19 Apr 2023 16:24:32 +0200 Subject: [PATCH] add documentation + lighten manager and handler constructor --- src/main.rs | 14 ++++----- src/worker/handler.rs | 6 ++-- src/worker/worker.rs | 69 ++++++++++++++++++++++++++++--------------- 3 files changed, 56 insertions(+), 33 deletions(-) diff --git a/src/main.rs b/src/main.rs index 89a6693..64a4ac2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ mod message; mod model; mod worker; -use std::sync::{mpsc::channel, Arc, Mutex}; +use std::sync::mpsc::channel; use chrono::prelude::*; use database::init_database_pool; @@ -19,14 +19,12 @@ fn main() { let (sender, receiver) = channel(); // launch deployment workers - let mut deployer: Manager = Manager::new("deploy"); - let deploy_handler = DeployHandler::new(Arc::new(Mutex::new(sender))); - deployer.launch_workers::(5, Arc::new(deploy_handler)); + let mut deployer: Manager = Manager::new("deploy", DeployHandler::new(sender)); + deployer.launch_workers::(5); // launch checker workers - let check_handler = CheckHandler::new(); - let mut checker: Manager = Manager::new("checker"); - checker.launch_workers::(5, Arc::new(check_handler)); + let mut checker: Manager = Manager::new("checker", CheckHandler::new()); + checker.launch_workers::(5); checker.subscribe(receiver); // test message handling and subscription @@ -35,7 +33,7 @@ fn main() { deployer.put_message(message); } - // deployer and checker (order matters) + // deployer and checker stop (order matters) deployer.stop(); assert_eq!(0, deployer.len()); diff --git a/src/worker/handler.rs b/src/worker/handler.rs index 5273a84..3d161e2 100644 --- a/src/worker/handler.rs +++ b/src/worker/handler.rs @@ -38,8 +38,10 @@ pub struct DeployHandler { } impl DeployHandler { - pub fn new(sender: Arc>>) -> Self { - Self { sender } + pub fn new(sender: Sender) -> Self { + Self { + sender: Arc::new(Mutex::new(sender)), + } } fn manage_message(&self, message: Message) -> Result<(), HandlerError> { diff --git a/src/worker/worker.rs b/src/worker/worker.rs index a98d941..df441ba 100644 --- a/src/worker/worker.rs +++ b/src/worker/worker.rs @@ -32,20 +32,22 @@ pub enum WorkerStatus { Stopped, } -struct Worker { +struct Worker { id: u32, manager: String, queue: Arc>, status: Arc>, + shared_handler: Arc, } -impl Worker { - fn new(id: u32, manager: String, queue: Arc>) -> Self { +impl Worker { + fn new(id: u32, manager: String, queue: Arc>, shared_handler: Arc) -> Self { Self { id: id, manager: manager, queue: queue, status: Arc::new(Mutex::new(WorkerStatus::Pending)), + shared_handler: shared_handler, } } @@ -58,10 +60,14 @@ impl Worker { *guard } - fn launch(&self, shared_handler: Arc) { + // launch launches a thread to handle all the `Message` form the `queue`. + // + // It tries to parse the incoming `Message` into a `Runner` and runs it. + // If an `Unknown` error is returned from the handler, the thread is stopped and its status is set to `Failed`. + fn launch(&self) { let queue = self.queue.clone(); let status = self.status.clone(); - let handler = shared_handler.clone(); + let handler = self.shared_handler.clone(); thread::spawn(move || loop { let mut guard = queue.content.lock().unwrap(); @@ -75,13 +81,13 @@ impl Worker { }; if message.get_subject() == Subject::StopManager { - Worker::set_status(&status, WorkerStatus::Stopped); + Worker::::set_status(&status, WorkerStatus::Stopped); break; } drop(guard); - let mut runner = match T::try_from(message) { + let mut runner = match U::try_from(message) { Ok(r) => r, Err(_e) => { eprintln!("unable to parse the incoming message into a runner"); @@ -89,17 +95,19 @@ impl Worker { } }; - Worker::set_status(&status, WorkerStatus::Running); + Worker::::set_status(&status, WorkerStatus::Running); + + // TODO: the message could be requeued if needed if let Err(e) = handler.handle(runner.run()) { match e { HandlerError::Unknown => { - Worker::set_status(&status, WorkerStatus::Failed); + Worker::::set_status(&status, WorkerStatus::Failed); break; } _ => (), } } - Worker::set_status(&status, WorkerStatus::Pending); + Worker::::set_status(&status, WorkerStatus::Pending); }); } @@ -116,39 +124,50 @@ pub enum ManagerStatus { Stopping, } -// Manager is a pool of workers and holds a queue containing `Job` to run. -pub struct Manager { +// Manager is a pool of workers and holds a queue containing `Message`. +pub struct Manager { name: String, status: ManagerStatus, - workers: Vec, + workers: Vec>, queue: Arc>, + shared_handler: Arc, } -impl Manager { - pub fn new(name: &str) -> Self { +impl Manager { + pub fn new(name: &str, shared_handler: T) -> Self { Self { name: name.to_string(), workers: vec![], status: ManagerStatus::Down, queue: Arc::new(Queue::new()), + shared_handler: Arc::new(shared_handler), } } - pub fn launch_workers( - &mut self, - nb_workers: u32, - shared_handler: Arc, - ) { + // launch_workers launches a pool of workers. + // + // parameters: + // * `nb_workers`: number of workers to launch + // * `shared_handlers`: a thread-sharable `Handler` + // + // Example: + // deployer.launch_workers::(5) + pub fn launch_workers(&mut self, nb_workers: u32) { for i in 0..nb_workers { - let worker = Worker::new(i, self.name.clone(), self.queue.clone()); - worker.launch::(shared_handler.clone()); + let worker: Worker = Worker::new( + i, + self.name.clone(), + self.queue.clone(), + self.shared_handler.clone(), + ); + worker.launch::(); self.workers.push(worker); } self.status = ManagerStatus::Up; } - // subscribe subscribes to a `Receiver` channel. + // subscribe subscribes to a `Receiver` channel and notify all the related workers. pub fn subscribe(&self, receiver: Receiver) { let queue = self.queue.clone(); thread::spawn(move || loop { @@ -183,6 +202,9 @@ impl Manager { self.queue.not_empty.notify_one(); } + // stop loops over the pool's workers and send a `Message` signaling the worker to stop. + // + // WARN: this is blocking. pub fn stop(&mut self) { for _ in &self.workers { self.put_message(Message::stop()); @@ -198,6 +220,7 @@ impl Manager { self.status = ManagerStatus::Down; } + // healthcheck checks the status of all workers. pub fn healthcheck(&self, target: WorkerStatus) -> bool { for w in &self.workers { if w.get_status() != target {