In Code Workbooks, I can use print
statements, which appear in the Output section of the Code Workbook (where errors normally appear). This does not work for UDFs, and it also doesn't work in Code Authoring/Repositories.
What are ways I can debug my pyspark code, especially if I'm using UDFs?
I'll explain three debugging techniques for PySpark that you can use in Foundry:
The easiest, quickest way to view a variable, especially inside a pandas UDF, is to raise an exception with it.
def my_compute_function(my_input):
    interesting_variable = some_function(my_input)  # want to see the result of this
    raise ValueError(interesting_variable)
This is often easier than writing intermediate DataFrames out to a dataset and inspecting them there, because the value appears directly in the error output of the build.
The downside is that it stops the execution of the code.
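For example, inside a pandas UDF the raised value surfaces in the build error. A rough sketch (the UDF name, return type, and column logic here are hypothetical):

import pandas as pd
from pyspark.sql import functions as F


@F.pandas_udf("double")
def debug_udf(values: pd.Series) -> pd.Series:
    intermediate = values * 2  # the intermediate result we want to inspect
    raise ValueError(intermediate.head().to_string())
    # return intermediate  # restore this once debugging is done

Calling debug_udf(F.col("some_numeric_col")) in a query will surface the message when the build runs.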
If you are more experienced with pandas, take a small sample of the data and run your algorithm on the driver as a pandas DataFrame or Series, where you can debug it directly.
One technique I have used is not just downsampling the data to some number of rows, but filtering it so the sample is representative of my problem. For example, when writing an algorithm to determine flight delays, I would filter to all flights to a specific airport on a specific day. That way I'm testing the algorithm holistically on the sample (see the sketch below).
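A rough sketch of that approach (the dataset, column names, and delay logic here are made up for illustration):

from pyspark.sql import functions as F


def my_compute_function(flights):
    # Filter to a representative slice rather than taking an arbitrary number of rows
    # (hypothetical column names and values)
    sample_pdf = (
        flights
        .filter(F.col("destination_airport") == "SFO")
        .filter(F.col("flight_date") == "2021-03-01")
        .toPandas()
    )

    # Run the delay logic as ordinary pandas on the driver and inspect the result
    sample_pdf["delay_minutes"] = (
        sample_pdf["actual_arrival_minutes"] - sample_pdf["scheduled_arrival_minutes"]
    )
    raise ValueError(sample_pdf.head().to_string())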
Code Repositories uses Python's built-in logging library. This is widely documented online and allows you to control the logging level (ERROR, WARNING, INFO) for easier filtering.
Logging output appears both in your output dataset's log files and in your build's driver logs (Dataset -> Details -> Files -> Log Files, and Builds -> Build -> Job status logs, selecting "Driver logs", respectively).
This lets you view the logged information once the build completes, but it doesn't work inside UDFs.
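A minimal sketch of this pattern in a Code Repositories transform (the dataset paths in the decorator are placeholders):

import logging

from transforms.api import transform_df, Input, Output

logger = logging.getLogger(__name__)


@transform_df(
    Output("/path/to/output"),           # placeholder paths
    some_input=Input("/path/to/input"),
)
def my_compute_function(some_input):
    logger.info("Input has %d columns", len(some_input.columns))
    logger.warning("This message appears in the driver logs")
    return some_input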
The work done by a UDF happens on the executors, not the driver, and Spark only captures logging output from the top-level driver process. If you are using a UDF within your PySpark query and need to log data from it, create and call a second UDF that returns the data you wish to capture, and store it in a column to view once the build is finished:
import logging

from pyspark.sql import functions as F
from transforms.api import transform_df

logger = logging.getLogger(__name__)


@transform_df(
    ...
)
def some_transformation(some_input):
    logger.info("log output related to the overall query")

    @F.udf("integer")
    def custom_function(integer_input):
        return integer_input + 5

    @F.udf("string")
    def custom_log(integer_input):
        return "Original integer was %d before adding 5" % integer_input

    df = (
        some_input
        .withColumn("new_integer", custom_function(F.col("example_integer_col")))
        .withColumn("debugging", custom_log(F.col("example_integer_col")))
    )
    return df
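Once the build finishes, open the output dataset and inspect the debugging column to see the values captured from inside the UDF.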