pysparkspark-structured-streaming

Using pyspark structured streaming to parse Kafka but getting null


I try to parse Kafka using the code bellow:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


spark = SparkSession  \
    .builder  \
    .appName("StructuredSocketRead")  \
    .getOrCreate()
spark.sparkContext.setLogLevel('ERROR') 
    
lines = spark  \
    .readStream  \
    .format("kafka")  \
    .option("kafka.bootstrap.servers","***.compute-1.amazonaws.com:9092")  \
    .option("subscribe","kafka-new-topic")  \
    .load()

# Transform to Output DataFrame
schema = StructType([
        StructField("card_id", LongType()),
        StructField("member_id", LongType()),
        StructField("amount", LongType()),
        StructField("pos_id", LongType()),
        StructField("postcode", LongType()),
        StructField("transaction_dt", StringType()),
    ])

value_df = df.select(from_json(col("value").cast("string"),schema).alias("value"))
exploded_df = value_df.select("value.*")

query = exploded_df  \
    .writeStream  \
    .outputMode("append")  \
    .format("console")  \
    .start()
    
query.awaitTermination()

But the outcome is all null values.

I try to get the value by using the code bellow:

kafkaDF = lines.selectExpr("cast(key as string)","cast(value as string)")

query = kafkaDF   \
    .writeStream  \
    .outputMode("append")  \
    .format("console")  \
    .start()
    
query.awaitTermination()

And the result is:

Batch: 1
-------------------------------------------
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                     |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|"{\"card_id\":348702330256514,\"member_id\":37495066290,\"amount\":4380912,\"postcode\":96774,\"pos_id\":248063406800722,\"transaction_dt\":\"01-03-2018 08:24:29\"}\n"   |
|"{\"card_id\":348702330256514,\"member_id\":37495066290,\"amount\":6703385,\"postcode\":84758,\"pos_id\":786562777140812,\"transaction_dt\":\"02-06-2018 04:15:03\"}\n"   |
|"{\"card_id\":348702330256514,\"member_id\":37495066290,\"amount\":7454328,\"postcode\":93645,\"pos_id\":466952571393508,\"transaction_dt\":\"12-02-2018 09:56:42\"}\n"   |

I don't know what was wrong here. Can someone please tell me why and how to resolve this issue?

I also try to change the data type in schema but no luck.


Solution

  • Turns out the data get from Kafka is not clean with special character "\", "\n". I cleaned the data with the following commands and it worked:

    # Remove escaped quotes
    kafkaDF = kafkaDF.withColumn("cleaned_value",regexp_replace(col("value"), r'\\\"', '"')).select("cleaned_value")
    
    # Extract everything inside the curly braces
    kafkaDF = kafkaDF.withColumn("cleaned_value2",regexp_extract(col("cleaned_value"), r'\{(.*)\}', 0)).select("cleaned_value2")