python, asynchronous, amazon-s3, boto3

How to Optimize Asynchronous Video Upload to S3 in Python?


I'm making requests to various API endpoints that serve large MP4 files (>100 MB) and want to upload them to an S3 bucket. I concluded this is an I/O-bound task, since it involves handling API requests and streaming the video content for upload, which is why I chose asynchronous programming. However, my current implementation isn't efficient when it comes to uploading to S3. My first approach was to iterate over every chunk of a video and then upload the whole thing, but that took a few minutes per video. I then tried uploading each chunk as it was iterated, similar to this implementation, but didn't see any improvement.

I'm fairly sure the problem is in how I'm handling each chunk of the video stream, but I haven't found many solutions that help. When I save the videos to my local directory I can see each one being processed across tasks, but on S3 it looks like the videos are being uploaded one at a time. Is this code actually asynchronous? It doesn't appear to be, given how long it takes to upload to S3. If it isn't, how can I make it asynchronous? This is the example code:

import asyncio
import httpx
from botocore.exceptions import ClientError
import aioboto3
from typing import AsyncIterator


async def async_upload(stream: AsyncIterator[bytes], 
                        destination_key: str, 
                        ) -> None:
        """
        Asynchronously uploads video files via `aioboto3`

        Args:
            stream (AsyncIterator[bytes]): The video stream broken up into iterable chunks
            destination_key (str): The desired destination for the video file
        """

        session = aioboto3.Session()

        async with session.client('s3') as s3:
            resp = await s3.create_multipart_upload(Bucket=bucket_name, Key=destination_key)

            upload_id = resp['UploadId']

            parts = []
            p_number = 1

            try:
                async for chunk in stream:
                    part_resp = await s3.upload_part(
                        Bucket=bucket_name,
                        Key=destination_key,
                        PartNumber=p_number,
                        UploadId=upload_id,
                        Body=chunk
                    )
                    parts.append({
                        'ETag': part_resp['ETag'],
                        'PartNumber': p_number
                    })

                    p_number += 1
                    print('Chunk done')
                
                await s3.complete_multipart_upload(
                    Bucket=bucket_name,
                    Key=destination_key,
                    UploadId=upload_id,
                    MultipartUpload={'Parts': parts}
                )
                print('Upload Done')
            except Exception as e:
                await s3.abort_multipart_upload(
                    Bucket=bucket_name,
                    Key=destination_key,
                    UploadId=upload_id
                )
                print('An error occurred', e)


async def upload_video(url):
    # Example url
    async with httpx.AsyncClient(timeout=httpx.Timeout(20, read=10)) as request:
        # Stream the response so the whole file isn't loaded into memory at once
        async with request.stream("GET", url) as response:
            if response.status_code == 200:
                try:
                    s3_key = f'{folder_name}/{file_name}'
                    await async_upload(response.aiter_bytes(chunk_size=100*1024*1024), s3_key) # 100MB chunks
                        
                except ClientError as e:
                    print('Upload Failed', e)
            else:
                print('Bad Response', response.status_code)

async def main():
    urls = ['https://api.example.com/somevideo.mp4', 'https://api.example.com/somevideo.mp4', 
            'https://api.example.com/somevideo.mp4']
    
    tasks = [
        upload_video(url)
        for url in urls
    ]
   
    await asyncio.gather(*tasks)

if __name__ == '__main__':

    bucket_name = 'Test-Bucket'
    folder_name = 'Test'
    file_name = 'Video.MP4'

    asyncio.run(main())

Solution

  • I believe I've maximized the concurrency this can handle. Any remaining issues with upload times come down to my network: since these are >100 MB videos, uploading them quickly needs more bandwidth than my home router provides, so running this script on a cloud instance will be required for faster upload times. Here is how I made uploading each part asynchronous as well, to maximize the concurrency across the videos being uploaded:

    from typing import Any, Dict  # needed in addition to the original imports

    async def async_upload(stream: AsyncIterator[bytes], 
                            destination_key: str, 
                            ) -> None:
            """
            Asynchronously uploads video files via `aioboto3`
    
            Args:
                stream (AsyncIterator[bytes]): The video stream broken up into iterable chunks
                destination_key (str): The desired destination for the video file
            """
    
            session = aioboto3.Session()
    
            async with session.client('s3') as s3:
                resp = await s3.create_multipart_upload(Bucket=bucket_name, Key=destination_key)
    
                upload_id = resp['UploadId']
    
                p_number = 1
                upload_task = []
    
    
                async def upload_part(part_number: int, data: bytes) -> Dict[str, Any]:
                    part_resp = await s3.upload_part(
                            Bucket=bucket_name,
                            Key=destination_key,
                            PartNumber=part_number,
                            UploadId=upload_id,
                            Body=data
                        )
                    return {
                        'ETag': part_resp['ETag'],
                        'PartNumber': part_number
                    }
    
                try:
                    async for chunk in stream:
                        task = asyncio.create_task(upload_part(p_number, chunk))
                        upload_task.append(task)
                        p_number += 1
    
                    parts = await asyncio.gather(*upload_task)
                    
                    await s3.complete_multipart_upload(
                        Bucket=bucket_name,
                        Key=destination_key,
                        UploadId=upload_id,
                        MultipartUpload={'Parts': parts}
                    )
                    print('Upload Done')
                except Exception as e:
                    await s3.abort_multipart_upload(
                        Bucket=bucket_name,
                        Key=destination_key,
                        UploadId=upload_id
                    )
                    print('An error occurred', e)
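
    One caveat with this version: creating a task per chunk means every pending ~100 MB chunk is held in memory until its upload finishes. If memory ever becomes a problem, the part uploads can be bounded with an asyncio.Semaphore. Below is a minimal sketch of that idea, not a drop-in replacement: upload_parts_bounded and MAX_CONCURRENT_PARTS are illustrative names, and it assumes the same aioboto3 s3 client, bucket name, destination key and upload_id handling as the code above.

    import asyncio
    from typing import Any, AsyncIterator, Dict, List

    MAX_CONCURRENT_PARTS = 4  # hypothetical cap; tune to available memory and bandwidth

    async def upload_parts_bounded(s3, bucket_name: str, destination_key: str,
                                   upload_id: str, stream: AsyncIterator[bytes]) -> List[Dict[str, Any]]:
        """Upload multipart chunks concurrently, keeping at most MAX_CONCURRENT_PARTS in flight."""
        semaphore = asyncio.Semaphore(MAX_CONCURRENT_PARTS)

        async def upload_part(part_number: int, data: bytes) -> Dict[str, Any]:
            try:
                part_resp = await s3.upload_part(
                    Bucket=bucket_name,
                    Key=destination_key,
                    PartNumber=part_number,
                    UploadId=upload_id,
                    Body=data,
                )
                return {'ETag': part_resp['ETag'], 'PartNumber': part_number}
            finally:
                semaphore.release()  # free the slot whether the upload succeeded or failed

        tasks = []
        p_number = 1
        async for chunk in stream:
            # Wait for a free slot before holding on to another chunk, so only
            # about MAX_CONCURRENT_PARTS chunks are buffered at any one time.
            await semaphore.acquire()
            tasks.append(asyncio.create_task(upload_part(p_number, chunk)))
            p_number += 1

        # gather returns results in task-creation order, so the parts list stays
        # sorted by PartNumber, which complete_multipart_upload expects.
        return await asyncio.gather(*tasks)

    Also worth remembering: S3 rejects multipart parts smaller than 5 MiB (except the final part), so the chunk size can't be reduced too far below the current 100 MB.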