Tags: python, pyspark, databricks, azure-databricks, databricks-connect

Different behaviour of databricks-connect vs. pyspark when creating DataFrames


I've got a project where I develop an ETL pipeline in VS Code using the Databricks extension, which is based on the Python package "databricks-connect". To run unit tests on my functions I use pytest together with the VS Code "Testing" extension. In GitLab I also want to run those unit tests whenever specific merge requests are created, to check that all tests pass. There, the tests run in a Docker container in which I set up a local Spark environment that includes pyspark but not databricks-connect (the two packages cannot coexist).

This is how I create the Spark session that is used to run the tests in both "worlds" (VS Code + databricks-connect vs. Docker + pyspark):

from pyspark.sql import SparkSession

def create_spark_session() -> SparkSession:
    # Within VS Code where databricks connect is installed
    # the try block "works" whereas within my Docker container
    # I end up in the except block.
    try:
        from databricks.connect import DatabricksSession

        DEV_CONFIG = parse_config_file()
        return DatabricksSession.builder.remote(
            host=DEV_CONFIG.host,
            token=DEV_CONFIG.token,
            cluster_id=DEV_CONFIG.cluster_id,
        ).getOrCreate()
        # from databricks.sdk.runtime import spark
        # return spark
    except ImportError:
        return (
            SparkSession.builder.master("local[4]")
            .config("spark.databricks.clusterUsageTags.clusterAllTags", "{}")
            .getOrCreate()
        )
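
To give an idea of how such a factory can be shared across tests, here is a simplified sketch of a pytest fixture in a conftest.py (the import path my_project.spark and the fixture name spark_session are placeholders, not the project's real names):

# conftest.py (simplified sketch)
import pytest

from my_project.spark import create_spark_session  # placeholder import path


@pytest.fixture(scope="session")
def spark_session():
    # One Spark session shared by all tests, regardless of whether
    # databricks-connect or plain pyspark is installed.
    spark = create_spark_session()
    yield spark
    spark.stop()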

The issue is that there seems to be a difference between databricks-connect and pyspark when it comes to creating DataFrames.

For example, the code

df_test = spark_session.createDataFrame(
    [
        "123",  # string
        123456,  # integer
        12345678912345,  # integer with 14 digits
    ],
    ["my_str_column"],
)

works perfectly in VS Code with databricks-connect, but in my Docker container, where pyspark is used, it fails with the error "pyspark.errors.exceptions.base.PySparkTypeError: [CANNOT_INFER_SCHEMA_FOR_TYPE] Can not infer schema for type: str."

Also the code

from pyspark.sql.types import FloatType, StructField, StructType

data = [
    # one and only row
    (102),
]

schema = StructType(
    [
        StructField("my_float_column", FloatType()),
    ]
)

df = spark_session.createDataFrame(data, schema)

works perfectly in VS Code with databricks-connect, but in my Docker container, where pyspark is used, it fails with the error "pyspark.errors.exceptions.base.PySparkTypeError: [CANNOT_ACCEPT_OBJECT_IN_TYPE] FloatType() can not accept object 102 in type int."

Why do the two packages behave differently, and what is the best way to deal with this? By the way: within a Databricks notebook, both snippets run without errors.


Solution

  • The differences in behavior between databricks-connect and pyspark when creating DataFrames come down to how the two packages handle type inference and schema enforcement.

    Schema inference for a single-column DataFrame: in databricks-connect, the createDataFrame method can infer the schema directly from a list of plain values.

    In plain pyspark, createDataFrame expects each row to be a tuple (or a Row object) in order to infer the schema correctly.

    To make your code compatible with both environments, you should always provide the data as a list of tuples, even if there is only one column (a Row-based alternative is sketched after the results below):

    df_test = spark.createDataFrame(
        [
            ("123",),  
            (123456,), 
            (12345678912345,), 
        ],
        ["my_str_column"],
    )
    df_test.show()
    

    Results:

    +--------------+
    | my_str_column|
    +--------------+
    |           123|
    |        123456|
    |12345678912345|
    +--------------+
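
    If wrapping every value in a one-element tuple feels error-prone, Row objects behave the same way in plain pyspark and databricks-connect. The following sketch is not part of the original answer; the values are passed as strings here so that schema inference is unambiguous:

    from pyspark.sql import Row

    df_test = spark.createDataFrame(
        [
            Row(my_str_column="123"),
            Row(my_str_column="123456"),
            Row(my_str_column="12345678912345"),
        ]
    )
    df_test.show()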
    

    Schema enforcement for specific types:

    In databricks-connect, the createDataFrame method tends to be more flexible with type conversions.

    In pyspark, the createDataFrame method strictly enforces the schema types.

    In this case, convert the data to the required Python types before creating the DataFrame, here by passing 102.0 instead of the integer 102 (an alternative that casts on the Spark side is sketched after the results below).

    from pyspark.sql.types import FloatType, StructType, StructField

    data = [
        (102.0,),  # a Python float, which FloatType accepts
    ]
    
    schema = StructType(
        [
            StructField("my_float_column", FloatType()),
        ]
    )
    
    df = spark.createDataFrame(data, schema)
    df.show()
    

    Results:

    +---------------+
    |my_float_column|
    +---------------+
    |          102.0|
    +---------------+
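
    If the source data genuinely arrives as integers and converting it in Python is inconvenient, another portable option (a sketch, not part of the original answer) is to let Spark infer the column as a long and cast it on the Spark side afterwards:

    from pyspark.sql.functions import col

    df = spark.createDataFrame([(102,)], ["my_float_column"])
    df = df.withColumn("my_float_column", col("my_float_column").cast("float"))
    df.printSchema()  # my_float_column: float (nullable = true)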