python mysql sqlalchemy

SQLAlchemy nested query depending on parent query


How do I write a SQLAlchemy subquery that depends on the parent query's tables?

I have the following SQLAlchemy models.

class BaseModel(DeclarativeBase):
    """Declarative base class that the ORM models in this post inherit from."""

class ABC(BaseModel):
    """ORM model for the ``ABC`` table.

    Each row points at one ``A`` row through ``account_id`` (a cascading
    foreign key to ``A.id``) and carries an ``account_idx`` value.
    """

    __tablename__ = "ABC"
    __table_args__ = (
        # Secondary index on the foreign-key column.
        Index("index1", "account_id"),
        # Updates and deletes on A.id cascade to the referencing ABC rows.
        ForeignKeyConstraint(
            ["account_id"],
            ["A.id"],
            onupdate="CASCADE",
            ondelete="CASCADE",
        ),
    )

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    account_id: Mapped[int]
    account_idx: Mapped[int]

class A(BaseModel):
    """ORM model for the ``A`` table.

    Besides the auto-incrementing primary key, each row stores a
    ``downloaded`` date, an ``idx`` integer and three string columns
    (``account_id``, ``display_name``, ``type``) of up to 255 characters.
    """

    __tablename__ = "A"
    __table_args__ = (
        # Composite index over (downloaded, idx).
        Index("index1", "downloaded", "idx"),
    )

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    downloaded: Mapped[date]
    account_id: Mapped[str] = mapped_column(String(255))
    display_name: Mapped[str] = mapped_column(String(255))
    idx: Mapped[int]
    type: Mapped[str] = mapped_column(String(255))


    

I need to implement the following SQL in SQLAlchemy, but I can't work out how to join the subquery correctly.

-- For every non-SYSTEM row of A downloaded on the given date, return its
-- account_id together with the concatenated account_ids of the A rows
-- that are linked to it through ABC.
select
    a.account_id,
    (
        -- Correlated scalar subquery: `a` is not in this FROM list, it is
        -- the row of the outer query currently being evaluated.
        select
            group_concat( a2.account_id )
        from
            ABC abc
            left join A a2 on a2.downloaded = a.downloaded and a2.idx = abc.account_idx
        where
            abc.account_id = a.id
    ) as 'brokerage_client_accounts'
from
    A a
where
    a.downloaded = "2024-11-12" and
    a.type != 'SYSTEM'
order by
    a.account_id
;

I have this so far, but it doesn't work:

# Second reference to table A, used for the self-join inside the subquery.
A2 = aliased( A )

# Attempted inner query: concatenate the account_id of every matching A2 row.
# NOTE(review): the target SQL selects FROM ABC, but this statement starts
# from A and never adds ABC with select_from()/join(). ABC is only pulled in
# implicitly, so it is rendered after the join as ", `ABC`" in the generated
# SQL. MySQL gives JOIN precedence over the comma operator, so `ABC` is not
# visible to the ON clause - hence error 1054 for `ABC.account_idx`.
brokerage_client_accounts_subq = select(
    func.aggregate_strings( A2.account_id, "," ).label( "accounts" ),
).select_from(
    A
).outerjoin(
    A2,
    and_( A2.downloaded == A.downloaded, A2.idx == ABC.account_idx )
).where(
    ABC.account_id == A.id
)

# Outer query.
# NOTE(review): `Account` is not among the models shown; the generated SQL
# renders this column as `A`.account_id, so it is presumably `A` under its
# original name - confirm.
# NOTE(review): selecting `brokerage_client_accounts_subq.c.accounts` uses the
# inner select as a derived table in FROM (`anon_1` in the generated SQL)
# rather than as a correlated scalar subquery. Nothing joins it to A, which is
# what the cartesian-product warning reports.
stmt = select(
    Account.account_id,
    brokerage_client_accounts_subq.c.accounts,
).where(
    and_(
        A.downloaded == date( 2024, 11, 12 ),
        A.type != "SYSTEM"
    )
).order_by(
    Account.account_id
)

I get the following warning and error:

SAWarning: SELECT statement has a cartesian product between FROM element(s) "anon_1" and FROM element "A".  Apply join condition(s) between each element to resolve.

mysql.connector.errors.ProgrammingError: 1054 (42S22): Unknown column 'ABC.account_idx' in 'on clause'

I think this is because the subquery isn't joining to the "parent" A table

The SQL it produces is (I've reformatted it for readability):

SELECT
    `A`.account_id,
    anon_1.accounts
FROM
    `A`,
    (
        SELECT
            group_concat( `A_1`.account_id SEPARATOR %(aggregate_strings_1)s) AS accounts
        FROM
            `A` LEFT OUTER JOIN `A` AS `A_1` ON `A_1`.downloaded = `A`.downloaded AND `A_1`.idx = `ABC`.account_idx, `ABC`
        WHERE
            `ABC`.account_id = `A`.id
    ) AS anon_1
WHERE
    `A`.downloaded = %(downloaded_1)s AND `A`.type != %(type_1)s
ORDER BY
    `A`.account_id


Params:
    { 'aggregate_strings_1': ',', 'downloaded_1': datetime.date(2024, 11, 12), 'type_1': 'SYSTEM' }

This seems to bear little resemblance to what I require.

Can anyone help, please?


Solution

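  • I can't comment on the efficiency of your query, but this seems to produce what you want. You can use scalar_subquery(), which embeds the inner SELECT as a correlated scalar expression in the outer SELECT's column list instead of as a separate FROM element.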

    from datetime import date
    from sqlalchemy import create_engine, Index, ForeignKeyConstraint, String, select, func
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column, aliased
    
    class Base(DeclarativeBase):
        """Declarative base class for the models in this example."""
    
    class ABC(Base):
        """ORM model for the ``ABC`` table.

        ``account_id`` is a cascading foreign key to ``A.id``; ``account_idx``
        is a plain integer column.
        """

        __tablename__ = "ABC"
        __table_args__ = (
            # Secondary index on the foreign-key column.
            Index("index1", "account_id"),
            # Updates and deletes on A.id cascade to the referencing rows.
            ForeignKeyConstraint(
                ["account_id"],
                ["A.id"],
                onupdate="CASCADE",
                ondelete="CASCADE",
            ),
        )

        id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
        account_id: Mapped[int]
        account_idx: Mapped[int]
    
    class A(Base):
        """ORM model for the ``A`` table.

        Holds a ``downloaded`` date, an ``idx`` integer and three string
        columns (``account_id``, ``display_name``, ``type``) of up to 255
        characters, plus an auto-incrementing primary key.
        """

        __tablename__ = "A"
        __table_args__ = (
            # Composite index over (downloaded, idx).
            Index("index1", "downloaded", "idx"),
        )

        id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
        downloaded: Mapped[date]
        account_id: Mapped[str] = mapped_column(String(255))
        display_name: Mapped[str] = mapped_column(String(255))
        idx: Mapped[int]
        type: Mapped[str] = mapped_column(String(255))
    
    # NOTE: the URL carries no host, credentials or database name - presumably
    # a placeholder to be filled in before running.
    engine = create_engine("mysql+pymysql://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        # Not every alias is strictly needed; they are named so that the
        # generated SQL uses the same aliases as the hand-written query.
        a = aliased(A, name="a")
        a2 = aliased(A, name="a2")
        abc = aliased(ABC, name="abc")

        # Inner SELECT: starts from ABC and LEFT JOINs the second alias of A.
        # It references `a`, which is not in its own FROM list; once the
        # result is placed in the outer select() below, `a` is taken from the
        # enclosing statement, which makes this a correlated subquery.
        subq = (
            select(func.group_concat(a2.account_id))
            .select_from(abc)
            .join(a2, ((a2.downloaded == a.downloaded) & (a2.idx == abc.account_idx)), isouter=True)
            .where(abc.account_id == a.id)
            # Use the SELECT as a single-value column expression rather than
            # as a derived table in FROM.
            .scalar_subquery()
            .label("brokerage_client_accounts")
        )

        # Outer SELECT: one row per matching `a`, with the subquery evaluated
        # for each of them.
        statement = (
            select(a.account_id, subq)
            # Compare the Date column with a date object (not a string) so the
            # bound value matches the column type.
            .where((a.downloaded == date(2024, 11, 12)) & (a.type != "SYSTEM"))
            .order_by(a.account_id)
        )
    

    This generates the following SQL:

    SELECT
        a.account_id,
        (
            SELECT
                group_concat(a2.account_id) AS group_concat_1
            FROM
                `ABC` AS abc
            LEFT OUTER JOIN `A` AS a2 ON a2.downloaded = a.downloaded
            AND a2.idx = abc.account_idx
    WHERE
        abc.account_id = a.id) AS brokerage_client_accounts
    FROM
        `A` AS a
    WHERE
        a.downloaded = '2024-11-12'
        AND a.type != 'SYSTEM'
    ORDER BY
        a.account_id