How can I dynamically rename a task run based on a variable passed to it within a Prefect 2.0 flow?


I have a Prefect task that sources a table by name from a database, as shown below. I would like each task run name to include the source table name so that I can easily keep track of the runs in the Prefect Cloud UI (I source 10+ tables in one flow).

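from prefect import get_run_logger, task
import pandas as pd

@task
def source_table_by_name(id, table_name):
    logger = get_run_logger()

    # sd_engine_prod is a database engine/connection defined elsewhere in the project
    sql = f"SELECT * FROM {table_name} WHERE upload_id = '{id}'"
    df = pd.read_sql(sql, sd_engine_prod)

    logger.info(f"Source table {table_name} from database")
    return df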

My first attempt was to put a template in the task name so that it could reference the arguments passed in (to be honest, ChatGPT hallucinated this one for me). Prefect uses the name string as-is, so the {table_name} placeholder is never filled in:

from prefect import flow, get_run_logger, task
import pandas as pd

@task(name='source_table_by_name_{table_name}')
def source_table_by_name(id, table_name):
    logger = get_run_logger()

    sql = f"SELECT * FROM {table_name} WHERE upload_id = '{id}'"
    df = pd.read_sql(sql, sd_engine_prod)

    logger.info(f"Source table {table_name} from database")
    return df


@flow
def report_flow(upload_id):
    df_table1 = source_table_by_name(upload_id, "table1")
    df_table2 = source_table_by_name(upload_id, "table2")
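The name passed to the decorator is an ordinary string, evaluated once when the function is decorated and before any arguments exist, so nothing can substitute {table_name} into it. Filling a template needs the arguments of each individual call. The stdlib-only sketch below illustrates that idea; the helper `render_run_name` is hypothetical and not part of Prefect, and the stand-in function only mirrors the question's signature without touching a database. It binds the call's arguments to the parameter names, then formats the template with them:

```python
import inspect
from typing import Any, Callable


def render_run_name(fn: Callable[..., Any], template: str, *args: Any, **kwargs: Any) -> str:
    """Hypothetical helper (not a Prefect API): fill `template` from one call's arguments."""
    # Map positional and keyword arguments onto the function's parameter names.
    bound = inspect.signature(fn).bind(*args, **kwargs)
    bound.apply_defaults()
    # str.format resolves each {placeholder} against the bound parameter names.
    return template.format(**bound.arguments)


def source_table_by_name(id, table_name):
    # Stand-in with the same signature as the question's task; it only builds the SQL text.
    return f"SELECT * FROM {table_name} WHERE upload_id = '{id}'"


if __name__ == "__main__":
    for table in ["table1", "table2"]:
        print(render_run_name(source_table_by_name, "source_table_by_name_{table_name}", "abc123", table))
```

Later Prefect 2.x releases added a task_run_name argument to @task that accepts this kind of template (for example task_run_name="source_table_by_name_{table_name}"); check the documentation for the Prefect version you run before relying on it.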

I could write a separate task for each table so that every name is fixed and clear from the start, but a DRYer approach would be much better if one exists.


Solution

  • Totally justifiable question. We have an internal work in progress to address this in a better way, and there is also this open issue. For now, though, you can call with_options() on the task inside a for loop and pass the variable in as the name:

    from prefect import flow, task
    from typing import List
    import time
    
    
    @task
    def hello_world(user: str) -> None:
        print(f"✨ Hello from Prefect, {user}! 👋 📚")
        time.sleep(5)
    
    
    @flow(log_prints=True)
    def hi(
        users: List[str] = [
            "Marvin",
            "Anna",
            "Prefect"
        ]
    ) -> None:
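        for user in users:
            # with_options() returns a copy of the task under a new name; .submit() hands that copy to the flow's task runner
            hello_world.with_options(name=user).submit(user)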
    
    
    if __name__ == "__main__":
        hi()
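Applied to the question's source_table_by_name task, the loop body would call `source_table_by_name.with_options(name=f"source_table_by_name_{table}")(upload_id, table)`, in the same way as `hello_world.with_options(name=user)` above. The sketch below models only the naming mechanics so it runs without Prefect or a database: `ToyTask` is a hypothetical stand-in, not Prefect's Task class. Its `with_options` returns a renamed copy, so each iteration gets its own label while the shared task keeps its original name, and one loop replaces a hand-written task per table.

```python
from dataclasses import dataclass, replace
from typing import Any, Callable, Dict, List, Tuple


@dataclass(frozen=True)
class ToyTask:
    """Hypothetical stand-in for a Prefect task: a function plus a display name."""
    fn: Callable[..., Any]
    name: str

    def with_options(self, *, name: str) -> "ToyTask":
        # Return a renamed copy; the original task object is left untouched.
        return replace(self, name=name)

    def __call__(self, *args: Any, **kwargs: Any) -> Tuple[str, Any]:
        # Report which name this "run" used alongside the function's result.
        return self.name, self.fn(*args, **kwargs)


def _source_table_by_name(upload_id: str, table_name: str) -> str:
    # Placeholder body: builds the query text instead of querying a database.
    return f"SELECT * FROM {table_name} WHERE upload_id = '{upload_id}'"


source_table_by_name = ToyTask(_source_table_by_name, name="source_table_by_name")


def report_flow(upload_id: str, table_names: List[str]) -> Dict[str, Tuple[str, Any]]:
    # One loop instead of one hand-written task per table.
    return {
        table: source_table_by_name.with_options(name=f"source_table_by_name_{table}")(upload_id, table)
        for table in table_names
    }


if __name__ == "__main__":
    for table, (run_name, sql) in report_flow("abc123", ["table1", "table2"]).items():
        print(table, run_name, sql)
```

In a real Prefect flow, calling the renamed task directly runs it and returns its result, while .submit() (as in the hi flow) returns a future and lets the task runner execute the tables concurrently.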