impl job worker engine

This commit is contained in:
rmanach 2023-04-17 16:53:20 +02:00
parent bf3bd31eed
commit c0c3804e5d
5 changed files with 203 additions and 7 deletions

View File

@ -1,12 +1,13 @@
mod message;
mod queue;
mod worker;
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::time;
use message::Message;
use queue::Controller;
use queue::QueueController;
struct Dispatch<T> {
ch_sender: Sender<T>,
@ -50,7 +51,7 @@ fn main() {
.unwrap();
dispatch.send(Message::command(1, "...")).unwrap();
let controller = Controller::new();
let controller = QueueController::new();
controller.launch();
controller.put_message(Message::command(1, "q1"));

View File

@ -1,3 +1,3 @@
mod queue;
pub use queue::Controller;
pub use queue::{QueueController, QueueStatus};

View File

@ -13,7 +13,7 @@ lazy_static! {
}
#[derive(Copy, Clone, PartialEq)]
enum QueueStatus {
pub enum QueueStatus {
Pending,
Running,
Stopped,
@ -37,9 +37,9 @@ impl Queue {
}
}
pub struct Controller {}
pub struct QueueController {}
impl Controller {
impl QueueController {
pub fn new() -> Self {
Self {}
}
@ -149,7 +149,9 @@ impl Controller {
#[test]
fn test_queue() {
let controller = Controller::new();
use std::time;
use std::time::Duration;
let controller = QueueController::new();
controller.launch();
controller.put_message(Message::command(1, "hello world!"));

1
src/worker/mod.rs Normal file
View File

@ -0,0 +1 @@
mod worker;

192
src/worker/worker.rs Normal file
View File

@ -0,0 +1,192 @@
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::time;
use std::time::Duration;
use crate::message::Message;
use crate::queue::QueueStatus;
struct Queue {
status: Mutex<QueueStatus>,
content: Mutex<VecDeque<Job>>,
not_empty: Condvar,
}
impl Queue {
fn new() -> Self {
Self {
status: Mutex::new(QueueStatus::Pending),
content: Mutex::new(VecDeque::new()),
not_empty: Condvar::new(),
}
}
pub fn set_status(&mut self, status: QueueStatus) {
let mut s = self.status.lock().unwrap();
*s = status;
}
pub fn add_job(&self, job: Job) {
let mut c = self.content.lock().unwrap();
c.push_back(job);
self.not_empty.notify_one();
}
}
#[derive(Copy, Clone)]
pub enum JobAction {
MegaportDeploy,
MegaportUndeploy,
MegaportCheckDeploy,
MegaportCheckUndeploy
}
pub struct Job {
id: u32,
action: JobAction,
data: String,
}
impl Job {
fn new(id: u32, action: JobAction, data: &str) -> Self {
Self {
id: id,
action: action,
data: data.to_string()
}
}
fn run(&self) {
let id = self.id.clone();
match self.action {
JobAction::MegaportDeploy => {
println!("[job({})] deploying megaport...", id);
},
JobAction::MegaportUndeploy => {
println!("[job({})] undeploying megaport...", id);
},
JobAction::MegaportCheckDeploy => {
println!("[job({})] ckecking megaport deployment...", id);
}
JobAction::MegaportCheckUndeploy => {
println!("[job({})] ckecking megaport undeployment...", id);
}
}
let wait = time::Duration::from_millis(1);
thread::sleep(wait);
}
}
#[derive(Copy, Clone, PartialEq)]
pub enum WorkerStatus {
Pending,
Running,
Failed,
}
struct Worker {
status: WorkerStatus,
}
impl Worker {
fn new() -> Self {
Self {
status: WorkerStatus::Pending,
}
}
fn set_status(&mut self, status: WorkerStatus) {
self.status = status;
}
fn get_status(&self) -> WorkerStatus {
self.status
}
}
pub struct Manager {
name: String,
workers: Vec<Arc<Mutex<Worker>>>,
}
impl Manager {
fn new(name :&str) -> Self {
Self { name: name.to_string(), workers: vec![] }
}
fn launch_workers(&mut self, nb_workers: u32, shared_queue: Arc<Queue>) {
for i in 0..nb_workers {
let shared_worker = Arc::new(Mutex::new(Worker::new()));
self.workers.push(shared_worker.clone());
let worker = shared_worker.clone();
let queue = shared_queue.clone();
let name = self.name.clone();
thread::spawn(move || {
loop {
let mut c = queue.content.lock().unwrap();
let job = loop {
if let Some(job) = c.pop_front() {
break job;
} else {
c = queue.not_empty.wait(c).unwrap();
}
};
drop(c);
let mut guard = worker.lock().unwrap();
guard.set_status(WorkerStatus::Running);
drop(guard);
println!("[worker({} - {})] launching job...", name, i);
job.run();
let mut guard = worker.lock().unwrap();
guard.set_status(WorkerStatus::Pending);
drop(guard);
}
});
}
}
fn healthcheck(&self, target: WorkerStatus) -> bool {
for w in &self.workers {
if w.lock().unwrap().get_status() != target {
return false
}
}
true
}
}
#[test]
fn test_manager() {
use std::time;
use std::time::Duration;
let mut m = Manager::new("deploy");
let queue_deploy = Arc::new(Queue::new());
let queue_check = Arc::new(Queue::new());
m.launch_workers(5, queue_deploy.clone());
assert_eq!(5, m.workers.len());
assert!(!m.healthcheck(WorkerStatus::Failed));
for i in 0..500 {
let j = Job::new(i, JobAction::MegaportDeploy, "");
queue_deploy.add_job(j);
}
let wait = time::Duration::from_millis(200);
thread::sleep(wait);
assert!(m.healthcheck(WorkerStatus::Pending));
}