I tried to use the Vert.x HttpClient/WebClient to consume a GraphQL subscription, but it did not work as expected.
The relevant server-side code (written with Vert.x Web GraphQL) is shown below. When a comment is added, addComment calls subject.onNext to push the new comment to the Publisher returned by commentAdded.
    public VertxDataFetcher<UUID> addComment() {
        return VertxDataFetcher.create((DataFetchingEnvironment dfe) -> {
            var commentInputArg = dfe.getArgument("commentInput");
            var jacksonMapper = DatabindCodec.mapper();
            var input = jacksonMapper.convertValue(commentInputArg, CommentInput.class);
            return this.posts.addComment(input)
                .onSuccess(id -> this.posts.getCommentById(id.toString())
                    .onSuccess(c -> subject.onNext(c)));
        });
    }

    private BehaviorSubject<Comment> subject = BehaviorSubject.create();

    public DataFetcher<Publisher<Comment>> commentAdded() {
        return (DataFetchingEnvironment dfe) -> {
            ConnectableObservable<Comment> connectableObservable = subject.share().publish();
            connectableObservable.connect();
            return connectableObservable.toFlowable(BackpressureStrategy.BUFFER);
        };
    }
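To make the intended flow easier to follow (subscribe first, then a mutation pushes the new comment to every subscriber), here is a minimal sketch that uses only the JDK Flow API. It is not the RxJava code above: SubmissionPublisher stands in for BehaviorSubject and does not replay the latest value to late subscribers. The class name CommentFeedSketch and the method firstDelivered are hypothetical names made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the server-side comment feed. It uses the JDK
// Flow API instead of RxJava, so it only approximates the original code.
class CommentFeedSketch {

    // Subscribes first, then publishes one comment, and returns the item
    // the subscriber received (or fails after a 5 second timeout).
    static String firstDelivered(String comment) {
        CompletableFuture<String> received = new CompletableFuture<>();
        try (SubmissionPublisher<String> feed = new SubmissionPublisher<>()) {
            feed.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    // Ask for everything; no backpressure in this sketch.
                    subscription.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(String item) {
                    received.complete(item);
                }

                @Override
                public void onError(Throwable error) {
                    received.completeExceptionally(error);
                }

                @Override
                public void onComplete() {
                    // Nothing to do when the feed closes.
                }
            });
            // Plays the role of subject.onNext(c) in the addComment fetcher.
            feed.submit(comment);
            return received.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("comment was not delivered", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstDelivered("a new comment"));
    }
}
```

The point of the sketch is the ordering: a subscriber that is registered before the mutation runs receives the comment, so if nothing arrives on the client, it is worth checking whether the subscription was ever started on the server.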
In the client, I use a mix of HttpClient and WebClient. Most of the time I prefer WebClient, which makes handling form posts easier, but it does not seem to support WebSocket connections.
So for the WebSocket part I fall back to HttpClient.
    var options = new HttpClientOptions()
        .setDefaultHost("localhost")
        .setDefaultPort(8080);
    var httpClient = vertx.createHttpClient(options);
    httpClient.webSocket("/graphql")
        .onSuccess(ws -> {
            ws.textMessageHandler(text -> log.info("web socket message handler:{}", text));
            JsonObject messageInit = new JsonObject()
                .put("type", "connection_init")
                .put("id", "1");
            JsonObject message = new JsonObject()
                .put("payload", new JsonObject()
                    .put("query", "subscription onCommentAdded { commentAdded { id content } }"))
                .put("type", "start")
                .put("id", "1");
            ws.write(messageInit.toBuffer());
            ws.write(message.toBuffer());
        })
        .onFailure(e -> log.error("error: {}", e));
    // this client here is WebClient.
    client.post("/graphql")
        .sendJson(Map.of(
            "query", "mutation addComment($input:CommentInput!){ addComment(commentInput:$input) }",
            "variables", Map.of(
                "input", Map.of(
                    "postId", id,
                    "content", "comment content of post id" + LocalDateTime.now()
                )
            )
        ))
        .onSuccess(
            data -> log.info("data of addComment: {}", data.bodyAsString())
        )
        .onFailure(e -> log.error("error: {}", e));
When I run the client and the server, the comment is added, but the WebSocket client never logs any incoming WebSocket message. The server console shows a message like this:
2021-06-25 18:45:44,356 DEBUG [vert.x-eventloop-thread-1] graphql.GraphQL: Execution '182965bb-80de-416d-b5fe-fe157ab87f1c' completed with zero errors
It seems the backend commentAdded data fetcher is not invoked at all.
The complete code for the GraphQL client and server is shared on my GitHub.
After reading some of the test code in Vert.x Web GraphQL, I found that I have to add a ConnectionInitHandler on the ApolloWSHandler, like this:
    .connectionInitHandler(connectionInitEvent -> {
        JsonObject payload = connectionInitEvent.message().content().getJsonObject("payload");
        if (payload != null && payload.containsKey("rejectMessage")) {
            connectionInitEvent.fail(payload.getString("rejectMessage"));
            return;
        }
        connectionInitEvent.complete(payload);
    })
When the client sends the connection_init message, connectionInitEvent.complete must be called to start the communication between the client and the server.
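On the client side, the same ordering can be made explicit by sending the start message only after the server has acknowledged connection_init. Below is a minimal sketch in plain Java, assuming the server follows the Apollo subscriptions-transport-ws convention of replying to connection_init with a message of type connection_ack. The class HandshakeSketch and its methods are hypothetical names for this illustration and are not part of Vert.x.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side helper: decides which frames to send next, based
// on the "type" field of the last message received from the server.
class HandshakeSketch {
    private final String startFrame;
    private boolean acknowledged = false;

    HandshakeSketch(String startFrame) {
        this.startFrame = startFrame;
    }

    // First frame to send right after the WebSocket is opened.
    String initFrame() {
        return "{\"type\":\"connection_init\"}";
    }

    // Returns the frames to send in response to a server message type.
    // The start frame is released once, on the first connection_ack.
    List<String> onServerMessage(String type) {
        List<String> toSend = new ArrayList<>();
        if ("connection_ack".equals(type) && !acknowledged) {
            acknowledged = true;
            toSend.add(startFrame);
        }
        return toSend;
    }

    boolean isAcknowledged() {
        return acknowledged;
    }

    public static void main(String[] args) {
        String start = "{\"id\":\"1\",\"type\":\"start\",\"payload\":{\"query\":"
            + "\"subscription onCommentAdded { commentAdded { id content } }\"}}";
        HandshakeSketch handshake = new HandshakeSketch(start);
        System.out.println("send: " + handshake.initFrame());
        // Simulate the server acknowledging the connection.
        for (String frame : handshake.onServerMessage("connection_ack")) {
            System.out.println("send: " + frame);
        }
    }
}
```

In the Vert.x client above, this could be wired into ws.textMessageHandler by reading the type with new JsonObject(text).getString("type") and passing each returned frame to ws.writeTextMessage. With that in place, a missing acknowledgement shows up as a start message that is never sent, rather than as a subscription that silently produces no data.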