amazon-s3 · parquet · python-3.8 · pyarrow · fsspec

FileNotFoundError when re-reading an s3 parquet partition that was cached by PyArrow/fsspec before the partition was altered


The sequence of events to replicate the issue is as follows:

  1. Read an s3 parquet partition using pandas.read_parquet (which uses pyarrow.dataset under the hood).
  2. Add another file to that partition.
  3. Read the same s3 parquet path again; this raises a FileNotFoundError on the new file.

Below is a code snippet to replicate it:

import os

import boto3
import pandas as pd

# build two small parquet files locally
df1 = pd.DataFrame([{'a': i, 'b': i} for i in range(10)])
df1.to_parquet('part1.parquet')
df2 = pd.DataFrame([{'a': i + 100, 'b': i + 100} for i in range(10)])
df2.to_parquet('part2.parquet')

s3_client = boto3.client('s3')
bucket = 'bucket_name'
prefix = 'test01'
url = f's3://{bucket}/{prefix}'

s3_client.upload_file('part1.parquet', bucket, os.path.join(prefix, 'p=x', 'part1.parquet'))
dfx1 = pd.read_parquet(url)  # this is fine

s3_client.upload_file('part2.parquet', bucket, os.path.join(prefix, 'p=x', 'part2.parquet'))

dfx2 = pd.read_parquet(url)  # this raises the FileNotFoundError on part2.parquet

The exception traceback is as follows:

  File "<env>/lib/python3.8/site-packages/pandas/io/parquet.py", line 493, in read_parquet
    return impl.read(
  File "<env>/lib/python3.8/site-packages/pandas/io/parquet.py", line 240, in read
    result = self.api.parquet.read_table(
  File "<env>/lib/python3.8/site-packages/pyarrow/parquet.py", line 1996, in read_table
    return dataset.read(columns=columns, use_threads=use_threads,
  File "<env>/lib/python3.8/site-packages/pyarrow/parquet.py", line 1831, in read
    table = self._dataset.to_table(
  File "pyarrow/_dataset.pyx", line 323, in pyarrow._dataset.Dataset.to_table
  File "pyarrow/_dataset.pyx", line 2311, in pyarrow._dataset.Scanner.to_table
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/_fs.pyx", line 1179, in pyarrow._fs._cb_open_input_file
  File "<env>/python3.8/site-packages/pyarrow/fs.py", line 394, in open_input_file
    raise FileNotFoundError(path)
FileNotFoundError: bucket_name/test01/p=x/part2.parquet

Note that if I open a separate Python console and read the same parquet partition, it loads with no problem at all.

My suspicion is that pyarrow does some sort of caching when accessing s3 parquet files, and the newly added file is not in that cache: even though it knows the new file needs to be read, it cannot find it. Reloading the pandas and pyarrow modules does not reset this cache either.
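
One way to test this suspicion (a sketch; it assumes pandas is going through the default s3fs/fsspec backend, whose filesystem instances are cached, so fsspec.filesystem("s3") hands back the same object that was used internally):

import fsspec

fs = fsspec.filesystem("s3")
print(fs.ls(url + '/p=x'))   # should show the stale listing (part2.parquet missing)
fs.invalidate_cache()        # drop the cached directory listings
print(fs.ls(url + '/p=x'))   # a fresh listing should now include part2.parquet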

Below are my package versions

[UPDATE] Overwriting part1.parquet does not work either: if you replace step 2 above with uploading part2.parquet so that it overwrites the existing part1.parquet (sketched after the traceback below), the caching layer is aware that the original part1.parquet no longer exists. That exception traceback is as follows:

File "pyarrow/_dataset.pyx", line 1680, in pyarrow._dataset.DatasetFactory.finish
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "<env>/lib/python3.8/site-packages/fsspec/spec.py", line 1544, in read
    out = self.cache._fetch(self.loc, self.loc + length)
  File "/<env>/lib/python3.8/site-packages/fsspec/caching.py", line 377, in _fetch
    self.cache = self.fetcher(start, bend)
  File "<env>/lib/python3.8/site-packages/s3fs/core.py", line 1965, in _fetch_range
    raise FileExpired(
s3fs.utils.FileExpired: [Errno 16] The remote file corresponding to filename bucket_name/test01/p=x/part1.parquet and Etag "d64a2de4f9c93dff49ecd3f19c414f61" no longer exists.
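
For reference, a minimal sketch of that overwrite variant, reusing bucket, prefix and url from the snippet above:

# step 2 variant: overwrite the existing object instead of adding a new file
s3_client.upload_file('part2.parquet', bucket, os.path.join(prefix, 'p=x', 'part1.parquet'))

# the cached listing still carries the old ETag for part1.parquet, so the read
# fails with s3fs.utils.FileExpired instead of FileNotFoundError
dfx2 = pd.read_parquet(url)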

Solution

  • Explanation: s3fs caches file listings by default. That means that, by default, once you have listed a directory, it is not listed again unless a write operation goes through s3fs or you explicitly clear the cache.

    1. Why not use s3fs for your file transfers too?
        fs = fsspec.filesystem("s3")
        fs.put('part1.parquet', os.path.join(url, 'p=x', 'part1.parquet'))  # local path first, then remote destination

    2. You can explicitly clear the cache:
        fs = fsspec.filesystem("s3")
        fs.invalidate_cache()  # or pass a specific path to invalidate only that directory

    3. Turn off or control caching by adding one of these to your pandas IO calls (combined sketch after this list):
        storage_options={"use_listings_cache": False}
        storage_options={"listings_expiry_time": <int>}