The following AWS CLI command works fine:
aws firehose put-record --delivery-stream-name 52N-STA-DF-ICBG --cli-binary-format raw-in-base64-out --record='{"Data":"{\"ADF_Metadata\":{\"OTF_Metadata\":{\"DestinationTableName\":\"dataset\",\"DestinationDatabaseName\":\"52n_sta_iceberg\",\"Operation\":\"INSERT\"}},\"ADF_Record\":{\"dataset_id\":2010,\"identifier\":\"2010\",\"sta_identifier\":\"2010\",\"name\":\"oven temperature\",\"description\":\"This is a datastream for an oven’s internal temperature.\",\"first_time\":\"2020-06-26T09:42:02.000\",\"last_time\":\"2021-06-26T09:42:02.000\",\"result_time_start\":null,\"result_time_end\":null,\"observed_area\":null,\"fk_procedure_id\":1,\"fk_phenomenon_id\":1,\"fk_feature_id\":1,\"fk_platform_id\":2,\"fk_unit_id\":1,\"fk_format_id\":5,\"fk_aggregation_id\":null,\"observation_type\":\"simple\"}}\n"}'
However, when I send the same record from my Java application, Firehose fails to deliver it to the Iceberg table:
public void icebergMerge(ObjectNode dataNode, String tableName, String operation)
        throws STACRUDException {
    ObjectNode rootNode = mapper.createObjectNode();
    ObjectNode adf_metadata = rootNode.putObject(FirehoseConstants.ADF_METADATA);
    ObjectNode otf_metadata = adf_metadata.putObject(FirehoseConstants.OTF_METADATA);
    otf_metadata.put(FirehoseConstants.TABLE, tableName.toLowerCase());
    otf_metadata.put(FirehoseConstants.DATABASE, FirehoseConstants.DATABASE_NAME);
    otf_metadata.put(FirehoseConstants.OPERATION, operation);
    ObjectNode adf_record = rootNode.putObject(FirehoseConstants.ADF_RECORD);
    adf_record.setAll(dataNode);
    try {
        String dataPayload = mapper.writeValueAsString(rootNode) + "\n";
        ObjectNode outerNode = mapper.createObjectNode();
        outerNode.put("Data", dataPayload);
        streamToFirehose(mapper.writeValueAsString(outerNode));
    } catch (JsonProcessingException e) {
        throw new STACRUDException("Bad request: cannot parse payload for table " + tableName);
    }
}
private void streamToFirehose(String jsonPayload) {
    try {
        Record record = Record.builder()
                .data(SdkBytes.fromUtf8String(jsonPayload))
                .build();
        PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                .deliveryStreamName(FirehoseConstants.DELIVERY_STREAM_NAME)
                .record(record)
                .build();
        PutRecordResponse resp = firehoseClient.putRecord(putRecordRequest);
        LOGGER.debug("Record sent successfully to Firehose.");
    } catch (Exception e) {
        LOGGER.error("Error sending to Firehose: " + e.getMessage(), e);
    }
}
Here is what jsonPayload looks like in streamToFirehose(), as captured by the debugger:
{"Data":"{\"ADF_Metadata\":{\"OTF_Metadata\":{\"DestinationTableName\":\"dataset\",\"DestinationDatabaseName\":\"52n_sta_iceberg\",\"Operation\":\"INSERT\"}},\"ADF_Record\":{\"dataset_id\":2010,\"identifier\":\"2010\",\"sta_identifier\":\"2010\",\"name\":\"oven temperature\",\"description\":\"This is a datastream for an oven’s internal temperature.\",\"first_time\":\"2020-06-26T09:42:02.000\",\"last_time\":\"2021-06-26T09:42:02.000\",\"result_time_start\":null,\"result_time_end\":null,\"observed_area\":null,\"fk_procedure_id\":1,\"fk_phenomenon_id\":1,\"fk_feature_id\":1,\"fk_platform_id\":2,\"fk_unit_id\":1,\"fk_format_id\":5,\"fk_aggregation_id\":null,\"observation_type\":\"simple\"}}\n"}
AWS SDK version: 2.27.17
The payload format follows the AWS docs: https://docs.aws.amazon.com/firehose/latest/dev/apache-iceberg-format-input-record.html
The table is defined in the Glue Data Catalog.
I know the "\n" is redundant, but the problem is the same with or without it.
The problem was in the formatting of the record. The putRecord API in the Java SDK automatically encapsulates the payload in the { "Data": payload } structure, so the outerNode object was redundant: Firehose received a record whose top level was {"Data": "..."} rather than the expected {"ADF_Metadata": ..., "ADF_Record": ...} document, and the Iceberg destination could not find the routing metadata. The CLI command, by contrast, needs the explicit wrapper because --record maps the raw PutRecord request structure.
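In code, the fix is a small change in icebergMerge(): drop outerNode and pass rootNode's JSON straight to streamToFirehose(), which needs no modification since Record.data already supplies the bytes that become the Data field of the PutRecord request. A minimal sketch of the corrected try block, reusing the names from the code above:

try {
    // Serialize the ADF document directly; the SDK wraps it as {"Data": ...} on the wire.
    String dataPayload = mapper.writeValueAsString(rootNode) + "\n";
    streamToFirehose(dataPayload);
} catch (JsonProcessingException e) {
    throw new STACRUDException("Bad request: cannot parse payload for table " + tableName);
}

After this change, the record body Firehose receives starts with {"ADF_Metadata": ...}, matching what the CLI command delivers.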