I'm trying to write data from a single source to multiple data sinks (Mongo and Postgres DBs). The incoming data is read like this:
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
Dataset<Row> personalDetails = df.selectExpr("name", "id", "age");
personalDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, batchId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "PI").save();
}).start();
Dataset<Row> salDetails = df.selectExpr("basicSal", "bonus");
salDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, batchId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "SAL").save();
}).start();
The problem is that Spark opens two streams and reads the same events twice. Is it possible to read once, apply different transformations, and write to different collections?
You should cache the DataFrame. From the foreachBatch section of the Structured Streaming Programming Guide:
Write to multiple locations - If you want to write the output of a streaming query to multiple locations, then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it.
And their example:
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
You can put all of your code in one foreachBatch and write the dataframe to both of your sinks. To do that, cache the batch dataframe, run each selectExpr on the cached dataframe, and save each result.
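For the Java API used in the question, the single foreachBatch approach might look roughly like the sketch below. It is only a sketch under a few assumptions: the class name, app name, and the writeToMongo helper are made up for illustration, the connector format and options are copied from the question, and, as in the question, it assumes df already exposes the name, id, age, basicSal and bonus columns (parsing the Kafka value is left out). The explicit VoidFunction2 cast is there because, on some Spark/Scala builds, a bare lambda is ambiguous between the Scala and Java overloads of foreachBatch.

```java
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;

// Hypothetical class name; not from the original question.
public class SingleReadMultiSink {

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("single-read-multi-sink") // hypothetical app name
                .getOrCreate();

        // One streaming source, exactly as in the question.
        Dataset<Row> df = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topic1")
                .load();

        // One query, one foreachBatch: each micro-batch is read once.
        StreamingQuery query = df.writeStream()
                .outputMode(OutputMode.Update())
                .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batchDF, batchId) -> {
                    // Cache the batch so the second write does not re-read the input.
                    batchDF.persist();
                    try {
                        // Assumes these columns exist on the batch, as in the question.
                        writeToMongo(batchDF.selectExpr("name", "id", "age"), "PI");
                        writeToMongo(batchDF.selectExpr("basicSal", "bonus"), "SAL");
                    } finally {
                        // Release the cache even if one of the writes fails.
                        batchDF.unpersist();
                    }
                })
                .start();

        query.awaitTermination();
    }

    // Hypothetical helper; format and options are taken from the question.
    private static void writeToMongo(Dataset<Row> data, String collection) {
        data.write()
                .format("com.mongodb.spark.sql.DefaultSource")
                .mode(SaveMode.Append)
                .option("uri", "mongodb://localhost/employee")
                .option("database", "employee")
                .option("collection", collection)
                .save();
    }
}
```

Keep in mind that foreachBatch only gives at-least-once guarantees by default: if the second write throws, the batch fails and is retried, so the first collection may receive the same rows again unless the writes are made idempotent (for example by using batchId to deduplicate).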
As a side note: if you want "all or nothing" behavior (i.e. you don't want a situation where the write to Mongo succeeded but the write to Postgres did not), you must use a single foreachBatch. Otherwise, with two foreachBatch calls as in your question, you have two independent queries, each processing its own batches, so one might fail while the other succeeds for the same data.