I'm new to Flink and hope someone can help. I have tried to follow the Flink tutorials.
We have a requirement where we consume from two sources:

1. When an event arrives on the Kafka topic, we need the JSON event fields (mobile_acc_id, member_id, mobile_number, valid_from, valid_to) to be stored in an external database (Postgres).
2. When an event arrives on the Kinesis stream, we need to look up the mobile_number from the event in the Postgres DB (populated in step 1), extract the member_id, enrich the incoming Kinesis event with it, and sink the result to another output stream.
So I set up a Stream and a Table environment like this:
public static StreamExecutionEnvironment initEnv() {
    var env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().setAutoWatermarkInterval(0L); // disables periodic watermark generation
    return env;
}

public static TableEnvironment initTableEnv() {
    var settings = EnvironmentSettings.newInstance().inStreamingMode().build();
    return TableEnvironment.create(settings);
}
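Note that TableEnvironment.create(settings) builds its own pipeline, independent of the env returned by initEnv(). The Table API's bridge module also offers a table environment bound to an existing stream environment; a minimal sketch, assuming flink-table-api-java-bridge is on the classpath (initBoundTableEnv is a hypothetical helper name):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public static StreamTableEnvironment initBoundTableEnv(StreamExecutionEnvironment env) {
    // Binds the Table API to the given environment, so that Table and
    // DataStream pipelines can end up in the same job graph.
    return StreamTableEnvironment.create(
            env, EnvironmentSettings.newInstance().inStreamingMode().build());
}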
Calling the process(..) method with initEnv() uses Kinesis as the source:
process(config.getTransformerConfig(), input, sink, deadLetterSink, initEnv());
In process(..) I am also initialising the Table environment using initTableEnv(), hoping that Flink will consume from both sources when I call env.execute(..):
public static void process(TransformerConfig cfg, SourceFunction<String> source, SinkFunction<UsageSummaryWithHeader> sink,
                           SinkFunction<DeadLetterEvent> deadLetterSink, StreamExecutionEnvironment env) throws Exception {
    var events = StreamUtils.source(source, env, "kinesis-events", cfg.getInputParallelism());

    collectInSink(transform(cfg, events, deadLetterSink), sink, "kinesis-summary-events", cfg.getOutputParallelism());

    processStreamIntoTable(initTableEnv());

    env.execute("my-flink-event-enricher-svc");
}
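Worth noting: env.execute(..) only submits the DataStream graph; every executeSql(..)/executeInsert(..) on a standalone TableEnvironment is submitted as its own job. With the bound StreamTableEnvironment sketched above (and assuming Flink 1.14+), the table pipeline can instead be attached so that a single env.execute(..) covers both; attachTablePipeline is a hypothetical helper:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public static void attachTablePipeline(StreamExecutionEnvironment env) {
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    // Assumes the mobile_accounts (Kafka) and mobile_account (JDBC) tables
    // from processStreamIntoTable(..) were created on this tableEnv.
    StreamStatementSet statements = tableEnv.createStatementSet();
    statements.addInsertSql("INSERT INTO mobile_account SELECT * FROM mobile_accounts");
    // Attaches the table pipeline to env, so the subsequent env.execute(..)
    // runs the Kinesis job and the Kafka -> Postgres insert together.
    statements.attachAsDataStream();
}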
private static void processStreamIntoTable(TableEnvironment tableEnv) throws Exception {
    tableEnv.executeSql("CREATE TABLE mobile_accounts (\n" +
            "  mobile_acc_id VARCHAR(36) NOT NULL,\n" +
            "  member_id BIGINT NOT NULL,\n" +
            "  mobile_number VARCHAR(14) NOT NULL,\n" +
            "  valid_from TIMESTAMP NOT NULL,\n" +
            "  valid_to TIMESTAMP NOT NULL\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'mobile_accounts',\n" +
            "  'properties.bootstrap.servers' = 'kafka:9092',\n" +
            "  'format' = 'json'\n" +
            ")");

    tableEnv.executeSql("CREATE TABLE mobile_account (\n" +
            "  mobile_acc_id VARCHAR(36) NOT NULL,\n" +
            "  member_id BIGINT NOT NULL,\n" +
            "  mobile_number VARCHAR(14) NOT NULL,\n" +
            "  valid_from TIMESTAMP NOT NULL,\n" +
            "  valid_to TIMESTAMP NOT NULL\n" +
            ") WITH (\n" +
            "  'connector' = 'jdbc',\n" +
            "  'url' = 'jdbc:postgresql://flinkpg:5432/flink-demo',\n" +
            "  'table-name' = 'mobile_account',\n" +
            "  'driver' = 'org.postgresql.Driver',\n" +
            "  'username' = 'flink-demo',\n" +
            "  'password' = 'flink-demo'\n" +
            ")");

    Table mobileAccounts = tableEnv.from("mobile_accounts");
    report(mobileAccounts).executeInsert("mobile_account");
}
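Also note that executeInsert(..) submits its own job immediately and returns a TableResult without blocking. A sketch of waiting on that result, only useful for bounded/test runs, since an unbounded Kafka source never finishes:

// executeInsert(..) is detached; await() blocks until the insert job
// finishes (so it would block forever on an unbounded Kafka source).
TableResult result = report(mobileAccounts).executeInsert("mobile_account");
result.await();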
public static Table report(Table mobileAccounts) {
    return mobileAccounts.select(
            $("mobile_acc_id"),
            $("member_id"),
            $("mobile_number"),
            $("valid_from"),
            $("valid_to"));
}
What I have noticed on the Flink console is that the job is only consuming from one source!

I liked the TableEnvironment, as not much code is needed to get the rows inserted into the DB.

How can we consume from both sources, Kinesis and the TableEnvironment's Kafka table, in Flink?

Am I using the right approach? Is there an alternative way to implement my requirements?
I assume you are able to create the tables correctly; then you can simply JOIN the two streams, named kafka_stream and kinesis_stream, like this (joining on mobile_number, since that is the lookup key in your requirement):
SELECT l.*, r.member_id
FROM kinesis_stream AS l
INNER JOIN kafka_stream AS r
ON l.mobile_number = r.mobile_number;
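For that query to run, kinesis_stream itself has to be registered as a table. A sketch of such a DDL in the same executeSql(..) style as your question, assuming the flink-connector-kinesis dependency; the stream name, region, and column list are made up, since the Kinesis event schema isn't shown:

tableEnv.executeSql("CREATE TABLE kinesis_stream (\n" +
        "  mobile_number VARCHAR(14) NOT NULL\n" +
        "  -- plus the remaining event fields\n" +
        ") WITH (\n" +
        "  'connector' = 'kinesis',\n" +
        "  'stream' = 'usage-events',\n" +   // hypothetical stream name
        "  'aws.region' = 'eu-west-1',\n" +  // hypothetical region
        "  'format' = 'json'\n" +
        ")");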
If the PostgreSQL sink is essential, you can handle it in a separate query:
INSERT INTO postgre_sink
SELECT * FROM kafka_stream;
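If you want the enrichment query and the Postgres insert to run as one job, so that a single submission consumes from both Kafka and Kinesis, you can group them in a StatementSet; a sketch, where output_stream and postgre_sink are assumed to be already-defined sink tables:

import org.apache.flink.table.api.StatementSet;

StatementSet set = tableEnv.createStatementSet();
// Enrichment: join Kinesis events with the Kafka-backed account table.
set.addInsertSql(
        "INSERT INTO output_stream " +
        "SELECT l.*, r.member_id FROM kinesis_stream AS l " +
        "INNER JOIN kafka_stream AS r ON l.mobile_number = r.mobile_number");
// Mirror the Kafka stream into Postgres.
set.addInsertSql("INSERT INTO postgre_sink SELECT * FROM kafka_stream");
set.execute(); // one job containing both pipelines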
These will solve your problem with the Table API (or Flink SQL).