I currently have 2 files: Main.rs and Connection.rs.
Connection.rs currently contains the ability to `Send`, `Listen`, and `Connect` to a `TcpStream`.
Connection.rs
use tokio::io::{AsyncReadExt, AsyncWriteExt, WriteHalf, ReadHalf};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use std::sync::Arc;
use iced_futures::futures;
/// Kinds of message associated with a connection.
///
/// Every data-carrying variant holds a `usize` and a `String`.
/// NOTE(review): nothing in the visible code constructs these variants; the
/// `usize` is presumably the originating connection's `loc` and the `String`
/// the message payload — confirm against the code that produces them.
#[derive(Debug, Clone)]
pub enum ConnMessage {
/// Presumably a session/connection code update — TODO confirm.
Code(usize, String),
/// Presumably a chat line — TODO confirm.
Chat(usize, String),
/// Presumably data for the viewer pane — TODO confirm.
View(usize, String),
/// Variant that carries no data.
None,
}
/// Failures reported by `Connection`; each variant carries the `loc` of the
/// connection the failure belongs to.
#[derive(Debug, Clone)]
pub enum ConnectionError {
/// `TcpStream::connect` failed inside `Connection::connect`.
ConnectError(usize),
/// `write_all` failed inside `Connection::send`.
SendError(usize),
/// Built inside `Connection::listen` when received bytes are not valid UTF-8.
ReadError(usize),
}
/// A TCP connection whose read and write halves are shared behind
/// `Arc<Mutex<..>>`, so cloning a `Connection` yields another handle to the
/// same underlying stream halves.
#[derive(Debug, Clone)]
pub struct Connection {
/// Write half produced by `tokio::io::split`; locked by `send`.
pub write_stream: Arc<Mutex<WriteHalf<TcpStream>>>,
/// Read half produced by `tokio::io::split`; locked by `listen`.
pub read_stream: Arc<Mutex<ReadHalf<TcpStream>>>,
/// Caller-supplied identifier passed to `connect`; echoed in errors and log output.
pub loc: usize,
/// Initialised to the empty string by `connect`; not read by the visible code.
pub code: String,
}
impl Connection {
pub async fn connect(loc:usize) -> Result<Connection, ConnectionError> {
let socket = TcpStream::connect("3.92.0.221:80").await.map_err(|_| ConnectionError::ConnectError(loc))?;
let (rd, wr) = tokio::io::split(socket);
let conn = Connection {
write_stream: Arc::new(Mutex::new(wr)),
read_stream: Arc::new(Mutex::new(rd)),
loc: loc,
code: String::from(""),
};
Ok( conn )
}
pub fn listen(conn: Connection) -> Result<(), ConnectionError> {
tokio::spawn(async move {
let mut message = String::from("");
loop {
let mut buf = [0u8; 16];
let mut rd = conn.read_stream.lock().await;
rd.read(&mut buf).await.unwrap();
// ASSUMPTION - Disconnected when Array is all 0s, i.e. a set of bytes that contained nothing is sent
let mut disconnected = true;
for i in buf.iter() {
if i != &0u8 {
disconnected = false;
}
}
if disconnected {
println!("Disconnected");
}
else {
let string_result = std::str::from_utf8(&buf).map_err(|_| ConnectionError::ReadError(conn.loc));
if string_result.is_ok() {
let string_contents = string_result.unwrap();
println!("conn.loc: {} -- Contents: {}", conn.loc, string_contents);
message += string_contents;
// End of Message - Parse and Reset
if message.contains("\\.") {
println!("EOM");
message = message.replace("\\.", "");
// Send `message` to Message inside Main.rs
message = String::from("");
println!("Resetting Msg");
}
else {
println!("Not end of message");
}
}
else {
println!("String Result Error");
}
}
}
});
Ok(())
}
pub async fn send(connection: Connection, string: String) -> Result<(), ConnectionError> {
let mut stream = connection.write_stream.lock().await;
stream.write_all(string.as_bytes()).await.map_err(|_| ConnectionError::SendError(connection.loc))?;
//println!("Code: {}", connection.code);
Ok( () )
}
}
Main.rs currently uses Iced to provide a GUI that I created, which establishes connections on button presses.
use iced::{
pane_grid, PaneGrid, Application, Settings, executor, Command, Container, Length, Element,
scrollable, button, Align, HorizontalAlignment, Column, Scrollable, Text, Button, Row,
text_input, TextInput,
};
use tokio::io::{AsyncReadExt, AsyncWriteExt, WriteHalf, ReadHalf};
//use tokio_util::codec::{BytesCodec, FramedRead, FramedWrite};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
//use futures::prelude::*;
use std::sync::Arc;
mod Connection;
/// Program entry point: runs the `ClientAdminGUI` application with default
/// iced settings and returns the result of `run`.
pub fn main() -> iced::Result {
    let settings = Settings::default();
    ClientAdminGUI::run(settings)
}
/// Application state for the admin GUI: one pane-grid state per pane plus the
/// connections opened so far.
struct ClientAdminGUI {
// Pane-grid state holding the sessions pane content.
sessions_pane: pane_grid::State<ClientAdminSessionPaneContent>,
// Pane-grid state holding the chat pane content.
chat_pane: pane_grid::State<ClientAdminChatPaneContent>,
// Pane-grid state holding the viewer pane content.
image_viewer_pane: pane_grid::State<ClientAdminViewerPaneContent>,
// Every connection received through `Message::Session(Ok(..))`, in arrival order.
connections: Vec<Connection::Connection>,
// Starts as `None`; presumably the connection currently selected in the UI — TODO confirm.
cur_connection: Option<Connection::Connection>,
}
/// Messages processed by `ClientAdminGUI::update`.
#[derive(Debug, Clone)]
enum Message {
// Carries the same `Result` type that `Connection::connect` returns; on `Ok`,
// `update` stores the connection and starts listening on it.
Session(Result<Connection::Connection, Connection::ConnectionError>), //Async Handler
// NOTE(review): handler not shown; presumably triggers sending the current input — confirm.
SendMessage,
// Carries the same `Result` type that `Connection::send` returns.
Send(Result<(), Connection::ConnectionError>), //Async Handler
// NOTE(review): handler not shown; presumably the new text-input contents — confirm.
InputChanged(String),
// NOTE(review): handler not shown; `ButtonMessage` is defined outside this excerpt.
Button(usize, ButtonMessage),
//None,
// NOTE(review): handler not shown; the question intends this to receive data read by `listen`.
UpdateCode(String),
// NOTE(review): handler not shown; wraps a `ConnMessage` or a unit error.
ReadConnMessage(Result<Connection::ConnMessage, ()>),
}
/// iced `Application` implementation for the admin GUI.
///
/// NOTE(review): this excerpt is elided (see the `...` placeholder inside
/// `update`), so it is not compilable as shown.
impl Application for ClientAdminGUI {
type Message = Message;
type Executor = executor::Default;
type Flags = ();
/// Builds the initial state: one pane-grid state per pane, no connections,
/// no current connection, and no startup command.
fn new(_flags: ()) -> (Self, Command<Message>) {
// The second element of each tuple returned by `pane_grid::State::new` is discarded.
let sessions_pane_content_value = ClientAdminSessionPaneContent::new();
let (sessions_pane, _) = pane_grid::State::new(sessions_pane_content_value);
let chat_pane_content_value = ClientAdminChatPaneContent::new();
let (chat_pane, _) = pane_grid::State::new(chat_pane_content_value);
let (image_viewer_pane, _) = pane_grid::State::new(ClientAdminViewerPaneContent::new());
(
ClientAdminGUI {
sessions_pane,
chat_pane,
image_viewer_pane,
connections: Vec::new(),
cur_connection: None
},
Command::none(),
)
}
/// Returns the fixed title string "Client Admin GUI".
fn title(&self) -> String {
String::from("Client Admin GUI")
}
/// Applies one `Message` to the application state.
fn update(&mut self, message: Message) -> Command<Message> {
match message {
Message::Session(Ok(result)) => {
// result is a connection
self.connections.push(result);
// ...
// Starts listening on a clone of the connection that was just pushed.
// NOTE(review): wrapping the clone in `Some(..)` and immediately calling
// `.unwrap()` is a no-op, and the value returned by `listen` is discarded.
Connection::Connection::listen(Some(self.connections[self.connections.len()-1].clone()).unwrap());
}
... //For all the rest of `Message`s
}
}
}
Inside my `listen` function, I receive the reply from a connection. However, I am not sure how to feed that reply back to the application so that it can act on it.
Question:
How can I send the data I obtain from the `listen` function within Connection.rs back to my Main.rs as a `Message` - say, to my `Message::UpdateCode(String)` functionality?
I simply needed to pass a closure as an argument to the function. From there, I had to ensure that its lifetime was correct and that it took the correct parameters.
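You can accomplish this by making the function generic over a type parameter and then using a `where` clause to specify the bounds that the type must satisfy (here, a closure that is `Fn(String, usize) + Send + 'static`).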
Connection.rs
use tokio::io::{AsyncReadExt, AsyncWriteExt, WriteHalf, ReadHalf};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use std::sync::Arc;
use iced_futures::futures;
/// Kinds of message associated with a connection.
///
/// Every data-carrying variant holds a `usize` and a `String`.
/// NOTE(review): nothing in the visible code constructs these variants; the
/// `usize` is presumably the originating connection's `loc` and the `String`
/// the message payload — confirm against the code that produces them.
#[derive(Debug, Clone)]
pub enum ConnMessage {
/// Presumably a session/connection code update — TODO confirm.
Code(usize, String),
/// Presumably a chat line — TODO confirm.
Chat(usize, String),
/// Presumably data for the viewer pane — TODO confirm.
View(usize, String),
/// Variant that carries no data.
None,
}
/// Failures reported by `Connection`; each variant carries the `loc` of the
/// connection the failure belongs to.
#[derive(Debug, Clone)]
pub enum ConnectionError {
/// `TcpStream::connect` failed inside `Connection::connect`.
ConnectError(usize),
/// `write_all` failed inside `Connection::send`.
SendError(usize),
/// Built inside `Connection::listen` when received bytes are not valid UTF-8.
ReadError(usize),
}
/// A TCP connection whose read and write halves are shared behind
/// `Arc<Mutex<..>>`, so cloning a `Connection` yields another handle to the
/// same underlying stream halves.
#[derive(Debug, Clone)]
pub struct Connection {
/// Write half produced by `tokio::io::split`; locked by `send`.
pub write_stream: Arc<Mutex<WriteHalf<TcpStream>>>,
/// Read half produced by `tokio::io::split`; locked by `listen`.
pub read_stream: Arc<Mutex<ReadHalf<TcpStream>>>,
/// Caller-supplied identifier passed to `connect`; echoed in errors and handed to the `listen` callback.
pub loc: usize,
/// Initialised to the empty string by `connect`; not read by the visible code.
pub code: String,
}
impl Connection {
pub async fn connect(loc:usize) -> Result<Connection, ConnectionError> {
let socket = TcpStream::connect("3.92.0.221:80").await.map_err(|_| ConnectionError::ConnectError(loc))?;
let (rd, wr) = tokio::io::split(socket);
let conn = Connection {
write_stream: Arc::new(Mutex::new(wr)),
read_stream: Arc::new(Mutex::new(rd)),
loc: loc,
code: String::from(""),
};
Ok( conn )
}
pub fn listen<F>(conn: Connection, read_message: F) where F: Fn(String, usize) + 'static + std::marker::Send {
tokio::spawn(async move {
let mut message = String::from("");
loop {
let mut buf = [0u8; 16];
let mut rd = conn.read_stream.lock().await;
rd.read(&mut buf).await.unwrap();
// ASSUMPTION - Disconnected when Array is all 0s, i.e. a set of bytes that contained nothing is sent
let mut disconnected = true;
for i in buf.iter() {
if i != &0u8 {
disconnected = false;
}
}
if disconnected {
println!("Disconnected");
}
else {
let string_result = std::str::from_utf8(&buf).map_err(|_| ConnectionError::ReadError(conn.loc));
if string_result.is_ok() {
let string_contents = string_result.unwrap();
println!("conn.loc: {} -- Contents: {}", conn.loc, string_contents);
message += string_contents;
// End of Message - Parse and Reset
if message.contains("\\.") {
println!("EOM");
message = message.replace("\\.", "");
read_message(message, conn.loc);
message = String::from("");
println!("Resetting Msg");
}
else {
println!("Not end of message");
}
}
else {
println!("String Result Error");
}
}
}
});
}
pub async fn send(connection: Connection, string: String) -> Result<(), ConnectionError> {
let mut stream = connection.write_stream.lock().await;
stream.write_all(string.as_bytes()).await.map_err(|_| ConnectionError::SendError(connection.loc))?;
Ok( () )
}
}
Main.rs (Just relevant portion)
Connection::Connection::listen(Some(self.connections[self.connections.len()-1].clone()).unwrap(),
(|string:String, loc:usize| {
println!("String is: {} -- loc: {}", string, loc);
}));