I have a Spring application with an endpoint that accepts multiple files in one upload. To make it faster, I created a thread pool and tried to process each file asynchronously, in its own thread. I found a YouTube tutorial that does almost exactly what I want and tried its code, but it didn't work for me either. This is the tutorial:
https://www.youtube.com/watch?v=3rJBLFA95Io
I believe the author doesn't hit the error because in his case the number of files always matches the number of threads. What I found is that if the number of tasks submitted to the pool is less than or equal to the number of available threads, everything works fine, but if the number of tasks is greater than the number of available threads, I get this error:
java.io.FileNotFoundException: /private/var/folders/41/81526n295q1cptcb1tbrs544h504m8/T/tomcat.9191.5294201821824312569/work/Tomcat/localhost/ROOT/upload_ff67c7fe_2f2f_44c6_8eb9_3250c8a8739b_00000003.tmp (No such file or directory)
at java.base/java.io.FileInputStream.open0(Native Method) ~[na:na]
at java.base/java.io.FileInputStream.open(FileInputStream.java:216) ~[na:na]
at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157) ~[na:na]
at org.apache.tomcat.util.http.fileupload.disk.DiskFileItem.getInputStream(DiskFileItem.java:198) ~[tomcat-embed-core-9.0.63.jar:9.0.63]
at org.apache.catalina.core.ApplicationPart.getInputStream(ApplicationPart.java:100) ~[tomcat-embed-core-9.0.63.jar:9.0.63]
at org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile.getInputStream(StandardMultipartHttpServletRequest.java:251) ~[spring-web-5.3.20.jar:5.3.20]
at com.mhndev.springexecutor.service.UserService.parseCSVFile(UserService.java:47) ~[classes/:na]
at com.mhndev.springexecutor.service.UserService.saveUsers(UserService.java:29) ~[classes/:na]
at com.mhndev.springexecutor.service.UserService$$FastClassBySpringCGLIB$$c14fedc2.invoke(<generated>) ~[classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$doSubmit$3(AsyncExecutionAspectSupport.java:278) ~[spring-aop-5.3.20.jar:5.3.20]
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
So I suspect that when a task has to wait in the pool's queue, the multipart temp file gets deleted before a thread is assigned to the task.
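The suspected sequence can be reproduced without Spring or Tomcat. The sketch below is only a simulation under stated assumptions: the class name `QueuedUploadDemo`, the one-thread pool, and the two latches are all made up for illustration, and deleting the temp files by hand stands in for whatever cleanup the container does once the request returns.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class QueuedUploadDemo {

    /** Returns, per uploaded file, whether its task managed to read the file. */
    static List<Boolean> simulateRequest() throws Exception {
        // One worker thread for two files, so the second task must wait in the queue.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch firstRead = new CountDownLatch(1);
        CountDownLatch requestDone = new CountDownLatch(1);
        List<Path> uploads = new ArrayList<>();
        List<Future<Boolean>> results = new ArrayList<>();
        try {
            for (int i = 0; i < 2; i++) {
                Path tmp = Files.createTempFile("upload_", ".tmp");
                Files.writeString(tmp, "name,email,gender\n");
                uploads.add(tmp);
                results.add(pool.submit(() -> {
                    try {
                        Files.readAllLines(tmp);   // stands in for file.getInputStream()
                    } catch (IOException e) {
                        return false;              // the temp file is already gone
                    }
                    firstRead.countDown();
                    requestDone.await();           // keep the only worker busy
                    return true;
                }));
            }
            firstRead.await();                     // the first task has read its file
            for (Path p : uploads) {
                Files.delete(p);                   // simulated end-of-request cleanup
            }
            requestDone.countDown();
            List<Boolean> outcome = new ArrayList<>();
            for (Future<Boolean> f : results) {
                outcome.add(f.get());
            }
            return outcome;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(simulateRequest()); // prints [true, false]
    }
}
```

The task that got a thread right away reads its file; the one that was still queued when the files were removed fails, which matches the pattern of errors appearing only when there are more tasks than threads.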
My controller:
package com.mhndev.springexecutor.controller;
import com.mhndev.springexecutor.entity.User;
import com.mhndev.springexecutor.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@RestController
public class UserController {

    @Autowired
    private UserService userService;

    @PostMapping(value = "/users", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
    public ResponseEntity saveUsers(@RequestParam(value = "files") MultipartFile[] files) throws Exception {
        for (MultipartFile file : files) {
            userService.saveUsers(file);
        }
        return ResponseEntity.status(HttpStatus.CREATED).build();
    }

    @GetMapping(value = "/users", produces = "application/json")
    public CompletableFuture<ResponseEntity> findAllUsers() {
        return userService.findAllUsers().thenApply(ResponseEntity::ok);
    }

    @GetMapping(value = "/getUsersByThread", produces = "application/json")
    public ResponseEntity getUsers() {
        CompletableFuture<List<User>> users1 = userService.findAllUsers();
        CompletableFuture<List<User>> users2 = userService.findAllUsers();
        CompletableFuture<List<User>> users3 = userService.findAllUsers();
        CompletableFuture.allOf(users1, users2, users3).join();
        return ResponseEntity.status(HttpStatus.OK).build();
    }
}
My service:
package com.mhndev.springexecutor.service;
import com.mhndev.springexecutor.entity.User;
import com.mhndev.springexecutor.repository.UserRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Service
public class UserService {

    @Autowired
    private UserRepository repository;

    Logger logger = LoggerFactory.getLogger(UserService.class);

    @Async("taskExecutor")
    public CompletableFuture<List<User>> saveUsers(MultipartFile file) throws Exception {
        long start = System.currentTimeMillis();
        // Runs on a pool thread, possibly after the request has already returned.
        List<User> users = parseCSVFile(file);
        logger.info("saving list of users of size {}, thread name : {}", users.size(), Thread.currentThread().getName());
        users = repository.saveAll(users);
        long end = System.currentTimeMillis();
        logger.info("Total time {}", (end - start));
        return CompletableFuture.completedFuture(users);
    }

    @Async("taskExecutor")
    public CompletableFuture<List<User>> findAllUsers() {
        logger.info("get list of user by {}", Thread.currentThread().getName());
        List<User> users = repository.findAll();
        return CompletableFuture.completedFuture(users);
    }

    private List<User> parseCSVFile(final MultipartFile file) throws Exception {
        final List<User> users = new ArrayList<>();
        // file.getInputStream() is where the FileNotFoundException is thrown.
        try (final BufferedReader br = new BufferedReader(new InputStreamReader(file.getInputStream()))) {
            String line;
            while ((line = br.readLine()) != null) {
                final String[] data = line.split(",");
                final User user = new User();
                user.setName(data[0]);
                user.setEmail(data[1]);
                user.setGender(data[2]);
                users.add(user);
            }
            return users;
        } catch (final IOException e) {
            logger.error("Failed to parse CSV file", e);
            throw new Exception("Failed to parse CSV file", e);
        }
    }
}
And the configuration for the thread pool:
package com.mhndev.springexecutor.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(3);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("User-Thread-");
        executor.initialize();
        return executor;
    }
}
I fixed the problem by waiting on the CompletableFutures that the service returns. I changed my controller as follows:
@PostMapping(value = "/users", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
public ResponseEntity saveUsers(@RequestParam(value = "files") MultipartFile[] files) throws Exception {
    var res = new ArrayList<CompletableFuture<List<User>>>();
    for (MultipartFile file : files) {
        res.add(userService.saveUsers(file));
    }
    for (CompletableFuture<List<User>> item : res) {
        System.out.println(item.get().size());
    }
    return ResponseEntity.status(HttpStatus.CREATED).build();
}
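The wait-for-everything step in the controller above can also be written with `CompletableFuture.allOf`, the same call `getUsers` already uses. Below is a minimal standalone sketch of that pattern; `WaitForAllDemo`, `saveAsync`, and the line-counting "work" are hypothetical stand-ins for `userService.saveUsers(file)`, not part of my project.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class WaitForAllDemo {

    // Hypothetical stand-in for userService.saveUsers(file):
    // "saves" a CSV payload and reports how many rows it contained.
    static CompletableFuture<Integer> saveAsync(String csv, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> csv.split("\n").length, pool);
    }

    static List<Integer> saveAll(List<String> files) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            // Submit every file first so the tasks can run in parallel.
            List<CompletableFuture<Integer>> futures = files.stream()
                    .map(f -> saveAsync(f, pool))
                    .collect(Collectors.toList());
            // Block until all tasks are done, then collect their results in order.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(saveAll(List.of("a,b,c\nd,e,f", "x,y,z"))); // prints [2, 1]
    }
}
```

Either way, the calling thread does not return until every task has completed, so the work is still parallel across files but the request itself is no longer fire-and-forget.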
It seems the temp files are deleted once the request is over. I figured this out by adding a sleep to the controller body, which made the error go away. That confirmed the cause: the request finishes while some tasks are still waiting in the thread pool's queue, the controller doesn't wait for them, and the uploaded temp files are gone by the time those tasks run. Waiting on the CompletableFutures in the controller keeps the request open until every task has finished.
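If the request should not have to wait for the tasks, another option is to copy each upload into memory on the request thread and pass the bytes to the async method instead of the `MultipartFile`. In Spring that would presumably mean calling `file.getBytes()` in the controller and changing `saveUsers` to accept a `byte[]`; that signature change is an assumption, not something I have tested in this project. The sketch below simulates the idea in plain Java with made-up names (`EagerCopyDemo`, `countLines`) and deletes the temp file by hand to mimic the cleanup.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EagerCopyDemo {

    // Hypothetical stand-in for parsing the CSV: counts the rows in the copied bytes.
    static int countLines(byte[] content) {
        return (int) new String(content, StandardCharsets.UTF_8).lines().count();
    }

    static List<Integer> simulateRequest() throws Exception {
        // One worker for three files: most tasks will sit in the queue for a while.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            List<CompletableFuture<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                Path tmp = Files.createTempFile("upload_", ".tmp");
                Files.writeString(tmp, "a,b,c\nd,e,f\n");
                byte[] content = Files.readAllBytes(tmp); // copy while the file still exists
                Files.delete(tmp);                        // simulated end-of-request cleanup
                // The task only touches the in-memory copy, never the temp file.
                futures.add(CompletableFuture.supplyAsync(() -> countLines(content), pool));
            }
            List<Integer> rows = new ArrayList<>();
            for (CompletableFuture<Integer> f : futures) {
                rows.add(f.join());
            }
            return rows;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(simulateRequest()); // prints [2, 2, 2]
    }
}
```

The trade-off is memory: every uploaded file is held in the heap until its task has run, so this only makes sense for reasonably small uploads.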