java java-stream partitioning

Split list into smaller lists and then stream them in multiple threads


I have a database with a table of links.

I have found out that I can split a list into smaller lists using partitioning. According to this article, the Partition class seems to be the fastest: Divide a list to lists of n size in Java 8.

After splitting them into smaller lists, I would like to use these links and scrape data from them simultaneously. With a single list, I could have used:

linkList.parallelStream().forEach(link -> ScrapeLink(link));

and set the common pool's parallelism:

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "5");
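Note that the common pool reads `java.util.concurrent.ForkJoinPool.common.parallelism` only once, when it is first created, so the property must be set before anything uses a parallel stream. A commonly used alternative is to run the parallel stream inside a dedicated `ForkJoinPool`. The sketch below assumes that approach; it relies on the implementation behavior that a parallel stream started from a ForkJoinPool worker runs its tasks in that pool, which the Streams specification does not guarantee. `CustomPoolScraper` and `scrapeLink` are hypothetical names standing in for the question's `ScrapeLink`.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;

class CustomPoolScraper {
    // Records processed links so the example has an observable effect
    static final Set<String> scraped = ConcurrentHashMap.newKeySet();

    // Hypothetical stand-in for the question's ScrapeLink(link)
    static void scrapeLink(String link) {
        scraped.add(link);
    }

    // Runs the parallel stream inside a dedicated ForkJoinPool with the given parallelism.
    // Relies on the (unspecified) behavior that a parallel stream started from a
    // ForkJoinPool worker thread executes its tasks in that same pool.
    static void scrapeAll(List<String> links, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            pool.submit(() -> links.parallelStream().forEach(CustomPoolScraper::scrapeLink))
                .join(); // block until the whole stream has finished
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, `CustomPoolScraper.scrapeAll(links, 5)` limits this one stream to 5 workers without touching the JVM-wide common pool.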

But in my case I would like to split them into smaller lists and then pass each sublist, via a parallel stream, to another method where I use ScraperAPI, so that each sublist of links is scraped in a single session (using session_number: "Reuse the same proxy by setting session_number=123", for example).

So when I have a list like this:

final List<String> links = Arrays.asList("link1","link2","link3","link4","link5","link6","link7");

System.out.println(Partition.ofSize(links, 3));

I will have [[link1, link2, link3], [link4, link5, link6], [link7]]
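If you would rather not depend on the Partition class, a plain-Java stand-in can be written with `List.subList`. This is a minimal sketch; the class name `Partitions` is hypothetical. It builds the outer list eagerly, and each chunk is a `subList` view backed by the original list, so the original list should not be structurally modified while the chunks are in use.

```java
import java.util.ArrayList;
import java.util.List;

class Partitions {
    // Splits 'list' into consecutive chunks of at most 'size' elements.
    // Each chunk is a subList view backed by the original list.
    static <T> List<List<T>> ofSize(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < list.size(); from += size) {
            chunks.add(list.subList(from, Math.min(from + size, list.size())));
        }
        return chunks;
    }
}
```

With the seven links above, `Partitions.ofSize(links, 3)` prints `[[link1, link2, link3], [link4, link5, link6], [link7]]`.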

But how do I then process these small link lists in multiple threads at the same time?

My thought was to use Java 8 streams, but there might be a better way?
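Regarding the streams idea: one way to get "one session per sublist" is to treat each sublist as a single task, so the sublists run concurrently while the links inside one sublist are scraped one after another with the same session number. The sketch below assumes that design; `SessionScraper` and `scrapeInSession` are hypothetical, the stub only returns a string where real code would send each request with ScraperAPI's `session_number` parameter, and numbering sessions from the sublist index is an arbitrary choice. How many sublists run at once is governed by the common pool's parallelism (plus the calling thread, which also takes part), i.e. the system property from the question.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SessionScraper {
    // Hypothetical placeholder: would scrape every link of one sublist sequentially,
    // reusing the same session number (e.g. ScraperAPI's session_number) for each request.
    static String scrapeInSession(int sessionNumber, List<String> links) {
        return "session " + sessionNumber + ": " + links;
    }

    // Processes the sublists in parallel, one sublist per task; the links inside
    // a sublist stay on one thread and share one session number.
    static List<String> scrapeAll(List<List<String>> subLists) {
        return IntStream.range(0, subLists.size())
                .parallel()
                .mapToObj(i -> scrapeInSession(i + 1, subLists.get(i))) // sessions 1, 2, 3, ...
                .collect(Collectors.toList()); // results keep the sublists' order
    }
}
```

Applied to the three sublists above, this returns `[session 1: [link1, link2, link3], session 2: [link4, link5, link6], session 3: [link7]]`.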


Solution

  • You can use the default (common) ForkJoinPool, with the parallelism of 5 you mentioned, together with a custom thread pool for your sublists.

    So you first need a Runnable class like this, which you will later submit to your new thread pool:

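        import lombok.AllArgsConstructor;

        // Lombok's @AllArgsConstructor generates LinkProcessorTask(String link)
        @AllArgsConstructor
        public class LinkProcessorTask implements Runnable {
            private final String link;

            @Override
            public void run() {
                // do something with this link from the sublist, e.g. scrape it
            }
        }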
        
    
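        import java.util.List;
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
        import java.util.concurrent.TimeUnit;

        public void doWork() {

          List<List<String>> subListsOfLinks = .... // partitioning function

          subListsOfLinks.parallelStream().forEach(list -> {
              ExecutorService executorService = Executors.newFixedThreadPool(4); // per-sublist concurrency
              for (String link : list) {
                  executorService.submit(new LinkProcessorTask(link));
              }
              // stop accepting new tasks, then wait for this sublist's links to finish
              executorService.shutdown();
              try {
                  // the timeout is an arbitrary example value
                  if (!executorService.awaitTermination(10, TimeUnit.MINUTES)) {
                      executorService.shutdownNow();
                  }
              } catch (InterruptedException e) {
                  executorService.shutdownNow();
                  Thread.currentThread().interrupt();
              }
          });
        }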
    

    Whether you make this new thread pool a single global one with fixed concurrency, or create one inside each ForkJoinPool task, is your own design decision.

    If you create it inside each task, the custom pools can spawn up to ForkJoinPoolConcurrency * CustomThreadPoolConcurrency threads in total (plus the ForkJoinPool's own threads).

    Otherwise, it is just ForkJoinPoolConcurrency + CustomThreadPoolConcurrency.

    Which option is better depends on several factors, such as your machine.

    If you want to wait for all the links of a sublist to complete before proceeding, you can use a CountDownLatch instead of the heavier shutdown/awaitTermination approach, which requires creating and shutting down a pool for every sublist.
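A minimal sketch of the "global pool" option combined with the CountDownLatch suggestion, assuming Java 8+: one shared fixed pool of 4 threads (the concurrency used in the code above) does the scraping, each sublist waits on its own latch, and the pool is shut down once at the end rather than per sublist. With the question's common-pool parallelism of 5, this keeps the thread count at roughly 5 + 4 = 9, whereas creating a pool of 4 inside each ForkJoinPool task can reach 5 × 4 = 20 custom-pool threads. `SharedPoolScraper`, the `scrape` stub and the 10-minute timeout are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SharedPoolScraper {
    // One global pool: at most 4 links are scraped at once, however many
    // sublists the ForkJoinPool is processing concurrently.
    static final ExecutorService LINK_POOL = Executors.newFixedThreadPool(4);

    // Records processed links so the example has an observable effect
    static final Set<String> scraped = ConcurrentHashMap.newKeySet();

    // Hypothetical placeholder for the real scraping call
    static void scrape(String link) {
        scraped.add(link);
    }

    // Submits every link of one sublist to the shared pool and blocks until all
    // of them have finished; returns false if the timeout elapses first.
    static boolean processSubList(List<String> subList, long timeout, TimeUnit unit)
            throws InterruptedException {
        CountDownLatch done = new CountDownLatch(subList.size());
        for (String link : subList) {
            LINK_POOL.execute(() -> {
                try {
                    scrape(link);
                } finally {
                    done.countDown(); // count down even if scrape() throws
                }
            });
        }
        return done.await(timeout, unit);
    }

    // Processes all sublists in parallel, then shuts the shared pool down once.
    // Because of that shutdown, this method is meant to be called a single time.
    static void processAll(List<List<String>> subLists) {
        subLists.parallelStream().forEach(subList -> {
            try {
                if (!processSubList(subList, 10, TimeUnit.MINUTES)) { // arbitrary example timeout
                    System.err.println("timed out waiting for " + subList);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        LINK_POOL.shutdown();
    }
}
```

Since the pool outlives each sublist, no per-sublist `shutdown()`/`awaitTermination` is needed; the latch alone tells each ForkJoinPool task when its links are done.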