
How to Upload a large File (≥3GB) to FastAPI backend?


I am trying to upload a large file (≥3GB) to my FastAPI server, without loading the entire file into memory, as my server has only 2GB of free memory.

Server side:

async def uploadfiles(upload_file: UploadFile = File(...)):

Client side:

m = MultipartEncoder(fields = {"upload_file":open(file_name,'rb')})
prefix = "http://xxx:5000"
url = "{}/v1/uploadfiles".format(prefix)
req = requests.post(url, data=m, verify=False)

which returns:

HTTP 422 {"detail":[{"loc":["body","upload_file"],"msg":"field required","type":"value_error.missing"}]}

I am not sure what MultipartEncoder actually sends to the server, so that the request does not match. Any ideas?


Solution

  • With the requests-toolbelt library, you have to pass the filename as well when declaring the field for upload_file, and you also have to set the Content-Type header. The missing header is the main reason for the error you get: the request is sent without Content-Type set to multipart/form-data followed by the necessary boundary string, so the server cannot parse the body as form data. Both steps are shown in the documentation. Example:

    filename = 'my_file.txt'
    m = MultipartEncoder(fields={'upload_file': (filename, open(filename, 'rb'))})
    r = requests.post(url, data=m, headers={'Content-Type': m.content_type})
    print(r.request.headers)  # confirm that the 'Content-Type' header has been set
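
    To see why the header matters, the sketch below shows how a boundary can be recovered from a Content-Type value. It is only an illustration built on the standard library's email.message module (the helper name extract_boundary is made up here); it is not how FastAPI/Starlette parse the header internally.

```python
from email.message import Message
from typing import Optional


def extract_boundary(content_type: str) -> Optional[str]:
    """Return the multipart boundary from a Content-Type value, if any."""
    msg = Message()
    msg["Content-Type"] = content_type
    if msg.get_content_type() != "multipart/form-data":
        return None  # not a multipart form, so there is nothing to split on
    return msg.get_boundary()  # None when the boundary parameter is absent


print(extract_boundary("multipart/form-data; boundary=abc123"))
print(extract_boundary("application/octet-stream"))
```

    Without a boundary, a receiver has no way of telling where one body part ends and the next begins, which is consistent with the "field required" response above.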
    

    However, I wouldn't recommend using a library (i.e., requests-toolbelt) that hasn't provided a new release for over three years now. I would suggest using Python requests instead, as demonstrated in this answer and that answer (also see Streaming Uploads and Chunk-Encoded Requests). Preferably, use the HTTPX library, which supports sending requests asynchronously (useful if you have to send multiple requests simultaneously) and streams file uploads by default, meaning that only one chunk at a time is loaded into memory (see the documentation). Examples are given below.

    Option 1 (Fast) - Upload File and Form data using .stream()

    As previously explained in detail in this answer, when you declare an UploadFile object, FastAPI/Starlette, under the hood, uses a SpooledTemporaryFile with the max_size attribute set to 1MB. The file data is spooled in memory until the file size exceeds max_size, at which point the contents are written to a temporary file in your OS's temporary directory (see this answer on how to find/change the default temporary directory). You later need to read the data back from that file, using the .read() method. Hence, this whole process makes uploading files quite slow, especially large ones (as you'll see in Option 2 later on).
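
    The spooling behaviour can be observed with the standard library alone. The sketch below uses a deliberately tiny threshold (MAX_SIZE and rolled_to_disk are names chosen for this illustration) and peeks at the private _rolled attribute, which is a CPython implementation detail and not something to rely on in production code.

```python
from tempfile import SpooledTemporaryFile

MAX_SIZE = 1024  # tiny threshold for the demo; the text above mentions 1MB


def rolled_to_disk(num_bytes: int, max_size: int = MAX_SIZE) -> bool:
    """Write num_bytes to a spooled file and report whether it moved to disk."""
    with SpooledTemporaryFile(max_size=max_size) as f:
        f.write(b"x" * num_bytes)
        # _rolled is a private CPython attribute, used here only for inspection
        return f._rolled


print(rolled_to_disk(512))   # below the threshold: stays in memory
print(rolled_to_disk(4096))  # above the threshold: rolled over to a real file
```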

    To avoid that and speed up the process, as the linked answer above suggested, one can access the request body as a stream. As per Starlette documentation, if you use the .stream() method, the (request) byte chunks are provided without storing the entire body into memory (and later to a temporary file, if the body size exceeds 1MB). This method allows you to read and process the byte chunks as they arrive.

    The below takes the suggested solution a step further, by using the streaming-form-data library, which provides a Python parser for parsing streaming multipart/form-data input chunks. This means that not only can you upload Form data along with File(s), but you also don't have to wait for the entire request body to be received in order to start parsing the data. You initialise the main parser class (passing the HTTP request headers, which help determine the input Content-Type and, hence, the boundary string used to separate each body part in the multipart payload), and associate one of the Target classes with each field, to define what should be done with the field once it has been extracted from the request body. For instance, FileTarget streams the data to a file on disk, whereas ValueTarget holds the data in memory (the ValueTarget class can be used for either Form or File data, if you don't need the file(s) saved to disk). It is also possible to define your own custom Target classes.

    It should be mentioned that the streaming-form-data library does not currently support async calls to I/O operations, meaning that the writing of chunks happens synchronously (within a def function). However, as the endpoint in the example below uses .stream() (which is an async def function), it gives up control for other tasks/requests to run in the event loop while waiting for data to become available from the stream. You could also run the function for parsing the received data in a separate thread and await it, using Starlette's run_in_threadpool() (e.g., await run_in_threadpool(parser.data_received, chunk)), which is internally used by FastAPI when you call the async methods of UploadFile, as shown here. For more details on def vs async def in FastAPI, please have a look at this answer.
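
    The idea of awaiting a blocking call in a worker thread, one chunk at a time, can be sketched with the standard library only. Note the substitutions: asyncio.to_thread (Python 3.9+) stands in for Starlette's run_in_threadpool, fake_stream stands in for request.stream(), and an io.BytesIO stands in for the synchronous parser. All three are assumptions made for this illustration.

```python
import asyncio
import io
from typing import AsyncIterator, List


async def fake_stream(chunks: List[bytes]) -> AsyncIterator[bytes]:
    """Stand-in for request.stream(): yields chunks, pausing between them."""
    for chunk in chunks:
        await asyncio.sleep(0)  # give other tasks a chance to run
        yield chunk


async def consume(chunks: List[bytes]) -> bytes:
    sink = io.BytesIO()  # stand-in for a blocking, synchronous parser/writer
    async for chunk in fake_stream(chunks):
        # Run the blocking call in a worker thread and await its completion,
        # so the event loop stays free while the write is in progress
        await asyncio.to_thread(sink.write, chunk)
    return sink.getvalue()


result = asyncio.run(consume([b"hello ", b"world"]))
print(result)
```

    Whether the extra thread hop pays off depends on how long each synchronous call blocks; for very small chunks the overhead may outweigh the benefit.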

    Using the suggested solution below would also allow one to perform certain validation tasks, e.g., ensuring that the input size does not exceed a certain value, something that cannot be done with the usual UploadFile approach until the file has already been loaded into memory/disk. The suggested solution achieves that using the MaxSizeValidator. However, this is only applied to the File/Form fields that you have defined. Hence, it wouldn't prevent a malicious user from sending an extremely large request body (using random File/Form fields, or no fields at all), which could consume server resources to the point where the application crashes or becomes unresponsive to legitimate users. For that reason, the example below incorporates a custom MaxBodySizeValidator class that makes sure the request body size does not exceed a pre-defined maximum value.

    Together, the two validators limit both the upload file size and the entire request body size, in a likely better way than the one described here, which instead uses the UploadFile approach that requires the file to be entirely received and saved to the temporary directory before performing the validation check (not to mention that the approach described in that github post does not take into account the request body size at all, which leaves it vulnerable to the attack mentioned earlier, where malicious actors attempt to overload the server with excessively large requests). Using an ASGI middleware such as this could be an alternative solution for limiting the request body size. Also, in case you are using Gunicorn with Uvicorn, you could define limits on, for example, the number of HTTP header fields in a request, the size of an HTTP request header field, and so on (see the documentation). Similar limits could also be applied when using reverse proxy servers, such as Nginx (which also allows you to set the maximum request body size using the client_max_body_size directive).
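
    For the middleware alternative, a rough pure-ASGI sketch is given below. It is not the middleware linked above: MaxBodySizeMiddleware, BodyTooLarge, echo_length_app and run_demo are hypothetical names written for this illustration, and the 413 response assumes the wrapped application has not started sending its own response yet.

```python
import asyncio


class BodyTooLarge(Exception):
    pass


class MaxBodySizeMiddleware:
    """Counts request body bytes as they arrive and aborts past a limit."""

    def __init__(self, app, max_body_size: int):
        self.app = app
        self.max_body_size = max_body_size

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return
        received = 0

        async def limited_receive():
            nonlocal received
            message = await receive()
            if message["type"] == "http.request":
                received += len(message.get("body", b""))
                if received > self.max_body_size:
                    raise BodyTooLarge()
            return message

        try:
            await self.app(scope, limited_receive, send)
        except BodyTooLarge:
            # Assumes the wrapped app has not started its response yet
            await send({"type": "http.response.start", "status": 413,
                        "headers": [(b"content-type", b"text/plain")]})
            await send({"type": "http.response.body",
                        "body": b"Request body too large"})


async def echo_length_app(scope, receive, send):
    """Toy ASGI app: reads the whole body, replies 200 with its length."""
    total = 0
    while True:
        message = await receive()
        total += len(message.get("body", b""))
        if not message.get("more_body", False):
            break
    await send({"type": "http.response.start", "status": 200, "headers": []})
    await send({"type": "http.response.body", "body": str(total).encode()})


def run_demo(chunks, limit):
    """Drive the middleware with in-memory chunks; return the response status."""
    messages = [{"type": "http.request", "body": c,
                 "more_body": i < len(chunks) - 1}
                for i, c in enumerate(chunks)]
    sent = []

    async def receive():
        return messages.pop(0)

    async def send(message):
        sent.append(message)

    app = MaxBodySizeMiddleware(echo_length_app, limit)
    asyncio.run(app({"type": "http"}, receive, send))
    return sent[0]["status"]


print(run_demo([b"a" * 10, b"b" * 10], limit=100))
print(run_demo([b"a" * 10, b"b" * 10], limit=15))
```

    With FastAPI, such a class could presumably be registered via app.add_middleware(MaxBodySizeMiddleware, max_body_size=...), but that wiring is untested here.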

    A few notes for the example below. Since this approach uses the Request object directly, and not UploadFile and Form objects, the endpoint won't be properly documented in the Swagger auto-generated docs at /docs (if that's important for your application at all). This also means that you have to perform some validation checks on your own, such as whether the required fields for the endpoint were received and, if so, whether they were in the expected format. For instance, for the data field, you could check whether data.value is empty (empty would mean that the user has either not included that field in the multipart/form-data, or sent an empty value), as well as whether it decodes to the expected format (data.value holds bytes, hence the data.value.decode() call in the example). As for the file(s), you could check whether file_.multipart_filename is not empty; however, since a filename might not be included in the Content-Disposition header by the client, you may also want to check whether the file exists in the filesystem, using os.path.isfile(filepath), in order to ensure that a file has indeed been uploaded. (Note: you need to make sure there is no pre-existing file with the same name in that specified location; otherwise, the aforementioned function would always return True, even when the user did not send the file. You could always generate unique UUIDs for the filenames, as suggested here and here.)

    Regarding the applied size limits, the MAX_REQUEST_BODY_SIZE below must be larger than the MAX_FILE_SIZE (plus the size of all the Form values) you expect to receive, as the raw request body (that you get from the .stream() method) includes a few more bytes for the --boundary and Content-Disposition header of each field in the body. Hence, you should add a few more bytes, depending on the Form values and the number of files you expect to receive (hence the MAX_FILE_SIZE + 1024 below).
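
    To get a feel for how many "few more bytes" are involved, the sketch below hand-assembles a two-part multipart body and measures the framing around the payload. The boundary string, field names and build_multipart helper are assumptions for this illustration; real clients choose their own boundary and may add further headers per part.

```python
def build_multipart(boundary: str, value: bytes, filename: str, content: bytes) -> bytes:
    """Assemble a two-part multipart/form-data body (one Form field, one file)."""
    b = boundary.encode()
    return b"".join([
        b"--" + b + b"\r\n",
        b'Content-Disposition: form-data; name="data"\r\n\r\n',
        value + b"\r\n",
        b"--" + b + b"\r\n",
        b'Content-Disposition: form-data; name="file"; filename="'
        + filename.encode() + b'"\r\n',
        b"Content-Type: application/octet-stream\r\n\r\n",
        content + b"\r\n",
        b"--" + b + b"--\r\n",  # closing delimiter
    ])


BOUNDARY = "0123456789abcdef0123456789abcdef"  # made-up boundary
value = b"Hello World!"
content = b"\0" * 5000  # small stand-in for the real file

body = build_multipart(BOUNDARY, value, "bigFile.zip", content)
overhead = len(body) - len(content) - len(value)
print(f"framing overhead: {overhead} bytes")
```

    The overhead depends on the boundary length, the field names and the number of parts, not on the file size, so a fixed margin per expected field is usually enough.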

    app.py

    from fastapi import FastAPI, Request, HTTPException, status
    from streaming_form_data import StreamingFormDataParser
    from streaming_form_data.targets import FileTarget, ValueTarget
    from streaming_form_data.validators import MaxSizeValidator
    import streaming_form_data
    from starlette.requests import ClientDisconnect
    import os
    
    MAX_FILE_SIZE = 1024 * 1024 * 1024 * 4  # = 4GB
    MAX_REQUEST_BODY_SIZE = MAX_FILE_SIZE + 1024
    
    app = FastAPI()
    
    class MaxBodySizeException(Exception):
        def __init__(self, body_len: int):
            self.body_len = body_len
    
    class MaxBodySizeValidator:
        def __init__(self, max_size: int):
            self.body_len = 0
            self.max_size = max_size
    
        def __call__(self, chunk: bytes):
            self.body_len += len(chunk)
            if self.body_len > self.max_size:
                raise MaxBodySizeException(body_len=self.body_len)
     
    @app.post('/upload')
    async def upload(request: Request):
        body_validator = MaxBodySizeValidator(MAX_REQUEST_BODY_SIZE)
        filename = request.headers.get('Filename')
        
        if not filename:
            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, 
                detail='Filename header is missing')
        try:
            filepath = os.path.join('./', os.path.basename(filename)) 
            file_ = FileTarget(filepath, validator=MaxSizeValidator(MAX_FILE_SIZE))
            data = ValueTarget()
            parser = StreamingFormDataParser(headers=request.headers)
            parser.register('file', file_)
            parser.register('data', data)
            
            async for chunk in request.stream():
                body_validator(chunk)
                parser.data_received(chunk)
        except ClientDisconnect:
            print("Client Disconnected")
        except MaxBodySizeException as e:
            raise HTTPException(status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, 
               detail=f'Maximum request body size limit ({MAX_REQUEST_BODY_SIZE} bytes) exceeded ({e.body_len} bytes read)')
        except streaming_form_data.validators.ValidationError:
            raise HTTPException(status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, 
                detail=f'Maximum file size limit ({MAX_FILE_SIZE} bytes) exceeded') 
        except Exception:
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 
                detail='There was an error uploading the file') 
       
        if not file_.multipart_filename:
            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail='File is missing')
    
        print(data.value.decode())
        print(file_.multipart_filename)
            
        return {"message": f"Successfully uploaded {filename}"}
    

    As mentioned earlier, to upload the data (on client side), you can use the HTTPX library, which supports streaming file uploads by default, and thus allows you to send large streams/files without loading them entirely into memory. You can pass additional Form data as well, using the data argument. Below, a custom header, i.e., Filename, is used to pass the filename to the server, so that the server instantiates the FileTarget class with that name (you could use the X- prefix for custom headers, if you wish; however, it is not officially recommended anymore).

    test.py

    import httpx
    import time
    
    url = 'http://127.0.0.1:8000/upload'
    files = {'file': open('bigFile.zip', 'rb')}
    headers = {'Filename': 'bigFile.zip'}
    data = {'data': 'Hello World!'}
    
    with httpx.Client() as client:
        start = time.time()
        r = client.post(url, data=data, files=files, headers=headers)
        end = time.time()
        print(f'Time elapsed: {end - start}s')
        print(r.status_code, r.json(), sep=' ')
    

    Upload Multiple Files and Form data using .stream()

    To upload multiple files, use a header for each filename, or use random names on server side and, once a file is fully uploaded, optionally rename it to file_.multipart_filename, for instance. Regardless, in a real-world scenario, you should never trust the filename (or even the file extension) passed by the user, as it might be malicious, trying to extract or replace files in your system. Thus, it is always good practice to add some random alphanumeric characters at the end/front of the filename, if not using a completely random name, for each file that is uploaded. On client side, pass a list of files, as described in httpx's documentation. Note that you should use a different key/field name for each file, so that they don't overlap when parsing them on server side, e.g., files = [('file0', open('bigFile.zip', 'rb')),('file1', open('otherBigFile.zip', 'rb'))]. Finally, define the Target classes (either FileTarget or ValueTarget) on server side, accordingly.
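
    As a side note on untrusted filenames, one possible way of deriving a storage name is sketched below. The safe_unique_name helper is hypothetical and only covers path stripping and uniqueness; it does not validate file extensions or file contents.

```python
import os
import uuid


def safe_unique_name(client_filename: str) -> str:
    """Drop any directory components and prefix a random UUID."""
    # Normalise Windows separators first, so basename() also strips them on POSIX
    base = os.path.basename(client_filename.replace("\\", "/"))
    return f"{uuid.uuid4().hex}_{base}"


print(safe_unique_name("../../etc/passwd"))
print(safe_unique_name("bigFile.zip"))
```

    The original name can be kept separately (e.g., in a database) if it needs to be shown back to the user.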

    You could test the example below, using either the HTML template at /, which uses JavaScript to prepare and send the request with multiple files, or the Python httpx client provided below.

    For simplicity, the example below does not perform validation checks on the body size; however, if you wish, you could still perform those checks using the code provided in the previous example.

    app.py

    from fastapi import FastAPI, Request, HTTPException, status
    from fastapi.responses import HTMLResponse
    from starlette.requests import ClientDisconnect
    from urllib.parse import unquote
    import streaming_form_data
    from streaming_form_data import StreamingFormDataParser
    from streaming_form_data.targets import FileTarget, ValueTarget
    import os
    
    
    app = FastAPI()
    
       
    @app.get('/')
    async def main():
        content = """
        <!DOCTYPE html>
        <html>
           <body>
              <input type="file" id="fileInput" name="files" onchange="reset()" multiple><br>
              <input type="button" value="Submit" onclick="submitUsingFetch()">
              <p id="resp"></p>
              <script>
                 function reset() {
                    var resp = document.getElementById("resp");
                    resp.innerHTML = "";
                    resp.style.color = "black";
                 }
                 
                 function submitUsingFetch() {
                    var resp = document.getElementById("resp");
                    var fileInput = document.getElementById('fileInput');
                    if (fileInput.files[0]) {
                       var formData = new FormData();
                       var headers = new Headers();
                       formData.append("data", "Hello World!");
                 
                       var i = 0;
                       for (const file of fileInput.files) {
                          const filename = encodeURI(file.name);
                          headers.append(`filename${i}`, filename);
                          formData.append(`file${i}`, file, filename);
                          i++;
                       }
                 
                       fetch('/upload', {
                             method: 'POST',
                             headers: headers,
                             body: formData,
                          })
                          .then(response => response.json())
                          .then(data => {
                             resp.innerHTML = JSON.stringify(data); // data is a JSON object
                          })
                          .catch(error => {
                             console.error(error);
                          });
                    } else {
                       resp.innerHTML = "Please choose some file(s)...";
                       resp.style.color = "red";
                    }
                 }
              </script>
           </body>
        </html>
        """
        return HTMLResponse(content=content)   
    
    
    @app.post('/upload')
    async def upload(request: Request):
        try:
            parser = StreamingFormDataParser(headers=request.headers)
            data = ValueTarget()
            parser.register('data', data)
            
            headers = dict(request.headers)
            filenames = []
            i = 0
            while True:
                filename = headers.get(f'filename{i}', None)
                if filename is None:
                    break
                filename = unquote(filename)
                filenames.append(filename)
                filepath = os.path.join('./', os.path.basename(filename)) 
                file_ = FileTarget(filepath)
                parser.register(f'file{i}', file_)
                i += 1
                    
            async for chunk in request.stream():
                parser.data_received(chunk)
        except ClientDisconnect:
            print("Client Disconnected")
        except Exception:
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 
                detail='There was an error uploading the file') 
        
        print(data.value.decode())
        return {"message": f"Successfully uploaded {filenames}"}
    

    test.py

    import httpx
    import time
    
    url = 'http://127.0.0.1:8000/upload'
    headers = {'filename0': 'bigFile.zip', 'filename1': 'otherBigFile.zip'}
    files = [('file0', open('bigFile.zip', 'rb')), ('file1', open('otherBigFile.zip', 'rb'))]
    data = {'data': 'Hello World!'}
    
    with httpx.Client() as client:
        start = time.time()
        r = client.post(url, data=data, files=files, headers=headers)
        end = time.time()
        print(f'Time elapsed: {end - start}s')
        print(r.status_code, r.json(), sep=' ')
    

    Upload both Files and JSON body

    In case you would like to upload both file(s) and JSON instead of Form data, you could use the approach described in Method 3 of this answer, thus also saving you from performing manual checks on the received Form fields, as explained earlier (see the linked answer for more details). To that end, please make the following changes in the code above. For an HTML/JS example, please refer to this answer.

    app.py

    #...
    from fastapi import Form
    from pydantic import BaseModel, ValidationError
    from typing import Optional
    from fastapi.encoders import jsonable_encoder
    
    #...
    
    class Base(BaseModel):
        name: str
        point: Optional[float] = None
        is_accepted: Optional[bool] = False
      
    def checker(data: str = Form(...)):
        try:
            return Base.model_validate_json(data)
        except ValidationError as e:
            raise HTTPException(detail=jsonable_encoder(e.errors()), status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
            
    
    @app.post('/upload')
    async def upload(request: Request):
        #...
        
        # place the below after the try-except block in the example given earlier
        model = checker(data.value.decode())
        print(dict(model))
    

    test.py

    #...
    import json
    
    data = {'data': json.dumps({"name": "foo", "point": 0.13, "is_accepted": False})}
    #...
    

    Option 2 (Slow) - Upload File and Form data using UploadFile and Form

    If you would like to use a normal def endpoint instead, see this answer.

    app.py

    from fastapi import FastAPI, File, UploadFile, Form, HTTPException, status
    import aiofiles
    import os
    
    CHUNK_SIZE = 1024 * 1024  # adjust the chunk size as desired
    app = FastAPI()
    
    @app.post("/upload")
    async def upload(file: UploadFile = File(...), data: str = Form(...)):
        try:
            filepath = os.path.join('./', os.path.basename(file.filename))
            async with aiofiles.open(filepath, 'wb') as f:
                while chunk := await file.read(CHUNK_SIZE):
                    await f.write(chunk)
        except Exception:
            raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 
                detail='There was an error uploading the file')
        finally:
            await file.close()
    
        return {"message": f"Successfully uploaded {file.filename}"}
    

    As mentioned earlier, using this option would take longer for the file upload to complete, and as HTTPX uses a default timeout of 5 seconds, you will most likely get a ReadTimeout exception (as the server will need some time to read the SpooledTemporaryFile in chunks and write the contents to a permanent location on the disk). Thus, you can configure the timeout (see the Timeout class in the source code too), and more specifically, the read timeout, which "specifies the maximum duration to wait for a chunk of data to be received (for example, a chunk of the response body)". If set to None instead of some positive numerical value, there will be no timeout on read.

    test.py

    import httpx
    import time
    
    url = 'http://127.0.0.1:8000/upload'
    files = {'file': open('bigFile.zip', 'rb')}
    headers = {'Filename': 'bigFile.zip'}
    data = {'data': 'Hello World!'}
    timeout = httpx.Timeout(None, read=180.0)
    
    with httpx.Client(timeout=timeout) as client:
        start = time.time()
        r = client.post(url, data=data, files=files, headers=headers)
        end = time.time()
        print(f'Time elapsed: {end - start}s')
        print(r.status_code, r.json(), sep=' ')