import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed

def download_files_from_folder(base_url, folder_name):
    folder_url = f"{base_url}{folder_name}/"
    response = requests.get(folder_url)
    soup = BeautifulSoup(response.content, "html.parser")
    links = soup.find_all("a")
    peninsular_rain_rates = []
    east_rain_rates = []
    completed_urls = []
    with ThreadPoolExecutor(max_workers=15) as executor:
        # process_gz_file is defined elsewhere; it processes one .gz file
        # and returns a (peninsular, east, completed) tuple
        futures = {
            executor.submit(process_gz_file, folder_url + link.get("href")): link.get("href")
            for link in links
            if link.get("href").endswith(".gz")
        }
        for future in as_completed(futures):
            peninsular, east, completed = future.result()
            peninsular_rain_rates.extend(peninsular)
            east_rain_rates.extend(east)
            completed_urls.extend(completed)
            futures.pop(future)
            print(len(futures))
In this code, I am trying to fetch the URLs of .gz files (around 8000 files in total, each ~1.5 MB) from a website, process them, and finally extend the results into lists. The problem is that the code seems to get stuck on the last future; I know this because I pop each future when it is done and print the length of the remaining futures dictionary. I have experimented with a lower number of files (e.g. 50 links) and it works fine.
Could this be a memory issue?
First, I am not sure why you have made futures a dictionary, since you are not using the Future instance returned by as_completed to index into it. If you really feel the need to remove completed tasks from futures as they complete, and if, as SIGUP has suggested, this is what is causing the problem, then instead of using as_completed, use a callback as the following template demonstrates:
from concurrent.futures import ThreadPoolExecutor

def worker(n):
    import time
    # emulate work
    time.sleep(1)
    return n

def submit_tasks():
    results = []
    futures = set()

    def done_callback(future):
        futures.remove(future)
        print(len(futures))
        results.append(future.result())
        ...

    with ThreadPoolExecutor(max_workers=15) as executor:
        # Note that I am not iterating futures concurrently with
        # members being removed from it, which would cause an error:
        for n in range(100):
            future = executor.submit(worker, n)
            futures.add(future)
            future.add_done_callback(done_callback)
        # There is an implicit call to executor.shutdown(wait=True) here.
    # By the time we reach here, the results list will have been completely
    # filled in and the length of futures will be 0:
    assert len(results) == 100
    assert len(futures) == 0

submit_tasks()
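For reference, here is a rough sketch of how that template could be adapted to your download function. It assumes, as your posted code does, that process_gz_file returns a (peninsular, east, completed) tuple; a lock guards the shared lists because the callback runs in whichever worker thread finished the task:

from concurrent.futures import ThreadPoolExecutor
from threading import Lock

import requests
from bs4 import BeautifulSoup

def download_files_from_folder(base_url, folder_name):
    folder_url = f"{base_url}{folder_name}/"
    soup = BeautifulSoup(requests.get(folder_url).content, "html.parser")
    hrefs = [a.get("href") for a in soup.find_all("a")]

    peninsular_rain_rates = []
    east_rain_rates = []
    completed_urls = []
    futures = set()
    lock = Lock()

    def done_callback(future):
        # Runs in a worker thread as soon as the task finishes
        peninsular, east, completed = future.result()
        with lock:
            futures.remove(future)
            print(len(futures))
            peninsular_rain_rates.extend(peninsular)
            east_rain_rates.extend(east)
            completed_urls.extend(completed)

    with ThreadPoolExecutor(max_workers=15) as executor:
        for href in hrefs:
            if href and href.endswith(".gz"):
                # process_gz_file is your existing worker function
                future = executor.submit(process_gz_file, folder_url + href)
                futures.add(future)
                future.add_done_callback(done_callback)
    # Implicit executor.shutdown(wait=True): every callback has run by here
    return peninsular_rain_rates, east_rain_rates, completed_urls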
Perhaps a simpler approach would be to use instead a multiprocessing.pool.ThreadPool instance with either the imap or imap_unordered method, according to whether or not you would like your results list to contain results in task-submission order:
from multiprocessing.pool import ThreadPool

def worker(n):
    import time
    # emulate work
    time.sleep(1)
    return n

def submit_tasks():
    results = []
    # Here we could just use: tasks = range(100)
    # But if you have something more complicated, then use a
    # generator expression or generator function for space efficiency
    # as follows:
    tasks = (n for n in range(100))
    tasks_left_to_complete = 100
    with ThreadPool(15) as pool:
        for result in pool.imap_unordered(worker, tasks):
            results.append(result)
            tasks_left_to_complete -= 1
            print(tasks_left_to_complete)

submit_tasks()
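Adapted to your case, that might look roughly like the sketch below. Again, it assumes process_gz_file returns a (peninsular, east, completed) tuple as in your question; the URLs come from a generator expression so the ~8000 links are not held in an extra list, and because imap_unordered yields results back in the calling thread, no locking is needed around the list updates:

from multiprocessing.pool import ThreadPool

import requests
from bs4 import BeautifulSoup

def download_files_from_folder(base_url, folder_name):
    folder_url = f"{base_url}{folder_name}/"
    soup = BeautifulSoup(requests.get(folder_url).content, "html.parser")
    # Generator expression: URLs are produced lazily rather than stored in a list
    urls = (
        folder_url + a.get("href")
        for a in soup.find_all("a")
        if a.get("href") and a.get("href").endswith(".gz")
    )

    peninsular_rain_rates = []
    east_rain_rates = []
    completed_urls = []
    with ThreadPool(15) as pool:
        # Results arrive in completion order; the loop body runs in this thread
        for peninsular, east, completed in pool.imap_unordered(process_gz_file, urls):
            peninsular_rain_rates.extend(peninsular)
            east_rain_rates.extend(east)
            completed_urls.extend(completed)
    return peninsular_rain_rates, east_rain_rates, completed_urls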