I'm trying to create a source table using Apache Flink 1.11 where I can get access to nested properties in a JSON message. I can pluck values off root properties but I'm unsure how to access nested objects.
The documentation suggests that it should be a MAP
type but when I set that, I get the following error
: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
Here is my SQL
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
And my JSON looks something like this:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar"
}
}
You can use ROW
to extract nested fields in your JSON messages. Your DDL statement would look something like:
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);