rust tcp tcpclient rust-tokio tcpserver

TcpListener (server) does not accept connections from clients listed before the server instance in the IP list when running in threads


I have 4 EC2 instances that should form a distributed network in which every instance sends data to every instance (including itself).

I first read the IP addresses from a file into a variable, ip_address_clone.

Say the list is like this:

A.A.A.A
B.B.B.B
C.C.C.C
D.D.D.D

Then I try to run a server and a client for each of them in threads, so that every instance has an active sender and receiver worker for all instances (again, including itself).

thread::scope(|s| {
    s.spawn(|| {
        for _ip in ip_address_clone.clone() {
            let _result = newserver::handle_server(INITIAL_PORT + port_count);
        }
    });

    s.spawn(|| {
        let three_millis = time::Duration::from_millis(3);
        thread::sleep(three_millis);

        for ip in ip_address_clone.clone() {
            let self_ip_clone = self_ip.clone();

            let _result = newclient::match_tcp_client(
                [ip.to_string(), (INITIAL_PORT + port_count).to_string()].join(":"),
                self_ip_clone,
            );
        }
    });
});

The server code is:

use std::error::Error;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::tcp::ReadHalf;
use tokio::net::TcpListener;

#[tokio::main]
pub async fn handle_server(port: u32) -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind(["0.0.0.0".to_string(), port.to_string()].join(":"))
        .await
        .unwrap(); // bind the listening socket

    let (mut socket, _) = listener.accept().await.unwrap(); // wait for one incoming connection
    println!("---continue---");

    let (reader, mut writer) = socket.split(); // tokio socket split to read and write concurrently

    let mut reader: BufReader<ReadHalf> = BufReader::new(reader);
    let mut line: String = String::new();

    loop {
        //loop to get all the data from client until EOF is reached

        let _bytes_read: usize = reader.read_line(&mut line).await.unwrap();

        if line.contains("EOF")
        //REACTOR to be used here
        {
            println!("EOF Reached");

            writer.write_all(line.as_bytes()).await.unwrap();
            println!("{}", line);

            line.clear();

            break;
        }
    }

    Ok(())
}

And client code is:

use std::error::Error;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

#[tokio::main]
pub async fn match_tcp_client(address: String, self_ip: String) -> Result<(), Box<dyn Error>> {
    // Connect to a peer
    let mut stream = TcpStream::connect(address.clone()).await?;
    // Write some data.
    stream.write_all(self_ip.as_bytes()).await?;
    stream.write_all(b"hello world!EOF").await?;
    // stream.shutdown().await?;
    Ok(())
}

The problem is that the communication does not happen as I expect. The first instance I start (over SSH) receives all the data, the second one receives all data except that from the first one, the third one receives all data except that from the first and second ones, and so on.

Here's a log of the first instance:

Starting
execution type
nok
launched
---continue---
EOF Reached
A.A.A.Ahello world!EOF
---continue---
EOF Reached
B.B.B.Bhello world!EOF
---continue---
EOF Reached
C.C.C.Chello world!EOF
---continue---
EOF Reached
D.D.D.Dhello world!EOF

And log of second instance:

Starting
execution type
nok
launched
---continue---
EOF Reached
B.B.B.Bhello world!EOF
---continue---
EOF Reached
C.C.C.Chello world!EOF
---continue---
EOF Reached
D.D.D.Dhello world!EOF

It seems that even though I am using threads, the communication remains synchronous, and a particular instance only gets data from itself and from the IPs that follow it in ip_address_clone. As the number of ---continue--- lines in the second instance's log shows, its listener doesn't seem to accept the request from the first instance.


Solution

  • I think the evidence that "a node is only getting data to itself" strongly suggests that it is sending data only to its own port and not to the other nodes' ports (which are exactly the same). I believe that unique ports should solve your problem.

    On a personal note, testing asynchronous communication between distributed nodes is hard, especially when multiple threads are involved and they don't work.
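
One possible reading of the "unique ports" suggestion is to derive each node's port from its position in the shared IP list, so that no two nodes use the same port. The sketch below only illustrates that mapping and is not confirmed as a fix for the behaviour in the question. The helper names `peer_addresses` and `listen_port` and the base port 8000 are hypothetical, and ports are typed as `u16` here, whereas the question's code uses `u32`.

```rust
/// Builds one "ip:port" address per peer, giving each entry of the IP list
/// its own port: initial_port, initial_port + 1, and so on.
/// Assumption: initial_port plus the list length stays within the u16 range.
fn peer_addresses(ips: &[&str], initial_port: u16) -> Vec<String> {
    ips.iter()
        .zip(initial_port..)
        .map(|(ip, port)| format!("{ip}:{port}"))
        .collect()
}

/// Returns the port this node would listen on, based on where its own IP
/// appears in the list, or None if the IP is not in the list.
fn listen_port(ips: &[&str], self_ip: &str, initial_port: u16) -> Option<u16> {
    ips.iter()
        .zip(initial_port..)
        .find(|&(ip, _)| *ip == self_ip)
        .map(|(_, port)| port)
}
```

With the four placeholder addresses from the question and a base port of 8000, `peer_addresses` would pair A.A.A.A with 8000 through D.D.D.D with 8003, and `listen_port` would give the node at C.C.C.C port 8002. The client loop could then connect to each generated address, and the server could bind to the port returned for its own IP.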