I am trying to generate a complex XML file from my JavaRDD&lt;Book&gt; and JavaRDD&lt;Review&gt;. How can I combine these two to generate the XML below?
<xml>
<library>
<books>
<book>
<author>test</author>
</book>
</books>
<reviews>
<review>
<id>1</id>
</review>
</reviews>
</library>
As you can see, there is a parent root Library which has Books and Reviews as its children.
Below is how I generate the Book and Review DataFrames:
DataFrame bookFrame = sqlCon.createDataFrame(bookRDD, Book.class);
DataFrame reviewFrame = sqlCon.createDataFrame(reviewRDD, Review.class);
I know how to generate the XML itself; my doubt is specifically about having the Library root tag with Books and Reviews as its children.
I am using Java, but if you can point me in the right direction, a Scala or Python example is fine too.
It may not be the most efficient use of Spark, but the code below does what you want. (Scala, though, because my Java is a bit rusty.)
import java.io.{File, PrintWriter}
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.io.Source
val spark = SparkSession.builder()
.master("local[3]")
.appName("test")
.config("spark.driver.allowMultipleContexts", "true")
.getOrCreate()
import spark.implicits._
/* Some code to test */
case class Book(author: String)
case class Review(id: Int)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
val bookFrame = List(
Book("book1"),
Book("book2"),
Book("book3"),
Book("book4"),
Book("book5")
).toDS()
val reviewFrame = List(
Review(1),
Review(2),
Review(3),
Review(4)
).toDS()
/* End test code */
// Using the databricks spark-xml API, save as 1 big xml file (instead of many parts, using repartition)
// You don't have to repartition, but without it each part-xxx file wraps its contents in the root tag, making them harder to concatenate later.
// And TBH it doesn't matter much that Spark does the merging here, since the combined data ends up on a single node anyway.
bookFrame
.repartition(1)
.write
.format("com.databricks.spark.xml")
.option("rootTag", "books")
.option("rowTag", "book")
.mode(SaveMode.Overwrite)
.save("/tmp/books/") // store to temp location
// Same for reviews
reviewFrame
.repartition(1)
.write
.format("com.databricks.spark.xml")
.option("rootTag", "reviews")
.option("rowTag", "review")
.mode(SaveMode.Overwrite)
.save("/tmp/review") // store to temp location
def concatFiles(path:String):List[String] = {
new File(path)
.listFiles
.filter(
_.getName.startsWith("part") // get all part-xxx files only (should be only 1)
)
.flatMap(file => Source.fromFile(file.getAbsolutePath).getLines())
.map(" " + _) // prefix with spaces to allow for new root level xml
.toList
}
val lines = List("<xml>","<library>") ++ concatFiles("/tmp/books/") ++ concatFiles("/tmp/review/") ++ List("</library>")
new PrintWriter("/tmp/target.xml"){
write(lines.mkString("\n"))
close
}
Result:
<xml>
<library>
<books>
<book>
<author>book1</author>
</book>
<book>
<author>book2</author>
</book>
<book>
<author>book3</author>
</book>
<book>
<author>book4</author>
</book>
<book>
<author>book5</author>
</book>
</books>
<reviews>
<review>
<id>1</id>
</review>
<review>
<id>2</id>
</review>
<review>
<id>3</id>
</review>
<review>
<id>4</id>
</review>
</reviews>
</library>
Another approach could be (using only Spark) to create a new object:
case class BookReview(books: List[Book], reviews: List[Review])
and store that as XML after .collect()-ing all books & reviews into a single list.
Although in that case I wouldn't use Spark just to process a single record (BookReview), but a normal XML library (like XStream or similar) to write this object out.
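For that route, a hand-rolled serializer is enough. Here is a minimal sketch (the toXml helper is my own name, and it assumes the books and reviews have already been collected to the driver):

```scala
case class Book(author: String)
case class Review(id: Int)
case class BookReview(books: List[Book], reviews: List[Review])

// Serialize the single collected object by hand; in practice you could
// also hand BookReview to XStream or a similar library instead.
def toXml(br: BookReview): String = {
  val books = br.books
    .map(b => s"<book><author>${b.author}</author></book>")
    .mkString("<books>", "", "</books>")
  val reviews = br.reviews
    .map(r => s"<review><id>${r.id}</id></review>")
    .mkString("<reviews>", "", "</reviews>")
  s"<library>$books$reviews</library>"
}
```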
Update
The List concat methods are not memory friendly, so using streams and buffers, the following could be used instead of the concatFiles method.
import java.io.{BufferedOutputStream, BufferedReader, File, FileInputStream, FileOutputStream, InputStreamReader}

def outputConcatFiles(path: String, outputFile: File): Unit = {
new File(path)
.listFiles
.filter(
_.getName.startsWith("part") // get all part-xxx files only (should be only 1)
)
.foreach(file => {
val writer = new BufferedOutputStream(new FileOutputStream(outputFile, true))
val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))
try {
Stream.continually(reader.readLine())
.takeWhile(_ != null)
.foreach(line =>
writer.write(s" $line\n".getBytes)
)
} catch {
case e: Exception => println(e.getMessage)
} finally {
writer.close()
reader.close()
}
})
}
val outputFile = new File("/tmp/target2.xml")
new PrintWriter(outputFile) { write("<xml>\n<library>\n"); close}
outputConcatFiles("/tmp/books/", outputFile)
outputConcatFiles("/tmp/review/", outputFile)
new PrintWriter(new FileOutputStream(outputFile, true)) { append("</library>"); close}
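For completeness, the same streaming concatenation can also be written with the JDK's java.nio.file API, which avoids managing the reader/writer pairs by hand. A sketch (appendParts is a hypothetical name; paths would be the temp locations used above):

```scala
import java.nio.file.{Files, Path, StandardOpenOption}
import scala.collection.JavaConverters._

// Append every line of each part-xxx file under `dir` to `target`,
// indented one space so the content nests under the new root element.
def appendParts(dir: Path, target: Path): Unit = {
  val parts = Files.list(dir)
  try
    parts.iterator().asScala
      .filter(_.getFileName.toString.startsWith("part"))
      .foreach { part =>
        val indented = Files.readAllLines(part).asScala.map(" " + _).asJava
        Files.write(target, indented,
          StandardOpenOption.CREATE, StandardOpenOption.APPEND)
      }
  finally parts.close()
}
```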