commit 5762654b4ba35781ba7bb787ac9e30e47b622f68 Author: rmanach Date: Mon Jul 3 09:09:06 2023 +0200 init repository diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4f13f5d --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +redis-queueing-sample/target +redis-queueing-sample/Cargo.lock diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6378b9c --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +run-redis: + docker compose up redis -d + +run-server: + $(MAKE) -C redis-queueing-sample run-server + +build: + $(MAKE) -C redis-queueing-sample build + +run: build run-redis run-server + +run-client: + $(MAKE) -C redis-queueing-sample run-client \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..6f4cc6e --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# redis-queueing-sample + +This repo aims to show how to communicate between two `services` using **Redis** as a message broker with pub/sub channels. \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..94594c6 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,12 @@ +services: + redis: + image: redis/redis-stack-server:latest + restart: always + ports: + - '6379:6379' + command: redis-stack-server --save 20 1 --loglevel warning --requirepass 11677f0c-ead4-4434-ad14-3d54ce2521ce + volumes: + - cache:/data +volumes: + cache: + driver: local \ No newline at end of file diff --git a/redis-queueing-sample/Cargo.toml b/redis-queueing-sample/Cargo.toml new file mode 100644 index 0000000..a8c736b --- /dev/null +++ b/redis-queueing-sample/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "redis-queueing-sample" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tokio = { version = "1", features = ["full"] } +redis = { version = "0.23.0", features = ["tokio-comp", "json"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" diff --git a/redis-queueing-sample/Makefile b/redis-queueing-sample/Makefile new file mode 100644 index 0000000..0f63ce7 --- /dev/null +++ b/redis-queueing-sample/Makefile @@ -0,0 +1,9 @@ +build: + cargo build --release --bin client + cargo build --release --bin server + +run-server: build + @./target/release/server + +run-client: + @while true; do ./target/release/client; sleep 1; done \ No newline at end of file diff --git a/redis-queueing-sample/src/bin/client.rs b/redis-queueing-sample/src/bin/client.rs new file mode 100644 index 0000000..80ab32b --- /dev/null +++ b/redis-queueing-sample/src/bin/client.rs @@ -0,0 +1,16 @@ +use redis_queueing_sample::model::{Action, Message}; +use redis_queueing_sample::{GenericError, REDIS_QUEUE_NAME, REDIS_URL}; +use redis::{AsyncCommands, Client}; + +#[tokio::main] +async fn main() -> Result<(), GenericError> { + let client = Client::open(REDIS_URL)?; + let mut conn = client.get_tokio_connection().await?; + + let message = Message::new(1, Action::Download); + let json_str = serde_json::to_string(&message)?; + + let _: () = conn.rpush(REDIS_QUEUE_NAME, json_str).await?; + + Ok(()) +} diff --git a/redis-queueing-sample/src/bin/server.rs b/redis-queueing-sample/src/bin/server.rs new file mode 100644 index 0000000..e36816e --- /dev/null +++ b/redis-queueing-sample/src/bin/server.rs @@ -0,0 +1,25 @@ +use redis::AsyncCommands; + +use redis_queueing_sample::model::Message; +use redis_queueing_sample::{GenericError, REDIS_QUEUE_NAME, REDIS_URL}; + +async fn read_queue(mut conn: redis::aio::Connection) -> Result<(), GenericError> { + loop { + let (_, json_data): (String, String) = conn.blpop(REDIS_QUEUE_NAME, 0).await?; + + let data: Message = serde_json::from_str(&json_data)?; + println!("Received data: {:?}", data); + } +} + +#[tokio::main] +async fn main() -> Result<(), GenericError> { + println!("listening message..."); + + let client = redis::Client::open(REDIS_URL)?; + let conn = client.get_tokio_connection().await?; + + read_queue(conn).await?; + + Ok(()) +} diff --git a/redis-queueing-sample/src/lib.rs b/redis-queueing-sample/src/lib.rs new file mode 100644 index 0000000..4bee9aa --- /dev/null +++ b/redis-queueing-sample/src/lib.rs @@ -0,0 +1,31 @@ +pub static REDIS_URL: &str = "redis://:11677f0c-ead4-4434-ad14-3d54ce2521ce@127.0.0.1:6379/"; + +pub type GenericError = Box; + +pub const REDIS_PATH: &'static str = "queueing-sample"; +pub const REDIS_QUEUE_NAME: &'static str = "messages"; + +pub mod model { + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Serialize, Deserialize)] + pub enum Action { + Download, + Format, + } + + #[derive(Debug, Serialize, Deserialize)] + pub struct Message { + id: u32, + action: Action, + } + + impl Message { + pub fn new(id: u32, action: Action) -> Self { + Self { + id, + action, + } + } + } +}