java grpc grpc-java grpc-web

GRPC-Web Stream: Only receiving data when stream is closed


I have set up a simple Java & Spring Boot backend in order to stream data to a gRPC-web client.

The streaming works as expected when calling from Postman: I call the endpoint and receive the data every 2 seconds.

But when I start the stream from the web client, I only get the data once the stream is closed on server-side. I can't figure out why this is happening.

I have the Envoy gateway to proxy the calls from the FE to BE. This is the whole code I have:

Proto Definition:

// Schema for the streaming service shared by the Java backend and the gRPC-web client.
syntax = "proto3";

package com.example.api;

// Java code-generation options for the generated stubs.
option java_multiple_files = true;
option java_package = "com.example.api";

// Service exposing one server-streaming RPC and one unary RPC.
service SSEService {
  // Server-streaming: a single request is answered by a stream of SSEMessageResponse messages.
  rpc Connect (SSEConnectionRequest) returns (stream SSEMessageResponse) {}
  // Unary: a single request is answered by a single PongResponse.
  rpc Ping (PingRequest) returns (PongResponse) {}
}

// Request that opens the Connect stream.
message SSEConnectionRequest {
  // Presumably the topics the client wants to receive; the handler shown in this post does not read it.
  repeated string topics = 1;
}

// One message pushed to the client over the Connect stream.
message SSEMessageResponse {
  // Message identifier (the handler shown in this post fills it with a random UUID string).
  string id = 1;
  // Message type label.
  string type = 2;
  // Message content, carried as a string.
  string payload = 3;
}

// Request for the Ping RPC.
message PingRequest {
  string message = 1;
}

// Response for the Ping RPC.
message PongResponse {
  string message = 1;
}

This is the backend handler:

/**
 * gRPC implementation of {@code SSEService}.
 *
 * <p>{@link #connect} is a demo server-streaming endpoint that pushes a fixed number of
 * messages at a fixed interval; {@link #ping} is a plain unary endpoint.
 */
@GrpcService
@Slf4j
public class SSEHandler extends SSEServiceGrpc.SSEServiceImplBase {
    /** Number of demo messages pushed on each {@code connect} stream. */
    private static final int MESSAGE_COUNT = 5;

    /** Pause between two consecutive messages, in milliseconds. */
    private static final long SEND_INTERVAL_MS = 2000;

    /** Pause between two readiness checks while waiting for the stream, in milliseconds. */
    private static final long READY_POLL_INTERVAL_MS = 10;

    /**
     * Streams {@value #MESSAGE_COUNT} messages to the client, one every two seconds, then
     * completes the call. Stops early, without completing, if the client cancels.
     *
     * @param request          the connection request (its topics are not used by this demo)
     * @param responseObserver the stream to write responses to
     */
    @SneakyThrows
    @Override
    public void connect(SSEConnectionRequest request, StreamObserver<SSEMessageResponse> responseObserver) {
        var newObserver = (ServerCallStreamObserver<SSEMessageResponse>) responseObserver;
        log.info("Is observer ready? {}", newObserver.isReady());

        if (!awaitReady(newObserver)) {
            log.info("Call was cancelled before the stream became ready");
            return;
        }

        for (var i = 0; i < MESSAGE_COUNT; i++) {
            // Stop as soon as the client goes away instead of sleeping and writing
            // to a call that nobody is listening to any more.
            if (newObserver.isCancelled()) {
                log.info("Call was cancelled by the client; stopping before message {}", i);
                return;
            }

            var response = SSEMessageResponse.newBuilder()
                .setId(UUID.randomUUID().toString())
                .setType("MessageType")
                .setPayload("Message number #" + i)
                .build();

            log.info("Sending response down the stream to client. Number {}", i);
            newObserver.onNext(response);

            Thread.sleep(SEND_INTERVAL_MS);
        }

        responseObserver.onCompleted();
    }

    /**
     * Replies to a ping with a single fixed message and completes the call.
     *
     * @param request          the ping request (its content is not inspected)
     * @param responseObserver the observer that receives the single response
     */
    @Override
    public void ping(PingRequest request, StreamObserver<PongResponse> responseObserver) {
        log.info("Will send a response now");
        var response = PongResponse.newBuilder()
            .setMessage("Test response")
            .build();

        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

    /**
     * Blocks until the observer reports ready or the call is cancelled.
     *
     * <p>Polls with a short sleep and logs the wait once, rather than spinning and
     * logging on every iteration.
     *
     * @return {@code true} if the stream is ready, {@code false} if the call was cancelled first
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private boolean awaitReady(ServerCallStreamObserver<?> observer) throws InterruptedException {
        if (observer.isReady()) {
            return true;
        }

        log.info("Waiting for the stream to be ready");
        while (!observer.isReady()) {
            if (observer.isCancelled()) {
                return false;
            }
            Thread.sleep(READY_POLL_INTERVAL_MS);
        }
        return true;
    }
}

This is the web client:

// Client for SSEService, pointed at port 9999 (the Envoy listener in the config shown in this post).
// NOTE: per the solution at the end of this post, messages only arrived incrementally
// once the stubs were generated with mode=grpcwebtext instead of mode=grpcweb.
const grpcClient = new SSEServiceClient('http://localhost:9999');

// Open the server-streaming Connect call with an empty request (no topics set).
const stream = grpcClient.connect(new SSEConnectionRequest());

// Log every message the server pushes down the stream.
stream.on('data', (data: SSEMessageResponse) => {
  console.log('data', data);
});

// Log status notifications. The value is only logged, so `unknown` is enough
// and avoids switching off type checking with `any`.
stream.on('status', (status: unknown) => {
  console.log('STATUS', status);
});

// On error, log it and cancel the call.
stream.on('error', (e: Error) => {
  console.log('error', e);
  stream.cancel();
});

And this is the Envoy static config:

# Envoy admin interface: listens on 0.0.0.0:9901 and logs access to /tmp/admin_access.log.
admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 0.0.0.0, port_value: 9901 }

static_resources:
  listeners:
    # Listener on 0.0.0.0:9999 — the port the web client in this post connects to.
    - name: listener_0
      address:
        socket_address: { address: 0.0.0.0, port_value: 9999 }
      filter_chains:
        - filters:
          - name: envoy.filters.network.http_connection_manager
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
              codec_type: auto
              stat_prefix: ingress_http
              # NOTE(review): 0s is presumably meant to disable the idle timeout for long-lived streams — confirm against the Envoy docs.
              stream_idle_timeout: 0s
              route_config:
                name: local_route
                virtual_hosts:
                  - name: local_service
                    domains: ["*"]
                    routes:
                      # Send every request path (prefix "/") to the sse_service cluster.
                      - match: { prefix: "/" }
                        route:
                          cluster: sse_service
                          # NOTE(review): the 0s values below are presumably meant to remove route-level time limits for streaming — confirm.
                          timeout: 0s
                          max_stream_duration:
                            grpc_timeout_header_max: 0s
                    # CORS policy applied to this virtual host for browser callers.
                    cors:
                      allow_origin_string_match:
                        - prefix: "*"
                      allow_methods: GET, PUT, DELETE, POST, OPTIONS
                      allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout
                      max_age: "1728000"
                      expose_headers: custom-header-1,grpc-status,grpc-message
              # HTTP filters, in order: grpc_web, cors, router.
              http_filters:
                - name: envoy.filters.http.grpc_web
                  typed_config:
                    "@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
                - name: envoy.filters.http.cors
                  typed_config:
                    "@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
                - name: envoy.filters.http.router
                  typed_config:
                    "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
  clusters:
    # Upstream cluster for the backend; its single endpoint is localhost:9090.
    - name: sse_service
      connect_timeout: 60s
      type: logical_dns
      # NOTE(review): HTTP/2 is configured twice for this cluster — here via typed_extension_protocol_options
      # and again via the cluster-level http2_protocol_options below. Confirm which one should be kept.
      typed_extension_protocol_options:
        envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
          "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
          explicit_http_config:
            http2_protocol_options: {}
      http2_protocol_options: {}
      lb_policy: round_robin
      load_assignment:
        # NOTE(review): cluster_name is cluster_0 while the cluster itself is named sse_service — confirm this is intended.
        cluster_name: cluster_0
        endpoints:
          - lb_endpoints:
            - endpoint:
                address:
                  socket_address:
                    address: localhost
                    port_value: 9090

Any clue on what may be going on?


Solution

  • To whoever bumps into this: I found a workaround. The problem was neither in the backend nor in the Envoy proxy, but in how I compiled the proto stubs. For some reason, when I compiled with the options:

    --js_out=import_style=commonjs,binary:$OUT_DIR --grpc-web_out=import_style=typescript,mode=grpcweb:$OUT_DIR
    

    I only got the data once the stream was closed on the server side. But it worked when I compiled with:

    --grpc-web_out=import_style=typescript,mode=grpcwebtext:$OUT_DIR
    

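    Basically, changing from mode=grpcweb to mode=grpcwebtext did the trick. The stream worked normally, sending the data every 2 seconds as expected. (This is consistent with the grpc-web documentation, which lists server streaming as supported in mode=grpcwebtext, while mode=grpcweb supports only unary calls.)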