Tags: windows, sockets, udp, winsock2

Windows RIO/UDP with IOCP: Issue in getting GetQueuedCompletionStatus to work


I have a console app in which I'm setting up a UDP listener using RIO (Registered I/O). I started with simple polling to test the flow between a simple console client and the console app/server. I can see the traffic in Wireshark, and the IP addresses and ports look correct. However, when I move to an IOCP implementation, GetQueuedCompletionStatus doesn't return until I go through the shutdown process, which includes:

PostQueuedCompletionStatus(hIocp, 0, static_cast<ULONG_PTR>(0x00000001), nullptr);

Once this line executes, GetQueuedCompletionStatus returns and the server sends the echo back to the client app. I'm refreshing my network-programming skills, so I'm not sure what I'm missing, and I haven't used RIO before.

I have the following defined:

#include <winsock2.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <windows.h>
#include <iostream>
#include <vector>
#include <thread>
#include <atomic>

#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Mswsock.lib")

// ---- Server tuning ---------------------------------------------------------
// (constexpr at namespace scope already implies internal linkage.)
constexpr ULONG PORT       = 8085;   // UDP port bound on 0.0.0.0 in main()
constexpr ULONG BUF_SIZE   = 2048;   // bytes in one payload slot of the data pool
constexpr ULONG SLOT_COUNT = 10000;  // slots, i.e. RIOReceiveEx requests posted up front
// Receives and sends share one completion queue, so it must be able to hold
// SLOT_COUNT + SLOT_COUNT results; the extra 16 entries are slack.
constexpr ULONG CQ_DEPTH   = SLOT_COUNT * 2 + 16;

// ---- Process-wide state ----------------------------------------------------
// RIO entry points; populated by LoadRioFunctions() through WSAIoctl.
RIO_EXTENSION_FUNCTION_TABLE rio{};
// The IOCP worker loop keeps running while this is true (intended for
// graceful shutdown; NOTE(review): the code that clears it is not shown here).
std::atomic<bool> running(true);

// Fetches the Winsock Registered I/O (RIO) extension function table for the
// given socket and writes it into the global `rio`.
//
// s: the socket to query (main() passes one created with
//    WSA_FLAG_REGISTERED_IO).
// Returns true when WSAIoctl reports success (0), false otherwise; on failure
// the reason is available from WSAGetLastError().
bool LoadRioFunctions(SOCKET s)
{
    GUID functionTableId = WSAID_MULTIPLE_RIO;
    DWORD bytesReturned = 0;

    if (WSAIoctl(s, SIO_GET_MULTIPLE_EXTENSION_FUNCTION_POINTER,
                 &functionTableId, sizeof functionTableId,
                 &rio, sizeof rio,
                 &bytesReturned, nullptr, nullptr) != 0) {
        return false;  // extension lookup failed
    }
    return true;
}

In main(), I set up a socket and the other RIO objects (apologies for the length; I've tried to remove as much as possible for this post):

    WSADATA wsaData;
    if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
        std::cerr << "[Fatal] WSAStartup failed\n";
        return 1;
    }

    // 1) Create UDP socket with RIO support
    SOCKET sock = WSASocket(
        AF_INET,
        SOCK_DGRAM,
        IPPROTO_UDP,
        nullptr,
        0,
        WSA_FLAG_REGISTERED_IO | WSA_FLAG_OVERLAPPED
    );

    // 2) Bind to 0.0.0.0:PORT
    sockaddr_in localAddr{};
    localAddr.sin_family = AF_INET;
    localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    localAddr.sin_port = htons(PORT);

    if (bind(sock, reinterpret_cast<sockaddr*>(&localAddr), sizeof(localAddr)) == SOCKET_ERROR) {
    }

    // 3) Load RIO function pointers
    if (!LoadRioFunctions(sock)) {
    }

    // 4) Allocate & register dataPool: SLOT_COUNT * BUF_SIZE bytes
    std::vector<char> dataPool;
    dataPool.resize(static_cast<size_t>(SLOT_COUNT) * BUF_SIZE);

    RIO_BUFFERID dataBufId = rio.RIORegisterBuffer(
        dataPool.data(),
        static_cast<DWORD>(dataPool.size())
    );
    if (dataBufId == RIO_INVALID_BUFFERID) {
    }

    // 5) Allocate & register addrPool: SLOT_COUNT * sizeof(sockaddr_storage)
    std::vector<char> addrPool;
    addrPool.resize(static_cast<size_t>(SLOT_COUNT) * sizeof(sockaddr_storage));

    RIO_BUFFERID addrBufId = rio.RIORegisterBuffer(
        addrPool.data(),
        static_cast<DWORD>(addrPool.size())
    );
    if (addrBufId == RIO_INVALID_BUFFERID) {
    }

    // 6) Create IO Completion Port (IOCP) for RIO to post into
    HANDLE hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
    if (!hIocp) {
    }
    std::cout << "[Debug] Created IOCP handle = " << hIocp << "\n";
    if (hIocp == nullptr) {
    }

    // 7) Create a RIO Completion Queue with RIO_IOCP_COMPLETION notification
    static OVERLAPPED ov = {};
    RIO_NOTIFICATION_COMPLETION notify{};
    notify.Type = RIO_IOCP_COMPLETION;
    notify.Iocp.IocpHandle = hIocp;
    notify.Iocp.CompletionKey = reinterpret_cast<PVOID>((ULONG_PTR)0xDEADBEEF);
    notify.Iocp.Overlapped = &ov;

    RIO_CQ cq = rio.RIOCreateCompletionQueue(
        static_cast<DWORD>(CQ_DEPTH),
        &notify
    );
    if (cq == RIO_INVALID_CQ) {
    }

    // 8) Create a RIO Request Queue with SLOT_COUNT receives & sends,
    //    each using a 4‐byte context (the slot index).
    RIO_RQ rq = rio.RIOCreateRequestQueue(
        sock,
        SLOT_COUNT,        // maxReceiveRequests
        1, //sizeof(ULONG),     // receiveContextSize
        SLOT_COUNT,        // maxSendRequests
        1, //sizeof(ULONG),     // sendContextSize
        cq,                // receiveCompletionQueue
        cq,                // sendCompletionQueue (reuse)
        nullptr            // no additional notifications
    );
    if (rq == RIO_INVALID_RQ) {
    }

    // 9) Set up RIO_BUF arrays for data + address slots
    std::vector<RIO_BUF> dataBufs(SLOT_COUNT);
    std::vector<RIO_BUF> addrBufs(SLOT_COUNT);

    for (ULONG i = 0; i < SLOT_COUNT; ++i) {
        dataBufs[i].BufferId = dataBufId;
        dataBufs[i].Offset = i * BUF_SIZE;
        dataBufs[i].Length = BUF_SIZE;

        addrBufs[i].BufferId = addrBufId;
        addrBufs[i].Offset = static_cast<ULONG>(i * sizeof(sockaddr_storage));
        addrBufs[i].Length = static_cast<ULONG>(sizeof(sockaddr_storage));
    }

    // 10) Post an initial RIOReceiveEx for every slot (context = i)
    for (ULONG i = 0; i < SLOT_COUNT; ++i) {
        BOOL ok = rio.RIOReceiveEx(
            rq,
            &dataBufs[i],
            1,
            nullptr,
            &addrBufs[i],
            nullptr, nullptr,
            /*Flags=*/ 0,
            reinterpret_cast<PVOID>(static_cast<ULONG_PTR>(i))
        );
        if (!ok) {
        }
    }

    std::cout << "[Info] Posted " << SLOT_COUNT
        << " initial RIOReceiveEx calls (UDP port " << PORT << ")\n";

    // 11) Launch a dedicated thread to block on GetQueuedCompletionStatus
    //     and drain each batch of RIORESULTs.
    std::thread worker(
        IocpWorkerThread,
        hIocp,
        cq,
        rq,
        dataBufId,
        std::ref(dataPool),
        std::ref(addrPool),
        std::ref(dataBufs),
        std::ref(addrBufs)
    );

    // 12) Wait for user to press ENTER to shut down
    std::cout << "Press ENTER to shut down server.\n";
    std::cin.get();

IocpWorkerThread starts as follows:

void IocpWorkerThread(
    HANDLE            hIocp,
    RIO_CQ            cq,
    RIO_RQ            rq,
    RIO_BUFFERID      dataBufId,
    std::vector<char>& dataPool,
    std::vector<char>& addrPool,
    std::vector<RIO_BUF>& dataBufs,
    std::vector<RIO_BUF>& addrBufs
)
{
    constexpr ULONG BATCH = 64;
    std::vector<RIORESULT> results(BATCH);
    sockaddr_storage clientAddr;

    while (running.load()) {
        DWORD       bytesTransferred = 0;
        ULONG_PTR   completionKey = 0;
        LPOVERLAPPED pOvPtr = nullptr;
        // Block until RIO posts an IOCP completion (via RIOCreateCompletionQueue w/ IOCP)
        BOOL ok = GetQueuedCompletionStatus(
            hIocp,
            &bytesTransferred,
            &completionKey,
            &pOvPtr,
            INFINITE
        );
        DWORD gle = GetLastError();
        std::cout << "[Worker] GQCS returned: ok=" << ok
            << "  bytes=" << bytesTransferred
            << "  key=0x" << std::hex << completionKey << std::dec
            << "  pOv=" << pOvPtr
            << "  err=" << gle << "\n";

The worker blocks in GetQueuedCompletionStatus and never returns, even when I send packets with the test client, until I press ENTER in the console to terminate the app.

Without IOCP (using polling instead), the same test client gets the expected echo back from the server.

I think my main struggle is understanding the "completion key" and how it needs to be configured when creating the completion queue and the request queue.

As noted above, I first had the server console app running with RIO and polling (no IOCP), together with a simple client, and that worked. The client sends a string you type to the server; the server reads it, logs its size, and echoes it back.


Solution

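  • You have created a completion queue for RIO to notify you of completed operations, but you never told RIO that you are ready to receive a notification for it. That is why GetQueuedCompletionStatus() never returns. A RIO completion queue created with RIO_IOCP_COMPLETION posts to the IOCP only after RIONotify() has been called, and each RIONotify() call arms exactly one notification.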

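    You need to call RIONotify() after you have created the completion queue. GetQueuedCompletionStatus() will then return once the queue is not empty (i.e., when a RIOReceiveEx() has completed). Use RIODequeueCompletion() to retrieve the completed items from the queue, and then call RIONotify() again so that you are notified the next time the queue is not empty.

    As for the completion key: it is simply the CompletionKey (and OVERLAPPED pointer) you supplied in RIO_NOTIFICATION_COMPLETION, and GetQueuedCompletionStatus() hands them back unchanged (0xDEADBEEF and &ov in your code). The request queue has no completion key of its own. Per-request data comes back in each RIORESULT's RequestContext (the slot index you passed to RIOReceiveEx()). The last argument of RIOCreateRequestQueue() comes back in RIORESULT::SocketContext. Note also that the 3rd and 5th arguments of RIOCreateRequestQueue() are MaxReceiveDataBuffers and MaxSendDataBuffers (currently required to be 1), not context sizes.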

    For example:

    ...
    
    RIO_CQ cq = rio.RIOCreateCompletionQueue(
            static_cast<DWORD>(CQ_DEPTH),
            &notify
        );
    
    rio.RIONotify(cq); // <-- ADD THIS!
    
    ...
        
    void IocpWorkerThread(
        HANDLE            hIocp,
        RIO_CQ            cq,
        RIO_RQ            rq,
        RIO_BUFFERID      dataBufId,
        std::vector<char>& dataPool,
        std::vector<char>& addrPool,
        std::vector<RIO_BUF>& dataBufs,
        std::vector<RIO_BUF>& addrBufs
    )
    {
        ...
    
        while (running.load()) {
            ...
            GetQueuedCompletionStatus(...);
            ...
            ULONG count = rio.RIODequeueCompletion(cq, ...);
            ...
            rio.RIONotify(cq); // <-- ADD THIS!
        }
    }
    

    See Windows 8 Registered I/O and I/O Completion Ports for more detail.