apache-spark, elasticsearch, elasticsearch-5, elasticsearch-spark

Writing to Elasticsearch from Spark is very slow


I am processing a text file and writing the transformed rows from a Spark application to Elasticsearch as below:

input.write.format("org.elasticsearch.spark.sql")
      .mode(SaveMode.Append)
      .option("es.resource", "{date}/" + dir).save()

This runs very slowly and takes around 8 minutes to write 287.9 MB / 1,513,789 records.

How can I tune Spark and Elasticsearch settings to make this faster, given that network latency will always be there?

I am using Spark in local mode on a machine with 16 cores and 64 GB RAM. My Elasticsearch cluster has one master and 3 data nodes, each with 16 cores and 64 GB RAM.

I am reading the text file as below:

val readOptions: Map[String, String] = Map(
  "ignoreLeadingWhiteSpace" -> "true",
  "ignoreTrailingWhiteSpace" -> "true",
  "inferSchema" -> "false",
  "header" -> "false",
  "delimiter" -> "\t",
  "comment" -> "#",
  "mode" -> "PERMISSIVE")

....

val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)

Solution

  • First, let's start with what's happening in your application. Apache Spark is reading one (not so big) CSV file, which is compressed. Spark will therefore first spend time decompressing the data and scanning it before writing it to Elasticsearch.

    This will create a Dataset/DataFrame with one partition (confirmed by the result of df.rdd.getNumPartitions that you mentioned in the comments).
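
    For reference, a minimal way to check that partition count yourself (reusing the input DataFrame from your code):

    println(input.rdd.getNumPartitions) // prints 1 here, since a single compressed file cannot be split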

    One straightforward solution would be to repartition your data on read and cache it before writing it to Elasticsearch. Now, I'm not sure what your data looks like, so deciding on the number of partitions is something you will have to benchmark on your side.

    val input = sqlContext.read.options(readOptions)
                          .csv(inputFile.getAbsolutePath)
                          .repartition(100) // 100 is just an example
                          .cache
    

    I'm not sure how much benefit this will bring to your application, because I believe there might be other bottlenecks (network IO, disk type on the ES side).

    PS: I usually convert CSV files to Parquet before building ETL on top of them; there is a real performance gain there (personal opinion and benchmarks). A sketch of that conversion is shown below.
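
    A rough sketch of that one-off conversion (reusing readOptions and inputFile from your code; the output path is just an example):

    val parquetPath = "/data/input_parquet" // example path
    sqlContext.read.options(readOptions)
              .csv(inputFile.getAbsolutePath)
              .write.mode(SaveMode.Overwrite)
              .parquet(parquetPath)

    // later ETL jobs read the splittable, columnar Parquet copy instead of the CSV
    val input = sqlContext.read.parquet(parquetPath)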

    Another possible optimization would be to tweak the es.batch.size.entries setting for the elasticsearch-spark connector. The default value is 1000.

    You need to be careful when setting this parameter because you might overload Elasticsearch. I strongly advise you to take a look at the available configurations here.
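
    For illustration, here is a sketch of your write with the bulk-size options passed to the connector (the values are placeholders to benchmark, not recommendations):

    input.write.format("org.elasticsearch.spark.sql")
         .mode(SaveMode.Append)
         .option("es.resource", "{date}/" + dir)
         .option("es.batch.size.entries", "5000") // documents per bulk request (default 1000)
         .option("es.batch.size.bytes", "4mb")    // bytes per bulk request (default 1mb)
         .save()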

    I hope this helps!