pythonopencvcomputer-visionfastapi

How to pass a video uploaded via FastAPI to OpenCV VideoCapture?


I am trying to upload an mp4 video file using UploadFile in FastAPI. However, the uploaded format is not readable by OpencCV (cv2).

This is my endpoint:

from fastapi import FastAPI, File, UploadFile
from fastapi.responses import PlainTextResponse

@app.post("/video/test", response_class=PlainTextResponse)
async def detect_faces_in_video(video_file: UploadFile):
    
    contents = await video_file.read()
    print(type(video_file)) # <class 'starlette.datastructures.UploadFile'>
    print(type(contents)) # <class 'bytes'>

    return ""

and the two file formats (i.e., bytes and UploadFile) are not readable by OpenCV.


Solution

  • You are trying to pass either the file contents (bytes) or UploadFile object; however, VideoCapture() accepts either a video filename, capturing device or or an IP video stream.

    UploadFile is basically a SpooledTemporaryFile (a file-like object) that operates similar to a TemporaryFile. However, it does not have a visible name in the file system. As you mentioned that you wouldn't be keeping the files on the server after processing them, you could copy the file contents to a NamedTemporaryFile that "has a visible name in the file system, which can be used to open the file" (using the name attribute), as described here and here. As per the documentation:

    Whether the name can be used to open the file a second time, while the named temporary file is still open, varies across platforms (it can be so used on Unix; it cannot on Windows). If delete is true (the default), the file is deleted as soon as it is closed.

    Hence, on Windows you need to set the delete argument to False when instantiating a NamedTemporaryFile, and once you are done with it, you can manually delete it, using the os.remove() or os.unlink() method.

    Below are given two options on how to do that. Option 1 implements a solution using a def endpoint, while Option 2 uses an async def endpoint (utilising the aiofiles library). For more details on the difference between def and async def, as well as the run_in_threadpool function used in Option 2, please have a look at this answer and this answer (Note that if processing of the file involves a CPU-bound task, you might benefit from using ProcessPool instead of Threadpool—it is all explained in the linked answers above). If you expect users to upload rather large files in size that wouldn't fit into your server's RAM, please have a look at this answer and this answer on how to read the uploaded video file in chunks instead.

    Option 1 - Using def endpoint

    from fastapi import FastAPI, File, UploadFile, HTTPException
    from tempfile import NamedTemporaryFile
    import os
    
    @app.post("/video/detect-faces")
    def detect_faces(file: UploadFile = File(...)):
        temp = NamedTemporaryFile(delete=False)
        try:
            try:
                contents = file.file.read()
                with temp as f:
                    f.write(contents);
            except Exception:
                raise HTTPException(status_code=500, detail='Something went wrong')
            finally:
                file.file.close()
            
            res = process_video(temp.name)  # Pass temp.name to VideoCapture()
        except Exception:
            raise HTTPException(status_code=500, detail='Something went wrong when processing the file')
        finally:
            #temp.close()  # the `with` statement above takes care of closing the file
            os.remove(temp.name)
            
        return res
    

    Option 2 - Using async def endpoint

    from fastapi import FastAPI, File, UploadFile, HTTPException
    from tempfile import NamedTemporaryFile
    from fastapi.concurrency import run_in_threadpool
    import aiofiles
    import asyncio
    import os
    
    @app.post("/video/detect-faces")
    async def detect_faces(file: UploadFile = File(...)):
        try:
            async with aiofiles.tempfile.NamedTemporaryFile("wb", delete=False) as temp:
                try:
                    contents = await file.read()
                    await temp.write(contents)
                except Exception:
                    raise HTTPException(status_code=500, detail='Something went wrong')
                finally:
                    await file.close()
            
            res = await run_in_threadpool(process_video, temp.name)  # Pass temp.name to VideoCapture()
        except Exception:
            raise HTTPException(status_code=500, detail='Something went wrong when processing the file')
        finally:
            os.remove(temp.name)
    
        return res