I've got a simple Flask API used for fetching data from different databases for display within a react app. I recently put out a new version of the API which seems to have broken it, although I'm not sure why. I've narrowed down the point of failure to the cursor.execute call in these functions:
def get_sites(self):
query = "SELECT * FROM some_table"
cursor = self.conn.cursor(dictionary=True)
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
return results
def get_assets(self):
query = "SELECT * FROM some_table"
cursor = self.conn.cursor(dictionary=True)
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
return results
Occasionally, this will throw a "Lost connection to MySQL server during query" error, but this doesn't happen every time; it usually just dies silently.
Here is how I am establishing and saving SQL connections for use at the top of my Flask app:
CUSTOMER_DBS = {}
customer_db_routing_config = configparser.ConfigParser()
customer_db_routing_config.optionxform = str
customer_db_routing_config.read_file(open("customer_db_routing.ini"))
for token in customer_db_routing_config['customers']:
customer_db_config = configparser.ConfigParser()
customer_db_config.read_file(open(customer_db_routing_config['customers'][token]))
customer_db_conn = DatabaseInterface(customer_db_config)
CUSTOMER_DBS[token] = customer_db_conn
Where DatabaseInterface is a custom class for executing queries with the following init:
def __init__(self, config):
self.config = config
self.conn = mysql.connector.connect(user=config['mysql']['user'],
password=config['mysql']['password'],
host=config['mysql']['host'])
cursor = self.conn.cursor()
cursor.execute("SET SESSION TRANSACTION READ ONLY")
Yes I'm aware I'm not closing the cursor, however for now I left it as-is since it doesn't affect the bug.
Here are the two routes the issue occurs in:
@app.route("/api/customer/<string:token>/site")
@key_required
def sites(key, token):
if token in FILTERED_TOKENS:
return {"error": f"No valid customer with token {token}" }
site_name = request.args.get("name")
site_type = request.args.get("type")
if len(request.args) > 2:
return {"error": "Only ONE of the following parameters may be used: name, type"}
# use customer token to get path to customer db config
if not CUSTOMER_DBS[token].conn.is_connected():
CUSTOMER_DBS[token].conn.reconnect()
# get sites from customer db
if len(request.args) == 1:
results = CUSTOMER_DBS[token].get_sites()
elif site_name:
results = CUSTOMER_DBS[token].get_site_by_name(site_name)
elif site_type:
results = CUSTOMER_DBS[token].get_sites_by_type(site_type)
# base64 encode logo png and info_json so they are json serializable
for row in results:
if row["logo"]:
row["logo"] = base64.b64encode(row["logo"]).decode("utf-8")
# this column is oddly missing from <database_name>
if "info_json" in row.keys():
if row["info_json"]:
row["info_json"] = base64.b64encode(row["info_json"]).decode("utf-8")
return results
@app.route("/api/customer/<string:token>/asset")
@key_required
def assets(key, token):
if token in FILTERED_TOKENS:
return {"error": f"No valid customer with token {token}"}
asset_name = request.args.get("name")
site_name = request.args.get("site_name")
site_id = request.args.get("site_id")
superasset_id = request.args.get("superasset_id")
superasset_name = request.args.get("superasset_name")
address = request.args.get("address")
if len(request.args) > 2:
return {"error": "Only ONE of the following parameters may be used: name, site_name, site_id, superasset_id, superasset_name, address"}
# use customer token to get path to customer db config
if not CUSTOMER_DBS[token].conn.is_connected():
CUSTOMER_DBS[token].conn.reconnect()
# get assets from customer db
if len(request.args) == 1:
results = CUSTOMER_DBS[token].get_assets()
elif asset_name:
results = CUSTOMER_DBS[token].get_asset_by_name(asset_name)
elif site_name:
results = CUSTOMER_DBS[token].get_assets_by_site_name(site_name)
elif site_id:
results = CUSTOMER_DBS[token].get_assets_by_site_id(site_id)
elif superasset_id:
results = CUSTOMER_DBS[token].get_assets_by_superasset_id(superasset_id)
elif superasset_name:
results = CUSTOMER_DBS[token].get_assets_by_superasset_name(superasset_name)
elif address:
results = CUSTOMER_DBS[token].get_assets_by_address(address)
# base64 encoding blobs to avoid errors when returning
for row in results:
if row["info_json"]:
row["info_json"] = base64.b64encode(row["info_json"]).decode("utf-8")
if row["rt_condition_json"]:
row["rt_condition_json"] = base64.b64encode(row["rt_condition_json"]).decode("utf-8")
# filter out NAA
results = [ row for row in results if row["asset_tag"] != "NAA" ]
return results
Another interesting piece of information: the issue occurs on a specific page of the react app, which calls both of these routes. However, if I revert to the previous version of the API, it works, so I'm pretty sure it's not a problem with the react component. The main thing I changed in the new API version is creating the DB connections on startup and saving them in a dictionary. Previously, each route would create a new DB connection and then close it after making the query. So my thinking is this has something to do with the problem, however this method of storing persistent DB connections is something I've done before in other applications, so I'm really at a loss here. In the browser console I can see ERR_CONNECTION_RESET, but other than that it's mostly a silent failure.
EDIT: Found the problem. It was in this code from the react component:
useEffect(() => {
// track the selected customer by saving the token passed via useLocation
setCustomerToken(location.state.token)
// get the install sites of the selected token
client.get_sites(location.state.token).then((res) => {
setInstallSites(res) // extract only the customer names, that's all we need res.map(site => site.customer_name)
console.log(res)
})
.catch((error) => {
console.error('Error fetching customers:', error);
});
client.get_assets(location.state.token).then((res) => {
setAssets(res.map(asset => asset.asset_tag)) // extract only asset tags, that's all we need
})
.catch((error) => {
console.error('Error fetching assets:', error);
});
}, []);
I think what's happening is since these client methods are asynchronous (they return promises), the second method tries to do a query to the DB while the first one hasn't completed yet. This must cause mysql-connector-python and thus Flask to panic and quit for some reason. This can be fixed by ensuring these calls occur one after the other by using .then, although I don't really like this.
The solution (without changing the react component a.k.a chaining .then) was to use connection pooling. Here is the updated init:
def __init__(self, config):
self.config = config
self.conn_pool = mysql.connector.pooling.MySQLConnectionPool(user=config['mysql']['user'],
password=config['mysql']['password'],
host=config['mysql']['host'],
pool_name=config['mysql']['host'].split('.')[0])
conn = self.conn_pool.get_connection()
cursor = conn.cursor()
cursor.execute("SET SESSION TRANSACTION READ ONLY")
cursor.close()
conn.close()
Here is an example query:
def get_customers(self):
query = "SELECT * FROM some_table"
conn = self.conn_pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute(query)
results = cursor.fetchall()
cursor.close()
conn.close()
return results
The conn.close() is essential as when you using connection pooling it doesn't kill the connection but rather returns it to the connection pool. If you don't call close() then your pool will quickly be exhausted.
And here is an example route:
@app.route("/api/customer")
@key_required
def customers(key):
results = CENTRAL_DB.get_customers()
# filter the results
results = [ c for c in results if c["mqtt_customer_token"] not in FILTERED_TOKENS ]
return results