I'd like to fetch two sets of data from Elasticsearch: one filtered with a query, the other without any filter.
# with query
session = get_spark_session(query=query)
df = session.read.option(
    "es.resource", "analytics-prod-2019.08.02"
).format("org.elasticsearch.spark.sql").load()
df.show()  # empty result

# without query
session = get_spark_session()
df = session.read.option(
    "es.resource", "analytics-prod-2019.08.02"
).format("org.elasticsearch.spark.sql").load()
df.show()  # empty result
import pyspark
from pyspark.sql import SparkSession

def get_spark_session(query=None, excludes=None):
    conf = pyspark.SparkConf()
    conf.set("spark.driver.allowMultipleContexts", "true")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes.discovery", "true")
    conf.set("es.scroll.size", "10000")
    # es-hadoop expects a comma-separated list of field names here
    conf.set("es.read.field.exclude", ",".join(excludes or []))
    conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
    if query:
        conf.set("es.query", query)
    session = SparkSession.builder.config(conf=conf).getOrCreate()
    return session
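For reference, es.query accepts either a URI-style query or a JSON query DSL string; the query below is purely illustrative, not the actual filter I use:

query = '{"query": {"match": {"event": "click"}}}'  # hypothetical example filter
session = get_spark_session(query=query)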
The question is whether the session is reused or not. When I run the filtered query first and the non-filtered query second, both give empty results. But when I run the non-filtered query first, it shows some results, while the subsequent filtered query still returns an empty result.
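This ordering behaviour suggests that the second call to getOrCreate() returns the session created by the first call, so the second SparkConf is never applied. A minimal sketch to check this, assuming the get_spark_session above:

session_1 = get_spark_session(query=query)   # creates the session, es.query is set
session_2 = get_spark_session()              # getOrCreate() returns the same session

print(session_1 is session_2)                            # True: the session is reused
print(session_2.sparkContext.getConf().get("es.query"))  # still the first call's query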
# below, I reverse the order

# without query
session = get_spark_session()
df = session.read.option(
    "es.resource", "analytics-prod-2019.08.02"
).format("org.elasticsearch.spark.sql").load()
df.show()  # some results

# with query
session = get_spark_session(query=query)
df = session.read.option(
    "es.resource", "analytics-prod-2019.08.02"
).format("org.elasticsearch.spark.sql").load()
df.show()  # empty result
** edit
So I can get the desired result with the following:
def get_spark_session(query=None, excludes=None):
    conf = pyspark.SparkConf()
    conf.set("spark.driver.allowMultipleContexts", "true")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes.discovery", "true")
    conf.set("es.scroll.size", "10000")
    conf.set("es.read.field.exclude", ",".join(excludes or []))
    conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
    if query:
        conf.set("es.query", query)
    else:
        conf.set("es.query", "")  # explicitly clear any previously set query
    # ... build and return the session as before
SparkSession.builder gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder. In your case, the Spark config is being reused. Removing "es.query" from the config should fix this:
def get_spark_session(query=None, excludes=[]):
    conf = pyspark.SparkConf()
    conf.unset("es.query")
    conf.set("spark.driver.allowMultipleContexts", "true")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes.discovery", "true")
    conf.set("es.scroll.size", 10000)
    conf.set("es.read.field.exclude", excludes)
    conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
    if query:
        conf.set("es.query", query)
    sc = SparkSession.builder.config(conf=conf).getOrCreate()
    return sc
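As a side note (not part of the original answer, but supported by the elasticsearch-hadoop connector), the session-reuse problem can be sidestepped entirely by passing es.query as a per-read option instead of baking it into the SparkConf, so a single session can serve both reads:

session = get_spark_session()  # one session, no es.query in the conf

# unfiltered read
df_all = (
    session.read.format("org.elasticsearch.spark.sql")
    .option("es.resource", "analytics-prod-2019.08.02")
    .load()
)

# filtered read: the query is scoped to this read only,
# so nothing is left behind in the session's config
df_filtered = (
    session.read.format("org.elasticsearch.spark.sql")
    .option("es.resource", "analytics-prod-2019.08.02")
    .option("es.query", query)
    .load()
)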