javaspringmongodbspring-bootchangestream

How do I get MongoDB ChangeStreamDocument out as my dto instead of Document type?


I have the following class in my MongoDB project:

@Document(collection = "teams")
public class Team {
    private @MongoId(FieldType.OBJECT_ID)
    @Schema(type = "string", example = "60b0c56e4192f01e8745bd75")
    ObjectId id;
    @Schema(example = "56373")
    private Integer orgId;
    private String name;
    private List<Member> players;
    private List<Member> staff;

    public class Member{
        private ObjectId id;
        private String name;
    }
}

As you can see, this class represents the documents in my teams collection in MongoDB. I'm trying to create a change stream, because I want to monitorize players and staff players joining and leaving teams, as well as existing teams being deleted from the teams collection.

So this is what I've tried:

@Component
public class MongoChangeStream {

    private final MongoTemplate mongoTemplate;

    public MongoDBChangeStream(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    @EventListener(ContextRefreshedEvent.class)
    public void changeStream() {

        // Select the collection to query
        MongoCollection<Document> collection = mongoTemplate.getCollection("teams");

        // Create pipeline for operationType filter
        List<Bson> pipeline = Arrays.asList(
                Aggregates.match(
                        Filters.in("operationType",
                                Arrays.asList("insert", "update", "delete"))));

        // Create the Change Stream and watch on the filters in the pipeline
        ChangeStreamIterable<Document> changeStream = collection.watch()
            .fullDocument(FullDocument.UPDATE_LOOKUP)
            .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);

        // Iterate over the Change Stream
        for (ChangeStreamDocument<Document> changeEvent : changeStream) {
            switch (changeEvent.getOperationType().name()) {
                case "UPDATE":
                    if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("players")) {
                        // Do something
                    }
                    if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("staff")) {
                        // Do something
                    }
                    break;
                case "DELETE":
                        // Do something
                    break;
            }
        }
    }
}

My question is basically how do I get the documents out from the changeStream as Team objects instead of Document objects?

I've tried to change all the occurrences of Documents with Team, but then I get this error:

Can't find a codec for CodecCacheKey{clazz=class com.test.dto.Team, types=null}.


Solution

  • I ended up making it work like this:

    @Document(collection = "teams")
    public class Team {
        @Id
        @BsonProperty("_id")
        private ObjectId id;
        private Integer orgId;
        private String name;
        private List<Member> players;
        private List<Member> staff;
    
        public class Member {
            @Id
            private ObjectId id;
            private String name;
        }
    }
    

    Then I added CodecRegistry to my MongoChangeStream class like this:

    So this is what I've tried:

    @Component
    public class MongoChangeStream {
    
        private final MongoTemplate mongoTemplate;
    
        public MongoDBChangeStream(MongoTemplate mongoTemplate) {
            this.mongoTemplate = mongoTemplate;
        }
    
        @EventListener(ContextRefreshedEvent.class)
        public void changeStream() {
    
            // Select the collection to query
            CodecRegistry pojoCodecRegistry = org.bson.codecs.configuration.CodecRegistries.fromRegistries(MongoClientSettings.getDefaultCodecRegistry(), org.bson.codecs.configuration.CodecRegistries.fromProviders(PojoCodecProvider.builder().conventions(List.of(ANNOTATION_CONVENTION)).automatic(true).build()));
            MongoCollection<Team> collection = mongoTemplate.getCollection("teams").withCodecRegistry(pojoCodecRegistry).withDocumentClass(Group.class);
    
            // Create pipeline for operationType filter
            List<Bson> pipeline = Arrays.asList(
                    Aggregates.match(
                            Filters.in("operationType",
                                    Arrays.asList("insert", "update", "delete"))));
    
            // Create the Change Stream and watch on the filters in the pipeline
            ChangeStreamIterable<Team> changeStream = collection.watch()
                .fullDocument(FullDocument.UPDATE_LOOKUP)
                .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED);
    
            // Iterate over the Change Stream
            for (ChangeStreamDocument<Team> changeEvent : changeStream) {
                switch (changeEvent.getOperationType().name()) {
                    case "UPDATE":
                        if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("players")) {
                            // Do something
                        }
                        if (changeEvent.getUpdateDescription().getUpdatedFields().containsKey("staff")) {
                            // Do something
                        }
                        break;
                    case "DELETE":
                            // Do something
                        break;
                }
            }
        }
    }