There are about 1M images that I need to read and insert as bytes into Redis with Python. I see two choices: the first is to use a thread pool, and the second is to use asyncio, since this is a pure IO task. However, I found that the thread pool method is much faster than the asyncio method. Example code:
```python
import os
import os.path as osp
import re
import asyncio
from multiprocessing.dummy import Pool

import redis

r = redis.StrictRedis(host='localhost', port=6379, db=1)
data_root = './datasets/images'

print('obtain names and paths')
paths_names = []
for root, dirs, fls in os.walk(data_root):
    for fl in fls:
        if re.search('JPEG$', fl) is None:
            continue
        pth = osp.join(root, fl)
        name = re.sub(r'\.+/', '', pth)
        name = re.sub('/', '-', name)
        name = 'redis-' + name
        paths_names.append((pth, name))
print('num samples in total: ', len(paths_names))

### this is slower
print('insert into redis')

async def insert_one(path_name):
    pth, name = path_name
    if r.get(name):
        return
    with open(pth, 'rb') as fr:
        binary = fr.read()
    r.set(name, binary)

async def func(cid, n_co):
    num = len(paths_names)
    for i in range(cid, num, n_co):
        await insert_one(paths_names[i])

n_co = 256
loop = asyncio.get_event_loop()
tasks = [loop.create_task(func(cid, n_co)) for cid in range(n_co)]
fut = asyncio.gather(*tasks)
loop.run_until_complete(fut)
loop.close()

### this is more than 10x faster
def insert_one(path_name):
    pth, name = path_name
    if r.get(name):
        return
    with open(pth, 'rb') as fr:
        binary = fr.read()
    r.set(name, binary)

def func(cid, n_co):
    num = len(paths_names)
    for i in range(cid, num, n_co):
        insert_one(paths_names[i])

n_co = 128
with Pool(n_co) as pool:
    # func takes (cid, n_co), so map over worker ids rather than items
    pool.map(lambda cid: func(cid, n_co), range(n_co))
```
Here I have two questions that puzzled me a lot:
What is the problem with the asyncio method that makes it slower than the thread method?
Is it encouraged to add millions of tasks to the `gather` function? Like this:
```python
num_parallel = 1000000000
tasks = [loop.create_task(fetch_func(cid, num_parallel)) for cid in range(num_parallel)]
await asyncio.gather(*tasks)
```
You're not actually using async for the I/O operations: both the disk reads and the Redis reads and writes are still synchronous, blocking calls. A blocking call inside a coroutine stalls the entire event loop, so your 256 "concurrent" coroutines end up running their I/O one at a time, with the event-loop machinery as pure overhead.
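A minimal, self-contained demonstration of the effect (using `time.sleep` as a stand-in for any blocking call, such as your `r.get`/`r.set` and file reads): eight coroutines doing blocking "work" take roughly eight times as long as eight coroutines that actually await.

```python
import asyncio
import time

async def blocked(delay):
    time.sleep(delay)          # synchronous: freezes the whole event loop

async def awaited(delay):
    await asyncio.sleep(delay)  # yields to the loop while waiting

async def main():
    t0 = time.perf_counter()
    await asyncio.gather(*(blocked(0.05) for _ in range(8)))
    t_blocked = time.perf_counter() - t0   # ~8 * 0.05s: runs serially

    t0 = time.perf_counter()
    await asyncio.gather(*(awaited(0.05) for _ in range(8)))
    t_awaited = time.perf_counter() - t0   # ~0.05s: runs concurrently
    return t_blocked, t_awaited

t_blocked, t_awaited = asyncio.run(main())
print(f"blocking: {t_blocked:.2f}s, awaited: {t_awaited:.2f}s")
```

The same serialization happens in your `insert_one`, which is why the asyncio version is slower than the thread pool: threads at least let blocking calls overlap.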
Try changing your async approach to use async Redis methods. In particular, use aioredis (it used to be a separate library; it is now part of the main redis library).
```python
import aioredis  # if you're using redis library version < 4.2.0rc1
# or, if you're using redis >= 4.2.0rc1:
# from redis import asyncio as aioredis

# Note: don't pass decode_responses=True here, since the stored
# values are raw image bytes, not UTF-8 text.
aior = aioredis.from_url("redis://localhost")

# ...
if await aior.get(name):
    return
# ...
await aior.set(name, binary)
```
See how much this change affects your speed difference. If you want to make the file I/O async too, try aiofile:
```python
from aiofile import async_open

# ...
async with async_open(pth, 'rb') as fr:
    binary = await fr.read()
# ...
```
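On your second question: `gather` over a million tasks does work, but every task object costs memory and scheduling overhead, and a million simultaneous in-flight operations can overwhelm Redis or the OS. A common pattern is to cap concurrency with `asyncio.Semaphore`. Here is a sketch of that pattern; the in-memory dict stands in for the awaited Redis/file calls so the example is self-contained:

```python
import asyncio

async def insert_one(store, sem, path_name):
    pth, name = path_name
    async with sem:                 # at most max_inflight coroutines past this point
        await asyncio.sleep(0)      # stand-in for `await aior.get/set` and the file read
        store[name] = b'fake-bytes' # stand-in for storing the image bytes

async def insert_all(paths_names, max_inflight=256):
    store = {}
    sem = asyncio.Semaphore(max_inflight)
    # One task per item; the semaphore bounds how many do I/O at once.
    await asyncio.gather(*(insert_one(store, sem, pn) for pn in paths_names))
    return store

pairs = [(f"/tmp/{i}.JPEG", f"redis-{i}") for i in range(1000)]
store = asyncio.run(insert_all(pairs))
print(len(store))
```

With this shape you can keep your flat list of `(path, name)` pairs and tune `max_inflight` instead of hand-sharding the list across `n_co` coroutines as in your original code.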