Tags: amazon-s3, arrow-functions, pyarrow, apache-arrow

Is there a native S3 filesystem implementation for Apache Arrow Java?


I'm working with Apache Arrow in Java and want to know whether the Java library provides a native S3 filesystem implementation like the one in the Python implementation of Arrow (pyarrow), which exposes S3FileSystem. I have gone through the Arrow Java IPC documentation and do not see any such implementation there.

In Python, using pyarrow, one can read a table from S3 like this:

import pyarrow.parquet as pq
from pyarrow import fs

# using a URI -> filesystem is inferred
pq.read_table("s3://my-bucket/data.parquet")

# using a path and an explicit filesystem
s3 = fs.S3FileSystem()  # pass region/credential options as needed
pq.read_table("my-bucket/data.parquet", filesystem=s3)

I would also like to know whether similar functionality is available for Google Cloud Storage (GcsFileSystem) and the Hadoop Distributed File System (HDFS).
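
For reference, pyarrow exposes these as fs.GcsFileSystem and fs.HadoopFileSystem. A minimal sketch of what I have in mind (the bucket name, namenode host, and port below are placeholders):

from pyarrow import fs
import pyarrow.parquet as pq

# GCS: the filesystem can be inferred from a gs:// URI or constructed explicitly
gcs = fs.GcsFileSystem(anonymous=True)  # anonymous access, e.g. for public buckets
pq.read_table("my-bucket/data.parquet", filesystem=gcs)

# HDFS: requires a reachable namenode; host and port are placeholders
hdfs = fs.HadoopFileSystem("namenode-host", 8020)
pq.read_table("/data/data.parquet", filesystem=hdfs)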

If there is no native implementation available in Java, is there an upcoming or beta release planned to provide this functionality?


Solution

  • Arrow Java does not appear to provide purely native FileSystem support for cloud providers.

    Another option is to use the Arrow Java Dataset module, which offers a factory that supports reading data from external file systems through the FileSystemDatasetFactory JNI classes.

    We are going to use these S3/GCS URIs for the demo:

    - aws s3 ls s3://voltrondata-labs-datasets/nyc-taxi-tiny/year=2022/month=2/part-0.parquet
    - gsutil ls gs://voltrondata-labs-datasets/nyc-taxi-tiny/year=2022/month=2/part-0.parquet
    

    Let's use this Arrow Java Dataset Cookbook example for testing:

    import org.apache.arrow.dataset.file.FileFormat;
    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
    import org.apache.arrow.dataset.jni.NativeMemoryPool;
    import org.apache.arrow.dataset.scanner.ScanOptions;
    import org.apache.arrow.dataset.scanner.Scanner;
    import org.apache.arrow.dataset.source.Dataset;
    import org.apache.arrow.dataset.source.DatasetFactory;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.ipc.ArrowReader;
    import org.apache.arrow.vector.types.pojo.Schema;
    
    public class DatasetModule {
        public static void main(String[] args) {
            String uri = "s3://voltrondata-labs-datasets/nyc-taxi-tiny/year=2022/month=2/part-0.parquet"; // AWS S3
            // String uri = "hdfs://{hdfs_host}:{port}/nyc-taxi-tiny/year=2022/month=2/part-0.parquet"; // HDFS
            // String uri = "gs://voltrondata-labs-datasets/nyc-taxi-tiny/year=2022/month=2/part-0.parquet"; // Google Cloud Storage
            ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
            try (
                    BufferAllocator allocator = new RootAllocator();
                    DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
                    Dataset dataset = datasetFactory.finish();
                    Scanner scanner = dataset.newScan(options);
                    ArrowReader reader = scanner.scanBatches()
            ) {
                Schema schema = scanner.schema();
                System.out.println(schema);
                while (reader.loadNextBatch()) {
                    System.out.println("RowCount: " + reader.getVectorSchemaRoot().getRowCount());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
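
    Note that running this sketch assumes the Arrow Java Dataset artifacts are on the classpath (org.apache.arrow:arrow-dataset plus an allocator implementation such as arrow-memory-netty). FileSystemDatasetFactory delegates to the bundled Arrow C++ library via JNI, so which URI schemes (s3://, gs://, hdfs://) actually resolve depends on the filesystems that native library was built with; credentials are typically picked up from the environment (e.g. the standard AWS credential chain for S3).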
    

    Consider: