I am using Prefect and am trying to download a file from S3.
When I hard-code the AWS credentials, the file downloads successfully:
```python
import asyncio
from prefect_aws.s3 import s3_download
from prefect_aws.credentials import AwsCredentials
from prefect import flow, get_run_logger


@flow
async def fetch_taxi_data():
    logger = get_run_logger()
    credentials = AwsCredentials(
        aws_access_key_id="xxx",
        aws_secret_access_key="xxx",
    )
    data = await s3_download(
        bucket="hongbomiao-bucket",
        key="hm-airflow/taxi.csv",
        aws_credentials=credentials,
    )
    logger.info(data)


if __name__ == "__main__":
    asyncio.run(fetch_taxi_data())
```
Now I am trying to load the credentials from a Prefect Block instead.
I created an AWS Credentials Block named `aws-credentials-block`.
However, inside the same async flow,
```python
aws_credentials_block = AwsCredentials.load("aws-credentials-block")
data = await s3_download(
    bucket="hongbomiao-bucket",
    key="hm-airflow/taxi.csv",
    aws_credentials=aws_credentials_block,
)
```

throws the error:

```
AttributeError: 'coroutine' object has no attribute 'get_boto3_session'
```
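That error is what Python raises whenever an `async` function is called without `await` and its return value is then used as if it were the finished result. Below is a minimal stdlib-only reproduction, assuming `load` behaves like an async classmethod in this context; `FakeCredentials` is a hypothetical stand-in, not Prefect's `AwsCredentials`:

```python
import asyncio


class FakeCredentials:
    """Hypothetical stand-in for a credentials block (not Prefect's class)."""

    def __init__(self, name: str) -> None:
        self.name = name

    def get_boto3_session(self) -> str:
        # Placeholder for a real boto3 session factory.
        return f"session-for-{self.name}"

    @classmethod
    async def load(cls, name: str) -> "FakeCredentials":
        # Simulate an async round trip to a block storage API.
        await asyncio.sleep(0)
        return cls(name)


async def broken_flow() -> str:
    creds = FakeCredentials.load("aws-credentials-block")  # missing await
    try:
        return creds.get_boto3_session()
    except AttributeError as exc:
        creds.close()  # close the un-awaited coroutine to avoid a RuntimeWarning
        return str(exc)


message = asyncio.run(broken_flow())
print(message)
```

Here `creds` is a coroutine object rather than a `FakeCredentials` instance, so the attribute lookup fails exactly as in the traceback above.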
Copying the keys out of the block into a new `AwsCredentials` object does not help either:

```python
aws_credentials_block = AwsCredentials.load("aws-credentials-block")
credentials = AwsCredentials(
    aws_access_key_id=aws_credentials_block.aws_access_key_id,
    aws_secret_access_key=aws_credentials_block.aws_secret_access_key,
)
data = await s3_download(
    bucket="hongbomiao-bucket",
    key="hm-airflow/taxi.csv",
    aws_credentials=credentials,
)
```

throws the error:

```
AttributeError: 'coroutine' object has no attribute 'aws_access_key_id'
```
I couldn't find any useful documentation on how to use it.
Am I supposed to use Blocks to load credentials? If so, what is the correct way to use them in Prefect? Thanks!
I just found that the snippet in the screenshot in the question is missing an `await`.
After adding `await`, it works now!

```python
aws_credentials_block = await AwsCredentials.load("aws-credentials-block")
data = await s3_download(
    bucket="hongbomiao-bucket",
    key="hm-airflow/taxi.csv",
    aws_credentials=aws_credentials_block,
)
```
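The fix can be checked with the same kind of stdlib-only sketch: awaiting the coroutine hands back the real object instead of a coroutine. `FakeCredentials` is again a hypothetical stand-in, not Prefect's class:

```python
import asyncio


class FakeCredentials:
    """Hypothetical stand-in for a credentials block (not Prefect's class)."""

    def __init__(self, name: str) -> None:
        self.name = name

    @classmethod
    async def load(cls, name: str) -> "FakeCredentials":
        await asyncio.sleep(0)  # simulate the async API call
        return cls(name)


async def fixed_flow() -> str:
    # Awaiting the coroutine yields the actual FakeCredentials instance.
    creds = await FakeCredentials.load("aws-credentials-block")
    return creds.name


name = asyncio.run(fixed_flow())
print(name)  # aws-credentials-block
```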
UPDATE:
Got an answer from Michael Adkins on GitHub. Thanks!

> `await` is only needed if you're writing an async flow or task. For users writing synchronous code, an `await` is not needed (and not possible). Most of our users are writing synchronous code and the example in the UI is in a synchronous context so it does not include the await.
Looking at the source code of `load`:

```python
@classmethod
@sync_compatible
@inject_client
async def load(cls, name: str, client: "OrionClient" = None):
    # ...
```
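So, as far as I can tell, any function decorated with `@sync_compatible` can be called both from synchronous code (without `await`) and from async code (with `await`). Inside an `async` flow it returns a coroutine that has to be awaited, which is why the calls above without `await` failed with `'coroutine' object has no attribute ...`.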
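To illustrate the idea, here is a much-simplified, hypothetical re-creation of such a decorator. `sync_compatible_sketch` and `Block` are made-up names, and Prefect's real `@sync_compatible` is more involved; this only assumes the behavior described above: return the coroutine when an event loop is already running, otherwise run it to completion.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


def sync_compatible_sketch(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Any]:
    """Hypothetical simplification, not Prefect's implementation."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        coro = fn(*args, **kwargs)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running loop in this thread: synchronous caller, so run it here.
            return asyncio.run(coro)
        # A loop is running: async caller, who must await the coroutine.
        return coro

    return wrapper


class Block:
    """Hypothetical block class for illustration only."""

    def __init__(self, name: str) -> None:
        self.name = name

    @classmethod
    @sync_compatible_sketch
    async def load(cls, name: str) -> "Block":
        await asyncio.sleep(0)  # simulate the async API call
        return cls(name)


def sync_caller() -> str:
    return Block.load("aws-credentials-block").name  # no await needed


async def async_caller() -> str:
    block = await Block.load("aws-credentials-block")  # await required
    return block.name


sync_result = sync_caller()
async_result = asyncio.run(async_caller())
print(sync_result, async_result)
```

Dropping the `await` in `async_caller` would reproduce the `'coroutine' object has no attribute 'name'` kind of error from the question.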