java  mysql  apache-flink  flink-streaming  flink-table-api

How to create a DataStreamSource from a MySQL database?


I have a problem running a Flink job that basically runs a query against a MySQL database and then tries to create a temporary view that must be accessed from a different job.

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public static void main(String[] args) throws Exception {
    
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    final TypeInformation<?>[] fieldTypes =
        new TypeInformation<?>[] {
          BasicTypeInfo.INT_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO
        };

    final RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

    String selectQuery = "select * from ***";
    String driverName = "***";
    String sourceDb = "***";
    String dbUrl = "jdbc:mysql://mySqlDatabase:3306/";
    String dbPassword = "***";
    String dbUser = "***";

    JdbcInputFormat.JdbcInputFormatBuilder inputBuilder =
        JdbcInputFormat.buildJdbcInputFormat()
            .setDrivername(driverName)
            .setDBUrl(dbUrl + sourceDb)
            .setQuery(selectQuery)
            .setRowTypeInfo(rowTypeInfo)
            .setUsername(dbUser)
            .setPassword(dbPassword);

    DataStreamSource<Row> source = env.createInput(inputBuilder.finish());

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    Table customerTable =
            tableEnv.fromDataStream(source).as("id", "name", "test");

    tableEnv.createTemporaryView("***", customerTable);
    Table resultTable = tableEnv.sqlQuery(
            "SELECT * FROM ***");

    DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

    resultStream.print();
    env.execute();

I'm quite new to Flink, and I'm currently going through the APIs provided for all of these, but I can't actually understand what I'm doing wrong. In my mind, testing this process by printing the result at the end of the job seems straightforward, but the only thing I get printed is something like this:

2022-02-14 12:22:57,702 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from INITIALIZING to RUNNING.
2022-02-14 12:22:57,853 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680) switched from RUNNING to FINISHED.
2022-02-14 12:22:57,853 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 (8a1cd3aa6a753c9253926027b1332680).
2022-02-14 12:22:57,856 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: Custom Source -> DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=ROW<`f0` INT, `f1` STRING, `f2` STRING> NOT NULL, rowtime=false, watermark=false) -> Calc(select=[f0 AS id, f1 AS name, f2 AS test]) -> TableToDataSteam(type=ROW<`id` INT, `name` STRING, `test` STRING> NOT NULL, rowtime=false) -> Sink: Print to Std. Out (1/1)#0 8a1cd3aa6a753c9253926027b1332680.

The point of this job is to create a temporary view for caching some static data that other Flink jobs would then use by querying that view.


Solution

  • For more context on how to use MySQL with Flink, see https://stackoverflow.com/a/71030967/2000823. As a streaming data source, it's more common to consume MySQL's binlog as a CDC stream; another approach that is sometimes taken (though not encouraged by Flink's APIs) is to periodically poll MySQL with a SELECT query.
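
    A minimal sketch of the CDC approach, assuming the flink-connector-mysql-cdc dependency is on the classpath (a separate project from Flink itself); the hostname, database, table, and credentials below are placeholders:

        import com.ververica.cdc.connectors.mysql.source.MySqlSource;
        import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class MySqlCdcJob {
            public static void main(String[] args) throws Exception {
                // Read the MySQL binlog and emit each change event as a JSON string.
                MySqlSource<String> source = MySqlSource.<String>builder()
                        .hostname("mySqlDatabase")
                        .port(3306)
                        .databaseList("mydb")           // placeholder database
                        .tableList("mydb.customers")    // placeholder table
                        .username("***")
                        .password("***")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                // The CDC source relies on checkpointing to track binlog offsets.
                env.enableCheckpointing(3000);

                env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                   .print();

                env.execute("MySQL CDC job");
            }
        }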

    As for what you've tried, using createInput is discouraged for streaming jobs because it doesn't work with Flink's checkpointing mechanism. Rather than wrapping an InputFormat like JdbcInputFormat this way, it's better to choose one of the available source connectors.
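
    For instance, a sketch of the same bounded read via the JDBC table connector, replacing the createInput-based source in your main method (an assumption: flink-connector-jdbc and the MySQL driver are on the classpath; the database, table name, and credentials are placeholders):

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Declare a table backed by the JDBC connector; Flink plans a bounded scan over it.
        tableEnv.executeSql(
            "CREATE TABLE customers (" +
            "  id INT," +
            "  name STRING," +
            "  test STRING" +
            ") WITH (" +
            "  'connector' = 'jdbc'," +
            "  'url' = 'jdbc:mysql://mySqlDatabase:3306/mydb'," +
            "  'table-name' = 'customers'," +
            "  'username' = '***'," +
            "  'password' = '***'" +
            ")");

        // The registered table can then be queried like any other.
        tableEnv.sqlQuery("SELECT * FROM customers").execute().print();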

    A temporary view doesn't hold any data, and isn't something that can be accessed from another job. A Flink table, or a view, is metadata describing how data stored somewhere else (e.g., in MySQL or Kafka) is to be interpreted as a table by Flink. You can store a view in a catalog so that multiple jobs can share its definition, but the underlying data will remain in the external data store; only the view metadata is stored in the catalog.
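
    As an illustration, a view definition can be persisted in a Hive catalog (a sketch reusing the tableEnv and customers table from the snippet above; it assumes the flink-connector-hive dependency and a running Hive Metastore, and the catalog name and config path are placeholders):

        // Register a persistent catalog backed by a Hive Metastore.
        // Requires: import org.apache.flink.table.catalog.hive.HiveCatalog;
        HiveCatalog catalog = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
        tableEnv.registerCatalog("myhive", catalog);
        tableEnv.useCatalog("myhive");

        // Only the view's definition is stored in the metastore; the data stays
        // in the external system. Any job that registers this catalog can query it.
        tableEnv.executeSql("CREATE VIEW customer_view AS SELECT id, name FROM customers");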

    So in this case, the job you've written will create a temporary view that is only visible to this job and no others (since it is a temporary view, and not a persistent view stored in a persistent catalog). The output of your job won't be in the log file(s), but will instead go to stdout, or to *.out files in the logging directory of each task manager.