I am using a Python socketio async server. I have several socketio.on handlers that can run in my backend. Each user on my website can send a socket event to the backend and execute their own process; however, in my terminal, each user's process executes one at a time.
For example:
import socketio as sio
from aiohttp import web

socketio = sio.AsyncServer(cors_allowed_origins="*")
app = web.Application()
socketio.attach(app)

def trainmodel(x, y, username, code):
    # Training the user's AI model, which takes about 10 seconds
    args = TrainingArguments(
        output_dir="output",
        num_train_epochs=10,
        per_device_train_batch_size=10,
        optim="adamw_torch",
    )
    trainer = Trainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        #eval_dataset=val_dataset,
        compute_metrics=compute_metrics,
    )
    trainer.train()

@socketio.on("sendbackends")
async def trainit(sid, data, code):
    trainmodel(x, y, username, code)

@socketio.on("bottoken")
async def handle(sid):
    await socketio.emit("redirect", namespace="/")

if __name__ == '__main__':
    web.run_app(app, port=5000)
My socketio.on handlers are running one at a time, but I want every process to run in parallel so that one doesn't block the next.
TL;DR: What you're looking for is parallel processing, but aiohttp only provides concurrent processing. You can use concurrent.futures.ProcessPoolExecutor
to add parallelism to your program, but be careful of race conditions. See code at the end of this answer.
The issue here is that aiohttp allows concurrent, not parallel, handling of http requests. The distinction between concurrency and parallelism is complicated, and there are many articles that will do a better job explaining it than I can in this post (see here and here), but I'll briefly mention the most important parts.
Both concurrency and parallelism attempt to speed up execution of a program by reducing the amount of time the program is waiting for a function to complete. However, they do this in very different ways, and by targeting different types of functions.
Concurrency targets blocking functions, which are functions that take a long time because they are waiting for an external operation to complete. Prototypical examples of blocking functions are functions that read a file or make a network request. Concurrency "speeds up" blocking functions by continuing to execute other code while waiting for the blocking function to complete. I put "speeds up" in quotes because the blocking functions take the same amount of time (or even longer, due to overhead costs) to complete, but the program appears faster because it doesn't wait around for the blocking functions to finish before doing some more work. Of course, any work that requires the results of the blocking function, such as the contents of the file or the body of the network request, can't be performed until the blocking function completes.
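To make the distinction concrete, here's a minimal sketch of concurrency using asyncio, where asyncio.sleep stands in for a slow network request (the names and delays are made up for illustration):

import asyncio

async def fake_request(name, delay):
    # await hands control back to the event loop, so other
    # coroutines can run while this one "waits on the network"
    await asyncio.sleep(delay)
    return f"{name} finished after {delay}s"

async def main():
    # Both requests run concurrently: total wall time is ~2s, not ~3s
    results = await asyncio.gather(fake_request("a", 2), fake_request("b", 1))
    print(results)

asyncio.run(main())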
Parallelism targets CPU-bound functions, which are functions that take a long time because they require intensive computation by the CPU. Your trainmodel
function is a great example of a CPU-bound function. Training an AI model requires a huge number of arithmetic operations, which must be performed by your CPU (or GPU, but that's another conversation). Parallelism "speeds up" these functions by taking advantage of the multiple CPUs on your computer, executing your CPU-bound function in a separate process with a separate CPU. Again, "speeds up" is in quotes because the computations take a similar amount of time, they are just happening at the same time on different CPUs, allowing your main program to continue running. And again, any work that requires the results of the CPU-bound function can't be performed until the function completes. (Parallelism can also solve the issue of blocking functions, but is a bit overkill for that purpose.)
(While written for Node.js, the "Blocking" section of this article does a good job of succinctly differentiating between blocking and CPU-intensive code.)
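Here's the parallel counterpart as a minimal sketch, using concurrent.futures; the fib function is just a hypothetical stand-in for CPU-heavy work like model training:

import concurrent.futures

def fib(n):
    # deliberately slow, CPU-bound recursion
    return n if n < 2 else fib(n - 1) + fib(n - 2)

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as pool:
        # each fib(32) runs in its own worker process, on its own core
        results = list(pool.map(fib, [32, 32, 32, 32]))
    print(results)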
As I mentioned before, aiohttp allows concurrent programming, but if we want to be able to execute multiple instances of the CPU-bound trainmodel
function at once, we need to introduce parallelism. There are many factors you need to consider when adding parallelism to a program, like whether your functions will need to share data, or whether they need to be performed in a particular order. I'm going to assume the trainmodel
function doesn't depend on or modify any global state, although that may not quite hold here: it looks like every model writes to the same output directory, so parallel runs could clobber each other's files. A full treatment of that race condition probably belongs in a separate post, but one easy mitigation is shown below.
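Assuming username is unique per user (my assumption, not something your code guarantees), giving each training run its own output directory avoids the clash:

args = TrainingArguments(
    output_dir=f"output/{username}",  # per-user directory; assumes usernames are unique
    num_train_epochs=10,
    per_device_train_batch_size=10,
    optim="adamw_torch",
)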
Adding parallelism isn't too bad, but it does require some changes to the script structure:
import socketio as sio
from aiohttp import web
import asyncio
import concurrent.futures
import functools

# 1. move trainmodel outside of main function
def trainmodel(x, y, username, code):
    # contents of trainmodel func
    ...

# 2. move server setup to main function
def main():
    # 3. create process pool
    pool = concurrent.futures.ProcessPoolExecutor()

    socketio = sio.AsyncServer(cors_allowed_origins="*")
    app = web.Application()
    socketio.attach(app)

    @socketio.on("sendbackends")
    async def trainit(sid, data, code):
        # 4. execute trainmodel in the process pool
        # (x, y, username presumably unpacked from data, as in the original handler)
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(pool, functools.partial(trainmodel, x, y, username, code))

    @socketio.on("bottoken")
    async def handle(sid):
        await socketio.emit("redirect", namespace="/")

    web.run_app(app, port=5000)

if __name__ == '__main__':
    main()
To summarize the numbered changes:
1. The process pool has to pickle the trainmodel function in order to send it to a worker process, so we need to keep that function on the top level.
2. The server setup moves into a main function.
3. We let concurrent.futures do the work of creating a process pool.
4. Inside the handler, we grab the running event loop with asyncio.get_running_loop. We then add our task to the process pool, giving it all the information it needs to invoke trainmodel by passing it the function and any arguments using functools.partial.
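As an aside, loop.run_in_executor forwards positional arguments itself, so the functools.partial wrapper is only strictly necessary when you need keyword arguments; for this call, passing the arguments directly would work just as well:

await loop.run_in_executor(pool, trainmodel, x, y, username, code)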