apache-flink, flink-sql

Flink SQL pipeline: how to print for debugging?


I'm quite new to Flink SQL and am trying to integrate it with Kafka. I have a pipeline similar to the one below.

I do see output on my out.topic, but I want to add prints for debugging and cannot do so.

I've already tried using .print() (as below) but I see nothing, so I guess something in my configuration is wrong. My goal is to somehow query the table and print it into the logs.

Any hints?

Thanks

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String bootstrapServers = "my-bootstrap-servers:9092";
        String schemaUrl = "my-schema-registry:8081";

        String createSourceTable = "CREATE TABLE input (" +
                "    NAME STRING," +
                "    ADDRESS STRING," +
                "    METRIC1 FLOAT," +
                "    METRIC2 FLOAT" +
                ") WITH (" +
                "    'connector' = 'kafka'," +
                "    'topic' = 'in.topic'," +
                "    'properties.bootstrap.servers' = '" + bootstrapServers + "'," +
                "    'properties.group.id' = 'flink.sql.group'," +
                "    'value.format' = 'avro-confluent'," +
                "    'scan.startup.mode' = 'latest-offset'," +
                "    'value.avro-confluent.url' = 'http://" + schemaUrl + "')";

        tableEnv.executeSql(createSourceTable);

        String createSinkTable = "CREATE TABLE output (" +
                "    NAME STRING," +
                "    METRIC_RATIO FLOAT" +
                ") WITH (" +
                "    'connector' = 'kafka'," +
                "    'topic' = 'out.topic'," +
                "    'properties.bootstrap.servers' = '" + bootstrapServers + "'," +
                "    'properties.group.id' = 'flink.sql.group'," +
                "    'value.format' = 'avro-confluent'," +
                "    'value.avro-confluent.url' = 'http://" + schemaUrl + "')";

        tableEnv.executeSql(createSinkTable);

        String sqlQuery = "INSERT INTO output " +
                "SELECT " +
                "    NAME, " +
                "    CAST(METRIC1 / METRIC2 AS FLOAT) AS METRIC_RATIO " +
                "FROM input";

        tableEnv.executeSql(sqlQuery);

        tableEnv.executeSql("SELECT * FROM output").print();
    }

Solution

  • Your code never reaches the line:

    tableEnv.executeSql("SELECT * FROM output").print();
    

    since the job runs in stream processing mode, and this statement takes over the execution indefinitely:

    String sqlQuery = "INSERT INTO output " +
            "SELECT " +
            "    NAME, " +
            "    CAST(METRIC1 / METRIC2 AS FLOAT) AS METRIC_RATIO " +
            "FROM input";
    tableEnv.executeSql(sqlQuery);
    

    Insertion into the sink is a continuous, never-ending process.

    So you can instead create a parallel process (a second job) that reads from the sink topic and prints the rows as they arrive.
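
    A minimal sketch of such a parallel debug job, assuming the same Kafka and Schema Registry endpoints as the question. The class name `DebugPrinter`, the consumer group `flink.sql.debug`, and the `earliest-offset` startup mode are assumptions, chosen so this reader keeps its own offsets separate from the main pipeline and also sees rows already written to the topic:

    ```java
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class DebugPrinter {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            String bootstrapServers = "my-bootstrap-servers:9092";
            String schemaUrl = "my-schema-registry:8081";

            // Same schema and topic as the 'output' sink in the pipeline, but with
            // its own consumer group and 'earliest-offset' so this debug reader
            // neither interferes with the main job nor misses already-written rows.
            tableEnv.executeSql("CREATE TABLE output (" +
                    "    NAME STRING," +
                    "    METRIC_RATIO FLOAT" +
                    ") WITH (" +
                    "    'connector' = 'kafka'," +
                    "    'topic' = 'out.topic'," +
                    "    'properties.bootstrap.servers' = '" + bootstrapServers + "'," +
                    "    'properties.group.id' = 'flink.sql.debug'," +
                    "    'scan.startup.mode' = 'earliest-offset'," +
                    "    'value.format' = 'avro-confluent'," +
                    "    'value.avro-confluent.url' = 'http://" + schemaUrl + "')");

            // print() blocks this process and streams rows to stdout as they
            // arrive -- which is fine here, because printing is all this job does.
            tableEnv.executeSql("SELECT * FROM output").print();
        }
    }
    ```

    Alternatively, if you would rather keep everything in one application, Flink ships a built-in 'print' connector: declare an extra table `WITH ('connector' = 'print')`, then submit the INSERT into the Kafka sink and the INSERT into the print table together via a `StatementSet` (`tableEnv.createStatementSet()`), so the debug rows land in the TaskManager logs without a second process.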