java kotlin jdbc stream apache-flink

Flink Scheduled JDBC Source


I have the following definition of the JDBC source in Apache Flink.

    val jdbcSource = JdbcSource.builder<LoggedInEvent>()
        .setDBUrl("jdbc:postgresql://db:5432/postgres")
        .setSql("SELECT player_id, past_logins FROM user_initial_data")
        .setUsername("postgres")
        .setPassword("example")
        // The type information must match the builder's element type.
        .setTypeInformation(TypeInformation.of(LoggedInEvent::class.java))
        // Map each row to an event, stamped with the time it was read.
        .setResultExtractor { LoggedInEvent(it.getInt(1).toString(), it.getInt(2), Instant.now().toEpochMilli()) }
        .build()

    val snapshotsStream = env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), "LoggedInSnapshots")

Currently I'm experiencing two issues with this solution:

  1. I can't schedule this query to execute every N seconds. Is there a simple way to do that with existing tooling?
  2. Related to #1: the source executes only once and then the job finishes. I want it to be scheduled and to run continuously within the same job.

Solution

  • Flink does not provide this sort of scheduling or polling for the JDBC source.

    On the other hand, Kafka Connect's JDBC source connector does support this (see its `poll.interval.ms` option): https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html
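
    To make the Kafka Connect suggestion concrete, here is a minimal sketch of a JDBC source connector configuration for the table in the question. The connector name, the 10-second interval, and the topic prefix are assumptions chosen for illustration. The option names are the ones listed on the linked configuration page, but check them against the connector version you install.

```properties
# Hypothetical connector name, pick your own.
name=user-initial-data-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

# Connection details copied from the question.
connection.url=jdbc:postgresql://db:5432/postgres
connection.user=postgres
connection.password=example

# Re-read the whole table on every poll (a periodic snapshot).
table.whitelist=user_initial_data
mode=bulk

# Assumed interval: poll every 10 seconds.
poll.interval.ms=10000

# Assumed prefix: rows would land in the topic "snapshots-user_initial_data".
topic.prefix=snapshots-
```

    With something like this in place, the Flink job would read the resulting topic through a Kafka source instead of the JDBC source, so it keeps running and receives a fresh snapshot after each poll. Note that `bulk` mode re-emits every row on each poll; the connector's incrementing and timestamp modes only emit new or changed rows, if that fits your use case better.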