python · async-await · concurrency · python-asyncio

Using Asyncio for downloading big files: 10 times slower than threading


The goal is to download images from the URLs asynchronously.

Here is the code for threading, which works fine:

import requests
import threading
import os
import time
import concurrent.futures as futures


def download_image(url, folder):
    """Download *url* into *folder*, naming the file after the last URL segment.

    Raises requests.HTTPError on a non-2xx response instead of silently
    writing an error page to disk.
    """
    os.makedirs(folder, exist_ok=True)
    filename = url.split('/')[-1]
    filepath = os.path.join(folder, filename)

    # Stream the body in chunks so a multi-MB image is never held fully in RAM.
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        with open(filepath, "wb") as f:
            for chunk in response.iter_content(chunk_size=65536):
                f.write(chunk)


# Each URL appears twice so every image is fetched two times (load test).
urls = [
    "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
    "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
    "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
    "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
    "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
    "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
    "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
    "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
    "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
] * 2

# Time the whole batch with at most 3 downloads in flight at once.
t0 = time.time()
with futures.ThreadPoolExecutor(max_workers=3) as pool:
    pool.map(download_image, urls, ["2a_threadpool"] * len(urls))
elapsed = time.time() - t0
print(f"ThreadPoolExecutor download time: {elapsed:.2f} seconds")

Here is the code for asyncio, which is very slow:

import requests
import threading
import asyncio
import os
import time
import aiohttp
import aiofiles


async def download_image_async(url, folder, session):
    """Asynchronously download *url* into *folder* using the shared *session*.

    Streams the body in chunks instead of buffering the entire image with
    ``response.read()``, fails loudly on HTTP errors, and creates the target
    folder like the threaded version does.
    """
    os.makedirs(folder, exist_ok=True)  # idempotent; mirrors download_image()
    filename = url.split('/')[-1]
    filepath = os.path.join(folder, filename)

    async with session.get(url) as response:
        response.raise_for_status()
        async with aiofiles.open(filepath, "wb") as f:
            # 64 KiB chunks keep memory bounded even for multi-MB files.
            async for chunk in response.content.iter_chunked(65536):
                await f.write(chunk)


async def main():
    """Fetch every image in ``urls`` concurrently over one shared session."""
    async with aiohttp.ClientSession() as session:
        await asyncio.gather(
            *(download_image_async(url, "3_asyncio", session) for url in urls)
        )

if __name__ == "__main__":
    # Wall-clock timing of the whole asyncio batch.
    t0 = time.time()
    asyncio.run(main())
    elapsed = time.time() - t0
    print(f"Asyncio download time: {elapsed:.2f} seconds")

The images are quite large (more than 5 MB), and I can't find any mistakes in my code. Are there any possible improvements to the code? Or are large files a bottleneck? If so, why?


Solution

  • After testing your code, I noticed that your examples aren't downloading the complete files: the requests fail part-way (likely because they are sent without a User-Agent header), so the files exist on disk but can't be opened.

    I also think that if you limit the number of threads to three, you should limit the asyncio example to three concurrent downloads as well, so that the comparison is fair.

    In my comparison, both versions have a similar execution time.
    As a supplement, I have added a version that uses httpx instead of aiohttp. It also achieves a similar result.

    from concurrent import futures
    import os
    import requests
    import time
    
    def download(url, target):
        """Stream *url* into the file *target* in 8 KiB chunks.

        Errors are reported and swallowed so that one failing URL does not
        abort the whole batch.
        """
        try:
            with requests.get(url, stream=True, headers={'User-Agent': 'Downloader/1.0'}) as resp:
                resp.raise_for_status()
                with open(target, 'wb') as f:
                    # iter_content stops by itself at end of body; breaking on a
                    # falsy chunk could truncate the file if an empty keep-alive
                    # chunk ever appears mid-stream.
                    for chunk in resp.iter_content(chunk_size=8192):
                        f.write(chunk)
        except Exception as e:
            print(f"Error retrieving {url}: {e}")
    
    def url2dest(folder, url):
        """Map *url* to a destination path inside *folder* (basename of the URL)."""
        return os.path.join(folder, url.split('/')[-1])
    
    def main():
        """Download every sample image into ``2_threadpool`` with 3 worker threads."""
        urls = [
            "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
        ] * 2
        folder = '2_threadpool'
        os.makedirs(folder, exist_ok=True)
        with futures.ThreadPoolExecutor(max_workers=3) as pool:
            for u in urls:
                pool.submit(download, u, url2dest(folder, u))
    
    
    if __name__ == '__main__':
        # Time the whole threaded run.
        started = time.time()
        main()
        print(f"ThreadPoolExecutor download time: {time.time() - started:.2f} seconds")
    
    from functools import wraps
    import aiofiles
    import aiohttp
    import asyncio 
    import os
    import time
    
    def limit_concurrency(limit=10):
        """Decorator factory: cap concurrent executions of a coroutine function.

        Every call of the decorated coroutine shares one semaphore with
        *limit* slots, so at most *limit* invocations run at the same time.
        """
        semaphore = asyncio.Semaphore(limit)

        def decorate(coro_func):
            @wraps(coro_func)
            async def limited(*args, **kwargs):
                async with semaphore:
                    return await coro_func(*args, **kwargs)
            return limited

        return decorate
    
    @limit_concurrency(limit=3)
    async def download(session: aiohttp.ClientSession, url: str, filename: str):
        """Stream *url* into *filename* via *session*; at most 3 downloads run at once.

        Errors are printed rather than raised so one failing URL does not
        cancel the surrounding ``asyncio.gather``.
        """
        try:
            async with session.get(url, headers={'User-Agent': 'Downloader/1.0'}) as response:
                # raise_for_status() instead of ``assert``: asserts disappear
                # under ``python -O`` and carry no HTTP context on failure.
                response.raise_for_status()
                async with aiofiles.open(filename, mode='wb') as fp:
                    async for chunk in response.content.iter_chunked(8192):
                        await fp.write(chunk)
        except Exception as e:
            print(f"Error retrieving {url}: {e}")
    
    def url2dest(folder, url):
        """Return the path of *url*'s final segment inside *folder*."""
        basename = url.rsplit('/', 1)[-1]
        return os.path.join(folder, basename)
    
    async def main():
        """Download the sample images into ``3_asyncio`` over one shared session."""
        urls = [
            "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
        ] * 2
        folder = '3_asyncio'
        os.makedirs(folder, exist_ok=True)
        async with aiohttp.ClientSession() as session:
            await asyncio.gather(
                *(download(session, u, url2dest(folder, u)) for u in urls)
            )
    
    if __name__ == '__main__':
        # Time the whole asyncio run.
        started = time.time()
        asyncio.run(main())
        print(f"Asyncio download time: {time.time() - started:.2f} seconds")
    
    from functools import wraps
    import aiofiles
    import asyncio
    import httpx
    import os
    import time
    
    def limit_concurrency(limit=10):
        """Build a decorator that lets at most *limit* calls of an async
        function be in flight simultaneously (all calls share one semaphore)."""
        gate = asyncio.Semaphore(limit)

        def outer(fn):
            @wraps(fn)
            async def inner(*args, **kwargs):
                async with gate:
                    return await fn(*args, **kwargs)
            return inner

        return outer
    
    @limit_concurrency(limit=3)
    async def download(client, url, target):
        """Stream *url* into *target* via the shared httpx *client*.

        Uses ``client.stream()`` so the body arrives in 8 KiB chunks;
        ``client.get()`` would buffer the whole response in memory first,
        making the chunked write loop pointless for large files.
        Errors are printed rather than raised so one failure does not
        cancel the surrounding ``asyncio.gather``.
        """
        try:
            async with client.stream('GET', url, headers={'User-Agent': 'Downloader/1.0'}) as resp:
                resp.raise_for_status()
                async with aiofiles.open(target, 'wb') as f:
                    async for chunk in resp.aiter_bytes(8192):
                        await f.write(chunk)
        except Exception as e:
            print(f"Error retrieving {url}: {e}")
    
    def url2dest(folder, url):
        """Join *folder* with the last path segment of *url*."""
        _, _, basename = url.rpartition('/')
        return os.path.join(folder, basename or url)
    
    async def main():
        """Download the sample images into ``4_asyncio`` with one shared httpx client."""
        urls = [
            "https://images-assets.nasa.gov/image/PIA03149/PIA03149~orig.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/3/37/African_Bush_Elephant.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/9/97/The_Earth_seen_from_Apollo_17.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/2/29/%22Arte_Colonial%22.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/d/d2/%22CUT%22_June_1929_04.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/8/82/%22CUT%22_June_1929_05.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/b/b1/%22Council_of_the_Gods%22_in_Galleria_Borghese_%28Rome%29_ceiling.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/7/71/%22East_front_of_Court_House_and_Planter%27s_House.%22.jpg",
            "https://upload.wikimedia.org/wikipedia/commons/b/b6/%22Greater_Germany%22._Major_administrative_divisions._July_1._1944._100_kms_-_btv1b531213280.jpg",
        ] * 2
        folder = '4_asyncio'
        os.makedirs(folder, exist_ok=True)
        async with httpx.AsyncClient() as client:
            await asyncio.gather(
                *(download(client, u, url2dest(folder, u)) for u in urls)
            )
    
    if __name__ == '__main__':
        # Time the whole httpx/asyncio run.
        started = time.time()
        asyncio.run(main())
        print(f"Asyncio download time: {time.time() - started:.2f} seconds")