Pandas lets you pass an AWS S3 path directly to `.to_csv()` and `.to_parquet()`. There's a `storage_options` argument for passing S3-specific arguments. I would like to call `.to_csv('s3://bucket/key.csv', storage_options=something)` and specify, as `something`, S3 object tags to apply to the uploaded object.

I've read the docs and I can't figure out how. The pandas docs don't list the possible values for `storage_options`; they just point to fsspec. It looks like pandas calls fsspec, which calls s3fs, which calls aiobotocore, which calls botocore, and that probably calls s3transfer. How can I pass S3 tag arguments all the way down this rabbit hole?
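For context, the documented use of `storage_options` is to pass constructor arguments to the underlying fsspec filesystem. A minimal sketch of that working case, assuming s3fs is installed (the bucket/key and credential values are placeholders):

```python
import pandas as pd

# These storage_options entries become s3fs.S3FileSystem(...) constructor
# keywords; 'key' and 'secret' are real s3fs parameters for credentials.
pd.DataFrame([{'a': 1}]).to_csv(
    "s3://mybucket/example.csv",   # placeholder bucket/key
    storage_options={
        "key": "AKIA...",          # placeholder access key id
        "secret": "...",           # placeholder secret key
    },
)
```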
Here's my repro script:

```python
import pandas as pd
import boto3
bucket = 'mybucket' # change for your bucket
key = 'test/pandas/tags.csv'
tags = {'mytag': 'x'}
df = pd.DataFrame([{'a': 1}])
df.to_csv(f"s3://{bucket}/{key}") # try without any tags first
df.to_csv(f"s3://{bucket}/{key}", storage_options={'tags': tags})
resp = boto3.client('s3').get_object_tagging(Bucket=bucket, Key=key)
actual_tags = {t['Key']: t['Value'] for t in resp.get('TagSet', [])}
assert actual_tags == tags
```
Desired outcome: the assertion passes and the S3 object carries the tag `mytag: x`.

Actual outcome: the second `.to_csv()` call fails with the traceback below. In other words, it works without tags; the tags are what causes the failure.
```
Traceback (most recent call last):
  File "upld.py", line 9, in <module>
    df.to_csv(f"s3://{bucket}/{key}", storage_options={'tags': tags})
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/core/generic.py", line 3463, in to_csv
    return DataFrameRenderer(formatter).to_csv(
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/formats/format.py", line 1105, in to_csv
    csv_formatter.save()
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/formats/csvs.py", line 237, in save
    with get_handle(
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/common.py", line 608, in get_handle
    ioargs = _get_filepath_or_buffer(
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/pandas/io/common.py", line 357, in _get_filepath_or_buffer
    file_obj = fsspec.open(
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 456, in open
    return open_files(
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 299, in open_files
    [fs.makedirs(parent, exist_ok=True) for parent in parents]
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/core.py", line 299, in <listcomp>
    [fs.makedirs(parent, exist_ok=True) for parent in parents]
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 91, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 71, in sync
    raise return_result
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/fsspec/asyn.py", line 25, in _runner
    result[0] = await coro
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 746, in _makedirs
    await self._mkdir(path, create_parents=True)
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 731, in _mkdir
    await self._call_s3("create_bucket", **params)
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 252, in _call_s3
    await self.set_session()
  File "/home/ec2-user/.pyenv/versions/3.8.11/lib/python3.8/site-packages/s3fs/core.py", line 395, in set_session
    self.session = aiobotocore.session.AioSession(**self.kwargs)
TypeError: __init__() got an unexpected keyword argument 'tags'
```
It looks like these arguments are being passed to the aiobotocore session constructor, not to the actual S3 `put_object` API call. That makes me think it is not possible to do this.
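You can reproduce the dead end without pandas at all. A minimal sketch, assuming the same fsspec/s3fs versions as in the traceback (`mybucket` is a placeholder):

```python
import s3fs

# fsspec hands storage_options to the S3FileSystem constructor, which
# stashes unrecognized keywords and later forwards them to
# aiobotocore.session.AioSession(**self.kwargs).
fs = s3fs.S3FileSystem(tags={'mytag': 'x'})

# The error only surfaces on the first real S3 call, when the session
# is actually created:
fs.ls('mybucket')
# TypeError: __init__() got an unexpected keyword argument 'tags'
```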
Should I try:

```python
storage_options={
    'tags': {
        'k': 'v'
    }
}
```

or

```python
storage_options={
    'tags': [
        {'Key': 'k', 'Value': 'v'}
    ]
}
```
Of course I could upload without tags, then add the tags with a separate boto3 call (see the sketch below). This is not atomic, and costs twice as much (for small files). If there were a way to get the version ID back from the upload, that would eliminate some concurrency issues with concurrent writes.
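For what it's worth, neither of the dict shapes above matches what the S3 APIs themselves expect: `put_object` takes `Tagging` as a URL-encoded string (`'k=v'`), while `put_object_tagging` takes a `TagSet` list of `{'Key': ..., 'Value': ...}` dicts. A sketch of the non-atomic two-step workaround, reusing the bucket/key/tag from the repro above:

```python
import boto3
import pandas as pd

bucket = 'mybucket'  # change for your bucket
key = 'test/pandas/tags.csv'

# Step 1: upload without tags (pandas -> fsspec -> s3fs).
pd.DataFrame([{'a': 1}]).to_csv(f"s3://{bucket}/{key}")

# Step 2: tag the object in a second request. Not atomic: a concurrent
# writer can replace the object between the two calls, and on a
# versioned bucket the tags land on whatever the latest version is.
boto3.client('s3').put_object_tagging(
    Bucket=bucket,
    Key=key,
    Tagging={'TagSet': [{'Key': 'mytag', 'Value': 'x'}]},
)
```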
So I spent some time digging around in this. I could be wrong here, but I do not think it is possible. Here is why I believe that:

`storage_options` gets passed along to fsspec if the path is a URL not beginning with `http` (see here). These options are then passed through fsspec to `s3fs.S3FileSystem` as constructor `kwargs`. The kwargs dead-end with the function in your error message.

(This is where I could be wrong!) The `S3FileSystem` then does a `_put_file` call to write your CSV. That function does not use `self.kwargs`; it takes function-level `kwargs` that pandas will never pass (see the sketch below).

Thus, I do not think tags are accessible through the `to_X` methods in pandas. However, it would be worthwhile to raise an issue on the pandas/fsspec GitHub to get more info.
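To make that dead end concrete: if I'm reading `_put_file` right, the per-call S3 arguments would have to be supplied at the filesystem-method level, which pandas has no hook for. An untested sketch of what that would look like at the s3fs layer, bypassing pandas' writer entirely (the bucket/key are placeholders, and `put_file` forwarding extra kwargs through to `put_object` is my assumption):

```python
import s3fs

fs = s3fs.S3FileSystem()

# Function-level kwargs like this are what _put_file could forward to the
# underlying put_object call -- but pandas' to_csv never supplies them.
# (Assumption: put_file passes **kwargs through; placeholder bucket/key.)
fs.put_file(
    'local.csv',                      # a file already written locally
    'mybucket/test/pandas/tags.csv',
    Tagging='mytag=x',                # put_object expects a URL-encoded string
)
```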