Tags: apache-flink, flink-sql, pyflink

Get nested fields from Kafka message using Apache Flink SQL


I'm trying to create a source table using Apache Flink 1.11 where I can get access to nested properties in a JSON message. I can pluck values off root properties but I'm unsure how to access nested objects.

The documentation suggests that the nested object should be a MAP type, but when I declare the column that way I get the following error:

: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP

Here is my SQL:

        CREATE TABLE input(
            id VARCHAR,
            title VARCHAR,
            properties MAP
        ) WITH (
            'connector' = 'kafka-0.11',
            'topic' = 'my-topic',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'python-test',
            'format' = 'json'
        )

And my JSON looks something like this:

{
  "id": "message-1",
  "title": "Some Title",
  "properties": {
    "foo": "bar"
  }
}

Solution

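  • The error most likely occurs because MAP needs explicit key and value types, for example MAP<STRING, STRING>; a bare MAP is parsed as a plain identifier, which is what the exception reports. When the nested object has a fixed, known set of fields, you can instead use ROW to extract nested fields from your JSON messages. Your DDL statement would look something like: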

    CREATE TABLE input(
        id VARCHAR,
        title VARCHAR,
        properties ROW(`foo` VARCHAR)
    ) WITH (
        'connector' = 'kafka-0.11',
        'topic' = 'my-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'python-test',
        'format' = 'json'
    );