apache-flink  flink-streaming

Fetch new records from DB dynamically using Flink and post to kafka/axon topic


I am new to Flink and have a use case with two Flink jobs: the first consumes data from Topic1 and inserts/updates rows in a DB, and the second fetches any data newly inserted/updated in the DB and publishes it to Topic2, which is consumed by different services.

Method 1:

    env.fromSource(axon.source(Constants.CONSUMER_TOPIC_NAME, Constants.CONSUMER_GROUP_ID),
                    WatermarkStrategy.noWatermarks(), "foo-kafka-source")
            .map(axonMessage -> (FooModel) axonMessage.getPayload())
            .addSink(jdbc.exactlyOnceSink(new FooEntityJdbcSink()))
            .uid("foo-kafka-source");

Method 2:

    DataStream<AxonMessage> stream = jdbc.source(env, new FooJdbcSource())
            .map(x -> {
                AxonMessage message = new AxonMessage(x);
                message.setTopic(Constants.PRODUCER_TOPIC_NAME);
                message.setKey(FooModel.class);
                return message;
            });

    stream.sinkTo(axon.sink())
            .name("foo-kafka-sink")
            .uid("foo-kafka-sink");

In FooJdbcSource, the SQL query used to fetch data is:

    SELECT {colm_1, colm_2, ...} FROM foo_table;

The issue is that job 2 only picks up data once and pushes it to the topic; any new records added while the job is running are not picked up. I found a few solutions, such as JdbcDynamicTableSource or the (deprecated) SourceFunction, that check the DB table at certain intervals (windowing) and fetch new data based on a timestamp. However, the table is not expected to update frequently, and we need any changes on the topic instantly at run-time without polling/monitoring the DB at intervals, as that would cause too many DB operations/hits. Is there any way in Flink to dynamically monitor the DB table?

Any help will be appreciated.

Thanks!!!


Solution

  • Depending on the type of database you are using, there are numerous Kafka-related connectors that can act as change-data-capture (CDC) streams from your database to a Kafka topic. Your Flink job can then listen to that topic and stream through the changes/updates as they occur, with no interval polling against the table.

    There are plenty of supported Kafka connectors out there, independent from Flink, depending on your preferred database flavor. Debezium, for example, supports a wide array of relational and non-relational databases, including MySQL, PostgreSQL, SQL Server, Oracle, MongoDB, and Cassandra (a sketch of consuming such a topic from Flink follows below).

    Flink also provides the Flink CDC project, which supports additional connectors to perform the same behavior natively within Flink (see the second sketch below). Additionally, several other technologies ship their own official CDC connectors that support this pattern.
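
    For the Debezium route, here is a minimal sketch of what job 2 could look like, reading Debezium change events with Flink's standard KafkaSource. The broker address, group id, and the topic name dbserver1.foo_db.foo_table (following Debezium's default server.database.table naming) are assumptions for illustration, not part of your setup:

        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.connector.kafka.source.KafkaSource;
        import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class FooDebeziumConsumerJob {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                // Debezium (running in Kafka Connect) writes one change event per
                // insert/update/delete to this topic as the change happens.
                KafkaSource<String> cdcSource = KafkaSource.<String>builder()
                        .setBootstrapServers("kafka:9092")            // assumed broker
                        .setTopics("dbserver1.foo_db.foo_table")      // assumed Debezium topic
                        .setGroupId("foo-cdc-consumer")
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .build();

                env.fromSource(cdcSource, WatermarkStrategy.noWatermarks(), "foo-cdc-kafka-source")
                        // Each element is a Debezium JSON envelope; its "after" field
                        // holds the new row state. Parse it here and forward to Topic2.
                        .print();

                env.execute("foo-debezium-consumer-job");
            }
        }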
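
    For the Flink CDC route, here is a minimal sketch using the MySQL CDC connector (flink-connector-mysql-cdc), which removes Kafka Connect from the picture entirely. The host, credentials, and foo_db/foo_table names are placeholders, and the com.ververica packages match Flink CDC 2.x (newer releases use org.apache.flink.cdc instead):

        import org.apache.flink.api.common.eventtime.WatermarkStrategy;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import com.ververica.cdc.connectors.mysql.source.MySqlSource;
        import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

        public class FooCdcJob {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                // Checkpointing is required so the source can commit binlog offsets.
                env.enableCheckpointing(10_000L);

                MySqlSource<String> source = MySqlSource.<String>builder()
                        .hostname("db-host")             // assumed host
                        .port(3306)
                        .databaseList("foo_db")          // assumed database
                        .tableList("foo_db.foo_table")   // assumed table
                        .username("flink_user")
                        .password("...")
                        // Emits each change event as a Debezium-style JSON string.
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

                env.fromSource(source, WatermarkStrategy.noWatermarks(), "foo-cdc-source")
                        // Changes arrive as the connector reads the binlog, so new
                        // rows reach Topic2 immediately without polling foo_table.
                        .print();

                env.execute("foo-cdc-job");
            }
        }

    Either way, changes are pushed to job 2 as they are committed, rather than job 2 repeatedly querying the table.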