I have a console app in which I'm setting up a UDP listener using RIO. I started with simple polling to test the flow between a simple console client and the console app/server. I can see the traffic in Wireshark, and the IPs/ports look correct.
However, when I move to an IOCP implementation, GetQueuedCompletionStatus doesn't return until I work through the shutdown process:
PostQueuedCompletionStatus(hIocp, 0, static_cast<ULONG_PTR>(0x00000001), nullptr);
Once this line executes, GetQueuedCompletionStatus returns and the server sends the echo back to the client app. I'm refreshing my network programming skills and haven't used RIO before, so I'm not sure what I'm missing.
I have the following defined:
#include <winsock2.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#include <windows.h>
#include <iostream>
#include <vector>
#include <thread>
#include <atomic>
#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Mswsock.lib")
static constexpr ULONG PORT = 8085;
static constexpr ULONG BUF_SIZE = 2048; // bytes per payload buffer
static constexpr ULONG SLOT_COUNT = 10000; // number of concurrent receive slots
static constexpr ULONG CQ_DEPTH = SLOT_COUNT * 2 + 16; // must exceed max receives+sends
// -----------------------------------------------------------------------------
// Globals
// -----------------------------------------------------------------------------
RIO_EXTENSION_FUNCTION_TABLE rio; // holds pointers to RIO APIs
std::atomic<bool> running{ true }; // for graceful shutdown
bool LoadRioFunctions(SOCKET s)
{
GUID guid = WSAID_MULTIPLE_RIO;
DWORD bytes = 0;
int rc = WSAIoctl(
s,
SIO_GET_MULTIPLE_EXTENSION_FUNCTION_POINTER,
&guid, sizeof(guid),
&rio, sizeof(rio),
&bytes,
nullptr,
nullptr
);
return (rc == 0);
}
In main(), I set up a socket and the other RIO items (apologies for the length; I've tried to remove as much as possible for the post):
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) {
std::cerr << "[Fatal] WSAStartup failed\n";
return 1;
}
// 1) Create UDP socket with RIO support
SOCKET sock = WSASocket(
AF_INET,
SOCK_DGRAM,
IPPROTO_UDP,
nullptr,
0,
WSA_FLAG_REGISTERED_IO | WSA_FLAG_OVERLAPPED
);
// 2) Bind to 0.0.0.0:PORT
sockaddr_in localAddr{};
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = htonl(INADDR_ANY);
localAddr.sin_port = htons(PORT);
if (bind(sock, reinterpret_cast<sockaddr*>(&localAddr), sizeof(localAddr)) == SOCKET_ERROR) {
}
// 3) Load RIO function pointers
if (!LoadRioFunctions(sock)) {
}
// 4) Allocate & register dataPool: SLOT_COUNT * BUF_SIZE bytes
std::vector<char> dataPool;
dataPool.resize(static_cast<size_t>(SLOT_COUNT) * BUF_SIZE);
RIO_BUFFERID dataBufId = rio.RIORegisterBuffer(
dataPool.data(),
static_cast<DWORD>(dataPool.size())
);
if (dataBufId == RIO_INVALID_BUFFERID) {
}
// 5) Allocate & register addrPool: SLOT_COUNT * sizeof(sockaddr_storage)
std::vector<char> addrPool;
addrPool.resize(static_cast<size_t>(SLOT_COUNT) * sizeof(sockaddr_storage));
RIO_BUFFERID addrBufId = rio.RIORegisterBuffer(
addrPool.data(),
static_cast<DWORD>(addrPool.size())
);
if (addrBufId == RIO_INVALID_BUFFERID) {
}
// 6) Create IO Completion Port (IOCP) for RIO to post into
HANDLE hIocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, 0);
if (!hIocp) {
}
std::cout << "[Debug] Created IOCP handle = " << hIocp << "\n";
if (hIocp == nullptr) {
}
// 7) Create a RIO Completion Queue with RIO_IOCP_COMPLETION notification
static OVERLAPPED ov = {};
RIO_NOTIFICATION_COMPLETION notify{};
notify.Type = RIO_IOCP_COMPLETION;
notify.Iocp.IocpHandle = hIocp;
notify.Iocp.CompletionKey = reinterpret_cast<PVOID>((ULONG_PTR)0xDEADBEEF);
notify.Iocp.Overlapped = &ov;
RIO_CQ cq = rio.RIOCreateCompletionQueue(
static_cast<DWORD>(CQ_DEPTH),
&notify
);
if (cq == RIO_INVALID_CQ) {
}
// 8) Create a RIO Request Queue with SLOT_COUNT receives & sends,
// each request using a single data buffer.
RIO_RQ rq = rio.RIOCreateRequestQueue(
sock,
SLOT_COUNT, // MaxOutstandingReceive
1, // MaxReceiveDataBuffers
SLOT_COUNT, // MaxOutstandingSend
1, // MaxSendDataBuffers
cq, // ReceiveCQ
cq, // SendCQ (reuse)
nullptr // SocketContext (returned in RIORESULT::SocketContext)
);
if (rq == RIO_INVALID_RQ) {
}
// 9) Set up RIO_BUF arrays for data + address slots
std::vector<RIO_BUF> dataBufs(SLOT_COUNT);
std::vector<RIO_BUF> addrBufs(SLOT_COUNT);
for (ULONG i = 0; i < SLOT_COUNT; ++i) {
dataBufs[i].BufferId = dataBufId;
dataBufs[i].Offset = i * BUF_SIZE;
dataBufs[i].Length = BUF_SIZE;
addrBufs[i].BufferId = addrBufId;
addrBufs[i].Offset = static_cast<ULONG>(i * sizeof(sockaddr_storage));
addrBufs[i].Length = static_cast<ULONG>(sizeof(sockaddr_storage));
}
// 10) Post an initial RIOReceiveEx for every slot (context = i)
for (ULONG i = 0; i < SLOT_COUNT; ++i) {
BOOL ok = rio.RIOReceiveEx(
rq,
&dataBufs[i],
1,
nullptr,
&addrBufs[i],
nullptr, nullptr,
/*Flags=*/ 0,
reinterpret_cast<PVOID>(static_cast<ULONG_PTR>(i))
);
if (!ok) {
}
}
std::cout << "[Info] Posted " << SLOT_COUNT
<< " initial RIOReceiveEx calls (UDP port " << PORT << ")\n";
// 11) Launch a dedicated thread to block on GetQueuedCompletionStatus
// and drain each batch of RIORESULTs.
std::thread worker(
IocpWorkerThread,
hIocp,
cq,
rq,
dataBufId,
std::ref(dataPool),
std::ref(addrPool),
std::ref(dataBufs),
std::ref(addrBufs)
);
// 12) Wait for user to press ENTER to shut down
std::cout << "Press ENTER to shut down server.\n";
std::cin.get();
IocpWorkerThread starts as follows:
void IocpWorkerThread(
HANDLE hIocp,
RIO_CQ cq,
RIO_RQ rq,
RIO_BUFFERID dataBufId,
std::vector<char>& dataPool,
std::vector<char>& addrPool,
std::vector<RIO_BUF>& dataBufs,
std::vector<RIO_BUF>& addrBufs
)
{
constexpr ULONG BATCH = 64;
std::vector<RIORESULT> results(BATCH);
sockaddr_storage clientAddr;
while (running.load()) {
DWORD bytesTransferred = 0;
ULONG_PTR completionKey = 0;
LPOVERLAPPED pOvPtr = nullptr;
// Block until RIO posts an IOCP completion (via RIOCreateCompletionQueue w/ IOCP)
BOOL ok = GetQueuedCompletionStatus(
hIocp,
&bytesTransferred,
&completionKey,
&pOvPtr,
INFINITE
);
DWORD gle = GetLastError();
std::cout << "[Worker] GQCS returned: ok=" << ok
<< " bytes=" << bytesTransferred
<< " key=0x" << std::hex << completionKey << std::dec
<< " pOv=" << pOvPtr
<< " err=" << gle << "\n";
The worker waits at GetQueuedCompletionStatus. Until I press Enter in the console to terminate the app, it never returns, even when I send packets with the test client.
Without the IOCP and the same test client, the server will send back the expected echo.
I think my main struggle is understanding the "completion key" and how it needs to be configured when creating the completion queue and the request queue.
As I noted above, I first had the server console app working with RIO and polling, without the IOCP, together with a simple client. The client takes a string you type and sends it to the server. The server reads it, logs the size of the string, and echoes it back.
You have created a Completion Queue for RIO to notify you of completed actions, but you did not tell RIO that you are ready to receive notification of those completions, which is why GetQueuedCompletionStatus() doesn't tell you anything.
You need to call RIONotify() after you have created the Completion Queue. GetQueuedCompletionStatus() will then notify you when the Queue is not empty (i.e., when RIOReceiveEx() has completed). Use RIODequeueCompletion() to retrieve the completed items from the Queue, and then call RIONotify() again so that you are notified the next time the Queue is not empty.
For example:
...
RIO_CQ cq = rio.RIOCreateCompletionQueue(
static_cast<DWORD>(CQ_DEPTH),
&notify
);
rio.RIONotify(cq); // <-- ADD THIS!
...
void IocpWorkerThread(
HANDLE hIocp,
RIO_CQ cq,
RIO_RQ rq,
RIO_BUFFERID dataBufId,
std::vector<char>& dataPool,
std::vector<char>& addrPool,
std::vector<RIO_BUF>& dataBufs,
std::vector<RIO_BUF>& addrBufs
)
{
...
while (running.load()) {
...
GetQueuedCompletionStatus(...);
...
ULONG count = rio.RIODequeueCompletion(cq, ...);
...
rio.RIONotify(cq); // <-- ADD THIS!
}
}
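To see why the wait never returns without RIONotify(), it may help to look at the rule in isolation. The sketch below is a portable toy model, not the Winsock API: ModelCompletionQueue, complete(), notify(), dequeue(), tryWait() and demonstrate() are hypothetical names, and the model assumes that a notification is one-shot and that arming a queue which already holds results fires immediately.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Toy stand-in for a RIO completion queue bound to an IOCP.
// It only models the "arm once, fire once" notification rule.
class ModelCompletionQueue {
public:
    // Models a receive completing: the result is queued, and a wake-up
    // is posted only if a notification is currently armed.
    void complete(int result) {
        results_.push_back(result);
        fireIfArmed();
    }

    // Models RIONotify(): arm a single notification.
    // Returns false if a notification is already armed.
    bool notify() {
        if (armed_) return false;
        armed_ = true;
        fireIfArmed();  // assumption: a non-empty queue fires at once
        return true;
    }

    // Models RIODequeueCompletion(): hand up to `max` results to the caller.
    std::vector<int> dequeue(std::size_t max) {
        std::vector<int> out;
        while (!results_.empty() && out.size() < max) {
            out.push_back(results_.front());
            results_.pop_front();
        }
        return out;
    }

    // Models GetQueuedCompletionStatus() with a zero timeout:
    // consumes one posted wake-up if there is one.
    bool tryWait() {
        if (wakeups_ == 0) return false;
        --wakeups_;
        return true;
    }

private:
    void fireIfArmed() {
        if (armed_ && !results_.empty()) {
            armed_ = false;  // one-shot: firing disarms the notification
            ++wakeups_;
        }
    }

    std::deque<int> results_;
    bool armed_ = false;
    int wakeups_ = 0;
};

// Walks through the situation in the question and the fix.
inline void demonstrate() {
    ModelCompletionQueue cq;

    cq.complete(1);          // a datagram arrives
    assert(!cq.tryWait());   // never armed, so the waiter is not woken

    assert(cq.notify());     // arm after creating the queue
    assert(cq.tryWait());    // now the waiter wakes up
    assert(cq.dequeue(64).size() == 1);

    cq.complete(2);          // next datagram, but nothing is armed
    assert(!cq.tryWait());

    assert(cq.notify());     // re-arm at the end of the loop iteration
    assert(cq.tryWait());
    assert(cq.dequeue(64).size() == 1);
}
```

Calling demonstrate() runs through both cases: a completion that arrives while nothing is armed stays in the queue silently, which matches the symptom in the question, and each wake-up has to be followed by another notify() before the next one can arrive.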
See "Windows 8 Registered I/O and I/O Completion Ports" for more detail.
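On the completion key: three separate values travel through this setup, and it may help to keep them apart. The value in notify.Iocp.CompletionKey is what GetQueuedCompletionStatus() hands back as the completion key when RIO posts its notification. The last argument of RIOCreateRequestQueue() is a socket context that comes back in RIORESULT::SocketContext. The last argument of RIOReceiveEx() is a request context that comes back in RIORESULT::RequestContext. The sketch below is portable and does not call Winsock: ResultModel mirrors the RIORESULT fields, and Wakeup, classify() and slotIndex() are hypothetical helpers. The key values are the ones used in the question.

```cpp
#include <cstdint>

// Keys taken from the question's code: 0xDEADBEEF is stored in
// notify.Iocp.CompletionKey, 0x1 is passed to PostQueuedCompletionStatus().
constexpr std::uintptr_t kRioKey      = 0xDEADBEEF;
constexpr std::uintptr_t kShutdownKey = 0x1;

enum class Wakeup { RioCompletions, Shutdown, Unknown };

// Decide what a wake-up means from the completion key alone.
constexpr Wakeup classify(std::uintptr_t completionKey) {
    return completionKey == kRioKey      ? Wakeup::RioCompletions
         : completionKey == kShutdownKey ? Wakeup::Shutdown
                                         : Wakeup::Unknown;
}

// Portable mirror of the RIORESULT fields (illustration only).
struct ResultModel {
    std::int32_t  status;
    std::uint32_t bytesTransferred;
    std::uint64_t socketContext;   // from RIOCreateRequestQueue()
    std::uint64_t requestContext;  // from RIOReceiveEx()
};

// Recover the slot index that was passed as the request context.
constexpr std::uint32_t slotIndex(const ResultModel& r) {
    return static_cast<std::uint32_t>(r.requestContext);
}
```

With this split, the worker would only call RIODequeueCompletion() when the key classifies as RioCompletions, and would leave the loop on Shutdown instead of treating the shutdown post as a batch of completions.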