From 9ed43e5e0157bb386fe0d19c1c7c24b0ba328c58 Mon Sep 17 00:00:00 2001 From: rmanach Date: Wed, 19 Apr 2023 12:57:47 +0200 Subject: [PATCH] move thread in worker + impl manager stop --- src/main.rs | 15 ++-- src/worker/worker.rs | 165 ++++++++++++++++++++++++++----------------- 2 files changed, 109 insertions(+), 71 deletions(-) diff --git a/src/main.rs b/src/main.rs index 098c56d..89a6693 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,14 +5,12 @@ mod model; mod worker; use std::sync::{mpsc::channel, Arc, Mutex}; -use std::thread; -use std::time; use chrono::prelude::*; use database::init_database_pool; use message::{Message, Subject}; use model::{Job, JobAction}; -use worker::{CheckHandler, DeployHandler, Manager, WorkerStatus}; +use worker::{CheckHandler, DeployHandler, Manager}; fn main() { let now: DateTime = Utc::now(); @@ -31,15 +29,18 @@ fn main() { checker.launch_workers::(5, Arc::new(check_handler)); checker.subscribe(receiver); + // test message handling and subscription for i in 0..500 { let message = Message::new(i, Subject::Action(JobAction::Deploy)); deployer.put_message(message); } - let wait = time::Duration::from_millis(100); - while !deployer.healthcheck(WorkerStatus::Stopped) { - thread::sleep(wait); - } + // deployer and checker (order matters) + deployer.stop(); + assert_eq!(0, deployer.len()); + + checker.stop(); + assert_eq!(0, checker.len()); let elapsed = Utc::now() - now; println!( diff --git a/src/worker/worker.rs b/src/worker/worker.rs index ddf28a4..a98d941 100644 --- a/src/worker/worker.rs +++ b/src/worker/worker.rs @@ -1,10 +1,10 @@ use std::collections::VecDeque; use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex}; -use std::thread; +use std::{thread, time}; use super::handler::Handler; use crate::error::HandlerError; -use crate::message::Message; +use crate::message::{Message, Subject}; use crate::model::*; // Queue is a simple queue data structure. @@ -25,7 +25,6 @@ impl Queue { } #[derive(Debug, Copy, Clone, PartialEq)] -#[allow(dead_code)] pub enum WorkerStatus { Pending, Running, @@ -33,36 +32,95 @@ pub enum WorkerStatus { Stopped, } -// Worker holds the computation in thread with a status. struct Worker { id: u32, manager: String, - status: WorkerStatus, + queue: Arc>, + status: Arc>, } impl Worker { - fn new(id: u32, manager: String) -> Self { + fn new(id: u32, manager: String, queue: Arc>) -> Self { Self { id: id, manager: manager, - status: WorkerStatus::Pending, + queue: queue, + status: Arc::new(Mutex::new(WorkerStatus::Pending)), } } - fn set_status(&mut self, status: WorkerStatus) { - self.status = status; + fn get_status(&self) -> WorkerStatus { + let guard = self.status.lock().unwrap(); + println!( + "[worker({} - {})] status: {:?}", + self.manager, self.id, *guard + ); + *guard } - #[allow(dead_code)] - fn get_status(&self) -> WorkerStatus { - self.status + fn launch(&self, shared_handler: Arc) { + let queue = self.queue.clone(); + let status = self.status.clone(); + let handler = shared_handler.clone(); + + thread::spawn(move || loop { + let mut guard = queue.content.lock().unwrap(); + + let message = loop { + if let Some(m) = guard.pop_front() { + break m; + } else { + guard = queue.not_empty.wait(guard).unwrap(); + } + }; + + if message.get_subject() == Subject::StopManager { + Worker::set_status(&status, WorkerStatus::Stopped); + break; + } + + drop(guard); + + let mut runner = match T::try_from(message) { + Ok(r) => r, + Err(_e) => { + eprintln!("unable to parse the incoming message into a runner"); + continue; + } + }; + + Worker::set_status(&status, WorkerStatus::Running); + if let Err(e) = handler.handle(runner.run()) { + match e { + HandlerError::Unknown => { + Worker::set_status(&status, WorkerStatus::Failed); + break; + } + _ => (), + } + } + Worker::set_status(&status, WorkerStatus::Pending); + }); } + + fn set_status(old: &Arc>, new: WorkerStatus) { + let mut guard = old.lock().unwrap(); + *guard = new; + } +} + +#[derive(PartialEq)] +pub enum ManagerStatus { + Up, + Down, + Stopping, } // Manager is a pool of workers and holds a queue containing `Job` to run. pub struct Manager { name: String, - workers: Vec>>, + status: ManagerStatus, + workers: Vec, queue: Arc>, } @@ -71,6 +129,7 @@ impl Manager { Self { name: name.to_string(), workers: vec![], + status: ManagerStatus::Down, queue: Arc::new(Queue::new()), } } @@ -81,47 +140,12 @@ impl Manager { shared_handler: Arc, ) { for i in 0..nb_workers { - let shared_worker = Arc::new(Mutex::new(Worker::new(i, self.name.clone()))); - self.workers.push(shared_worker.clone()); - - let worker = shared_worker.clone(); - let queue = self.queue.clone(); - let handler = shared_handler.clone(); - - thread::spawn(move || loop { - let mut guard = queue.content.lock().unwrap(); - - let message = loop { - if let Some(m) = guard.pop_front() { - break m; - } else { - guard = queue.not_empty.wait(guard).unwrap(); - } - }; - - drop(guard); - - let mut runner = match T::try_from(message) { - Ok(r) => r, - Err(_e) => { - eprintln!("unable to parse the incoming message into a runner"); - continue; - } - }; - - Manager::set_worker_status(&worker, WorkerStatus::Running); - if let Err(e) = handler.handle(runner.run()) { - match e { - HandlerError::Unknown => { - Manager::set_worker_status(&worker, WorkerStatus::Failed); - break; - } - _ => (), - } - } - Manager::set_worker_status(&worker, WorkerStatus::Pending); - }); + let worker = Worker::new(i, self.name.clone(), self.queue.clone()); + worker.launch::(shared_handler.clone()); + self.workers.push(worker); } + + self.status = ManagerStatus::Up; } // subscribe subscribes to a `Receiver` channel. @@ -144,29 +168,42 @@ impl Manager { }); } + pub fn len(&self) -> usize { + self.queue.content.lock().unwrap().len() + } + pub fn put_message(&self, message: Message) { + if self.status != ManagerStatus::Up { + return; + } + let mut q = self.queue.content.lock().unwrap(); q.push_back(message); self.queue.not_empty.notify_one(); } + pub fn stop(&mut self) { + for _ in &self.workers { + self.put_message(Message::stop()); + } + + self.status = ManagerStatus::Stopping; + + let wait = time::Duration::from_millis(100); + while !self.healthcheck(WorkerStatus::Stopped) { + thread::sleep(wait); + } + + self.status = ManagerStatus::Down; + } + pub fn healthcheck(&self, target: WorkerStatus) -> bool { for w in &self.workers { - if w.lock().unwrap().get_status() != target { + if w.get_status() != target { return false; } } true } - - fn set_worker_status(worker: &Arc>, status: WorkerStatus) { - let mut guard = worker.lock().unwrap(); - guard.set_status(status); - - println!( - "[worker({} - {})] status: {:?}", - guard.manager, guard.id, guard.status - ); - } }