Implement request response messaging

This commit is contained in:
2023-05-07 19:17:24 +02:00
parent b33795b4e0
commit 2b1d23722d
4 changed files with 117 additions and 18 deletions

View File

@ -26,6 +26,13 @@ impl TryFrom<Vec<u8>> for ChatMessage {
} }
} }
impl From<ChatMessage> for Vec<u8> {
fn from(value: ChatMessage) -> Self {
let json_string_utf8 = serde_json::to_string(&value).expect("Cannot serialize message");
json_string_utf8.into_bytes()
}
}
#[derive(Debug)] #[derive(Debug)]
pub enum NatsParsingError { pub enum NatsParsingError {
// Message was no string at all // Message was no string at all
@ -41,7 +48,7 @@ impl Display for NatsParsingError {
let message = match self { let message = match self {
NatsParsingError::NoValidUtf8 => "NATS message was no valid UTF-8 string.", NatsParsingError::NoValidUtf8 => "NATS message was no valid UTF-8 string.",
NatsParsingError::CannotDeserializeJson => { NatsParsingError::CannotDeserializeJson => {
"NATS message is not a valid / expected json." "NATS message is not a json of expected structure."
} }
}; };
writeln!(f, "{}", message) writeln!(f, "{}", message)

View File

@ -41,11 +41,18 @@ fn run_chat_program() -> io::Result<()> {
username username
); );
let subscription = nats_cli::subscribe_to_chat_messages(username.clone(), &nc)?; let chat_subscription = nats_cli::subscribe_to_chat_messages(username.clone(), &nc)?;
// Handles requests with direct reply
let request_subscription = nats_cli::subscribe_to_requests(&nc, |message| ChatMessage {
author: "Automatic Response Bot".to_string(),
message: format!("Your request mirrored: {}", message.message),
})?;
user_message_writing_loop(&username, &nc)?; user_message_writing_loop(&username, &nc)?;
subscription.unsubscribe()?; chat_subscription.unsubscribe()?;
request_subscription.unsubscribe()?;
Ok(()) Ok(())
} }
@ -74,8 +81,17 @@ fn user_message_writing_loop(username: &str, nc: &Connection) -> io::Result<()>
if line == "q" || line == "Q" { if line == "q" || line == "Q" {
break; break;
} } else if line == "invalid" {
nats_cli::publish_invalid_message(&nc)?;
} else if line.starts_with("request") {
nats_cli::post_request(
&ChatMessage {
author: username.to_string(),
message: line.to_string(),
},
&nc,
)?;
} else {
nats_cli::publish_message( nats_cli::publish_message(
&ChatMessage { &ChatMessage {
author: username.to_string(), author: username.to_string(),
@ -84,5 +100,6 @@ fn user_message_writing_loop(username: &str, nc: &Connection) -> io::Result<()>
&nc, &nc,
)?; )?;
} }
}
Ok(()) Ok(())
} }

View File

@ -1,4 +1,7 @@
use std::io; use std::{
io::{self, ErrorKind},
time::Duration,
};
use nats::{Connection, Handler}; use nats::{Connection, Handler};
@ -6,6 +9,7 @@ use crate::chat_message::ChatMessage;
const SERVER: &str = "127.0.0.1"; const SERVER: &str = "127.0.0.1";
const SUBJECT_MESSAGES: &str = "telestion.chat"; const SUBJECT_MESSAGES: &str = "telestion.chat";
const SUBJECT_REQEUST_REPLY_MESSAGES: &str = "telestion.request-reply";
pub fn connect_to_nats(username: &str, password: &str) -> io::Result<Connection> { pub fn connect_to_nats(username: &str, password: &str) -> io::Result<Connection> {
nats::Options::with_user_pass(username, password) nats::Options::with_user_pass(username, password)
@ -25,16 +29,87 @@ pub fn subscribe_to_chat_messages(username: String, nc: &Connection) -> io::Resu
println!("Received {}", message); println!("Received {}", message);
} }
} }
Err(e) => eprintln!("Error from NATS: {}", e), Err(e) => eprintln!("Error processing message: {}", e),
}; };
Ok(()) Ok(())
}); });
Ok(subscription) Ok(subscription)
} }
pub fn subscribe_to_requests<F>(nc: &Connection, answer_request: F) -> io::Result<Handler>
where
F: Fn(ChatMessage) -> ChatMessage + Send + 'static,
{
// TODO: This is problematic due to the multithreading. Use multithreading safe things here!!!
let nc = nc.clone();
// This runs the closure in a separate Thread
let subscription = nc.subscribe(SUBJECT_REQEUST_REPLY_MESSAGES)?.with_handler(
move |msg| -> Result<(), io::Error> {
let reply_address = msg
.reply
.expect("All messages on this channel have to contain a reply address!");
match ChatMessage::try_from(msg.data) {
Ok(message) => {
let answer = answer_request(message);
publish_message_to_channel(&reply_address, &answer, &nc)?;
}
Err(e) => eprintln!("Error processing message: {}", e),
};
Ok(())
},
);
Ok(subscription)
}
pub fn publish_message(message: &ChatMessage, nc: &Connection) -> io::Result<()> { pub fn publish_message(message: &ChatMessage, nc: &Connection) -> io::Result<()> {
let json_string_utf8 = serde_json::to_string(message).expect("Cannot serialize message"); publish_message_to_channel(SUBJECT_MESSAGES, message, nc)
// TODO: there is no error if server is offline }
nc.publish(SUBJECT_MESSAGES, json_string_utf8)?;
pub fn publish_invalid_message(nc: &Connection) -> io::Result<()> {
nc.publish(SUBJECT_MESSAGES, "This is not a valid json")?;
Ok(()) Ok(())
} }
fn publish_message_to_channel(
channel: &str,
message: &ChatMessage,
nc: &Connection,
) -> io::Result<()> {
let json_string_utf8 = serde_json::to_string(message).expect("Cannot serialize message");
// TODO: there is no error if server is offline
nc.publish(channel, json_string_utf8)?;
Ok(())
}
pub fn post_request(message: &ChatMessage, nc: &Connection) -> io::Result<()> {
let json_string_utf8 = serde_json::to_string(message).expect("Cannot serialize message");
let response = nc.request_timeout(
SUBJECT_REQEUST_REPLY_MESSAGES,
json_string_utf8,
Duration::from_secs(5),
);
match response {
Ok(response) => {
match ChatMessage::try_from(response.data) {
Ok(message) => {
println!("Received quickchat message: {}", message);
}
Err(e) => eprintln!("Error processing message: {}", e),
};
Ok(())
}
Err(e) => {
if e.kind() == ErrorKind::NotFound {
eprintln!("There is no one else online!");
return Ok(());
} else if e.kind() == ErrorKind::TimedOut {
eprintln!("There was no fast enough response!");
return Ok(());
} else {
return Err(e);
}
}
}
}

View File

@ -8,8 +8,8 @@ authorization {
subscribe = ">" subscribe = ">"
} }
CHAT_CLIENT = { CHAT_CLIENT = {
publish = "telestion.chat" publish = ["telestion.>", "_INBOX.>"]
subscribe = "telestion.chat" subscribe = ["telestion.>", "_INBOX.>"]
} }
users = [ users = [
{user: admin, password: admin, permissions: $ADMIN} {user: admin, password: admin, permissions: $ADMIN}