While populating BigQuery from Kafka using confluentinc/kafka-connect-bigquery, I'd like to use the timestamp of our domain (Avro) events to (day-) partition the tables.
My connector.properties looks as follows:
[...]
transforms=ConvertTimestamp
transforms.ConvertTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ConvertTimestamp.field=metadata_date
transforms.ConvertTimestamp.target.type=Timestamp
timestampPartitionFieldName=metadata_date
[...]
Exception in thread "pool-5-thread-522" com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table insertion failed for the following rows:
[row index 0]: invalid: Timestamp field value is out of range:1597279521000000000
[row index 1]: invalid: Timestamp field value is out of range:1597279523000000000
[...]
The issue seems to be that our timestamps are in microseconds (Unix epoch, UTC):
"type": {
"type": "long",
"logicalType": "timestamp-micros"
}
while BigQuery wants milliseconds (or seconds?).
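For what it's worth, the numbers seem to line up with that suspicion. Here is a quick sanity check in plain Python (nothing connector-specific; I'm assuming the first rejected row carries the event timestamp 1597279521000000 µs, i.e. the value in the error log divided by 1000): read as microseconds it is a perfectly ordinary date, but read as milliseconds, which is what TimestampConverter assumes for plain longs by default, it lands roughly 50,000 years in the future, outside BigQuery's supported TIMESTAMP range of years 0001-9999.

from datetime import datetime, timezone

# Event timestamp in microseconds since the Unix epoch (UTC),
# i.e. the value from the error log divided by 1000.
event_micros = 1_597_279_521_000_000

# Read as microseconds, this is a perfectly ordinary date:
print(datetime.fromtimestamp(event_micros / 1_000_000, tz=timezone.utc))
# -> 2020-08-13 00:45:21+00:00

# Read as milliseconds, the same number is tens of thousands of years in
# the future, hence "Timestamp field value is out of range".
# (datetime cannot even represent it, so just approximate the year.)
seconds_if_millis = event_micros / 1_000
print(1970 + seconds_if_millis / (365.25 * 24 * 3600))
# -> ~52585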
Is there a way to transform the timestamp directly using the connector?
This appears to have been addressed now: you can use the additional transform parameter unix.precision to ensure that Avro fields of type timestamp-micros are interpreted correctly:
transforms=ConvertTimestamp
transforms.ConvertTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ConvertTimestamp.field=metadata_date
transforms.ConvertTimestamp.target.type=Timestamp
transforms.ConvertTimestamp.unix.precision=microseconds
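Note that unix.precision was only added to the TimestampConverter SMT fairly recently (KIP-808, shipped with Apache Kafka 3.2.0, if I'm not mistaken), so the Connect worker needs to run at least that version for the property to be recognized.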
It appears that the Confluent documentation has not been updated to reflect this yet, but the corresponding proposal document sheds some more light on the matter.