python-3.x, aws-lambda, airflow, mwaa

Altering "read_timeout" for Lambda Invoke in Airflow


I have a Lambda that consistently takes longer than 1 minute to finish executing. This is a problem with the default LambdaInvokeFunctionOperator, since by default its hook creates a boto3 client with a read_timeout of 60s (meaning that if the Lambda has not returned a result after 60s, the connection aborts with a Read Timeout error).
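To illustrate, this is roughly how the default hook's boto3 client behaves (a sketch, not the provider's actual code; the function name is a placeholder):

import boto3
from botocore.config import Config

# botocore's defaults are connect_timeout=60 and read_timeout=60, so a
# synchronous invoke of a Lambda that runs longer than 60s raises a
# ReadTimeoutError on the caller's side, even though the Lambda keeps running.
client = boto3.client("lambda", config=Config(connect_timeout=60, read_timeout=60))
client.invoke(FunctionName="my-slow-function", InvocationType="RequestResponse")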

I solved the problem by creating a custom operator and hook where I was able to change the connection. But holy moly, surely there is a better way! I am an experienced Software Engineer, but I have very little experience with Python, so I fear I over-engineered this.

Here is what I did:

from __future__ import annotations

from functools import cached_property  # stdlib on Python 3.8+; no third-party cached_property needed

from botocore.config import Config

from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.operators.lambda_function import (
    LambdaInvokeFunctionOperator as BaseLambdaInvokeFunctionOperator,
)
from airflow.providers.amazon.aws.utils import trim_none_values

class LambdaInvokeFunctionOperator(BaseLambdaInvokeFunctionOperator):
    # Same operator as the provider's, but backed by the custom hook below
    # so the boto3 client gets a longer read_timeout.
    @cached_property
    def hook(self) -> LambdaHook:
        return LambdaHook(aws_conn_id=self.aws_conn_id)
class LambdaHook(AwsBaseHook):
    def __init__(self, *args, **kwargs) -> None:
        kwargs["client_type"] = "lambda"
        super().__init__(*args, **kwargs)

    @cached_property
    def conn(self):
        # All this effort to basically be able to pass this config in.
        # read_timeout=900 matches Lambda's hard limit of 15 minutes.
        config_dict = {"connect_timeout": 5, "read_timeout": 900, "tcp_keepalive": True}
        config = Config(**config_dict)
        return self.get_client_type(self.region_name, config)

    def invoke_lambda(
        self,
        *,
        function_name: str,
        invocation_type: str | None = None,
        log_type: str | None = None,
        client_context: str | None = None,
        payload: str | None = None,
        qualifier: str | None = None,
    ):
        invoke_args = {
            "FunctionName": function_name,
            "InvocationType": invocation_type,
            "LogType": log_type,
            "ClientContext": client_context,
            "Payload": payload,
            "Qualifier": qualifier,
        }
        return self.conn.invoke(**trim_none_values(invoke_args))
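
For reference, I wire it into a DAG roughly like this (the dag id, module path, function name, and payload are placeholders):

import json
from datetime import datetime

from airflow import DAG
# wherever the custom operator above lives (placeholder module path)
from custom_operators.lambda_long_timeout import LambdaInvokeFunctionOperator

with DAG(
    dag_id="invoke_slow_lambda",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
):
    LambdaInvokeFunctionOperator(
        task_id="invoke_slow_lambda",
        function_name="my-slow-function",
        payload=json.dumps({"some": "payload"}),
    )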

The above code works (tested in Airflow 2.5.1) and my DAGs no longer suffer from the timeout. Is there a better way?

NOTE: For those that say I should just do this asynchronously, you have a point. However, that's a different question ;)


Solution

  • Wouldn't it be much easier to define the read_timeout in the AWS connection? Put

    {"config_kwargs": {"connect_timeout": 5, "read_timeout": 900, "tcp_keepalive": true}}

    into the Extra field of an AWS connection (the botocore client options go under the config_kwargs key) and then point the stock LambdaInvokeFunctionOperator at that connection via aws_conn_id; see the sketch below.
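
    For example, a minimal sketch (the connection id and function name are made up; this assumes the config_kwargs key in the connection's Extra):

    # Extra field of an Airflow connection with conn_id "aws_lambda_long_timeout"
    # (connection type: Amazon Web Services):
    #
    #   {"config_kwargs": {"connect_timeout": 5, "read_timeout": 900, "tcp_keepalive": true}}
    #
    # The unmodified operator then just points at that connection:
    from airflow.providers.amazon.aws.operators.lambda_function import LambdaInvokeFunctionOperator

    invoke_lambda = LambdaInvokeFunctionOperator(
        task_id="invoke_slow_lambda",
        function_name="my-slow-function",
        aws_conn_id="aws_lambda_long_timeout",
    )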