We are using java 8 parallel stream to process a task, and we are submitting the task through ForkJoinPool#submit. We are not using jvm wide ForkJoinPool.commonPool, instead we are creating our own custom pool to specify the parallelism and storing it as static variable.
We have validation framework, where we subject a list of tables to a List of Validators, and we submit this job through the custom ForkJoinPool as follows:
static ForkJoinPool forkJoinPool = new ForkJoinPool(4);
List<Table> tables = tableDAO.findAll();
ModelValidator<Table, ValidationResult> validator = ValidatorFactory
.getInstance().getTableValidator();
List<ValidationResult> result = forkJoinPool.submit(
() -> tables.stream()
.parallel()
.map(validator)
.filter(result -> result.getValidationMessages().size() > 0)
.collect(Collectors.toList())).get();
The problem we are having is, in the downstream components, the individual validators which run on separate threads from our static ForkJoinPool rely on tenant_id, which is different for every request and is stored in an InheritableThreadLocal variable. Since we are creating a static ForkJoinPool, the threads pooled by the ForkJoinPool will only inherit the value of the parent thread, when it is created first time. But these pooled threads will not know the new tenant_id for the current request. So for subsequent execution these pooled threads are using old tenant_id.
I tried creating a custom ForkJoinPool and specifying ForkJoinWorkerThreadFactory in the constructor and overriding the onStart method to feed the new tenant_id. But that doesnt work, since the onStart method is called only once at creation time and not during individual execution time.
Seems like we need something like the ThreadPoolExecutor#beforeExecute which is not available in case of ForkJoinPool. So what alternative do we have if we want to pass the current thread local value to the statically pooled threads?
One workaround would be to create the ForkJoinPool for each request, rather than make it static but we wouldn't want to do it, to avoid the expensive nature of thread creation.
What alternatives do we have?
I found the following solution that works without changing any underlying code. Basically, the map method takes a functional interface which I am representing as a lambda expression. This expression adds a preExecution hook to set the new tenantId in the current ThreadLocal
and cleaning it up in postExecution.
forkJoinPool.submit(tables.stream()
.parallel()
.map((item) -> {
preExecution(tenantId);
try {
return validator.apply(item);
} finally {
postExecution();
}
}
)
.filter(validationResult ->
validationResult.getValidationMessages()
.size() > 0)
.collect(Collectors.toList())).get();