I'm attempting to learn about Akka web sockets using a publicly available finance stream of data, the socket authenticates but does not receive the stream of data.
Reading https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html I've written :
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.model.ws.WebSocketRequest;
import akka.stream.Materializer;
import akka.stream.SystemMaterializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.util.concurrent.CompletionStage;
public class EodHistoricalDataTest {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("eodhistoricaldata-example");
final Materializer materializer = SystemMaterializer.get(system).materializer();
Http http = Http.get(system);
Sink<Message, CompletionStage<Done>> printSink =
Sink.foreach((message) ->
System.out.println("Got message: " + message.asTextMessage().getStrictText())
);
final String subscribeMessage = "{\"action\":\"subscribe\",\"symbols\":\"AMZN\" }";
final Source<Message, NotUsed> initialSource = Source.single(TextMessage.create(subscribeMessage));
final Flow<Message, Message, NotUsed> flow =
Flow.fromSinkAndSource(
printSink, // This will print the incoming messages
initialSource); // This sends the subscribe message when the WebSocket connection is established
http.singleWebSocketRequest(
WebSocketRequest.create("wss://ws.eodhistoricaldata.com/ws/us?api_token=demo"),
flow,
materializer);
}
}
prints :
Got message: {"status_code":200,"message":"Authorized"}
But no further messages are received.
There is no issue with the web socket server as can see messages coming through :
Here is a modified code sample to keep source open based on https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#half-closed-websockets
:
public class EodHistoricalDataTest {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create("eodhistoricaldata-example");
final Materializer materializer = SystemMaterializer.get(system).materializer();
Http http = Http.get(system);
final Message subscribeMessage = TextMessage.create("{\"action\":\"subscribe\",\"symbols\":\"AMZN\" }"); // This is the subscribe message you want to send
Sink<Message, CompletionStage<Done>> printSink =
Sink.foreach((message) ->
System.out.println("Got message: " + message.asTextMessage().getStrictText())
);
final Flow<Message, Message, NotUsed> flow =
Flow.fromSinkAndSource(
printSink, // This will print the incoming messages
Source.single(subscribeMessage)); // This sends the subscribe message when the WebSocket connection is established
http.singleWebSocketRequest(
WebSocketRequest.create("wss://ws.eodhistoricaldata.com/ws/us?api_token=demo"),
flow,
materializer);
}
}
But the result is same.
That is probably related to that half-closed not being supported (end of input stream is reached directly after that single element).
See docs here for explanation and an example how to keep the source open but not produce any more elements and complete at a later time using Source.maybe
: https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#half-closed-websockets