python prefect

How to use Blocks correctly to load AWS S3 credentials in Prefect?


I am using Prefect and trying to download a file from S3.

When I hard-code the AWS credentials, the file downloads successfully:

import asyncio

from prefect_aws.s3 import s3_download
from prefect_aws.credentials import AwsCredentials

from prefect import flow, get_run_logger


@flow
async def fetch_taxi_data():
    logger = get_run_logger()
    credentials = AwsCredentials(
        aws_access_key_id="xxx",
        aws_secret_access_key="xxx",
    )
    data = await s3_download(
        bucket="hongbomiao-bucket",
        key="hm-airflow/taxi.csv",
        aws_credentials=credentials,
    )
    logger.info(data)

if __name__ == "__main__":
    asyncio.run(fetch_taxi_data())

Next, I tried to load the credentials from a Prefect Block.

I created an AWS Credentials Block:

[Screenshot: the AWS Credentials Block in the Prefect UI, including its example usage snippet]

However,

aws_credentials_block = AwsCredentials.load("aws-credentials-block")
data = await s3_download(
    bucket="hongbomiao-bucket",
    key="hm-airflow/taxi.csv",
    aws_credentials=aws_credentials_block,
)

throws the error:

AttributeError: 'coroutine' object has no attribute 'get_boto3_session'

And

aws_credentials_block = AwsCredentials.load("aws-credentials-block")
credentials = AwsCredentials(
    aws_access_key_id=aws_credentials_block.aws_access_key_id,
    aws_secret_access_key=aws_credentials_block.aws_secret_access_key,
)
data = await s3_download(
    bucket="hongbomiao-bucket",
    key="hm-airflow/taxi.csv",
    aws_credentials=credentials,
)

throws the error:

AttributeError: 'coroutine' object has no attribute 'aws_access_key_id'

I couldn't find any useful documentation on how to use it.

Am I supposed to use Blocks to load credentials? If so, what is the correct way to use Blocks in Prefect? Thanks!


Solution

  • I just found that the snippet in the screenshot in the question is missing an await.

    After adding await, it works now!

    aws_credentials_block = await AwsCredentials.load("aws-credentials-block")
    data = await s3_download(
        bucket="hongbomiao-bucket",
        key="hm-airflow/taxi.csv",
        aws_credentials=aws_credentials_block,
    )
    

    UPDATE:

    I got an answer from Michael Adkins on GitHub (thanks!):

    await is only needed if you're writing an async flow or task. For users writing synchronous code, an await is not needed (and not possible). Most of our users are writing synchronous code and the example in the UI is in a synchronous context so it does not include the await.

    I saw the source code at

    https://github.com/PrefectHQ/prefect/blob/1dcd45637914896c60b7d49254a34e95a9ce56ea/src/prefect/blocks/core.py#L601-L604

    @classmethod
    @sync_compatible
    @inject_client
    async def load(cls, name: str, client: "OrionClient" = None):
        # ...
    

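    So I think that any function decorated with @sync_compatible can be called from both sync and async code: in an async flow or task it returns a coroutine that has to be awaited, while in synchronous code it returns the result directly.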
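
To make that behavior concrete, here is a minimal standard-library sketch of how a decorator of this kind could work. It is not Prefect's actual implementation: toy_sync_compatible and the stand-in load function are hypothetical names invented for illustration, and the real decorator is more involved. The sketch only shows the idea of checking for a running event loop, and why forgetting await in an async flow leaves you holding a coroutine object.

```python
import asyncio
import functools


def toy_sync_compatible(async_fn):
    """Hypothetical stand-in for illustration; NOT Prefect's real decorator."""

    @functools.wraps(async_fn)
    def wrapper(*args, **kwargs):
        coro = async_fn(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No event loop is running, so the caller is synchronous:
            # run the coroutine to completion and return its result.
            return asyncio.run(coro)
        # An event loop is running, so the caller is async:
        # hand back the coroutine, which the caller must await.
        return coro

    return wrapper


@toy_sync_compatible
async def load(name):
    # Stand-in for a block lookup; a real load would call the Prefect API.
    await asyncio.sleep(0)
    return {"name": name}


# Synchronous context: no await needed, the result comes back directly.
sync_result = load("aws-credentials-block")


async def main():
    # Async context: without await this is only a coroutine object, which is
    # why attribute access on it raised AttributeError in the question.
    pending = load("aws-credentials-block")
    was_coroutine = asyncio.iscoroutine(pending)
    return was_coroutine, await pending


was_coroutine, async_result = asyncio.run(main())
```

In this sketch the same load call yields a finished result in synchronous code and a coroutine inside a running event loop, which matches the symptom in the question: the async flow needed await AwsCredentials.load(...), while the synchronous example in the UI did not.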