move thread in worker + impl manager stop

This commit is contained in:
rmanach 2023-04-19 12:57:47 +02:00
parent e38a8611f1
commit 9ed43e5e01
2 changed files with 109 additions and 71 deletions

View File

@ -5,14 +5,12 @@ mod model;
mod worker; mod worker;
use std::sync::{mpsc::channel, Arc, Mutex}; use std::sync::{mpsc::channel, Arc, Mutex};
use std::thread;
use std::time;
use chrono::prelude::*; use chrono::prelude::*;
use database::init_database_pool; use database::init_database_pool;
use message::{Message, Subject}; use message::{Message, Subject};
use model::{Job, JobAction}; use model::{Job, JobAction};
use worker::{CheckHandler, DeployHandler, Manager, WorkerStatus}; use worker::{CheckHandler, DeployHandler, Manager};
fn main() { fn main() {
let now: DateTime<Utc> = Utc::now(); let now: DateTime<Utc> = Utc::now();
@ -31,15 +29,18 @@ fn main() {
checker.launch_workers::<Job, CheckHandler>(5, Arc::new(check_handler)); checker.launch_workers::<Job, CheckHandler>(5, Arc::new(check_handler));
checker.subscribe(receiver); checker.subscribe(receiver);
// test message handling and subscription
for i in 0..500 { for i in 0..500 {
let message = Message::new(i, Subject::Action(JobAction::Deploy)); let message = Message::new(i, Subject::Action(JobAction::Deploy));
deployer.put_message(message); deployer.put_message(message);
} }
let wait = time::Duration::from_millis(100); // deployer and checker (order matters)
while !deployer.healthcheck(WorkerStatus::Stopped) { deployer.stop();
thread::sleep(wait); assert_eq!(0, deployer.len());
}
checker.stop();
assert_eq!(0, checker.len());
let elapsed = Utc::now() - now; let elapsed = Utc::now() - now;
println!( println!(

View File

@ -1,10 +1,10 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex}; use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex};
use std::thread; use std::{thread, time};
use super::handler::Handler; use super::handler::Handler;
use crate::error::HandlerError; use crate::error::HandlerError;
use crate::message::Message; use crate::message::{Message, Subject};
use crate::model::*; use crate::model::*;
// Queue is a simple queue data structure. // Queue is a simple queue data structure.
@ -25,7 +25,6 @@ impl<T> Queue<T> {
} }
#[derive(Debug, Copy, Clone, PartialEq)] #[derive(Debug, Copy, Clone, PartialEq)]
#[allow(dead_code)]
pub enum WorkerStatus { pub enum WorkerStatus {
Pending, Pending,
Running, Running,
@ -33,36 +32,95 @@ pub enum WorkerStatus {
Stopped, Stopped,
} }
// Worker holds the computation in thread with a status.
struct Worker { struct Worker {
id: u32, id: u32,
manager: String, manager: String,
status: WorkerStatus, queue: Arc<Queue<Message>>,
status: Arc<Mutex<WorkerStatus>>,
} }
impl Worker { impl Worker {
fn new(id: u32, manager: String) -> Self { fn new(id: u32, manager: String, queue: Arc<Queue<Message>>) -> Self {
Self { Self {
id: id, id: id,
manager: manager, manager: manager,
status: WorkerStatus::Pending, queue: queue,
status: Arc::new(Mutex::new(WorkerStatus::Pending)),
} }
} }
fn set_status(&mut self, status: WorkerStatus) { fn get_status(&self) -> WorkerStatus {
self.status = status; let guard = self.status.lock().unwrap();
println!(
"[worker({} - {})] status: {:?}",
self.manager, self.id, *guard
);
*guard
} }
#[allow(dead_code)] fn launch<T: Runner, U: Handler>(&self, shared_handler: Arc<U>) {
fn get_status(&self) -> WorkerStatus { let queue = self.queue.clone();
self.status let status = self.status.clone();
let handler = shared_handler.clone();
thread::spawn(move || loop {
let mut guard = queue.content.lock().unwrap();
let message = loop {
if let Some(m) = guard.pop_front() {
break m;
} else {
guard = queue.not_empty.wait(guard).unwrap();
}
};
if message.get_subject() == Subject::StopManager {
Worker::set_status(&status, WorkerStatus::Stopped);
break;
}
drop(guard);
let mut runner = match T::try_from(message) {
Ok(r) => r,
Err(_e) => {
eprintln!("unable to parse the incoming message into a runner");
continue;
}
};
Worker::set_status(&status, WorkerStatus::Running);
if let Err(e) = handler.handle(runner.run()) {
match e {
HandlerError::Unknown => {
Worker::set_status(&status, WorkerStatus::Failed);
break;
}
_ => (),
}
}
Worker::set_status(&status, WorkerStatus::Pending);
});
} }
fn set_status(old: &Arc<Mutex<WorkerStatus>>, new: WorkerStatus) {
let mut guard = old.lock().unwrap();
*guard = new;
}
}
#[derive(PartialEq)]
pub enum ManagerStatus {
Up,
Down,
Stopping,
} }
// Manager is a pool of workers and holds a queue containing `Job` to run. // Manager is a pool of workers and holds a queue containing `Job` to run.
pub struct Manager { pub struct Manager {
name: String, name: String,
workers: Vec<Arc<Mutex<Worker>>>, status: ManagerStatus,
workers: Vec<Worker>,
queue: Arc<Queue<Message>>, queue: Arc<Queue<Message>>,
} }
@ -71,6 +129,7 @@ impl Manager {
Self { Self {
name: name.to_string(), name: name.to_string(),
workers: vec![], workers: vec![],
status: ManagerStatus::Down,
queue: Arc::new(Queue::new()), queue: Arc::new(Queue::new()),
} }
} }
@ -81,47 +140,12 @@ impl Manager {
shared_handler: Arc<U>, shared_handler: Arc<U>,
) { ) {
for i in 0..nb_workers { for i in 0..nb_workers {
let shared_worker = Arc::new(Mutex::new(Worker::new(i, self.name.clone()))); let worker = Worker::new(i, self.name.clone(), self.queue.clone());
self.workers.push(shared_worker.clone()); worker.launch::<T, U>(shared_handler.clone());
self.workers.push(worker);
let worker = shared_worker.clone();
let queue = self.queue.clone();
let handler = shared_handler.clone();
thread::spawn(move || loop {
let mut guard = queue.content.lock().unwrap();
let message = loop {
if let Some(m) = guard.pop_front() {
break m;
} else {
guard = queue.not_empty.wait(guard).unwrap();
}
};
drop(guard);
let mut runner = match T::try_from(message) {
Ok(r) => r,
Err(_e) => {
eprintln!("unable to parse the incoming message into a runner");
continue;
}
};
Manager::set_worker_status(&worker, WorkerStatus::Running);
if let Err(e) = handler.handle(runner.run()) {
match e {
HandlerError::Unknown => {
Manager::set_worker_status(&worker, WorkerStatus::Failed);
break;
}
_ => (),
}
}
Manager::set_worker_status(&worker, WorkerStatus::Pending);
});
} }
self.status = ManagerStatus::Up;
} }
// subscribe subscribes to a `Receiver` channel. // subscribe subscribes to a `Receiver` channel.
@ -144,29 +168,42 @@ impl Manager {
}); });
} }
pub fn len(&self) -> usize {
self.queue.content.lock().unwrap().len()
}
pub fn put_message(&self, message: Message) { pub fn put_message(&self, message: Message) {
if self.status != ManagerStatus::Up {
return;
}
let mut q = self.queue.content.lock().unwrap(); let mut q = self.queue.content.lock().unwrap();
q.push_back(message); q.push_back(message);
self.queue.not_empty.notify_one(); self.queue.not_empty.notify_one();
} }
pub fn stop(&mut self) {
for _ in &self.workers {
self.put_message(Message::stop());
}
self.status = ManagerStatus::Stopping;
let wait = time::Duration::from_millis(100);
while !self.healthcheck(WorkerStatus::Stopped) {
thread::sleep(wait);
}
self.status = ManagerStatus::Down;
}
pub fn healthcheck(&self, target: WorkerStatus) -> bool { pub fn healthcheck(&self, target: WorkerStatus) -> bool {
for w in &self.workers { for w in &self.workers {
if w.lock().unwrap().get_status() != target { if w.get_status() != target {
return false; return false;
} }
} }
true true
} }
fn set_worker_status(worker: &Arc<Mutex<Worker>>, status: WorkerStatus) {
let mut guard = worker.lock().unwrap();
guard.set_status(status);
println!(
"[worker({} - {})] status: {:?}",
guard.manager, guard.id, guard.status
);
}
} }