
Kafka Key access on Ingress of a Python Flink Stateful function


I've been looking at Flink Stateful Functions. It looks super promising - except for one thing - and I hope I'm just missing it.

For the life of me, I can't see a way to access the Kafka key from a Kafka ingress in Python. In Java, I see I could use the deserialiser and effectively pack the key into the decoded message object, but I can't find an equivalent for Python.
In our case, the key carries valuable info that does not exist in the value.

Has anyone come across this, or have I just missed it?


Solution

  • The first thing I'd like to mention is that the built-in Kafka ingress for remote functions requires that the key can be interpreted as a UTF-8 string. The ingress uses the record key as the id of the target function, so if this is indeed your case, you can simply obtain it via:

    def example(context, message):
        # The ingress routes each record by its key, so the key is the function id.
        key = context.address.id
        ...
        print(key)  # UTF-8 string
    

    If this is not the case, then unfortunately, at this point you would have to use the embedded SDK to extract the key and the value before forwarding the message to a remote function.
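
To find out which of the two cases applies, it may help to check a few sample keys from the topic and see whether they decode as UTF-8. The sketch below uses only the Python standard library and does not touch the StateFun SDK; `decode_kafka_key` and the sample byte strings are hypothetical names and values made up for illustration.

```python
from typing import Optional


def decode_kafka_key(raw_key: bytes) -> Optional[str]:
    """Return the key as text if it is valid UTF-8, otherwise None.

    Hypothetical helper for inspecting sample keys; not part of any SDK.
    """
    try:
        # Strict decoding raises on any byte sequence that is not valid UTF-8.
        return raw_key.decode("utf-8")
    except UnicodeDecodeError:
        return None


# Made-up sample keys: one textual, one binary.
print(decode_kafka_key(b"user-42"))
print(decode_kafka_key(b"\xff\xfe\x00\x01"))
```

A successful decode only shows that the bytes happen to be valid UTF-8. Some binary keys (for example, small big-endian integers) also decode without error, so it is still worth checking how the producer actually serialises the key.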