I've implemented a gRPC server-streaming reactor using the C++ callback API. The basic idea: a separate application thread produces data every 10 ms and calls a `response()` method on the reactor, which pushes the data into a queue; a gRPC thread dequeues the data and sends it to the client, waiting for new data on a `std::condition_variable`. This works reliably in practice.
However, while reviewing the official gRPC C++ best practices, I found the following guideline:

> Reactions should be fast. Do not do blocking or long-running/heavy weight tasks or sleep. It could impact other RPCs within the process.
Clearly, my current design violates this recommendation, since it blocks inside the `Work()` method on a condition variable.
I tried to find more information about gRPC's internal threading model but couldn't find anything conclusive in the documentation. As a result, I'm uncertain which threads are allowed to call gRPC API methods, so to stay safe, I only invoke them from gRPC threads.
One idea I considered was using `std::async` to wait for new data on a separate thread and then notifying the reactor via `grpc::Alarm`, but I couldn't find any documentation on whether `Alarm` is thread-safe in this context.
My question is: What is a safe and best-practice way to wait for incoming data in a server streaming reactor using the gRPC C++ Callback API?
This answer looks unreliable, as it contradicts the best-practice guideline mentioned above.
Below is a simplified version of my current implementation:
```cpp
class EcatStreamReactor final
    : public grpc::ServerWriteReactor<ecat::EcatResponse>, public IBusClient
{
public:
    EcatStreamReactor(grpc::CallbackServerContext *context,
                      const ecat::ControlMessage *request)
        : _request(request)
        , _context(context)
    {}

    void set_bus_controller(std::weak_ptr<IBusController> bus_controller); // Implementation omitted

    /**
     * Called from the application thread every 10 ms.
     * Implements IBusClient.
     */
    void response(const EcatResponse &response) override
    {
        std::unique_lock<std::mutex> lock(_data_mutex);
        _data_queue.push(response);
        _data_cv.notify_one();
    }

    // Called once from the service handler after reactor construction.
    void Work()
    {
        EcatResponse data;
        grpc::Status status;
        bool has_data = false;
        auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(100);
        {
            std::unique_lock<std::mutex> lock(_data_mutex);
            _data_cv.wait_until(lock, deadline, [this] {
                return !_data_queue.empty() || _cancelled.load();
            });
            if (_cancelled.load()) {
                status = grpc::Status::OK;
            } else if (!_data_queue.empty()) {
                data = _data_queue.front();
                _data_queue.pop();
                has_data = true;
            } else {
                status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED,
                                      "No data received from bus controller");
            }
        }
        if (has_data) {
            _res = ecat_conv::convert(data);
            StartWrite(&_res);
        } else {
            Finish(status);
        }
    }

    void OnWriteDone(bool ok) override
    {
        if (ok)
            Work();
        else
            Finish(grpc::Status::CANCELLED);
    }

    void OnCancel() override; // Implementation omitted
    void OnDone() override;   // Implementation omitted

private:
    ecat::EcatResponse _res;
    std::queue<EcatResponse> _data_queue;
    std::mutex _data_mutex;
    std::condition_variable _data_cv; // plain condition_variable suffices with std::unique_lock<std::mutex>
    std::weak_ptr<IBusController> _bus_controller;
    std::atomic<bool> _cancelled{false};
    grpc::CallbackServerContext *_context;
    const ecat::ControlMessage *_request;
};
```
Any advice or patterns for implementing this correctly and safely would be appreciated.
There's no requirement that `StartWrite()` can be called only from a reactor thread -- you can call that from anywhere, as long as you have proper synchronization. The only requirement is that there can be only one write pending at a time.
The pattern I would use here would be the following:
Add a `bool write_in_progress` data member to the reactor that gets set to true when you call `StartWrite()` and set back to false in `OnWriteDone()`.

Then add a `MaybeStartWrite()` method that checks the value of `write_in_progress`. If true, it returns without doing anything; if false, it dequeues the first entry from the queue and calls `StartWrite()`.

Call `MaybeStartWrite()` whenever you add an entry to the queue and whenever `OnWriteDone()` is called.
This way, there's no need for any blocking.
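A minimal, self-contained sketch of that pattern. Note that the gRPC pieces are mocked out here so the control flow is visible in isolation: `NonBlockingStreamer`, `OnData()`, and the `written_` vector are hypothetical stand-ins for the real reactor, the `response()` producer callback, and an actual `StartWrite()` call.

```cpp
#include <mutex>
#include <queue>
#include <string>
#include <utility>
#include <vector>

// Sketch of the non-blocking write pattern: a write_in_progress flag plus a
// MaybeStartWrite() that is invoked both from the producer side and from
// OnWriteDone(). No thread ever blocks waiting for data.
class NonBlockingStreamer {
public:
    // Called from the producer thread (e.g. every 10 ms).
    void OnData(std::string msg) {
        std::lock_guard<std::mutex> lock(mu_);
        queue_.push(std::move(msg));
        MaybeStartWriteLocked();
    }

    // Mock of the reactor's OnWriteDone(bool ok) callback.
    void OnWriteDone(bool ok) {
        std::lock_guard<std::mutex> lock(mu_);
        write_in_progress_ = false;
        if (ok) MaybeStartWriteLocked();
    }

    const std::vector<std::string>& written() const { return written_; }

private:
    // Must be called with mu_ held: starts a write only if none is pending
    // and the queue is non-empty, so at most one write is in flight.
    void MaybeStartWriteLocked() {
        if (write_in_progress_ || queue_.empty()) return;
        write_in_progress_ = true;
        // In a real reactor: keep the message in a member field and call
        // StartWrite(&member). Here we just record it.
        written_.push_back(std::move(queue_.front()));
        queue_.pop();
    }

    std::mutex mu_;
    std::queue<std::string> queue_;
    bool write_in_progress_ = false;
    std::vector<std::string> written_;  // stands in for actual StartWrite() calls
};
```

In the real reactor, `MaybeStartWriteLocked()` would move the front of the queue into a member field and pass its address to `StartWrite()`, since the message must stay alive until `OnWriteDone()` fires. The producer thread only holds the mutex for a few instructions, so neither side blocks for any meaningful time.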