I want to process cloudevents created with python through knative broker ingress in a java kafka consumer. How do i get back the playload values from binary? Event Creation:
attributes = {
"type": "com.example.sampletype1",
"source": "https://example.com/event-producer",
}
data = {"price": 20.23, "timestamp": datetime.now().timestamp()}
event = CloudEvent(attributes, data)
headers, body = to_structured(event)
requests.post("http://kafka-broker-ingress.knative-eventing.svc.cluster.local/default/test", data=body, headers=headers)
so far i have a java kafka consumer:
...
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CloudEventDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, CloudEvent> consumer = new KafkaConsumer<>(properties);
...
while (true) {
ConsumerRecords<String, CloudEvent> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, CloudEvent> record : records) {
CloudEventData data = record.value().getData();
System.out.println(data.toString());
}
}
which just logs
BytesCloudEventData{value=[123, 34, 112, 114, 105, 99, 101, 34, 58, 49, 57, 53, 48, 51, 46, 56, 51, 44, 34, 116, 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 49, 46, 54, 54, 51, 53, 51, 54, 48, 54, 50, 55, 50, 50, 54, 50, 52, 69, 57, 125]}
The "binary" example you gave is already a JSON string:
value=[123, 34, 112, 114, 105, 99, 101, 34, 58, 49, 57, 53, 48, 51, 46, 56, 51, 44, 34, 116, 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 49, 46, 54, 54, 51, 53, 51, 54, 48, 54, 50, 55, 50, 50, 54, 50, 52, 69, 57, 125]
ASCII text: {"price":19503.83,"timestamp":1.663536062722624E9}
SUGGESTION: try something like this:
String s = new String(bytes, StandardCharsets.UTF_8);