I am reading data from a Kafka topic with Spark Structured Streaming, and I want to run SQL queries on this streaming data. Here is the code:
from pyspark.sql import SparkSession, SQLContext


def process_batch(df, id):
    # Here I want to run SQL queries on the micro-batch DataFrame,
    # but the spark.sql() call below fails with "Table or view not found".
    spark = spark_session()
    df.createOrReplaceTempView("x")
    spark.sql("select * from x")


def spark_session():
    spark = SparkSession \
        .builder \
        .appName("Python kafka Spark example") \
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1') \
        .getOrCreate()
    return spark


def main():
    spark = spark_session()
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test") \
        .option("startingOffsets", "earliest") \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    query = df.writeStream.foreachBatch(process_batch).start()
    query.awaitTermination()


if __name__ == "__main__":
    main()
The error:

org.apache.spark.sql.AnalysisException: Table or view not found: x;
I solved this by creating a new (non-streaming) DataFrame from the batch DataFrame inside foreachBatch and running the SQL queries against that. Working code is below, followed by a short note on an alternative.
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, StringType


def process_batch(df, id):
    df.show()
    # collect() pulls the whole micro-batch to the driver.
    df1 = df.collect()
    spark = spark_session()
    # Rebuild a plain (non-streaming) DataFrame with an explicit schema.
    schemaString = "key value"
    fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    schema = StructType(fields)
    df2 = spark.createDataFrame(df1, schema)
    # Register the temp view on the new DataFrame and query it.
    df2.createOrReplaceTempView("x")
    spark.sql("SELECT value FROM x LIMIT 2").show()


def spark_session():
    spark = SparkSession \
        .builder \
        .appName("Python kafka Spark example") \
        .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1') \
        .getOrCreate()
    return spark
def main():
    spark = spark_session()
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test") \
        .option("startingOffsets", "earliest") \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    query = df.writeStream.foreachBatch(process_batch).start()
    query.awaitTermination()


if __name__ == "__main__":
    main()
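For what it's worth, the original error most likely occurs because the micro-batch DataFrame passed to foreachBatch is bound to Spark's internal streaming clone of the session, so a temp view registered on it is not visible from the session returned by SparkSession.builder.getOrCreate(). If copying each batch to the driver with collect() is a concern, a lighter variant is to register the view and run the SQL through the batch DataFrame's own session. The sketch below is an assumption-laden alternative, not the approach above: it assumes PySpark 2.4, where the batch DataFrame exposes its session through df.sql_ctx (newer PySpark versions also expose it as df.sparkSession).

def process_batch(df, id):
    # Sketch only: assumes PySpark 2.4, where df.sql_ctx points at the
    # session the micro-batch DataFrame is bound to.
    df.createOrReplaceTempView("x")
    # Query through that same session instead of getOrCreate(), so the
    # temp view is found without rebuilding the DataFrame.
    df.sql_ctx.sql("SELECT value FROM x LIMIT 2").show()

Either way, collect() is fine for small batches, but it brings the whole micro-batch to the driver; the variant above keeps the data distributed.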