diff --git a/nats-client/src/chat_message.rs b/nats-client/src/chat_message.rs new file mode 100644 index 0000000..b2ac02c --- /dev/null +++ b/nats-client/src/chat_message.rs @@ -0,0 +1,49 @@ +use core::fmt; +use std::{error::Error, fmt::Display}; + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct ChatMessage { + pub author: String, + pub message: String, +} + +impl Display for ChatMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Message from {}: {}", self.author, self.message) + } +} + +impl TryFrom> for ChatMessage { + type Error = NatsParsingError; + + fn try_from(value: Vec) -> Result { + let as_string = &String::from_utf8(value).map_err(|_| NatsParsingError::NoValidUtf8)?; + let to_return = + serde_json::from_str(as_string).map_err(|_| NatsParsingError::CannotDeserializeJson)?; + Ok(to_return) + } +} + +#[derive(Debug)] +pub enum NatsParsingError { + // Message was no string at all + NoValidUtf8, + // Message could be invalid json or json not matching the struct you want to deserialize + CannotDeserializeJson, +} + +impl Error for NatsParsingError {} + +impl Display for NatsParsingError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let message = match self { + NatsParsingError::NoValidUtf8 => "NATS message was no valid UTF-8 string.", + NatsParsingError::CannotDeserializeJson => { + "NATS message is not a valid / expected json." + } + }; + writeln!(f, "{}", message) + } +} diff --git a/nats-client/src/main.rs b/nats-client/src/main.rs index 4c4812b..7411699 100644 --- a/nats-client/src/main.rs +++ b/nats-client/src/main.rs @@ -1,73 +1,40 @@ +use nats::Connection; use std::{ - fmt::{self, Display}, io::{self, BufRead}, + process::exit, }; -use nats::Connection; -use serde::{Deserialize, Serialize}; +use crate::chat_message::ChatMessage; -#[derive(Serialize, Deserialize)] -struct Message { - author: String, - message: String, -} +mod chat_message; +mod nats_cli; -impl Display for Message { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Message from {}: {}", self.author, self.message) +fn main() { + let result = run_chat_program(); + if let Err(e) = result { + match e.kind() { + io::ErrorKind::ConnectionRefused => { + eprintln!("Connection to server is refused. Please check if it is really running."); + exit(1); + } + _ => panic!("{}", e), + } } } -const SUBJECT_MESSAGES: &str = "here.happens.messaging"; - -fn main() -> io::Result<()> { - let nc = nats::connect("127.0.0.1")?; - +fn run_chat_program() -> io::Result<()> { + let nc = nats_cli::connect_to_nats()?; let username = ask_user_name(); println!( "Hello {}, please write your message. Use q to quit:", username ); - let my_username = username.clone(); - let sub = nc.subscribe(SUBJECT_MESSAGES)?.with_handler(move |msg| { - // This runs in a separate Thread - let message: Message = serde_json::from_str(&String::from_utf8(msg.data).unwrap()).unwrap(); + let subscription = nats_cli::subscribe_to_chat_messages(username.clone(), &nc)?; - // Do not show messages from me - if message.author != my_username { - println!("Received {}", message); - } - Ok(()) - }); + user_message_writing_loop(&username, &nc)?; - let stdin = io::stdin(); - for line in stdin.lock().lines() { - let line = line.unwrap(); - let line = line.trim(); - if line.is_empty() { - continue; - } - - if line == "q" || line == "Q" { - break; - } - - publish_message( - &Message { - author: username.clone(), - message: line.to_string(), - }, - &nc, - )?; - } - sub.unsubscribe()?; - Ok(()) -} - -fn publish_message(message: &Message, nc: &Connection) -> io::Result<()> { - let json_string_utf8 = serde_json::to_string(message).expect("Cannot serialize message"); - nc.publish(SUBJECT_MESSAGES, json_string_utf8)?; + subscription.unsubscribe()?; Ok(()) } @@ -84,3 +51,27 @@ fn ask_user_name() -> String { } input_text } + +fn user_message_writing_loop(username: &str, nc: &Connection) -> io::Result<()> { + let stdin = io::stdin(); + for line in stdin.lock().lines() { + let line = line.unwrap(); + let line = line.trim(); + if line.is_empty() { + continue; + } + + if line == "q" || line == "Q" { + break; + } + + nats_cli::publish_message( + &ChatMessage { + author: username.to_string(), + message: line.to_string(), + }, + &nc, + )?; + } + Ok(()) +} diff --git a/nats-client/src/nats_cli.rs b/nats-client/src/nats_cli.rs new file mode 100644 index 0000000..23ee32f --- /dev/null +++ b/nats-client/src/nats_cli.rs @@ -0,0 +1,38 @@ +use std::io; + +use nats::{Connection, Handler}; + +use crate::chat_message::ChatMessage; + +const SERVER: &str = "127.0.0.1"; +const SUBJECT_MESSAGES: &str = "here.happens.messaging"; + +pub fn connect_to_nats() -> io::Result { + nats::connect(SERVER) +} + +pub fn subscribe_to_chat_messages(username: String, nc: &Connection) -> io::Result { + // This runs the closure in a separate Thread + let subscription = + nc.subscribe(SUBJECT_MESSAGES)? + .with_handler(move |msg| -> Result<(), io::Error> { + match ChatMessage::try_from(msg.data) { + Ok(message) => { + // Do not show messages from me + if message.author != username { + println!("Received {}", message); + } + } + Err(e) => eprintln!("{}", e), + }; + Ok(()) + }); + Ok(subscription) +} + +pub fn publish_message(message: &ChatMessage, nc: &Connection) -> io::Result<()> { + let json_string_utf8 = serde_json::to_string(message).expect("Cannot serialize message"); + // TODO: there is no error if server is offline + nc.publish(SUBJECT_MESSAGES, json_string_utf8)?; + Ok(()) +}