add a queue wrapper for management

This commit is contained in:
rmanach 2023-04-17 14:40:04 +02:00
parent c42ae39183
commit bf3bd31eed
6 changed files with 196 additions and 128 deletions

View File

@ -5,4 +5,7 @@ run: format
cargo run cargo run
build: format build: format
cargo build --release cargo build --release
test:
cargo test -- --nocapture

View File

@ -1,10 +1,12 @@
mod message; mod message;
mod queue;
use std::sync::mpsc::{channel, Sender}; use std::sync::mpsc::{channel, Sender};
use std::thread; use std::thread;
use std::time; use std::time;
use message::{Message, QueueMessage}; use message::Message;
use queue::Controller;
struct Dispatch<T> { struct Dispatch<T> {
ch_sender: Sender<T>, ch_sender: Sender<T>,
@ -35,10 +37,6 @@ impl<T: std::fmt::Debug + std::marker::Send + 'static> Dispatch<T> {
Err(e) => Err(format!("[dispatch] unable to send the message: {}", e)), Err(e) => Err(format!("[dispatch] unable to send the message: {}", e)),
} }
} }
fn get_sender(&self) -> Sender<T> {
self.ch_sender.clone()
}
} }
fn main() { fn main() {
@ -52,23 +50,24 @@ fn main() {
.unwrap(); .unwrap();
dispatch.send(Message::command(1, "...")).unwrap(); dispatch.send(Message::command(1, "...")).unwrap();
let qm = QueueMessage::new(dispatch.get_sender()); let controller = Controller::new();
controller.launch();
qm.add(Message::command(1, "q1")); controller.put_message(Message::command(1, "q1"));
qm.add(Message::command(1, "q1")); controller.put_message(Message::command(1, "q1"));
qm.add(Message::command(1, "q1")); controller.put_message(Message::command(1, "q1"));
qm.add(Message::command(1, "q1")); controller.put_message(Message::command(1, "q1"));
qm.add(Message::command(1, "q1")); controller.put_message(Message::command(1, "q1"));
thread::sleep(wait); thread::sleep(wait);
qm.add(Message::command(1, "q2")); controller.put_message(Message::command(1, "q2"));
qm.add(Message::command(1, "q3")); controller.put_message(Message::command(1, "q3"));
qm.add(Message::command(1, "q4")); controller.put_message(Message::command(1, "q4"));
thread::sleep(wait); thread::sleep(wait);
qm.add(Message::QueueStop); controller.put_message(Message::QueueStop);
thread::sleep(wait); thread::sleep(wait);
} }

View File

@ -1,23 +1,3 @@
use lazy_static::lazy_static;
use std::collections::VecDeque;
use std::sync::{
mpsc::{channel, Receiver, Sender},
Condvar, Mutex,
};
use std::thread;
lazy_static! {
static ref QUEUE: Mutex<VecDeque<Message>> = Mutex::new(VecDeque::new());
static ref QUEUE_NOT_EMPTY: Condvar = Condvar::new();
}
#[derive(Copy, Clone, PartialEq)]
enum QueueStatus {
Pending,
Running,
Stopped,
}
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Message { pub enum Message {
@ -41,94 +21,3 @@ impl Message {
Message::Command(body) Message::Command(body)
} }
} }
pub struct QueueMessage {
status: Mutex<QueueStatus>,
}
impl QueueMessage {
pub fn new(ws: Sender<Message>) -> Self {
let (sender, receiver) = channel::<Message>();
let mut queue_message = QueueMessage {
status: Mutex::new(QueueStatus::Pending),
};
queue_message.launch_handler(ws, receiver);
queue_message.launch_queue(sender);
queue_message
}
fn launch_handler(&self, ws: Sender<Message>, receiver: Receiver<Message>) {
thread::spawn(move || loop {
match receiver.recv() {
Ok(m) => {
match m {
Message::Command(_) => {
ws.send(m).unwrap()
}
Message::QueueStop => break,
}
}
Err(e) => {
eprintln!("[queue] error occurred: {:?}", e);
// self.set_status(QueueStatus::Stopped);
break;
}
}
});
}
fn launch_queue(&mut self, sender: Sender<Message>) {
thread::spawn(move || loop {
let mut q = QUEUE.lock().unwrap();
let message = loop {
if let Some(message) = q.pop_front() {
break message;
} else {
q = QUEUE_NOT_EMPTY.wait(q).unwrap();
}
};
drop(q);
let msg = message.clone();
if let Err(e) = sender.send(message) {
eprintln!("[queue] error while sending message: {:?}", e);
}
match msg {
Message::Command(_) => (),
Message::QueueStop => {
println!("[queue] stopped");
break;
}
}
});
self.set_status(QueueStatus::Running);
}
pub fn add(&self, message: Message) {
match *self.status.lock().unwrap() {
QueueStatus::Pending | QueueStatus::Running => {
println!("[queue] queueing message: {:?}", message);
QUEUE.lock().unwrap().push_back(message);
QUEUE_NOT_EMPTY.notify_one();
}
QueueStatus::Stopped => (),
}
}
pub fn set_status(&mut self, status: QueueStatus) {
let mut s = self.status.lock().unwrap();
*s = status;
}
pub fn get_status(&self) -> QueueStatus {
*self.status.lock().unwrap()
}
}

View File

@ -1,3 +1,3 @@
mod message; mod message;
pub use message::{Message, QueueMessage}; pub use message::Message;

3
src/queue/mod.rs Normal file
View File

@ -0,0 +1,3 @@
mod queue;
pub use queue::Controller;

174
src/queue/queue.rs Normal file
View File

@ -0,0 +1,174 @@
use crate::message::Message;
use lazy_static::lazy_static;
use std::collections::VecDeque;
use std::sync::{
mpsc::{channel, Receiver, Sender},
Condvar, Mutex,
};
use std::thread;
lazy_static! {
static ref QUEUE: Mutex<Queue> = Mutex::new(Queue::new());
static ref QUEUE_NOT_EMPTY: Condvar = Condvar::new();
}
#[derive(Copy, Clone, PartialEq)]
enum QueueStatus {
Pending,
Running,
Stopped,
}
struct Queue {
status: QueueStatus,
content: VecDeque<Message>,
}
impl Queue {
fn new() -> Self {
Self {
status: QueueStatus::Pending,
content: VecDeque::new(),
}
}
pub fn set_status(&mut self, status: QueueStatus) {
self.status = status;
}
}
pub struct Controller {}
impl Controller {
pub fn new() -> Self {
Self {}
}
pub fn launch(&self) {
let q = QUEUE.lock().unwrap();
let status = q.status;
drop(q);
match status {
QueueStatus::Pending | QueueStatus::Stopped => {
let (sender, receiver) = channel::<Message>();
self.launch_queue(sender);
self.launch_handler(receiver);
QUEUE.lock().unwrap().set_status(QueueStatus::Running);
}
QueueStatus::Running => eprintln!("[queue] error queue is already running"),
}
}
fn launch_handler(&self, receiver: Receiver<Message>) {
thread::spawn(move || loop {
match receiver.recv() {
Ok(m) => match m {
Message::Command(_) => println!("[handler] received message: {:?}", m),
Message::QueueStop => {
println!("[handler] handler stopped");
break;
}
},
Err(e) => {
eprintln!("[queue] error occurred: {:?}", e);
QUEUE.lock().unwrap().set_status(QueueStatus::Stopped);
break;
}
}
});
}
fn launch_queue(&self, sender: Sender<Message>) {
thread::spawn(move || loop {
let mut q = QUEUE.lock().unwrap();
let message = loop {
if let Some(message) = q.content.pop_front() {
break message;
} else {
q = QUEUE_NOT_EMPTY.wait(q).unwrap();
}
};
drop(q);
let msg = message.clone();
if let Err(e) = sender.send(message) {
eprintln!("[queue] error while sending message: {:?}", e);
QUEUE.lock().unwrap().set_status(QueueStatus::Stopped);
break;
}
match msg {
Message::Command(_) => (),
Message::QueueStop => {
println!("[queue] stopped");
break;
}
}
});
QUEUE.lock().unwrap().set_status(QueueStatus::Running);
}
pub fn put_message(&self, message: Message) -> bool {
let mut q = QUEUE.lock().unwrap();
match q.status {
QueueStatus::Pending | QueueStatus::Running => {
println!("[queue] queueing message: {:?}", message);
q.content.push_back(message);
QUEUE_NOT_EMPTY.notify_one();
true
}
QueueStatus::Stopped => false,
}
}
#[allow(dead_code)]
pub fn is_empty(&self) -> bool {
let q = QUEUE.lock().unwrap();
return q.content.len() == 0;
}
#[allow(dead_code)]
pub fn stop(&self) {
let mut q = QUEUE.lock().unwrap();
q.set_status(QueueStatus::Stopped);
q.content.push_back(Message::QueueStop);
QUEUE_NOT_EMPTY.notify_one();
}
}
#[test]
fn test_queue() {
let controller = Controller::new();
controller.launch();
controller.put_message(Message::command(1, "hello world!"));
controller.launch();
controller.stop();
assert!(!controller.put_message(Message::command(1, "hello again!")));
assert!(!controller.put_message(Message::QueueStop));
controller.launch();
for i in (0..5000) {
assert!(controller.put_message(Message::command(i, "hello again!")));
}
controller.stop();
let wait = time::Duration::from_micros(1000);
thread::sleep(wait);
assert!(controller.is_empty());
}