There are about 1M images that I need to read and insert into Redis using Python. I see two options: use a thread pool, or use asyncio, since this is an IO-bound task. However, I found that the thread pool method is much faster than the asyncio method. Example code:
import pickle
import os
import os.path as osp
import re
import redis
import asyncio
from multiprocessing.dummy import Pool
r = redis.StrictRedis(host='localhost', port=6379, db=1)
data_root = './datasets/images'
print('obtain name and paths')
paths_names = []
for root, dirs, fls in os.walk(data_root):
    for fl in fls:
        # keep only .JPEG files
        if re.search(r'JPEG$', fl) is None: continue
        pth = osp.join(root, fl)
        # build a flat redis key from the path: drop the leading ./, replace / with -
        name = re.sub(r'\.+/', '', pth)
        name = re.sub('/', '-', name)
        name = 'redis-' + name
        paths_names.append((pth, name))
print('num samples in total: ', len(paths_names))
### this is slower
print('insert into redis')
async def insert_one(path_name):
    pth, name = path_name
    if r.get(name): return
    with open(pth, 'rb') as fr:
        binary = fr.read()
    r.set(name, binary)

async def func(cid, n_co):
    num = len(paths_names)
    for i in range(cid, num, n_co):
        await insert_one(paths_names[i])
n_co = 256
loop = asyncio.get_event_loop()
tasks = [loop.create_task(func(cid, n_co)) for cid in range(n_co)]
fut = asyncio.gather(*tasks)
loop.run_until_complete(fut)
loop.close()
### this is more than 10x faster
def insert_one(path_name):
    pth, name = path_name
    if r.get(name): return
    with open(pth, 'rb') as fr:
        binary = fr.read()
    r.set(name, binary)

def func(cid, n_co):
    num = len(paths_names)
    for i in range(cid, num, n_co):
        insert_one(paths_names[i])
with Pool(128) as pool:
    # map each (path, name) pair to a worker thread
    pool.map(insert_one, paths_names)  # or: pool.starmap(func, [(cid, 128) for cid in range(128)])
Here I have two questions that puzzled me a lot:
What is the problem with the asyncio method that makes it slower than the thread method?
Is it encouraged to add millions of tasks to gather? Like this:
num_parallel = 1000000000
tasks = [loop.create_task(fetch_func(cid, num_parallel)) for cid in range(num_parallel)]
await asyncio.gather(*tasks)
You're not actually using async for the I/O operations: you're still making synchronous, blocking calls for both the disk reads and the Redis reads/writes. Because insert_one never awaits anything that actually suspends, each coroutine runs to completion on the event-loop thread before the next one starts, so your 256 tasks execute one at a time, with event-loop overhead on top.
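To see the difference, compare a coroutine that makes a blocking call with one that genuinely suspends. This is a standalone illustration, not code from the question:

import asyncio
import time

async def blocking():
    time.sleep(1)   # blocks the event-loop thread; no other task can run

async def non_blocking():
    await asyncio.sleep(1)  # suspends; other tasks run in the meantime

async def main():
    t0 = time.perf_counter()
    await asyncio.gather(*(blocking() for _ in range(5)))
    print('blocking:', time.perf_counter() - t0)      # ~5 s: serialized
    t0 = time.perf_counter()
    await asyncio.gather(*(non_blocking() for _ in range(5)))
    print('non-blocking:', time.perf_counter() - t0)  # ~1 s: concurrent

asyncio.run(main())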
Try changing your async approach to use the async Redis methods. In particular, use aioredis (it used to be a separate library; it is now part of the main redis package).
import aioredis # if you're using redis library version < 4.2.0rc1
# or
from redis import asyncio as aioredis # if you're using redis >= 4.2.0rc1
# note: don't pass decode_responses=True here, since the values
# are raw JPEG bytes and decoding them as UTF-8 would fail
aior = aioredis.from_url("redis://localhost")
# ...
if await aior.get(name): return
# ...
await aior.set(name, binary)
See how much this change affects the speed difference. If you want to make the file I/O async too, try aiofile:
from aiofile import async_open
# ...
async with async_open(pth, 'rb') as fr:
    binary = await fr.read()
# ...
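Putting the two together, here is a minimal sketch of a fully async version (assuming redis >= 4.2 and the aiofile package; the helper names and the semaphore limit of 256 are my own choices, not from your code). The semaphore also addresses your second question: instead of letting millions of tasks hit the disk and Redis at once, you bound how many are doing I/O at any moment:

import asyncio
from redis import asyncio as aioredis  # redis >= 4.2
from aiofile import async_open

async def insert_one(aior, sem, path_name):
    pth, name = path_name
    async with sem:  # only a bounded number of tasks do I/O at once
        if await aior.exists(name):  # existence check without transferring the value
            return
        async with async_open(pth, 'rb') as fr:
            binary = await fr.read()
        await aior.set(name, binary)

async def main():
    aior = aioredis.from_url("redis://localhost")  # no decode_responses: values are binary
    sem = asyncio.Semaphore(256)
    await asyncio.gather(*(insert_one(aior, sem, pn) for pn in paths_names))
    await aior.aclose()  # use close() on redis-py < 5.0

asyncio.run(main())

Note that creating a million coroutine objects up front is possible but memory-hungry; the semaphore only limits how many run concurrently. For very large batches you can also chunk paths_names and gather one chunk at a time.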