I'm trying to read a CSV file that has timestamps with nanosecond precision. Sample content of the file TestTimestamp.csv is below.
Spark 2.4.0, Scala 2.11.11
/**
* TestTimestamp.csv -
* 101,2019-SEP-23 11.42.35.456789123 AM
*
*/
I tried to read it using timestampFormat = "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa":
val dataSchema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true)))
val data = spark.read.format("csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("treatEmptyValuesAsNulls", "true")
  //.option("nullValue", "")
  .option("dateFormat", "yyyy-MMM-dd")
  .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
  .schema(dataSchema)
  .load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimeStamp.csv")
data.select('Created_TS).show
The output I get is a completely wrong date-time: 23 September became 28 September.
+--------------------+
| Created_TS|
+--------------------+
|2019-09-28 18:35:...|
+--------------------+
Even if the hours are in 24-hour format, e.g. "2019-SEP-23 16.42.35.456789123", and I try to use only the first few digits of the fractional seconds with timestampFormat = "yyyy-MMM-dd HH.mm.ss.SSS", I get a similarly incorrect result:
val data2 = spark.read.format("csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("treatEmptyValuesAsNulls", "true")
  //.option("nullValue", "")
  .option("dateFormat", "yyyy-MMM-dd")
  .option("timestampFormat", "yyyy-MMM-dd HH.mm.ss.SSS")
  .schema(dataSchema)
  .load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimeStamp.csv")
data2.select('Created_TS).show
+--------------------+
| Created_TS|
+--------------------+
|2019-09-28 23:35:...|
+--------------------+
Is there any way to parse such timestamp strings while creating a DataFrame with the CSV reader?
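A likely explanation for the shifted values, assuming Spark 2.4's CSV reader follows SimpleDateFormat-style pattern semantics: there the letter S means milliseconds, not fraction-of-second, so 456789123 is read as 456,789,123 ms (about 5 days 6 hours 53 minutes) and lenient parsing rolls that into the date. The sketch below reproduces the effect with plain java.text.SimpleDateFormat rather than with Spark itself. MillisOverflowDemo and shiftedTimestamp are names made up for this illustration, and UTC is fixed only so the result does not depend on the machine's time zone.

```scala
import java.text.SimpleDateFormat
import java.util.{Locale, TimeZone}

object MillisOverflowDemo {
  private val utc = TimeZone.getTimeZone("UTC")

  // Parses `text` with a SimpleDateFormat pattern (lenient by default) and
  // re-renders the result with millisecond precision.
  def shiftedTimestamp(pattern: String, text: String): String = {
    val parser = new SimpleDateFormat(pattern, Locale.ENGLISH)
    parser.setTimeZone(utc)
    val printer = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS", Locale.ENGLISH)
    printer.setTimeZone(utc)
    printer.format(parser.parse(text))
  }

  def main(args: Array[String]): Unit = {
    // 456789123 ms = 5 d 6 h 53 min 9.123 s, added on top of 11:42:35
    println(shiftedTimestamp("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa", "2019-SEP-23 11.42.35.456789123 AM"))
    // prints 2019-09-28 18:35:44.123

    // Shortening the pattern to SSS does not help: all nine digits are still consumed
    println(shiftedTimestamp("yyyy-MMM-dd HH.mm.ss.SSS", "2019-SEP-23 16.42.35.456789123"))
    // prints 2019-09-28 23:35:44.123
  }
}
```

Both results line up with the 18:35 and 23:35 values in the truncated outputs above, which suggests the fraction is being added as milliseconds rather than parsed as nanoseconds.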
Here is a solution inspired by werner's answer about using UDFs.
Input CSV:
101,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM
Original schema with TimestampType columns:
import org.apache.spark.sql.types._
val orig_schema = StructType(Array(StructField("ID", DoubleType, true), StructField("Created_TS", TimestampType, true), StructField("Updated_TS", TimestampType, true), StructField("Modified_TS", TimestampType, true)))
Convert all TimestampType fields to StringType:
val dataSchema = StructType(orig_schema.map { x =>
  x.dataType match {
    case TimestampType => StructField(x.name, StringType, x.nullable)
    case _ => x
  }
})
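toDate function to convert a String to a Timestamp:
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatterBuilder
import java.util.Locale
//TODO parameterize string formats
def toDate(date: String): java.sql.Timestamp = {
  // Case-insensitive so that "SEP" matches; English locale so month and AM/PM text do not depend on the JVM default
  val formatter = new DateTimeFormatterBuilder()
    .parseCaseInsensitive()
    .appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a").toFormatter(Locale.ENGLISH)
  Timestamp.valueOf(LocalDateTime.parse(date, formatter))
}
// register toDate as udf
val to_timestamp = spark.sqlContext.udf.register("to_timestamp", toDate _)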
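The TODO about parameterizing the format could be handled roughly as follows. This is only a sketch that uses java.time and no Spark: TimestampParsing, formatterFor and parseTimestamp are hypothetical names, and the choice of Locale.ENGLISH is an assumption based on the "SEP" and "AM" text in the sample data.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.util.Locale

object TimestampParsing {
  // Hypothetical helper: builds a case-insensitive formatter for any pattern,
  // so the pattern string no longer has to be hard-coded in the parser.
  def formatterFor(pattern: String): DateTimeFormatter =
    new DateTimeFormatterBuilder()
      .parseCaseInsensitive()
      .appendPattern(pattern)
      .toFormatter(Locale.ENGLISH)

  // Hypothetical helper: null-safe conversion from String to java.sql.Timestamp.
  def parseTimestamp(value: String, formatter: DateTimeFormatter): Timestamp =
    if (value == null) null
    else Timestamp.valueOf(LocalDateTime.parse(value.trim, formatter))

  def main(args: Array[String]): Unit = {
    val fmt = formatterFor("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a")
    val ts = parseTimestamp("2019-SEP-23 11.42.35.456789123 AM", fmt)
    println(ts)          // prints 2019-09-23 11:42:35.456789123
    println(ts.getNanos) // prints 456789123
  }
}
```

In java.time patterns the letter S is fraction-of-second, so all nine digits survive in the java.sql.Timestamp. Note that Spark's TimestampType stores microseconds, so the last three digits are dropped once the value is inside a DataFrame.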
Create column expressions to select from the raw DataFrame:
import org.apache.spark.sql.functions.expr
// Array of column names and types
val nameType: Array[(String, DataType)] = orig_schema.fields.map(f => (f.name, f.dataType))
// Timestamp columns go through the to_timestamp UDF; all other columns are selected as-is
val selectExpr = nameType.map { case (name, dataType) =>
  dataType match {
    case TimestampType => expr(s"CASE WHEN $name is NULL THEN NULL ELSE to_timestamp($name) END AS $name")
    case _ => expr(name)
  }
}
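To make the generated SQL visible without a Spark session, the sketch below builds only the strings that would be handed to expr. SelectExprSketch, ColType and sqlFor are hypothetical names standing in for the Spark types; the SQL text itself is the same as in the expressions above.

```scala
object SelectExprSketch {
  // Stand-in for Spark's DataType, reduced to the two cases that matter here.
  sealed trait ColType
  case object TimestampCol extends ColType
  case object OtherCol extends ColType

  // Returns the SQL snippet for one column: timestamp columns are routed
  // through the to_timestamp UDF, every other column is selected unchanged.
  def sqlFor(name: String, colType: ColType): String = colType match {
    case TimestampCol => s"CASE WHEN $name is NULL THEN NULL ELSE to_timestamp($name) END AS $name"
    case OtherCol     => name
  }

  def main(args: Array[String]): Unit = {
    val columns = Seq("ID" -> OtherCol, "Created_TS" -> TimestampCol)
    columns.foreach { case (name, colType) => println(sqlFor(name, colType)) }
    // prints:
    // ID
    // CASE WHEN Created_TS is NULL THEN NULL ELSE to_timestamp(Created_TS) END AS Created_TS
  }
}
```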
Read the timestamp columns as StringType, then apply the column expressions, which use the UDF to convert the strings to Timestamp:
val data = spark.read.format("csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("treatEmptyValuesAsNulls", "true")
  //.option("nullValue", "")
  .option("dateFormat", "yyyy-MMM-dd")
  .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
  .schema(dataSchema)
  .load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimestamp_new.csv")
  .select(selectExpr: _*)
data.show
Here is the desired output. Now I don't have to worry about the number of columns or create the UDF expressions manually.
+-----+--------------------+--------------------+--------------------+
| ID| Created_TS| Updated_TS| Modified_TS|
+-----+--------------------+--------------------+--------------------+
|101.0|2019-09-23 11:42:...|2019-09-23 11:42:...|2019-09-23 11:42:...|
+-----+--------------------+--------------------+--------------------+