I am using Typesafe Config to manage my Flink SQL command. For example,
source = """
CREATE TABLE kafka_source (
`body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
) WITH (
'connector' = 'kafka',
'topic' = 'data-report-stat-old-logtype7',
'properties.bootstrap.servers' = '10.111.135.233:9092',
'properties.group.id' = 'flink-test2',
'json.fail-on-missing-field' = 'false',
'format' = 'json'
)
"""
My properties.bootstrap.servers
is maven-profile specific, therefore I want to configure it in Maven pom.xml
<profiles>
<profile>
<id>testing</id>
<properties>
<kafka.source.servers>10.111.135.233:9092</kafka.source.servers>
</properties>
</profile>
</profiles>
And I would like to reference and use the property in Typesafe config,
source = """
CREATE TABLE kafka_source (
`body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
) WITH (
'connector' = 'kafka',
'topic' = 'data-report-stat-old-logtype7',
'properties.bootstrap.servers' = '"""${kafka.source.servers}"""',
'properties.group.id' = 'flink-test2',
'json.fail-on-missing-field' = 'false',
'format' = 'json'
)
"""
However, if I run mvn clean package -Ptesting
, the config is built into
source = """
CREATE TABLE kafka_source (
`body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>>
) WITH (
'connector' = 'kafka',
'topic' = 'data-report-stat-old-logtype7',
'properties.bootstrap.servers' = '"""10.111.135.233:9092"""',
'properties.group.id' = 'flink-test2',
'json.fail-on-missing-field' = 'false',
'format' = 'json'
)
"""
and it throws exceptions when loading the config, because 10.111.135.233:9092
contains a :
:
Expecting close brace } or a comma, got ':' (if you intended ':' to be part of a key or string value, try enclosing the key or value in double quotes
What is the right way to solve this? Thanks!
I believe this is an issue from typesafe config. To solve the problem, use application.properties
file instead of application.conf
.
last-n-clicks.generation-ingestion.source=\
CREATE TABLE kafka_source ( \
`body` ROW<`log` ROW<`uid` BIGINT, serverts BIGINT, `contentstr` STRING>> \
) WITH ( \
'connector' = 'kafka', \
'topic' = 'data-report-stat-old-logtype7', \
'properties.bootstrap.servers' = '${kafka.source.servers}', \
'properties.group.id' = 'flink-test2', \
'json.fail-on-missing-field' = 'false', \
'format' = 'json' \
)