I'm writing a Python 3.6 program that uses pyspark to do some computation. I'm writing it so it can behave like a proper UNIX program, accepting input from STDIN and emitting output to STDOUT.

Java is not on board, though: the default there is to log to STDOUT given sufficiently high log severity, and pyspark is no stranger to this.

Do I need to mess with my file descriptors manually before importing any py4j library, or is there some way to mangle the Java side of things from the Python side so that all logging goes to STDERR?
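
For concreteness, messing with file descriptors manually would look something like this sketch (POSIX os.dup2; I assume any child launched inside the redirected region inherits FD 1 pointing at STDERR, and that restoring FD 1 afterwards doesn't affect the already-launched JVM):

import os
import sys

sys.stdout.flush()           # don't let buffered Python output land on STDERR
saved_stdout_fd = os.dup(1)  # keep a duplicate of the real STDOUT
os.dup2(2, 1)                # point FD 1 at STDERR
try:
    import pyspark.sql
    spark = pyspark.sql.SparkSession.builder.enableHiveSupport().getOrCreate()
finally:
    os.dup2(saved_stdout_fd, 1)  # restore STDOUT for this process's own output
    os.close(saved_stdout_fd)

...but I'd rather not reach for something that low-level if there's a cleaner knob.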

One kludge I expected to work but did not is basically this:

import contextlib
import sys

@contextlib.contextmanager
def impolite_library_wrapper():
    # Swap sys.stdout for sys.stderr for the duration of the block,
    # restoring it even if the block raises.
    real_out = sys.stdout
    sys.stdout = sys.stderr
    try:
        yield
    finally:
        sys.stdout = real_out

with impolite_library_wrapper():
    import pyspark.sql
    spark_builder = pyspark.sql.SparkSession.builder.enableHiveSupport()
    spark = spark_builder.getOrCreate()

print("pls")

...which I can run in a minimal environment like so:

$ bash
$ mkdir /tmp/pls
$ cd /tmp/pls
$ pipenv install pyspark==2.3
$ env -i "PATH=$PATH" pipenv run python wtf.py 2>/dev/null
2019-05-20 17:10:54 WARN  Utils:66 - Your hostname, <redacted> resolves to a loopback address...
2019-05-20 17:10:54 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2019-05-20 17:10:55 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
pls

I expected this to output pls and only pls. (Presumably the swap fails because the JVM is a separate process that inherits the real STDOUT file descriptor; reassigning sys.stdout only affects writes made from Python.)

I see that this will be addressed specifically by pyspark with SPARK-21094 in Spark 3.0; I could live with a pyspark-specific answer, but I am currently targeting Spark 2.3.


1 Answer

I hate every part of this, but it appears to be working:

import contextlib
import sys
import subprocess

class StderrOnlyPopen(subprocess.Popen):
    # Default stdout to this process's STDERR, so children that would
    # otherwise inherit STDOUT write to STDERR instead. (sys.stderr is
    # captured once, when the class body is evaluated.)
    def __init__(self, args, bufsize=-1, executable=None,
                 stdin=None, stdout=sys.stderr, *more, **kwmore):
        super().__init__(args, bufsize, executable,
                         stdin, stdout, *more, **kwmore)

@contextlib.contextmanager
def impolite_library_wrapper():
    # Temporarily monkey-patch subprocess.Popen; restore the original
    # even if the import inside the block fails.
    real_Popen = subprocess.Popen
    subprocess.Popen = StderrOnlyPopen
    try:
        yield
    finally:
        subprocess.Popen = real_Popen

with impolite_library_wrapper():
    import pyspark.sql

spark_builder = pyspark.sql.SparkSession.builder.enableHiveSupport()
spark = spark_builder.getOrCreate()
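
If my understanding is right, wrapping only the import is enough because pyspark's gateway module appears to bind Popen by name at import time (from subprocess import Popen), so it captures StderrOnlyPopen even though the JVM is only launched later by getOrCreate(); the JVM then inherits sys.stderr as its standard output, and print is once again the only thing writing to STDOUT. I've only checked this against pyspark 2.3.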