apache-spark, pyspark, spark-streaming, spark-structured-streaming, spark-streaming-kafka

Issue writing records into MySQL from a Spark Structured Streaming DataFrame


I am using the code below to write a Spark Structured Streaming DataFrame into a MySQL database. The Kafka topic's JSON data format and the MySQL table schema are shown below; the column names and types match. However, no records are written to the MySQL table; it stays empty with zero records. Please suggest what I am missing.

Kafka Topic Data Format

ssingh@RENLTP2N073:/mnt/d/confluent-6.0.0/bin$ ./kafka-console-consumer --topic sarvtopic --from-beginning --bootstrap-server localhost:9092
{"id":1,"firstname":"James ","middlename":"","lastname":"Smith","dob_year":2018,"dob_month":1,"gender":"M","salary":3000}
{"id":2,"firstname":"Michael ","middlename":"Rose","lastname":"","dob_year":2010,"dob_month":3,"gender":"M","salary":4000}

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("SSKafka") \
    .getOrCreate()

dsraw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sarvtopic") \
    .option("startingOffsets", "earliest") \
    .load()

ds = dsraw.selectExpr("CAST(value AS STRING)")
dsraw.printSchema()
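
For reference, the Kafka source exposes a fixed set of columns, so dsraw.printSchema() prints something like:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

The value column is binary, which is why it is cast to a string before JSON parsing.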

from pyspark.sql.types import StructField, StructType, StringType,LongType
from pyspark.sql.functions import *

custom_schema = StructType([
    StructField("id", LongType(), True),
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("dob_year", StringType(), True),
    StructField("dob_month", LongType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", LongType(), True),
])
      
Person_details_df2 = ds \
    .select(from_json(col("value"), custom_schema).alias("Person_details"))

Person_details_df3 = Person_details_df2.select("Person_details.*")
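
Note that dob_year is declared as StringType in custom_schema even though the MySQL column is int; from_json still parses the numeric JSON value, returning it as a string. After flattening, Person_details_df3.printSchema() should show:

root
 |-- id: long (nullable = true)
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- dob_year: string (nullable = true)
 |-- dob_month: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)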


from pyspark.sql import DataFrameWriter

def foreach_batch_function(df, epoch_id):
    Person_details_df3.write.jdbc(url='jdbc:mysql://172.16.23.27:30038/securedb', driver='com.mysql.jdbc.Driver', dbtable="sparkkafka", user='root', password='root$1234')
    pass

query = Person_details_df3.writeStream \
    .trigger(processingTime='20 seconds') \
    .outputMode("append") \
    .foreachBatch(foreach_batch_function) \
    .start()

query
Out[14]: <pyspark.sql.streaming.StreamingQuery at 0x1fb25503b08>
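
The StreamingQuery handle alone does not confirm that batches are running; its status and progress can be inspected from the same session (standard StreamingQuery attributes, shown here as a debugging aid):

query.status          # e.g. {'message': 'Waiting for data to arrive', 'isDataAvailable': False, 'isTriggerActive': False}
query.lastProgress    # metrics for the most recent micro-batch, including numInputRows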

MySQL table schema:

create table sparkkafka(
   id int,
   firstname VARCHAR(40) NOT NULL,
   middlename VARCHAR(40) NOT NULL,
   lastname VARCHAR(40) NOT NULL,
   dob_year int(40) NOT NULL,
   dob_month int(40) NOT NULL,
   gender VARCHAR(40) NOT NULL,
   salary int(40) NOT NULL,   
   PRIMARY KEY (id)
);

Solution

  • I presume Person_details_df3 is your streaming DataFrame and your Spark version is 2.4.0 or above (foreachBatch is available from 2.4.0).

    The problem is that your foreach_batch_function writes the streaming DataFrame Person_details_df3 instead of the micro-batch df that Spark passes in; a streaming DataFrame cannot be written with the batch DataFrameWriter, so nothing ever reaches MySQL. Note also that DataFrameWriter.jdbc takes the driver class through the properties dict, not as a keyword argument. Use the foreachBatch API as below, writing the df argument:

    db_target_properties = {"user": "xxxx", "password": "yyyyy", "driver": "com.mysql.jdbc.Driver"}

    def foreach_batch_function(df, epoch_id):
        # Write the micro-batch passed in as df, and append so each
        # trigger adds rows to the existing table instead of erroring out.
        df.write.jdbc(url='jdbc:mysql://172.16.23.27:30038/securedb', table="sparkkafka", mode="append", properties=db_target_properties)

    query = Person_details_df3.writeStream.outputMode("append").foreachBatch(foreach_batch_function).start()

    query.awaitTermination()
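
    Once the query is running, you can verify that rows are landing by reading the table back over JDBC, a quick sanity check using the same connection settings. This assumes the MySQL connector jar is on the Spark classpath (e.g. via spark.jars.packages), which the streaming write needs as well:

    # Read the target table back to confirm the stream is writing rows.
    check_df = spark.read.jdbc(
        url='jdbc:mysql://172.16.23.27:30038/securedb',
        table='sparkkafka',
        properties=db_target_properties)
    check_df.show()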