python · dataframe · apache-spark

Why does my python function not properly cast the dates despite recognizing the format?


I am attempting to dynamically cast date values that arrive as string columns in various formats. I've gotten pretty far, and this code correctly identifies the dates in the strings, but the columns 'Date2' and 'Date3' always come back as null values. I can't understand why that is, or how to correct it so that those columns return the converted date values.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, max
from pyspark.sql.types import IntegerType, FloatType, TimestampType, StringType, DateType
from datetime import datetime, date

# Define the function to convert values
def convert_value(value):
    try:
        return int(value)
    except ValueError:
        pass
    
    try:
        return float(value)
    except ValueError:
        pass
    
    datetime_formats = [
        '%m/%d/%Y %H:%M:%S', '%Y-%m-%d %H:%M:%S',
        '%Y-%m-%dT%H:%M:%S', '%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S.%f'
    ]
    for fmt in datetime_formats:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            pass
    
    date_formats = [
        '%Y-%m-%d', '%d-%m-%Y', '%m/%d/%Y', '%d/%m/%Y', '%Y/%m/%d', 
        '%b %d, %Y', '%d %b %Y'
    ]
    for fmt in date_formats:
        try:
            return datetime.strptime(value, fmt).date()
        except ValueError:
            pass
    
    return value

# Function to infer data type for each column
def infer_column_type(df, column):
    min_value = df.select(min(col(column))).collect()[0][0]
    max_value = df.select(max(col(column))).collect()[0][0]
    
    for value in [min_value, max_value]:
        if value is not None:
            converted_value = convert_value(value)
            print(f"Column: {column}, Value: {value}, Converted: {converted_value}")  # Debug print
            if isinstance(converted_value, int):
                return IntegerType()
            elif isinstance(converted_value, float):
                return FloatType()
            elif isinstance(converted_value, datetime):
                return TimestampType()
            elif isinstance(converted_value, date):
                return DateType()
    return StringType()

# Example data with different date formats in separate columns
data = [
    ('1', '2021-01-01', '01-02-2021', '1/2/2021', '2021-01-01T12:34:56', '1.1', 1),
    ('2', '2021-02-01', '02-03-2021', '2/3/2021', '2021-02-01T13:45:56', '2.2', 2),
    ('3', '2021-03-01', '03-04-2021', '3/4/2021', '2021-03-01T14:56:56', '3.3', 3)
]

# Create DataFrame
spark = SparkSession.builder.appName("example").getOrCreate()
columns = ['A', 'Date1', 'Date2', 'Date3', 'Date4', 'C', 'D']
df = spark.createDataFrame(data, columns)

# Apply inferred data types to columns
for column in df.columns:
    inferred_type = infer_column_type(df, column)
    df = df.withColumn(column, df[column].cast(inferred_type))

# Show the result
df.show()
print(df.dtypes)

Solution

  • The issue is that while your convert_value function correctly identifies the date formats with Python's datetime.strptime, PySpark's cast(DateType()) only parses strings that already match Spark's default pattern (essentially 'yyyy-MM-dd').

    As a result, Date2 and Date3 come back as null because their formats ('01-02-2021', '1/2/2021') can't be parsed by Spark during the cast.

    To fix this, don't use .cast(DateType()) for those columns. Instead, use to_date() with an explicit format for each one:

    from pyspark.sql.functions import to_date
    df = df.withColumn("Date1", to_date("Date1", "yyyy-MM-dd"))
    df = df.withColumn("Date2", to_date("Date2", "dd-MM-yyyy"))
    df = df.withColumn("Date3", to_date("Date3", "M/d/yyyy"))