I am processing a text file and writing transformed rows from a Spark application to Elasticsearch as below:
input.write.format("org.elasticsearch.spark.sql")
  .mode(SaveMode.Append)
  .option("es.resource", "{date}/" + dir)
  .save()
This runs very slowly and takes around 8 minutes to write 287.9 MB / 1513789 records.
How can I tune the Spark and Elasticsearch settings to make it faster, given that network latency will always be there?
I am using Spark in local mode and have 16 cores and 64GB RAM. My Elasticsearch cluster has one master node and 3 data nodes with 16 cores and 64GB each.
I am reading the text file as below:
val readOptions: Map[String, String] = Map(
  "ignoreLeadingWhiteSpace" -> "true",
  "ignoreTrailingWhiteSpace" -> "true",
  "inferSchema" -> "false",
  "header" -> "false",
  "delimiter" -> "\t",
  "comment" -> "#",
  "mode" -> "PERMISSIVE"
)
....
val input = sqlContext.read.options(readOptions).csv(inputFile.getAbsolutePath)
First, let's start with what's happening in your application. Apache Spark is reading one (not so big) csv file which is compressed, so Spark will first spend time decompressing the data and scanning it before writing it to Elasticsearch. This will create a Dataset/DataFrame with one partition (confirmed by the result of your df.rdd.getNumPartitions mentioned in the comments).
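If you want to double-check this on your side, the partition count can be printed directly (this is just the check referenced above, using the input DataFrame from your read code):

println(input.rdd.getNumPartitions) // a single compressed csv file typically yields 1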
One straightforward solution would be to repartition your data on read and cache it before writing it to Elasticsearch. Now, I'm not sure what your data looks like, so deciding on the number of partitions is something you'll have to benchmark on your side.
val input = sqlContext.read.options(readOptions)
  .csv(inputFile.getAbsolutePath)
  .repartition(100) // 100 is just an example
  .cache
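As a sketch, you can then reuse your original write call on the repartitioned DataFrame (the count is optional and just one way to materialize the cache before the ES write):

input.count() // optional: forces the cache to be populated once
input.write.format("org.elasticsearch.spark.sql")
  .mode(SaveMode.Append)
  .option("es.resource", "{date}/" + dir)
  .save()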
I'm not sure how much benefit this will bring to your application, because I believe there may be other bottlenecks (network IO, disk type on the ES side).
PS: I recommend converting the csv files to parquet before building ETL pipelines over them. There is a real performance gain there (personal opinion and benchmarks).
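As an illustration of that idea (the staging path below is only a placeholder, not something from your setup):

sqlContext.read.options(readOptions)
  .csv(inputFile.getAbsolutePath)
  .write
  .mode(SaveMode.Overwrite)
  .parquet("/tmp/staging/parquet") // hypothetical staging location

// later ETL runs read the columnar, splittable parquet instead of csv
val parquetInput = sqlContext.read.parquet("/tmp/staging/parquet")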
Another possible optimization would be to tweak the es.batch.size.entries setting of the elasticsearch-spark connector. The default value is 1000.
You need to be careful when setting this parameter because you might overload Elasticsearch. I strongly advise you to take a look at the available configurations here.
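For example, the setting can be passed like any other connector option on the write (the value 5000 below is only an illustration; benchmark it and watch your cluster before raising it):

input.write.format("org.elasticsearch.spark.sql")
  .mode(SaveMode.Append)
  .option("es.resource", "{date}/" + dir)
  .option("es.batch.size.entries", "5000") // example value, not a recommendation
  .save()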
I hope this helps!