apache-spark · hadoop · apache-spark-sql · data-ingestion

Spark pulling data into RDD or DataFrame or Dataset


I'm trying to put into simple terms when Spark pulls data through the driver, and when it doesn't need to.

I have 3 questions -

  1. Let's say you have a 20 TB flat file stored in HDFS, and from a driver program you pull it into a DataFrame or an RDD using one of the respective libraries' out-of-the-box functions (sc.textFile(path), sc.textFile(path).toDF, etc.). Will it cause the driver program to get an OOM if the driver is run with only 32 GB of memory? Or at least cause swapping on the driver JVM? Or will Spark and Hadoop be smart enough to distribute the data from HDFS into the Spark executors to make a DataFrame/RDD without going through the driver?
  2. The exact same question as 1, except from an external RDBMS?
  3. The exact same question as 1, except from a specific node's file system (just a Unix file system holding a 20 TB file, but not HDFS)?

Solution

  • Regarding 1

    Spark operates on distributed data structures like RDD and Dataset (and DataFrame before 2.0). Here are the facts you should know about these data structures to get the answer to your question:

    1. All transformation operations (map, filter, etc.) are lazy. This means that no reading is performed until you require a concrete result of your operations (via an action like reduce or fold, or by saving the result to a file).
    2. When processing a file on HDFS, Spark operates on file partitions. A partition is the minimal logical batch of data that can be processed. Normally one partition equals one HDFS block, and the total number of partitions can never be less than the number of blocks in the file. The common (and default) HDFS block size is 128 MB.
    3. All actual computations (including reading from HDFS) in RDD and Dataset are performed inside executors, never on the driver. The driver creates a DAG and logical plan of execution and assigns tasks to executors for further processing.
    4. Each executor runs a previously assigned task against a particular partition of data. So normally, if you allocate only one core to your executor, it processes no more than 128 MB (the default HDFS block size) of data at a time.

    So basically, when you invoke sc.textFile no actual reading happens. All the facts mentioned above explain why an OOM doesn't occur on the driver even while processing 20 TB of data.
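    A minimal sketch of that laziness (the HDFS path is hypothetical, and a live SparkContext named sc is assumed):

    ```scala
    // Nothing is read from HDFS here: textFile, filter and map are all
    // lazy transformations that only build up a lineage/DAG on the driver.
    val lines  = sc.textFile("hdfs:///data/huge-20tb-file.txt")
    val errors = lines.filter(_.contains("ERROR")).map(_.toUpperCase)

    // Only an action triggers real work, and the reading itself happens
    // partition-by-partition inside the executors, not on the driver.
    // The driver receives only the final count, a single Long.
    val errorCount = errors.count()
    ```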

    There are some special cases, such as join operations. But even then, all executors flush their intermediate results to local disk for further processing.

    Regarding 2

    In the case of JDBC, you can decide how many partitions you will have for your table, and choose an appropriate partition column in your table that will split the data into partitions evenly. It's up to you how much data is loaded into memory at any one time.
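    For example, a partitioned JDBC read might look like this (the connection details, table name, and id partition column are hypothetical; Spark issues numPartitions parallel range queries, and each executor reads only its own slice directly from the database, bypassing the driver):

    ```scala
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://db-host:5432/warehouse") // hypothetical
      .option("dbtable", "big_table")
      .option("user", "etl")
      .option("password", "secret")
      .option("partitionColumn", "id")    // numeric/date/timestamp column to split on
      .option("lowerBound", "1")          // min value of the partition column
      .option("upperBound", "1000000000") // max value of the partition column
      .option("numPartitions", "200")     // 200 parallel reads, one range each
      .load()
    ```

    Note that lowerBound and upperBound only control how the ranges are cut, not which rows are read; a skewed partition column will still concentrate most of the data in a few partitions.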

    Regarding 3

    The block size of a local file is controlled by the fs.local.block.size property (32 MB by default, I believe). So it is basically the same as case 1 (an HDFS file), except that you will read all the data from one machine and one physical disk drive, which is extremely inefficient for a 20 TB file.
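    Reading such a local file would use a file:// URL (the path below is hypothetical). Keep in mind that every executor must be able to see that same path, so on a cluster this only works in local mode or if the file is copied to every node:

    ```scala
    // Non-HDFS local file: partitioning follows fs.local.block.size,
    // but every block lives on a single machine's disk, so all reads
    // are funneled through that one node.
    val localRdd = sc.textFile("file:///data/huge-20tb-file.txt")
    ```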