
How to safely wait for new data in a gRPC C++ server streaming reactor using the Callback API?


I've implemented a gRPC server streaming reactor using the C++ Callback API. Here's the basic idea: a separate application thread produces data every 10 ms and calls a response() method on the reactor. I push the data into a queue, and a gRPC thread dequeues and sends it to the client. The gRPC thread waits for data using a std::condition_variable. This works reliably in practice.

However, while reviewing the official gRPC C++ best practices, I found the following guideline:

Reactions should be fast. Do not do blocking or long-running/heavy weight tasks or sleep. It could impact other RPCs within the process.

Clearly, my current design violates this recommendation, since it blocks inside the Work() method with a condition variable.

I tried to find more information about gRPC's internal threading model but couldn't find anything conclusive in the documentation. As a result, I'm uncertain which threads are allowed to call gRPC API methods, so to stay safe, I only invoke them from gRPC threads.

One idea I considered was using std::async to wait for new data in a separate thread and notifying the reactor via grpc::Alarm, but I couldn't find information about whether Alarm is thread-safe in this context.

My question is: What is a safe and best-practice way to wait for incoming data in a server streaming reactor using the gRPC C++ Callback API?

This answer looks unreliable, as it contradicts the best-practice guideline mentioned above.

Below is a simplified version of my current implementation:

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>

#include <grpcpp/grpcpp.h>
// Project headers (ecat::*, EcatResponse, IBusClient, IBusController, ecat_conv) omitted

class EcatStreamReactor final
    : public grpc::ServerWriteReactor<ecat::EcatResponse>, public IBusClient
{
public:
    EcatStreamReactor(
        grpc::CallbackServerContext *context,
        const ecat::ControlMessage *request
    )
        : _context(context)   // initialized in declaration order
        , _request(request)
    {}

    void set_bus_controller(std::weak_ptr<IBusController> bus_controller);   // Implementation omitted

    /**
     * Called from application thread every 10 ms
     * Implements IBusClient
     */
    void response(const EcatResponse &response) override
    {
        std::unique_lock<std::mutex> lock(_data_mutex);
        _data_queue.push(response);
        _data_cv.notify_one();
    }

    // Called once from the service handler after reactor construction, then
    // again from OnWriteDone(). Blocks the calling thread for up to 100 ms.
    void Work()
    {
        EcatResponse data;
        grpc::Status status;
        bool has_data = false;
        auto deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(100);
        {
            std::unique_lock<std::mutex> lock(_data_mutex);
            _data_cv.wait_until(lock, deadline, [this] {
                return !_data_queue.empty() || _cancelled.load();
            });

            if (_cancelled.load()) {
                status = grpc::Status::OK;
            } else if (!_data_queue.empty()) {
                data = _data_queue.front();
                _data_queue.pop();
                has_data = true;
            } else {
                status = grpc::Status(grpc::StatusCode::DEADLINE_EXCEEDED,
                                      "No data received from bus controller");
            }
        }

        if (has_data) {
            _res = ecat_conv::convert(data);
            StartWrite(&_res);
        } else {
            Finish(status);
        }
    }

    void OnWriteDone(bool ok) override
    {
        if (ok)
            Work();   // runs on a gRPC thread, so the wait above blocks it
        else
            Finish(grpc::Status::CANCELLED);
    }

    void OnCancel() override; // Implementation omitted

    void OnDone() override;   // Implementation omitted

private:
    ecat::EcatResponse _res;
    std::queue<EcatResponse> _data_queue;
    std::mutex _data_mutex;
    std::condition_variable_any _data_cv;
    std::weak_ptr<IBusController> _bus_controller;
    std::atomic<bool> _cancelled{false};
    grpc::CallbackServerContext *_context;
    const ecat::ControlMessage *_request;
};
```

Any advice or patterns for implementing this correctly and safely would be appreciated.


Solution

  • There's no requirement that `StartWrite()` be called only from a reactor thread; you can call it from any thread, as long as you synchronize properly. The only requirement is that at most one write can be pending at a time.

    The pattern I would use here is the following:

    This way, there's no need for any blocking.
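A minimal sketch of one pattern consistent with this advice is shown below. It uses a mutex-protected queue plus a "write in flight" flag. The producer thread pushes data and starts a write only if none is pending, and the write-done callback starts the next queued write or goes idle, so no thread ever waits for data. `SingleWriterQueue`, `Push`, `Close` and `SimulateFakeStream` are hypothetical names, not gRPC API. To keep the sketch self-contained, gRPC's `StartWrite()` is replaced by a `start_write` callback and `OnWriteDone` is driven by hand.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical helper (not a gRPC class): keeps at most one write pending.
// In a real grpc::ServerWriteReactor<T>, `start_write` would call
// StartWrite(msg), and the reactor's OnWriteDone(ok) would forward here.
template <typename T>
class SingleWriterQueue {
 public:
  explicit SingleWriterQueue(std::function<void(const T*)> start_write)
      : start_write_(std::move(start_write)) {}

  // Producer side: may be called from any thread and never waits for data.
  void Push(T item) {
    bool start = false;
    {
      std::lock_guard<std::mutex> lock(mu_);
      if (closed_) return;  // stream is finished; drop the item
      pending_.push_back(std::move(item));
      start = StartNextLocked();
    }
    // Issue the write outside the lock in case the completion runs inline.
    if (start) start_write_(&current_);
  }

  // Forward OnWriteDone(ok) here. Returns false if the stream is broken,
  // in which case the reactor should call Finish().
  bool OnWriteDone(bool ok) {
    bool start = false;
    {
      std::lock_guard<std::mutex> lock(mu_);
      assert(write_in_flight_ && "OnWriteDone without a pending write");
      write_in_flight_ = false;
      if (!ok) {
        closed_ = true;
        pending_.clear();
        return false;
      }
      start = StartNextLocked();  // next item, or go idle if queue is empty
    }
    if (start) start_write_(&current_);
    return true;
  }

  // Stop accepting data, e.g. from OnCancel().
  void Close() {
    std::lock_guard<std::mutex> lock(mu_);
    closed_ = true;
    pending_.clear();
  }

 private:
  // Moves the next item into current_ if no write is pending. Requires mu_.
  bool StartNextLocked() {
    if (write_in_flight_ || pending_.empty()) return false;
    current_ = std::move(pending_.front());
    pending_.pop_front();
    write_in_flight_ = true;
    return true;
  }

  std::function<void(const T*)> start_write_;
  std::mutex mu_;
  std::deque<T> pending_;
  T current_{};  // buffer of the single in-flight write; must outlive it
  bool write_in_flight_ = false;
  bool closed_ = false;
};

// Usage with a fake transport standing in for StartWrite/OnWriteDone.
std::vector<int> SimulateFakeStream() {
  std::vector<int> sent;
  SingleWriterQueue<int> q([&sent](const int* msg) { sent.push_back(*msg); });
  q.Push(1);            // idle, so the write starts immediately
  q.Push(2);            // a write is pending, so 2 and 3 are queued
  q.Push(3);
  q.OnWriteDone(true);  // 1 done, starts 2
  q.OnWriteDone(true);  // 2 done, starts 3
  q.OnWriteDone(true);  // 3 done, queue empty: goes idle
  q.Push(4);            // idle again, starts immediately
  return sent;
}
```

Applied to the question's reactor, `response()` would call `Push()` instead of notifying a condition variable, `OnWriteDone(ok)` would forward to the helper and call `Finish()` when it returns false, and `OnCancel()` would call `Close()`. Because only the thread that owns the in-flight write touches `current_`, the single buffer is safe. The 100 ms "no data" check from the original `Work()` is not covered here; it would need its own non-blocking mechanism (for example, a timer driven by the application thread).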