impl subscriber

This commit is contained in:
rmanach 2023-04-18 20:55:38 +02:00
parent abfae5babb
commit 0c3a8acae0
5 changed files with 86 additions and 15 deletions

View File

@ -4,23 +4,30 @@ mod message;
mod model;
mod worker;
use std::sync::{mpsc::channel, Arc, Mutex};
use std::thread;
use std::time;
use chrono::prelude::*;
use database::init_database_pool;
use model::{Job, JobAction};
use worker::{DeployHandler, Manager, WorkerStatus};
use worker::{CheckHandler, DeployHandler, Manager, WorkerStatus};
fn main() {
let now: DateTime<Utc> = Utc::now();
init_database_pool();
let mut deployer: Manager<Job> = Manager::new("deploy");
let (sender, receiver) = channel();
let handler = DeployHandler::new();
deployer.launch_workers(4, handler);
let mut deployer: Manager<Job> = Manager::new("deploy");
let deploy_handler = DeployHandler::new(Arc::new(Mutex::new(sender)));
deployer.launch_workers(5, Arc::new(deploy_handler));
let check_handler = CheckHandler::new();
let mut checker: Manager<Job> = Manager::new("checker");
checker.launch_workers(5, Arc::new(check_handler));
checker.subscribe(receiver);
for i in 0..5000 {
let job = Job::new(i, JobAction::MegaportDeploy);

View File

@ -48,6 +48,25 @@ impl Job {
Ok(Message::check_deploy(self.id.try_into().unwrap(), ""))
}
fn check_megaport(&mut self) -> Result<Message, String> {
self.set_status(RunnerStatus::Running);
println!(
"[job({} - {:?} - {:?})] checking megaport...",
self.id, self.action, self.status
);
let wait = time::Duration::from_millis(50);
thread::sleep(wait);
self.set_status(RunnerStatus::Success);
println!(
"[job({} - {:?} - {:?})] checking megaport done",
self.id, self.action, self.status
);
Ok(Message::check_deploy(self.id.try_into().unwrap(), ""))
}
fn set_status(&mut self, status: RunnerStatus) {
self.status = status;
}
@ -58,7 +77,7 @@ impl Runner for Job {
match self.action {
JobAction::MegaportDeploy => self.deploy_megaport(),
JobAction::MegaportUndeploy => todo!(),
JobAction::MegaportCheckDeploy => todo!(),
JobAction::MegaportCheckDeploy => self.check_megaport(),
JobAction::MegaportCheckUndeploy => todo!(),
}
}

View File

@ -1,21 +1,46 @@
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use crate::message::Message;
use crate::model::{Job, JobAction};
pub trait Handler {
fn handle(&self, res: Result<Message, String>);
}
#[derive(Clone, Copy)]
pub struct DeployHandler {}
pub struct CheckHandler {}
impl DeployHandler {
impl CheckHandler {
pub fn new() -> Self {
Self {}
}
}
impl Handler for CheckHandler {
fn handle(&self, res: Result<Message, String>) {
match res {
Ok(_m) => todo!(),
Err(_e) => todo!(),
}
}
}
pub struct DeployHandler {
sender: Arc<Mutex<Sender<Job>>>,
}
impl DeployHandler {
pub fn new(sender: Arc<Mutex<Sender<Job>>>) -> Self {
Self { sender }
}
fn manage_message(&self, message: Message) {
match message {
Message::CheckDeploy(body) => {
println!("{:?}", body)
Message::CheckDeploy(_body) => {
let guard = self.sender.lock().unwrap();
if let Err(e) = guard.send(Job::new(1, JobAction::MegaportCheckDeploy)) {
println!("[handler(deploy)] error: {}", e);
}
}
}
}

View File

@ -1,5 +1,5 @@
mod handler;
mod worker;
pub use handler::DeployHandler;
pub use handler::{CheckHandler, DeployHandler};
pub use worker::{Manager, Queue, WorkerStatus};

View File

@ -1,6 +1,6 @@
use std::collections::VecDeque;
use std::marker::Send;
use std::sync::{Arc, Condvar, Mutex};
use std::marker::{Send, Sync};
use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex};
use std::thread;
use super::handler::Handler;
@ -68,10 +68,10 @@ impl<T: Runner + Send + 'static> Manager<T> {
}
}
pub fn launch_workers<U: Handler + Send + 'static + Copy>(
pub fn launch_workers<U: Handler + Sync + Send + 'static>(
&mut self,
nb_workers: u32,
handler: U,
shared_handler: Arc<U>,
) {
for i in 0..nb_workers {
let shared_worker = Arc::new(Mutex::new(Worker::new(i, self.name.clone())));
@ -79,6 +79,7 @@ impl<T: Runner + Send + 'static> Manager<T> {
let worker = shared_worker.clone();
let queue = self.queue.clone();
let handler = shared_handler.clone();
thread::spawn(move || loop {
let mut guard = queue.content.lock().unwrap();
@ -100,6 +101,25 @@ impl<T: Runner + Send + 'static> Manager<T> {
}
}
pub fn subscribe(&self, receiver: Receiver<T>) {
let queue = self.queue.clone();
thread::spawn(move || loop {
match receiver.recv() {
Ok(j) => {
let mut guard = queue.content.lock().unwrap();
guard.push_back(j);
drop(guard);
queue.not_empty.notify_all();
}
Err(e) => {
eprintln!("[subscribe] error occurred: {:?}", e);
break;
}
}
});
}
pub fn add_runner(&self, runner: T) {
let mut q = self.queue.content.lock().unwrap();
q.push_back(runner);