I have a problem using PySpark. There is a data stream generated by Kafka and I'm supposed to parse it using Spark.
The JSON format is like this:
{"CustomerId":606811,"Latitude":35.834896,"Longitude":50.019657,"Response":{"Stores":[{"Id":771,"LegacyStoreId":5497,"LegacyStoreTypeId":1,"PartnerId":3,"StoreName":"test","StoreDisplayName":"test","PartnerName":"test","ServiceRadius":3,"Longitude":56.009797,"Latitude":35.829067,"StatusCode":1,"CityId":200,"CityName":null,"Description":"test","IsOk24":false,"RouteDistanceInMeter":0,"IsRouteDistanceValid":false,"IsOutOfOrders":false,"AirDistanceInMeter":1100,"IsAirDistanceInValid":true,"IsDeliveryCoverage":true,"IsNonCoverageArea":false,"Rate":3.9,"Reviews":560,"IsHighPriorityStore":false,"StoreScore":0,"PartnerRank":1,"DeliveryCost":"80000","FirstDeliveryTime":"test","Labels":[],"MinCartTotalPrice":500000},{"Id":463,"LegacyStoreId":4765,"LegacyStoreTypeId":1,"PartnerId":3,"StoreName":"test","StoreDisplayName":"test","PartnerName":"test","ServiceRadius":3,"Longitude":56.995281,"Latitude":35.82251,"StatusCode":1,"CityId":200,"CityName":null,"Description":"test","IsOk24":false,"RouteDistanceInMeter":0,"IsRouteDistanceValid":false,"IsOutOfOrders":false,"AirDistanceInMeter":2593,"IsAirDistanceInValid":true,"IsDeliveryCoverage":true,"IsNonCoverageArea":false,"Rate":3.8,"Reviews":532,"IsHighPriorityStore":false,"StoreScore":0,"PartnerRank":1,"DeliveryCost":"80000","FirstDeliveryTime":"test","Labels":[],"MinCartTotalPrice":500000}]},"Id":"f2655da4-c236-4f86-9ca0-8063a4c77da8","CreationAt":"2023-06-18T14:49:11.8545562+03:30"}
The problem is that I can't parse the data located in the Stores ArrayType. How can I fix it?
My code is as follows; unfortunately, it returns null for the Stores ArrayType columns.
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import from_json, col, explode_outer
from pyspark.sql.types import (StructType, StructField, StringType,
                               LongType, DoubleType, ArrayType)

spark = SparkSession \
    .builder \
    .appName("striming") \
    .config("spark.jars.packages",
            "*****************") \
    .config('spark.driver.extraClassPath', '/usr/local/spark/resources/jars/sqljdbc42.jar') \
    .config('spark.executor.extraClassPath', '/usr/local/spark/resources/jars/sqljdbc42.jar') \
    .config("spark.cores.max", "1") \
    .config("spark.executor.memory", "1g") \
    .config("spark.executor.cores", "1") \
    .config("spark.dynamicAllocation.initialExecutors", "1") \
    .master("local[1]") \
    .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
kafka_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "My Kafka Servers") \
    .option("subscribe", "Event") \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .option('multiline', "True") \
    .load()
schema = StructType([
    StructField("CreationAt", StringType(), True),
    StructField("CustomerId", LongType(), True),
    StructField("Id", StringType(), True),
    StructField("Latitude", DoubleType(), True),
    StructField("Longitude", DoubleType(), True),
    StructField("Response", StructType([
        StructField("Stores", ArrayType(StructType([
            StructField("Id", StringType(), True),
            StructField("LegacyStoreId", StringType(), True),
            StructField("PartnerName", StringType(), True),
            StructField("FirstDeliveryTime", LongType(), True),
            StructField("StatusCode", LongType(), True),
            StructField("StoreName", StringType(), True)
        ]), True), True)
    ]), True)
])
value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("StoreSelection"))
value_df.printSchema()
sites_flat = value_df \
    .select("StoreSelection.CustomerId", "StoreSelection.Latitude", "StoreSelection.Longitude", "StoreSelection.CreationAt",
            explode_outer("StoreSelection.Response.Stores").alias("Stores")) \
    .select("CustomerId", "Latitude", "Longitude", "CreationAt",
            "Stores.LegacyStoreId", "Stores.StoreName", "Stores.PartnerName")
sites_flat.printSchema()
def foreach_batch_function(df, epoch_id):
    # df.write ... (write each micro-batch to the target sink here)
    df.show()
invoiceWriterQuery = sites_flat.writeStream.foreachBatch(foreach_batch_function) \
    .outputMode("update") \
    .option("checkpointLocation", "/usr/local/airflow/dags/otime/log_file") \
    .trigger(processingTime="1 minute") \
    .start()
invoiceWriterQuery.awaitTermination()
It seems like your schema doesn't fully match the nested JSON. The Stores array sits inside Response, and the types you declare for its fields have to line up with the values in the message. In your schema, FirstDeliveryTime is declared as LongType, but in the sample it is a string ("test"); when from_json can't convert a field to the declared type, it gives you null instead of the parsed struct, which is why the Stores columns come out null. Listing the Stores fields with types that match the sample is the safest way to avoid this. Please try to adjust your schema definition as shown below:
schema = StructType([
StructField("CreationAt", StringType(), True),
StructField("CustomerId", LongType(), True),
StructField("Id", StringType(), True),
StructField("Latitude", DoubleType(), True),
StructField("Longitude", DoubleType(), True),
StructField("Response",
StructType([
StructField("Stores",
ArrayType(StructType([
StructField("Id", IntegerType(), True),
StructField("LegacyStoreId", IntegerType(), True),
StructField("LegacyStoreTypeId", IntegerType(), True),
StructField("PartnerId", IntegerType(), True),
StructField("StoreName", StringType(), True),
StructField("StoreDisplayName", StringType(), True),
StructField("PartnerName", StringType(), True),
                                  # ... continue with the remaining fields in Stores[],
                                  # matching each type to the sample values
])), True),
]), True),
])
Continue with the additional fields, matching each type to the values inside your Stores array (for example, DeliveryCost is a string and Rate is a double in the sample). This way, your schema matches the nested JSON structure, and you can always discard the fields you don't need later in your transformation steps.
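As a sanity check, here is a minimal sketch (assuming the corrected schema above is assigned to schema and using a trimmed copy of the sample message from the question) that runs from_json on a static DataFrame before wiring the schema into the stream:

from pyspark.sql.functions import from_json, col, explode_outer

# Trimmed copy of the sample message (values taken from the question)
sample_json = (
    '{"CustomerId":606811,"Latitude":35.834896,"Longitude":50.019657,'
    '"Response":{"Stores":[{"Id":771,"LegacyStoreId":5497,"PartnerName":"test",'
    '"StoreName":"test","StatusCode":1,"FirstDeliveryTime":"test"}]},'
    '"Id":"f2655da4-c236-4f86-9ca0-8063a4c77da8","CreationAt":"2023-06-18T14:49:11.8545562+03:30"}'
)

# Parse the string with the corrected schema, then flatten Stores the same way as in the stream
static_df = spark.createDataFrame([(sample_json,)], ["value"])
parsed = static_df.select(from_json(col("value"), schema).alias("StoreSelection"))
flat = parsed \
    .select("StoreSelection.CustomerId", "StoreSelection.CreationAt",
            explode_outer("StoreSelection.Response.Stores").alias("Stores")) \
    .select("CustomerId", "CreationAt", "Stores.LegacyStoreId", "Stores.StoreName", "Stores.PartnerName")
flat.show(truncate=False)  # the Stores columns should now be populated instead of null

If this static test still shows nulls, compare the remaining field types one by one against the sample values before debugging the streaming query itself.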