add documentation + lighten manager and handler constructor

This commit is contained in:
rmanach 2023-04-19 16:24:32 +02:00
parent 9ed43e5e01
commit 45a29cf395
3 changed files with 56 additions and 33 deletions

View File

@ -4,7 +4,7 @@ mod message;
mod model; mod model;
mod worker; mod worker;
use std::sync::{mpsc::channel, Arc, Mutex}; use std::sync::mpsc::channel;
use chrono::prelude::*; use chrono::prelude::*;
use database::init_database_pool; use database::init_database_pool;
@ -19,14 +19,12 @@ fn main() {
let (sender, receiver) = channel(); let (sender, receiver) = channel();
// launch deployment workers // launch deployment workers
let mut deployer: Manager = Manager::new("deploy"); let mut deployer: Manager<DeployHandler> = Manager::new("deploy", DeployHandler::new(sender));
let deploy_handler = DeployHandler::new(Arc::new(Mutex::new(sender))); deployer.launch_workers::<Job>(5);
deployer.launch_workers::<Job, DeployHandler>(5, Arc::new(deploy_handler));
// launch checker workers // launch checker workers
let check_handler = CheckHandler::new(); let mut checker: Manager<CheckHandler> = Manager::new("checker", CheckHandler::new());
let mut checker: Manager = Manager::new("checker"); checker.launch_workers::<Job>(5);
checker.launch_workers::<Job, CheckHandler>(5, Arc::new(check_handler));
checker.subscribe(receiver); checker.subscribe(receiver);
// test message handling and subscription // test message handling and subscription
@ -35,7 +33,7 @@ fn main() {
deployer.put_message(message); deployer.put_message(message);
} }
// deployer and checker (order matters) // deployer and checker stop (order matters)
deployer.stop(); deployer.stop();
assert_eq!(0, deployer.len()); assert_eq!(0, deployer.len());

View File

@ -38,8 +38,10 @@ pub struct DeployHandler {
} }
impl DeployHandler { impl DeployHandler {
pub fn new(sender: Arc<Mutex<Sender<Message>>>) -> Self { pub fn new(sender: Sender<Message>) -> Self {
Self { sender } Self {
sender: Arc::new(Mutex::new(sender)),
}
} }
fn manage_message(&self, message: Message) -> Result<(), HandlerError> { fn manage_message(&self, message: Message) -> Result<(), HandlerError> {

View File

@ -32,20 +32,22 @@ pub enum WorkerStatus {
Stopped, Stopped,
} }
struct Worker { struct Worker<T> {
id: u32, id: u32,
manager: String, manager: String,
queue: Arc<Queue<Message>>, queue: Arc<Queue<Message>>,
status: Arc<Mutex<WorkerStatus>>, status: Arc<Mutex<WorkerStatus>>,
shared_handler: Arc<T>,
} }
impl Worker { impl<T: Handler> Worker<T> {
fn new(id: u32, manager: String, queue: Arc<Queue<Message>>) -> Self { fn new(id: u32, manager: String, queue: Arc<Queue<Message>>, shared_handler: Arc<T>) -> Self {
Self { Self {
id: id, id: id,
manager: manager, manager: manager,
queue: queue, queue: queue,
status: Arc::new(Mutex::new(WorkerStatus::Pending)), status: Arc::new(Mutex::new(WorkerStatus::Pending)),
shared_handler: shared_handler,
} }
} }
@ -58,10 +60,14 @@ impl Worker {
*guard *guard
} }
fn launch<T: Runner, U: Handler>(&self, shared_handler: Arc<U>) { // launch launches a thread to handle all the `Message` form the `queue`.
//
// It tries to parse the incoming `Message` into a `Runner` and runs it.
// If an `Unknown` error is returned from the handler, the thread is stopped and its status is set to `Failed`.
fn launch<U: Runner>(&self) {
let queue = self.queue.clone(); let queue = self.queue.clone();
let status = self.status.clone(); let status = self.status.clone();
let handler = shared_handler.clone(); let handler = self.shared_handler.clone();
thread::spawn(move || loop { thread::spawn(move || loop {
let mut guard = queue.content.lock().unwrap(); let mut guard = queue.content.lock().unwrap();
@ -75,13 +81,13 @@ impl Worker {
}; };
if message.get_subject() == Subject::StopManager { if message.get_subject() == Subject::StopManager {
Worker::set_status(&status, WorkerStatus::Stopped); Worker::<T>::set_status(&status, WorkerStatus::Stopped);
break; break;
} }
drop(guard); drop(guard);
let mut runner = match T::try_from(message) { let mut runner = match U::try_from(message) {
Ok(r) => r, Ok(r) => r,
Err(_e) => { Err(_e) => {
eprintln!("unable to parse the incoming message into a runner"); eprintln!("unable to parse the incoming message into a runner");
@ -89,17 +95,19 @@ impl Worker {
} }
}; };
Worker::set_status(&status, WorkerStatus::Running); Worker::<T>::set_status(&status, WorkerStatus::Running);
// TODO: the message could be requeued if needed
if let Err(e) = handler.handle(runner.run()) { if let Err(e) = handler.handle(runner.run()) {
match e { match e {
HandlerError::Unknown => { HandlerError::Unknown => {
Worker::set_status(&status, WorkerStatus::Failed); Worker::<T>::set_status(&status, WorkerStatus::Failed);
break; break;
} }
_ => (), _ => (),
} }
} }
Worker::set_status(&status, WorkerStatus::Pending); Worker::<T>::set_status(&status, WorkerStatus::Pending);
}); });
} }
@ -116,39 +124,50 @@ pub enum ManagerStatus {
Stopping, Stopping,
} }
// Manager is a pool of workers and holds a queue containing `Job` to run. // Manager is a pool of workers and holds a queue containing `Message`.
pub struct Manager { pub struct Manager<T> {
name: String, name: String,
status: ManagerStatus, status: ManagerStatus,
workers: Vec<Worker>, workers: Vec<Worker<T>>,
queue: Arc<Queue<Message>>, queue: Arc<Queue<Message>>,
shared_handler: Arc<T>,
} }
impl Manager { impl<T: Handler> Manager<T> {
pub fn new(name: &str) -> Self { pub fn new(name: &str, shared_handler: T) -> Self {
Self { Self {
name: name.to_string(), name: name.to_string(),
workers: vec![], workers: vec![],
status: ManagerStatus::Down, status: ManagerStatus::Down,
queue: Arc::new(Queue::new()), queue: Arc::new(Queue::new()),
shared_handler: Arc::new(shared_handler),
} }
} }
pub fn launch_workers<T: Runner, U: Handler>( // launch_workers launches a pool of workers.
&mut self, //
nb_workers: u32, // parameters:
shared_handler: Arc<U>, // * `nb_workers`: number of workers to launch
) { // * `shared_handlers`: a thread-sharable `Handler`
//
// Example:
// deployer.launch_workers::<Job>(5)
pub fn launch_workers<U: Runner>(&mut self, nb_workers: u32) {
for i in 0..nb_workers { for i in 0..nb_workers {
let worker = Worker::new(i, self.name.clone(), self.queue.clone()); let worker: Worker<T> = Worker::new(
worker.launch::<T, U>(shared_handler.clone()); i,
self.name.clone(),
self.queue.clone(),
self.shared_handler.clone(),
);
worker.launch::<U>();
self.workers.push(worker); self.workers.push(worker);
} }
self.status = ManagerStatus::Up; self.status = ManagerStatus::Up;
} }
// subscribe subscribes to a `Receiver` channel. // subscribe subscribes to a `Receiver` channel and notify all the related workers.
pub fn subscribe(&self, receiver: Receiver<Message>) { pub fn subscribe(&self, receiver: Receiver<Message>) {
let queue = self.queue.clone(); let queue = self.queue.clone();
thread::spawn(move || loop { thread::spawn(move || loop {
@ -183,6 +202,9 @@ impl Manager {
self.queue.not_empty.notify_one(); self.queue.not_empty.notify_one();
} }
// stop loops over the pool's workers and send a `Message` signaling the worker to stop.
//
// WARN: this is blocking.
pub fn stop(&mut self) { pub fn stop(&mut self) {
for _ in &self.workers { for _ in &self.workers {
self.put_message(Message::stop()); self.put_message(Message::stop());
@ -198,6 +220,7 @@ impl Manager {
self.status = ManagerStatus::Down; self.status = ManagerStatus::Down;
} }
// healthcheck checks the status of all workers.
pub fn healthcheck(&self, target: WorkerStatus) -> bool { pub fn healthcheck(&self, target: WorkerStatus) -> bool {
for w in &self.workers { for w in &self.workers {
if w.get_status() != target { if w.get_status() != target {