impl notification + add Rocket endpoints

This commit is contained in:
rmanach 2023-04-20 16:03:48 +02:00
parent 45a29cf395
commit e1e8085955
8 changed files with 1006 additions and 50 deletions

877
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -14,3 +14,7 @@ r2d2 = "0.8.10"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
[dependencies.rocket]
version = "=0.5.0-rc.3"
features = ["json"]

View File

@ -1,49 +1,58 @@
mod database;
mod error;
mod message;
mod model;
mod notification;
mod worker;
use std::sync::mpsc::channel;
use std::sync::{mpsc::channel, Arc};
use rocket::{
get,
response::stream::EventStream,
routes,
serde::json::{json, Value},
Shutdown, State,
};
use chrono::prelude::*;
use database::init_database_pool;
use message::{Message, Subject};
use model::{Job, JobAction};
use worker::{CheckHandler, DeployHandler, Manager};
use notification::Notification;
use worker::{DeployHandler, Manager};
fn main() {
let now: DateTime<Utc> = Utc::now();
struct GlobalState {
deployer: Manager<DeployHandler>,
}
init_database_pool();
let (sender, receiver) = channel();
#[get("/deploy/<id>")]
async fn deploy(state: &State<GlobalState>, id: u32) -> Value {
state
.deployer
.put_message(Message::new(id, Subject::Action(JobAction::Deploy)));
json!({"id": id, "status": "deploying"})
}
// launch deployment workers
#[get("/events")]
async fn events(state: &State<GlobalState>, end: Shutdown) -> EventStream![] {
state.deployer.get_events(end).await
}
#[get("/health")]
async fn healthcheck(state: &State<GlobalState>) -> Value {
json!({"status": state.deployer.healthcheck()})
}
#[rocket::main]
async fn main() -> Result<(), rocket::Error> {
let (sender, _receiver) = channel();
let mut deployer: Manager<DeployHandler> = Manager::new("deploy", DeployHandler::new(sender));
deployer.launch_workers::<Job>(5);
// launch checker workers
let mut checker: Manager<CheckHandler> = Manager::new("checker", CheckHandler::new());
checker.launch_workers::<Job>(5);
checker.subscribe(receiver);
let global_state = GlobalState { deployer: deployer };
// test message handling and subscription
for i in 0..500 {
let message = Message::new(i, Subject::Action(JobAction::Deploy));
deployer.put_message(message);
}
// deployer and checker stop (order matters)
deployer.stop();
assert_eq!(0, deployer.len());
checker.stop();
assert_eq!(0, checker.len());
let elapsed = Utc::now() - now;
println!(
"deployment done in : {}.{}s",
elapsed.num_seconds(),
elapsed.num_milliseconds(),
);
let _rocket = rocket::build()
.mount("/", routes![deploy, events, healthcheck])
.manage(global_state)
.launch()
.await?;
Ok(())
}

View File

@ -1,6 +1,7 @@
use crate::model::JobAction;
use rocket::serde::Serialize;
#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize)]
pub enum Subject {
Action(JobAction),
StopManager,
@ -13,7 +14,7 @@ impl Subject {
}
}
#[derive(Debug)]
#[derive(Debug, Clone, Serialize)]
pub struct Body {
id: u32,
}
@ -28,6 +29,7 @@ impl Body {
}
}
#[derive(Clone, Debug, Serialize)]
pub struct Message {
subject: Subject,
body: Option<Body>,

View File

@ -1,10 +1,12 @@
use std::{thread, time};
use rocket::serde::Serialize;
use super::{Runner, RunnerStatus, Storer};
use crate::error::{MessageError, RunnerError, StorerError};
use crate::message::{Message, Subject};
#[derive(Debug, Clone, Copy, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize)]
pub enum JobAction {
Deploy,
Check,
@ -34,7 +36,7 @@ impl Job {
self.set_status(RunnerStatus::Running);
println!("{} {}", self.format_job(), "deploying...");
let wait = time::Duration::from_millis(5);
let wait = time::Duration::from_secs(5);
thread::sleep(wait);
self.set_status(RunnerStatus::Success);

3
src/notification/mod.rs Normal file
View File

@ -0,0 +1,3 @@
mod notification;
pub use notification::Notification;

View File

@ -0,0 +1,46 @@
use std::sync::{Arc, Mutex};
use rocket::{
response::stream::{Event, EventStream},
serde::Serialize,
tokio::{
select,
sync::{broadcast, broadcast::error::RecvError},
},
Shutdown,
};
pub struct Notification<T> {
sender: Arc<Mutex<broadcast::Sender<T>>>,
}
impl<T: Clone + Serialize> Notification<T> {
pub fn init() -> Self {
let (sender, _) = broadcast::channel(10);
Self {
sender: Arc::new(Mutex::new(sender)),
}
}
pub fn get_sender(&self) -> broadcast::Sender<T> {
self.sender.lock().unwrap().clone()
}
pub async fn events(&self, mut end: Shutdown) -> EventStream![] {
let mut receiver = self.get_sender().subscribe();
EventStream! {
loop {
let msg = select! {
msg = receiver.recv() => match msg {
Ok(msg) => msg,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(_)) => continue,
},
_ = &mut end => break,
};
yield Event::json(&msg);
}
}
}
}

View File

@ -2,10 +2,13 @@ use std::collections::VecDeque;
use std::sync::{mpsc::Receiver, Arc, Condvar, Mutex};
use std::{thread, time};
use rocket::{response::stream::EventStream, Shutdown};
use super::handler::Handler;
use crate::error::HandlerError;
use crate::message::{Message, Subject};
use crate::model::*;
use crate::notification::Notification;
// Queue is a simple queue data structure.
//
@ -38,16 +41,24 @@ struct Worker<T> {
queue: Arc<Queue<Message>>,
status: Arc<Mutex<WorkerStatus>>,
shared_handler: Arc<T>,
notifier: Arc<Notification<Message>>,
}
impl<T: Handler> Worker<T> {
fn new(id: u32, manager: String, queue: Arc<Queue<Message>>, shared_handler: Arc<T>) -> Self {
fn new(
id: u32,
manager: String,
queue: Arc<Queue<Message>>,
shared_handler: Arc<T>,
notifier: Arc<Notification<Message>>,
) -> Self {
Self {
id: id,
manager: manager,
queue: queue,
status: Arc::new(Mutex::new(WorkerStatus::Pending)),
shared_handler: shared_handler,
notifier: notifier,
}
}
@ -68,6 +79,7 @@ impl<T: Handler> Worker<T> {
let queue = self.queue.clone();
let status = self.status.clone();
let handler = self.shared_handler.clone();
let notifier = self.notifier.clone();
thread::spawn(move || loop {
let mut guard = queue.content.lock().unwrap();
@ -107,6 +119,10 @@ impl<T: Handler> Worker<T> {
_ => (),
}
}
// TODO: collect a message from the handler (modify the signature)
// TODO: unwrap() on send is useless since it can failed
let guard = notifier.get_sender();
guard.send(Message::empty()).unwrap();
Worker::<T>::set_status(&status, WorkerStatus::Pending);
});
}
@ -131,16 +147,19 @@ pub struct Manager<T> {
workers: Vec<Worker<T>>,
queue: Arc<Queue<Message>>,
shared_handler: Arc<T>,
notifier: Arc<Notification<Message>>,
}
impl<T: Handler> Manager<T> {
pub fn new(name: &str, shared_handler: T) -> Self {
let notifier = Arc::new(Notification::<Message>::init());
Self {
name: name.to_string(),
workers: vec![],
status: ManagerStatus::Down,
queue: Arc::new(Queue::new()),
shared_handler: Arc::new(shared_handler),
notifier: notifier,
}
}
@ -159,6 +178,7 @@ impl<T: Handler> Manager<T> {
self.name.clone(),
self.queue.clone(),
self.shared_handler.clone(),
self.notifier.clone(),
);
worker.launch::<U>();
self.workers.push(worker);
@ -167,6 +187,10 @@ impl<T: Handler> Manager<T> {
self.status = ManagerStatus::Up;
}
pub async fn get_events(&self, end: Shutdown) -> EventStream![] {
self.notifier.events(end).await
}
// subscribe subscribes to a `Receiver` channel and notify all the related workers.
pub fn subscribe(&self, receiver: Receiver<Message>) {
let queue = self.queue.clone();
@ -213,7 +237,7 @@ impl<T: Handler> Manager<T> {
self.status = ManagerStatus::Stopping;
let wait = time::Duration::from_millis(100);
while !self.healthcheck(WorkerStatus::Stopped) {
while !self.check_status(WorkerStatus::Stopped) {
thread::sleep(wait);
}
@ -221,7 +245,16 @@ impl<T: Handler> Manager<T> {
}
// healthcheck checks the status of all workers.
pub fn healthcheck(&self, target: WorkerStatus) -> bool {
pub fn healthcheck(&self) -> bool {
for w in &self.workers {
if w.get_status() == WorkerStatus::Stopped || w.get_status() == WorkerStatus::Failed {
return false;
}
}
true
}
fn check_status(&self, target: WorkerStatus) -> bool {
for w in &self.workers {
if w.get_status() != target {
return false;