I am connecting to Cloud SQL from my Python Cloud Function using the Python language connector, as described in the Google docs:
def connect_with_connector(connector: Connector) -> sqlalchemy.engine.base.Engine:
    def getconn() -> pg8000.dbapi.Connection:
        conn: pg8000.dbapi.Connection = connector.connect(
            "instance",
            "pg8000",
            user="service-account",
            db="db_name",
            enable_iam_auth=True,
        )
        return conn

    pool = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
        # ...
    )
    return pool
I pass a connector in from the function, use SQLAlchemy to execute my statements, and afterwards close both the pool and the connector:
pool.dispose()
connector.close()
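So each invocation effectively runs this sequence (simplified):

connector = Connector()
pool = connect_with_connector(connector)
with pool.connect() as conn:
    conn.execute(sqlalchemy.text("SELECT 1"))  # placeholder for my actual statements
pool.dispose()
connector.close()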
From the beginning, this seemed inefficient to me. It works most of the time, but every few hours I get a series of errors that I believe stem from too many connections being set up. It often starts with a Broken Pipe error, followed by a series of 429 Rate Limit Reached errors from the Google Cloud SQL Admin API, and it correlates with my (simple) Cloud Function hitting its memory limit.
The volume of calls my Cloud Functions receive is not huge; we are nowhere near any other API rate limits.
Does anyone have an idea how to resolve this in a serverless environment?
This looks like a classic case of needing the lazy refresh option.
TL;DR:
connector = Connector(refresh_strategy="lazy")
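The connector's default refresh strategy keeps certificates fresh with a background task, which assumes the process always has CPU. Cloud Functions throttles CPU when no request is being handled, so those background refreshes can fail and retry, hammering the SQL Admin API (hence the 429s). With refresh_strategy="lazy", the connector instead refreshes connection info on demand, at connection time.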
Also, you'll want to lazily initialize your database connection (and avoid recreating the connector on every function call). Something like this:
import functions_framework
import sqlalchemy
import json
from google.cloud.sql.connector import Connector, IPTypes
import pg8000

def connect_to_instance() -> sqlalchemy.engine.base.Engine:
    # create the connector once and reuse it; lazy refresh for serverless
    connector = Connector(refresh_strategy="lazy")

    def getconn() -> pg8000.dbapi.Connection:
        return connector.connect(
            "...",  # the PostgreSQL instance connection name here
            "pg8000",
            user="xyz",
            password="supersecret",
            db="db_name",
            ip_type=IPTypes.PUBLIC,
        )

    return sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
        pool_size=5,
        max_overflow=2,
        pool_timeout=30,    # seconds to wait for a pooled connection
        pool_recycle=1800,  # recycle connections after 30 minutes
    )
# lazy initialization of global db
db = None

@functions_framework.http
def hello_http(request):
    an_id = request.args["ID"]
    # lazy init within request context
    global db
    if not db:
        db = connect_to_instance()
    with db.connect() as conn:
        select_stmt = sqlalchemy.text(
            "SELECT col_1, col_2, col_3"
            " FROM table_1"
            " WHERE col_1 = :some_id"
            " AND col_2 > 0"
        )
        fetch_result = conn.execute(
            select_stmt,
            {"some_id": an_id}
        ).fetchall()
    # convert Row objects to plain lists so they serialize cleanly
    rows = [list(row) for row in fetch_result]
    return json.dumps({"fetch_result": rows}, default=str), 200, {"Content-Type": "application/json"}