From c42ae3918356c2b86231c0305727c8bb5383d0e7 Mon Sep 17 00:00:00 2001 From: rmanach Date: Mon, 17 Apr 2023 09:42:50 +0200 Subject: [PATCH] initial commit --- .gitignore | 1 + Cargo.lock | 16 +++++ Cargo.toml | 9 +++ Makefile | 8 +++ src/main.rs | 74 +++++++++++++++++++++++ src/message/message.rs | 134 +++++++++++++++++++++++++++++++++++++++++ src/message/mod.rs | 3 + src/utils/iditer.rs | 21 +++++++ src/utils/mod.rs | 3 + 9 files changed, 269 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 Makefile create mode 100644 src/main.rs create mode 100644 src/message/message.rs create mode 100644 src/message/mod.rs create mode 100644 src/utils/iditer.rs create mode 100644 src/utils/mod.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..ce4737e --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,16 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "dispatcher" +version = "0.1.0" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..513f279 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "dispatcher" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +lazy_static = "1.4.0" diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..001c1e5 --- /dev/null +++ b/Makefile @@ -0,0 +1,8 @@ +format: + cargo-fmt + +run: format + cargo run + +build: format + cargo build --release \ No newline at end of file diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..1cb02ff --- /dev/null +++ b/src/main.rs @@ -0,0 +1,74 @@ +mod message; + +use std::sync::mpsc::{channel, Sender}; +use std::thread; +use std::time; + +use message::{Message, QueueMessage}; + +struct Dispatch { + ch_sender: Sender, +} + +impl Dispatch { + fn new() -> Self { + let (sender, receiver) = channel::(); + + thread::spawn(move || loop { + match receiver.recv() { + Ok(m) => { + println!("[dispatch] treating message: {:?}", m); + } + Err(e) => { + eprintln!("[dispatch] error occurred: {:?}", e); + break; + } + } + }); + + Self { ch_sender: sender } + } + + fn send(&self, message: T) -> Result<(), String> { + match self.ch_sender.send(message) { + Ok(_) => Ok(()), + Err(e) => Err(format!("[dispatch] unable to send the message: {}", e)), + } + } + + fn get_sender(&self) -> Sender { + self.ch_sender.clone() + } +} + +fn main() { + let dispatch: Dispatch = Dispatch::new(); + + let wait = time::Duration::from_secs(2); + + dispatch.send(Message::command(1, "hello world!")).unwrap(); + dispatch + .send(Message::command(1, "arf... wrong type!")) + .unwrap(); + dispatch.send(Message::command(1, "...")).unwrap(); + + let qm = QueueMessage::new(dispatch.get_sender()); + + qm.add(Message::command(1, "q1")); + qm.add(Message::command(1, "q1")); + qm.add(Message::command(1, "q1")); + qm.add(Message::command(1, "q1")); + qm.add(Message::command(1, "q1")); + + thread::sleep(wait); + + qm.add(Message::command(1, "q2")); + qm.add(Message::command(1, "q3")); + qm.add(Message::command(1, "q4")); + + thread::sleep(wait); + + qm.add(Message::QueueStop); + + thread::sleep(wait); +} diff --git a/src/message/message.rs b/src/message/message.rs new file mode 100644 index 0000000..30edfea --- /dev/null +++ b/src/message/message.rs @@ -0,0 +1,134 @@ +use lazy_static::lazy_static; +use std::collections::VecDeque; +use std::sync::{ + mpsc::{channel, Receiver, Sender}, + Condvar, Mutex, +}; +use std::thread; + +lazy_static! { + static ref QUEUE: Mutex> = Mutex::new(VecDeque::new()); + static ref QUEUE_NOT_EMPTY: Condvar = Condvar::new(); +} + +#[derive(Copy, Clone, PartialEq)] +enum QueueStatus { + Pending, + Running, + Stopped, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub enum Message { + Command(MessageBody), + QueueStop, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct MessageBody { + id: u32, + content: String, +} + +impl Message { + pub fn command(id: u32, content: &str) -> Message { + let body = MessageBody { + id: id, + content: content.to_string(), + }; + Message::Command(body) + } +} + +pub struct QueueMessage { + status: Mutex, +} + +impl QueueMessage { + pub fn new(ws: Sender) -> Self { + let (sender, receiver) = channel::(); + + let mut queue_message = QueueMessage { + status: Mutex::new(QueueStatus::Pending), + }; + + queue_message.launch_handler(ws, receiver); + queue_message.launch_queue(sender); + + queue_message + } + + fn launch_handler(&self, ws: Sender, receiver: Receiver) { + thread::spawn(move || loop { + match receiver.recv() { + Ok(m) => { + match m { + Message::Command(_) => { + ws.send(m).unwrap() + } + Message::QueueStop => break, + } + } + Err(e) => { + eprintln!("[queue] error occurred: {:?}", e); + // self.set_status(QueueStatus::Stopped); + break; + } + } + }); + } + + fn launch_queue(&mut self, sender: Sender) { + thread::spawn(move || loop { + let mut q = QUEUE.lock().unwrap(); + + let message = loop { + if let Some(message) = q.pop_front() { + break message; + } else { + q = QUEUE_NOT_EMPTY.wait(q).unwrap(); + } + }; + + drop(q); + + let msg = message.clone(); + + if let Err(e) = sender.send(message) { + eprintln!("[queue] error while sending message: {:?}", e); + } + + match msg { + Message::Command(_) => (), + Message::QueueStop => { + println!("[queue] stopped"); + break; + } + } + }); + + self.set_status(QueueStatus::Running); + } + + pub fn add(&self, message: Message) { + match *self.status.lock().unwrap() { + QueueStatus::Pending | QueueStatus::Running => { + println!("[queue] queueing message: {:?}", message); + QUEUE.lock().unwrap().push_back(message); + QUEUE_NOT_EMPTY.notify_one(); + } + QueueStatus::Stopped => (), + } + } + + pub fn set_status(&mut self, status: QueueStatus) { + let mut s = self.status.lock().unwrap(); + *s = status; + } + + pub fn get_status(&self) -> QueueStatus { + *self.status.lock().unwrap() + } +} diff --git a/src/message/mod.rs b/src/message/mod.rs new file mode 100644 index 0000000..b718ee0 --- /dev/null +++ b/src/message/mod.rs @@ -0,0 +1,3 @@ +mod message; + +pub use message::{Message, QueueMessage}; diff --git a/src/utils/iditer.rs b/src/utils/iditer.rs new file mode 100644 index 0000000..555df81 --- /dev/null +++ b/src/utils/iditer.rs @@ -0,0 +1,21 @@ +pub struct IDIter { + id: usize, +} + +impl IDIter { + fn new() -> Self { + Self { id: 0 } + } +} + +impl Iterator for IDIter { + type Item = usize; + + fn next(&mut self) -> Option { + if self.id == usize::MAX { + return None; + } + self.id += 1; + Some(self.id) + } +} \ No newline at end of file diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..989bd9b --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1,3 @@ +mod idter; + +pub use idter::{IDIter}; \ No newline at end of file