I'm trying to understand the best way to implement the following (simplified) scenario with Akka Streams and Alpakka:
1. The frontend opens a websocket and sends a message through it (containing bootstrapServers, topicName and a transformationMethod that is a string parameter).
2. The backend starts an Alpakka consumer reading topicName from bootstrapServers and applying some transformation to the data based on transformationMethod, pushing these results into the websocket.
3. The frontend can later send a message with a different transformationMethod field, so that the transformation algorithm applied to the messages consumed from Kafka can change dynamically, based on the value of transformationMethod provided through the websocket.

I don't understand whether it's possible to achieve this with Akka Streams inside a Graph, especially the dynamic part, both for the initialization of the Alpakka consumer and for the dynamic changing of the transformationMethod parameter.
Example:
The frontend establishes a connection, and after 10 seconds it sends the following through the socket:
{"bootstrapServers": "localhost:9092", "topicName": "topic", "transformationMethod": "PLUS_ONE"}
Because of that, the Alpakka consumer is instantiated and starts reading messages from Kafka.
Messages are flowing in Kafka: 1 arrives, and the frontend receives 2 through the websocket (because of the PLUS_ONE transformation method, which is probably applied in a map or a via with a Flow); then 2 arrives and the frontend receives 3, and so on.
Then the frontend sends:
{"transformationMethod": "SQUARE"}
So now 3 arrives from Kafka and the frontend receives 9, then 4 arrives and the output is 16, etc.
This is more or less the flow of what I would like to obtain.
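To make the expected behaviour concrete, here is a minimal plain-Scala sketch of it, with no Akka involved. The names Transformations, Event, Change, Record and replay are hypothetical and only serve this illustration; the two methods are the ones from the example above.

```scala
object Transformations {
  // Hypothetical registry mapping a transformationMethod name to a function.
  val byName: Map[String, Long => Long] = Map(
    "PLUS_ONE" -> ((x: Long) => x + 1),
    "SQUARE"   -> ((x: Long) => x * x)
  )

  // The two kinds of input, interleaved in arrival order.
  sealed trait Event
  final case class Change(method: String) extends Event // from the websocket
  final case class Record(value: Long) extends Event    // from Kafka

  // Replays the events and returns what the frontend would receive.
  def replay(events: Seq[Event]): Seq[Long] = {
    var current: Long => Long = x => x // pass-through until a method is chosen
    events.flatMap {
      case Change(method) =>
        // Unknown method names are ignored and the current method is kept.
        byName.get(method).foreach(f => current = f)
        None
      case Record(value) =>
        Some(current(value))
    }
  }
}
```

Replaying PLUS_ONE, 1, 2, SQUARE, 3, 4 through this gives 2, 3, 9, 16, which is the sequence described above.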
I am able to create a websocket connection with an Alpakka consumer that performs some sort of "static" transformation and pushes the result back to the websocket; that part is straightforward. What I'm missing is the dynamic part. I'm not sure whether I can implement it inside the same graph or whether I need more layers (maybe an Actor that manages the flow and activates/changes the behavior of the Alpakka consumer in real time by sending messages?).
Thanks
I would probably tend to implement this by spawning an actor for each websocket. Prematerialize a Source which will receive messages from the actor (probably using ActorSource.actorRefWithBackpressure), and build a Sink (likely using ActorSink.actorRefWithBackpressure) which adapts incoming websocket messages into control-plane messages and sends them to the actor. Those control-plane messages are the initialization (including the ActorRef associated with the prematerialized source) and the transformation changes. Then tie the Sink and Source together using handleMessagesWithSinkSource on WebSocketUpgrade.
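As a rough sketch of that wiring, assuming Akka 2.6+ with akka-stream-typed and Akka HTTP 10.2+: the whole protocol below (Out, Result, Complete, Failed, Control, Init, Incoming, OutReady, Closed, Broken, Ack) is hypothetical and made up for illustration, and only strict text frames are handled.

```scala
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketUpgrade}
import akka.stream.{CompletionStrategy, Materializer}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.typed.scaladsl.{ActorSink, ActorSource}

object SessionWiring {
  // Hypothetical protocol: what the session actor pushes towards the websocket.
  sealed trait Out
  final case class Result(text: String) extends Out
  case object Complete extends Out
  final case class Failed(cause: Throwable) extends Out

  // Hypothetical protocol: what the session actor receives.
  case object Ack
  sealed trait Control
  final case class Init(replyTo: ActorRef[Ack.type], out: ActorRef[Out]) extends Control
  final case class Incoming(replyTo: ActorRef[Ack.type], text: String) extends Control
  case object OutReady extends Control // the source can take another element
  case object Closed extends Control
  final case class Broken(cause: Throwable) extends Control

  def wire(session: ActorRef[Control])(
      implicit mat: Materializer): (Sink[Message, NotUsed], Source[Message, NotUsed]) = {
    // Prematerialize the outgoing source so its ActorRef exists before the upgrade.
    val (outRef, outSource) = ActorSource
      .actorRefWithBackpressure[Out, Control](
        session,
        OutReady,
        { case Complete => CompletionStrategy.draining },
        { case Failed(cause) => cause })
      .preMaterialize()

    // Incoming frames become control-plane messages; Init carries outRef.
    val inSink: Sink[Message, NotUsed] =
      Flow[Message]
        .collect { case TextMessage.Strict(text) => text } // streamed frames are ignored here
        .to(ActorSink.actorRefWithBackpressure[String, Control, Ack.type](
          session,
          (replyTo, text) => Incoming(replyTo, text),
          replyTo => Init(replyTo, outRef),
          Ack,
          Closed,
          cause => Broken(cause)))

    (inSink, outSource.collect { case Result(text) => TextMessage(text): Message })
  }

  def handle(upgrade: WebSocketUpgrade, session: ActorRef[Control])(
      implicit mat: Materializer): HttpResponse = {
    val (sink, source) = wire(session)
    upgrade.handleMessagesWithSinkSource(sink, source)
  }
}
```

In this sketch the actor has to reply Ack to every Init and Incoming before the sink pulls the next websocket frame, and it has to wait for OutReady between two Result messages sent to the source's ActorRef. Parsing the JSON text into initialization and transformation-change messages is left to the actor.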
On receipt of the initialization message, the actor you're spawning would start a stream which feeds it messages from Kafka. Some backpressure can be fed back to Kafka by having that stream deliver messages via an ask protocol which waits for an ack. To keep that stream alive, the actor would need to ack within a certain period of time regardless of what the downstream did, so there's a decision to be made around having the actor buffer messages or drop them.
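One possible shape for that actor, as a sketch only, assuming Akka Typed 2.6+: it acks every Kafka element immediately, buffers up to a limit while the websocket side is busy, and drops beyond that. The names KafkaSession, FromKafka, DownstreamReady, ChangeTransformation and maxBuffered are hypothetical, and the Kafka stream itself is not shown (it could deliver FromKafka through something like ActorFlow.ask).

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.collection.immutable.Queue

object KafkaSession {
  case object Ack
  sealed trait Command
  // One Kafka record; replyTo must be acked before the ask times out.
  final case class FromKafka(value: Long, replyTo: ActorRef[Ack.type]) extends Command
  // The websocket-facing source signals it can accept one more element.
  case object DownstreamReady extends Command
  // A new transformationMethod arrived over the websocket.
  final case class ChangeTransformation(f: Long => Long) extends Command

  def apply(out: ActorRef[Long], maxBuffered: Int): Behavior[Command] =
    running(out, maxBuffered, (x: Long) => x, Queue.empty, ready = true)

  private def running(
      out: ActorRef[Long],
      maxBuffered: Int,
      transform: Long => Long,
      buffer: Queue[Long],
      ready: Boolean): Behavior[Command] =
    Behaviors.receiveMessage {
      case FromKafka(value, replyTo) =>
        replyTo ! Ack // ack right away, regardless of what the downstream is doing
        // The transformation is applied on arrival, so already-buffered
        // results keep the method that was active when they were consumed.
        val result = transform(value)
        if (ready) {
          out ! result
          running(out, maxBuffered, transform, buffer, ready = false)
        } else if (buffer.size < maxBuffered) {
          running(out, maxBuffered, transform, buffer.enqueue(result), ready)
        } else {
          running(out, maxBuffered, transform, buffer, ready) // buffer full: drop
        }

      case DownstreamReady =>
        buffer.dequeueOption match {
          case Some((next, rest)) =>
            out ! next
            running(out, maxBuffered, transform, rest, ready = false)
          case None =>
            running(out, maxBuffered, transform, buffer, ready = true)
        }

      case ChangeTransformation(f) =>
        running(out, maxBuffered, f, buffer, ready)
    }
}
```

Setting maxBuffered to 0 gives the pure "drop" policy. A variant that delays the ack until there is room in the buffer would push backpressure to Kafka instead, at the risk of the ask timing out.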