i'm trying to implement pub sub pattern using grpc but i'm confusing a bit about how to do it properly.
my proto: rpc call (google.protobuf.Empty) returns (stream Data);
client:
asynStub.call(Empty.getDefaultInstance(), new StreamObserver<Data>() {
@Override
public void onNext(Data value) {
// process a data
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
} catch (StatusRuntimeException e) {
LOG.warn("RPC failed: {}", e.getStatus());
}
Thread.currentThread().join();
server service:
public class Sender extends DataServiceGrpc.DataServiceImplBase implements Runnable {
private final BlockingQueue<Data> queue;
private final static HashSet<StreamObserver<Data>> observers = new LinkedHashSet<>();
public Sender(BlockingQueue<Data> queue) {
this.queue = queue;
}
@Override
public void data(Empty request, StreamObserver<Data> responseObserver) {
observers.add(responseObserver);
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// waiting for first element
Data data = queue.take();
// send head element
observers.forEach(o -> o.onNext(data));
} catch (InterruptedException e) {
LOG.error("error: ", e);
Thread.currentThread().interrupt();
}
}
}
}
How to remove clients from global observers properly? How to received some sort of a signal when connection drops?
How to manage client-server reconnections? How to force client reconnect when connection drops?
Thanks in advance!
In the implementation of your service:
@Override
public void data(Empty request, StreamObserver<Data> responseObserver) {
observers.add(responseObserver);
}
You need to get the Context of the current request, and listen for cancellation. For single-request, multi-response calls (a.k.a. Server streaming) the gRPC generated code is simplified to pass in the the request directly. This means that you don't have direct access to the underlying ServerCall.Listener
, which is how you would normally listen for clients disconnecting and cancelling.
Instead, every gRPC call has a Context
associated with it, which carries the cancellation and other request-scoped signals. For your case, you just need to listen for cancellation by adding your own listener, which then safely removes the response observer from your linked hash set.
As for reconnects: gRPC clients will automatically reconnect if the connection is broken, but usually will not retry the RPC unless it is safe to do so. In the case of server streaming RPCs, it is usually not safe to do, so you'll need to retry the RPC on your client directly.