I have the following definition of a JDBC source in Apache Flink:
val jdbcSource = JdbcSource.builder<LoggedInEvent>()
.setDBUrl("jdbc:postgresql://db:5432/postgres")
.setSql("SELECT player_id, past_logins FROM user_initial_data")
.setUsername("postgres")
.setPassword("example")
    .setTypeInformation(TypeInformation.of(LoggedInEvent::class.java))
.setResultExtractor { LoggedInEvent(it.getInt(1).toString(), it.getInt(2), Instant.now().toEpochMilli()) }
.build()
val snapshotsStream = env.fromSource(jdbcSource, WatermarkStrategy.noWatermarks(), "LoggedInSnapshots")
Currently I'm experiencing the following issue with this solution: Flink's JDBC source executes the query once as a bounded source and then finishes — it does not provide any scheduling or periodic polling of the table.
Kafka Connect's JDBC source connector, on the other hand, does support polling: https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html
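To make the desired behaviour concrete: what I'm after is roughly Kafka Connect's `incrementing` mode — remember the highest value of a key column and, on each poll, fetch only rows above it. Below is a minimal sketch of just that polling logic, written against an in-memory list standing in for the `user_initial_data` table (the `Row` type, `IncrementingPoller`, and `pollOnce` are names I made up for illustration; this is not a Flink or Kafka Connect API):

```kotlin
// Stand-in for one row of user_initial_data.
data class Row(val playerId: Int, val pastLogins: Int)

// Sketch of Kafka-Connect-style "incrementing" polling: each poll returns
// only rows whose key column exceeds the highest value seen so far.
class IncrementingPoller(private val table: () -> List<Row>) {
    // Highest player_id seen so far; the next poll skips everything at or below it.
    private var lastSeenId = Int.MIN_VALUE

    fun pollOnce(): List<Row> {
        val fresh = table()
            .filter { it.playerId > lastSeenId }
            .sortedBy { it.playerId }
        if (fresh.isNotEmpty()) lastSeenId = fresh.last().playerId
        return fresh
    }
}

fun main() {
    val rows = mutableListOf(Row(1, 10), Row(2, 3))
    val poller = IncrementingPoller { rows }

    println(poller.pollOnce().map { it.playerId }) // prints [1, 2]
    rows.add(Row(3, 7))
    println(poller.pollOnce().map { it.playerId }) // prints [3] — only the new row
}
```

In a real source the lambda would run the `SELECT ... WHERE player_id > ?` query on a timer instead of filtering a list, but this is the scheduling behaviour that seems to be missing from the Flink JDBC source.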