Tags: python, python-3.x, google-cloud-functions, moviepy

Google Cloud Function using moviepy slows down and times out


I have written a program that takes in a YouTube URL, parses the transcript into clips, downloads the original video using moviepy, trims it into the clips, then uploads them to Google Storage. I can confirm that the function works locally. However, when I deploy and call the function (using @https_fn.on_request() and @tasks_fn.on_task_dispatched), it works correctly but at a dramatically slower rate, eventually timing out despite having the timeout set to 60 minutes.

Has anyone been able to handle rather large video files using moviepy in google cloud functions?

BACKUP_COUNT = 1 # Only appears in the HTTP response message; exactly one task is enqueued per request
HOURLY_BATCH_SIZE = 5 # NOTE(review): not referenced anywhere in the visible code -- confirm it is still needed
BACKUP_START_DATE = datetime(2023, 1, 1) # Naive (timezone-unaware) date; only used by the commented-out backup_date line below

@https_fn.on_request()
def queue_video_task(req: https_fn.Request) -> https_fn.Response:
    """Validate an incoming request and enqueue a single video-ingest task.

    Expects a JSON body with:
        url      -- the YouTube URL to process (required)
        authorID -- identifier of the requesting author

    Creates a 'processing' marker document in Firestore, then enqueues one
    Cloud Tasks dispatch targeting the ``ingestVideo`` function.

    Returns:
        400 if the URL is missing, 202 if the video was already processed,
        200 once the task has been enqueued.
    """
    request_data = req.get_json()
    youtubeURL = request_data.get("url")
    authorID = request_data.get("authorID")

    logger.info(f"Received request for {youtubeURL}")

    # Validate input before doing any other work.
    if not youtubeURL:
        return https_fn.Response('Missing URL', status=400)

    # Renamed from `id` to avoid shadowing the builtin.
    task_id = uuid.uuid4()

    task_queue = functions.task_queue("ingestVideo")  # enqueueing ingestVideo()
    target_uri = get_function_url("ingestVideo")

    schedule_time = datetime.now()

    # Cloud Tasks caps the dispatch deadline at 30 minutes (1800 s).
    # The previous comment claiming "60 minutes" was wrong.
    dispatch_deadline_seconds = 60 * 30  # 30 minutes

    doc_ref = firestore.client().collection('processing').document(f"{task_id}")
    doc = doc_ref.get()

    # NOTE(review): task_id is a freshly generated UUID, so this lookup can
    # never find an existing document -- deduplication presumably should key
    # on the video URL (or a hash of it). Confirm intent with the caller.
    if doc.exists:
        # processing == False means a previous run finished this video.
        # Use the public to_dict() API rather than the private _data attribute.
        if doc.to_dict().get('processing') is False:
            return https_fn.Response('Video already processed', status=202)

    doc_ref.set({
        "url": youtubeURL,
        "authorID": authorID,
        "processing": True,
    })

    body = {
        "data": {
            "url": youtubeURL,
            "authorID": authorID,
            "id": f"{task_id}",
        }
    }

    task_options = functions.TaskOptions(
        schedule_time=schedule_time,
        dispatch_deadline_seconds=dispatch_deadline_seconds,
        uri=target_uri,
    )

    logger.info("Updated document")
    logger.info(f"Sent with body: {body}")

    task_queue.enqueue(body, task_options)
    # Exactly one task is enqueued per request; the old message reported
    # BACKUP_COUNT tasks, which was misleading.
    return https_fn.Response(status=200, response="Enqueued 1 task")

@tasks_fn.on_task_dispatched(
        retry_config=RetryConfig(max_attempts=5, min_backoff_seconds=1200),
        rate_limits=RateLimits(max_concurrent_dispatches=1),
        memory=MemoryOption.GB_4,
        timeout_sec=3600,
        secrets=["OPENAI_KEY"])
def ingestVideo(req: tasks_fn.CallableRequest) -> str:
    """Download a YouTube video, cut it into LLM-selected clips, and upload them.

    Task payload (req.data):
        url      -- YouTube URL to ingest
        id       -- UUID of the 'processing' Firestore document for this job
        authorID -- author to attribute the resulting clip documents to

    On completion the 'processing' document is marked processing=False.
    """
    youtubeURL = req.data["url"]
    # Keep the queue-assigned job id in its own name; the original code
    # overwrote `id` inside the clip loop, which broke the completion checks
    # and the final Firestore update below.
    task_id = req.data["id"]
    authorID = req.data["authorID"]

    logger.info(f"Received body: {req.data}")

    openai_api_key = os.environ.get('OPENAI_KEY')
    client = OpenAI(api_key=openai_api_key)

    logger.info(f"Request: {youtubeURL}")

    # Bug fix: `results` and `combined_buffer_path` were previously bound only
    # inside the `if` below, raising NameError when the operation was already
    # complete. Initialize them so the clip loop is safely skipped instead.
    results = []
    combined_buffer_path = None

    if not operation_is_complete(task_id):
        logger.info("Starting")
        subtitles = getSubtitles(youtubeURL)
        logger.info("Got Subtitles")
        chunks = chunk_subtitles(subtitles=subtitles)
        logger.info("Got Chunks")
        results = analyze_chunks_with_llm(chunks, client)
        logger.info(f"Finished clipping: found {len(results)}")

        video, audio = download_youtube_video(youtubeURL)
        logger.info("Downloaded")
        combined_buffer_path = combine_video_audio(video, audio, output_path=f"{task_id}.mp4")
        logger.info("Combined")

    for i, clip in enumerate(results):
        # Bug fix: check completion against the JOB id, not the previous
        # clip's UUID (which `id` used to hold after the first iteration).
        if operation_is_complete(task_id):
            break
        clip_id = uuid.uuid4()
        path = f"{clip_id}_clip{i}.mp4"
        logger.info("Uploading Clip")
        trimmed_video_path = trim_video(
            combined_buffer_path,
            start_time=timestamp_to_seconds(clip['start']),
            end_time=timestamp_to_seconds(clip['end']),
            output_path=f'{path}')
        upload_blob_from_memory(trimmed_video_path, path)
        create_document(clip_id, path, clip, authorID)
        logger.info("Documented Clip")

    # Bug fix: clear the flag on the ORIGINAL processing document. The old
    # code looked up `id`, which by this point held the last clip's UUID, so
    # the 'processing' flag was never actually set back to False.
    db = firestore.client()
    doc_ref = db.collection("processing").document(f"{task_id}")
    doc = doc_ref.get()
    if doc.exists:
        doc_ref.update({'processing': False})

    return "Episode Processing Completed"

Solution

  • Your local machine is likely much, much more powerful than the default server configuration that runs Cloud Functions. The only thing you can realistically do to speed up a CPU-bound process is to configure a more powerful server for the function that needs to run faster.

    If you've maxed out the configuration, then your workload just isn't suitable for Cloud Functions, and you should look into different options that meet your requirements. Cloud Functions just isn't meant to be a product for high performance compute jobs. There are many other GCP products for that, such as Compute Engine (but they will almost certainly be more difficult to use and cost much more).