spark-structured-streaming elasticsearch-hadoop

How to set a dynamic document ID in the Elasticsearch sink using Spark Structured Streaming


In the Elasticsearch write sink, how do I set the document ID dynamically from a field in the dataset? In my case the ID has to come from a particular field of the formatted dataset. I came across "es.mapping.id", but how do I make it take its value from my dataset?


Solution

  • It turns out you only need to pass the field name as the value of "es.mapping.id". The connector reads that field from each row and uses its value as the document ID:

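    // "es.mapping.id" names the column whose per-row value becomes the document ID.
    StreamingQuery query = finalData.writeStream()
                    .outputMode(OutputMode.Append())
                    .format("org.elasticsearch.spark.sql")
                    .option("es.mapping.id", "input_key")   // input_key is a column of finalData
                    .option("checkpointLocation", "/tmp/spark-checkpoint")
                    .start("spark_index/doc");              // target index/type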
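
To make the per-row behaviour concrete, here is a minimal plain-Java sketch of the idea, with no Spark or Elasticsearch dependency. It is not the connector's actual code: the class DocIdSketch and the helper bulkActionLine are hypothetical names invented for illustration, and the row is modelled as a simple Map. It only shows how the value of the field named in "es.mapping.id" could end up as the _id in the action line of an Elasticsearch Bulk API request. Rejecting rows that have no value for the field is a choice made for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DocIdSketch {

    // Hypothetical helper (not part of elasticsearch-hadoop): builds the
    // action line of a Bulk API request for one row, taking the document
    // _id from the field named by idField.
    static String bulkActionLine(Map<String, Object> row, String idField) {
        Object id = row.get(idField);
        if (id == null) {
            // Assumption for this sketch: a row without an id value is an error.
            throw new IllegalArgumentException(
                    "Row has no value for id field '" + idField + "'");
        }
        // Escape backslashes and double quotes so the id is a valid JSON string.
        String escaped = id.toString().replace("\\", "\\\\").replace("\"", "\\\"");
        return "{\"index\":{\"_id\":\"" + escaped + "\"}}";
    }

    public static void main(String[] args) {
        // One row of the dataset, modelled as field name -> value.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("input_key", "order-42");
        row.put("amount", 19.99);

        // prints {"index":{"_id":"order-42"}}
        System.out.println(bulkActionLine(row, "input_key"));
    }
}
```

Because the ID is taken from the data, two rows with the same input_key value address the same document, so the later write replaces the earlier one rather than creating a second document.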