I am creating a DataFrame after loading many XML files. Each XML file has one unique field, fun:DataPartitionId, and I create many rows from one XML file.
Now I want to add this fun:DataPartitionId to each row that results from that XML.
For example, if the first XML yields 100 rows, then all 100 rows will carry the same fun:DataPartitionId value, because fun:DataPartitionId is a header field in each XML.
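For reference, each file looks roughly like this (structure inferred from the column paths used in the code below; element contents are placeholders):
<env:ContentEnvelope>
  <env:Header>
    <fun:DataPartitionId>1</fun:DataPartitionId>
  </env:Header>
  <env:Body>
    <env:ContentItem>...</env:ContentItem>
    <env:ContentItem>...</env:ContentItem>
  </env:Body>
</env:ContentEnvelope>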
This is what I am doing:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{explode, udf}
val getDataPartition = udf { (DataPartition: String) =>
if (DataPartition=="1") "SelfSourcedPublic"
else if (DataPartition=="2") "Japan"
else if (DataPartition=="3") "SelfSourcedPrivate"
else "ThirdPartyPrivate"
}
val getFFActionParent = udf { (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "I|!|"
else "D|!|"
}
val getFFActionChild = udf { (FFAction: String) =>
if (FFAction=="Insert") "I|!|"
else if (FFAction=="Overwrite") "O|!|"
else "D|!|"
}
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfDataPartition = getDataPartition(dfContentEnvelope("env:Header.fun:DataPartitionId"))
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
val df = dfContentItem.withColumn("DataPartition", dfDataPartition)
df.show()
When you read your XML file using
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
the fun:DataPartitionId column is read as Long:
fun:DataPartitionId: long (nullable = true)
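You can confirm this by printing the schema right after the load:
dfContentEnvelope.printSchema()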
so you should change the udf function to take a Long:
val getDataPartition = udf { (DataPartition: Long) =>
  if (DataPartition == 1) "SelfSourcedPublic"
  else if (DataPartition == 2) "Japan"
  else if (DataPartition == 3) "SelfSourcedPrivate"
  else "ThirdPartyPrivate"
}
If possible, you should use the when function instead of a udf to improve processing speed and memory usage, since Spark's Catalyst optimizer can optimize built-in expressions but treats a udf as a black box; a minimal sketch follows below.
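As a sketch, assuming the column has been selected under the alias DataPartitionId (as in the select below), the same mapping with when/otherwise would look like:
import org.apache.spark.sql.functions.when
// built-in expressions like when/otherwise can be optimized by Catalyst,
// unlike an opaque udf
val dataPartitionCol = when($"DataPartitionId" === 1, "SelfSourcedPublic")
  .when($"DataPartitionId" === 2, "Japan")
  .when($"DataPartitionId" === 3, "SelfSourcedPrivate")
  .otherwise("ThirdPartyPrivate")
  .as("DataPartition")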
Now I want to add this fun:DataPartitionId to each row that results from that XML.
Your mistake is that you forgot to select
that particular column, so the following code
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select("column1.*")
should be
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")
Then you can apply the udf function:
val df = dfContentItem.select(getDataPartition($"DataPartitionId").as("DataPartition"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
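To see why the header value lands on every row, note that explode produces one output row per array element while repeating the sibling columns. Here is a toy sketch with hypothetical data:
import org.apache.spark.sql.functions.explode
// requires: import sqlContext.implicits._ (for toDF and $)
// one envelope-level value, three "items" -- explode repeats "header" for each
val toy = Seq((1L, Seq("a", "b", "c"))).toDF("header", "items")
toy.withColumn("item", explode($"items")).select($"header", $"item").show()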
So the working code as a whole should be:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{explode, udf}
val getDataPartition = udf { (DataPartition: Long) =>
  // compare against Long literals, not strings, since the column is read as Long
  if (DataPartition == 1) "SelfSourcedPublic"
  else if (DataPartition == 2) "Japan"
  else if (DataPartition == 3) "SelfSourcedPrivate"
  else "ThirdPartyPrivate"
}
val dfContentEnvelope = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "env:ContentEnvelope").load("s3://trfsmallfffile/XML")
val dfContentItem = dfContentEnvelope.withColumn("column1", explode(dfContentEnvelope("env:Body.env:ContentItem"))).select($"env:Header.fun:DataPartitionId".as("DataPartitionId"),$"column1.*")
val df = dfContentItem.select(getDataPartition($"DataPartitionId").as("DataPartition"), $"env:Data.sr:Source.*", $"_action".as("FFAction|!|"))
df.show(false)
And you can proceed with the rest of the code.