pyspark, apache-spark-sql

Error: TimestampType can not accept object while creating a Spark dataframe from a list


I am trying to create a dataframe from the following list:

data = [(1, 'abc', '2020-08-20 10:00:00', 'I'),
        (1, 'abc', '2020-08-20 10:01:00', 'U'),
        (1, 'abc', '2020-08-21 10:02:00', 'U'),
        (2, 'pqr', '2020-08-20 10:00:00', 'I'),
        (2, 'pqr', '2020-08-20 10:01:00', 'U'),
        (2, 'pqr', '2020-08-21 10:02:00', 'D'),
        (3, 'rst', '2020-08-20 10:00:00', 'I'),
        (3, 'rst', '2020-08-20 10:01:00', 'U'),
        (3, 'rst', '2020-08-21 10:02:00', 'U')]

I am running the following code to create a dataframe from this:

from pyspark.sql.types import *
mySchema = StructType([StructField("key", IntegerType()),
                       StructField("name", StringType()),
                       StructField("ts", TimestampType()),
                       StructField("cdc_flag", StringType())])

df_raw = spark.createDataFrame(data, mySchema)

And I am getting the following error:

TypeError: field ts: TimestampType can not accept object '2020-08-20 10:00:00' in type <class 'str'>

I tried changing the data type to DateType as well, but I get the same kind of error.

Please note that I am mainly trying to understand whether defining the schema this way is possible at all. As a workaround, I think I could read ts as a string, cast it with withColumn, and drop the original column; see the sketch below.
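
For reference, this is roughly the workaround I have in mind (a sketch only; str_schema and ts_str are placeholder names of my own):

from pyspark.sql.functions import to_timestamp

# Placeholder schema that reads ts as a plain string first.
str_schema = StructType([StructField("key", IntegerType()),
                         StructField("name", StringType()),
                         StructField("ts_str", StringType()),
                         StructField("cdc_flag", StringType())])

# Cast the string column to a timestamp, then drop the helper column.
df_raw = (spark.createDataFrame(data, str_schema)
          .withColumn("ts", to_timestamp("ts_str", "yyyy-MM-dd HH:mm:ss"))
          .drop("ts_str"))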


Solution

  • The error is expected: TimestampType does not accept strings. The corresponding type is java.sql.Timestamp in Scala and datetime.datetime in Python.

    You just need to define your data like this:

    from datetime import datetime
    
    data = [(1, 'abc', datetime.strptime('2020-08-20 10:00:00', '%Y-%m-%d %H:%M:%S'), 'I'),
            (1, 'abc', datetime.strptime('2020-08-20 10:01:00', '%Y-%m-%d %H:%M:%S'), 'U'),
            (1, 'abc', datetime.strptime('2020-08-21 10:02:00', '%Y-%m-%d %H:%M:%S'), 'U'),
            (2, 'pqr', datetime.strptime('2020-08-20 10:00:00', '%Y-%m-%d %H:%M:%S'), 'I'),
            (2, 'pqr', datetime.strptime('2020-08-20 10:01:00', '%Y-%m-%d %H:%M:%S'), 'U'),
            (2, 'pqr', datetime.strptime('2020-08-21 10:02:00', '%Y-%m-%d %H:%M:%S'), 'D'),
            (3, 'rst', datetime.strptime('2020-08-20 10:00:00', '%Y-%m-%d %H:%M:%S'), 'I'),
            (3, 'rst', datetime.strptime('2020-08-20 10:01:00', '%Y-%m-%d %H:%M:%S'), 'U'),
            (3, 'rst', datetime.strptime('2020-08-21 10:02:00', '%Y-%m-%d %H:%M:%S'), 'U')]
    
    
    spark.createDataFrame(data, mySchema).show()
    #+---+----+-------------------+--------+
    #|key|name|                 ts|cdc_flag|
    #+---+----+-------------------+--------+
    #|  1| abc|2020-08-20 10:00:00|       I|
    #|  1| abc|2020-08-20 10:01:00|       U|
    #|  1| abc|2020-08-21 10:02:00|       U|
    #|  2| pqr|2020-08-20 10:00:00|       I|
    #|  2| pqr|2020-08-20 10:01:00|       U|
    #|  2| pqr|2020-08-21 10:02:00|       D|
    #|  3| rst|2020-08-20 10:00:00|       I|
    #|  3| rst|2020-08-20 10:01:00|       U|
    #|  3| rst|2020-08-21 10:02:00|       U|
    #+---+----+-------------------+--------+
    
    
    spark.createDataFrame(data, mySchema).printSchema()
    #root
    # |-- key: integer (nullable = true)
    # |-- name: string (nullable = true)
    # |-- ts: timestamp (nullable = true)
    # |-- cdc_flag: string (nullable = true)
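
    As a side note: since these strings are already in ISO format, datetime.fromisoformat (available since Python 3.7) builds the same datetime objects more compactly than strptime. A minimal sketch, where raw is just a local name:

    from datetime import datetime

    raw = [(1, 'abc', '2020-08-20 10:00:00', 'I'),
           (1, 'abc', '2020-08-20 10:01:00', 'U'),
           (1, 'abc', '2020-08-21 10:02:00', 'U'),
           (2, 'pqr', '2020-08-20 10:00:00', 'I'),
           (2, 'pqr', '2020-08-20 10:01:00', 'U'),
           (2, 'pqr', '2020-08-21 10:02:00', 'D'),
           (3, 'rst', '2020-08-20 10:00:00', 'I'),
           (3, 'rst', '2020-08-20 10:01:00', 'U'),
           (3, 'rst', '2020-08-21 10:02:00', 'U')]

    # fromisoformat parses 'YYYY-MM-DD HH:MM:SS' directly.
    data = [(key, name, datetime.fromisoformat(ts), flag)
            for key, name, ts, flag in raw]

    spark.createDataFrame(data, mySchema).show()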