Tags: mongodb, spring-boot, event-listener, changestream

Spring + MongoDB: using non-reactive and reactive repositories at once


I have a Spring Boot + MongoDB application implemented with non-reactive repositories (extending MongoRepository<MyDocument, String>), and everything works fine. Now I need to implement an auditing system, and I want to build it on MongoDB Change Streams and Spring's reactive Change Streams support (reference). I created three classes to achieve this:

MongoMessageListener

@Component
@Slf4j
public class MongoMessageListener implements MessageListener<ChangeStreamDocument<Document>, MyDocument> {

    @Override
    public void onMessage(Message<ChangeStreamDocument<Document>, MyDocument> message) {

        OperationType operationType = message.getRaw().getOperationType();

        log.info("Operation type is : {}", operationType);

        log.info("Received Message in collection: {},message raw: {}, message body:{}",
                message.getProperties().getCollectionName(), message.getRaw(), message.getBody());
    }
}

MongoStreamListenerConfig

@Configuration
@Slf4j
public class MongoStreamListenerConfig extends AbstractReactiveMongoConfiguration {

    @Bean
    MessageListenerContainer changeStreamListenerContainer(
            MongoTemplate template,
            MongoMessageListener consentAuditListener,
            ErrorHandler errorHandler) {

        MessageListenerContainer messageListenerContainer =
                new MongoStreamListenerContainer(template, errorHandler);

        // The body type must match the listener's body type (MyDocument).
        ChangeStreamRequest<MyDocument> request =
                ChangeStreamRequest.builder(consentAuditListener)
                        .collection("my_document_collection")
                        .filter(newAggregation(match(where("operationType").is("update"))))
                        .fullDocumentLookup(FullDocument.UPDATE_LOOKUP)
                        .build();

        messageListenerContainer.register(request, MyDocument.class, errorHandler);
        log.info("> Mongo Stream Listener is registered");
        return messageListenerContainer;
    }

    @Override
    protected String getDatabaseName() {
        return "myDatabase";
    }

    @Bean
    ErrorHandler getLoggingErrorHandler() {
        return new ErrorHandler() {
            @Override
            public void handleError(Throwable throwable) {
                log.error("Error in creating audit records {}", throwable.getMessage());
            }
        };
    }
}

MongoStreamListenerContainer

public class MongoStreamListenerContainer extends DefaultMessageListenerContainer {

    public MongoStreamListenerContainer(MongoTemplate template, ErrorHandler errorHandler) {
        super(template, Executors.newFixedThreadPool(15), errorHandler);
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }
}

I also added a repository that extends ReactiveMongoRepository<MyDocument, String>.

When I run the application, it fails with several errors: either class-loading errors ([...] 'reactiveStreamsMongoClient' threw exception; nested exception is java.lang.NoClassDefFoundError: com/mongodb/internal/connection/InternalConnectionPoolSettings), or unsatisfied @Autowired dependencies where repositories are injected into services (Autowired(required=true)}).

In my Main.java class I tried setting both @EnableMongoRepositories(basePackages = "com.my.path.to.repository") and @EnableReactiveMongoRepositories("com.my.path.to.reactive.repository"), but nothing seems to work. I doubted whether non-reactive and reactive repos could be combined at all, but I found this SO question and a few others, so I guess they can. I tried to follow this Spring project step by step, but I always get ClassNotFound errors.

In my pom I have

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-data-mongodb</artifactId>
 <version>2.5.3</version>
</dependency>
<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
 <version>2.5.3</version>
</dependency>
<dependency>
 <groupId>org.mongodb</groupId>
 <artifactId>mongodb-driver-reactivestreams</artifactId>
 <version>4.6.1</version>
</dependency>
<dependency>
 <groupId>io.projectreactor</groupId>
 <artifactId>reactor-core</artifactId>
 <version>3.4.19</version>
</dependency> 

I really can't see what I'm missing: whether it's just some configuration, or whether I can't mix non-reactive and reactive repos (even though I found users saying you can). Any help will be VERY much appreciated, it's driving me a bit crazy! Thank you so much!


Solution

  • As you asked on one of my old questions, I will share what I did in my WebFlux project. My project uses an old library written with Spring Web MVC, which relies on non-reactive Mongo. I'm using the following dependencies (no explicit versions; they are managed by Spring Boot):

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-mongodb</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    

    My reactive Mongo config looks like this. Here I connect using the Mongo URI connection string; you can also use a username and password for this.

    @Configuration
    public class ReactiveMongoConfiguration extends AbstractReactiveMongoConfiguration {
    
      private final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    
    
      @Override
      @Bean
      @Primary
      public MongoClient reactiveMongoClient() {
        ConnectionString connectionString = new ConnectionString(MY_DB_URI_KEY);
        return MongoClients.create(MongoClientSettings.builder().applyConnectionString(connectionString)
            .streamFactoryFactory(NettyStreamFactoryFactory.builder().eventLoopGroup(eventLoopGroup).build()).build());
      }
    
      @Bean
      @Primary
      public ReactiveMongoTemplate reactiveMongoTemplate() {
        return new ReactiveMongoTemplate(reactiveMongoClient(), getDatabaseName());
      }
    
      @Override
      protected String getDatabaseName() {
        return new ConnectionString(MY_DB_URI_KEY).getDatabase();
      }
    
      @PreDestroy
      public void shutDownEventLoopGroup() {
        eventLoopGroup.shutdownGracefully();
      }
    }
    

    My old Web MVC library contains a non-reactive Mongo bean that looks like this:

    @Configuration
    public class MongoConfigurations {
    
        @Bean
        public MongoTemplate mongoTemplate() {
            ConnectionString connectionString = new ConnectionString(MY_DB_URI);
            // Synchronous driver client (com.mongodb.client.MongoClient), not the reactive one.
            MongoClient mongoClient = MongoClients.create(connectionString);
            return new MongoTemplate(mongoClient, connectionString.getDatabase());
        }
    }
    

    In my main application I excluded the following auto-configurations:

    exclude = { MongoAutoConfiguration.class, MongoDataAutoConfiguration.class }
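
    As a rough sketch (not necessarily how my own project is laid out), the exclusion could sit on the main class next to the two repository annotations from the question. The class name Main and both package names are taken from the question and are assumptions here. By default the two annotations look up beans named mongoTemplate and reactiveMongoTemplate, which matches the bean method names in the configs above. This needs Spring Boot and both Mongo starters on the classpath.

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration;
    import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration;
    import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
    import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;

    // Skip the blocking Mongo auto-configuration; the MongoTemplate bean is defined by hand.
    @SpringBootApplication(exclude = { MongoAutoConfiguration.class, MongoDataAutoConfiguration.class })
    // Assumed package holding the MongoRepository interfaces (from the question).
    @EnableMongoRepositories(basePackages = "com.my.path.to.repository")
    // Assumed package holding the ReactiveMongoRepository interfaces (from the question).
    @EnableReactiveMongoRepositories(basePackages = "com.my.path.to.reactive.repository")
    public class Main {

        public static void main(String[] args) {
            SpringApplication.run(Main.class, args);
        }
    }

    Keeping the blocking and reactive repository interfaces in separate packages lets each annotation scan only its own kind.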
    

    Now you can use both the reactive and non-reactive Mongo templates by injecting them. :)
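
    To illustrate the injection, here is a minimal sketch of a service that uses both templates. AuditService and its method names are hypothetical. MyDocument and the collection name "my_document_collection" come from the question, and MyDocument is assumed to live in the same package. The change stream part assumes MongoDB runs as a replica set, and nothing is emitted until something subscribes to the returned Flux.

    import java.util.List;

    import org.springframework.data.mongodb.core.ChangeStreamEvent;
    import org.springframework.data.mongodb.core.MongoTemplate;
    import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
    import org.springframework.stereotype.Service;

    import com.mongodb.client.model.changestream.OperationType;

    import reactor.core.publisher.Flux;

    // Hypothetical service; only shows that both templates can be injected side by side.
    @Service
    public class AuditService {

        private final MongoTemplate mongoTemplate;
        private final ReactiveMongoTemplate reactiveMongoTemplate;

        public AuditService(MongoTemplate mongoTemplate, ReactiveMongoTemplate reactiveMongoTemplate) {
            this.mongoTemplate = mongoTemplate;
            this.reactiveMongoTemplate = reactiveMongoTemplate;
        }

        // Blocking read through the non-reactive template.
        public List<MyDocument> findAllBlocking() {
            return mongoTemplate.findAll(MyDocument.class);
        }

        // Reactive change stream on the collection, keeping only update events.
        public Flux<ChangeStreamEvent<MyDocument>> updates() {
            return reactiveMongoTemplate.changeStream(MyDocument.class)
                    .watchCollection("my_document_collection")
                    .listen()
                    .filter(event -> event.getOperationType() == OperationType.UPDATE);
        }
    }

    The update filter is applied on the client side here to keep the sketch simple; a server-side filter on the change stream would reduce traffic.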