python, apache-spark, pyspark, pyarrow

PySpark useArrow=True option in Spark 4 raises pandas version errors


I am trying to test the Arrow-optimized Python UDF of Spark 4, as shown below:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

spark = SparkSession.builder.master('local[*]').appName('test').getOrCreate()
spark.conf.set('spark.sql.execution.pythonUDF.arrow.enabled', True)
spark.conf.set('spark.sql.execution.arrow.pyspark.fallback.enabled', True)

rows = [{'name': 'joseph', 'age': 35}, {'name': 'jina', 'age': 30}, {'name': 'julian', 'age': 15}]
 
schema = StructType([
    StructField('name', StringType(), True), 
    StructField('age', IntegerType(), True)])

df = spark.createDataFrame(rows, schema)

@udf(returnType=schema, useArrow=True)
def transform(name: str, age: int):
    return name.upper(), age + 10  

# Apply UDF to transform both columns
df_trans = df.withColumn("trans", transform(df["name"], df["age"]))
df_trans.show()

But the useArrow=True option raises the following error:

Traceback (most recent call last):
  File "C:\spark-4.0.0-preview2-bin-hadoop3\python\pyspark\sql\pandas\utils.py", line 28, in require_minimum_pandas_version
    import pandas
  File "C:\spark-4.0.0-preview2-bin-hadoop3\python\pyspark\pandas\__init__.py", line 33, in <module>
    require_minimum_pandas_version()
  File "C:\spark-4.0.0-preview2-bin-hadoop3\python\pyspark\sql\pandas\utils.py", line 43, in require_minimum_pandas_version
    raise PySparkImportError(
pyspark.errors.exceptions.base.PySparkImportError: [PACKAGE_NOT_INSTALLED] Pandas >= 2.0.0 must be installed; however, it was not found.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "c:\VSCode_Workspace\pyspark-test\com\aaa\spark\arrow_spark.py", line 19, in <module>
    @udf(returnType=schema, useArrow=True)
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\spark-4.0.0-preview2-bin-hadoop3\python\pyspark\sql\udf.py", line 142, in _create_py_udf
    require_minimum_pandas_version()
  File "C:\spark-4.0.0-preview2-bin-hadoop3\python\pyspark\sql\pandas\utils.py", line 43, in require_minimum_pandas_version
    raise PySparkImportError(
pyspark.errors.exceptions.base.PySparkImportError: [PACKAGE_NOT_INSTALLED] Pandas >= 2.0.0 must be installed; however, it was not found.

When I set the Arrow option to False, the code runs without errors. How can I fix this in Spark 4? I want to verify the Arrow-enabled UDF of PySpark.


Solution

  • First, you'll need to install the required dependencies:

    pip install "pandas>=2.0.0" pyarrow
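
    On Windows it is also common for pip to install into a different interpreter than the one Spark actually launches, which produces the same PACKAGE_NOT_INSTALLED error even after installing. If that applies to your setup, a minimal sketch of a fix is to point Spark at the interpreter you installed into (via the standard PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON environment variables) before creating the session:

    import os
    import sys

    # Make the Spark driver and workers use this exact interpreter,
    # i.e. the one you just ran "pip install" with.
    os.environ["PYSPARK_PYTHON"] = sys.executable
    os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
    # Set these before SparkSession.builder...getOrCreate() is called.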
    

    Then, let's modify your code slightly to properly test and verify Arrow optimization:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit, udf
    from pyspark.sql.types import IntegerType, StringType, StructField, StructType
    import pandas as pd
    
    # Create Spark session with Arrow enabled for both pandas conversion and Python UDFs
    spark = SparkSession.builder \
        .master('local[*]') \
        .appName('arrow_test') \
        .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
        .config('spark.sql.execution.arrow.pyspark.fallback.enabled', 'true') \
        .config('spark.sql.execution.pythonUDF.arrow.enabled', 'true') \
        .getOrCreate()
    
    # Create test data
    rows = [{'name': 'joseph', 'age': 35}, 
            {'name': 'jina', 'age': 30}, 
            {'name': 'julian', 'age': 15}]
     
    schema = StructType([
        StructField('name', StringType(), True), 
        StructField('age', IntegerType(), True)
    ])
    
    df = spark.createDataFrame(rows, schema)
    
    # Define UDF with struct return type
    @udf(returnType=schema, useArrow=True)
    def transform(name: str, age: int):
        return (name.upper(), age + 10)
    
    # Apply UDF and show results
    df_trans = df.withColumn("trans", transform(df["name"], df["age"]))
    df_trans.show()
    
    # Verify Arrow is being used
    print("\nArrow enabled:", spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))
    print("Current execution mode:", spark.conf.get("spark.sql.execution.pythonUDF.arrow.enabled"))
    

    To verify the performance improvement, you can test with a larger dataset:

    # Create larger test dataset
    large_rows = [{'name': f'person_{i}', 'age': i} for i in range(100000)]
    large_df = spark.createDataFrame(large_rows, schema)
    
    # Test with timing
    import time
    
    def time_execution(df, fn):
        start = time.time()
        df.withColumn("trans", fn(df["name"], df["age"])).count()
        return time.time() - start
    
    # Note: useArrow on the UDF takes precedence over the session config and is
    # resolved when the UDF is created, so toggling the config afterwards has no
    # effect on an existing UDF. Create one UDF per mode instead.
    def _transform(name: str, age: int):
        return name.upper(), age + 10
    
    transform_arrow = udf(_transform, returnType=schema, useArrow=True)
    transform_plain = udf(_transform, returnType=schema, useArrow=False)
    
    # Test with Arrow
    arrow_time = time_execution(large_df, transform_arrow)
    print(f"\nExecution time with Arrow: {arrow_time:.2f} seconds")
    
    # Test without Arrow
    regular_time = time_execution(large_df, transform_plain)
    print(f"Execution time without Arrow: {regular_time:.2f} seconds")
    

    You can also verify the UDF is working correctly by checking the schema and data types:

    df_trans.printSchema()
    df_trans.select("trans.*").show()  # Expand the struct column
    

    If you're still having issues after installing the required dependencies, you can check your environment:

    import pandas as pd
    import pyarrow as pa
    
    print(f"Pandas version: {pd.__version__}")
    print(f"PyArrow version: {pa.__version__}")
    

    This should help you properly test and verify Arrow-enabled UDFs in Spark 4.