I want to read a Kafka topic and then write it to a Kudu table with Spark Streaming.
// sessions and contexts
val conf = new SparkConf().setMaster("local[2]").setAppName("TestMain")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
val sparkContext = sparkSession.sparkContext
val kuduContext = new KuduContext("...", sparkContext);
// structure
val schema: StructType = StructType(
StructField("userNo", IntegerType, true) ::
StructField("bandNo", IntegerType, false) ::
StructField("ipv4", StringType, false) :: Nil);
// kudu - prepare table
kuduContext.deleteTable("test_table");
kuduContext.createTable("test_table", schema, Seq("userNo"), new CreateTableOptions()
.setNumReplicas(1)
.addHashPartitions(List("userNo").asJava, 3))
// get stream from kafka
val parsed = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "...")
.option("startingOffsets", "latest")
.option("subscribe", "feed_api_band_get_popular_post_list")
.load()
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
// write it to kudu
kuduContext.insertRows(parsed.toDF(), "test_table");
Now it complains with:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
It seems I have to change my code to use the traditional KafkaUtils.createDirectStream:
KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
).foreachRDD(rdd => {
rdd.foreach(record => {
// write to kudu.............
println(record.value());
})
});
ssc.start();
ssc.awaitTermination();
So, which one is the right approach? Or is there any way to make the first approach run?
Spark version is 2.2.0.
Both approaches are reasonable. The first uses Spark Structured Streaming, where the incoming data is treated as an unbounded table that is processed incrementally; the second uses the traditional DStream API and handles each micro-batch as an RDD. The AnalysisException in the first approach comes from calling kuduContext.insertRows, a batch operation, directly on a streaming DataFrame: as the message says, a query over a streaming source has to be started through writeStream.start(), so the Kudu write needs to happen inside a streaming sink instead.
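To keep the first approach, route the streaming DataFrame through writeStream and a sink instead of calling kuduContext.insertRows on it. Below is a minimal sketch for Spark 2.2 using the foreach sink together with the Kudu Java client; the checkpoint path is a placeholder, and the column handling assumes the schema above.

import org.apache.kudu.client.{KuduClient, KuduSession, KuduTable}
import org.apache.spark.sql.{ForeachWriter, Row}

val query = parsed
  .select("parsed_value.*")                                     // flatten the struct into userNo, bandNo, ipv4
  .writeStream
  .option("checkpointLocation", "/tmp/kudu_stream_checkpoint")  // placeholder path
  .foreach(new ForeachWriter[Row] {
    var client: KuduClient = _
    var session: KuduSession = _
    var table: KuduTable = _

    override def open(partitionId: Long, version: Long): Boolean = {
      client = new KuduClient.KuduClientBuilder("...").build()  // same Kudu master as above
      table = client.openTable("test_table")
      session = client.newSession()
      true
    }

    override def process(value: Row): Unit = {
      val insert = table.newInsert()
      val row = insert.getRow
      row.addInt("userNo", value.getAs[Int]("userNo"))
      row.addInt("bandNo", value.getAs[Int]("bandNo"))
      row.addString("ipv4", value.getAs[String]("ipv4"))
      session.apply(insert)                                     // default session mode flushes synchronously
    }

    override def close(errorOrNull: Throwable): Unit = {
      if (session != null) session.close()
      if (client != null) client.close()
    }
  })
  .start()

query.awaitTermination()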
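Alternatively, the second approach can be completed by turning each micro-batch into an ordinary DataFrame and handing it to KuduContext's batch API; that is fine inside foreachRDD because each RDD is a plain batch. This sketch assumes ssc wraps the same SparkContext as above and that topics and kafkaParams are already defined.

KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
).foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    import sparkSession.implicits._
    // parse the JSON payloads of this micro-batch with the same schema used for the Kudu table
    val batchDf = sparkSession.read.schema(schema).json(rdd.map(_.value()).toDS())
    kuduContext.insertRows(batchDf, "test_table")   // batch write, so no streaming restriction applies
  }
}
ssc.start()
ssc.awaitTermination()

The DStream version is the smaller change since it can reuse kuduContext.insertRows as-is; the foreach-sink version keeps everything inside Structured Streaming.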