I've been refactoring my psycopg2 code into functions. Previously it all lived in a single try-except-finally block, but I'm not sure how to implement a context manager to handle the connection and cursor. My SQL queries work and look like this:
import psycopg2
from contextlib import contextmanager
from psycopg2 import sql

def random_query(schema, table, username, number_of_files):
    # Uses the module-level cursor opened by the with block further down.
    random_query = sql.SQL("SELECT * FROM {schema}.{table} WHERE username = {username} ORDER BY RANDOM() LIMIT {limit}").format(
        schema=sql.Identifier(schema),
        table=sql.Identifier(table),
        username=sql.Literal(username),
        limit=sql.Literal(number_of_files)
    )
    cursor.execute(random_query)
    return cursor.fetchone()

def insert_query(schema, table, values):
    # Uses the module-level cursor and conn opened by the with block further down.
    insert_query = sql.SQL("INSERT INTO {schema}.{table}(shortcode, username, filename, extension) VALUES ({shortcode}, {username}, {filename}, {extension})").format(
        schema=sql.Identifier(schema),
        table=sql.Identifier(table),
        shortcode=sql.Literal(values[0]),
        username=sql.Literal(values[1]),
        filename=sql.Literal(values[2]),
        extension=sql.Literal(values[3])
    )
    cursor.execute(insert_query)
    conn.commit()
Version 1, with separate context managers for the connection and the cursor:
@contextmanager
def get_connection():
    connection = psycopg2.connect(**DB_CONNECTION)
    try:
        yield connection
    except Exception as err:
        connection.rollback()
        print('Error: ', err)
        raise
    finally:
        if connection:
            connection.close()
            print("Connection is closed.")

@contextmanager
def get_cursor(connection):
    cursor = connection.cursor()
    try:
        yield cursor
    finally:
        cursor.close()

with get_connection() as conn, get_cursor(conn) as cursor:
    random_record = random_query('test_schema', 'test_table', 'username', 1)
    insert_query('test_schema', 'test_table2', random_record)
Version 2, with a single context manager that yields both the connection and the cursor:
@contextmanager
def sql_connection():
    connection = psycopg2.connect(**DB_CONNECTION)
    cursor = connection.cursor()
    try:
        yield connection, cursor
    except Exception as err:
        connection.rollback()
        print('Error : ', err)
        raise
    finally:
        if connection:
            cursor.close()
            connection.close()
            print("Connection is closed")

with sql_connection() as (conn, cursor):
    random_record = random_query('test_schema', 'test_table', 'username', 1)
    insert_query('test_schema', 'test_table2', random_record)
My questions are:
1. Which of the two versions is preferable?
2. In insert_query, there is a line that calls conn.commit(). From the documentation, I understand that this is not necessary if we are using a context manager. Can I remove it? The documentation says:
"Changed in version 2.5: if the connection is used in a with statement, the method is automatically called if no exception is raised in the with block."
Neither version is preferable. Both overcomplicate things by duplicating behavior that psycopg2 already provides. Per the example in the psycopg2 connection documentation:
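import psycopg2

connection = psycopg2.connect(**DB_CONNECTION)

# First transaction: committed on success, rolled back on error.
with connection:
    with connection.cursor() as cur:
        cur.execute(<sql>)

# Second transaction on the same, still open connection.
with connection:
    with connection.cursor() as cur:
        cur.execute(<other_sql>)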
Committing or rolling back the transaction and closing the cursor are done for you. All you have to do is call connection.close() when you no longer want to use the connection.
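Applied to the functions in the question, this could look roughly like the sketch below. It assumes the query helpers are changed to take the cursor as an argument instead of relying on a global; copy_random_record, select_one and insert_one are hypothetical names used only for illustration. Note that the automatic commit only applies when the connection object itself is used in the with statement. The custom get_connection() in the question never commits, so removing conn.commit() there would discard the insert when the connection is closed.

```python
def copy_random_record(connection, select_one, insert_one):
    """Read one record and insert it elsewhere inside a single transaction.

    `connection` is expected to be an open psycopg2 connection.
    `select_one(cur)` and `insert_one(cur, record)` are hypothetical helpers
    that run their query on the cursor they are given.
    """
    # psycopg2 commits when this block exits cleanly and rolls back if it
    # raises; the connection itself stays open afterwards.
    with connection:
        # The cursor is closed automatically when the inner block exits.
        with connection.cursor() as cur:
            record = select_one(cur)
            if record is not None:
                insert_one(cur, record)
    return record
```

A caller would create the connection once with psycopg2.connect(**DB_CONNECTION), call this function as often as needed, and call connection.close() at the end. No explicit commit() or rollback() is needed inside the helpers.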
UPDATE
The question and answer are for psycopg2. If you are using psycopg (version 3), the connection context manager behaves differently: in psycopg 3, with connection closes the connection when the block completes, whereas in psycopg2 it only ends the transaction and leaves the connection open.
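For psycopg 3, one way to get the psycopg2-style behavior is to wrap each unit of work in the connection's transaction() context manager, which commits or rolls back without closing the connection. The following is only a sketch under that assumption; run_in_transaction and work are hypothetical names, and connection is assumed to be an open psycopg 3 connection.

```python
def run_in_transaction(connection, work):
    """Run `work(cur)` in its own transaction on an open psycopg 3 connection.

    `work` is a hypothetical callable that receives a cursor and returns
    whatever the caller needs from the query.
    """
    # transaction() commits on a clean exit and rolls back on an exception,
    # leaving the connection open for further use.
    with connection.transaction():
        # The cursor is closed when the inner block exits.
        with connection.cursor() as cur:
            return work(cur)
```

The connection can then be reused for several calls and closed explicitly, or by an outer with block around psycopg.connect(...), once all work is done.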