move main engine in worker

This commit is contained in:
rmanach 2023-04-18 12:57:43 +02:00
parent 64d91f4cbf
commit 4906b397ba
13 changed files with 103 additions and 139 deletions

26
Cargo.lock generated
View File

@ -77,6 +77,19 @@ dependencies = [
"typenum",
]
[[package]]
name = "deployer"
version = "0.1.0"
dependencies = [
"diesel",
"lazy_static",
"postgres",
"r2d2",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "diesel"
version = "2.0.3"
@ -114,19 +127,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "dispatcher"
version = "0.1.0"
dependencies = [
"diesel",
"lazy_static",
"postgres",
"r2d2",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "fallible-iterator"
version = "0.2.0"

View File

@ -1,5 +1,5 @@
[package]
name = "dispatcher"
name = "deployer"
version = "0.1.0"
edition = "2021"

View File

@ -1,7 +1,7 @@
format:
cargo-fmt
run: format
run: format db-start
cargo run
build: format

View File

@ -5,6 +5,8 @@ use diesel::r2d2::ConnectionManager;
use lazy_static::lazy_static;
type Pool = r2d2::Pool<ConnectionManager<PgConnection>>;
#[allow(dead_code)]
pub type DbConnection = r2d2::PooledConnection<ConnectionManager<PgConnection>>;
lazy_static! {
@ -19,6 +21,7 @@ pub fn init_database_pool() {
lazy_static::initialize(&POOL);
}
#[allow(dead_code)]
pub fn get_connection() -> Result<DbConnection, String> {
match POOL.get() {
Ok(conn) => Ok(conn),

View File

@ -1,3 +1,3 @@
mod client;
pub use client::{get_connection, init_database_pool};
pub use client::init_database_pool;

View File

@ -1,6 +1,7 @@
use thiserror::Error;
#[derive(Error, Debug)]
#[allow(dead_code)]
pub enum StorerError {
#[error("unable to insert `{0}`")]
InsertError(String),

View File

@ -1,80 +1,26 @@
mod database;
mod error;
mod message;
mod model;
mod queue;
mod worker;
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::time;
use database::{get_connection, init_database_pool};
use message::Message;
use queue::QueueController;
struct Dispatch<T> {
ch_sender: Sender<T>,
}
impl<T: std::fmt::Debug + std::marker::Send + 'static> Dispatch<T> {
fn new() -> Self {
let (sender, receiver) = channel::<T>();
thread::spawn(move || loop {
match receiver.recv() {
Ok(m) => {
println!("[dispatch] treating message: {:?}", m);
}
Err(e) => {
eprintln!("[dispatch] error occurred: {:?}", e);
break;
}
}
});
Self { ch_sender: sender }
}
fn send(&self, message: T) -> Result<(), String> {
match self.ch_sender.send(message) {
Ok(_) => Ok(()),
Err(e) => Err(format!("[dispatch] unable to send the message: {}", e)),
}
}
}
use database::init_database_pool;
use model::{Job, JobAction};
use worker::Manager;
fn main() {
init_database_pool();
let dispatch: Dispatch<Message> = Dispatch::new();
let mut m: Manager<Job> = Manager::new("deploy");
m.launch_workers(5);
for i in 0..50 {
let job = Job::new(i, JobAction::MegaportDeploy);
m.add_runner(job);
}
let wait = time::Duration::from_secs(2);
dispatch.send(Message::command(1, "hello world!")).unwrap();
dispatch
.send(Message::command(1, "arf... wrong type!"))
.unwrap();
dispatch.send(Message::command(1, "...")).unwrap();
let controller = QueueController::new();
controller.launch();
controller.put_message(Message::command(1, "q1"));
controller.put_message(Message::command(1, "q1"));
controller.put_message(Message::command(1, "q1"));
controller.put_message(Message::command(1, "q1"));
controller.put_message(Message::command(1, "q1"));
thread::sleep(wait);
controller.put_message(Message::command(1, "q2"));
controller.put_message(Message::command(1, "q3"));
controller.put_message(Message::command(1, "q4"));
thread::sleep(wait);
controller.put_message(Message::QueueStop);
thread::sleep(wait);
}

View File

@ -1,11 +1,9 @@
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum Message {
Command(MessageBody),
QueueStop,
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct MessageBody {
id: u32,

View File

@ -1,11 +1,10 @@
use crate::database::get_connection;
use std::time::Duration;
use std::{thread, time};
use super::{Runner, RunnerStatus, Storer};
use crate::error::StorerError;
#[derive(Debug)]
#[allow(dead_code)]
pub enum JobAction {
MegaportDeploy,
MegaportUndeploy,
@ -40,16 +39,28 @@ impl Runner for Job {
match self.action {
JobAction::MegaportDeploy => {
println!("[job({:?})] deploying megaport...", self);
println!(
"[job({} - {:?} - {:?})] deploying megaport...",
self.id, self.action, self.status
);
}
JobAction::MegaportUndeploy => {
println!("[job({:?})] undeploying megaport...", self);
println!(
"[job({} - {:?} - {:?})] undeploying megaport...",
self.id, self.action, self.status
);
}
JobAction::MegaportCheckDeploy => {
println!("[job({:?})] ckecking megaport deployment...", self);
println!(
"[job({} - {:?} - {:?})] checking megaport deployement...",
self.id, self.action, self.status
);
}
JobAction::MegaportCheckUndeploy => {
println!("[job({:?})] ckecking megaport undeployment...", self);
println!(
"[job({} - {:?} - {:?})] checking megaport undeployement...",
self.id, self.action, self.status
);
}
}

View File

@ -1,6 +1,7 @@
use crate::error::StorerError;
#[derive(Debug)]
#[allow(dead_code)]
pub enum RunnerStatus {
Pending,
Running,

View File

@ -12,6 +12,37 @@ lazy_static! {
static ref QUEUE_NOT_EMPTY: Condvar = Condvar::new();
}
struct Dispatch<T> {
ch_sender: Sender<T>,
}
impl<T: std::fmt::Debug + std::marker::Send + 'static> Dispatch<T> {
fn new() -> Self {
let (sender, receiver) = channel::<T>();
thread::spawn(move || loop {
match receiver.recv() {
Ok(m) => {
println!("[dispatch] treating message: {:?}", m);
}
Err(e) => {
eprintln!("[dispatch] error occurred: {:?}", e);
break;
}
}
});
Self { ch_sender: sender }
}
fn send(&self, message: T) -> Result<(), String> {
match self.ch_sender.send(message) {
Ok(_) => Ok(()),
Err(e) => Err(format!("[dispatch] unable to send the message: {}", e)),
}
}
}
#[derive(Copy, Clone, PartialEq)]
pub enum QueueStatus {
Pending,

View File

@ -1 +1,3 @@
mod worker;
pub use worker::{Manager, Queue};

View File

@ -1,35 +1,25 @@
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time;
use std::time::Duration;
use crate::message::Message;
use crate::model::*;
use crate::queue::QueueStatus;
struct Queue<T: Runner> {
pub struct Queue<T> {
content: Mutex<VecDeque<T>>,
not_empty: Condvar,
}
impl<T: Runner> Queue<T> {
fn new() -> Self {
impl<T> Queue<T> {
pub fn new() -> Self {
Self {
content: Mutex::new(VecDeque::new()),
content: Mutex::new(VecDeque::<T>::new()),
not_empty: Condvar::new(),
}
}
pub fn add_runner(&self, runner: T) {
let mut c = self.content.lock().unwrap();
c.push_back(runner);
self.not_empty.notify_one();
}
}
#[derive(Copy, Clone, PartialEq)]
#[allow(dead_code)]
pub enum WorkerStatus {
Pending,
Running,
@ -51,36 +41,34 @@ impl Worker {
self.status = status;
}
#[allow(dead_code)]
fn get_status(&self) -> WorkerStatus {
self.status
}
}
pub struct Manager {
pub struct Manager<T: Runner> {
name: String,
workers: Vec<Arc<Mutex<Worker>>>,
queue: Arc<Queue<T>>,
}
impl Manager {
fn new(name: &str) -> Self {
impl<T: Runner + std::marker::Send + 'static> Manager<T> {
pub fn new(name: &str) -> Self {
Self {
name: name.to_string(),
workers: vec![],
queue: Arc::new(Queue::<T>::new()),
}
}
fn launch_workers<T: Runner + std::marker::Send + 'static>(
&mut self,
nb_workers: u32,
shared_queue: Arc<Queue<T>>,
) {
pub fn launch_workers(&mut self, nb_workers: u32) {
for i in 0..nb_workers {
let shared_worker = Arc::new(Mutex::new(Worker::new()));
self.workers.push(shared_worker.clone());
let worker = shared_worker.clone();
let queue = shared_queue.clone();
let queue = self.queue.clone();
let name = self.name.clone();
thread::spawn(move || loop {
@ -102,6 +90,7 @@ impl Manager {
println!("[worker({} - {})] launching job...", name, i);
runner.run();
println!("[worker({} - {})] job done", name, i);
let mut guard = worker.lock().unwrap();
guard.set_status(WorkerStatus::Pending);
@ -110,6 +99,14 @@ impl Manager {
}
}
pub fn add_runner(&self, runner: T) {
let mut q = self.queue.content.lock().unwrap();
q.push_back(runner);
self.queue.not_empty.notify_one();
}
#[allow(dead_code)]
fn healthcheck(&self, target: WorkerStatus) -> bool {
for w in &self.workers {
if w.lock().unwrap().get_status() != target {
@ -119,29 +116,3 @@ impl Manager {
true
}
}
#[test]
fn test_manager() {
use std::time;
use std::time::Duration;
let mut m = Manager::new("deploy");
let queue_deploy = Arc::new(Queue::<Job>::new());
let queue_check = Arc::new(Queue::<Job>::new());
m.launch_workers(5, queue_deploy.clone());
assert_eq!(5, m.workers.len());
assert!(!m.healthcheck(WorkerStatus::Failed));
for i in 0..500 {
let j = Job::new(i, JobAction::MegaportDeploy);
queue_deploy.add_runner(j);
}
let wait = time::Duration::from_millis(200);
thread::sleep(wait);
assert!(m.healthcheck(WorkerStatus::Pending));
}