I'm trying to use Flink to consume the change event log produced by Debezium. The JSON was this:
{
"schema":{
},
"payload":{
"before":null,
"after":{
"team_config_id":3800,
"team_config_team_id":"team22bcb26e-499a-41e6-8746-b7d980e79e04",
"team_config_sfdc_account_id":null,
"team_config_sfdc_account_url":null,
"team_config_business_type":5,
"team_config_dpsa_status":0,
"team_config_desc":null,
"team_config_company_id":null,
"team_config_hm_count_stages":null,
"team_config_assign_credits_times":null,
"team_config_real_renew_date":null,
"team_config_action_date":null,
"team_config_last_action_date":null,
"team_config_business_tier_notification":"{}",
"team_config_create_date":1670724933000,
"team_config_update_date":1670724933000,
"team_config_rediscovery_tier":0,
"team_config_rediscovery_tier_notification":"{}",
"team_config_sfdc_industry":null,
"team_config_sfdc_market_segment":null,
"team_config_unterminated_note_id":0
},
"source":{
},
"op":"c",
"ts_ms":1670724933149,
"transaction":null
}
}
And I've tried two ways to declare the input schema.
The first way was directly parse the JSON data :
create table team_config_source (
`payload` ROW <
`after` ROW <
...
team_config_create_date timestamp(3),
team_config_update_date timestamp(3),
...
>
>
) WITH (
'connector' = 'kafka',
...
'format' = 'json'
)
But Flink would throw an error org.apache.flink.formats.json.JsonToRowDataConverters$JsonParseException: Fail to deserialize at field: team_config_create_date
caused by java.time.format.DateTimeParseException: Text '1670724933000' could not be parsed at index 0
. Doesn't Flink support timestamp in this format?
I've also tried another way, using the built-in debezium format:
create table team_config_source (
team_config_create_id int,
...
) WITH (
'connector' = 'kafka',
...
'format' = 'debezium-json'
)
But Flink come up with another error java.io.IOException: Corrupt Debezium JSON message
caused by java.lang.NullPointerException
. I found somebody said that update event shouldn't has null
as before
value, but this message was a create event.
Could anyone help to check my DDL?
I am an a Flink expert but TIMESTAMP in Flink is not Epoch time, it is in datetime format.
In this case you can define table like:
team_config_create_bigint BIGINT,
team_config_update_bigint BIGINT,
...
team_config_create_date as TO_TIMESTAMP(FROM_UNIXTIME(team_config_create_bigint)),
team_config_update_date as TO_TIMESTAMP(FROM_UNIXTIME(team_config_update_bigint))