python, apache-spark, elasticsearch, pyspark, elasticsearch-hadoop

Create a new SparkSession for a different query?


I'd like to fetch two DataFrames from Elasticsearch.

One is filtered with a query, the other has no filter.
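Here `query` is an Elasticsearch query-DSL JSON string; its exact contents don't matter for the problem, so the snippet below is only a hypothetical placeholder to show its shape:

    # hypothetical placeholder; the real query used below is not shown
    query = """
    {
        "query": {
            "term": { "event": "login" }
        }
    }
    """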

    # with query
    session = get_spark_session(query=query)

    df = session.read.option(
        "es.resource", "analytics-prod-2019.08.02"
    ).format("org.elasticsearch.spark.sql").load()

    df.show()  # empty result

    # without query
    session = get_spark_session()

    df = session.read.option(
        "es.resource", "analytics-prod-2019.08.02"
    ).format("org.elasticsearch.spark.sql").load()

    df.show()  # empty result


    import pyspark
    from pyspark.sql import SparkSession


    def get_spark_session(query=None, excludes=[]):
        conf = pyspark.SparkConf()
        conf.set("spark.driver.allowMultipleContexts", "true")
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes.discovery", "true")
        conf.set("es.scroll.size", 10000)
        conf.set("es.read.field.exclude", excludes)
        conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
        if query:
            conf.set("es.query", query)

        sc = SparkSession.builder.config(conf=conf).getOrCreate()
        return sc

The question is whether the session is reused or not.

When I run the filtered query first and the non-filtered query second, both give empty results.

But when I run the non-filtered query first, it shows some results, and the subsequent filtered query returns an empty result.
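The reuse itself is easy to confirm: `getOrCreate()` hands back the session from the first call, and that session's SparkConf still carries whatever `es.query` was set when it was created. A minimal check, assuming the filtered call happens first:

    # minimal check of session reuse, assuming the filtered call happens first
    s1 = get_spark_session(query=query)
    s2 = get_spark_session()

    print(s1 is s2)                                    # True: the second call reused the first session
    print(s1.sparkContext.getConf().get("es.query"))   # still the filtered query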

    # below, I reverse the order
    # without query
    session = get_spark_session()

    df = session.read.option(
        "es.resource", "analytics-prod-2019.08.02"
    ).format("org.elasticsearch.spark.sql").load()

    df.show()  # some results

    # with query
    session = get_spark_session(query=query)

    df = session.read.option(
        "es.resource", "analytics-prod-2019.08.02"
    ).format("org.elasticsearch.spark.sql").load()

    df.show()  # empty result

**Edit:**

So I can get the desired result with the following:

    def get_spark_session(query=None, excludes=[]):
        conf = pyspark.SparkConf()
        conf.set("spark.driver.allowMultipleContexts", "true")
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes.discovery", "true")
        conf.set("es.scroll.size", 10000)
        conf.set("es.read.field.exclude", excludes)
        conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
        if query:
            conf.set("es.query", query)
        else:
            conf.set("es.query", "")  # "unset" the query by overwriting any leftover value
        # ... rest of the function unchanged

Solution

  • SparkSession.builder gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder. In your case, the Spark config is being reused. Removing "es.query" from the config should fix this:

    def get_spark_session(query=None, excludes=[]):
        conf = pyspark.SparkConf()
        # note: PySpark's SparkConf has no unset() method; since the conf is
        # rebuilt on every call, simply not setting "es.query" keeps it out of
        # this builder's options
        conf.set("spark.driver.allowMultipleContexts", "true")
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes.discovery", "true")
        conf.set("es.scroll.size", 10000)
        conf.set("es.read.field.exclude", excludes)
        conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
        if query:
            conf.set("es.query", query)

        sc = SparkSession.builder.config(conf=conf).getOrCreate()
        return sc
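A related sketch, not from the answer above: if the query is passed as a per-read option on the DataFrameReader instead of being baked into the session config, one session can serve both the filtered and the unfiltered read. This assumes `es.query` is honored as a reader option, like the other `es.*` settings the connector accepts:

    # sketch: one shared session, the query supplied per read rather than per session
    session = get_spark_session()  # built without any "es.query"

    # unfiltered read
    df_all = (session.read
        .format("org.elasticsearch.spark.sql")
        .option("es.resource", "analytics-prod-2019.08.02")
        .load())

    # filtered read: the query travels with this read only
    df_filtered = (session.read
        .format("org.elasticsearch.spark.sql")
        .option("es.resource", "analytics-prod-2019.08.02")
        .option("es.query", query)
        .load())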