From 0f5f8a3ec04e88c249aa9631629dd194b06382c2 Mon Sep 17 00:00:00 2001 From: rmanach Date: Sat, 22 Apr 2023 15:23:33 +0200 Subject: [PATCH] remove rocket dependencies for manager notifications --- src/main.rs | 14 +++++--- src/model/mod.rs | 2 +- src/model/traits.rs | 6 +++- src/notification/notification.rs | 22 ++++++++++--- src/worker/worker.rs | 56 ++++++++++++++------------------ 5 files changed, 58 insertions(+), 42 deletions(-) diff --git a/src/main.rs b/src/main.rs index ebcbb80..9051a56 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,8 @@ use notification::Notification; use worker::{DeployHandler, Manager}; struct GlobalState { - deployer: Manager, + deployer: Manager, + notification: Arc, } #[get("/deploy/")] @@ -33,7 +34,7 @@ async fn deploy(state: &State, id: u32) -> Value { #[get("/events")] async fn events(state: &State, end: Shutdown) -> EventStream![] { - state.deployer.get_events(end).await + state.notification.events(end).await } #[get("/health")] @@ -44,10 +45,15 @@ async fn healthcheck(state: &State) -> Value { #[rocket::main] async fn main() -> Result<(), rocket::Error> { let (sender, _receiver) = channel(); - let mut deployer: Manager = Manager::new("deploy", DeployHandler::new(sender)); + + let notification = Arc::new(Notification::init()); + let mut deployer = Manager::new("deploy", DeployHandler::new(sender), notification.clone()); deployer.launch_workers::(5); - let global_state = GlobalState { deployer: deployer }; + let global_state = GlobalState { + deployer, + notification, + }; let _rocket = rocket::build() .mount("/", routes![deploy, events, healthcheck]) diff --git a/src/model/mod.rs b/src/model/mod.rs index 5551272..accd04d 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -2,4 +2,4 @@ mod job; mod traits; pub use job::{Job, JobAction}; -pub use traits::{Runner, RunnerStatus, Storer}; +pub use traits::{Notifier, Runner, RunnerStatus, Storer}; diff --git a/src/model/traits.rs b/src/model/traits.rs index 03c2d08..8f3e6bf 100644 --- a/src/model/traits.rs +++ b/src/model/traits.rs @@ -1,4 +1,4 @@ -use std::marker::Send; +use std::marker::{Send, Sync}; use crate::error::{MessageError, RunnerError, StorerError}; use crate::message::Message; @@ -19,6 +19,10 @@ pub trait Runner: Send + 'static { Self: Sized; } +pub trait Notifier: Sync + Send + 'static { + fn send(&self, data: Message); +} + pub trait Storer { fn insert(&self) -> Result<(), StorerError>; fn save(&self) -> Result<(), StorerError>; diff --git a/src/notification/notification.rs b/src/notification/notification.rs index ef6787e..095d180 100644 --- a/src/notification/notification.rs +++ b/src/notification/notification.rs @@ -2,7 +2,6 @@ use std::sync::{Arc, Mutex}; use rocket::{ response::stream::{Event, EventStream}, - serde::Serialize, tokio::{ select, sync::{broadcast, broadcast::error::RecvError}, @@ -10,11 +9,14 @@ use rocket::{ Shutdown, }; -pub struct Notification { - sender: Arc>>, +use crate::message::Message; +use crate::model::Notifier; + +pub struct Notification { + sender: Arc>>, } -impl Notification { +impl Notification { pub fn init() -> Self { let (sender, _) = broadcast::channel(10); Self { @@ -22,7 +24,7 @@ impl Notification { } } - pub fn get_sender(&self) -> broadcast::Sender { + pub fn get_sender(&self) -> broadcast::Sender { self.sender.lock().unwrap().clone() } @@ -44,3 +46,13 @@ impl Notification { } } } + +impl Notifier for Notification { + fn send(&self, data: Message) { + let guard = self.get_sender(); + match guard.send(data) { + Ok(_) => (), + Err(e) => eprintln!("error while sending event: {}", e), + } + } +} diff --git a/src/worker/worker.rs b/src/worker/worker.rs index 83255e9..29d3056 100644 --- a/src/worker/worker.rs +++ b/src/worker/worker.rs @@ -2,13 +2,10 @@ use std::collections::VecDeque; use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex}; use std::{thread, time}; -use rocket::{response::stream::EventStream, Shutdown}; - use super::handler::Handler; use crate::error::HandlerError; use crate::message::{Message, Subject}; use crate::model::*; -use crate::notification::Notification; // Queue is a simple queue data structure. // @@ -35,22 +32,22 @@ pub enum WorkerStatus { Stopped, } -struct Worker { +struct Worker { id: u32, manager: String, queue: Arc>, status: Arc>, - shared_handler: Arc, - notifier: Arc>, + shared_handler: Arc, + notifier: Arc, } -impl Worker { +impl Worker { fn new( id: u32, manager: String, queue: Arc>, - shared_handler: Arc, - notifier: Arc>, + shared_handler: Arc, + notifier: Arc, ) -> Self { Self { id: id, @@ -75,7 +72,7 @@ impl Worker { // // It tries to parse the incoming `Message` into a `Runner` and runs it. // If an `Unknown` error is returned from the handler, the thread is stopped and its status is set to `Failed`. - fn launch(&self) { + fn launch(&self) { let queue = self.queue.clone(); let status = self.status.clone(); let handler = self.shared_handler.clone(); @@ -93,13 +90,13 @@ impl Worker { }; if message.get_subject() == Subject::StopManager { - Worker::::set_status(&status, WorkerStatus::Stopped); + Worker::::set_status(&status, WorkerStatus::Stopped); break; } drop(guard); - let mut runner = match U::try_from(message) { + let mut runner = match R::try_from(message) { Ok(r) => r, Err(_e) => { eprintln!("unable to parse the incoming message into a runner"); @@ -107,23 +104,21 @@ impl Worker { } }; - Worker::::set_status(&status, WorkerStatus::Running); + Worker::::set_status(&status, WorkerStatus::Running); // TODO: the message could be requeued if needed if let Err(e) = handler.handle(runner.run()) { match e { HandlerError::Unknown => { - Worker::::set_status(&status, WorkerStatus::Failed); + Worker::::set_status(&status, WorkerStatus::Failed); break; } _ => (), } } // TODO: collect a message from the handler (modify the signature) - // TODO: unwrap() on send is useless since it can failed - let guard = notifier.get_sender(); - guard.send(Message::empty()).unwrap(); - Worker::::set_status(&status, WorkerStatus::Pending); + notifier.send(Message::empty()); + Worker::::set_status(&status, WorkerStatus::Pending); }); } @@ -141,18 +136,17 @@ pub enum ManagerStatus { } // Manager is a pool of workers and holds a queue containing `Message`. -pub struct Manager { +pub struct Manager { name: String, status: ManagerStatus, - workers: Vec>, + workers: Vec>, queue: Arc>, - shared_handler: Arc, - notifier: Arc>, + shared_handler: Arc, + notifier: Arc, } -impl Manager { - pub fn new(name: &str, shared_handler: T) -> Self { - let notifier = Arc::new(Notification::::init()); +impl Manager { + pub fn new(name: &str, shared_handler: H, notifier: Arc) -> Self { Self { name: name.to_string(), workers: vec![], @@ -171,25 +165,25 @@ impl Manager { // // Example: // deployer.launch_workers::(5) - pub fn launch_workers(&mut self, nb_workers: u32) { + pub fn launch_workers(&mut self, nb_workers: u32) { for i in 0..nb_workers { - let worker: Worker = Worker::new( + let worker: Worker = Worker::new( i, self.name.clone(), self.queue.clone(), self.shared_handler.clone(), self.notifier.clone(), ); - worker.launch::(); + worker.launch::(); self.workers.push(worker); } self.status = ManagerStatus::Up; } - pub async fn get_events(&self, end: Shutdown) -> EventStream![] { - self.notifier.events(end).await - } + // pub async fn get_events(&self, end: Shutdown) -> EventStream![] { + // self.notifier.events(end).await + // } // subscribe subscribes to a `Receiver` channel and notify all the related workers. pub fn subscribe(&self, receiver: Receiver) {