apache-spark, minio, apache-spark-connector

How can we read from a Minio cluster and write to another Minio cluster in Spark?


I have a use case where we have some input data in a Minio cluster; we need to read and transform that data and then write it to another Minio cluster, and we have to do it using Spark. How can we achieve this?


Solution

  • If you use hadoop-aws, you can simply read from and write to Minio using the s3a:// protocol. You should be able to set different endpoints, credentials, etc. for each individual bucket using the properties:

    spark.hadoop.fs.s3a.bucket.<bucket>.endpoint
    spark.hadoop.fs.s3a.bucket.<bucket>.aws.credentials.provider
    spark.hadoop.fs.s3a.bucket.<bucket>.access.key
    spark.hadoop.fs.s3a.bucket.<bucket>.secret.key
    spark.hadoop.fs.s3a.bucket.<bucket>.path.style.access
    

    So, imagine you have one Minio server at https://minio1.com with a bucket dataIn and another at https://minio2.com with a bucket dataOut. You can then set the following configuration (e.g. in spark-defaults.conf, using the --conf argument of spark-submit, or directly on your SparkConf object in code; a programmatic sketch follows the listing below):

    spark.hadoop.fs.s3a.bucket.dataIn.endpoint                  https://minio1.com
    spark.hadoop.fs.s3a.bucket.dataIn.aws.credentials.provider  org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    spark.hadoop.fs.s3a.bucket.dataIn.access.key                ACCESS_KEY_1
    spark.hadoop.fs.s3a.bucket.dataIn.secret.key                SECRET_KEY_1
    spark.hadoop.fs.s3a.bucket.dataIn.path.style.access         true
    
    spark.hadoop.fs.s3a.bucket.dataOut.endpoint                  https://minio2.com
    spark.hadoop.fs.s3a.bucket.dataOut.aws.credentials.provider  org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    spark.hadoop.fs.s3a.bucket.dataOut.access.key                ACCESS_KEY_2
    spark.hadoop.fs.s3a.bucket.dataOut.secret.key                SECRET_KEY_2
    spark.hadoop.fs.s3a.bucket.dataOut.path.style.access         true
    

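    If you prefer the in-code route mentioned above, here is a minimal sketch of setting the same per-bucket properties on the SparkSession builder. The endpoints, bucket names, keys, and the application name are just the placeholder values from the example configuration:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("minio-to-minio")
      // dataIn bucket lives on the first Minio server
      .config("spark.hadoop.fs.s3a.bucket.dataIn.endpoint", "https://minio1.com")
      .config("spark.hadoop.fs.s3a.bucket.dataIn.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
      .config("spark.hadoop.fs.s3a.bucket.dataIn.access.key", "ACCESS_KEY_1")
      .config("spark.hadoop.fs.s3a.bucket.dataIn.secret.key", "SECRET_KEY_1")
      .config("spark.hadoop.fs.s3a.bucket.dataIn.path.style.access", "true")
      // dataOut bucket lives on the second Minio server
      .config("spark.hadoop.fs.s3a.bucket.dataOut.endpoint", "https://minio2.com")
      .config("spark.hadoop.fs.s3a.bucket.dataOut.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
      .config("spark.hadoop.fs.s3a.bucket.dataOut.access.key", "ACCESS_KEY_2")
      .config("spark.hadoop.fs.s3a.bucket.dataOut.secret.key", "SECRET_KEY_2")
      .config("spark.hadoop.fs.s3a.bucket.dataOut.path.style.access", "true")
      .getOrCreate()
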
    Then, in your application, simply transfer the data as follows:

    val documents = spark.read.parquet("s3a://dataIn/path/to/data")
    
    val transformed = documents.select(...) // do your transformations here
    
    transformed.write.parquet("s3a://dataOut/path/to/target")
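
    Note that this relies on the hadoop-aws module (and the AWS SDK bundle it pulls in) being on the classpath. If it is not already part of your Spark distribution, one way to add it is via --packages; the version and application jar below are only illustrative placeholders, and the hadoop-aws version must match the Hadoop version your Spark build was compiled against:

    spark-submit --packages org.apache.hadoop:hadoop-aws:3.3.4 your-application.jar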