python · scala · apache-spark · amazon-s3 · pyspark

Why is AWS rejecting my connections when I use wholeTextFiles() with PySpark?


I use

sc.wholeTextFiles(",".join(fs), minPartitions=200)

to download 6,000 XML files from S3 (about 50 MB each) on a single Dataproc node with 96 CPUs. With minPartitions=200 AWS rejects my connections, but with minPartitions=50 everything works fine. Why?

Some logs from Spark:

(...)
19/05/22 14:11:17 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:17 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:26 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:26 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:28 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:30 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/05/22 14:11:30 ERROR org.apache.spark.api.python.PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 362, in main
    eval_type = read_int(infile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 717, in read_int
    raise EOFError
EOFError

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InterruptedIOException: getFileStatus on s3a://uni-swim-firehose/tfms/2019/04/03/10/SWIM-TFMS-2-2019-04-03-10-51-52-0fd9f05a-cbc5-4c1c-aef2-aa275ee3c404.gz: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool

Solution

  • com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool

    With wholeTextFiles, each file is read over its own client connection to S3, so the number of connections in flight grows with the number of partitions you request; the S3 connection pool size defaults to 50.

    Hence you have no hiccups with 50 partitions.

    If you increase it to 200, you exceed the pool and get the above exception.

    Solution:

    See the Amazon docs: How do I resolve the error "Timeout waiting for connection from pool" in Amazon EMR?

    On EMR the limit is fs.s3.maxConnections in the emrfs-site.xml configuration file; it defaults to 50.

    Since you are using s3a with Spark, you can raise the maximum number of connections to 200, as shown in the examples below.
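
    On the asker's setup (an existing SparkContext sc on a Dataproc node), the same property can also be raised on the running context's Hadoop configuration before the first s3a:// path is read. This is a minimal sketch rather than the only way to do it; note that sc._jsc is PySpark's internal handle to the underlying JavaSparkContext, not an official public API:

    # Raise the S3A connection pool before the first s3a:// read.
    # S3A caches its filesystem client per URI, so this must run before
    # wholeTextFiles() touches the bucket.
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.connection.maximum", "200")  # pool size >= number of partitions
    hadoop_conf.set("fs.s3a.threads.max", "200")         # optional: matching thread pool

    rdd = sc.wholeTextFiles(",".join(fs), minPartitions=200)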


    Python way:

    import logging
    import sys

    from pyspark.sql import SparkSession


    def create_spark_session(aws_access_key, aws_secret_key, app_name):
        try:
            # spark.hadoop.* settings are copied into the Hadoop configuration,
            # which is where the S3A connector reads its options from.
            spark = SparkSession.builder \
                .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
                .config("spark.hadoop.fs.s3a.access.key", aws_access_key) \
                .config("spark.hadoop.fs.s3a.secret.key", aws_secret_key) \
                .config("spark.hadoop.fs.s3a.fast.upload", "true") \
                .config("spark.hadoop.fs.s3a.multipart.size", "1G") \
                .config("spark.hadoop.fs.s3a.fast.upload.buffer", "disk") \
                .config("spark.hadoop.fs.s3a.connection.maximum", "200") \
                .config("spark.hadoop.fs.s3a.attempts.maximum", "20") \
                .config("spark.hadoop.fs.s3a.connection.timeout", "30000") \
                .config("spark.hadoop.fs.s3a.threads.max", "10") \
                .config("spark.hadoop.fs.s3a.buffer.dir", "/tmp/s3a") \
                .appName(app_name) \
                .getOrCreate()
            return spark
        except Exception as e:
            logging.error(e)
            sys.exit(-1)
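
    A usage sketch for the question's scenario; the helper above and the fs list of s3a:// paths come from this post, while the credential variables are placeholders you would supply yourself:

    spark = create_spark_session(aws_access_key, aws_secret_key, "wholeTextFiles-s3a")
    sc = spark.sparkContext

    # With fs.s3a.connection.maximum at 200, requesting 200 partitions should
    # no longer exhaust the connection pool.
    xmls = sc.wholeTextFiles(",".join(fs), minPartitions=200)
    print(xmls.count())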
    

    For Scala users:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    /**
      * Example SparkSession builder for S3A access.
      * @return a SparkSession with the S3A connection pool raised to 200
      */
    def getSparkSessionForS3(): SparkSession = {
      val conf = new SparkConf()
        .setAppName("testS3File")
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .set("spark.hadoop.fs.s3a.endpoint", "yourendpoint")
        .set("spark.hadoop.fs.s3a.connection.maximum", "200") // raise the pool from the default of 50
        .set("spark.hadoop.fs.s3a.fast.upload", "true")
        .set("spark.hadoop.fs.s3a.connection.establish.timeout", "500")
        .set("spark.hadoop.fs.s3a.connection.timeout", "5000")
        .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
        .set("spark.hadoop.com.amazonaws.services.s3.enableV4", "true")
        .set("spark.hadoop.com.amazonaws.services.s3.enforceV4", "true")

      val spark = SparkSession
        .builder()
        .config(conf)
        .getOrCreate()
      spark
    }
    

    Further reading:

    1. amazon-s3-best-practice-and-tuning-for-hadoopspark-in-the-cloud (slide 38)
    2. https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/#aTimeout_waiting_for_connection_from_pool_when_writing_to_S3A

    All of these exceptions are discussed in #2.