I'm trying to test the Arrow-optimized Python UDF feature of Spark 4, as shown below:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
spark = SparkSession.builder.master('local[*]').appName('test').getOrCreate()
spark.conf.set('spark.sql.execution.pythonUDF.arrow.enabled', True)
spark.conf.set('spark.sql.execution.arrow.pyspark.fallback.enabled', True)
rows = [{'name': 'joseph', 'age': 35}, {'name': 'jina', 'age': 30}, {'name': 'julian', 'age': 15}]
schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)])
df = spark.createDataFrame(rows, schema)
@udf(returnType=schema, useArrow=True)
def transform(name: str, age: int):
    return name.upper(), age + 10
# Apply UDF to transform both columns
df_trans = df.withColumn("trans", transform(df["name"], df["age"]))
df_trans.show()
But the useArrow=True option triggers errors like the following:
Traceback (most recent call last):
  File "C:\spark-4.0.0-preview2-bin-hadoop3\python\pyspark\sql\pandas\utils.py", line 28, in require_minimum_pandas_version
    import pandas
  File "C:\spark-4.0.0-preview2-bin-hadoop3\python\pyspark\pandas\__init__.py", line 33, in <module>
    require_minimum_pandas_version()
  File "C:\spark-4.0.0-preview2-bin-hadoop3\python\pyspark\sql\pandas\utils.py", line 43, in require_minimum_pandas_version
    raise PySparkImportError(
pyspark.errors.exceptions.base.PySparkImportError: [PACKAGE_NOT_INSTALLED] Pandas >= 2.0.0 must be installed; however, it was not found.

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "c:\VSCode_Workspace\pyspark-test\com\aaa\spark\arrow_spark.py", line 19, in <module>
    @udf(returnType=schema, useArrow=True)
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\spark-4.0.0-preview2-bin-hadoop3\python\pyspark\sql\udf.py", line 142, in _create_py_udf
    require_minimum_pandas_version()
  File "C:\spark-4.0.0-preview2-bin-hadoop3\python\pyspark\sql\pandas\utils.py", line 43, in require_minimum_pandas_version
    raise PySparkImportError(
pyspark.errors.exceptions.base.PySparkImportError: [PACKAGE_NOT_INSTALLED] Pandas >= 2.0.0 must be installed; however, it was not found.
When I set the Arrow option to False, the code runs without errors. How can I fix this in Spark 4? I want to verify the Arrow-enabled UDF feature of PySpark.
The traceback shows that pandas is missing from the Python environment Spark is using; Arrow-optimized Python UDFs need both pandas and PyArrow. First, install the required dependencies:
pip install "pandas>=2.0.0" pyarrow
Then, let's modify your code slightly to properly test and verify Arrow optimization:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
import pandas as pd
# Create Spark session with Arrow enabled
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('arrow_test') \
    .config('spark.sql.execution.pythonUDF.arrow.enabled', 'true') \
    .config('spark.sql.execution.arrow.pyspark.enabled', 'true') \
    .config('spark.sql.execution.arrow.pyspark.fallback.enabled', 'true') \
    .getOrCreate()
# Create test data
rows = [{'name': 'joseph', 'age': 35},
        {'name': 'jina', 'age': 30},
        {'name': 'julian', 'age': 15}]
schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
])
df = spark.createDataFrame(rows, schema)
# Define UDF with struct return type
@udf(returnType=schema, useArrow=True)
def transform(name: str, age: int):
    return (name.upper(), age + 10)
# Apply UDF and show results
df_trans = df.withColumn("trans", transform(df["name"], df["age"]))
df_trans.show()
# Verify Arrow is being used
print("\nArrow enabled:", spark.conf.get("spark.sql.execution.arrow.pyspark.enabled"))
print("Current execution mode:", spark.conf.get("spark.sql.execution.pythonUDF.arrow.enabled"))
To verify the performance improvement, you can test with a larger dataset:
# Create larger test dataset
large_rows = [{'name': f'person_{i}', 'age': i} for i in range(100000)]
large_df = spark.createDataFrame(large_rows, schema)
# Test with timing
import time
def time_execution(df, transform_fn):
    start = time.time()
    df.withColumn("trans", transform_fn(df["name"], df["age"])).count()
    return time.time() - start
# useArrow set at UDF creation takes precedence over the session config,
# so compare two separately created UDFs instead of toggling the config
transform_no_arrow = udf(lambda name, age: (name.upper(), age + 10),
                         returnType=schema, useArrow=False)
# Test with Arrow
arrow_time = time_execution(large_df, transform)
print(f"\nExecution time with Arrow: {arrow_time:.2f} seconds")
# Test without Arrow
regular_time = time_execution(large_df, transform_no_arrow)
print(f"Execution time without Arrow: {regular_time:.2f} seconds")
Key points to note:
- Arrow-optimized Python UDFs require pandas >= 2.0.0 and PyArrow in the Python environment Spark runs on.
- useArrow=True set on the UDF itself takes precedence; spark.sql.execution.pythonUDF.arrow.enabled only applies to UDFs that don't set it explicitly.
- spark.sql.execution.arrow.pyspark.enabled controls Arrow for DataFrame/pandas conversions, which is separate from Python UDF execution.
You can also verify the UDF is working correctly by checking the schema and data types:
df_trans.printSchema()
df_trans.select("trans.*").show() # Expand the struct column
If you're still having issues after installing the required dependencies, you can check your environment:
import pandas as pd
import pyarrow as pa
print(f"Pandas version: {pd.__version__}")
print(f"PyArrow version: {pa.__version__}")
This should help you properly test and verify Arrow-enabled UDFs in Spark 4.