Tags: python, stored-procedures, sqlalchemy, psycopg2

How to use SQL Stored Procedures for dynamic tables in sqlalchemy with psycopg2


I need to execute the same SQL query against several tables whose names are only known at runtime.

I tried to do it as follows:

import sqlalchemy.pool as pool


def get_conn_pool():
    conn = psycopg2.connect(user=settings.POSTGRES_USER, password=settings.POSTGRES_PASSWORD,
                            database=settings.POSTGRES_DB, host=settings.POSTGRES_HOST, port=settings.POSTGRES_PORT)
    return conn

db_pool = pool.QueuePool(get_conn_pool, max_overflow=10, pool_size=5)

conn = db_pool.connect()
cursor = conn.cursor()

tables = ['current_block', 'tasks']

for table in tables:
    cursor.execute(f'CREATE PROCEDURE GetTableData @TableName nvarchar(30) AS SELECT * FROM @TableName GO; EXEC GetTableData @TableName = {table};')
    result = cursor.fetchall()
    print(result)

But I get an error:

Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
psycopg2.errors.SyntaxError: syntax error at or near "@"
LINE 1: CREATE PROCEDURE GetTableData @TableName nvarchar(30) AS SEL...

Solution

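  • The statement in the question is SQL Server syntax: @TableName parameters, nvarchar, GO and EXEC are not valid in PostgreSQL, which is why the parser stops at "@". PostgreSQL declares parameters in parentheses after the routine name, in either CREATE FUNCTION or CREATE PROCEDURE (the latter since version 11). A table name also cannot be passed as an ordinary parameter to FROM; it has to be built into the statement as an identifier.

    Since your goal is only to query several tables by name, you can do this directly in Python without a stored procedure, using psycopg2.sql to quote the table name safely: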

    import psycopg2
    from psycopg2 import sql
    from sqlalchemy.pool import QueuePool
    
    def get_conn_pool():
        return psycopg2.connect(
            user="your_username",
            password="your_password",
            database="your_database",
            host="your_host",
            port="your_port"
        )
    
    db_pool = QueuePool(get_conn_pool, max_overflow=10, pool_size=5)
    tables = ['current_block', 'tasks']
    
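    conn = db_pool.connect()
    try:
        cursor = conn.cursor()
        for table in tables:
            # sql.Identifier quotes the table name, so it cannot inject SQL
            query = sql.SQL("SELECT * FROM {table}").format(
                table=sql.Identifier(table)
            )
            cursor.execute(query)
            result = cursor.fetchall()
            print(f"Data from {table}: {result}")
    finally:
        conn.close()  # returns the connection to the pool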
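
sql.Identifier makes the table name safe to interpolate, but it does not restrict which tables can be read. If the names could ever come from user input, a small allow-list check before building the query is one option. This is a minimal sketch; ALLOWED_TABLES and checked_table_name are hypothetical names chosen for illustration, not part of psycopg2 or SQLAlchemy.

```python
# Assumed allow-list: the two tables from the question.
ALLOWED_TABLES = frozenset({"current_block", "tasks"})


def checked_table_name(name, allowed=ALLOWED_TABLES):
    """Return name unchanged if it is on the allow-list, else raise ValueError."""
    if name not in allowed:
        raise ValueError(f"unexpected table name: {name!r}")
    return name
```

The checked value can then be passed to sql.Identifier in the loop, for example sql.Identifier(checked_table_name(table)).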
    

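    If a stored routine is a hard requirement, note that a PostgreSQL PROCEDURE does not return a result set, and a bare EXECUTE of a SELECT in PL/pgSQL discards its rows, so fetchall() after CALL would have nothing to fetch. Use a set-returning function instead. Because each table has different columns, this version returns every row as jsonb:

    CREATE OR REPLACE FUNCTION get_table_data(table_name text)
    RETURNS SETOF jsonb
    LANGUAGE plpgsql
    AS $$
    BEGIN
        -- %I quotes table_name as an identifier
        RETURN QUERY EXECUTE format('SELECT to_jsonb(t) FROM %I AS t', table_name);
    END;
    $$;
    

    Then call it from Python wherever it is needed:

    conn = db_pool.connect()
    try:
        cursor = conn.cursor()
        for table in tables:
            cursor.execute("SELECT * FROM get_table_data(%s)", (table,))
            # each fetched row is a 1-tuple holding the jsonb value as a dict
            result = [row[0] for row in cursor.fetchall()]
            print(f"Data from {table}: {result}")
    finally:
        conn.close()  # returns the connection to the pool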
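
A PostgreSQL procedure has no result set of its own, so if the routine really has to be invoked with CALL, one way to get rows back is an INOUT refcursor parameter that the caller then fetches from. The following is a sketch under assumptions, not a tested recipe for your schema: the procedure name get_table_cursor, the cursor name table_data_cur, and the helper fetch_table_via_procedure are all hypothetical, and it assumes PostgreSQL 11 or later with psycopg2's default non-autocommit mode, since the refcursor only lives until the transaction ends.

```python
# DDL for a hypothetical procedure that opens a cursor over the named table.
CREATE_PROCEDURE_SQL = """
CREATE OR REPLACE PROCEDURE get_table_cursor(table_name text, INOUT result refcursor)
LANGUAGE plpgsql
AS $$
BEGIN
    -- %I quotes table_name as an identifier
    OPEN result FOR EXECUTE format('SELECT * FROM %I', table_name);
END;
$$;
"""


def fetch_table_via_procedure(conn, table, cursor_name="table_data_cur"):
    """Call the procedure, then fetch all rows from the refcursor it opened.

    conn is any DB-API connection to PostgreSQL (for example one taken from
    the pool). Both statements must run in the same transaction.
    """
    cur = conn.cursor()
    try:
        cur.execute("CALL get_table_cursor(%s, %s)", (table, cursor_name))
        # cursor_name is chosen by our own code, never by user input,
        # so quoting it inline as an identifier is acceptable here.
        cur.execute(f'FETCH ALL FROM "{cursor_name}"')
        rows = cur.fetchall()
        cur.execute(f'CLOSE "{cursor_name}"')
        return rows
    finally:
        cur.close()
```

After running CREATE_PROCEDURE_SQL once, usage would look like rows = fetch_table_via_procedure(conn, "tasks"). Unlike the jsonb approach, this keeps the original column layout of each table.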