I have two datasets. Dataset 1 is below:
LineItem.organizationId|^|LineItem.lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
Japan|^|1507101869432|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|null|^|null|^|ACAE|^|false|^|null|^|null|^|null|^|null|^|false|^|null|^|null|^|null|^|null|^|505126|^|505074|^|null|^|null|^|null|^|null|^|null|^|null|^|null|^|3018759|^|null|^|I|!|
This is how I load the data with automatic schema discovery:
val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)
Dataset 2:
4295867927|^|860|^|CUS|^|External Revenue|^||^||^|REXR|^|False|^||^||^||^||^|False|^|False|^|CUS_REXR|^||^||^|505074|^|505074|^|505074|^|505074|^|505074|^||^|505074|^|True|^||^|3015250|^||^|I|!|
I create a DataFrame from each dataset and then join them. Finally, I write the combined output to a CSV file.
Here is the code that writes the CSV file:
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "null", ""))
dfMainOutputFinalWithoutNull.show()
dfMainOutputFinal.write.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("nullValue", "")
.option("codec", "gzip")
.save("s3://trfsdisu/SPARK/FinancialLineItem/output")
Everything works fine except .option("nullValue", ""). I'm not able to replace null with a blank value.
In my output I still see the null values.
I have also tried this, but got the same result:
val newDf = df.na.fill("e",Seq("blank"))
I suspect that the dataframe does not actually contain nulls, but rather strings consisting of the letters "null". If that is the case, you can simply replace all instances of "null" with "". After this you can use .option("nullValue", "") as before.
To replace a string in a column, you can use regexp_replace(column, "string to replace", "string to replace with"). A small example:
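import org.apache.spark.sql.functions.{col, regexp_replace}
import spark.implicits._ // needed for toDF; assumes a SparkSession named spark
val df = Seq("a", "null", "c", "b").toDF("col1")
val df2 = df.withColumn("col1", regexp_replace(col("col1"), "null", ""))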
Here the "null" was replaced with "" as wanted. The final dataframe looks like this:
+----+
|col1|
+----+
|   a|
|    |
|   c|
|   b|
+----+
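One caveat: the plain pattern "null" also matches inside longer values, so a cell such as "nullable" would become "able". If that matters for your data, a possible variation is to anchor the pattern and apply it to every string column before building the concatenated column, since the anchors no longer match once the values are joined together. The following is only a sketch under those assumptions; the names ReplaceNullStrings and blankOutNullStrings are made up for illustration, and it assumes column names without dots (as after the rename in the question).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, regexp_replace}
import org.apache.spark.sql.types.StringType

object ReplaceNullStrings {
  // Hypothetical helper: in every string column, replace cells that are
  // exactly the text "null" with an empty string. The ^ and $ anchors
  // leave values such as "nullable" untouched.
  def blankOutNullStrings(df: DataFrame): DataFrame =
    df.schema.fields
      .filter(_.dataType == StringType)
      .foldLeft(df) { (acc, field) =>
        acc.withColumn(field.name, regexp_replace(col(field.name), "^null$", ""))
      }

  def main(args: Array[String]): Unit = {
    // Local session for demonstration only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("replace-null-strings")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("a", "null"), ("nullable", "c")).toDF("col1", "col2")
    // The "null" cell in col2 becomes empty; "nullable" in col1 is kept.
    blankOutNullStrings(df).show()

    spark.stop()
  }
}
```

In the question's pipeline this would presumably be called on the joined dataframe before the concat_ws step.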