The code I'm working with
package com.skimmer;

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class App {
  public static void main(String[] args) throws InterruptedException, ExecutionException {
    // Simply creating some 'test' data
    Stream<String> test = LongStream.range(0, 1000000L).mapToObj(i -> i + "-test");
    Spliterator<String> spliterator = test.parallel().spliterator();
    List<Callable<Long>> callableList = new ArrayList<>();

    // Creating a future for each split to process concurrently:
    // each trySplit returns a prefix that becomes a job, while the
    // retained suffix is split again in the next iteration
    int totalSplits = 0;
    Spliterator<String> prefix;
    while ((prefix = spliterator.trySplit()) != null) {
      callableList.add(new Worker(prefix, "future-" + totalSplits));
      totalSplits++;
    }

    ExecutorService executor = Executors.newFixedThreadPool(totalSplits);
    List<Future<Long>> futures = executor.invokeAll(callableList);
    AtomicLong counter = new AtomicLong(0);
    for (Future<Long> future : futures)
      counter.getAndAdd(future.get());
    System.out.println("Total processed " + counter.get());
    System.out.println("Total splits " + totalSplits);
    executor.shutdown();
  }

  public static class Worker implements Callable<Long> {
    private final Spliterator<String> spliterator;
    private final String name;

    public Worker(Spliterator<String> spliterator, String name) {
      this.spliterator = spliterator;
      this.name = name;
    }

    @Override
    public Long call() {
      AtomicLong counter = new AtomicLong(0);
      spliterator.forEachRemaining(s -> {
        // We'll assume busy processing code here
        counter.getAndIncrement();
      });
      System.out.println(name + " Total processed : " + counter.get());
      return counter.get();
    }
  }
}
The output
future-11 Total processed : 244
future-10 Total processed : 488
future-9 Total processed : 977
future-12 Total processed : 122
future-7 Total processed : 3906
future-13 Total processed : 61
future-8 Total processed : 1953
future-6 Total processed : 7813
future-14 Total processed : 31
future-5 Total processed : 15625
future-15 Total processed : 15
future-4 Total processed : 31250
future-17 Total processed : 4
future-18 Total processed : 2
future-19 Total processed : 1
future-16 Total processed : 8
future-3 Total processed : 62500
future-2 Total processed : 125000
future-1 Total processed : 250000
future-0 Total processed : 500000
Total processed 1000000
Total splits 20
My problem/question: The first trySplit (and future task 'future-0') gets exactly N/2 of the total elements to process. The first couple of splits take a long time to complete, and this gets worse as N grows. Is there any other way to process a stream where each future/callable gets an equal share of the elements, i.e. N/splits = 1000000/20 = 50000?
Desired results
future-11 Total processed : 50000
future-10 Total processed : 50000
future-9 Total processed : 50000
future-12 Total processed : 50000
future-7 Total processed : 50000
future-13 Total processed : 50000
future-8 Total processed : 50000
future-6 Total processed : 50000
future-14 Total processed : 50000
future-5 Total processed : 50000
future-15 Total processed : 50000
future-4 Total processed : 50000
future-17 Total processed : 50000
future-18 Total processed : 50000
future-19 Total processed : 50000
future-16 Total processed : 50000
future-3 Total processed : 50000
future-2 Total processed : 50000
future-1 Total processed : 50000
future-0 Total processed : 50000
Total processed 1000000
Total splits 20
Follow-up question: If Spliterator is unable to do this, what other approach/solution would be best for processing large streams concurrently?
Practical scenario: processing a large (6 GB) CSV file that is too large to hold in memory
You are getting perfectly balanced splits here. The problem is that each time you split a sequence of elements into two halves, represented by two Spliterator instances, you create a job for one of the halves without even attempting to split it further, and only subdivide the other half.

So right after the first split, you create a job covering 500,000 elements. Then you call trySplit on the other 500,000 elements, getting a perfect split into two chunks of 250,000 elements, create another job covering one chunk of 250,000 elements, and only try to subdivide the other. And so on. It is your code that creates the unbalanced jobs.
When you change your first part to
// Simply creating some 'test' data
Stream<String> test = LongStream.range(0, 1000000L).mapToObj(i -> i + "-test");
// Creating a future for each split to process concurrently
// (this also needs the imports java.util.ArrayDeque and java.util.Deque)
List<Callable<Long>> callableList = new ArrayList<>();
int workChunkTarget = 5000;
Deque<Spliterator<String>> spliterators = new ArrayDeque<>();
spliterators.add(test.parallel().spliterator());
int totalSplits = 0;
while (!spliterators.isEmpty()) {
  Spliterator<String> spliterator = spliterators.pop();
  Spliterator<String> prefix;
  // Keep splitting until the chunk is at most workChunkTarget elements,
  // pushing each retained suffix back onto the stack for later subdivision
  while (spliterator.estimateSize() > workChunkTarget
      && (prefix = spliterator.trySplit()) != null) {
    spliterators.push(spliterator);
    spliterator = prefix;
  }
  totalSplits++;
  callableList.add(new Worker(spliterator, "future-" + totalSplits));
}
you get quite close to your desired target workload size (as close as possible, given that the numbers are not powers of two).
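One practical note, not part of the original answer: with workChunkTarget = 5000 this loop produces on the order of N / workChunkTarget callables, so creating one thread per split, as the original main method does, would spawn hundreds of threads. A sketch of a more conventional choice, bounding the pool by the core count:

// Assumption: callableList was built by the loop above.
// Bound the pool by available cores rather than by totalSplits,
// since the number of splits now grows with N / workChunkTarget.
ExecutorService executor = Executors.newFixedThreadPool(
    Runtime.getRuntime().availableProcessors());
List<Future<Long>> futures = executor.invokeAll(callableList);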
The Spliterator design works much more smoothly with tools like ForkJoinTask, where a new job can be submitted after each successful trySplit, and the job itself will decide to split and spawn new jobs concurrently when the worker threads are not saturated (this is how parallel stream operations are performed in the reference implementation).
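To illustrate that pattern, here is a minimal sketch of such a RecursiveTask (my own illustration, not code from the answer; the class name CountingTask and the THRESHOLD value are made up). It forks the prefix of every split as a new job and keeps processing the suffix itself, so splitting happens concurrently with processing:

import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class ForkJoinDemo {
  // Hypothetical chunk threshold; tune to the per-element processing cost
  static final long THRESHOLD = 10_000;

  static class CountingTask extends RecursiveTask<Long> {
    private final Spliterator<String> spliterator;

    CountingTask(Spliterator<String> spliterator) {
      this.spliterator = spliterator;
    }

    @Override
    protected Long compute() {
      Spliterator<String> prefix;
      if (spliterator.estimateSize() > THRESHOLD
          && (prefix = spliterator.trySplit()) != null) {
        CountingTask left = new CountingTask(prefix);
        left.fork();                                          // process the prefix concurrently
        long right = new CountingTask(spliterator).compute(); // keep splitting the suffix
        return left.join() + right;
      }
      long[] count = {0};
      spliterator.forEachRemaining(s -> {
        // We'll assume busy processing code here
        count[0]++;
      });
      return count[0];
    }
  }

  public static void main(String[] args) {
    Spliterator<String> s = LongStream.range(0, 1000000L)
        .mapToObj(i -> i + "-test").parallel().spliterator();
    long total = ForkJoinPool.commonPool().invoke(new CountingTask(s));
    System.out.println("Total processed " + total);
  }
}

Because idle worker threads steal the forked subtasks, the uneven chunk sizes no longer matter: a thread that finishes a small chunk immediately picks up another job.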