python apache-spark elasticsearch-hadoop

Is it possible to write to a dynamically created Elasticsearch index with a formatted date using elasticsearch-hadoop/spark?


In standalone Spark, I'm trying to write a DataFrame to Elasticsearch. I can get that to work, but I can't figure out how to write to a dynamically named index of the form 'index_name-{ts_col:{YYYY-mm-dd}}', where 'ts_col' is a datetime field in the dataset.

I've seen all sorts of posts saying this kind of syntax should work, but when I try it I get the errors included at the bottom. It seems to check whether the index exists before creating it, but it passes the unformatted index name to that check rather than the dynamically resolved one. I've also tried creating the index first with the same syntax using the Python elasticsearch module, but it can't handle dynamic index names.

Is there a solution available, or do I have to loop through my dataset in Spark to find each date it contains, create the indices I need, and then write to each index one at a time? Am I missing something obvious? Logstash does this with ease, so I don't get why I can't get it to work within Spark.

Here's the write command I'm using (tried different variations of it too):

# Wrapped in parentheses so the chained calls can span several lines in Python
(df.write.format("org.elasticsearch.spark.sql")
  .option('es.index.auto.create', 'true')
  .option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name')
  .option('es.mapping.id', 'es_id')
  .save())

Here's the jar I'm using:

elasticsearch-hadoop-5.0.0/dist/elasticsearch-spark-20_2.11-5.0.0.jar

Here's the error I get when I use the write command above:

ERROR NetworkClient: Node [##.##.##.##:9200] failed (Invalid target URI HEAD@null/index_name-{ts_col:{YYYY.mm.dd}}/type_name); selected next node [##.##.##.##:9200]

...

...

Py4JJavaError: An error occurred while calling o114.save. : org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed;

And if I set overwrite to True, I get:

Py4JJavaError: An error occurred while calling o58.save.
: org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: no such index null
    at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:488)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:446)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:436)
    at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:363)
    at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:92)
    at org.elasticsearch.hadoop.rest.RestRepository.delete(RestRepository.java:455)
    at org.elasticsearch.spark.sql.ElasticsearchRelation.insert(DefaultSource.scala:500)
    at org.elasticsearch.spark.sql.DefaultSource.createRelation(DefaultSource.scala:94)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:442)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

And if I try to use the Elasticsearch Python client to create the index ahead of time, I get:

RequestError: TransportError(400, u'invalid_index_name_exception', u'Invalid index name [index_name-{ts_col:YYYY.MM.dd}], must be lowercase')


Solution

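  • You don't need to wrap the date format in a second pair of curly braces; one pair around the field name and format is enough. The elasticsearch-hadoop configuration documentation covers this in its section on dynamic/multi-resource writes.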

    .option('es.resource', 'index_name-{ts_col:{YYYY.mm.dd}}/type_name')

    Change the above as shown below:

    .option('es.resource', 'index_name-{ts_col:YYYY.mm.dd}/type_name')
    

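    Note: Make sure your ts_col field holds a properly formatted date, since the connector has to parse it before it can apply the pattern. Also, according to the elasticsearch-hadoop documentation, the pattern follows Java's SimpleDateFormat syntax, where MM is the month and mm is the minute, so for daily indices you most likely want yyyy.MM.dd rather than YYYY.mm.dd.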
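
For completeness, here is a minimal sketch of the full write with the corrected resource string. The helper names flatten_resource and write_to_dated_index are made up for illustration and are not part of elasticsearch-hadoop. The options are the ones from the question, and df is assumed to be a PySpark DataFrame that has the ts_col and es_id columns.

```python
import re

# Assumption: field formatting takes a single pair of braces, "{field:pattern}",
# as described in the answer above. The question's command nested a second pair.
NESTED_FORMAT = re.compile(r"\{([^{}:]+):\{([^{}]+)\}\}")


def flatten_resource(resource):
    """Rewrite '{field:{pattern}}' as '{field:pattern}'; leave anything else alone."""
    return NESTED_FORMAT.sub(r"{\1:\2}", resource)


def write_to_dated_index(df, resource, id_col="es_id"):
    """Write df with the options from the question, after flattening the braces.

    df is expected to be a PySpark DataFrame; this sketch does not import pyspark
    itself. Returns the resource string that was actually passed to the writer.
    """
    fixed = flatten_resource(resource)
    (df.write.format("org.elasticsearch.spark.sql")
        .option("es.index.auto.create", "true")
        .option("es.resource", fixed)
        .option("es.mapping.id", id_col)
        .save())
    return fixed
```

With this, write_to_dated_index(df, 'index_name-{ts_col:{YYYY.mm.dd}}/type_name') passes 'index_name-{ts_col:YYYY.mm.dd}/type_name' as es.resource, which is the form shown in the fix above. The regex is only a convenience for catching the nested-brace mistake; writing the single-brace string by hand works just as well.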