I want to log to the standard log4j logger from inside an executor during a transformation, with log levels and formatting respected. Unfortunately I can't get access to the log4j logger object inside the mapped function, since it isn't serializable, and the Spark context isn't available inside the transformation. I could log all of the objects I'm going to touch outside of the transformation, but that doesn't really help with debugging or with monitoring code execution.
def slow_row_contents_fetch(row):
    rows = fetch_id_row_contents(row)  # API fetch, DB fetch, etc.
    # This shows up in the output, but isn't controllable by log level
    print "Processed slow row with {} results".format(len(rows))
    return rows

sc.parallelize(fetchable_ids).flatMap(slow_row_contents_fetch, True)
Outside of the transformation I can get the logger via:
logger = sc._jvm.org.apache.log4j.LogManager.getRootLogger()
logger.warn('This will show up as expected')
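For what it's worth, on the driver the same handle also respects named loggers and level filtering, which is exactly the behaviour I want on the executors (the logger name here is just illustrative):

log4j = sc._jvm.org.apache.log4j
named_logger = log4j.LogManager.getLogger('slow_fetch')
named_logger.setLevel(log4j.Level.WARN)
named_logger.info('Filtered out by the WARN level')
named_logger.warn('Shows up, formatted by the log4j appender')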
But sc isn't available inside the transformation, for good reasons. If you try to reference sc directly inside the transformation, you see the following message:
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
I can just print, but print output isn't easily filterable, and it just gets tracked as unformatted error messages by the log4j logger.
Serializing the logger itself, as expected, fails when the logger is called within the transform function:
...
File "/usr/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/usr/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o36.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
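The only fallback I can see is to build a Python logging logger lazily inside the function that runs on the executor, so nothing has to be serialized. A minimal sketch of that idea follows (the logger name, format, and the restructuring around mapPartitions are my own choices, not anything Spark provides):

import logging

def fetch_partition(rows):
    # Constructed inside the executor's Python worker, so no pickling is needed
    logger = logging.getLogger('slow_fetch')
    if not logger.handlers:  # guard against duplicate handlers on worker reuse
        handler = logging.StreamHandler()  # ends up in the worker's stderr
        handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    for row in rows:
        fetched = fetch_id_row_contents(row)
        logger.info('Processed slow row with %d results', len(fetched))
        for item in fetched:
            yield item

sc.parallelize(fetchable_ids).mapPartitions(fetch_partition)

That gives me levels and formatting, but it bypasses log4j entirely, so the cluster's log4j.properties has no effect on it.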
Is there a way to get access to the executor's log4j logger during transformations in PySpark?