Tags: java, apache-kafka, consumer, knative-eventing, cloudevents

Java: How to read binary cloudevent


I want to process CloudEvents that are created with Python and sent through the Knative broker ingress, using a Java Kafka consumer. How do I get the payload values back from the binary data? Event creation:

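from datetime import datetime

import requests
from cloudevents.http import CloudEvent, to_structured

attributes = {
"type": "com.example.sampletype1",
"source": "https://example.com/event-producer",
}
data = {"price": 20.23, "timestamp": datetime.now().timestamp()}
event = CloudEvent(attributes, data)
# Structured mode: the whole event is serialized as JSON into the request body
headers, body = to_structured(event)
requests.post("http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/test", data=body, headers=headers)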

So far I have a Java Kafka consumer:

...
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, CloudEvent> consumer = new KafkaConsumer<>(properties);
...
while (true) {
    ConsumerRecords<String, CloudEvent> records =
            consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, CloudEvent> record : records) {
        CloudEventData data = record.value().getData();
        System.out.println(data.toString());
    }
}

which just logs BytesCloudEventData{value=[123, 34, 112, 114, 105, 99, 101, 34, 58, 49, 57, 53, 48, 51, 46, 56, 51, 44, 34, 116, 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 49, 46, 54, 54, 51, 53, 51, 54, 48, 54, 50, 55, 50, 50, 54, 50, 52, 69, 57, 125]}


Solution

  • The "binary" data you logged is simply the UTF-8 bytes of a JSON string:

    value=[123, 34, 112, 114, 105, 99, 101, 34, 58, 49, 57, 53, 48, 51, 46, 56, 51, 44, 34, 116, 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 49, 46, 54, 54, 51, 53, 51, 54, 48, 54, 50, 55, 50, 50, 54, 50, 52, 69, 57, 125]

    ASCII text: {"price":19503.83,"timestamp":1.663536062722624E9}

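    Suggestion: take the raw bytes from the event data and decode them as UTF-8, something like this:

    byte[] bytes = record.value().getData().toBytes();
    String s = new String(bytes, StandardCharsets.UTF_8);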
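
To check this outside the consumer, here is a minimal standalone sketch that decodes the exact bytes logged in the question. The class name PayloadDecodeSketch and the helper numberField are hypothetical names made up for this illustration. The regex lookup is only a stand-in so the example needs nothing beyond the JDK. In a real consumer you would more likely hand the decoded string to a JSON library, and the bytes would come from the event data rather than a hard-coded array.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PayloadDecodeSketch {

    // The exact bytes shown in the logged BytesCloudEventData.
    static final byte[] LOGGED_BYTES = {
        123, 34, 112, 114, 105, 99, 101, 34, 58, 49, 57, 53, 48, 51, 46, 56, 51,
        44, 34, 116, 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 49, 46, 54,
        54, 51, 53, 51, 54, 48, 54, 50, 55, 50, 50, 54, 50, 52, 69, 57, 125
    };

    // Decode the raw payload bytes as UTF-8 text.
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Illustration only: read one numeric field from a flat JSON object.
    // This is not a JSON parser; it is an assumption-laden shortcut for the demo.
    static double numberField(String json, String field) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*([-+0-9.eE]+)");
        Matcher m = p.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no numeric field: " + field);
        }
        return Double.parseDouble(m.group(1));
    }

    public static void main(String[] args) {
        String json = decode(LOGGED_BYTES);
        // prints {"price":19503.83,"timestamp":1.663536062722624E9}
        System.out.println(json);
        System.out.println(numberField(json, "price"));
        System.out.println(numberField(json, "timestamp"));
    }
}
```

The decode step is the part that matters here: once the payload is a String, any JSON mapper can turn it into typed fields.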