add handler

This commit is contained in:
rmanach 2023-04-18 17:34:57 +02:00
parent 35161d1c61
commit abfae5babb
4 changed files with 46 additions and 17 deletions

View File

@ -10,7 +10,7 @@ use std::time;
use chrono::prelude::*; use chrono::prelude::*;
use database::init_database_pool; use database::init_database_pool;
use model::{Job, JobAction}; use model::{Job, JobAction};
use worker::{Manager, WorkerStatus}; use worker::{DeployHandler, Manager, WorkerStatus};
fn main() { fn main() {
let now: DateTime<Utc> = Utc::now(); let now: DateTime<Utc> = Utc::now();
@ -18,7 +18,9 @@ fn main() {
init_database_pool(); init_database_pool();
let mut deployer: Manager<Job> = Manager::new("deploy"); let mut deployer: Manager<Job> = Manager::new("deploy");
deployer.launch_workers(10);
let handler = DeployHandler::new();
deployer.launch_workers(4, handler);
for i in 0..5000 { for i in 0..5000 {
let job = Job::new(i, JobAction::MegaportDeploy); let job = Job::new(i, JobAction::MegaportDeploy);

31
src/worker/handler.rs Normal file
View File

@ -0,0 +1,31 @@
use crate::message::Message;
pub trait Handler {
fn handle(&self, res: Result<Message, String>);
}
#[derive(Clone, Copy)]
pub struct DeployHandler {}
impl DeployHandler {
pub fn new() -> Self {
Self {}
}
fn manage_message(&self, message: Message) {
match message {
Message::CheckDeploy(body) => {
println!("{:?}", body)
}
}
}
}
impl Handler for DeployHandler {
fn handle(&self, res: Result<Message, String>) {
match res {
Ok(m) => self.manage_message(m),
Err(_e) => todo!(),
}
}
}

View File

@ -1,3 +1,5 @@
mod handler;
mod worker; mod worker;
pub use handler::DeployHandler;
pub use worker::{Manager, Queue, WorkerStatus}; pub use worker::{Manager, Queue, WorkerStatus};

View File

@ -1,8 +1,9 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use std::marker::Send;
use std::sync::{Arc, Condvar, Mutex}; use std::sync::{Arc, Condvar, Mutex};
use std::thread; use std::thread;
use crate::message::Message; use super::handler::Handler;
use crate::model::*; use crate::model::*;
pub struct Queue<T> { pub struct Queue<T> {
@ -58,7 +59,7 @@ pub struct Manager<T: Runner> {
queue: Arc<Queue<T>>, queue: Arc<Queue<T>>,
} }
impl<T: Runner + std::marker::Send + 'static> Manager<T> { impl<T: Runner + Send + 'static> Manager<T> {
pub fn new(name: &str) -> Self { pub fn new(name: &str) -> Self {
Self { Self {
name: name.to_string(), name: name.to_string(),
@ -67,7 +68,11 @@ impl<T: Runner + std::marker::Send + 'static> Manager<T> {
} }
} }
pub fn launch_workers(&mut self, nb_workers: u32) { pub fn launch_workers<U: Handler + Send + 'static + Copy>(
&mut self,
nb_workers: u32,
handler: U,
) {
for i in 0..nb_workers { for i in 0..nb_workers {
let shared_worker = Arc::new(Mutex::new(Worker::new(i, self.name.clone()))); let shared_worker = Arc::new(Mutex::new(Worker::new(i, self.name.clone())));
self.workers.push(shared_worker.clone()); self.workers.push(shared_worker.clone());
@ -89,10 +94,7 @@ impl<T: Runner + std::marker::Send + 'static> Manager<T> {
drop(guard); drop(guard);
Manager::<T>::set_worker_status(&worker, WorkerStatus::Running); Manager::<T>::set_worker_status(&worker, WorkerStatus::Running);
match runner.run() { handler.handle(runner.run());
Ok(m) => Manager::<T>::manage_message(m),
Err(_e) => todo!(),
}
Manager::<T>::set_worker_status(&worker, WorkerStatus::Pending); Manager::<T>::set_worker_status(&worker, WorkerStatus::Pending);
}); });
} }
@ -114,14 +116,6 @@ impl<T: Runner + std::marker::Send + 'static> Manager<T> {
true true
} }
fn manage_message(message: Message) {
match message {
Message::CheckDeploy(body) => {
println!("{:?}", body)
}
}
}
fn set_worker_status(worker: &Arc<Mutex<Worker>>, status: WorkerStatus) { fn set_worker_status(worker: &Arc<Mutex<Worker>>, status: WorkerStatus) {
let mut guard = worker.lock().unwrap(); let mut guard = worker.lock().unwrap();
guard.set_status(status); guard.set_status(status);