Tags: elasticsearch, pyspark, elasticsearch-hadoop, elasticsearch-spark

How to write dataframe with struct column into Elasticsearch via PySpark


I'm trying to write a dataframe containing a struct column into Elasticsearch:

from pyspark.sql.functions import col, struct, to_json

df1 = spark.createDataFrame([{"date": "2020.04.10", "approach": "test", "outlier_score": 1, "a": "1", "b": 2},
                             {"date": "2020.04.10", "approach": "test", "outlier_score": 0, "a": "2", "b": 1}])

df1 = df1.withColumn('details', to_json(struct(
    col('a'),
    col('b')
)))

df1.show(truncate=False)

df1.select('date', 'approach', 'outlier_score', 'details') \
    .write.format("org.elasticsearch.spark.sql") \
    .option('es.resource', 'outliers') \
    .save(mode="append")

which results in:

+---+--------+---+----------+-------------+---------------+
|a  |approach|b  |date      |outlier_score|details        |
+---+--------+---+----------+-------------+---------------+
|1  |test    |2  |2020.04.10|1            |{"a":"1","b":2}|
|2  |test    |1  |2020.04.10|0            |{"a":"2","b":1}|
+---+--------+---+----------+-------------+---------------+   

This works, but the JSON gets escaped, so the corresponding details fields are not clickable in Kibana:

{
  "_index": "outliers",
  "_type": "_doc",
  "_id": "NuDSA3IBhHa_VjuWENYR",
  "_version": 1,
  "_score": 0,
  "_source": {
    "date": "2020.04.10",
    "approach": "test",
    "outlier_score": 1,
    "details": "{\"a\":\"1\",\"b\":2}"
  },
  "highlight": {
    "date": [
      "@kibana-highlighted-field@2020.04.10@/kibana-highlighted-field@"
    ]
  }
}

I tried providing .option("es.input.json", "true"), but got an exception:

org.elasticsearch.hadoop.rest.EsHadoopRemoteException: mapper_parsing_exception: failed to parse;org.elasticsearch.hadoop.rest.EsHadoopRemoteException: not_x_content_exception: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes

If instead I try to write the data without converting it to JSON, i.e. removing to_json( from the original code, I get another exception:

org.elasticsearch.hadoop.rest.EsHadoopRemoteException: mapper_parsing_exception: failed to parse field [details] of type [text] in document with id 'TuDWA3IBhHa_VjuWFNmX'. Preview of field's value: '{a=2, b=1}';org.elasticsearch.hadoop.rest.EsHadoopRemoteException: illegal_state_exception: Can't get text on a START_OBJECT at 1:68
    {"index":{}}
{"date":"2020.04.10","approach":"test","outlier_score":0,"details":{"a":"2","b":1}}

So the question is: how do I write a PySpark dataframe with nested JSON columns into Elasticsearch so that the JSON does not get escaped?


Solution

  • Writing the data without conversion to JSON (i.e. without to_json) should actually produce no exceptions. The problem is that the mapping was already created automatically for the escaped JSON field, so Elasticsearch expects details to be text rather than an object.

    To fix the exception, delete or recreate the index. After that, the mapping for the details field will be created automatically as an object. Alternatively, it is also possible to delete all records that have a details field and then change the mapping of that field to the object type. A minimal sketch of the first approach is shown below.
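
A minimal sketch of that approach, assuming a local cluster at http://localhost:9200 and the elasticsearch Python client (the client call is just one way to drop the index; curl -X DELETE "localhost:9200/outliers" works as well):

from elasticsearch import Elasticsearch
from pyspark.sql.functions import col, struct

# Drop the old index so the auto-created "text" mapping for details is discarded
es = Elasticsearch("http://localhost:9200")
es.indices.delete(index="outliers", ignore_unavailable=True)

# Keep details as a real struct column instead of a JSON string
df2 = df1.withColumn('details', struct(col('a'), col('b')))

df2.select('date', 'approach', 'outlier_score', 'details') \
    .write.format("org.elasticsearch.spark.sql") \
    .option('es.resource', 'outliers') \
    .save(mode="append")

When the write recreates the index, details is mapped as an object, and the field shows up unescaped (and clickable) in Kibana.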