I will explain three debugging tools for PySpark (all usable in Foundry):
- Raising Exceptions
- Running locally as a pandas Series
- Logging and specifically logging in UDFs
Raising Exceptions
The easiest and quickest way to view a variable, especially in pandas UDFs, is to raise an exception with it:
def my_compute_function(my_input):
    interesting_variable = some_function(my_input)  # Want to see the result of this
    raise ValueError(interesting_variable)
This is often easier than reading/writing DataFrames because:
- You can drop in a raise statement without messing with the transform's return value or other logic
- You don't need to mess around with defining a valid schema for your debug output
The downside is that it stops the execution of the code.
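The same trick works inside a pandas UDF itself: raise with whatever summary of the incoming batch you want to see, and the message surfaces in the failed build's error details. Below is a minimal sketch; the UDF name, column name, and return type are placeholder assumptions.
import pandas as pd
import pyspark.sql.functions as F

@F.pandas_udf("double")
def inspect_delays(delay: pd.Series) -> pd.Series:
    # Raise with a summary of the batch this executor received;
    # the message appears in the build's error output.
    raise ValueError(f"dtype={delay.dtype}, first values={delay.head(5).tolist()}")

# df = df.withColumn("debug", inspect_delays(F.col("arrival_delay")))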
Running locally as a pandas Series
If you are more experienced with pandas, you can take a small sample of the data and run your algorithm on the driver as a pandas Series, where you can debug interactively.
One technique I have used is to avoid downsampling by a fixed number of rows and instead filter the data so the sample is representative of the problem. For example, if I were writing an algorithm to determine flight delays, I would filter to all flights to a specific airport on a specific day. This way I'm testing holistically on the sample.
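As a sketch of that workflow (the dataset, column names, and filter values here are placeholder assumptions), you can pull the filtered sample onto the driver with toPandas() and iterate on the pandas version of the algorithm there:
import pyspark.sql.functions as F

def my_compute_function(flights):
    sample = (
        flights
        .filter(F.col("dest_airport") == "LHR")
        .filter(F.col("flight_date") == "2019-03-01")
        .toPandas()  # the filtered sample is small enough to collect on the driver
    )
    delays = sample["arrival_delay"]  # a pandas Series to develop and debug against
    # ... run the pandas version of the algorithm here ...
    raise ValueError(delays.describe())  # e.g. surface summary statistics while iterating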
Logging
Code Repositories uses Python's built-in logging library. It is widely documented online and lets you control the logging level (ERROR, WARNING, INFO) for easier filtering.
Logging output appears both in your output dataset's log files (Dataset -> Details -> Files -> Log Files) and in your build's driver logs (Builds -> Build -> Job status logs; select "Driver logs").
This lets you view the logged information once the build completes, but it doesn't work inside UDFs.
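A minimal sketch of driver-side logging in a transform (the paths and input name here are placeholders):
import logging
from transforms.api import transform_df, Input, Output

logger = logging.getLogger(__name__)

@transform_df(
    Output("/path/to/output"),
    some_input=Input("/path/to/input"),
)
def my_compute_function(some_input):
    # Appears in the driver logs and the output dataset's log files
    logger.info("Row count before filtering: %d", some_input.count())
    return some_input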
Logging in UDFs
The work in a UDF is done by the executors, not the driver, and Spark captures logging output only from the top-level driver process. If you are using a UDF within your PySpark query and need to log data, create and call a second UDF that returns the data you want to capture, and store it in a column to view once the build is finished:
from transforms.api import transform_df
import pyspark.sql.functions as F
import logging

logger = logging.getLogger(__name__)


@transform_df(
    ...
)
def some_transformation(some_input):
    logger.info("log output related to the overall query")

    @F.udf("integer")
    def custom_function(integer_input):
        return integer_input + 5

    @F.udf("string")
    def custom_log(integer_input):
        # Returns the per-row "log message" so it can be stored in a column
        return "Original integer was %d before adding 5" % integer_input

    df = (
        some_input
        .withColumn("new_integer", custom_function(F.col("example_integer_col")))
        .withColumn("debugging", custom_log(F.col("example_integer_col")))
    )
    return df
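Once the build finishes, open the output dataset and inspect the debugging column to see the captured value for each row.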