remove rocket dependencies for manager notifications

This commit is contained in:
rmanach 2023-04-22 15:23:33 +02:00
parent e1e8085955
commit 0f5f8a3ec0
5 changed files with 58 additions and 42 deletions

View File

@ -20,7 +20,8 @@ use notification::Notification;
use worker::{DeployHandler, Manager}; use worker::{DeployHandler, Manager};
struct GlobalState { struct GlobalState {
deployer: Manager<DeployHandler>, deployer: Manager<DeployHandler, Notification>,
notification: Arc<Notification>,
} }
#[get("/deploy/<id>")] #[get("/deploy/<id>")]
@ -33,7 +34,7 @@ async fn deploy(state: &State<GlobalState>, id: u32) -> Value {
#[get("/events")] #[get("/events")]
async fn events(state: &State<GlobalState>, end: Shutdown) -> EventStream![] { async fn events(state: &State<GlobalState>, end: Shutdown) -> EventStream![] {
state.deployer.get_events(end).await state.notification.events(end).await
} }
#[get("/health")] #[get("/health")]
@ -44,10 +45,15 @@ async fn healthcheck(state: &State<GlobalState>) -> Value {
#[rocket::main] #[rocket::main]
async fn main() -> Result<(), rocket::Error> { async fn main() -> Result<(), rocket::Error> {
let (sender, _receiver) = channel(); let (sender, _receiver) = channel();
let mut deployer: Manager<DeployHandler> = Manager::new("deploy", DeployHandler::new(sender));
let notification = Arc::new(Notification::init());
let mut deployer = Manager::new("deploy", DeployHandler::new(sender), notification.clone());
deployer.launch_workers::<Job>(5); deployer.launch_workers::<Job>(5);
let global_state = GlobalState { deployer: deployer }; let global_state = GlobalState {
deployer,
notification,
};
let _rocket = rocket::build() let _rocket = rocket::build()
.mount("/", routes![deploy, events, healthcheck]) .mount("/", routes![deploy, events, healthcheck])

View File

@ -2,4 +2,4 @@ mod job;
mod traits; mod traits;
pub use job::{Job, JobAction}; pub use job::{Job, JobAction};
pub use traits::{Runner, RunnerStatus, Storer}; pub use traits::{Notifier, Runner, RunnerStatus, Storer};

View File

@ -1,4 +1,4 @@
use std::marker::Send; use std::marker::{Send, Sync};
use crate::error::{MessageError, RunnerError, StorerError}; use crate::error::{MessageError, RunnerError, StorerError};
use crate::message::Message; use crate::message::Message;
@ -19,6 +19,10 @@ pub trait Runner: Send + 'static {
Self: Sized; Self: Sized;
} }
pub trait Notifier: Sync + Send + 'static {
fn send(&self, data: Message);
}
pub trait Storer { pub trait Storer {
fn insert(&self) -> Result<(), StorerError>; fn insert(&self) -> Result<(), StorerError>;
fn save(&self) -> Result<(), StorerError>; fn save(&self) -> Result<(), StorerError>;

View File

@ -2,7 +2,6 @@ use std::sync::{Arc, Mutex};
use rocket::{ use rocket::{
response::stream::{Event, EventStream}, response::stream::{Event, EventStream},
serde::Serialize,
tokio::{ tokio::{
select, select,
sync::{broadcast, broadcast::error::RecvError}, sync::{broadcast, broadcast::error::RecvError},
@ -10,11 +9,14 @@ use rocket::{
Shutdown, Shutdown,
}; };
pub struct Notification<T> { use crate::message::Message;
sender: Arc<Mutex<broadcast::Sender<T>>>, use crate::model::Notifier;
pub struct Notification {
sender: Arc<Mutex<broadcast::Sender<Message>>>,
} }
impl<T: Clone + Serialize> Notification<T> { impl Notification {
pub fn init() -> Self { pub fn init() -> Self {
let (sender, _) = broadcast::channel(10); let (sender, _) = broadcast::channel(10);
Self { Self {
@ -22,7 +24,7 @@ impl<T: Clone + Serialize> Notification<T> {
} }
} }
pub fn get_sender(&self) -> broadcast::Sender<T> { pub fn get_sender(&self) -> broadcast::Sender<Message> {
self.sender.lock().unwrap().clone() self.sender.lock().unwrap().clone()
} }
@ -44,3 +46,13 @@ impl<T: Clone + Serialize> Notification<T> {
} }
} }
} }
impl Notifier for Notification {
fn send(&self, data: Message) {
let guard = self.get_sender();
match guard.send(data) {
Ok(_) => (),
Err(e) => eprintln!("error while sending event: {}", e),
}
}
}

View File

@ -2,13 +2,10 @@ use std::collections::VecDeque;
use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex}; use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex};
use std::{thread, time}; use std::{thread, time};
use rocket::{response::stream::EventStream, Shutdown};
use super::handler::Handler; use super::handler::Handler;
use crate::error::HandlerError; use crate::error::HandlerError;
use crate::message::{Message, Subject}; use crate::message::{Message, Subject};
use crate::model::*; use crate::model::*;
use crate::notification::Notification;
// Queue is a simple queue data structure. // Queue is a simple queue data structure.
// //
@ -35,22 +32,22 @@ pub enum WorkerStatus {
Stopped, Stopped,
} }
struct Worker<T> { struct Worker<H, N> {
id: u32, id: u32,
manager: String, manager: String,
queue: Arc<Queue<Message>>, queue: Arc<Queue<Message>>,
status: Arc<Mutex<WorkerStatus>>, status: Arc<Mutex<WorkerStatus>>,
shared_handler: Arc<T>, shared_handler: Arc<H>,
notifier: Arc<Notification<Message>>, notifier: Arc<N>,
} }
impl<T: Handler> Worker<T> { impl<H: Handler, N: Notifier> Worker<H, N> {
fn new( fn new(
id: u32, id: u32,
manager: String, manager: String,
queue: Arc<Queue<Message>>, queue: Arc<Queue<Message>>,
shared_handler: Arc<T>, shared_handler: Arc<H>,
notifier: Arc<Notification<Message>>, notifier: Arc<N>,
) -> Self { ) -> Self {
Self { Self {
id: id, id: id,
@ -75,7 +72,7 @@ impl<T: Handler> Worker<T> {
// //
// It tries to parse the incoming `Message` into a `Runner` and runs it. // It tries to parse the incoming `Message` into a `Runner` and runs it.
// If an `Unknown` error is returned from the handler, the thread is stopped and its status is set to `Failed`. // If an `Unknown` error is returned from the handler, the thread is stopped and its status is set to `Failed`.
fn launch<U: Runner>(&self) { fn launch<R: Runner>(&self) {
let queue = self.queue.clone(); let queue = self.queue.clone();
let status = self.status.clone(); let status = self.status.clone();
let handler = self.shared_handler.clone(); let handler = self.shared_handler.clone();
@ -93,13 +90,13 @@ impl<T: Handler> Worker<T> {
}; };
if message.get_subject() == Subject::StopManager { if message.get_subject() == Subject::StopManager {
Worker::<T>::set_status(&status, WorkerStatus::Stopped); Worker::<H, N>::set_status(&status, WorkerStatus::Stopped);
break; break;
} }
drop(guard); drop(guard);
let mut runner = match U::try_from(message) { let mut runner = match R::try_from(message) {
Ok(r) => r, Ok(r) => r,
Err(_e) => { Err(_e) => {
eprintln!("unable to parse the incoming message into a runner"); eprintln!("unable to parse the incoming message into a runner");
@ -107,23 +104,21 @@ impl<T: Handler> Worker<T> {
} }
}; };
Worker::<T>::set_status(&status, WorkerStatus::Running); Worker::<H, N>::set_status(&status, WorkerStatus::Running);
// TODO: the message could be requeued if needed // TODO: the message could be requeued if needed
if let Err(e) = handler.handle(runner.run()) { if let Err(e) = handler.handle(runner.run()) {
match e { match e {
HandlerError::Unknown => { HandlerError::Unknown => {
Worker::<T>::set_status(&status, WorkerStatus::Failed); Worker::<H, N>::set_status(&status, WorkerStatus::Failed);
break; break;
} }
_ => (), _ => (),
} }
} }
// TODO: collect a message from the handler (modify the signature) // TODO: collect a message from the handler (modify the signature)
// TODO: unwrap() on send is useless since it can failed notifier.send(Message::empty());
let guard = notifier.get_sender(); Worker::<H, N>::set_status(&status, WorkerStatus::Pending);
guard.send(Message::empty()).unwrap();
Worker::<T>::set_status(&status, WorkerStatus::Pending);
}); });
} }
@ -141,18 +136,17 @@ pub enum ManagerStatus {
} }
// Manager is a pool of workers and holds a queue containing `Message`. // Manager is a pool of workers and holds a queue containing `Message`.
pub struct Manager<T> { pub struct Manager<H, N> {
name: String, name: String,
status: ManagerStatus, status: ManagerStatus,
workers: Vec<Worker<T>>, workers: Vec<Worker<H, N>>,
queue: Arc<Queue<Message>>, queue: Arc<Queue<Message>>,
shared_handler: Arc<T>, shared_handler: Arc<H>,
notifier: Arc<Notification<Message>>, notifier: Arc<N>,
} }
impl<T: Handler> Manager<T> { impl<H: Handler, N: Notifier> Manager<H, N> {
pub fn new(name: &str, shared_handler: T) -> Self { pub fn new(name: &str, shared_handler: H, notifier: Arc<N>) -> Self {
let notifier = Arc::new(Notification::<Message>::init());
Self { Self {
name: name.to_string(), name: name.to_string(),
workers: vec![], workers: vec![],
@ -171,25 +165,25 @@ impl<T: Handler> Manager<T> {
// //
// Example: // Example:
// deployer.launch_workers::<Job>(5) // deployer.launch_workers::<Job>(5)
pub fn launch_workers<U: Runner>(&mut self, nb_workers: u32) { pub fn launch_workers<R: Runner>(&mut self, nb_workers: u32) {
for i in 0..nb_workers { for i in 0..nb_workers {
let worker: Worker<T> = Worker::new( let worker: Worker<H, N> = Worker::new(
i, i,
self.name.clone(), self.name.clone(),
self.queue.clone(), self.queue.clone(),
self.shared_handler.clone(), self.shared_handler.clone(),
self.notifier.clone(), self.notifier.clone(),
); );
worker.launch::<U>(); worker.launch::<R>();
self.workers.push(worker); self.workers.push(worker);
} }
self.status = ManagerStatus::Up; self.status = ManagerStatus::Up;
} }
pub async fn get_events(&self, end: Shutdown) -> EventStream![] { // pub async fn get_events(&self, end: Shutdown) -> EventStream![] {
self.notifier.events(end).await // self.notifier.events(end).await
} // }
// subscribe subscribes to a `Receiver` channel and notify all the related workers. // subscribe subscribes to a `Receiver` channel and notify all the related workers.
pub fn subscribe(&self, receiver: Receiver<Message>) { pub fn subscribe(&self, receiver: Receiver<Message>) {