mongodb apache-kafka spring-cloud-stream bson change-data-capture

Deserialize BSON ChangeStreamDocument in a KStream


I just configured a MongoDB change stream to push a message containing the full document to a Kafka topic each time a document is modified. I would now like to transform these messages and push them to a different topic.

To do that, I use a Kafka Streams application (built with Spring Cloud Stream), but I am having difficulty deserializing the BSON ChangeStreamDocument and its fullDocument into my POJO. Do I have to use a third-party library like bson4jackson, or is there a way to do this out of the box with the MongoDB driver or Spring Kafka?


Solution

  • I managed to do it using the MongoDB driver's codecs and a BsonDocumentReader. I don't know if it is the best way:

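        import com.mongodb.MongoClientSettings;
        import org.bson.BsonDocument;
        import org.bson.BsonDocumentReader;
        import org.bson.BsonReader;
        import org.bson.codecs.DecoderContext;
        import org.bson.codecs.configuration.CodecRegistries;
        import org.bson.codecs.configuration.CodecRegistry;
        import org.bson.codecs.pojo.PojoCodecProvider;

        public <T> T deserialize(String bsonString, Class<T> modelClass) {
           CodecRegistry registry = CodecRegistries.fromRegistries( // driver defaults + automatic POJO codecs
                        MongoClientSettings.getDefaultCodecRegistry(),
                        CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build())
           );
           BsonDocument document = BsonDocument.parse(bsonString); // parse the JSON text of the message
           BsonReader reader = new BsonDocumentReader(document);
           return registry.get(modelClass).decode(reader, DecoderContext.builder().build());
        }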
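
    If the message on the topic is the whole change event rather than the bare document, the same codec approach can be pointed at the nested `fullDocument` field. The sketch below is only an illustration under assumptions: it assumes the message value is the JSON form of the change event with a `fullDocument` sub-document, and the class `FullDocumentDecoder`, the method `decodeFullDocument`, and the `Person` POJO are hypothetical names that are not part of the question.

```java
import com.mongodb.MongoClientSettings;
import org.bson.BsonDocument;
import org.bson.BsonDocumentReader;
import org.bson.codecs.DecoderContext;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;

public class FullDocumentDecoder {

    // Hypothetical model class standing in for the asker's POJO.
    public static class Person {
        private String name;
        private int age;

        public Person() { }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    // Built once and reused: driver default codecs plus automatic POJO codecs.
    private static final CodecRegistry REGISTRY = CodecRegistries.fromRegistries(
            MongoClientSettings.getDefaultCodecRegistry(),
            CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build()));

    // Assumption: changeEventJson is the JSON text of a change event.
    // Returns null when the event has no fullDocument sub-document
    // (for example, a delete event).
    public static <T> T decodeFullDocument(String changeEventJson, Class<T> modelClass) {
        BsonDocument event = BsonDocument.parse(changeEventJson);
        if (!event.isDocument("fullDocument")) {
            return null;
        }
        BsonDocument fullDocument = event.getDocument("fullDocument");
        return REGISTRY.get(modelClass)
                .decode(new BsonDocumentReader(fullDocument), DecoderContext.builder().build());
    }
}
```

    In a Kafka Streams topology, a helper like this could be called from a `mapValues` step on the input `KStream`, with null results filtered out before the records are written to the output topic. Building the registry once, rather than on every call, avoids recreating the POJO codecs for each message.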