python concurrent.futures

How can I keep a constant number of threads running in Python at all times?


This is a part of my code.

For example, if 3 of the 10 tasks finish first, I want the next 3 to start right away, so that 10 threads are always running.

However, the current code moves on to the next 10 only when all 10 have completely finished.

How can I modify the code?

I always want to keep a fixed number of threads busy, but as it stands, the previous 10 threads have to finish completely before the next 10 can start.

import concurrent.futures
import time

def example_task(n):
    """Simulate a unit of work that takes ``n`` seconds.

    Prints a start message, sleeps for ``n`` seconds, prints a completion
    message, and returns ``n`` unchanged.
    """
    duration = n
    print(f"Task {n} started.")
    time.sleep(duration)  # the sleep stands in for real work
    print(f"Task {n} completed.")
    return duration

# Submit 10 tasks up front, then submit one replacement each time one of
# them finishes.
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    futures = []

    for i in range(10):
        futures.append(executor.submit(example_task, i+1))

    # NOTE: in CPython, as_completed() copies the futures it is given when
    # iteration starts, so futures appended to the list inside this loop are
    # not yielded here and their results are never printed by it. The
    # executor's `with` block still waits for them before exiting.
    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result()
            print(f"Result of task: {result}")

            # Task ids continue after the ones already submitted.
            next_task = len(futures) + 1
            futures.append(executor.submit(example_task, next_task))
        except Exception as e:
            print(f"Error: {e}")

Solution

  • Use ThreadPoolExecutor together with concurrent.futures.wait(..., return_when=FIRST_COMPLETED), so that a new task is submitted as soon as any running task completes.

    import concurrent.futures
    import time
    import itertools
    
    def example_task(n):
        """Simulate a unit of work that takes ``n`` seconds.

        Prints a start message, sleeps for ``n`` seconds, prints a completion
        message, and returns ``n`` unchanged.
        """
        duration = n
        print(f"Task {n} started.")
        time.sleep(duration)  # the sleep stands in for real work
        print(f"Task {n} completed.")
        return duration
    
    def main(max_threads=5, total_tasks=20, task=None):
        """Run ``total_tasks`` tasks, keeping up to ``max_threads`` in flight.

        Tasks are numbered ``1..total_tasks``. An initial batch fills the
        pool; every time a task finishes (successfully or not) the next
        unsubmitted task id is handed to the executor, so the pool stays
        full until the ids run out.

        Args:
            max_threads: Size of the thread pool and the maximum number of
                tasks running at once.
            total_tasks: Exact number of tasks to run in total.
            task: Callable invoked as ``task(task_id)``. Defaults to
                ``example_task``.
        """
        if task is None:
            task = example_task

        # A finite iterator of ids guarantees exactly ``total_tasks``
        # submissions, regardless of the order in which tasks complete.
        task_ids = iter(range(1, total_tasks + 1))

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
            # Initial batch: at most max_threads tasks (fewer if total_tasks is smaller).
            futures = {
                executor.submit(task, task_id): task_id
                for task_id in itertools.islice(task_ids, max_threads)
            }

            # Refill the pool each time at least one task finishes.
            while futures:
                done, _ = concurrent.futures.wait(
                    futures, return_when=concurrent.futures.FIRST_COMPLETED
                )

                for future in done:
                    task_id = futures.pop(future)  # Remove completed task
                    try:
                        result = future.result()
                    except Exception as e:
                        print(f"Task {task_id} failed: {e}")
                    else:
                        print(f"Result of task {result}")

                    # Replace the finished task even if it failed; otherwise
                    # every failure would permanently shrink the pool.
                    new_task_id = next(task_ids, None)
                    if new_task_id is not None:
                        futures[executor.submit(task, new_task_id)] = new_task_id

        print("FINISHING")
    
    # Run the demo only when this file is executed as a script, not when it is imported.
    if __name__ == "__main__":
        main()