This is a part of my code.
For example, even if 3 out of 10 ends first, I want the next 3 to start right away and always keep 10 threads running.
However, the current code is moving on to the next 10 only when all 10 are completely finished.
How can I modify the code?
I always want to keep a certain number of threads, but as it stands, I have to finish the previous 10 threads completely before moving on to the next 10.
import concurrent.futures
import time
def example_task(n):
print(f"Task {n} started.")
time.sleep(n)
print(f"Task {n} completed.")
return n
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = []
for i in range(10):
futures.append(executor.submit(example_task, i+1))
for future in concurrent.futures.as_completed(futures):
try:
result = future.result()
print(f"Result of task: {result}")
next_task = len(futures) + 1
futures.append(executor.submit(example_task, next_task))
except Exception as e:
print(f"Error: {e}")
Use a queue-based approach with ThreadPoolExecutor, where tasks are continuously submitted as soon as one completes.
import concurrent.futures
import time
import itertools
def example_task(n):
print(f"Task {n} started.")
time.sleep(n) # Simulate work
print(f"Task {n} completed.")
return n
def main():
max_threads = 5
total_tasks = 20 # Total number of tasks you want to process
task_counter = itertools.count(1) # Infinite counter for task numbers
with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = {}
# Submit initial batch of tasks
for _ in range(max_threads):
task_id = next(task_counter)
futures[executor.submit(example_task, task_id)] = task_id
# Process tasks dynamically
while futures:
done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
for future in done:
task_id = futures.pop(future) # Remove completed task
try:
result = future.result()
print(f"Result of task {result}")
# Submit a new task if we haven't reached the total task limit
if task_id < total_tasks:
new_task_id = next(task_counter)
futures[executor.submit(example_task, new_task_id)] = new_task_id
else:
print("FINISHING")
break
except Exception as e:
print(f"Task {task_id} failed: {e}")
if __name__ == "__main__":
main()