diff --git a/nats-client/src/chat_message.rs b/nats-client/src/chat_message.rs index b2ac02c..e08a902 100644 --- a/nats-client/src/chat_message.rs +++ b/nats-client/src/chat_message.rs @@ -26,6 +26,13 @@ impl TryFrom> for ChatMessage { } } +impl From for Vec { + fn from(value: ChatMessage) -> Self { + let json_string_utf8 = serde_json::to_string(&value).expect("Cannot serialize message"); + json_string_utf8.into_bytes() + } +} + #[derive(Debug)] pub enum NatsParsingError { // Message was no string at all @@ -41,7 +48,7 @@ impl Display for NatsParsingError { let message = match self { NatsParsingError::NoValidUtf8 => "NATS message was no valid UTF-8 string.", NatsParsingError::CannotDeserializeJson => { - "NATS message is not a valid / expected json." + "NATS message is not a json of expected structure." } }; writeln!(f, "{}", message) diff --git a/nats-client/src/main.rs b/nats-client/src/main.rs index 90823ee..1fe6904 100644 --- a/nats-client/src/main.rs +++ b/nats-client/src/main.rs @@ -41,11 +41,18 @@ fn run_chat_program() -> io::Result<()> { username ); - let subscription = nats_cli::subscribe_to_chat_messages(username.clone(), &nc)?; + let chat_subscription = nats_cli::subscribe_to_chat_messages(username.clone(), &nc)?; + + // Handles requests with direct reply + let request_subscription = nats_cli::subscribe_to_requests(&nc, |message| ChatMessage { + author: "Automatic Response Bot".to_string(), + message: format!("Your request mirrored: {}", message.message), + })?; user_message_writing_loop(&username, &nc)?; - subscription.unsubscribe()?; + chat_subscription.unsubscribe()?; + request_subscription.unsubscribe()?; Ok(()) } @@ -74,15 +81,25 @@ fn user_message_writing_loop(username: &str, nc: &Connection) -> io::Result<()> if line == "q" || line == "Q" { break; + } else if line == "invalid" { + nats_cli::publish_invalid_message(&nc)?; + } else if line.starts_with("request") { + nats_cli::post_request( + &ChatMessage { + author: username.to_string(), + message: line.to_string(), + }, + &nc, + )?; + } else { + nats_cli::publish_message( + &ChatMessage { + author: username.to_string(), + message: line.to_string(), + }, + &nc, + )?; } - - nats_cli::publish_message( - &ChatMessage { - author: username.to_string(), - message: line.to_string(), - }, - &nc, - )?; } Ok(()) } diff --git a/nats-client/src/nats_cli.rs b/nats-client/src/nats_cli.rs index aa3c48e..fe579b1 100644 --- a/nats-client/src/nats_cli.rs +++ b/nats-client/src/nats_cli.rs @@ -1,4 +1,7 @@ -use std::io; +use std::{ + io::{self, ErrorKind}, + time::Duration, +}; use nats::{Connection, Handler}; @@ -6,6 +9,7 @@ use crate::chat_message::ChatMessage; const SERVER: &str = "127.0.0.1"; const SUBJECT_MESSAGES: &str = "telestion.chat"; +const SUBJECT_REQEUST_REPLY_MESSAGES: &str = "telestion.request-reply"; pub fn connect_to_nats(username: &str, password: &str) -> io::Result { nats::Options::with_user_pass(username, password) @@ -25,16 +29,87 @@ pub fn subscribe_to_chat_messages(username: String, nc: &Connection) -> io::Resu println!("Received {}", message); } } - Err(e) => eprintln!("Error from NATS: {}", e), + Err(e) => eprintln!("Error processing message: {}", e), }; Ok(()) }); Ok(subscription) } +pub fn subscribe_to_requests(nc: &Connection, answer_request: F) -> io::Result +where + F: Fn(ChatMessage) -> ChatMessage + Send + 'static, +{ + // TODO: This is problematic due to the multithreading. Use multithreading safe things here!!! + let nc = nc.clone(); + // This runs the closure in a separate Thread + let subscription = nc.subscribe(SUBJECT_REQEUST_REPLY_MESSAGES)?.with_handler( + move |msg| -> Result<(), io::Error> { + let reply_address = msg + .reply + .expect("All messages on this channel have to contain a reply address!"); + match ChatMessage::try_from(msg.data) { + Ok(message) => { + let answer = answer_request(message); + publish_message_to_channel(&reply_address, &answer, &nc)?; + } + Err(e) => eprintln!("Error processing message: {}", e), + }; + Ok(()) + }, + ); + Ok(subscription) +} + pub fn publish_message(message: &ChatMessage, nc: &Connection) -> io::Result<()> { - let json_string_utf8 = serde_json::to_string(message).expect("Cannot serialize message"); - // TODO: there is no error if server is offline - nc.publish(SUBJECT_MESSAGES, json_string_utf8)?; + publish_message_to_channel(SUBJECT_MESSAGES, message, nc) +} + +pub fn publish_invalid_message(nc: &Connection) -> io::Result<()> { + nc.publish(SUBJECT_MESSAGES, "This is not a valid json")?; Ok(()) } + +fn publish_message_to_channel( + channel: &str, + message: &ChatMessage, + nc: &Connection, +) -> io::Result<()> { + let json_string_utf8 = serde_json::to_string(message).expect("Cannot serialize message"); + // TODO: there is no error if server is offline + nc.publish(channel, json_string_utf8)?; + Ok(()) +} + +pub fn post_request(message: &ChatMessage, nc: &Connection) -> io::Result<()> { + let json_string_utf8 = serde_json::to_string(message).expect("Cannot serialize message"); + let response = nc.request_timeout( + SUBJECT_REQEUST_REPLY_MESSAGES, + json_string_utf8, + Duration::from_secs(5), + ); + + match response { + Ok(response) => { + match ChatMessage::try_from(response.data) { + Ok(message) => { + println!("Received quickchat message: {}", message); + } + Err(e) => eprintln!("Error processing message: {}", e), + }; + Ok(()) + } + + Err(e) => { + if e.kind() == ErrorKind::NotFound { + eprintln!("There is no one else online!"); + return Ok(()); + } else if e.kind() == ErrorKind::TimedOut { + eprintln!("There was no fast enough response!"); + return Ok(()); + } else { + return Err(e); + } + } + } +} diff --git a/nats-server/config/server.conf b/nats-server/config/server.conf index ec79ff3..7fc43d3 100644 --- a/nats-server/config/server.conf +++ b/nats-server/config/server.conf @@ -8,8 +8,8 @@ authorization { subscribe = ">" } CHAT_CLIENT = { - publish = "telestion.chat" - subscribe = "telestion.chat" + publish = ["telestion.>", "_INBOX.>"] + subscribe = ["telestion.>", "_INBOX.>"] } users = [ {user: admin, password: admin, permissions: $ADMIN}