
I want to log to the standard logger inside an executor during a transformation, with log levels and formatting respected. Unfortunately, I can't get access to the log4j logger object inside the method because it's not serializable, and the Spark context isn't available inside the transformation. I could log, outside of the transformation, all of the objects I'm going to touch, but that doesn't really help with debugging or monitoring code execution.

def slow_row_contents_fetch(row):
    rows = fetch_id_row_contents(row) # API fetch, DB fetch, etc
    # This shows up, but not controllable by log level
    print("Processed slow row with {} results".format(len(rows)))
    return rows

sc.parallelize(fetchable_ids).flatMap(slow_row_contents_fetch, True)

Outside of the transformation I can get the logger via:

logger = sc._jvm.org.apache.log4j.LogManager.getRootLogger()
logger.warn('This will show up as expected')

But the sc isn't available inside the transformation, for good reasons. You see the following message if you try to call the sc directly inside the transformation:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

I can just print, but that's not easily filterable and just gets tracked as unformatted error messages by the log4j logger.

Serializing the logger itself, as expected, fails when calling the logger within the transform function:

...
File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
  File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o36.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
        at py4j.Gateway.invoke(Gateway.java:252)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

Is there a way to get access to the executor logger during transformations in pyspark?

Pyrce

2 Answers


After a few hours of digging into the Spark repository, it seems this is currently impossible to achieve. The Python worker on the executor doesn't actually have a JVM instance attached to it; the data is just streamed over a socket, with no native JVM binding to use.

Here's the worker creation code that streams the error messages to stderr:

private def createSimpleWorker(): Socket = {
  ...
  val worker = pb.start()

  // Redirect worker stdout and stderr
  redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)

  ...
}

/**
 * Redirect the given streams to our stderr in separate threads.
 */
private def redirectStreamsToStderr(stdout: InputStream, stderr: InputStream) {
  try {
    new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
    new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
  } catch {
    case e: Exception =>
      logError("Exception in redirecting streams", e)
  }
}

And here's the worker.py code that handles communication for job processing. There's no place to emit log messages, and no message type that indicates a log event.

try:
    ...
    command = pickleSer._read_with_length(infile)
    if isinstance(command, Broadcast):
        command = pickleSer.loads(command.value)
    func, profiler, deserializer, serializer = command
    init_time = time.time()

    def process():
        iterator = deserializer.load_stream(infile)
        serializer.dump_stream(func(split_index, iterator), outfile)

    if profiler:
        profiler.profile(process)
    else:
        process()
except Exception:
    try:
        write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
        write_with_length(traceback.format_exc().encode("utf-8"), outfile)
    except IOError:
        # JVM close the socket
        pass
    except Exception:
        # Write the error to stderr if it happened while serializing
        print("PySpark worker failed with exception:", file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
    exit(-1)
finish_time = time.time()
report_times(outfile, boot_time, init_time, finish_time)
write_long(shuffle.MemoryBytesSpilled, outfile)
write_long(shuffle.DiskBytesSpilled, outfile)

# Mark the beginning of the accumulators section of the output
write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
write_int(len(_accumulatorRegistry), outfile)
for (aid, accum) in _accumulatorRegistry.items():
    pickleSer._write_with_length((aid, accum._value), outfile)
...

And finally the message types available:

class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5
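
Since the worker's stdout and stderr are simply forwarded to the executor's stderr, one possible workaround is to build a Python `logging` logger lazily inside the worker process. The sketch below is only an illustration under that assumption: the logger name `pyspark.worker`, the helper `get_worker_logger`, and the stand-in fetch are hypothetical, and the records still bypass log4j's own levels and layout. They would only be filtered and formatted on the Python side before landing in the executor's stderr log.

```python
import logging
import sys


def get_worker_logger(name="pyspark.worker", level=logging.INFO):
    """Return a logger that writes formatted records to stderr.

    The logger is created inside the worker process, so nothing
    unpicklable is captured by the function shipped to the executors.
    The name "pyspark.worker" is a hypothetical choice.
    """
    logger = logging.getLogger(name)
    if not logger.handlers:  # configure only once per worker process
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
        logger.propagate = False  # avoid duplicate output via the root logger
    logger.setLevel(level)
    return logger


def slow_row_contents_fetch(row):
    logger = get_worker_logger()
    rows = [row, row]  # stand-in for the real API or DB fetch
    logger.info("Processed slow row with %d results", len(rows))
    logger.debug("Hidden while the level is INFO: %r", rows)
    return rows
```

A function like this could be passed to `flatMap` as in the question, because the logger is looked up by name at call time instead of being pickled with the closure.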
Pyrce

Have a look at this question

Similar situation

You can get your map function to return you an object that can contain a stack trace string or a real object, and a bool flag stating if there was an error. This can be useful to debug a task that has side effects, or if you have specific data conditions that cause failures.
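
A minimal sketch of that idea in plain Python, assuming a hypothetical wrapper `safe_call` and a hypothetical `parse_int` standing in for the real work. Each result is a `(failed, payload)` pair, where the payload is either the real value or the stack trace string:

```python
import traceback


def safe_call(func):
    """Wrap func so a failure is returned as data instead of raised."""
    def wrapper(item):
        try:
            return (False, func(item))
        except Exception:
            # Ship the formatted stack trace back in place of the result
            return (True, traceback.format_exc())
    return wrapper


def parse_int(text):
    # Hypothetical stand-in for the real per-row work
    return int(text)


results = list(map(safe_call(parse_int), ["1", "x", "3"]))
values = [payload for failed, payload in results if not failed]
errors = [payload for failed, payload in results if failed]
```

With an RDD, the wrapped function would be handed to `map` in the same way, and the two list comprehensions would become `filter` and `map` steps on the result.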

ThatDataGuy
  • This unfortunately doesn't meet the constraints of what I was trying to achieve, but it's an interesting idea. In the situation I was in I wanted to print out some debug information but have the rows untouched as there were several stages afterwards that depended on those outputs. – Pyrce Nov 15 '16 at 23:10
  • Maybe you can create a map function that returns an object that contains the messages that you want, and the data? Then you can split off the data and the messages with two simple map functions, and then use the data in the rest of your later work. – ThatDataGuy Nov 16 '16 at 08:35
  • That's harder to do in DataFrame UDF functions, as I would need to add another column of data for messages, but it's a reasonable workaround under some conditions for the thing I actually want to achieve – Pyrce Nov 17 '16 at 22:11
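
The suggestion in the comments above, returning the messages together with the data and then splitting them apart, could look roughly like this in plain Python. The function `fetch_with_messages` and its stand-in fetch are hypothetical; with an RDD, the two comprehensions would be two separate `flatMap` calls over the same (ideally cached) RDD of pairs:

```python
def fetch_with_messages(row):
    """Return the fetched rows together with the log messages about them."""
    rows = [row * 10, row * 10 + 1]  # stand-in for the real API or DB fetch
    messages = [
        "Processed slow row {} with {} results".format(row, len(rows))
    ]
    return (rows, messages)


pairs = [fetch_with_messages(r) for r in [1, 2]]

# Split the combined output: the data continues to later stages untouched,
# while the messages can be collected and logged on the driver.
data = [item for rows, _ in pairs for item in rows]
messages = [msg for _, msgs in pairs for msg in msgs]
```

The rows themselves are unchanged by this, so later stages see the same data they would have seen without the messages.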