I have set up a simple Java and Spring Boot backend to stream data to a gRPC-web client.
Streaming works as expected when I call the endpoint from Postman: I receive a message every 2 seconds.
But when I start the stream from the web client, I only get the data once the stream is closed on the server side. I can't figure out why this is happening.
I use an Envoy gateway to proxy the calls from the frontend to the backend. This is all the code I have:
Proto definition:
syntax = "proto3";

package com.example.api;

option java_multiple_files = true;
option java_package = "com.example.api";

service SSEService {
  rpc Connect (SSEConnectionRequest) returns (stream SSEMessageResponse) {}
  rpc Ping (PingRequest) returns (PongResponse) {}
}

message SSEConnectionRequest {
  repeated string topics = 1;
}

message SSEMessageResponse {
  string id = 1;
  string type = 2;
  string payload = 3;
}

message PingRequest {
  string message = 1;
}

message PongResponse {
  string message = 1;
}
This is the backend handler:
@GrpcService
@Slf4j
public class SSEHandler extends SSEServiceGrpc.SSEServiceImplBase {

    @SneakyThrows
    @Override
    public void connect(SSEConnectionRequest request, StreamObserver<SSEMessageResponse> responseObserver) {
        var newObserver = (ServerCallStreamObserver<SSEMessageResponse>) responseObserver;
        log.info("Is observer ready? {}", newObserver.isReady());
        while (!newObserver.isReady()) {
            log.info("Waiting for the stream to be ready");
        }
        for (var i = 0; i < 5; i++) {
            var response = SSEMessageResponse.newBuilder()
                    .setId(UUID.randomUUID().toString())
                    .setType("MessageType")
                    .setPayload("Message number #" + i)
                    .build();
            log.info("Sending response down the stream to client. Number {}", i);
            newObserver.onNext(response);
            Thread.sleep(2000);
        }
        responseObserver.onCompleted();
    }

    @Override
    public void ping(PingRequest request, StreamObserver<PongResponse> responseObserver) {
        log.info("Will send a response now");
        var response = PongResponse.newBuilder()
                .setMessage("Test response")
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }
}
This is the web client:
const grpcClient = new SSEServiceClient('http://localhost:9999');
const stream = grpcClient.connect(new SSEConnectionRequest());

stream.on('data', (data: SSEMessageResponse) => {
  console.log('data', data);
});

stream.on('status', (status: any) => {
  console.log('STATUS', status);
});

stream.on('error', (e: Error) => {
  console.log('error', e);
  stream.cancel();
});
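To tell buffered delivery apart from real streaming, it can help to timestamp each `data` event rather than eyeball the console. The following is only a minimal sketch: `createArrivalTracker` is a hypothetical helper, not part of grpc-web. It records the gap between consecutive messages. With the 2-second delay on the server, gaps near 2000 ms would suggest streaming, while gaps near 0 ms would suggest the messages were all delivered together when the stream closed.

```typescript
// Hypothetical diagnostic helper; not part of grpc-web.
// Records the time between consecutive messages so buffered delivery
// (all gaps near 0 ms) can be told apart from real streaming.
type Clock = () => number;

function createArrivalTracker(now: Clock = Date.now) {
  const gaps: number[] = [];
  let last: number | undefined;
  return {
    // Call this from the stream's 'data' handler.
    record(): void {
      const t = now();
      if (last !== undefined) {
        gaps.push(t - last);
      }
      last = t;
    },
    // Gaps in milliseconds between consecutive messages.
    gaps(): number[] {
      return [...gaps];
    },
    // True when at least one gap exists and every gap is below the threshold.
    looksBuffered(thresholdMs: number): boolean {
      return gaps.length > 0 && gaps.every((g) => g < thresholdMs);
    },
  };
}
```

In the client shown here, that would mean calling `tracker.record()` inside the `data` callback and logging `tracker.gaps()` from the `status` callback.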
And this is the Envoy static config:
admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 0.0.0.0, port_value: 9901 }

static_resources:
  listeners:
    - name: listener_0
      address:
        socket_address: { address: 0.0.0.0, port_value: 9999 }
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                codec_type: auto
                stat_prefix: ingress_http
                stream_idle_timeout: 0s
                route_config:
                  name: local_route
                  virtual_hosts:
                    - name: local_service
                      domains: ["*"]
                      routes:
                        - match: { prefix: "/" }
                          route:
                            cluster: sse_service
                            timeout: 0s
                            max_stream_duration:
                              grpc_timeout_header_max: 0s
                      cors:
                        allow_origin_string_match:
                          - prefix: "*"
                        allow_methods: GET, PUT, DELETE, POST, OPTIONS
                        allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout
                        max_age: "1728000"
                        expose_headers: custom-header-1,grpc-status,grpc-message
                http_filters:
                  - name: envoy.filters.http.grpc_web
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb
                  - name: envoy.filters.http.cors
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors
                  - name: envoy.filters.http.router
                    typed_config:
                      "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
  clusters:
    - name: sse_service
      connect_timeout: 60s
      type: logical_dns
      typed_extension_protocol_options:
        envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
          "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
          explicit_http_config:
            http2_protocol_options: {}
      http2_protocol_options: {}
      lb_policy: round_robin
      load_assignment:
        cluster_name: cluster_0
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address:
                      address: localhost
                      port_value: 9090
Any clue about what may be going on?
For anyone who runs into this, I found a workaround. The problem wasn't in the backend or the Envoy proxy but in how I compiled the proto stubs. For some reason, when I compiled with the options:
--js_out=import_style=commonjs,binary:$OUT_DIR --grpc-web_out=import_style=typescript,mode=grpcweb:$OUT_DIR
I only got the data when the stream was closed on the server side. But when I compiled with:
--grpc-web_out=import_style=typescript,mode=grpcwebtext:$OUT_DIR
the stream worked normally, delivering the data every 2 seconds as expected. Basically, changing mode=grpcweb to mode=grpcwebtext did the trick.
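This matches the grpc-web README, which documents mode=grpcweb (binary) as supporting unary calls only and mode=grpcwebtext (base64-encoded) as supporting both unary and server-streaming calls. To illustrate what incremental delivery involves, here is a rough sketch of gRPC-Web frame parsing, assuming the bytes have already been base64-decoded. Each frame is a 1-byte flag, a 4-byte big-endian length, and the payload. The names `Frame` and `parseFrames` are made up for this example and are not the grpc-web library's internals.

```typescript
// Sketch of gRPC-Web frame parsing. Frame layout: 1 flag byte,
// 4-byte big-endian payload length, then the payload itself.
// The names below (Frame, parseFrames) are illustrative only.
interface Frame {
  isTrailer: boolean; // most significant bit of the flag byte is set
  payload: Uint8Array;
}

// Parses every complete frame in `buffer` and returns the frames
// together with the unconsumed tail (an incomplete frame, if any).
function parseFrames(buffer: Uint8Array): { frames: Frame[]; rest: Uint8Array } {
  const frames: Frame[] = [];
  let offset = 0;
  while (buffer.length - offset >= 5) {
    const flag = buffer[offset];
    // Big-endian length; multiplication keeps the top byte unsigned.
    const length =
      buffer[offset + 1] * 0x1000000 +
      buffer[offset + 2] * 0x10000 +
      buffer[offset + 3] * 0x100 +
      buffer[offset + 4];
    if (buffer.length - offset - 5 < length) {
      break; // payload not fully received yet
    }
    const start = offset + 5;
    frames.push({
      isTrailer: (flag & 0x80) !== 0,
      payload: buffer.slice(start, start + length),
    });
    offset = start + length;
  }
  return { frames, rest: buffer.slice(offset) };
}
```

A client can hand each message to the `data` callback as soon as its frame is complete, but only if the transport exposes the partial response while the request is still open. As far as I understand, that is what the text mode makes possible in the browser.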