How to handle asynchronous thread processing within the same transaction in Spring Boot?


I have a Spring Boot project with the following structure:

When the API is called, the controller method processUser invokes an asynchronous method, and the API does not wait for its completion. From that method, I am calling UserService to process the user.

Current Flow:

Controller -> Creates a thread for the user (using @Async method) -> UserService (with @Transactional)

Now, I want to change the logic so that if a user has multiple sub-users, UserService should process all sub-users asynchronously in parallel to save time. However, if an error occurs during the processing of any sub-user, I want the entire transaction to roll back. The user thread should wait for all the sub-user threads to complete before finalizing the transaction.

Desired Flow:

Controller -> Creates a thread for the user (using @Async method) -> UserService (with @Transactional) -> Multiple threads for parallel processing (using @Async method) -> SubUserService (with @Transactional)

Currently I am facing the following exception:

Caused by: java.lang.IllegalStateException: No thread-bound request found: Are you referring to request attributes outside of an actual web request, or processing a request outside of the originally receiving thread? If you are actually operating within a web request and still receive this message, your code is probably running outside of DispatcherServlet: In this case, use RequestContextListener or RequestContextFilter to expose the current request.

NOTE:

  1. I tried the solution below, but it didn't work:
@Bean
    @Scope(value = "request", proxyMode = ScopedProxyMode.TARGET_CLASS)
    public FCSessionDataBeanWebapp sessionDataBean() {  
        AuthenticationUser authenticationUser = null;
        try{
        SecurityContext securityContext = SecurityContextHolder.getContext();
        OAuth2Authentication authentication = (OAuth2Authentication) securityContext.getAuthentication();
        authenticationUser = (AuthenticationUser) authentication.getPrincipal();
        }
        catch(Exception ex){
            log.error(ex.getMessage(), ex);
        }
        if(authenticationUser != null)
            return authenticationUser.getSessionDataBeanWebapp();
        else
            return new FCSessionDataBeanWebapp();
    }
  2. I am using @Transactional on the service layer, which handles database operations, but not on the thread method or the thread service itself. I tried making the thread method transactional as well, but that resulted in an "EntityManager closed" error. Because a new transaction is created for each sub-user thread, a failure in one sub-user does not roll back the others or the parent user, so this does not fit my use case.
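To illustrate why each sub-user thread ends up with its own transaction: as far as I understand, Spring binds transactional resources to the current thread, so code running on another thread cannot see them. Below is a minimal plain-Java sketch of that effect. `TX_CONTEXT` and `ThreadBoundContextDemo` are hypothetical names standing in for the thread-bound state; they are not Spring APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java illustration only; no Spring involved.
class ThreadBoundContextDemo {

    // Hypothetical stand-in for state that a framework binds to the current thread.
    static final ThreadLocal<String> TX_CONTEXT = new ThreadLocal<>();

    // Binds a value on the calling thread, then reports what a pool thread sees.
    static String contextSeenByWorker() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        TX_CONTEXT.set("parent-tx");
        try {
            // The supplier runs on the pool thread, which has its own (empty) slot.
            return CompletableFuture
                    .supplyAsync(() -> String.valueOf(TX_CONTEXT.get()), pool)
                    .join();
        } finally {
            TX_CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker sees: " + contextSeenByWorker()); // prints "worker sees: null"
    }
}
```

The worker thread sees nothing of what the parent thread bound, which is consistent with each @Async sub-user call starting a transaction of its own.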

Questions:

  1. How can I resolve the "EntityManager closed" exception mentioned above?
  2. Is there any way for sub-user threads to use the parent-user thread's transaction?
  3. Is there another way to process sub-users in parallel while still using the parent-user's transaction, such as with @TransactionalEventListener or other approaches?

Here is my current implementation:

Controller :

@Autowired
UserAsyncService userAsyncService;

@PostMapping("/process-user")
public ResponseEntity<Object> processUser(@RequestBody Integer userId) {
        userAsyncService.processUserAsync(userId);
        return ResponseEntity.ok().build();
}

UserAsyncService :

@Service
public class UserAsyncService {

    @Autowired
    UserService userService;

    @Async("threadPoolTaskExecutor")
    public void processUserAsync(Integer userId) throws Exception {
        userService.processUser(userId);
    }
}

UserService :

@Service
@Transactional
public class UserService {
    
    @Autowired
    SubUserAsyncService subUserAsyncService;

    @Autowired
    UserDao userDao; // DAO type name assumed; it is not shown in the original snippet

    public void processUser(Integer userId) throws Exception {
        // process user
        // done db process for user
        List<Integer> subUsers = userDao.retriveSubUserIds(userId);

        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Integer subUser : subUsers) {
            // Question: How can the following thread be implemented to use the current thread's transaction, so that all sub-user and parent-user data in the database is rolled back in case of an error?

            CompletableFuture<Void> future = subUserAsyncService.processSubUserAsync(subUser);
            futures.add(future);
        }

        // Wait for all sub-user threads to complete
        for (CompletableFuture<Void> future : futures) {
            future.join();
        }
    }
}

SubUserAsyncService :

@Service
public class SubUserAsyncService {

    @Autowired
    SubUserService subUserService;

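    @Async("threadPoolTaskExecutor2")
    public CompletableFuture<Void> processSubUserAsync(Integer userId) throws Exception {
        subUserService.processUser(userId);
        // An @Async method declared to return a future must return one
        return CompletableFuture.completedFuture(null);
    }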
}

SubUserService :

@Transactional
@Service
public class SubUserService {

    @Autowired
    SubUserDao subUserDao;

    public void processUser(Integer userId) throws Exception {
        // process subuser
        subUserDao.processUser(userId);
    }
}

Solution

  • I have separated the subUser process into two parts:

    1. Fetching and processing the subUser data.
    2. Saving the subUser data.

    For the first part, I use multiple threads with a new transaction. This transaction is only used for fetching the data, so a rollback is never required.

    The second part runs inside the original user processing method. Because every write happens there, any error rolls back the transaction for both the subUser and the parent user entirely.
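A minimal plain-Java sketch of this two-part split, with no Spring involved. The names `TwoPhaseSubUserSketch`, `prepare`, and `store` are hypothetical: `prepare` stands in for the read-only fetch and processing, and `store` stands in for the database writes that, in the real service, would happen inside the single @Transactional user method.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class TwoPhaseSubUserSketch {

    // Part 1 (hypothetical): read-only fetch and processing for one sub-user.
    static String prepare(int subUserId) {
        if (subUserId < 0) {
            throw new IllegalArgumentException("invalid sub-user id " + subUserId);
        }
        return "prepared-" + subUserId;
    }

    // Runs part 1 in parallel, then part 2 on the calling thread.
    static List<String> processUser(List<Integer> subUserIds, List<String> store) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> futures = subUserIds.stream()
                    .map(id -> CompletableFuture.supplyAsync(() -> prepare(id), pool))
                    .collect(Collectors.toList());

            // join() rethrows a worker failure as CompletionException,
            // so a failed sub-user aborts before anything is written.
            List<String> prepared = futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());

            // Part 2: all writes happen here, on the caller's thread.
            store.addAll(prepared);
            return prepared;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        processUser(Arrays.asList(1, 2, 3), store);
        System.out.println(store); // prints [prepared-1, prepared-2, prepared-3]
    }
}
```

Only the parallel part leaves the calling thread, so the writes never depend on sharing a transaction across threads.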