python multithreading concurrent.futures

Multithreading stuck at last future


def download_files_from_folder(base_url, folder_name):
    """Process every ``.gz`` file linked from a remote folder listing.

    Fetches ``{base_url}{folder_name}/``, parses the response as HTML, and
    submits ``process_gz_file(url)`` to a 15-thread pool for each anchor whose
    ``href`` ends in ``.gz``. Each task result is unpacked into three
    iterables whose items are appended to running lists. After every
    completed task the number of futures still tracked is printed.

    Args:
        base_url: URL prefix; concatenated directly with ``folder_name``,
            so it must already end with any separator that is needed.
        folder_name: Name of the folder appended to ``base_url``.

    Raises:
        Whatever ``requests.get`` or ``process_gz_file`` raises; an exception
        from a task propagates out of ``future.result()``.
    """
    folder_url = f"{base_url}{folder_name}/"
    response = requests.get(folder_url)
    soup = BeautifulSoup(response.content, "html.parser")

    # An <a> tag without an href attribute yields None from .get(); filter
    # those out so .endswith() is only ever called on a real string.
    gz_hrefs = [
        href
        for href in (link.get("href") for link in soup.find_all("a"))
        if href and href.endswith(".gz")
    ]

    peninsular_rain_rates = []
    east_rain_rates = []
    completed_urls = []
    with ThreadPoolExecutor(max_workers=15) as executor:
        # Map each future to the href it was created from.
        futures = {
            executor.submit(process_gz_file, folder_url + href): href
            for href in gz_hrefs
        }
        for future in as_completed(futures):
            peninsular, east, completed = future.result()
            peninsular_rain_rates.extend(peninsular)
            east_rain_rates.extend(east)
            completed_urls.extend(completed)
            # NOTE(review): as_completed is understood to iterate over its own
            # snapshot of the futures, so popping from the dict here should not
            # disturb the loop — confirm against the concurrent.futures docs.
            futures.pop(future)
            print(len(futures))
    # NOTE(review): the accumulated lists are not returned in the visible
    # snippet; presumably the original function continues — confirm.

In this code, I am trying to fetch links to .gz files (around 8000 files in total, each ~1.5 MB) from a website, process them, and finally extend the results into lists. The problem is that the code seems to get stuck at the last future; I can tell because I pop each future when it is done and print the number of futures remaining in the dictionary. I have experimented with a smaller number of files (e.g. 50 links) and it works fine.

Could this be a memory issue?


Solution

  • First, I am not sure why you have made futures a dictionary, since you never use the Future instance returned by as_completed to look up its associated value in the dictionary. If you really feel the need to remove from futures those instances that represent completed tasks as they complete and, as SIGUP has suggested, this is causing a problem, then instead of using as_completed, use a callback, as the following template demonstrates:

    from concurrent.futures import ThreadPoolExecutor
    
    def worker(n):
        """Stand-in task: block for one second, then hand back ``n`` unchanged."""
        from time import sleep

        sleep(1)  # simulated work
        return n
    
    def submit_tasks():
        """Run 100 ``worker`` calls on a 15-thread pool, tracking them via callbacks.

        Every submitted future is added to a set; its done-callback removes it
        again, prints how many futures remain in the set, and records the
        task's result.
        """
        outstanding = set()
        collected = []

        def _on_done(fut):
            # Drop the finished future, report the remaining count, then
            # store its result.
            outstanding.remove(fut)
            print(len(outstanding))
            collected.append(fut.result())

        ...
        with ThreadPoolExecutor(max_workers=15) as executor:
            # The set is only added to here and removed from in the callback;
            # it is never iterated while it may be shrinking, which would
            # raise an error.
            for n in range(100):
                fut = executor.submit(worker, n)
                outstanding.add(fut)
                fut.add_done_callback(_on_done)
        # Leaving the with-block implicitly calls executor.shutdown(wait=True),
        # so by this point every result has been collected and the set has
        # been emptied.
        assert len(collected) == 100
        assert len(outstanding) == 0
    
    # Run the callback-based demonstration.
    submit_tasks()
    

    Perhaps a simpler approach would be to use a multiprocessing.pool.ThreadPool instance instead, with either the imap or the imap_unordered method, according to whether or not you would like your results list to contain results in task-submission order:

    from multiprocessing.pool import ThreadPool
    
    def worker(n):
        """Pretend to do one second of work and return the argument as-is."""
        from time import sleep as pause

        # Stand-in for the real workload.
        pause(1)
        return n
    
    def submit_tasks():
        """Run 100 ``worker`` calls on a 15-thread ``ThreadPool``.

        Results are appended to a list as ``imap_unordered`` yields them, and
        after each one the number of tasks still outstanding is printed.
        """
        total = 100

        # A plain range(total) would do here; the generator expression shows
        # the space-efficient form to use when the task arguments are more
        # involved to produce.
        job_args = (n for n in range(total))
        collected = []

        with ThreadPool(15) as pool:
            outcomes = pool.imap_unordered(worker, job_args)
            for finished, outcome in enumerate(outcomes, start=1):
                collected.append(outcome)
                print(total - finished)
    
    # Run the ThreadPool-based demonstration.
    submit_tasks()