
My goal is to read binary (gpg-encrypted) files on HDFS containing CSV data. My approach -- following this answer -- has been to define a Python function that reads and decrypts a gpg file, yielding each line, and to apply this function as a flatMap to a parallelized list of files.

Essentially, the Python function spawns a subprocess that reads the file with hadoop fs -cat and pipes the result to gpg for decryption. This works just fine when running Spark in local mode. However, when running it distributed (yarn-client), a simple line count returns 0, essentially because Python sees the stdout pipe as always closed.

The issue seems to be the pipe between the two commands in the subprocess. When I drop the second command (so it's just a line count of the encrypted file), the line count matches what I get on the command line. I've tried this several ways, all with the same result.

Here's the Python function:

import subprocess as sp

def read_gpg_file_on_hdfs(filename):
    # Only one of the following methods is active at a time;
    # all three behave the same way when run distributed.

    # Method 1: single shell pipeline
    p = sp.Popen('hadoop fs -cat {} | gpg -d'.format(filename), shell=True,
                 stdout=sp.PIPE)

    # Method 2: two Popen objects chained via stdin/stdout
    p1 = sp.Popen(['hadoop', 'fs', '-cat', filename], stdout=sp.PIPE)
    p = sp.Popen(['gpg', '-d'], stdin=p1.stdout, stdout=sp.PIPE)
    p1.stdout.close()

    # Method 3: shell process substitution
    p = sp.Popen('gpg -d <(hadoop fs -cat {})'.format(filename), shell=True,
                 stdout=sp.PIPE, stderr=sp.PIPE)

    for line in p.stdout:
        yield line.strip()
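
For reference, the pipe-free diagnostic mentioned above -- dropping gpg and just counting lines of the still-encrypted file -- was roughly the following (the function name here is just illustrative):

import subprocess as sp

def read_raw_file_on_hdfs(filename):
    # No pipe to gpg -- just stream the (still encrypted) bytes line by line.
    p = sp.Popen(['hadoop', 'fs', '-cat', filename], stdout=sp.PIPE)
    for line in p.stdout:
        yield line.strip()

Counting lines with this version gives the same number distributed as it does on the command line, which is what points me at the pipe between the two commands.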

And here's the Spark command:

sc.parallelize(['/path/to/file.gpg']).flatMap(read_gpg_file_on_hdfs).count()

Now I know that PySpark uses pipes to communicate with Spark, but I don't follow the details and I don't know whether that interferes here. My question is whether there is a way to accomplish what I'm trying to do.

Note that I'm using Spark 1.2.1 distributed (the latest release from MapR). Also, I considered using binaryFiles, but this would fail for large gpg files, which I sometimes encounter.

Thanks in advance!


1 Answer


It turns out that the gpg command itself is the issue. Presumably it's related to how subprocesses are launched in local versus distributed mode: in local mode gpg's homedir is set correctly, but in distributed mode it pointed to the wrong directory, so the second subprocess failed immediately. That error message didn't appear to be logged anywhere, so stdout just came back as an empty string.
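
A sketch of the kind of fix that follows from this, assuming the keyring is available at a known path on every worker node (the path below is a placeholder), is to tell gpg where its home directory is explicitly instead of relying on the worker's environment:

import subprocess as sp

def read_gpg_file_on_hdfs(filename, gpg_home='/path/to/.gnupg'):
    # gpg_home is a placeholder; point --homedir at wherever the keyring
    # actually lives on the worker nodes.
    cmd = 'hadoop fs -cat {} | gpg --homedir {} -d'.format(filename, gpg_home)
    p = sp.Popen(cmd, shell=True, stdout=sp.PIPE, stderr=sp.PIPE)
    for line in p.stdout:
        yield line.strip()

Setting the GNUPGHOME environment variable for the subprocess would accomplish the same thing as passing --homedir.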
