From 0c3a8acae0c6a449cbf8a7d8a279d7bc4a35a247 Mon Sep 17 00:00:00 2001 From: rmanach Date: Tue, 18 Apr 2023 20:55:38 +0200 Subject: [PATCH] impl subscriber --- src/main.rs | 15 +++++++++++---- src/model/job.rs | 21 ++++++++++++++++++++- src/worker/handler.rs | 35 ++++++++++++++++++++++++++++++----- src/worker/mod.rs | 2 +- src/worker/worker.rs | 28 ++++++++++++++++++++++++---- 5 files changed, 86 insertions(+), 15 deletions(-) diff --git a/src/main.rs b/src/main.rs index 712fc40..62bbf6e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,23 +4,30 @@ mod message; mod model; mod worker; +use std::sync::{mpsc::channel, Arc, Mutex}; use std::thread; use std::time; use chrono::prelude::*; use database::init_database_pool; use model::{Job, JobAction}; -use worker::{DeployHandler, Manager, WorkerStatus}; +use worker::{CheckHandler, DeployHandler, Manager, WorkerStatus}; fn main() { let now: DateTime = Utc::now(); init_database_pool(); - let mut deployer: Manager = Manager::new("deploy"); + let (sender, receiver) = channel(); - let handler = DeployHandler::new(); - deployer.launch_workers(4, handler); + let mut deployer: Manager = Manager::new("deploy"); + let deploy_handler = DeployHandler::new(Arc::new(Mutex::new(sender))); + deployer.launch_workers(5, Arc::new(deploy_handler)); + + let check_handler = CheckHandler::new(); + let mut checker: Manager = Manager::new("checker"); + checker.launch_workers(5, Arc::new(check_handler)); + checker.subscribe(receiver); for i in 0..5000 { let job = Job::new(i, JobAction::MegaportDeploy); diff --git a/src/model/job.rs b/src/model/job.rs index 6419112..ee56147 100644 --- a/src/model/job.rs +++ b/src/model/job.rs @@ -48,6 +48,25 @@ impl Job { Ok(Message::check_deploy(self.id.try_into().unwrap(), "")) } + fn check_megaport(&mut self) -> Result { + self.set_status(RunnerStatus::Running); + println!( + "[job({} - {:?} - {:?})] checking megaport...", + self.id, self.action, self.status + ); + + let wait = time::Duration::from_millis(50); + thread::sleep(wait); + + self.set_status(RunnerStatus::Success); + println!( + "[job({} - {:?} - {:?})] checking megaport done", + self.id, self.action, self.status + ); + + Ok(Message::check_deploy(self.id.try_into().unwrap(), "")) + } + fn set_status(&mut self, status: RunnerStatus) { self.status = status; } @@ -58,7 +77,7 @@ impl Runner for Job { match self.action { JobAction::MegaportDeploy => self.deploy_megaport(), JobAction::MegaportUndeploy => todo!(), - JobAction::MegaportCheckDeploy => todo!(), + JobAction::MegaportCheckDeploy => self.check_megaport(), JobAction::MegaportCheckUndeploy => todo!(), } } diff --git a/src/worker/handler.rs b/src/worker/handler.rs index 92d6d02..9d17ba8 100644 --- a/src/worker/handler.rs +++ b/src/worker/handler.rs @@ -1,21 +1,46 @@ +use std::sync::mpsc::Sender; +use std::sync::{Arc, Mutex}; + use crate::message::Message; +use crate::model::{Job, JobAction}; pub trait Handler { fn handle(&self, res: Result); } -#[derive(Clone, Copy)] -pub struct DeployHandler {} +pub struct CheckHandler {} -impl DeployHandler { +impl CheckHandler { pub fn new() -> Self { Self {} } +} + +impl Handler for CheckHandler { + fn handle(&self, res: Result) { + match res { + Ok(_m) => todo!(), + Err(_e) => todo!(), + } + } +} + +pub struct DeployHandler { + sender: Arc>>, +} + +impl DeployHandler { + pub fn new(sender: Arc>>) -> Self { + Self { sender } + } fn manage_message(&self, message: Message) { match message { - Message::CheckDeploy(body) => { - println!("{:?}", body) + Message::CheckDeploy(_body) => { + let guard = self.sender.lock().unwrap(); + if let Err(e) = guard.send(Job::new(1, JobAction::MegaportCheckDeploy)) { + println!("[handler(deploy)] error: {}", e); + } } } } diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 0235f12..7d66ddf 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -1,5 +1,5 @@ mod handler; mod worker; -pub use handler::DeployHandler; +pub use handler::{CheckHandler, DeployHandler}; pub use worker::{Manager, Queue, WorkerStatus}; diff --git a/src/worker/worker.rs b/src/worker/worker.rs index 04ffdd8..8850d05 100644 --- a/src/worker/worker.rs +++ b/src/worker/worker.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use std::marker::Send; -use std::sync::{Arc, Condvar, Mutex}; +use std::marker::{Send, Sync}; +use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex}; use std::thread; use super::handler::Handler; @@ -68,10 +68,10 @@ impl Manager { } } - pub fn launch_workers( + pub fn launch_workers( &mut self, nb_workers: u32, - handler: U, + shared_handler: Arc, ) { for i in 0..nb_workers { let shared_worker = Arc::new(Mutex::new(Worker::new(i, self.name.clone()))); @@ -79,6 +79,7 @@ impl Manager { let worker = shared_worker.clone(); let queue = self.queue.clone(); + let handler = shared_handler.clone(); thread::spawn(move || loop { let mut guard = queue.content.lock().unwrap(); @@ -100,6 +101,25 @@ impl Manager { } } + pub fn subscribe(&self, receiver: Receiver) { + let queue = self.queue.clone(); + thread::spawn(move || loop { + match receiver.recv() { + Ok(j) => { + let mut guard = queue.content.lock().unwrap(); + guard.push_back(j); + + drop(guard); + queue.not_empty.notify_all(); + } + Err(e) => { + eprintln!("[subscribe] error occurred: {:?}", e); + break; + } + } + }); + } + pub fn add_runner(&self, runner: T) { let mut q = self.queue.content.lock().unwrap(); q.push_back(runner);