I am using a concurrent.futures.ThreadPoolExecutor in a WSGI (Flask) Python REST service to send queries in parallel. I use the code below to instantiate one executor per request, with as many threads as there are queries. Obviously, if multiple requests arrive in parallel, this creates that many thread pools. My first question: is there any way to create a global executor and share it among the requests? And would that be more efficient, since I wouldn't have to create and destroy one for each request? The second question: if a ThreadPoolExecutor has a limit of max_workers=32, how does this limit work when I submit more than 32 queries to the common thread pool?
Is my assumption correct that submitting more queries to the thread pool than there are cores should not be an issue? When a query is sent to the database, its thread blocks, and the thread pool can run the next query on another thread. So I should be able to handle more queries than there are cores available, right? I know this is how it works in other languages, and I expect the same in Python.
from concurrent import futures

with futures.ThreadPoolExecutor(max_workers=min(32, len(list_of_queries)),
                                thread_name_prefix='query_pool') as tpe:
    future_list = [tpe.submit(execute_query, connection, query)
                   for query in list_of_queries]
    for future in futures.as_completed(future_list):
        # result() re-raises any exception raised by the worker,
        # so this makes sure none of the queries failed:
        future.result()
When you have a thread pool of N threads, N can certainly be larger than the number of processors you have if the submitted tasks largely block on I/O, network activity, or any other action that suspends a thread's execution until something completes; in the meantime another thread can run. However, when a task mostly executes CPU-intensive Python bytecode (compiled Python statements) and you are using the "standard" CPython implementation, you want to be using multiprocessing instead of multithreading. The problem with Python bytecode execution is that there is a Global Interpreter Lock (GIL) that the interpreter must acquire before it can interpret bytecode, and therefore bytecode cannot execute concurrently in multiple threads.
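To illustrate the multiprocessing alternative, here is a minimal, standalone sketch (the fib function is just an illustrative CPU-bound stand-in, not anything from your code) showing that the same submit/as_completed pattern carries over to a ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor, as_completed

def fib(n: int) -> int:
    # Deliberately CPU-bound: pure bytecode execution, no I/O.
    return n if n < 2 else fib(n - 1) + fib(n - 2)

if __name__ == '__main__':
    # Separate processes sidestep the GIL because each one
    # runs its own interpreter:
    with ProcessPoolExecutor(max_workers=4) as ppe:
        future_list = [ppe.submit(fib, n) for n in (30, 31, 32, 33)]
        for future in as_completed(future_list):
            print(future.result())

The __main__ guard matters here: on platforms that spawn worker processes, the module is re-imported in each child, and the guard prevents the pool from being created recursively.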
If we submit tasks to a thread pool of N threads, they are enqueued onto an input task queue. When a thread becomes idle, either because it has finished executing a task or because the pool was created with idle threads, it takes the next task off the queue if there is one, or waits for a task to be enqueued. All that means is that if you submit M tasks where M > N, the first N tasks will start executing more or less immediately while the remaining M - N tasks sit on the queue until threads become idle.
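You can see this queueing directly with a small standalone sketch (the 5-second sleep stands in for a blocking query): submit 8 tasks to a 4-thread pool and print when each one starts.

import time
from concurrent.futures import ThreadPoolExecutor

start = time.monotonic()

def task(n: int) -> int:
    # The start offset shows whether the task ran immediately or queued:
    print(f'task {n} started at t={time.monotonic() - start:.1f}s')
    time.sleep(5)
    return n

with ThreadPoolExecutor(max_workers=4) as tpe:
    list(tpe.map(task, range(8)))
# Tasks 0-3 start at t=0.0s; tasks 4-7 start at t=5.0s.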
Let's assume you have M = 4 queries to be submitted. With your current logic you would be creating a thread pool of size 4 (i.e. min(32, 4)). All 4 tasks will run immediately and concurrently, but only if you do not have a similar Flask invocation currently running. Consider the following example where 4 tasks are submitted to the thread pool and each task requires 5 seconds to execute. When all 4 tasks execute immediately, we would expect the total elapsed time for completion to be about 5 seconds, since the executions run concurrently. Here is the Flask code:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from flask import Flask

app = Flask(__name__)

queries = [1, 2, 3, 4]
# A single, global pool shared by every request:
pool = ThreadPoolExecutor(min(32, len(queries)))

def worker(n: int) -> int:
    # Emulate waiting for some network retrieval:
    time.sleep(5)
    return n

@app.route('/index')
def index():
    futures = [pool.submit(worker, n) for n in queries]
    total = 0
    for future in as_completed(futures):
        total += future.result()
    return str(total)

app.run()
But what happens if two requests are made to the index view simultaneously? We can arrange that with the following code:
#!/usr/bin/env python3
import time
from threading import Thread

import requests

def fetch_url():
    start = time.monotonic()
    headers = {'user-agent': 'my-app/0.0.1'}
    response = requests.get('http://127.0.0.1:5000/index', headers=headers)
    response.raise_for_status()
    elapsed = time.monotonic() - start
    print(f'result = {response.text}, elapsed time = {elapsed}')

threads = [Thread(target=fetch_url) for _ in range(2)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
Essentially, a total of 8 tasks are submitted at approximately the same time to a pool that can run only 4 tasks concurrently. The remaining 4 tasks have to wait for the first 4 tasks to complete before they can even start. The following is the output from the above code:
result = 10, elapsed time = 5.0
result = 10, elapsed time = 10.0
Therefore, you should create a thread pool large enough to handle the maximum number of concurrent requests you expect the Flask application to serve, without exceeding system-imposed limits.
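For instance, here is a minimal sketch of sizing one global pool for the whole application rather than per request (the numbers are illustrative assumptions, not recommendations):

from concurrent.futures import ThreadPoolExecutor

# Size the pool for peak load: e.g. if you expect up to 16 simultaneous
# requests, each submitting up to 4 queries, 64 workers lets every query
# across all requests run concurrently.
pool = ThreadPoolExecutor(max_workers=64, thread_name_prefix='query_pool')

Created once at import time, this executor outlives individual requests, so you also avoid the per-request cost of creating and destroying a pool, which was your first question.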
Perhaps a better alternative is to not use multithreading at all, since your environment seems to impose a limit of 32 threads, and to use asyncio instead. This is a rather large topic with a bit of a learning curve, but it is worth the effort since you can efficiently create thousands of concurrent tasks (coroutines). You would, of course, need a database connection implementation that uses asyncio. The Flask app now becomes:
import asyncio
from flask import Flask

app = Flask(__name__)

queries = [1, 2, 3, 4]

async def worker(n: int) -> int:
    # Emulate waiting for some network retrieval:
    await asyncio.sleep(5)
    return n

async def run_queries():
    tasks = (worker(n) for n in queries)
    return await asyncio.gather(*tasks)

@app.route('/index')
def index():
    return str(sum(asyncio.run(run_queries())))

app.run()
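To run real queries this way, you would swap asyncio.sleep for an asyncio-based database driver. As a hedged sketch, here is what that could look like with asyncpg, one such driver for PostgreSQL (the DSN is a placeholder, and the function names are illustrative, not from your code):

import asyncio

import asyncpg  # an asyncio-native PostgreSQL driver; one option among several

async def execute_query(pool, query):
    # Each coroutine borrows a connection from the driver's own pool:
    async with pool.acquire() as connection:
        return await connection.fetch(query)

async def run_queries(queries):
    # Placeholder DSN; substitute your real connection string.
    pool = await asyncpg.create_pool('postgresql://user:secret@localhost/mydb')
    try:
        return await asyncio.gather(*(execute_query(pool, q) for q in queries))
    finally:
        await pool.close()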
Prints:
result = 10, elapsed time = 5.030999999988126
result = 10, elapsed time = 5.030999999988126
You can even install Flask's asyncio support so that views can be async def functions. See this for details.
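As a sketch of that (assuming Flask 2.0+ installed with its async extra, i.e. pip install flask[async]), the view itself becomes a coroutine and the asyncio.run call disappears:

import asyncio
from flask import Flask

app = Flask(__name__)

queries = [1, 2, 3, 4]

async def worker(n: int) -> int:
    await asyncio.sleep(5)
    return n

@app.route('/index')
async def index():
    # Flask runs this coroutine on an event loop for us:
    results = await asyncio.gather(*(worker(n) for n in queries))
    return str(sum(results))

app.run()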