I am trying to create a dataframe from the following list:
data = [(1, 'abc', '2020-08-20 10:00:00', 'I'),
        (1, 'abc', '2020-08-20 10:01:00', 'U'),
        (1, 'abc', '2020-08-21 10:02:00', 'U'),
        (2, 'pqr', '2020-08-20 10:00:00', 'I'),
        (2, 'pqr', '2020-08-20 10:01:00', 'U'),
        (2, 'pqr', '2020-08-21 10:02:00', 'D'),
        (3, 'rst', '2020-08-20 10:00:00', 'I'),
        (3, 'rst', '2020-08-20 10:01:00', 'U'),
        (3, 'rst', '2020-08-21 10:02:00', 'U')]
I am running the following code to create a dataframe from this:
from pyspark.sql.types import *
mySchema = StructType([StructField("key", IntegerType()),
                       StructField("name", StringType()),
                       StructField("ts", TimestampType()),
                       StructField("cdc_flag", StringType())])
df_raw = spark.createDataFrame(data, mySchema)
And I am getting the following error:
TypeError: field ts: TimestampType can not accept object '2020-08-20 10:00:00' in type <class 'str'>
I also tried changing the data type to DateType, but I get the same error.
Please note that I am trying to understand whether defining the schema this way is possible at all. I know I could work around it with withColumn, casting the ts column and dropping the original one.
The error is justified because TimestampType expects an actual timestamp object, not a str. In Python that object is a datetime (the equivalent in Scala is java.sql.Timestamp).
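A quick way to see the type Spark wants (as a side note, datetime.fromisoformat also parses this exact format on Python 3.7+):

from datetime import datetime

ts = datetime.fromisoformat('2020-08-20 10:00:00')
print(type(ts))
# <class 'datetime.datetime'>  <- what TimestampType accepts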
You just need to define your data like:
from datetime import datetime
data = [(1, 'abc', datetime.strptime('2020-08-20 10:00:00', '%Y-%m-%d %H:%M:%S'), 'I'),
        (1, 'abc', datetime.strptime('2020-08-20 10:01:00', '%Y-%m-%d %H:%M:%S'), 'U'),
        (1, 'abc', datetime.strptime('2020-08-21 10:02:00', '%Y-%m-%d %H:%M:%S'), 'U'),
        (2, 'pqr', datetime.strptime('2020-08-20 10:00:00', '%Y-%m-%d %H:%M:%S'), 'I'),
        (2, 'pqr', datetime.strptime('2020-08-20 10:01:00', '%Y-%m-%d %H:%M:%S'), 'U'),
        (2, 'pqr', datetime.strptime('2020-08-21 10:02:00', '%Y-%m-%d %H:%M:%S'), 'D'),
        (3, 'rst', datetime.strptime('2020-08-20 10:00:00', '%Y-%m-%d %H:%M:%S'), 'I'),
        (3, 'rst', datetime.strptime('2020-08-20 10:01:00', '%Y-%m-%d %H:%M:%S'), 'U'),
        (3, 'rst', datetime.strptime('2020-08-21 10:02:00', '%Y-%m-%d %H:%M:%S'), 'U')]
spark.createDataFrame(data, mySchema).show()
#+---+----+-------------------+--------+
#|key|name| ts|cdc_flag|
#+---+----+-------------------+--------+
#| 1| abc|2020-08-20 10:00:00| I|
#| 1| abc|2020-08-20 10:01:00| U|
#| 1| abc|2020-08-21 10:02:00| U|
#| 2| pqr|2020-08-20 10:00:00| I|
#| 2| pqr|2020-08-20 10:01:00| U|
#| 2| pqr|2020-08-21 10:02:00| D|
#| 3| rst|2020-08-20 10:00:00| I|
#| 3| rst|2020-08-20 10:01:00| U|
#| 3| rst|2020-08-21 10:02:00| U|
#+---+----+-------------------+--------+
spark.createDataFrame(data, mySchema).printSchema()
#root
# |-- key: integer (nullable = true)
# |-- name: string (nullable = true)
# |-- ts: timestamp (nullable = true)
# |-- cdc_flag: string (nullable = true)
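As for the workaround you mentioned: yes, that works too. Declare ts as StringType and cast afterwards; a minimal sketch, assuming data is the original list of string tuples from the question (withColumn with the same name replaces the column in place, so no separate drop is needed):

from pyspark.sql.functions import to_timestamp
from pyspark.sql.types import *

# Same schema as mySchema, but ts stays a string at load time
str_schema = StructType([StructField("key", IntegerType()),
                         StructField("name", StringType()),
                         StructField("ts", StringType()),
                         StructField("cdc_flag", StringType())])

df_raw = (spark.createDataFrame(data, str_schema)
          .withColumn("ts", to_timestamp("ts", "yyyy-MM-dd HH:mm:ss")))
df_raw.printSchema()
#root
# |-- key: integer (nullable = true)
# |-- name: string (nullable = true)
# |-- ts: timestamp (nullable = true)
# |-- cdc_flag: string (nullable = true)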