Tags: apache-kafka, kafka-producer-api, spring-cloud-stream, spring-kafka

Serialization error when sending a message to a Kafka topic


I need to test a message that contains headers, so I have to build it with MessageBuilder, but I cannot get it serialized.

I tried adding the serializer settings to the producer props, but it did not work.

Can someone help me?

This is the error:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

My test class:

public class TransactionMastercardAdapterTest extends AbstractTest{

@Autowired
private KafkaTemplate<String, Message<String>> template;

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

@BeforeClass
public static void setUp() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

@Test
public void sendTransactionCommandTest(){

    String payload = "{\"o2oTransactionId\" : \"" + UUID.randomUUID().toString().toUpperCase() + "\","
            + "\"cardId\" : \"11\","
            + "\"transactionId\" : \"20110405123456\","
            + "\"amount\" : 200.59,"
            + "\"partnerId\" : \"11\"}";

    Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, Message<String>> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, Message<String>> ("notification_topic", MessageBuilder.withPayload(payload)
            .setHeader("status", "RECEIVED")
            .setHeader("service", "MASTERCARD")
            .build()));

    Map<String, Object> configs = KafkaTestUtils.consumerProps("test1", "false", embeddedKafka);

    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);

    Consumer<byte[], byte[]> consumer = cf.createConsumer();
    consumer.subscribe(Collections.singleton("transaction_topic"));
    ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
    consumer.commitSync();

    assertThat(records.count()).isEqualTo(1);
}

}


Solution

  • I'd say the error is obvious:

    Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
    

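    The value you send is a GenericMessage (that is what MessageBuilder.build() returns), but StringSerializer can only serialize String values. Declaring the producer as Producer<String, Message<String>> does not change which serializer is used; that is decided by the value.serializer property alone.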
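    To see why the code compiles and still fails at runtime, here is a minimal sketch without any Kafka dependency. The Serializer interface, StringOnlySerializer, and send below are hypothetical stand-ins, not the Kafka API; they only mimic a producer that holds a String-only serializer behind a generic type. Because generics are erased, the mismatch surfaces as a ClassCastException at the serialize call, which is the exception the Kafka producer reports as the SerializationException shown above.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins, NOT the Kafka API: they only mimic how a
// String-only serializer behaves when it is handed another type.
final class ErasureDemo {

    interface Serializer<T> {
        byte[] serialize(T data);
    }

    static final class StringOnlySerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a producer that treats the configured serializer as a
    // serializer for whatever value type V the caller declared.
    @SuppressWarnings("unchecked")
    static <V> byte[] send(Serializer<?> configured, V value) {
        Serializer<V> serializer = (Serializer<V>) configured; // unchecked, erased at runtime
        return serializer.serialize(value); // the cast to String happens inside this call
    }

    // Returns true when sending the value through the String-only
    // serializer fails with a ClassCastException.
    static boolean failsWithClassCast(Object value) {
        try {
            send(new StringOnlySerializer(), value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithClassCast("plain text"));  // a String value
        System.out.println(failsWithClassCast(new Object()));  // any non-String value
    }
}
```

    The same thing happens in the test from the question: the generic parameters on KafkaProducer and ProducerRecord satisfy the compiler, but the configured StringSerializer still receives a GenericMessage.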

    What you need is a serializer based on standard Java serialization. Kafka does not ship one (call it JavaSerializer), but it is not difficult to write:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.util.Map;

    import org.apache.kafka.common.serialization.Serializer;

    public class JavaSerializer implements Serializer<Object> {

        @Override
        public byte[] serialize(String topic, Object data) {
            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
            // try-with-resources flushes and closes the object stream,
            // even when writeObject() throws
            try (ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
                objectStream.writeObject(data);
            }
            catch (IOException e) {
                throw new IllegalStateException("Can't serialize object: " + data, e);
            }
            return byteStream.toByteArray();
        }

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // nothing to configure
        }

        @Override
        public void close() {
            // nothing to release
        }

    }
    

    Then set this class as the value.serializer property in the producer props, in place of StringSerializer.
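    As a rough check of what such a serializer puts on the wire, the sketch below round-trips an object through the same ObjectOutputStream logic and reads it back with ObjectInputStream, which is what a matching deserializer on the consuming side would have to do. It uses only the JDK; the class and method names are made up for illustration, and a HashMap stands in for the message with its headers. Everything written this way has to implement java.io.Serializable, otherwise writeObject fails.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Illustration only: plain-JDK round trip, no Kafka classes involved.
final class JavaSerializationRoundTrip {

    // Same logic as the serialize() method of a Java-serialization serializer.
    static byte[] toBytes(Object data) {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
            objectStream.writeObject(data);
        } catch (IOException e) {
            throw new IllegalStateException("Can't serialize object: " + data, e);
        }
        return byteStream.toByteArray();
    }

    // The inverse step a consumer-side deserializer would perform.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream objectStream =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return objectStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Can't deserialize bytes", e);
        }
    }

    public static void main(String[] args) {
        // A HashMap stands in for a message carrying headers.
        Map<String, String> message = new HashMap<>();
        message.put("status", "RECEIVED");
        message.put("service", "MASTERCARD");

        Object copy = fromBytes(toBytes(message));
        System.out.println(message.equals(copy));
    }
}
```

    Note that the test in the question consumes byte[] values, so no deserializer is needed there; the read-back step only matters for a consumer that wants the original object again.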