I'm quite new to Flink SQL and am trying to integrate it with Kafka. I have a pipeline similar to the one below.
I do see output on my out.topic, but I want to add prints for debugging and cannot get them to work.
I've already tried using .print() (as below) but I see nothing, so I guess something in my configuration is wrong. My goal is to somehow query the table and print it to the logs.
Any hints?
Thanks
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    String bootstrapServers = "my-bootstrap-servers:9092";
    String schemaUrl = "my-schema-registry:8081";

    // Source table: Avro records from in.topic via the Confluent Schema Registry
    String createSourceTable = "CREATE TABLE input (" +
            "  NAME STRING," +
            "  ADDRESS STRING," +
            "  METRIC1 FLOAT," +
            "  METRIC2 FLOAT" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'in.topic'," +
            "  'properties.bootstrap.servers' = '" + bootstrapServers + "'," +
            "  'properties.group.id' = 'flink.sql.group'," +
            "  'value.format' = 'avro-confluent'," +
            "  'scan.startup.mode' = 'latest-offset'," +
            "  'value.avro-confluent.url' = 'http://" + schemaUrl + "')";
    tableEnv.executeSql(createSourceTable);

    // Sink table: results written to out.topic
    String createSinkTable = "CREATE TABLE output (" +
            "  NAME STRING," +
            "  METRIC_RATIO FLOAT" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'out.topic'," +
            "  'properties.bootstrap.servers' = '" + bootstrapServers + "'," +
            "  'properties.group.id' = 'flink.sql.group'," +
            "  'value.format' = 'avro-confluent'," +
            "  'value.avro-confluent.url' = 'http://" + schemaUrl + "')";
    tableEnv.executeSql(createSinkTable);

    // Continuous INSERT job: in.topic -> out.topic
    String sqlQuery = "INSERT INTO output " +
            "SELECT " +
            "  NAME, " +
            "  CAST(METRIC1 / METRIC2 AS FLOAT) AS METRIC_RATIO " +
            "FROM input";
    tableEnv.executeSql(sqlQuery);

    tableEnv.executeSql("SELECT * FROM output").print();
}
Your code never reaches the line:
tableEnv.executeSql("SELECT * FROM output").print();
because you are in stream-processing mode, and the statement below takes over the execution thread forever:
String sqlQuery = "INSERT INTO output " +
        "SELECT " +
        "  NAME, " +
        "  CAST(METRIC1 / METRIC2 AS FLOAT) AS METRIC_RATIO " +
        "FROM input";
tableEnv.executeSql(sqlQuery);
The insertion is a continuous process, so it never finishes.
Instead, you can start a parallel process (a second job) that reads from the sink topic and prints it out, as sketched below.
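A minimal sketch of that second job, assuming the same broker and Schema Registry settings as above (the class name PrintOutputTopic, the table name debug_output, the consumer group flink.sql.debug, and the earliest-offset startup mode are just illustrative choices):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class PrintOutputTopic {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String bootstrapServers = "my-bootstrap-servers:9092";
        String schemaUrl = "my-schema-registry:8081";

        // Same schema as the sink table of the main job, registered under another name
        // and read from the beginning of the topic so already-written records show up too.
        String createDebugTable = "CREATE TABLE debug_output (" +
                "  NAME STRING," +
                "  METRIC_RATIO FLOAT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'out.topic'," +
                "  'properties.bootstrap.servers' = '" + bootstrapServers + "'," +
                "  'properties.group.id' = 'flink.sql.debug'," +
                "  'value.format' = 'avro-confluent'," +
                "  'scan.startup.mode' = 'earliest-offset'," +
                "  'value.avro-confluent.url' = 'http://" + schemaUrl + "')";
        tableEnv.executeSql(createDebugTable);

        // print() blocks this program and keeps writing incoming rows to stdout
        // until it is cancelled, which is fine for a throwaway debug job.
        tableEnv.executeSql("SELECT * FROM debug_output").print();
    }
}

Run it alongside the main pipeline; since it is a separate job with its own consumer group, reading out.topic this way should not interfere with the INSERT INTO output job.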