I'm reading data from Kafka using Spark Streaming and passing it to a Python file for prediction. The script returns the predictions along with the original data. The original data and its predictions are saved to file, but a separate file is created for each RDD. I need all the data collected until I stop the program to be saved to a single file.
I have tried writeStream, but it does not create even a single file. I have tried saving to Parquet with append mode, but it still creates multiple files, one for each RDD. The code below creates a folder named output.csv and puts all the files into it.
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

def main(args: Array[String]): Unit = {
  val ss = SparkSession.builder()
    .appName("consumer")
    .master("local[*]")
    .getOrCreate()
  // One micro-batch (one RDD) every 2 seconds
  val scc = new StreamingContext(ss.sparkContext, Seconds(2))
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "group.id" -> "group5" // clients can take
  )
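  // mappedData is the DStream built from the Kafka stream and pyPath is the
  // path to the Python script (both defined elsewhere).
  import ss.implicits._ // needed for toDF()
  mappedData.foreachRDD { rdd =>
    // Pipe the whole micro-batch through the script and append the result;
    // every batch adds its own part file under the output.csv folder.
    rdd.pipe(pyPath).toDF().repartition(1)
      .write.format("csv").mode("append").option("truncate", "false")
      .save("output.csv")
  }
  scc.start()
  scc.awaitTermination()
}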
I need just one file containing all the records, one after another, as they are collected while streaming.
Any help will be appreciated, thank you in anticipation.
A file in HDFS cannot be modified in place once it has been written, and Spark's writers do not append to an existing file. So writing to one file in real time (appending each block of data from the streaming job to the same file every 2 seconds) simply isn't possible this way. I suggest you write your read logic so that it reads from multiple files, if possible.
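Reading from multiple files is usually little work, because Spark treats a folder of part files as one dataset. A minimal sketch, assuming the streaming job appended headerless CSV part files to the output.csv folder from the question (the object and method names are illustrative, not part of the original code):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadAllParts {
  // Loads every part file under `dir` as a single DataFrame.
  def readAll(spark: SparkSession, dir: String): DataFrame =
    spark.read
      .option("header", "false") // assumption: the part files have no header row
      .csv(dir)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-all-parts")
      .master("local[*]")
      .getOrCreate()

    // "output.csv" is the folder the streaming job appends to
    val all = readAll(spark, "output.csv")
    println(s"rows collected so far: ${all.count()}")

    spark.stop()
  }
}
```

With this approach the number of part files only affects listing overhead, not the result of the read.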
However, if you must read from a single file, I suggest one of the following two approaches, applied after you have written the output to a single CSV/Parquet folder with the "Append" SaveMode (which creates new part files for each block you write every 2 seconds).
You can write simple logic in Spark that reads this folder of multiple files and writes it to another HDFS location as a single file using repartition(1) or coalesce(1), and then read the data from that location. See below:
spark.read.csv("oldLocation").coalesce(1).write.csv("newLocation")
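Note that even with coalesce(1) the write produces a folder containing one part file, not a bare file. If a single file with a fixed name is needed, one option is to move that part file with the Hadoop FileSystem API. The following is a sketch under that assumption; `CompactToSingleFile`, `compact`, `tmpDir` and `dstFile` are hypothetical names introduced for illustration:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object CompactToSingleFile {
  // Rewrites all CSV part files under `srcDir` as one file at `dstFile`.
  // `tmpDir` is a scratch folder (hypothetical) that is deleted afterwards.
  def compact(spark: SparkSession, srcDir: String, tmpDir: String, dstFile: String): Unit = {
    spark.read.csv(srcDir)
      .coalesce(1)               // one partition, so a single part file is written
      .write.mode("overwrite")
      .csv(tmpDir)

    val tmpPath = new Path(tmpDir)
    val fs = tmpPath.getFileSystem(spark.sparkContext.hadoopConfiguration)

    // Locate the single part file inside the scratch folder
    val parts = fs.globStatus(new Path(tmpPath, "part-*"))
    require(parts.length == 1, s"expected one part file, found ${parts.length}")

    // Replace any previous result, then move the part file to its final name
    val dst = new Path(dstFile)
    if (fs.exists(dst)) fs.delete(dst, false)
    require(fs.rename(parts(0).getPath, dst), s"could not move part file to $dstFile")
    fs.delete(tmpPath, true)
  }
}
```

Because coalesce(1) sends all rows through a single task, this is best suited to a compaction job that runs once after the stream is stopped (or periodically on modest data volumes), not to every 2-second batch.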