
I am trying to log from a pandas UDF called within a Python transform.
Because the code runs on the executors, the output does not show up in the driver's logs.

I have been looking at some options on SO, but so far the closest is this one.

Any idea on how to surface the logs in the driver's logs, or in any other log files available under the build, is welcome.

import logging

from pyspark.sql.functions import pandas_udf, PandasUDFType

logger = logging.getLogger(__name__)


@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def my_udf(my_pdf):
    # This runs on an executor, so the message never reaches the driver's logs
    logger.info('calling my udf')
    do_some_stuff()


results_df = my_df.groupby("Name").apply(my_udf)
dry

2 Answers


It is not ideal (since it stops execution), but you can do

raise Exception(<variable_name>)

inside the pandas_udf, and it will surface the value of the named variable in the error output.
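
For example, a minimal sketch reusing the grouped-map UDF from the question (schema and do_some_stuff are placeholders taken from the question's code):

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def my_udf(my_pdf):
    result = do_some_stuff()
    # Deliberately fail: the exception message carries the value back
    # to the driver's error output, where you can read it.
    raise Exception(repr(result))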


As you said, the work done by the UDF is performed by the executors, not the driver, and Spark only captures logging output from the top-level driver process. If you are using a UDF within your PySpark query and need to log data, create and call a second UDF that returns the data you wish to capture, and store it in a column to view once the build has finished:

import logging

from pyspark.sql import functions as F

logger = logging.getLogger(__name__)


def some_transformation(some_input):
    logger.info("log output related to the overall query")

    @F.udf("integer")
    def custom_function(integer_input):
        return integer_input + 5

    @F.udf("string")
    def custom_log(integer_input):
        # Runs per row on the executors; its return value is stored in a column
        return "Original integer was %d before adding 5" % integer_input

    df = (
        some_input
        .withColumn("new_integer", custom_function(F.col("example_integer_col")))
        .withColumn("debugging", custom_log(F.col("example_integer_col")))
    )
    return df
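
Once the build has run, the debugging column holds one "log line" per row. You can inspect it in the output dataset, or in a plain PySpark session with something like this (a minimal sketch, using the column names from the example above):

df.select("example_integer_col", "debugging").show(truncate=False)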

I also explain another option, if you are more familiar with pandas, here: How to debug pandas_udfs without having to use Spark?

Edit: I have a complete answer here: In Palantir Foundry, how do I debug pyspark (or pandas) UDFs since I can't use print statements?

Andrew Andrade