I am using the code below to write a Spark Structured Streaming DataFrame into a MySQL database. The Kafka topic's JSON data format and the MySQL table schema are shown below; the column names and types match. However, I cannot see any records written to the MySQL table: it stays empty with zero records. Please suggest what is wrong.
Kafka topic data format:
ssingh@RENLTP2N073:/mnt/d/confluent-6.0.0/bin$ ./kafka-console-consumer --topic sarvtopic --from-beginning --bootstrap-server localhost:9092
{"id":1,"firstname":"James ","middlename":"","lastname":"Smith","dob_year":2018,"dob_month":1,"gender":"M","salary":3000}
{"id":2,"firstname":"Michael ","middlename":"Rose","lastname":"","dob_year":2010,"dob_month":3,"gender":"M","salary":4000}
import pyspark
from pyspark.sql import SparkSession
# Get or create the SparkSession for this job, registered under the
# application name "SSKafka".
spark = (
    SparkSession.builder
    .appName("SSKafka")
    .getOrCreate()
)
# Streaming source: subscribe to the "sarvtopic" Kafka topic on
# localhost:9092, starting from the earliest available offsets.
dsraw = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "sarvtopic")
    .option("startingOffsets", "earliest")
    .load()
)
# Print the schema of the raw Kafka source DataFrame.
dsraw.printSchema()
# Keep only the message value, cast to a string so it can be parsed as JSON.
ds = dsraw.selectExpr("CAST(value AS STRING)")
from pyspark.sql.types import StructField, StructType, StringType,LongType
from pyspark.sql.functions import *
# Schema used to parse the JSON documents carried in the Kafka message value.
# Every field is nullable.
custom_schema = StructType([
    StructField("id", LongType(), True),
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    # dob_year arrives as a JSON number (e.g. 2018) and is stored in an int
    # column, so it is parsed as a long like the other numeric fields rather
    # than as a string.
    StructField("dob_year", LongType(), True),
    StructField("dob_month", LongType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", LongType(), True),
])
# Parse the JSON string in the "value" column with custom_schema, producing a
# single struct column named "Person_details".
Person_details_df2 = ds.select(
    from_json(col("value"), custom_schema).alias("Person_details")
)
# Expand the struct so each of its fields becomes a top-level column.
Person_details_df3 = Person_details_df2.select("Person_details.*")
from pyspark.sql import DataFrameWriter
def foreach_batch_function(df, epoch_id):
    """Callback handed to ``foreachBatch``; issues a JDBC write to ``sparkkafka``.

    Args:
        df: First callback argument (not referenced by the body below).
        epoch_id: Second callback argument (not referenced by the body below).
    """
    # NOTE(review): the write is issued on the module-level Person_details_df3
    # (the DataFrame that `writeStream` is called on), not on the `df`
    # argument. Presumably this is why no rows reach the table; confirm.
    # NOTE(review): `driver`, `dbtable`, `user` and `password` are passed as
    # keyword arguments to `jdbc()`. TODO confirm these against the
    # DataFrameWriter.jdbc signature.
    Person_details_df3.write.jdbc(url='jdbc:mysql://172.16.23.27:30038/securedb', driver='com.mysql.jdbc.Driver', dbtable="sparkkafka", user='root',password='root$1234')
    pass
# Start the streaming query: with a 20-second processing-time trigger and
# append output mode, each micro-batch is passed to foreach_batch_function.
query = (
    Person_details_df3.writeStream
    .trigger(processingTime='20 seconds')
    .outputMode("append")
    .foreachBatch(foreach_batch_function)
    .start()
)
# Bare expression: echoes the StreamingQuery handle in an interactive session.
query
Out[14]: <pyspark.sql.streaming.StreamingQuery at 0x1fb25503b08>
MySQL table schema:
-- Table sparkkafka: one row per person record, keyed by id.
-- Every column other than id is declared NOT NULL.
-- NOTE(review): the (40) on the int columns looks like a display width
-- rather than a size limit; confirm against the MySQL version in use.
create table sparkkafka(
id int,
firstname VARCHAR(40) NOT NULL,
middlename VARCHAR(40) NOT NULL,
lastname VARCHAR(40) NOT NULL,
dob_year int(40) NOT NULL,
dob_month int(40) NOT NULL,
gender VARCHAR(40) NOT NULL,
salary int(40) NOT NULL,
PRIMARY KEY (id)
);
I presume Person_details_df3 is your streaming DataFrame and that your Spark version is 2.4.0 or later.
To use the foreachBatch API, write the code as below:
# JDBC connection properties (placeholder credentials) for the target database.
db_target_properties = {
    "user": "xxxx",
    "password": "yyyyy",
}
def foreach_batch_function(df, epoch_id):
    """Append one micro-batch to the ``sparkkafka`` MySQL table over JDBC.

    Args:
        df: The micro-batch DataFrame supplied by ``foreachBatch``.
        epoch_id: Identifier of the micro-batch; not used here.
    """
    # mode="append" is needed because the target table already exists:
    # DataFrameWriter.jdbc defaults to "error", which raises in that case.
    df.write.jdbc(
        url='jdbc:mysql://172.16.23.27:30038/securedb',
        table="sparkkafka",
        mode="append",
        properties=db_target_properties,
    )
# Start the streaming query in append mode, passing each micro-batch to
# foreach_batch_function.
query = (
    Person_details_df3.writeStream
    .outputMode("append")
    .foreachBatch(foreach_batch_function)
    .start()
)
# Block the driver until the streaming query terminates.
query.awaitTermination()