Tags: python, parquet, pyarrow, fastparquet, python-s3fs

How to read partitioned parquet files from S3 using pyarrow in python


I am looking for ways to read data from multiple partitioned directories in S3 using Python.

data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
data_folder/serial_number=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet

pyarrow's ParquetDataset module has the capability to read from partitions, so I tried the following code:

>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)

It threw the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
    .format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/

Based on the pyarrow documentation, I tried using s3fs as the file system, i.e.:

>>> dataset = pq.ParquetDataset(a,filesystem=s3fs)

This throws the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
    if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'

I am limited to using an ECS cluster, hence Spark/PySpark is not an option.

Is there a way to easily read the parquet files from such partitioned directories in S3 with Python? I feel that listing all the directories and then reading them is not good practice, as suggested in this link. I need to convert the data to a pandas DataFrame for further processing, and hence prefer options related to fastparquet or pyarrow. I am open to other options in Python as well.


Solution

  • I managed to get this working with the latest release of fastparquet & s3fs. Below is the code:

    import s3fs
    import fastparquet as fp

    # a single S3FileSystem instance handles both globbing and opening files
    fs = s3fs.S3FileSystem()

    # example key: mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
    s3_path = "mybucket/data_folder/*/*/*.parquet"
    all_paths_from_s3 = fs.glob(path=s3_path)

    myopen = fs.open
    # use s3fs as the filesystem
    fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
    # convert to a pandas DataFrame
    df = fp_obj.to_pandas()
    
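    If you only need a subset of the partitions, you can narrow the glob instead of listing everything. A minimal sketch, assuming the same bucket layout as above (the partition value is illustrative):

    # restrict the glob to a single serial_number partition
    subset_path = "mybucket/data_folder/serial_number=1/*/*.parquet"
    subset_paths = fs.glob(path=subset_path)

    # read just the matching files, reusing the s3fs opener from above
    fp_subset = fp.ParquetFile(subset_paths, open_with=myopen)
    df_subset = fp_subset.to_pandas()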

    Credits to Martin for pointing me in the right direction via our conversation.

    NB: This would be slower than using pyarrow, based on the benchmark. I will update my answer once s3fs support is implemented in pyarrow via ARROW-1213.

    I did a quick benchmark on individual iterations with pyarrow versus the list of files sent as a glob to fastparquet. fastparquet with s3fs is faster than pyarrow plus my hackish code, but I reckon pyarrow + s3fs will be faster once that support is implemented.

    The code & benchmarks are below (list_parquet_files, date_partition, dma_partition and list_ are assumed to be defined beforehand):

    >>> def test_pq():
    ...     for current_file in list_parquet_files:
    ...         f = fs.open(current_file)
    ...         df = pq.read_table(f).to_pandas()
    ...         # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
    ...         # probably not the best way to split :)
    ...         elements_list = current_file.split('/')
    ...         for item in elements_list:
    ...             if item.find(date_partition) != -1:
    ...                 current_date = item.split('=')[1]
    ...             elif item.find(dma_partition) != -1:
    ...                 current_dma = item.split('=')[1]
    ...         df['serial_number'] = current_dma
    ...         df['cur_date'] = current_date
    ...         list_.append(df)
    ...     frame = pd.concat(list_)
    ...
    >>> timeit.timeit('test_pq()', number=10, globals=globals())
    12.078817503992468
    
    >>> def test_fp():
    ...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
    ...     df = fp_obj.to_pandas()
    
    >>> timeit.timeit('test_fp()', number=10, globals=globals())
    2.961556333000317
    

    Update 2019

    Issues such as ARROW-2038 and fastparquet PR #182 have since been resolved.

    Read parquet files using Pyarrow

    # pip install pyarrow
    # pip install s3fs
    
    >>> import s3fs
    >>> import pyarrow.parquet as pq
    >>> fs = s3fs.S3FileSystem()
    
    >>> bucket = 'your-bucket-name'
    >>> path = 'directory_name'  # if it's a directory, omit the trailing /
    >>> bucket_uri = f's3://{bucket}/{path}'
    >>> bucket_uri
    's3://your-bucket-name/directory_name'
    
    >>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
    >>> table = dataset.read()
    >>> df = table.to_pandas() 
    
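    If you only need certain partitions, ParquetDataset also accepts a filters argument that is applied to the hive partition keys, and dataset.read(columns=[...]) can prune columns. A minimal sketch, assuming the partition layout from the question; the date value is illustrative and, depending on the pyarrow version, partition values may be compared as strings:

    >>> # read only the files under cur_date=20-12-2012
    >>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs,
    ...                             filters=[('cur_date', '=', '20-12-2012')])
    >>> df = dataset.read().to_pandas()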

    Read parquet files using fastparquet

    # pip install s3fs
    # pip install fastparquet
    
    >>> import s3fs
    >>> import fastparquet as fp
    
    >>> fs = s3fs.S3FileSystem()
    >>> myopen = fs.open
    
    >>> bucket = 'your-bucket-name'
    >>> path = 'directory_name'
    >>> root_dir_path = f'{bucket}/{path}'
    # the first two wildcards cover the 1st and 2nd partition columns of your data, and so forth
    >>> s3_path = f"{root_dir_path}/*/*/*.parquet"
    >>> all_paths_from_s3 = fs.glob(path=s3_path)
    
    >>> fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
    >>> df = fp_obj.to_pandas()
    
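    With root= supplied, fastparquet reconstructs the hive partition keys from the directory names, so serial_number and cur_date should appear as ordinary columns in the resulting DataFrame. A quick sanity check (a sketch; column names are taken from the question's layout):

    >>> # the partition keys should now be regular columns
    >>> {'serial_number', 'cur_date'}.issubset(df.columns)
    >>> df['serial_number'].unique()  # one value per serial_number=<n> directory matched by the glob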

    Quick benchmarks

    This is probably not the best way to benchmark it; please read the blog post for a thorough benchmark.

    #pyarrow
    >>> import timeit
    >>> def test_pq():
    ...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
    ...     table = dataset.read()
    ...     df = table.to_pandas()
    ...
    >>> timeit.timeit('test_pq()', number=10, globals=globals())
    1.2677053569998407
    
    #fastparquet
    >>> def test_fp():
    ...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
    ...     df = fp_obj.to_pandas()
    
    >>> timeit.timeit('test_fp()', number=10, globals=globals())
    2.931876824000028
    
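    These are single timeit runs against S3, so the numbers will wobble with network conditions; a slightly more stable sketch takes the best of a few repeats:

    >>> import timeit
    >>> min(timeit.repeat('test_pq()', number=10, repeat=3, globals=globals()))
    >>> min(timeit.repeat('test_fp()', number=10, repeat=3, globals=globals()))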

    Further reading regarding Pyarrow's speed
