c++ boost-asio portaudio opus

How can I encode/decode data from a socket in a loop?


I'm currently working on a voice transmission application in C++, using PortAudio and Opus for the audio part and Boost Asio for the network part.

I'm having difficulty setting up encoding and decoding in a loop.

To be more precise, I have a function that handles the "calling" part of my project; it is launched when two people agree to call each other.

I have two different functions: one for when the client accepts the call, and one for when the client initiates it.

Both functions are built the same way: I start by initializing every library I need, then I listen on the socket for audio data to decode, and send encoded audio data to the socket.

My issue: I'm doing this in a while loop with the simple condition while (clientConnected) { ... }, but the code seems to run only one iteration and then block, and I don't know why.

(I know that because the std::cout << dataSize line is only printed once.)

This is one of the two call functions, receiveCall (if you have any questions, ask me):

void Window::receiveCall(const std::string& ip) {

    // Initialize PortAudio and Opus
    PortAudioWrapper portAudio;
    OpusWrapper opus;
    if (!portAudio.OpenDefaultStream(1, 1, paFloat32, 48000, 960))
    {
        std::cerr << "Failed to open PortAudio stream" << std::endl;
        return;
    }
    if (!opus.Init(48000, 2, OPUS_APPLICATION_VOIP))
    {
        std::cerr << "Failed to initialize Opus" << std::endl;
        return;
    }
    if (!portAudio.StartStream())
    {
        std::cerr << "Failed to start PortAudio stream" << std::endl;
        return;
    }
    boost::asio::io_service ioService;
    boost::asio::ip::tcp::socket socket(ioService);
    boost::asio::ip::tcp::acceptor acceptor(ioService);

    try
    {
        // Create an acceptor to listen for incoming connections
        boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string(ip), 12345);
        acceptor.open(endpoint.protocol());
        acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
        acceptor.bind(endpoint);
        acceptor.listen();

        std::cout << "Waiting for incoming connection on " << ip << ":12345" << std::endl;

        // Wait for a client to connect

        acceptor.accept(socket);

        std::cout << "Connected to client at " << socket.remote_endpoint().address().to_string() << std::endl;
        bool clientConnected = true;

        // Create a buffer to hold the audio data
        std::vector<unsigned char> buffer(1276);

        // Send and receive audio data
        while (clientConnected)
        {
            // Read audio data from PortAudio
            float audioData[960 * 2 * sizeof(float)];
            portAudio.readStream(audioData, 960);
            
            // Compress the audio data using Opus
            int dataSize = opus.Encode(audioData, 960, buffer.data(), 1276);
            std::cout << "dataSize : "<< dataSize << std::endl;
            // Send the compressed audio data to the client
            boost::asio::write(socket, boost::asio::buffer(buffer.data(), dataSize));

            // Receive compressed audio data from the client
            boost::system::error_code error;
            dataSize = socket.read_some(boost::asio::buffer(buffer), error);
            if (error == boost::asio::error::eof)
            {
                std::cout << "Client disconnected" << std::endl;
                clientConnected = false; // set flag to false when client disconnects
            }
            else if (error)
            {
                std::cerr << "Error receiving audio data: " << error.message() << std::endl;
                clientConnected = false; // set flag to false when client disconnects
            }

            // Decompress the audio data using Opus
            float decodedData[960 * 2 * sizeof(float)];
            int numSamples = opus.Decode(buffer.data(), dataSize, decodedData, 960);

            // Write the decompressed audio data to PortAudio
            portAudio.writeStream(decodedData, numSamples);
        }
    }    
    catch (std::exception& e) {
        std::cerr << "Exception in receiveCall: " << e.what() << std::endl;
    }
    // Cleanup code
    socket.close();
    acceptor.close();
    portAudio.StopStream();
}

The problem is probably in the loop but I don't know what it could be.

Thanks in advance.


Solution

  • The Opus side and the buffer dimensioning (with caveats, see below) use 2 channels, yet you open the PortAudio stream with only one channel (OpenDefaultStream(1, 1, ...)). That's probably a mistake.

    Besides, there is no framing. Your code simply assumes that whatever is "sent" in one call is also received in one call to read_some. As the name implies, this is just not the case: TCP delivers a byte stream, and read_some returns "some" bytes, which may be only part of a frame, or more than one frame.
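To make the framing problem concrete, here is a minimal, standard-C++-only sketch of a receiver-side reassembler. It assumes a wire format of a 4-byte big-endian length followed by that many payload bytes (the format the answer settles on further down); the class name FrameAssembler is hypothetical. Bytes are fed in arbitrarily sized chunks, as read_some might deliver them, and a complete frame is returned only once all of its bytes have arrived.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical helper: reassembles length-prefixed frames
// (4-byte big-endian length + payload) from arbitrary chunks.
class FrameAssembler {
public:
    // Append whatever bytes one read_some() call happened to return.
    void feed(const std::uint8_t* data, std::size_t n) {
        pending_.insert(pending_.end(), data, data + n);
    }

    // Return the next complete frame payload, or std::nullopt if the
    // header or the payload has not fully arrived yet.
    std::optional<std::vector<std::uint8_t>> next() {
        if (pending_.size() < 4)
            return std::nullopt;
        std::uint32_t len = (std::uint32_t(pending_[0]) << 24) |
                            (std::uint32_t(pending_[1]) << 16) |
                            (std::uint32_t(pending_[2]) << 8) |
                            std::uint32_t(pending_[3]);
        if (pending_.size() < 4 + std::size_t(len))
            return std::nullopt;
        std::vector<std::uint8_t> frame(pending_.begin() + 4,
                                        pending_.begin() + 4 + len);
        pending_.erase(pending_.begin(), pending_.begin() + 4 + len);
        return frame;
    }

private:
    std::vector<std::uint8_t> pending_; // bytes received but not yet consumed
};
```

For example, two frames "ab" and "cde" arriving as a 5-byte chunk and an 8-byte chunk yield nothing after the first chunk and both frames after the second.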

    Since you seem to know that 1276 is the upper bound for the encoded data size, you could avoid framing by hardcoding the chosen buffer length, channel count and sample format for both peers. Then you could use asio::read instead, a composed operation that calls read_some one or more times until 'the entire buffer' has been read (or EOF is hit):

    dataSize = read(socket, asio::buffer(buffer), error);
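Conceptually, a composed read is a loop around read_some that keeps going until the buffer is full or the stream ends. The sketch below reproduces that loop in plain C++ against a hypothetical ChunkySource that, like a TCP socket, hands out at most a few bytes per call. It illustrates the idea only; it is not Asio's implementation, and the mock signals EOF by returning 0 where real Asio reports asio::error::eof.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a socket: read_some() returns at most
// max_chunk bytes per call, and 0 once the data is exhausted (EOF).
struct ChunkySource {
    std::vector<unsigned char> data;
    std::size_t pos = 0;
    std::size_t max_chunk = 3;

    std::size_t read_some(unsigned char* out, std::size_t room) {
        std::size_t n = std::min({room, max_chunk, data.size() - pos});
        std::copy_n(data.begin() + pos, n, out);
        pos += n;
        return n;
    }
};

// Keep calling read_some() until `len` bytes are read or EOF is hit.
// Returns the number of bytes read (less than len means EOF came first).
std::size_t read_exact(ChunkySource& src, unsigned char* out, std::size_t len) {
    std::size_t total = 0;
    while (total < len) {
        std::size_t n = src.read_some(out + total, len - total);
        if (n == 0)
            break; // EOF: the peer closed before the buffer was filled
        total += n;
    }
    return total;
}
```

With 8 bytes available and max_chunk = 3, a single read_some into a 5-byte buffer returns 3, while read_exact returns 5.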
    

    Of course, for this to work the sender must ignore dataSize and always send the whole buffer:

    write(socket, asio::buffer(buffer /*, dataSize*/));
    

    However, the Decode call needs to know dataSize, so you still have to transmit it.
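One way to reconcile both constraints is a fixed-size record whose first bytes carry the real payload length: the receiver can always read exactly one record with asio::read and still pass the correct dataSize to Decode. The sketch assumes a hypothetical layout of a 2-byte big-endian length followed by a 1276-byte payload area (1278 bytes per record); the layout and helper names are illustrative only.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

constexpr std::size_t kMaxPayload = 1276;            // upper bound used in the question
constexpr std::size_t kRecordSize = 2 + kMaxPayload; // hypothetical layout: 2-byte length + payload

using Record = std::array<std::uint8_t, kRecordSize>;

// Pack one encoded Opus packet into a fixed-size, zero-padded record.
Record pack_record(const std::uint8_t* payload, std::size_t size) {
    assert(size <= kMaxPayload);
    Record rec{};                                     // value-initialized: all bytes zero
    rec[0] = static_cast<std::uint8_t>(size >> 8);   // length, high byte first
    rec[1] = static_cast<std::uint8_t>(size & 0xFF); // length, low byte
    std::copy_n(payload, size, rec.begin() + 2);
    return rec;
}

// Recover the payload length (the dataSize Decode needs) from a record.
std::size_t record_payload_size(const Record& rec) {
    return (std::size_t(rec[0]) << 8) | rec[1];
}
```

The trade-off is bandwidth: every frame costs the full 1278 bytes on the wire, even when Opus produced far fewer.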

    Other Issues

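    I think the dimensions of the audioData and decodedData buffers are off, fortunately on the "too large" side. The arrays already have float elements, so the * sizeof(float) factor is wrong: it makes them four times larger than needed (with the usual 4-byte float). I'd use C++ arrays anyway: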

    struct sample { float chan0, chan1; };
    std::array<sample, 960> audioData, decodedData;
    

    Now you can avoid hardcoding the 960 magic number in various places too!
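The size mismatch is easy to confirm at compile time. Assuming the 20 ms frame at 48 kHz stereo used throughout (960 frames times 2 channels), one frame needs 1920 floats, while the question's declaration float audioData[960 * 2 * sizeof(float)] has 7680 when sizeof(float) is 4. A quick check:

```cpp
#include <array>
#include <cassert>
#include <cstddef>

constexpr std::size_t kFrames   = 960; // 20 ms at 48 kHz
constexpr std::size_t kChannels = 2;

struct sample { float chan0, chan1; }; // one interleaved stereo frame

// Element count of the question's declaration: float audioData[960 * 2 * sizeof(float)]
constexpr std::size_t kQuestionElems = kFrames * kChannels * sizeof(float);
// Element count actually needed for 960 interleaved stereo frames
constexpr std::size_t kNeededElems = kFrames * kChannels;

static_assert(sizeof(std::array<sample, kFrames>) == kNeededElems * sizeof(float),
              "std::array<sample, 960> holds exactly 960 stereo frames");
```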

    Sending a big-endian dataSize

    Using Boost Endian to represent 32-bit integers portably on the wire:

    using NetSize = boost::endian::big_uint32_t;
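Without Boost Endian, the same on-the-wire representation can be produced with plain shifts, which works regardless of the host's byte order. This is a std-only sketch of the idea, not how Boost implements big_uint32_t; the function names are hypothetical.

```cpp
#include <array>
#include <cassert>
#include <cstdint>

// Serialize a 32-bit value most-significant byte first (network order).
std::array<std::uint8_t, 4> to_big_endian(std::uint32_t v) {
    return {static_cast<std::uint8_t>(v >> 24), static_cast<std::uint8_t>(v >> 16),
            static_cast<std::uint8_t>(v >> 8), static_cast<std::uint8_t>(v)};
}

// Parse 4 bytes in network order back into a host integer.
std::uint32_t from_big_endian(const std::array<std::uint8_t, 4>& b) {
    return (std::uint32_t(b[0]) << 24) | (std::uint32_t(b[1]) << 16) |
           (std::uint32_t(b[2]) << 8) | std::uint32_t(b[3]);
}
```

For instance, a dataSize of 8 becomes the bytes 00 00 00 08. In the Coliru output further down, od -f reinterprets those 4 bytes as a little-endian float, which is where the leading 3.85186e-34 (2^-111) comes from.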
    

    Now sending could look like:

    NetSize dataSize = opus.Encode(audioData.data(), audioData.size(), buffer.data(), buffer.size());
    std::cout << "dataSize : " << dataSize << std::endl;
    
    // Send the compressed audio data to the client
    write(socket,
          std::array{
              asio::buffer(&dataSize, sizeof(dataSize)),
              asio::buffer(buffer, dataSize),
          });
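Passing an array of two buffers to write is a gather write: Asio sends the 4-byte header and then the payload, so the peer sees them back to back in one byte stream. The resulting byte layout can be sketched without Asio; make_frame is a hypothetical helper, and its header bytes match what big_uint32_t stores.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Build the byte sequence the gather write puts on the wire:
// a 4-byte big-endian length followed by the Opus payload.
std::vector<std::uint8_t> make_frame(const std::uint8_t* payload, std::uint32_t size) {
    std::vector<std::uint8_t> frame;
    frame.reserve(4 + size);
    for (int shift = 24; shift >= 0; shift -= 8) // most significant byte first
        frame.push_back(static_cast<std::uint8_t>(size >> shift));
    frame.insert(frame.end(), payload, payload + size);
    return frame;
}
```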
    

    And receiving similarly, but in two steps:

    read(socket, asio::buffer(&dataSize, sizeof(dataSize)), error);
    if (error) {
        dataSize = 0;
    } else {
        dataSize = read(socket, asio::buffer(buffer, dataSize), error);
    }
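The two-step receive reads the fixed 4-byte header first, then exactly as many payload bytes as the header announces. Below is a std-only sketch of that logic over an in-memory byte stream. ByteStream and receive_frame are hypothetical stand-ins for the socket and the loop body, an empty std::optional plays the role of the error/EOF branch, and the size check against the 1276-byte bound is an added assumption (note that asio::buffer(buffer, dataSize) silently clamps to the buffer size, which would desynchronize the stream on a corrupt header).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

constexpr std::uint32_t kMaxPayload = 1276; // upper bound from the question

// Hypothetical in-memory stand-in for the connected socket.
struct ByteStream {
    std::vector<std::uint8_t> bytes;
    std::size_t pos = 0;

    // All-or-nothing read, loosely like asio::read: returns false (think EOF)
    // if fewer than n bytes remain. Unlike a socket, a failed read consumes nothing.
    bool read(std::uint8_t* out, std::size_t n) {
        if (bytes.size() - pos < n)
            return false;
        for (std::size_t i = 0; i < n; ++i)
            out[i] = bytes[pos++];
        return true;
    }
};

// Step 1: read the 4-byte big-endian size. Step 2: read exactly that many bytes.
std::optional<std::vector<std::uint8_t>> receive_frame(ByteStream& s) {
    std::uint8_t hdr[4];
    if (!s.read(hdr, 4))
        return std::nullopt;
    std::uint32_t size = (std::uint32_t(hdr[0]) << 24) | (std::uint32_t(hdr[1]) << 16) |
                         (std::uint32_t(hdr[2]) << 8) | std::uint32_t(hdr[3]);
    if (size > kMaxPayload)
        return std::nullopt; // corrupt header: refuse rather than desynchronize
    std::vector<std::uint8_t> payload(size);
    if (!s.read(payload.data(), payload.size()))
        return std::nullopt;
    return payload;
}
```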
    

    Putting It All Together

    Live On Coliru

    #include <boost/asio.hpp>
    #include <boost/endian/arithmetic.hpp>
    #include <iostream>
    namespace asio = boost::asio;
    using asio::ip::tcp;
    
    using NetSize = boost::endian::big_uint32_t;
    
    enum { paFloat32 };
    struct PortAudioWrapper {
        unsigned channels_ = 0;
        float    val_      = 0;
    
        bool OpenDefaultStream(int, int channels, int format, unsigned, int) {
            channels_ = channels;
            assert(format == paFloat32);
            return true;
        }
    
        bool StartStream() { return true; }
        bool StopStream() { return true; }
    
        // float audioData[960 * 2 * sizeof(float)];
        bool writeStream(void const* buf, size_t n) {
            float const* fp = static_cast<float const*>(buf);
            assert(n);
            auto val = *fp;
            std::cerr << "writing " << channels_ << "x" << n << " of " << val //
                      << " (all equal: " << std::boolalpha
                      << std::all_of(fp, fp + channels_ * n, [val](float sample) { return sample == val; })
                      << " )" << std::endl;
            return true;
        }
        bool readStream(void* buf, size_t n) {
            float* fp = static_cast<float *>(buf);
            val_ += 1.111;
            std::cerr << "recording " << channels_ << "x" << n << " of audio " << val_ << std::endl;
            std::fill_n(fp, channels_ * n, val_);
            return true;
        }
    };
    
    enum { OPUS_APPLICATION_VOIP };
    struct OpusWrapper {
        unsigned channels_ = 0;
    
        bool Init(unsigned, unsigned channels, int) {
            channels_ = channels;
            return true;
        };
    
        int Encode(void const* pcm, size_t numsamples, uint8_t* buf, size_t bufsize) {
            assert(numsamples);
    
            numsamples = 1; // we just encode 1 sample - it's awesome compression!
            size_t const n = numsamples * channels_ * sizeof(float);
            assert(n <= bufsize);
            std::copy_n(static_cast<uint8_t const*>(pcm), n, buf);
            return n;
        }
    
        int Decode(uint8_t const* buf, size_t bufsize, void* pcm, size_t sampleroom) {
            int const samplesize = channels_ * sizeof(float);
            assert(samplesize);
            size_t const n = bufsize / samplesize;
            assert(n <= sampleroom);
    
            assert(bufsize >= n * samplesize);
            std::copy_n(buf, n * samplesize, static_cast<uint8_t*>(pcm)); // copy whole frames (in bytes)
            return n;
        }
    };
    
    struct Window {
        void receiveCall(std::string const& ip) {
            // Initialize PortAudio and Opus
            PortAudioWrapper portAudio;
            OpusWrapper      opus;
            if (!portAudio.OpenDefaultStream(1, 2, paFloat32, 48000, 960)) {
                std::cerr << "Failed to open PortAudio stream" << std::endl;
                return;
            }
            if (!opus.Init(48000, 2, OPUS_APPLICATION_VOIP)) {
                std::cerr << "Failed to initialize Opus" << std::endl;
                return;
            }
            if (!portAudio.StartStream()) {
                std::cerr << "Failed to start PortAudio stream" << std::endl;
                return;
            }
    
            assert(portAudio.channels_ == opus.channels_);
            asio::io_context ioService;
            tcp::socket      socket(ioService);
            tcp::acceptor    acceptor(ioService);
    
            try {
                // Create an acceptor to listen for incoming connections
                tcp::endpoint endpoint{asio::ip::make_address(ip), 12345};
                acceptor.open(endpoint.protocol());
                acceptor.set_option(tcp::acceptor::reuse_address(true));
                acceptor.bind(endpoint);
                acceptor.listen();
    
                std::cout << "Waiting for incoming connection on " << ip << ":12345" << std::endl;
    
                // Wait for a client to connect
    
                acceptor.accept(socket);
    
                std::cout << "Connected to client at " << socket.remote_endpoint() << std::endl;
                bool clientConnected = true;
    
                // Create a buffer to hold the audio data
                std::array<unsigned char, 1276> buffer;
    
                // Send and receive audio data
                while (clientConnected) {
                    // Read audio data from PortAudio
                    struct sample { float chan0, chan1; };
                    std::array<sample, 960> audioData, decodedData;
                    static_assert(sizeof(audioData) == 2 * 960 * sizeof(float));
                    static_assert(sizeof(decodedData) == 2 * 960 * sizeof(float));
    
                    portAudio.readStream(audioData.data(), audioData.size());
    
                    // Compress the audio data using Opus
                    NetSize dataSize = opus.Encode(audioData.data(), audioData.size(), buffer.data(), buffer.size());
                    std::cout << "dataSize : " << dataSize << std::endl;
    
                    // Send the compressed audio data to the client
                    write(socket,
                          std::array{
                              asio::buffer(&dataSize, sizeof(dataSize)),
                              asio::buffer(buffer, dataSize),
                          });
    
                    // Receive compressed audio data from the client
                    boost::system::error_code error;
                    read(socket, asio::buffer(&dataSize, sizeof(dataSize)), error);
                    if (error) {
                        dataSize = 0;
                    } else {
                        dataSize = read(socket, asio::buffer(buffer, dataSize), error);
                    }
    
                    clientConnected = !error.failed(); // set flag to false when client disconnects
                    if (error == asio::error::eof) {
                        std::cout << "Client disconnected" << std::endl;
                    } else if (error) {
                        std::cerr << "Error receiving audio data: " << error.message() << std::endl;
                    }
    
                    // Decompress the audio data using Opus
                    assert(dataSize <= buffer.size());
                    int numSamples = opus.Decode(buffer.data(), dataSize, decodedData.data(), decodedData.size());
    
                    // Write the decompressed audio data to PortAudio
                    portAudio.writeStream(decodedData.data(), numSamples);
                }
            } catch (std::exception& e) {
                std::cerr << "Exception in receiveCall: " << e.what() << std::endl;
            }
    
            // Cleanup code
            socket.close();
            acceptor.close();
            portAudio.StopStream();
        }
    };
    
    int main() {
        Window w;
        w.receiveCall("127.0.0.1");
    }
    

    Coliru output:

    g++ -std=c++20 -O2 -Wall -pedantic -pthread main.cpp 
    ./a.out&
    sleep 1; nc 127.0.0.1 12345 -w 1 | od -A none -f
    Waiting for incoming connection on 127.0.0.1:12345
    Connected to client at 127.0.0.1:34176
    recording 2x960 of audio 1.111
    dataSize : 8
    Client disconnected
    a.out: main.cpp:26: bool PortAudioWrapper::writeStream(const void*, size_t): Assertion `n' failed.
         3.85186e-34           1.111           1.111
    bash: line 9:  7295 Aborted                 (core dumped) ./a.out
    

    Of course, the "peer" (nc in this demo) doesn't send back a valid frame with Opus data; the server only sees EOF, hence "Client disconnected".
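That EOF also explains the abort at the end of the output: after the disconnect, the loop body still calls Decode with dataSize 0, so writeStream receives 0 samples and the mock's assert(n) fires. A minimal fix is to leave the loop as soon as the receive step fails, before decoding. The sketch below shows only that control flow; run_call_loop and the three callables are hypothetical stand-ins for the PortAudio/Opus/socket calls.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <optional>
#include <vector>

using Packet = std::vector<unsigned char>;

// Control-flow sketch: stop the call loop as soon as receiving fails,
// so decode/playback never run on an empty frame.
// Returns the number of frames that were decoded and played.
std::size_t run_call_loop(const std::function<void()>& capture_and_send,
                          const std::function<std::optional<Packet>()>& receive,
                          const std::function<void(const Packet&)>& decode_and_play) {
    std::size_t played = 0;
    for (;;) {
        capture_and_send();
        std::optional<Packet> pkt = receive();
        if (!pkt)
            break; // EOF or error: the client disconnected
        decode_and_play(*pkt);
        ++played;
    }
    return played;
}
```

If the peer delivers two frames and then disconnects, the loop sends three times, plays two frames, and exits without ever decoding an empty packet.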
