apache-spark elasticsearch-hadoop

What does load() do in Spark?


Spark is lazy, right? So what does load() do?

import timeit

# sqlContext and indexes are assumed to be defined earlier in the session.
start = timeit.default_timer()
# Configure the reader only; load() is not called yet.
reader = sqlContext.read.option(
    "es.resource", indexes
).format("org.elasticsearch.spark.sql")
end = timeit.default_timer()
print('without load: ', end - start)  # almost instant

start = timeit.default_timer()
df = reader.load()
end = timeit.default_timer()
print('load: ', end - start)  # takes 1 sec

start = timeit.default_timer()
df.show()
end = timeit.default_timer()
print('show: ', end - start)  # takes 4 sec

If show() were the only action, I would not expect load() to take as long as 1 sec. So I'm concluding that load() is an action (as opposed to a transformation in Spark).

Does load() actually load the whole dataset into memory? I don't think so, but then what does it do?

I've searched and looked at the docs at https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html but they don't help.


Solution

  • tl;dr: load() is a DataFrameReader API (org.apache.spark.sql.DataFrameReader#load), as the code below shows. It returns a DataFrame, on top of which Spark transformations can be applied.

    /**
     * Loads input in as a `DataFrame`, for data sources that support multiple paths.
     * Only works if the source is a HadoopFsRelationProvider.
     *
     * @since 1.6.0
     */
    @scala.annotation.varargs
    def load(paths: String*): DataFrame
    

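    You need a DataFrame before you can apply any transformation.
    To create a DataFrame from a path (HDFS, S3, etc.), you can use spark.read.format("<format>").load("<path>"). There are also data-source-specific APIs that do the same thing in one call, such as spark.read.parquet(<path>). load() itself is not an action in Spark's sense: it builds the DataFrame, and the rows are read only when an action such as show() runs.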

    Why does it take a whole second?

    For file-based sources, this time can be attributed to listing the files. On HDFS this listing is not expensive, whereas on cloud storage such as S3 it is very expensive and takes time proportional to the number of files.
    In your case the data source is Elasticsearch, so the time can be attributed to establishing the connection, collecting the metadata needed to plan a distributed scan, and so on, depending on the Elasticsearch connector's implementation. You can enable debug logs to get more information. If Elasticsearch can log the requests it receives, you could also check its logs for the requests made after load() was called.
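
The split described above (load() does eager metadata work, the rows are read only by an action) can be sketched with a toy model in plain Python. This is not Spark or connector code: ToySource, ToyReader and ToyDataFrame are hypothetical names made up for illustration, and the request counters stand in for whatever the real connector sends to Elasticsearch.

```python
class ToySource:
    """Hypothetical stand-in for an external store such as Elasticsearch."""

    def __init__(self, rows):
        self._rows = rows
        self.metadata_requests = 0
        self.scan_requests = 0

    def fetch_schema(self):
        # Cheap metadata call: returns field names only, no rows.
        self.metadata_requests += 1
        return sorted(self._rows[0].keys()) if self._rows else []

    def scan(self):
        # Expensive call: actually reads the rows.
        self.scan_requests += 1
        return list(self._rows)


class ToyDataFrame:
    def __init__(self, source, schema):
        self._source = source
        self.schema = schema

    def show(self):
        # Action: the first point at which rows are read from the source.
        rows = self._source.scan()
        for row in rows:
            print(row)
        return len(rows)


class ToyReader:
    def __init__(self, source):
        # Builder step: only stores configuration, contacts nothing.
        self._source = source

    def load(self):
        # Eager, but metadata only: resolve the schema, read no rows.
        return ToyDataFrame(self._source, self._source.fetch_schema())


source = ToySource([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])

reader = ToyReader(source)
requests_after_builder = (source.metadata_requests, source.scan_requests)

df = reader.load()
requests_after_load = (source.metadata_requests, source.scan_requests)

shown = df.show()
requests_after_show = (source.metadata_requests, source.scan_requests)
```

In this model the builder step costs nothing, load() pays for one metadata round trip, and show() pays for the scan, which mirrors the "almost instant / 1 sec / 4 sec" pattern in the question. How much work the real connector does at each step is an assumption here and would need to be confirmed from its logs.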