python, pandas, amazon-web-services, aws-data-wrangler

AWS Data Wrangler s3.to_parquet replicate current S3 path structure


When using wr.s3.to_parquet I can construct a path with a formatted string literal (f-string) so that files are written into my existing folder structure following this pattern:

from datetime import datetime

import awswrangler as wr
import pandas as pd


def SaveInS3_test(Ticker, Granularity, Bucket, df, keyPrefix=""):

    # Build a dated key prefix from the current UTC date
    year, month, day = datetime.utcnow().strftime("%Y/%m/%d").split("/")

    path = (
        f"s3://{Bucket}/{keyPrefix}{year}/{month}/{day}/{Ticker}/{Granularity}.parquet"
    )

    print(path)

    wr.s3.to_parquet(df, path, index=True, dataset=True, mode="append")


df = pd.DataFrame({'col': [1, 2, 3]})
SaveInS3_test("GBP", "H1", "my_bucket", df, keyPrefix="Test/")

The path would then be something like this:

s3://my_bucket/Test/2022/08/06/GBP/H1.parquet

I would like to use the Athena/Glue database functionality of wrangler as follows (this works):

wr.s3.to_parquet(
    df=df,
    path=f's3://my_bucket',
    dataset=True,
    database='default',  # Athena/Glue database
    table='my_table')  # Athena/Glue table

Can I use my f-string approach to the path structure in some way with this database functionality? For example:

s3://my_bucket/Test/2022/08/06/GBP/H1.parquet

I'm not sure how I would use partitions or similar to do this.

Any attempt I make to pass such a path raises an InvalidArgumentValue error because it does not match the existing Glue catalog table path.
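
For illustration, this is the kind of attempt that fails for me (the dated path is just an example of the structure above):

wr.s3.to_parquet(
    df=df,
    path='s3://my_bucket/Test/2022/08/06/GBP/H1.parquet',
    dataset=True,
    database='default',  # Athena/Glue database
    table='my_table')  # Athena/Glue table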


Solution


    from datetime import datetime

    import awswrangler as wr


    def save_in_s3(df, ticker, granularity, bucket, prefix):
        dt = datetime.utcnow()
    
        # Important! Create new partition cols
        df = df.assign(**{
            'ticker': ticker, 'granularity': granularity,
            'year': dt.year, 'month': dt.month, 'day': dt.day
        })
    
        # Write to s3
        wr.s3.to_parquet(
            df,
            path=f's3://{bucket}/{prefix}',
            table='table_name',
            database='database_name',
            index=False,
            dataset=True,
            partition_cols=['year', 'month', 'day', 'ticker', 'granularity']
        )
    

    Worked out example

    Let's assume we call the function on two different dates with different args:

    # Called on 2022-08-06
    save_in_s3(df, ticker='GBP', granularity='H1', bucket='my_bucket', prefix='my_folder')
    
    # Called on 2022-08-07
    save_in_s3(df, ticker='INR', granularity='H2', bucket='my_bucket', prefix='my_folder')
    

    Now, assuming you already have the database created in Glue (see the note below if it does not exist yet), the above function calls will output the following files in S3:

    s3://my_bucket/my_folder/year=2022/month=8/day=6/ticker=GBP/granularity=H1/*.parquet
    s3://my_bucket/my_folder/year=2022/month=8/day=7/ticker=INR/granularity=H2/*.parquet
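
    If the database does not already exist in the Glue catalog, it can be created up front. A minimal sketch, using the same placeholder name as the function above:

    import awswrangler as wr

    # Create the Glue database once; exist_ok avoids an error if it is already there
    wr.catalog.create_database(name='database_name', exist_ok=True)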
    

    The corresponding Athena table will have the following partitions:

    SHOW PARTITIONS "database_name"."table_name";
    
    -- year=2022/month=8/day=6/ticker=GBP/granularity=H1
    -- year=2022/month=8/day=7/ticker=INR/granularity=H2
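
    The same partition list can also be fetched programmatically from the Glue catalog. A small sketch using the placeholder names from above:

    import awswrangler as wr

    # Returns a dict mapping each partition's S3 location to its list of partition values
    partitions = wr.catalog.get_partitions(database='database_name', table='table_name')
    print(partitions)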
    

    Essentially, the partitions and the S3 folder structure will be created automatically by Data Wrangler for you.
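
    If you then want to read a slice of the data back, the partition columns can be used directly in an Athena query, and only the matching partitions are scanned. A minimal sketch, assuming the same placeholder database/table names and that Athena is already usable in the account:

    import awswrangler as wr

    # Partition pruning: only the ticker=GBP / granularity=H1 folders are read
    df = wr.athena.read_sql_query(
        "SELECT * FROM table_name WHERE ticker = 'GBP' AND granularity = 'H1'",
        database='database_name'
    )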