apache-spark, spark-csv

Parse micro/nanosecond timestamps in the spark-csv DataFrame reader: inconsistent results


I'm trying to read a CSV file that has timestamps down to nanoseconds. Sample content of the file TestTimestamp.csv:

Spark 2.4.0, Scala 2.11.11

   /**
     * TestTimestamp.csv -
     * 101,2019-SEP-23 11.42.35.456789123 AM
     *
     */

I tried to read it using timestampFormat = "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa":

import org.apache.spark.sql.types._

val dataSchema = StructType(Array(
  StructField("ID", DoubleType, true),
  StructField("Created_TS", TimestampType, true)))

val data = spark.read.format("csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("treatEmptyValuesAsNulls", "true")
  //.option("nullValue", "")
  .option("dateFormat", "yyyy-MMM-dd")
  .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
  .schema(dataSchema)
  .load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimeStamp.csv")

data.select('Created_TS).show

The output I get is a completely wrong date-time: the 23rd of September got changed to the 28th of September:

+--------------------+
|          Created_TS|
+--------------------+
|2019-09-28 18:35:...|
+--------------------+
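
A likely cause (my note, not part of the original question): Spark 2.4's CSV reader parses timestamps with legacy java.text-style patterns (Commons FastDateFormat), in which 'S' means milliseconds. The 9-digit fraction is therefore read as 456,789,123 milliseconds (about 5 days 6 hours 53 minutes) and, because parsing is lenient, rolled into the date. A minimal sketch with SimpleDateFormat, which shares those pattern semantics:

import java.text.SimpleDateFormat

// 'S' is a millisecond digit here, so 456789123 is ~5.29 days
val legacy = new SimpleDateFormat("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
println(legacy.parse("2019-SEP-23 11.42.35.456789123 AM"))
// prints a date around 2019-09-28 18:35 (11:42:35 AM + 5d 6h 53m),
// matching the wrong output above; the same arithmetic explains the
// 23:35 result in the second example below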

Even if I use hours in 24-hour format, e.g. "2019-SEP-23 16.42.35.456789123", and try to use only the first few digits of the fractional seconds by setting timestampFormat = "yyyy-MMM-dd HH.mm.ss.SSS", I get a similarly incorrect result:

val data2 = spark.read.format("csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .option("treatEmptyValuesAsNulls", "true")
  //.option("nullValue", "")
  .option("dateFormat", "yyyy-MMM-dd")
  .option("timestampFormat", "yyyy-MMM-dd HH.mm.ss.SSS")
  .schema(dataSchema)
  .load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimeStamp.csv")

data2.select('Created_TS).show

+--------------------+
|          Created_TS|
+--------------------+
|2019-09-28 23:35:...|
+--------------------+

Is there any way to parse such timestamp strings while creating a DataFrame with the CSV reader?


Solution

  • Here is the solution, inspired by werner's answer about using UDFs:

    Input CSV:

    101,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM,2019-SEP-23 11.42.35.456789123 AM
    

    Original schema with TimestampType columns:

    import org.apache.spark.sql.types._

    val orig_schema = StructType(Array(
      StructField("ID", DoubleType, true),
      StructField("Created_TS", TimestampType, true),
      StructField("Updated_TS", TimestampType, true),
      StructField("Modified_TS", TimestampType, true)))
    

    Convert all TimestampType columns to StringType:

    val dataSchema = StructType(orig_schema.map { x =>
      x.dataType match {
        case TimestampType => StructField(x.name, StringType, x.nullable)
        case _             => x
      }
    })
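
    As a quick illustration (my addition, assuming the orig_schema above), printing the converted schema confirms that every timestamp column is now a string, so the CSV reader performs no timestamp parsing at all:

    dataSchema.printTreeString()
    // root
    //  |-- ID: double (nullable = true)
    //  |-- Created_TS: string (nullable = true)
    //  |-- Updated_TS: string (nullable = true)
    //  |-- Modified_TS: string (nullable = true)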
    

    toDate function to convert a String to a Timestamp:

    import java.sql.Timestamp
    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatterBuilder

    // TODO parameterize string formats
    def toDate(date: String): java.sql.Timestamp = {
      val formatter = new DateTimeFormatterBuilder()
        .parseCaseInsensitive() // the month appears in upper case, e.g. "SEP"
        .appendPattern("yyyy-MMM-dd hh.mm.ss.SSSSSSSSS a")
        .toFormatter()
      Timestamp.valueOf(LocalDateTime.parse(date, formatter))
    }

    // register toDate as a UDF so it can be used in SQL expressions
    val to_timestamp = spark.sqlContext.udf.register("to_timestamp", toDate _)
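
    A quick sanity check (my addition; assumes an English default locale for the "SEP" month abbreviation) shows that the full nanosecond precision survives, because java.sql.Timestamp supports nanoseconds:

    toDate("2019-SEP-23 11.42.35.456789123 AM")
    // res0: java.sql.Timestamp = 2019-09-23 11:42:35.456789123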
    

    Create column expressions to select from the raw DataFrame:

    import org.apache.spark.sql.functions.expr

    // Array of column names & types from the original schema
    val nameType: Array[(String, DataType)] = orig_schema.fields.map(f => (f.name, f.dataType))

    // Create column expressions to select from the raw DataFrame:
    // wrap timestamp columns in the UDF, pass all other columns through
    val selectExpr = nameType.map {
      case (name, TimestampType) =>
        expr(s"CASE WHEN $name is NULL THEN NULL ELSE to_timestamp($name) END AS $name")
      case (name, _) => expr(name)
    }
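
    Illustratively (my note, for the four-column schema above), selectExpr ends up equivalent to:

    // expr("ID"),
    // expr("CASE WHEN Created_TS is NULL THEN NULL ELSE to_timestamp(Created_TS) END AS Created_TS"),
    // expr("CASE WHEN Updated_TS is NULL THEN NULL ELSE to_timestamp(Updated_TS) END AS Updated_TS"),
    // expr("CASE WHEN Modified_TS is NULL THEN NULL ELSE to_timestamp(Modified_TS) END AS Modified_TS")

    The CASE WHEN guard matters: the UDF calls LocalDateTime.parse directly, which would throw on a null input.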
    

    Read the file with the all-string schema, then apply the select expressions, which use the UDF to convert the strings to timestamps:

    val data = spark.read.format("csv")
      .option("header", "false")
      .option("inferSchema", "false")
      .option("treatEmptyValuesAsNulls", "true")
      //.option("nullValue", "")
      // dateFormat/timestampFormat are now effectively unused,
      // since all timestamp columns are read as plain strings
      .option("dateFormat", "yyyy-MMM-dd")
      .option("timestampFormat", "yyyy-MMM-dd hh.mm.ss.SSSSSSSSS aaa")
      .schema(dataSchema)
      .load("C:\\TestData\\Raw\\TetraPak\\Shipments\\TestTimestamp_new.csv")
      .select(selectExpr: _*)

    data.show
    

    Here's the desired output. Now I don't have to worry about the number of columns or about creating the UDF expressions manually:

    +-----+--------------------+--------------------+--------------------+
    |   ID|          Created_TS|          Updated_TS|         Modified_TS|
    +-----+--------------------+--------------------+--------------------+
    |101.0|2019-09-23 11:42:...|2019-09-23 11:42:...|2019-09-23 11:42:...|
    +-----+--------------------+--------------------+--------------------+