initial commit

This commit is contained in:
rmanach 2023-04-17 09:42:50 +02:00
commit c42ae39183
9 changed files with 269 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

16
Cargo.lock generated Normal file
View File

@ -0,0 +1,16 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "dispatcher"
version = "0.1.0"
dependencies = [
"lazy_static",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"

9
Cargo.toml Normal file
View File

@ -0,0 +1,9 @@
[package]
name = "dispatcher"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
lazy_static = "1.4.0"

8
Makefile Normal file
View File

@ -0,0 +1,8 @@
format:
cargo-fmt
run: format
cargo run
build: format
cargo build --release

74
src/main.rs Normal file
View File

@ -0,0 +1,74 @@
mod message;
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::time;
use message::{Message, QueueMessage};
struct Dispatch<T> {
ch_sender: Sender<T>,
}
impl<T: std::fmt::Debug + std::marker::Send + 'static> Dispatch<T> {
fn new() -> Self {
let (sender, receiver) = channel::<T>();
thread::spawn(move || loop {
match receiver.recv() {
Ok(m) => {
println!("[dispatch] treating message: {:?}", m);
}
Err(e) => {
eprintln!("[dispatch] error occurred: {:?}", e);
break;
}
}
});
Self { ch_sender: sender }
}
fn send(&self, message: T) -> Result<(), String> {
match self.ch_sender.send(message) {
Ok(_) => Ok(()),
Err(e) => Err(format!("[dispatch] unable to send the message: {}", e)),
}
}
fn get_sender(&self) -> Sender<T> {
self.ch_sender.clone()
}
}
fn main() {
let dispatch: Dispatch<Message> = Dispatch::new();
let wait = time::Duration::from_secs(2);
dispatch.send(Message::command(1, "hello world!")).unwrap();
dispatch
.send(Message::command(1, "arf... wrong type!"))
.unwrap();
dispatch.send(Message::command(1, "...")).unwrap();
let qm = QueueMessage::new(dispatch.get_sender());
qm.add(Message::command(1, "q1"));
qm.add(Message::command(1, "q1"));
qm.add(Message::command(1, "q1"));
qm.add(Message::command(1, "q1"));
qm.add(Message::command(1, "q1"));
thread::sleep(wait);
qm.add(Message::command(1, "q2"));
qm.add(Message::command(1, "q3"));
qm.add(Message::command(1, "q4"));
thread::sleep(wait);
qm.add(Message::QueueStop);
thread::sleep(wait);
}

134
src/message/message.rs Normal file
View File

@ -0,0 +1,134 @@
use lazy_static::lazy_static;
use std::collections::VecDeque;
use std::sync::{
mpsc::{channel, Receiver, Sender},
Condvar, Mutex,
};
use std::thread;
lazy_static! {
static ref QUEUE: Mutex<VecDeque<Message>> = Mutex::new(VecDeque::new());
static ref QUEUE_NOT_EMPTY: Condvar = Condvar::new();
}
#[derive(Copy, Clone, PartialEq)]
enum QueueStatus {
Pending,
Running,
Stopped,
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum Message {
Command(MessageBody),
QueueStop,
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct MessageBody {
id: u32,
content: String,
}
impl Message {
pub fn command(id: u32, content: &str) -> Message {
let body = MessageBody {
id: id,
content: content.to_string(),
};
Message::Command(body)
}
}
pub struct QueueMessage {
status: Mutex<QueueStatus>,
}
impl QueueMessage {
pub fn new(ws: Sender<Message>) -> Self {
let (sender, receiver) = channel::<Message>();
let mut queue_message = QueueMessage {
status: Mutex::new(QueueStatus::Pending),
};
queue_message.launch_handler(ws, receiver);
queue_message.launch_queue(sender);
queue_message
}
fn launch_handler(&self, ws: Sender<Message>, receiver: Receiver<Message>) {
thread::spawn(move || loop {
match receiver.recv() {
Ok(m) => {
match m {
Message::Command(_) => {
ws.send(m).unwrap()
}
Message::QueueStop => break,
}
}
Err(e) => {
eprintln!("[queue] error occurred: {:?}", e);
// self.set_status(QueueStatus::Stopped);
break;
}
}
});
}
fn launch_queue(&mut self, sender: Sender<Message>) {
thread::spawn(move || loop {
let mut q = QUEUE.lock().unwrap();
let message = loop {
if let Some(message) = q.pop_front() {
break message;
} else {
q = QUEUE_NOT_EMPTY.wait(q).unwrap();
}
};
drop(q);
let msg = message.clone();
if let Err(e) = sender.send(message) {
eprintln!("[queue] error while sending message: {:?}", e);
}
match msg {
Message::Command(_) => (),
Message::QueueStop => {
println!("[queue] stopped");
break;
}
}
});
self.set_status(QueueStatus::Running);
}
pub fn add(&self, message: Message) {
match *self.status.lock().unwrap() {
QueueStatus::Pending | QueueStatus::Running => {
println!("[queue] queueing message: {:?}", message);
QUEUE.lock().unwrap().push_back(message);
QUEUE_NOT_EMPTY.notify_one();
}
QueueStatus::Stopped => (),
}
}
pub fn set_status(&mut self, status: QueueStatus) {
let mut s = self.status.lock().unwrap();
*s = status;
}
pub fn get_status(&self) -> QueueStatus {
*self.status.lock().unwrap()
}
}

3
src/message/mod.rs Normal file
View File

@ -0,0 +1,3 @@
mod message;
pub use message::{Message, QueueMessage};

21
src/utils/iditer.rs Normal file
View File

@ -0,0 +1,21 @@
pub struct IDIter {
id: usize,
}
impl IDIter {
fn new() -> Self {
Self { id: 0 }
}
}
impl Iterator for IDIter {
type Item = usize;
fn next(&mut self) -> Option<Self::Item> {
if self.id == usize::MAX {
return None;
}
self.id += 1;
Some(self.id)
}
}

3
src/utils/mod.rs Normal file
View File

@ -0,0 +1,3 @@
mod idter;
pub use idter::{IDIter};