I have a Spark Connect server running. Things are fine as long as I don't use UDFs (df.show()
always works). But as soon as I use a UDF, it fails with RuntimeError: SparkContext or SparkSession should be created first.
Obviously a SparkSession exists, because it created the DataFrame and printed it; it only fails when a UDF is applied to it.
Is this (UDFs / anything that needs a SparkContext) simply not yet supported in Spark Connect?
Here is a reproducible example:
Run pytest -k test_spark_connect to run the simple UDF test against the Spark Connect remote session,
and pytest -k test_spark to run the same test on a new local Spark session.
PS: a plain pytest runs both tests at once and they conflict, so they must be run separately.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf


@udf
def to_upper(s):
    if s is not None:
        return s.upper()


def test_spark_connect():
    simple_spark_test(SparkSession.builder.remote("sc://localhost:15002").getOrCreate())


def test_spark():
    simple_spark_test(SparkSession.builder.getOrCreate())


def simple_spark_test(session_to_test: SparkSession):
    print(f'\nss: {session_to_test}')
    print(f'ss.conf.get("spark.app.name"): {session_to_test.conf.get("spark.app.name")}')
    df = session_to_test.createDataFrame([(1, "John Doe")], ("id", "name"))
    df.show()
    df.select(col("name"), to_upper("name")).show()
$ pytest -k test_spark_connect
... snip ...
conftest.py::test_spark_connect FAILED [100%]
ss: <pyspark.sql.connect.session.SparkSession object at 0x0000022C1D8C37F0>
ss.conf.get("spark.app.name"): Spark Connect server
+---+--------+
| id| name|
+---+--------+
| 1|John Doe|
+---+--------+
conftest.py:9 (test_spark_connect)
def test_spark_connect():
> simple_spark_test(SparkSession.builder.remote("sc://localhost:15002").getOrCreate())
conftest.py:11:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
conftest.py:22: in simple_spark_test
df.select(col("name"), to_upper("name")).show()
.venv\lib\site-packages\pyspark\sql\udf.py:425: in wrapper
return self(*args)
.venv\lib\site-packages\pyspark\sql\udf.py:340: in __call__
sc = get_active_spark_context()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
def get_active_spark_context() -> SparkContext:
"""Raise RuntimeError if SparkContext is not initialized,
otherwise, returns the active SparkContext."""
sc = SparkContext._active_spark_context
if sc is None or sc._jvm is None:
> raise RuntimeError("SparkContext or SparkSession should be created first.")
E RuntimeError: SparkContext or SparkSession should be created first.
.venv\lib\site-packages\pyspark\sql\utils.py:248: RuntimeError
$
$ pytest -k test_spark
conftest.py::test_spark
... snip ...
25/03/03 12:25:26 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
PASSED [100%]
ss: <pyspark.sql.session.SparkSession object at 0x000001EA41099540>
ss.conf.get("spark.app.name"): pyspark-shell
+---+--------+
| id| name|
+---+--------+
| 1|John Doe|
+---+--------+
+--------+--------------+
| name|to_upper(name)|
+--------+--------------+
|John Doe| JOHN DOE|
+--------+--------------+
$
I hit the same error on Spark 3.5.5. Spark SQL queries ran fine, but UDFs failed:
>>> spark.sql("select 1").show()
+---+
| 1|
+---+
| 1|
+---+
>>> df.select(to_upper("name"), add_one("age")).show()
Traceback (most recent call last):
File "<python-input-13>", line 1, in <module>
df.select(to_upper("name"), add_one("age")).show()
~~~~~~~~^^^^^^^^
File "/Users/remziy/python3/venv/lib/python3.13/site-packages/pyspark/sql/udf.py", line 423, in wrapper
return self(*args)
File "/Users/remziy/python3/venv/lib/python3.13/site-packages/pyspark/sql/udf.py", line 339, in __call__
sc = get_active_spark_context()
File "/Users/remziy/python3/venv/lib/python3.13/site-packages/pyspark/sql/utils.py", line 248, in get_active_spark_context
raise RuntimeError("SparkContext or SparkSession should be created first.")
RuntimeError: SparkContext or SparkSession should be created first.
I finally solved it by creating the Spark Connect session before defining the UDF. Only then does PySpark know to register the Spark Connect version of the UDF instead of the classic one, which needs a local SparkContext. Also, the Python version on the client side must match the one on the executors; for instance, if your Spark 3.5.5 executors run Python 3.8, you can only use a Python 3.8 client.
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.remote("sc://localhost:15002/").create()
>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf
>>> slen = udf(lambda s: len(s), IntegerType())
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)")).show()
+----------+
|slen(name)|
+----------+
| 8|
+----------+
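Applying the same idea to the original pytest example: roughly, moving the udf() call inside the test, after the remote session has been created, should make it pick up the Spark Connect code path. This is only a sketch, assuming the same sc://localhost:15002 server; to_upper_udf is just an illustrative name.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf


def test_spark_connect():
    # Create the Spark Connect session first ...
    spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

    # ... and only then wrap the function as a UDF, so udf() dispatches to the
    # Spark Connect implementation instead of the classic one that needs a
    # local SparkContext.
    def to_upper(s):
        if s is not None:
            return s.upper()

    to_upper_udf = udf(to_upper)

    df = spark.createDataFrame([(1, "John Doe")], ("id", "name"))
    df.select(col("name"), to_upper_udf("name")).show()

If you also want to confirm that the client and executor Python versions match, one quick check is a UDF that returns sys.version, which shows what the executors are actually running.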