socketstcpwin-universal-appc++-cxppl

Windows 10 Sockets and MS PPL. TCP connection between server and client breaks after few seconds


My system:

Server: Windows 10 Pro 1511 10586.36 Microsoft Visual Studio 2015 Community 14.0.24720.00 Update 1

Client: Windows 10 IoT Core (build 10586) Raspberry Pi 2.

My error:

Connection establishes successfully, but being lost after a few seconds of normal work. So data exchange actually works, but in Release version only for a few seconds, and in Debug version is about 40-60 seconds.

I always get:

t.get(); // See code below. Cancelled status.
SocketErrorStatus errorStatus = SocketError::GetStatus(e->HResult);//See code below. errorStatus == 0 (which means Unknown error)

I get these errors sometimes on client's side, sometimes on server's side.

I read MS example - https://code.msdn.microsoft.com/windowsapps/StreamSocket-Sample-8c573931. I also found number of ways how to implement async read/write operations using SocketStream object - https://github.com/Microsoft/Windows-universal-samples/tree/master/Samples/DataReaderWriter.

But i was unable to achieve stable work. Below is the code.

Server side:

void NetworkHandler::Connect() {
    listener->Control->KeepAlive = true;
    listener->Control->NoDelay = true;
    listener->Control->QualityOfService = 
    Windows::Networking::Sockets::SocketQualityOfService::LowLatency;
    String^ localHostAddr = ref new String(L"192.168.0.10");
    HostName^ localHost = ref new HostName(localHostAddr);
    String^ localPort = ref new String(L"3001");
    create_task(listener->BindEndpointAsync(localHost,      
    localPort)).then([this, localHost](task<void> previousTask) {
        try {
            // Try getting an exception.
            previousTask.get();
            String^ message = "Listening on address " +
            localHost->CanonicalName;
        }
        catch (Platform::Exception^ exception) {
            String^ message = "Start listening failed with error: " +
            exception->Message;
        }
    });
    listener->ConnectionReceived += ref new
    TypedEventHandler<StreamSocketListener^,
    StreamSocketListenerConnectionReceivedEventArgs^>(this,
    &NetworkHandler::doAccept);
}

void NetworkHandler::doAccept(StreamSocketListener^ listener,
   StreamSocketListenerConnectionReceivedEventArgs^ object) {
   socket = object->Socket;
   reader = ref new DataReader(socket->InputStream);
   reader->InputStreamOptions = InputStreamOptions::Partial;
   writer = ref new DataWriter(socket->OutputStream);
   ifConnected = true;
   doRead();
   doWrite();
}

void NetworkHandler::doRead() {
    task<UINT32>(reader->LoadAsync(sizeof(UINT32))).then([this](UINT32
    size) {
    if (size < sizeof(UINT32)) {
        // The underlying socket was closed before we were able to read
        // the whole data.
        cancel_current_task();
    }
    UINT32 dataLength = reader->ReadUInt32();
    return task<UINT32>(reader->LoadAsync(dataLength)).then([this, 
        dataLength](UINT32 actualDataLength) {
        if (actualDataLength != dataLength) {
            // The underlying socket was closed before we were able to 
            // read the whole data.
            cancel_current_task();
        }
    });
}).then([this](task<void> t) {
    try {
        // Try getting all exceptions from the continuation chain above 
        // this point.
        t.get();
        //read data here
        timer = reader->ReadDouble(); //timer
        altitudeASL = reader->ReadDouble(); //altitudeASL
        roll = reader->ReadDouble(); //roll
        pitch = reader->ReadDouble(); //pitch
        yaw = reader->ReadDouble(); //yaw
        vCas = reader->ReadDouble(); //vcas
        Eng0Rpm = reader->ReadDouble(); 
        Eng1Rpm = reader->ReadDouble();
        Eng2Rpm = reader->ReadDouble();
        Eng3Rpm = reader->ReadDouble();
        doRead();  //call doRead() again after previous call 
        // successfully ended.
    }
    catch (Platform::Exception^ e) {
        // Explicitly close the socket.
        SocketErrorStatus errorStatus =  
        SocketError::GetStatus(e->HResult);
        if (errorStatus != SocketErrorStatus::Unknown) {
            switch (errorStatus) {
                case SocketErrorStatus::HostNotFound: {
                    // If hostname from user, this may indicate bad input
                    // set a flag to ask user to re-enter hostname
                    break;
                }
                case SocketErrorStatus::ConnectionRefused: {
                    // The server might be temporarily busy
                    break;
                }
                case SocketErrorStatus::NetworkIsUnreachable: {
                    // Could be a connectivity issue
                    break;
                }
                case SocketErrorStatus::UnreachableHost: {
                    // Could be a connectivity issue
                    break;
                }
                case SocketErrorStatus::NetworkIsDown: {
                    // Could be a connectivity issue
                    break;
                }
                default: {
                    // Connection failed and no options are available
                    // Try to use cached data if available 
                    // may want to tell user that connect failed
                    break;
                }
            }
        }
        else {
            // got an Hresult that is not mapped to an enum
            // Could be a connectivity issue
        }
        ifConnected = false;
        delete socket;
    }
    catch (task_canceled&) {
        // Do not print anything here - this will usually happen  
        // because user closed the client socket.
        // Explicitly close the socket.
        ifConnected = false;
        delete socket;
    }
});
}

void NetworkHandler::doWrite() {
    writer->WriteUInt32(4 * sizeof(DOUBLE)); //data size in bytes;
    writer->WriteDouble(aileronCmd);  // 1-st double, aileron;
    writer->WriteDouble(elevatorCmd); //2-nd double, elevator;
    writer->WriteDouble(rudderCmd); //3-rd double, rudder;
    writer->WriteDouble(throttleCmd); //4-th double, throttle;

    UINT32 totalMessageSize = sizeof(UINT32) + 4 * sizeof(DOUBLE);
    //total message size

    task<UINT32>(writer->StoreAsync()).then([this, totalMessageSize] 
    (UINT32 writtenBytes) {
    if (writtenBytes != totalMessageSize)
        cancel_current_task();
    }).then([this](task<void> t) {
    try {
        // Try getting all exceptions from the continuation chain above 
        this point.
        t.get();
        doWrite(); //call doWrite() again after previous call
        successfully ended.
    }
    catch (Platform::Exception^ e) {
        // Explicitly close the socket.
        SocketErrorStatus errorStatus =
        SocketError::GetStatus(e->HResult);
        if (errorStatus != SocketErrorStatus::Unknown) {
            switch (errorStatus) {
                case SocketErrorStatus::HostNotFound: {
                    // If hostname from user, this may indicate bad input
                    // set a flag to ask user to re-enter hostname
                    break;
                }
                case SocketErrorStatus::ConnectionRefused: {
                    // The server might be temporarily busy
                    break;
                }
                case SocketErrorStatus::NetworkIsUnreachable: {
                    // Could be a connectivity issue
                    break;
                }
                case SocketErrorStatus::UnreachableHost: {
                    // Could be a connectivity issue
                    break;
                }
                case SocketErrorStatus::NetworkIsDown: {
                    // Could be a connectivity issue
                    break;
                }
                default: {
                    // Connection failed and no options are available
                    // Try to use cached data if available 
                    // may want to tell user that connect failed
                    break;
                }
            }
        }
        else {
            // got an Hresult that is not mapped to an enum
            // Could be a connectivity issue
        }
        ifConnected = false;
        delete socket;
    }
    catch (task_canceled&) {
            // Do not print anything here - this will usually happen
            // because user closed the client socket.
            // Explicitly close the socket.
            ifConnected = false;
            delete socket;
        }
    });
}

Client side:

void SocketBoard::Connect() {
    socket = ref new StreamSocket();
    socket->Control->KeepAlive = true;
    socket->Control->NoDelay = true;
    socket->Control->QualityOfService =  
    Windows::Networking::Sockets::SocketQualityOfService::LowLatency;
    reader = ref new DataReader(socket->InputStream);
    reader->InputStreamOptions = InputStreamOptions::Partial;
    writer = ref new DataWriter(socket->OutputStream);
    String^ remoteHostAddr = ref new String(L"192.168.0.10");
    HostName^ remoteHost = ref new HostName(remoteHostAddr);
    String^ remotePort = ref new String(L"3001");
    create_task(socket->ConnectAsync(remoteHost, remotePort)).get();
    ifConnected = true;
    TimeSpan period;
    period.Duration = 1 * 10000000; // 10,000,000 ticks per second
    ThreadPoolTimer^ PeriodicTimer =
    ThreadPoolTimer::CreatePeriodicTimer(ref new
    TimerElapsedHandler([this](ThreadPoolTimer^ source) {
    timer_sec--;
    if (timer_sec <= 0)
        source->Cancel();
}), period, ref new TimerDestroyedHandler([&](ThreadPoolTimer^ source)  
{}));
   doRead();
   doWrite();
}

SocketBoard::doRead() { //same as Server }
SocketBoard::doWrite() { //same as Server }

I feel that something (internal buffer, or thread pool) is overheaded, and this cause an exception.

Could please anybody explain it?


Solution

  • I removed

    reader->InputStreamOptions = InputStreamOptions::Partial;
    

    and now reading/writing operations work correctly. The mistake was made when i copied

    if (actualDataLength != dataLength) {
        // The underlying socket was closed before we were able to 
        // read the whole data.
        cancel_current_task();
    }
    

    from MS Samples (mentioned above in the question). So when read is PARTIAL, cancel_current_task() is called and therefore socket is being closed which actually forces the opposite point (client/server) to throw.