spring spring-boot spring-kafka spring-data-mongodb

MongoDB transaction in Kafka Listener has already been closed if exception occurs


I get the following error when using a MongoDB transaction inside a @KafkaListener method.

The repository performs an unordered bulk operation (it may fail, but I'm interested in knowing which documents were inserted). The log message clearly states that the transaction has already been closed, but the @KafkaListener method keeps reprocessing the batch (a ListenerExecutionFailedException is thrown).

I suspect that either the transaction was closed when the MongoDB driver caught the data integrity violation, or something is conflicting between Kafka transactions (which I have no real interest in using right now) and MongoDB ones.

Is anyone else experiencing this? Am I missing something in the configuration of the transactions or the Kafka listener?

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.example.MyListener.listen(java.util.List<java.lang.String>)' threw exception
...
Caused by: org.springframework.transaction.TransactionSystemException: Could not commit Mongo transaction for session [ClientSessionImpl@660ef6de id = {"id": {"$binary": {"base64": "lRRKccYZTAasvsU3oVUCCg==", "subType": "04"}}}, causallyConsistent = true, txActive = false, txNumber = 11, closed = false, clusterTime = {"clusterTime": {"$timestamp": {"t": 1743775965, "i": 11}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}].
...
Caused by: com.mongodb.MongoCommandException: Command failed with error 251 (NoSuchTransaction): 'Transaction with { txnNumber: 11 } has been aborted.' on server localhost:27017. The full response is {"errorLabels": ["TransientTransactionError"], "ok": 0.0, "errmsg": "Transaction with { txnNumber: 11 } has been aborted.", "code": 251, "codeName": "NoSuchTransaction", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1743775965, "i": 11}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}, "operationTime": {"$timestamp": {"t": 1743775965, "i": 11}}}
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:210) ~[mongodb-driver-core-5.2.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:520) ~[mongodb-driver-core-5.2.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceiveInternal(InternalStreamConnection.java:448) ~[mongodb-driver-core-5.2.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.lambda$sendAndReceive$0(InternalStreamConnection.java:375) ~[mongodb-driver-core-5.2.1.jar:na]
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:378) ~[mongodb-driver-core-5.2.1.jar:na]
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:111) ~[mongodb-driver-core-5.2.1.jar:na]
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:748) ~[mongodb-driver-core-5.2.1.jar:na]
    at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:68) ~[mongodb-driver-core-5.2.1.jar:na]
  ...

@Component
@AllArgsConstructor
class MyRepository {

  private final MongoOperations mongoOperations;

  public void transactionalBulkSave(List<MyDocument> documents) {
    BulkOperations bulkOperations = mongoOperations.bulkOps(BulkMode.UNORDERED, MyDocument.class);
    bulkOperations.insert(documents);
    bulkOperations.execute();
  }
}

@Service
@AllArgsConstructor
@Slf4j
class MyService {

  private final MyRepository myRepository;

  @Transactional(transactionManager = "mongoTransactionManager")
  public void transactionalBulkSave(List<MyDocument> documents) {
    try {
      myRepository.transactionalBulkSave(documents);
    } catch (BulkOperationException e) {
      log.error(e.getMessage());
      Set<String> collect = e
          .getResult()
          .getInserts()
          .stream()
          .map(el -> el
              .getId()
              .asObjectId()
              .getValue()
              .toString())
          .collect(Collectors.toSet());
      List<MyDocument> valid = documents
          .stream()
          .filter(doc -> collect.contains(doc
              .id()
              .toString()))
          .toList();
      log.info("saved {}", valid);
    }
  }
}

@Component
@AllArgsConstructor
@Slf4j
class MyListener {

  private final MyService myService;

  @KafkaListener(
      topics = "test",
      batch = "true"
  )
  public void listen(List<String> strings) {
    log.info("Received: {}", strings);
    List<MyDocument> list = strings
        .stream()
        .map(e -> new MyDocument(ObjectId.get(), e))
        .toList();
    myService.transactionalBulkSave(list);
  }
}

@Configuration(proxyBeanMethods = false)
class MyConfiguration {

  @Bean
  MongoTransactionManager mongoTransactionManager(MongoDatabaseFactory databaseFactory) {
    return new MongoTransactionManager(databaseFactory);
  }
}

@Test
void shouldInsertOthersButNotDuplicates() {
  kafkaTemplate.send("test", UUID
      .randomUUID()
      .toString(), "1");

  await()
      .atMost(1000, TimeUnit.SECONDS)
      .untilAsserted(() -> assertThat(mongoOperations.findAll(MyDocument.class)).hasSize(1));

  kafkaTemplate.send("test", UUID
      .randomUUID()
      .toString(), "1");
  kafkaTemplate.send("test", UUID
      .randomUUID()
      .toString(), "2");

  await()
      .atMost(10, TimeUnit.SECONDS)
      .untilAsserted(() -> assertThat(mongoOperations.findAll(MyDocument.class)).hasSize(2));
}

@Document
record MyDocument(
    @Id
    ObjectId id,
    @Indexed(unique = true)
    String value
) {
}

Solution

  • Thank you for the sample application.

    So, the code in your transactional service looks like this:

      @Transactional(transactionManager = "mongoTransactionManager")
      public void transactionalBulkSave(List<MyDocument> documents) {
        try {
          myRepository.transactionalBulkSave(documents);
        } catch (BulkOperationException e) {
          log.error(e.getMessage());
          Set<String> collect = e
              .getResult()
              .getInserts()
              .stream()
              .map(this::getInsertedId)
              .map(String::valueOf)
              .collect(Collectors.toSet());
          List<MyDocument> valid = documents
              .stream()
              .filter(doc -> collect.contains(doc
                  .id()
                  .toString()))
              .toList();
          log.info("saved {}", valid);
        } catch (Throwable any) {
          log.error(any.getMessage());
          log.error(any.getMessage());
        }
      }
    

    Even though you swallow the exception, the transaction state in MongoDB is already broken. When control returns to the transaction manager, its attempt to commit the transaction fails. That produces the error "Transaction with { txnNumber: 3 } has been aborted.", which bubbles up to the @KafkaListener, where it is subject to whatever error handling is configured there.

    So, you might need to catch that org.springframework.transaction.TransactionSystemException in the @KafkaListener and handle the transaction commit failure there.
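
The sequence described above can be reproduced with a toy model in plain Java. This is only a sketch and does not use Spring or the MongoDB driver: ToyTransaction, WriteFailedException and CommitFailedException are hypothetical stand-ins, and the model assumes that a failed write aborts the whole transaction. It shows why swallowing the write error inside the transactional method does not help, and where the commit failure can be handled instead.

```java
import java.util.ArrayList;
import java.util.List;

public class AbortedTransactionDemo {

    /** Stand-in for a write error such as a duplicate key (hypothetical type). */
    static class WriteFailedException extends RuntimeException {
        WriteFailedException(String message) { super(message); }
    }

    /** Stand-in for the commit failure seen by the listener (hypothetical type). */
    static class CommitFailedException extends RuntimeException {
        CommitFailedException(String message) { super(message); }
    }

    /** Toy transaction: buffers writes and is aborted by the first failed write. */
    static class ToyTransaction {
        private final List<String> store;
        private final List<String> pending = new ArrayList<>();
        private boolean aborted = false;

        ToyTransaction(List<String> store) { this.store = store; }

        void insert(String value) {
            if (store.contains(value) || pending.contains(value)) {
                aborted = true;   // assumption: the failed write aborts the whole transaction
                pending.clear();  // nothing buffered so far will ever be applied
                throw new WriteFailedException("duplicate value: " + value);
            }
            pending.add(value);
        }

        void commit() {
            if (aborted) {
                throw new CommitFailedException("transaction has been aborted");
            }
            store.addAll(pending);
        }
    }

    /** Plays the role of the @Transactional service method. */
    static void transactionalSave(List<String> values, List<String> store) {
        ToyTransaction tx = new ToyTransaction(store);
        try {
            for (String value : values) {
                tx.insert(value);
            }
        } catch (WriteFailedException e) {
            // Swallowing the error does not revive the aborted transaction.
        }
        tx.commit(); // what the transaction manager does when the method returns normally
    }

    /** Plays the role of the listener: handles the commit failure instead of rethrowing. */
    static boolean listen(List<String> batch, List<String> store) {
        try {
            transactionalSave(batch, store);
            return true;
        } catch (CommitFailedException e) {
            return false; // e.g. log, skip, or route the batch elsewhere
        }
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>(List.of("1"));
        System.out.println(listen(List.of("2", "1"), store)); // commit fails
        System.out.println(store);                            // store is unchanged
    }
}
```

Running main prints false and then [1]: the valid value "2" is lost together with the duplicate, because the whole transaction was aborted. In the real application the same pattern would mean wrapping the call to myService.transactionalBulkSave(list) in a try/catch for TransactionSystemException inside the listener, so that the batch is not redelivered endlessly.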