Tags: scala, websocket, akka, akka-stream, alpakka

Akka stream best practice for dynamic Source and Flow controlled by websocket messages


I'm trying to understand the best way to implement the following (simplified) scenario with Akka Streams and Alpakka:

  1. The frontend opens a websocket connection with the backend
  2. The backend waits for an initialization message carrying some parameters (for example bootstrapServers, topicName and a transformationMethod, which is a string parameter)
  3. Once this information is in place, the backend can start the Alpakka consumer, consuming topic topicName from bootstrapServers, applying some transformation to the data based on transformationMethod and pushing the results into the websocket
  4. Periodically, the frontend can send messages through the websocket that change the transformationMethod field, so that the transformation applied to the messages consumed from Kafka changes dynamically, based on the latest value of transformationMethod received on the websocket.

I don't understand whether it's possible to achieve this with Akka Streams inside a single graph, especially the dynamic part: both the deferred initialization of the Alpakka consumer and the runtime changes to the transformationMethod parameter.
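To make the control messages concrete, this is roughly the model I have in mind for the two kinds of messages the frontend sends; the class names are just illustrative and JSON (de)serialization is omitted:

// Illustrative model of the two control messages sent over the websocket;
// the field names follow the JSON examples below.
sealed trait ControlMessage

final case class Init(
  bootstrapServers: String,
  topicName: String,
  transformationMethod: String) extends ControlMessage

final case class ChangeTransformation(transformationMethod: String) extends ControlMessage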

Example:

The frontend establishes the connection, and after 10 seconds it sends the following through the socket:

{"bootstrapServers": "localhost:9092", "topicName": "topic", "transformationMethod": "PLUS_ONE"}

Because of that, the Alpakka consumer is instantiated and starts reading messages from Kafka.

Messages are flowing in Kafka: 1 arrives and the frontend receives 2 on the websocket (because of the PLUS_ONE transformation method, which is probably applied in a map or in a via with a Flow), then 2 arrives and the frontend receives 3, and so on.

Then, frontend sends:

{"transformationMethod": "SQUARE"}

So now, when 3 arrives from Kafka the frontend receives 9, then 4 arrives and the output is 16, etc.
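The transformation itself is essentially a lookup from the transformationMethod string to a function, something like this sketch (names are illustrative):

// Illustrative mapping from the transformationMethod string to the function
// applied to each value consumed from Kafka.
def transformation(method: String): Int => Int = method match {
  case "PLUS_ONE" => _ + 1
  case "SQUARE"   => x => x * x
  case _          => identity // assumption: unknown methods leave values unchanged
}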

This is more or less the flow of what I would like to obtain.

I am able to create a websocket connection with an Alpakka consumer that performs some sort of "static" transformation and pushes the result back to the websocket; that part is straightforward. What I'm missing is the dynamic part: I'm not sure whether I can implement it inside the same graph, or whether I need more layers (maybe an actor that manages the flow and activates/changes the behavior of the Alpakka consumer in real time by sending it messages?).
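A simplified sketch of that kind of static setup, with the PLUS_ONE transformation hard-coded and placeholder values for servers, topic and group id:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.scaladsl.{Flow, Sink}
import org.apache.kafka.common.serialization.StringDeserializer

object StaticWsExample extends App {
  implicit val system: ActorSystem = ActorSystem("static-ws-example")

  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("ws-consumer")

  // Incoming websocket messages are ignored; outgoing messages come from Kafka
  // with a transformation fixed at route-building time (PLUS_ONE here).
  val kafkaToWs: Flow[Message, Message, Any] =
    Flow.fromSinkAndSource(
      Sink.ignore,
      Consumer
        .plainSource(consumerSettings, Subscriptions.topics("topic"))
        .map(record => (record.value.toInt + 1).toString) // assumes numeric string payloads
        .map(TextMessage(_)))

  val route = path("ws") {
    handleWebSocketMessages(kafkaToWs)
  }

  Http().newServerAt("localhost", 8080).bind(route)
}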

Thanks


Solution

  • I would probably tend to implement this by spawning an actor for each websocket and prematerializing a Source which will receive messages from that actor (probably using ActorSource.actorRefWithBackpressure). I would then build a Sink (likely using ActorSink.actorRefWithBackpressure) which adapts incoming websocket messages into control-plane messages (initialization, including the ActorRef associated with the prematerialized source, and transformation changes) and sends them to the actor, and finally tie the two together using handleMessagesWithSinkSource on WebSocketUpgrade.

    On receipt of the initialization message, the actor you're spawning would start a stream which feeds it messages from Kafka. Some backpressure can be fed back to Kafka by having that stream deliver each message via an ask protocol which waits for an ack. To keep the stream alive, the actor would need to ack within a certain period of time regardless of what the downstream did, so there's a decision to be made around having the actor buffer messages or drop them. Both parts are sketched below.
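Roughly, the per-connection wiring could look like the following. This is a sketch, not a drop-in implementation: it assumes Akka HTTP 10.2+ (the extractWebSocketUpgrade directive and the WebSocketUpgrade attribute) plus the akka-stream-typed module, and the session-actor protocol (Protocol, Connected, FromClient, SourceAck, ...) is made up for illustration. Parsing the raw websocket text into the initialization / transformation-change messages is left to the session actor.

import akka.NotUsed
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.adapter._
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.{Flow, Sink}
import akka.stream.typed.scaladsl.{ActorSink, ActorSource}
import akka.stream.{CompletionStrategy, Materializer, SystemMaterializer}

object WsRoute {

  // Hypothetical control-plane protocol of the per-connection session actor.
  sealed trait Protocol
  final case class Connected(out: ActorRef[OutEvent]) extends Protocol
  final case class FromClient(text: String, replyTo: ActorRef[Ack.type]) extends Protocol
  final case class SinkReady(replyTo: ActorRef[Ack.type]) extends Protocol
  case object ClientClosed extends Protocol
  final case class ClientFailed(ex: Throwable) extends Protocol
  case object SourceAck extends Protocol // the outgoing source is ready for the next element
  case object Ack

  // Elements the session actor pushes towards the websocket.
  sealed trait OutEvent
  final case class Payload(text: String) extends OutEvent
  case object Completed extends OutEvent

  def route(sessionBehavior: Behavior[Protocol])(implicit system: akka.actor.ActorSystem): Route =
    path("ws") {
      extractWebSocketUpgrade { upgrade =>
        implicit val mat: Materializer = SystemMaterializer(system).materializer

        // One session actor per websocket connection.
        val session: ActorRef[Protocol] = system.spawnAnonymous(sessionBehavior)

        // Outgoing side, prematerialized so the ActorRef can be handed to the
        // session actor; each accepted element is acked back to the actor.
        val (outRef, outSource) =
          ActorSource
            .actorRefWithBackpressure[OutEvent, Protocol](
              ackTo = session,
              ackMessage = SourceAck,
              completionMatcher = { case Completed => CompletionStrategy.draining },
              failureMatcher = PartialFunction.empty)
            .collect { case Payload(text) => TextMessage(text): Message }
            .preMaterialize()

        session ! Connected(outRef)

        // Incoming side: adapt raw websocket messages into control-plane
        // messages for the session actor, with backpressure via Ack.
        val inSink: Sink[Message, NotUsed] =
          Flow[Message]
            .collect { case TextMessage.Strict(text) => text } // streamed/binary frames ignored for brevity
            .to(ActorSink.actorRefWithBackpressure[String, Protocol, Ack.type](
              ref = session,
              messageAdapter = (replyTo: ActorRef[Ack.type], text: String) => FromClient(text, replyTo),
              onInitMessage = (replyTo: ActorRef[Ack.type]) => SinkReady(replyTo),
              ackMessage = Ack,
              onCompleteMessage = ClientClosed,
              onFailureMessage = ex => ClientFailed(ex)))

        complete(upgrade.handleMessagesWithSinkSource(inSink, outSource))
      }
    }
}

The session actor itself could then look roughly like the sketch below. On the initialization message it starts an Alpakka consumer whose records are delivered to the actor through ActorFlow.ask, so the stream only pulls the next record once the actor replies with an ack. Here the actor acks immediately and keeps only the latest value it hasn't yet pushed to the websocket, i.e. the "drop" end of the buffer-or-drop decision mentioned above; a real implementation might instead delay the ack (within the ask timeout) or keep a bounded buffer. All names are again assumptions, and the two sketches use separate simplified protocols so each stays self-contained; in a real implementation they would share one protocol and the actor would also handle the FromClient messages coming from the sink.

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ActorRef, Behavior}
import akka.kafka.scaladsl.Consumer
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import akka.stream.typed.scaladsl.ActorFlow
import akka.util.Timeout
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.duration._

object SessionActor {

  // Hypothetical protocol; a full implementation would also carry the raw
  // websocket messages from the sink, parsed into Init / SetTransformation.
  sealed trait Command
  final case class Init(
    out: ActorRef[OutEvent], // ActorRef of the prematerialized source
    bootstrapServers: String,
    topicName: String,
    transformationMethod: String) extends Command
  final case class SetTransformation(method: String) extends Command
  final case class FromKafka(value: String, replyTo: ActorRef[KafkaAck.type]) extends Command
  case object SourceAck extends Command // the websocket source can take the next element
  case object KafkaAck

  sealed trait OutEvent
  final case class Payload(text: String) extends OutEvent

  private def transformation(method: String): Int => Int = method match {
    case "PLUS_ONE" => _ + 1
    case "SQUARE"   => x => x * x
    case _          => identity
  }

  def apply(): Behavior[Command] = Behaviors.setup { ctx =>
    implicit val timeout: Timeout = 5.seconds          // ask timeout for the Kafka -> actor hop
    implicit val mat: Materializer = Materializer(ctx) // streams are stopped with this actor

    def running(out: ActorRef[OutEvent], method: String,
                wsReady: Boolean, pending: Option[String]): Behavior[Command] =
      Behaviors.receiveMessage {
        case SetTransformation(m) =>
          running(out, m, wsReady, pending)

        case FromKafka(value, replyTo) =>
          // Ack immediately so the ask never times out; see the note above
          // about delaying the ack or buffering instead.
          replyTo ! KafkaAck
          val transformed = transformation(method)(value.toInt).toString // assumes numeric payloads
          if (wsReady) {
            out ! Payload(transformed)
            running(out, method, wsReady = false, pending = None)
          } else {
            running(out, method, wsReady, pending = Some(transformed)) // keep only the latest
          }

        case SourceAck =>
          pending match {
            case Some(v) => out ! Payload(v); running(out, method, wsReady = false, pending = None)
            case None    => running(out, method, wsReady = true, pending = None)
          }

        case _: Init => Behaviors.same // already initialized
      }

    Behaviors.receiveMessage {
      case Init(out, servers, topic, method) =>
        val settings =
          ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new StringDeserializer)
            .withBootstrapServers(servers)
            .withGroupId("ws-session")

        // Deliver each record to this actor via ask: the stream only pulls the
        // next record after the actor replies with KafkaAck.
        Consumer
          .plainSource(settings, Subscriptions.topics(topic))
          .map(_.value)
          .via(ActorFlow.ask(ctx.self)((v: String, replyTo: ActorRef[KafkaAck.type]) => FromKafka(v, replyTo)))
          .runWith(Sink.ignore)

        // The prematerialized source accepts a first element right away.
        running(out, method, wsReady = true, pending = None)

      case _ => Behaviors.same // ignore everything else until initialized
    }
  }
}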