javaakkaakka-stream

Akka streams over web socket fails to process/.receive data after successful authorise (Java)


I'm attempting to learn about Akka web sockets using a publicly available finance stream of data, the socket authenticates but does not receive the stream of data.

Reading https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html I've written :

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.model.ws.WebSocketRequest;
import akka.stream.Materializer;
import akka.stream.SystemMaterializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

import java.util.concurrent.CompletionStage;

public class EodHistoricalDataTest {

    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create("eodhistoricaldata-example");
        final Materializer materializer = SystemMaterializer.get(system).materializer();
        Http http = Http.get(system);

        Sink<Message, CompletionStage<Done>> printSink =
                Sink.foreach((message) ->
                        System.out.println("Got message: " + message.asTextMessage().getStrictText())
                );

        final String subscribeMessage = "{\"action\":\"subscribe\",\"symbols\":\"AMZN\" }";
        
        final Source<Message, NotUsed> initialSource = Source.single(TextMessage.create(subscribeMessage));

        final Flow<Message, Message, NotUsed> flow =
                Flow.fromSinkAndSource(
                        printSink,  // This will print the incoming messages
                        initialSource);   // This sends the subscribe message when the WebSocket connection is established

        http.singleWebSocketRequest(
                WebSocketRequest.create("wss://ws.eodhistoricaldata.com/ws/us?api_token=demo"),
                flow,
                materializer);
    }
}

prints :

Got message: {"status_code":200,"message":"Authorized"}

But no further messages are received.

There is no issue with the web socket server as can see messages coming through :

enter image description here

Here is a modified code sample to keep source open based on https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#half-closed-websockets

:

 public class EodHistoricalDataTest {
    
        public static void main(String[] args) {
            final ActorSystem system = ActorSystem.create("eodhistoricaldata-example");
            final Materializer materializer = SystemMaterializer.get(system).materializer();
            Http http = Http.get(system);
    
            final Message subscribeMessage = TextMessage.create("{\"action\":\"subscribe\",\"symbols\":\"AMZN\" }");  // This is the subscribe message you want to send
    
            Sink<Message, CompletionStage<Done>> printSink =
                    Sink.foreach((message) ->
                            System.out.println("Got message: " + message.asTextMessage().getStrictText())
                    );
    
            final Flow<Message, Message, NotUsed> flow =
                    Flow.fromSinkAndSource(
                            printSink,  // This will print the incoming messages
                            Source.single(subscribeMessage));   // This sends the subscribe message when the WebSocket connection is established
    
            http.singleWebSocketRequest(
                    WebSocketRequest.create("wss://ws.eodhistoricaldata.com/ws/us?api_token=demo"),
                    flow,
                    materializer);
        }
    }

But the result is same.


Solution

  • That is probably related to that half-closed not being supported (end of input stream is reached directly after that single element).

    See docs here for explanation and an example how to keep the source open but not produce any more elements and complete at a later time using Source.maybe: https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html#half-closed-websockets