I am trying to parse JSON data from Kafka using the code below:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession \
    .builder \
    .appName("StructuredSocketRead") \
    .getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "***.compute-1.amazonaws.com:9092") \
    .option("subscribe", "kafka-new-topic") \
    .load()
# Transform to Output DataFrame
schema = StructType([
    StructField("card_id", LongType()),
    StructField("member_id", LongType()),
    StructField("amount", LongType()),
    StructField("pos_id", LongType()),
    StructField("postcode", LongType()),
    StructField("transaction_dt", StringType()),
])
value_df = lines.select(from_json(col("value").cast("string"), schema).alias("value"))
exploded_df = value_df.select("value.*")
query = exploded_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
query.awaitTermination()
But the output is all null values.
To inspect the raw messages, I tried casting the key and value to strings with the code below:
kafkaDF = lines.selectExpr("cast(key as string)", "cast(value as string)")
query = kafkaDF \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()
And the result is:
Batch: 1
-------------------------------------------
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|"{\"card_id\":348702330256514,\"member_id\":37495066290,\"amount\":4380912,\"postcode\":96774,\"pos_id\":248063406800722,\"transaction_dt\":\"01-03-2018 08:24:29\"}\n" |
|"{\"card_id\":348702330256514,\"member_id\":37495066290,\"amount\":6703385,\"postcode\":84758,\"pos_id\":786562777140812,\"transaction_dt\":\"02-06-2018 04:15:03\"}\n" |
|"{\"card_id\":348702330256514,\"member_id\":37495066290,\"amount\":7454328,\"postcode\":93645,\"pos_id\":466952571393508,\"transaction_dt\":\"12-02-2018 09:56:42\"}\n" |
I don't know what is wrong here. Can someone please tell me why this happens and how to resolve it?
I also tried changing the data types in the schema, but no luck.
It turned out the data coming from Kafka was not clean: the JSON payload was wrapped in an extra pair of quotes and contained escaped quotes (\") and a trailing \n, which is why from_json on the raw string returned nulls. I cleaned the data with the following commands and it worked:
# Remove the escaped quotes
kafkaDF = kafkaDF.withColumn("cleaned_value", regexp_replace(col("value"), r'\\\"', '"')).select("cleaned_value")
# Extract the JSON object (everything from "{" to "}", braces included), dropping the outer quotes and the trailing \n
kafkaDF = kafkaDF.withColumn("cleaned_value2", regexp_extract(col("cleaned_value"), r'\{(.*)\}', 0)).select("cleaned_value2")
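For reference, here is a plain-Python sketch of what those two regex steps do to a single raw record (the sample record is copied from the console output above); it also shows that after cleaning, the payload parses as ordinary JSON:

```python
import json
import re

# One raw Kafka value as it appeared in the console output: the JSON
# object was serialized a second time, so the payload is a quoted string
# containing escaped quotes (\") and a trailing \n.
raw = ('"{\\"card_id\\":348702330256514,\\"member_id\\":37495066290,'
       '\\"amount\\":4380912,\\"postcode\\":96774,'
       '\\"pos_id\\":248063406800722,'
       '\\"transaction_dt\\":\\"01-03-2018 08:24:29\\"}\\n"')

# Step 1: un-escape the quotes (what regexp_replace does above).
cleaned = re.sub(r'\\"', '"', raw)

# Step 2: keep only the {...} object, dropping the outer quotes and the
# trailing \n (what regexp_extract with r'\{(.*)\}' does above).
obj = re.search(r'\{.*\}', cleaned).group(0)

# The cleaned text is now valid JSON.
record = json.loads(obj)
print(record["card_id"])         # 348702330256514
print(record["transaction_dt"])  # 01-03-2018 08:24:29
```

Once the cleaned column holds a plain JSON object like this, applying from_json(col("cleaned_value2"), schema) with the schema defined in the question should produce non-null values.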