I'm attempting to write an async streaming gRPC server in C++ (following this example) in which multiple calls to Write() are made from a separate thread. Unfortunately, this causes a SIGSEGV on my system: the server manages one write and then crashes. The code below is a simplified example of what I'm attempting to do. The overloaded call operator receives a message from a separate thread and executes the Write() call, writing the message to the stream.
void MyServer::HandleRpcs() {
    new CallData(&m_service, m_queue.get());
    void* tag;
    bool ok;
    while (true) {
        GPR_ASSERT(m_queue->Next(&tag, &ok));
        GPR_ASSERT(ok);
        static_cast<CallData*>(tag)->Proceed();
    }
}

void MyServer::CallData::Proceed() {
    if (m_state == CREATE) {
        m_state = PROCESS;
        m_service->RequestRpc(&m_context, &m_request, &m_responder, m_queue, m_queue, this);
    }
    else if (m_state == PROCESS) {
        new CallData(m_service, m_queue);
        // Request the RPC here, which begins the message calls to the overloaded () operator
    }
    else {
        GPR_ASSERT(m_state == FINISH);
        delete this;
    }
}

void MyServer::CallData::operator()(Message message) {
    std::lock_guard<std::recursive_mutex> lock{m_serverMutex};
    MyStream stream;
    stream.set_message(message.payload);
    m_responder.Write(stream, this);
    PushTaskToQueue();
}

void MyServer::CallData::PushTaskToQueue() {
    // m_alarm is a member of CallData
    m_alarm.Set(m_queue, gpr_now(gpr_clock_type::GPR_CLOCK_REALTIME), this);
}
It turns out I had misunderstood how gRPC's completion queue works. I was calling Write() before the completion queue had returned the tag for the previous operation, which caused the crash. To resolve this, I created a static void* member variable in MyServer called m_tag and passed it as the tag argument to Next, like so:
    GPR_ASSERT(m_queue->Next(&m_tag, &ok));
Then, in the overloaded call operator, I checked whether the tag matches the handler's this pointer:
    if (m_tag != this) return;
With that in place, my message stream came through.
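
For anyone hitting the same crash, the underlying rule can be sketched without any gRPC types: queue messages from the producer thread and start the next write only once the previous write's tag has come back from Next. The WriteSerializer class below is a hypothetical helper, not part of gRPC. Its start_write callback stands in for a call like m_responder.Write(stream, this), and OnWriteDone() is assumed to be called from the loop that drains the completion queue. Treat it as a minimal sketch under those assumptions, not a drop-in fix.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical helper (not a gRPC type): allows at most one write in flight.
class WriteSerializer {
public:
    // start_write is an assumed stand-in for m_responder.Write(msg, tag).
    // It must not call back into this object synchronously, because it is
    // invoked while the internal mutex is held.
    explicit WriteSerializer(std::function<void(const std::string&)> start_write)
        : m_startWrite(std::move(start_write)) {}

    // Called from any producer thread instead of calling Write() directly.
    void Enqueue(std::string payload) {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_pending.push_back(std::move(payload));
        MaybeStartWrite();
    }

    // Called by the completion-queue loop when the write's tag is returned.
    void OnWriteDone() {
        std::lock_guard<std::mutex> lock(m_mutex);
        assert(m_inFlight);  // a completion without a started write is a bug
        m_inFlight = false;
        MaybeStartWrite();
    }

    bool InFlight() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_inFlight;
    }

    std::size_t PendingCount() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_pending.size();
    }

private:
    // Starts the next queued write only if none is currently outstanding.
    // Caller must hold m_mutex.
    void MaybeStartWrite() {
        if (m_inFlight || m_pending.empty()) return;
        std::string next = std::move(m_pending.front());
        m_pending.pop_front();
        m_inFlight = true;
        m_startWrite(next);
    }

    mutable std::mutex m_mutex;
    std::deque<std::string> m_pending;
    bool m_inFlight = false;
    std::function<void(const std::string&)> m_startWrite;
};
```

With something like this, operator() would call Enqueue() rather than Write(), and HandleRpcs would call OnWriteDone() whenever Next returns a tag that belongs to a write. How tags are mapped to write completions is left out here and depends on your CallData state machine.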