diff --git a/Makefile b/Makefile index 001c1e5..5739092 100644 --- a/Makefile +++ b/Makefile @@ -5,4 +5,7 @@ run: format cargo run build: format - cargo build --release \ No newline at end of file + cargo build --release + +test: + cargo test -- --nocapture \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 1cb02ff..5f1f5ea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,12 @@ mod message; +mod queue; use std::sync::mpsc::{channel, Sender}; use std::thread; use std::time; -use message::{Message, QueueMessage}; +use message::Message; +use queue::Controller; struct Dispatch { ch_sender: Sender, @@ -35,10 +37,6 @@ impl Dispatch { Err(e) => Err(format!("[dispatch] unable to send the message: {}", e)), } } - - fn get_sender(&self) -> Sender { - self.ch_sender.clone() - } } fn main() { @@ -52,23 +50,24 @@ fn main() { .unwrap(); dispatch.send(Message::command(1, "...")).unwrap(); - let qm = QueueMessage::new(dispatch.get_sender()); + let controller = Controller::new(); + controller.launch(); - qm.add(Message::command(1, "q1")); - qm.add(Message::command(1, "q1")); - qm.add(Message::command(1, "q1")); - qm.add(Message::command(1, "q1")); - qm.add(Message::command(1, "q1")); + controller.put_message(Message::command(1, "q1")); + controller.put_message(Message::command(1, "q1")); + controller.put_message(Message::command(1, "q1")); + controller.put_message(Message::command(1, "q1")); + controller.put_message(Message::command(1, "q1")); thread::sleep(wait); - qm.add(Message::command(1, "q2")); - qm.add(Message::command(1, "q3")); - qm.add(Message::command(1, "q4")); + controller.put_message(Message::command(1, "q2")); + controller.put_message(Message::command(1, "q3")); + controller.put_message(Message::command(1, "q4")); thread::sleep(wait); - qm.add(Message::QueueStop); + controller.put_message(Message::QueueStop); thread::sleep(wait); } diff --git a/src/message/message.rs b/src/message/message.rs index 30edfea..06bd88c 100644 --- a/src/message/message.rs +++ b/src/message/message.rs @@ -1,23 +1,3 @@ -use lazy_static::lazy_static; -use std::collections::VecDeque; -use std::sync::{ - mpsc::{channel, Receiver, Sender}, - Condvar, Mutex, -}; -use std::thread; - -lazy_static! { - static ref QUEUE: Mutex> = Mutex::new(VecDeque::new()); - static ref QUEUE_NOT_EMPTY: Condvar = Condvar::new(); -} - -#[derive(Copy, Clone, PartialEq)] -enum QueueStatus { - Pending, - Running, - Stopped, -} - #[allow(dead_code)] #[derive(Debug, Clone)] pub enum Message { @@ -41,94 +21,3 @@ impl Message { Message::Command(body) } } - -pub struct QueueMessage { - status: Mutex, -} - -impl QueueMessage { - pub fn new(ws: Sender) -> Self { - let (sender, receiver) = channel::(); - - let mut queue_message = QueueMessage { - status: Mutex::new(QueueStatus::Pending), - }; - - queue_message.launch_handler(ws, receiver); - queue_message.launch_queue(sender); - - queue_message - } - - fn launch_handler(&self, ws: Sender, receiver: Receiver) { - thread::spawn(move || loop { - match receiver.recv() { - Ok(m) => { - match m { - Message::Command(_) => { - ws.send(m).unwrap() - } - Message::QueueStop => break, - } - } - Err(e) => { - eprintln!("[queue] error occurred: {:?}", e); - // self.set_status(QueueStatus::Stopped); - break; - } - } - }); - } - - fn launch_queue(&mut self, sender: Sender) { - thread::spawn(move || loop { - let mut q = QUEUE.lock().unwrap(); - - let message = loop { - if let Some(message) = q.pop_front() { - break message; - } else { - q = QUEUE_NOT_EMPTY.wait(q).unwrap(); - } - }; - - drop(q); - - let msg = message.clone(); - - if let Err(e) = sender.send(message) { - eprintln!("[queue] error while sending message: {:?}", e); - } - - match msg { - Message::Command(_) => (), - Message::QueueStop => { - println!("[queue] stopped"); - break; - } - } - }); - - self.set_status(QueueStatus::Running); - } - - pub fn add(&self, message: Message) { - match *self.status.lock().unwrap() { - QueueStatus::Pending | QueueStatus::Running => { - println!("[queue] queueing message: {:?}", message); - QUEUE.lock().unwrap().push_back(message); - QUEUE_NOT_EMPTY.notify_one(); - } - QueueStatus::Stopped => (), - } - } - - pub fn set_status(&mut self, status: QueueStatus) { - let mut s = self.status.lock().unwrap(); - *s = status; - } - - pub fn get_status(&self) -> QueueStatus { - *self.status.lock().unwrap() - } -} diff --git a/src/message/mod.rs b/src/message/mod.rs index b718ee0..977f05e 100644 --- a/src/message/mod.rs +++ b/src/message/mod.rs @@ -1,3 +1,3 @@ mod message; -pub use message::{Message, QueueMessage}; +pub use message::Message; diff --git a/src/queue/mod.rs b/src/queue/mod.rs new file mode 100644 index 0000000..a0be5d6 --- /dev/null +++ b/src/queue/mod.rs @@ -0,0 +1,3 @@ +mod queue; + +pub use queue::Controller; diff --git a/src/queue/queue.rs b/src/queue/queue.rs new file mode 100644 index 0000000..dfaaa9f --- /dev/null +++ b/src/queue/queue.rs @@ -0,0 +1,174 @@ +use crate::message::Message; +use lazy_static::lazy_static; +use std::collections::VecDeque; +use std::sync::{ + mpsc::{channel, Receiver, Sender}, + Condvar, Mutex, +}; +use std::thread; + +lazy_static! { + static ref QUEUE: Mutex = Mutex::new(Queue::new()); + static ref QUEUE_NOT_EMPTY: Condvar = Condvar::new(); +} + +#[derive(Copy, Clone, PartialEq)] +enum QueueStatus { + Pending, + Running, + Stopped, +} + +struct Queue { + status: QueueStatus, + content: VecDeque, +} + +impl Queue { + fn new() -> Self { + Self { + status: QueueStatus::Pending, + content: VecDeque::new(), + } + } + + pub fn set_status(&mut self, status: QueueStatus) { + self.status = status; + } +} + +pub struct Controller {} + +impl Controller { + pub fn new() -> Self { + Self {} + } + + pub fn launch(&self) { + let q = QUEUE.lock().unwrap(); + let status = q.status; + drop(q); + + match status { + QueueStatus::Pending | QueueStatus::Stopped => { + let (sender, receiver) = channel::(); + + self.launch_queue(sender); + self.launch_handler(receiver); + + QUEUE.lock().unwrap().set_status(QueueStatus::Running); + } + QueueStatus::Running => eprintln!("[queue] error queue is already running"), + } + } + + fn launch_handler(&self, receiver: Receiver) { + thread::spawn(move || loop { + match receiver.recv() { + Ok(m) => match m { + Message::Command(_) => println!("[handler] received message: {:?}", m), + Message::QueueStop => { + println!("[handler] handler stopped"); + break; + } + }, + Err(e) => { + eprintln!("[queue] error occurred: {:?}", e); + QUEUE.lock().unwrap().set_status(QueueStatus::Stopped); + break; + } + } + }); + } + + fn launch_queue(&self, sender: Sender) { + thread::spawn(move || loop { + let mut q = QUEUE.lock().unwrap(); + + let message = loop { + if let Some(message) = q.content.pop_front() { + break message; + } else { + q = QUEUE_NOT_EMPTY.wait(q).unwrap(); + } + }; + + drop(q); + + let msg = message.clone(); + + if let Err(e) = sender.send(message) { + eprintln!("[queue] error while sending message: {:?}", e); + QUEUE.lock().unwrap().set_status(QueueStatus::Stopped); + break; + } + + match msg { + Message::Command(_) => (), + Message::QueueStop => { + println!("[queue] stopped"); + break; + } + } + }); + + QUEUE.lock().unwrap().set_status(QueueStatus::Running); + } + + pub fn put_message(&self, message: Message) -> bool { + let mut q = QUEUE.lock().unwrap(); + + match q.status { + QueueStatus::Pending | QueueStatus::Running => { + println!("[queue] queueing message: {:?}", message); + + q.content.push_back(message); + QUEUE_NOT_EMPTY.notify_one(); + + true + } + QueueStatus::Stopped => false, + } + } + + #[allow(dead_code)] + pub fn is_empty(&self) -> bool { + let q = QUEUE.lock().unwrap(); + return q.content.len() == 0; + } + + #[allow(dead_code)] + pub fn stop(&self) { + let mut q = QUEUE.lock().unwrap(); + q.set_status(QueueStatus::Stopped); + + q.content.push_back(Message::QueueStop); + QUEUE_NOT_EMPTY.notify_one(); + } +} + +#[test] +fn test_queue() { + let controller = Controller::new(); + + controller.launch(); + controller.put_message(Message::command(1, "hello world!")); + controller.launch(); + + controller.stop(); + assert!(!controller.put_message(Message::command(1, "hello again!"))); + + assert!(!controller.put_message(Message::QueueStop)); + controller.launch(); + + for i in (0..5000) { + assert!(controller.put_message(Message::command(i, "hello again!"))); + } + + controller.stop(); + + let wait = time::Duration::from_micros(1000); + thread::sleep(wait); + + assert!(controller.is_empty()); +}