Problem criteria:
The solution is trivial to describe: set a timeout on the read.
The implementation of said solution has been elusive.
I think I may have finally tracked down something that is viable, but I am so weary from false starts that I seek someone's approval who has done this sort of thing before before moving ahead with it.
By calling GetOverlappedResultsEx with a non-zero timeout:
https://learn.microsoft.com/en-us/windows/win32/api/ioapiset/nf-ioapiset-getoverlappedresultex
- If dwMilliseconds is nonzero, and an I/O completion routine or APC is queued, GetLastError returns WAIT_IO_COMPLETION.
- If dwMilliseconds is nonzero and the specified timeout interval elapses, GetLastError returns WAIT_TIMEOUT.
Thus, I can sit and wait until IO has been alerted or the timeout exceeded and react accordingly:
Is it really that simple, though? Because I have yet to find any questions or example code, etc. that closely resembles what I got going on here (which is largely based on a codebase I inherited) and as a consequence, have failed to find any examples/suggestions to support that this is appropriate.
Demo program: https://github.com/rguilbault-mt/rguilbault-mt/blob/main/WinSock.cpp
to run:
-p -d -t -gor
Make the read delay > timeout to force the timeout condition.
Relevant bits for this question:
StartThreadpoolIo(gIoTp[s]);
if (WSARecv(s, bufs, 1, &readBytes, &dwFlags, &ioData->ol, NULL) == SOCKET_ERROR)
{
std::lock_guard<std::mutex> log(gIoMtx);
switch (WSAGetLastError())
{
case WSA_IO_PENDING:
std::cout << preamble(__func__) << "asynchronous" << std::endl;
break;
default:
std::cerr << preamble(__func__) << "WSARecv() failed: " << WSAGetLastError() << std::endl;
CancelThreadpoolIo(gIoTp[s]);
return false;
}
}
else
{
std::lock_guard<std::mutex> log(gIoMtx);
std::cout << preamble(__func__) << "synchronous - " << readBytes << " read" << std::endl;
}
if (gGetOverlappedResult)
{
{
std::lock_guard<std::mutex> log(gIoMtx);
std::cout << preamble(__func__) << "wait until I/O occurs or we timeout..." << std::endl;
}
DWORD bytesTransferred = 0;
if (!GetOverlappedResultEx((HANDLE)s, &ioData->ol, &bytesTransferred, gTimeout, true))
{
DWORD e = GetLastError();
std::lock_guard<std::mutex> log(gIoMtx);
switch (e)
{
case WAIT_IO_COMPLETION:
std::cout << preamble(__func__) << "read activity is forthcoming" << std::endl;
break;
case WAIT_TIMEOUT:
// we hit our timeout, cancel the I/O
CancelIoEx((HANDLE)s, &ioData->ol);
break;
default:
std::cerr << preamble(__func__) << "GetOverlappedResult error is unhandled: " << e << std::endl;
}
}
else
{
std::lock_guard<std::mutex> log(gIoMtx);
std::cerr << preamble(__func__) << "GetOverlappedResult success: " << bytesTransferred << std::endl;
}
}
Confirmation/other suggestions welcomed/appreciated.
I was debating what the proper protocol was and decided I'm just going to answer my own question for the benefit of the world (if anyone bumps into my similar criteria/issue) even though I would have preferred that @HansPassant get credit for the answer.
Anyway, with his suggestion, using the wait mechanism provided by Microsoft allows me to pull of what I need without orchestrating any thread-based monitoring of my own. Here are the relevant bits:
else if (gRegisterWait)
{
if (!RegisterWaitForSingleObject(&ioData->waiter, (HANDLE)s, waitOrTimerCallback, ioData, gTimeout, WT_EXECUTEONLYONCE))
{
std::lock_guard<std::mutex> log(gIoMtx);
std::cerr << preamble(__func__) << "RegisterWaitForSingleObject failed: " << GetLastError() << std::endl;
}
else
{
std::lock_guard<std::mutex> log(gIoMtx);
std::cout << preamble(__func__) << "RegisterWaitForSingleObject success: " << ioData->waiter << std::endl;
}
}
VOID CALLBACK waitOrTimerCallback(
PVOID lpParameter,
BOOLEAN TimedOut
)
{
IoData* ioData = (IoData*)lpParameter;
{
std::lock_guard<std::mutex> log(gIoMtx);
std::cout << preamble(__func__) << (TimedOut ? "true" : "false") << std::endl;
std::cout << "\tSocket: " << ioData->socket << std::endl;
}
if (!TimedOut)
{
std::lock_guard<std::mutex> log(gIoMtx);
std::cout << preamble(__func__) << "read activity is forthcoming" << std::endl;
}
else
{
// we hit our timeout, cancel the I/O
CancelIoEx((HANDLE)ioData->socket, &ioData->ol);
std::lock_guard<std::mutex> log(gIoMtx);
std::cout << preamble(__func__) << "timeout reached, cancelling I/O" << std::endl;
}
// need to unregister the waiter but not supposed to do it in the callback
if (!TrySubmitThreadpoolCallback(unregisterWaiter, &ioData->waiter, NULL))
{
std::lock_guard<std::mutex> log(gIoMtx);
std::cerr << preamble(__func__) << "failed to unregister waiter...does this mean I have a memory leak?" << std::endl;
}
}
https://learn.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-registerwaitforsingleobject
When the wait is completed, you must call the UnregisterWait or UnregisterWaitEx function to cancel the wait operation. (Even wait operations that use WT_EXECUTEONLYONCE must be canceled.) Do not make a blocking call to either of these functions from within the callback function.
submit the unregistering of the waiter to the threadpool to be dealt with outside of the callback:
VOID CALLBACK unregisterWaiter(
PTP_CALLBACK_INSTANCE Instance,
PVOID Context
)
{
PHANDLE pWaitHandle = (PHANDLE)Context;
{
std::lock_guard<std::mutex> log(gIoMtx);
std::cout << preamble(__func__) << std::endl;
std::cout << "\Handle: " << (HANDLE)*pWaitHandle << std::endl;
}
if (!UnregisterWait(*pWaitHandle))
{
std::lock_guard<std::mutex> log(gIoMtx);
std::cerr << preamble(__func__) << "UnregisterWait failed: " << GetLastError() << std::endl;
}
}
Managing the pointer to the handle created needs to be accounted for, but I think you can tuck it into the structure wrapping the overlapped IO and then pass the pointer to your wrapper around. Seems to work fine. The documentation makes no indication of whether I'm on the hook for freeing anything, so I assume that is why we're required to call the UnregisterWait function regardless of whether we're only executing once, etc. That detail can be considered outside the scope of the question.
Note, for others' benefit, I've updated the github link from my question with the latest version of the code.