python, database, google-cloud-platform, google-cloud-functions, google-cloud-sql

Google Cloud Functions and Cloud SQL - Intermittent Connection Errors & SQL Admin Quota


I am connecting to Cloud SQL from my Python Cloud Function using the Cloud SQL Python Connector, as described in the Google docs:

import pg8000
import sqlalchemy
from google.cloud.sql.connector import Connector


def connect_with_connector(connector: Connector) -> sqlalchemy.engine.base.Engine:
    def getconn() -> pg8000.dbapi.Connection:
        conn: pg8000.dbapi.Connection = connector.connect(
            "instance",  # instance connection name
            "pg8000",
            user="service-account",
            db="db_name",
            enable_iam_auth=True,
        )
        return conn

    pool = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
        # ...
    )
    return pool

I pass a connector in from the function, use SQLAlchemy to execute my statements, and afterwards close the pool and the connector:

pool.dispose()
connector.close()
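So the full per-invocation flow is roughly this (the handler name and query are just for illustration, reusing connect_with_connector from above):

import functions_framework
import sqlalchemy
from google.cloud.sql.connector import Connector


@functions_framework.http
def handler(request):  # illustrative handler name
    # a fresh connector and engine are built on every invocation...
    connector = Connector()
    pool = connect_with_connector(connector)
    try:
        with pool.connect() as conn:
            conn.execute(sqlalchemy.text("SELECT 1"))  # illustrative query
    finally:
        # ...and torn down again before the response is returned
        pool.dispose()
        connector.close()
    return "ok"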

From the beginning, this seemed inefficient to me. It works most of the time, but every few hours I get a series of errors that I believe stem from too many connections being set up. It often starts with a broken pipe error, followed by a series of 429 Rate Limit Reached errors from the Google Cloud SQL Admin API, and it correlates with my (simple) Cloud Function reaching its memory limit.

The volume of calls my Cloud Functions receive is not huge; we don't come anywhere near any other API rate limits.

Does anyone have an idea how to resolve this inside a serverless environment?


Solution

  • This looks like a classic case of needing the lazy refresh option. The connector's default strategy refreshes its certificates in a background task, and serverless platforms throttle CPU outside of request handling, so those background refreshes stall and retry, eating into the SQL Admin API quota; that is the likely source of your 429s. The lazy strategy refreshes only when a connection is actually requested.

    TL;DR:

    connector = Connector(refresh_strategy="lazy")
    

    Also, you'll want to lazily initialize your database connection (and avoid recreating the connector on every function call). Something like this:

    import functions_framework
    import sqlalchemy
    import json
    from google.cloud.sql.connector import Connector, IPTypes
    import pg8000
    
    
    def connect_to_instance() -> sqlalchemy.engine.base.Engine:
        connector = Connector(refresh_strategy="lazy")  # lazy refresh, per the TL;DR above
    
        def getconn() -> pg8000.dbapi.Connection:
            return connector.connect(
                "...", # the PostgreSQL's instance connection name here
                "pg8000",
                user     = "xyz",
                password = 'supersecret',
                db       = "db_name",
                ip_type  = IPTypes.PUBLIC
            )
        
        return sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator      = getconn,
            pool_size    = 5,
            max_overflow = 2,
            pool_timeout = 30,
            pool_recycle = 1800
        )
    
    
    # lazy initialization of global db
    db = None
    
    
    @functions_framework.http
    def hello_http(request):
        an_id = request.args["ID"]
        # lazy init within request context
        global db
        if not db:
            db = connect_to_instance()
        with db.connect() as conn:
            select_stmt = sqlalchemy.text(
                "SELECT col_1, col_2, col_3,"
                " FROM table_1"
                " WHERE col_1 = :some_id"
                " AND col_2 > 0"
            )
            
            fetch_result = conn.execute(
                select_stmt,
                {"some_id": an_id}
            ).fetchall()
    
        # Row objects aren't JSON-serializable, so convert them to lists first
        rows = [list(o) for o in fetch_result]

        return json.dumps({"fetch_result": rows}, default = str), 200, {"Content-Type": "application/json"}
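
    If you want to keep the IAM authentication from your original snippet, the same lazy pattern applies. A minimal sketch, assuming a placeholder instance connection name and IAM database user:

    import sqlalchemy
    from google.cloud.sql.connector import Connector, IPTypes
    import pg8000


    def connect_with_iam() -> sqlalchemy.engine.base.Engine:
        # "lazy" defers certificate refresh to connection time, which suits
        # environments where CPU is throttled between requests
        connector = Connector(refresh_strategy="lazy")

        def getconn() -> pg8000.dbapi.Connection:
            return connector.connect(
                "project:region:instance",  # placeholder instance connection name
                "pg8000",
                user="service-account",     # placeholder IAM database user
                db="db_name",
                enable_iam_auth=True,
                ip_type=IPTypes.PUBLIC
            )

        return sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=getconn,
            pool_size=5,
            max_overflow=2,
            pool_timeout=30,
            pool_recycle=1800
        )

    Swap connect_to_instance for connect_with_iam in the lazy-init code above and the rest stays the same.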