My goal is to read binary (gpg-encrypted) files on HDFS consisting of CSV data. My approach -- following this answer -- has been to define a Python function that reads and decrypts a gpg file, yielding each line, and to apply this function as a flatMap to a parallelized list of files.
Essentially, the Python function spawns a subprocess that reads the file using hadoop and pipes the result to gpg to decrypt. This works just fine when running Spark in local mode. However, when running it distributed (yarn-client), a simple line count returns 0, essentially because Python sees the stdout pipe as always closed.
The issue seems to be that the subprocess involves a pipe between two commands. When I remove the second command (i.e., just do a line count of the still-encrypted file), the line count matches what I get on the command line. I've tried this multiple ways, all with the same result.
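For reference, the pipe-free variant (just streaming the still-encrypted file, which does return the correct count when distributed) looks roughly like this -- the function name here is only for illustration:

import subprocess as sp

def read_encrypted_file_on_hdfs(filename):
    # Single command, no pipe: stream the still-encrypted file from HDFS.
    p = sp.Popen(['hadoop', 'fs', '-cat', filename], stdout=sp.PIPE)
    for line in p.stdout:
        yield line.strip()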
Here's the Python function with the pipe, showing the three methods I tried (one at a time):
import subprocess as sp

def read_gpg_file_on_hdfs(filename):
    # Method 1: single shell command containing the pipe
    p = sp.Popen('hadoop fs -cat {} | gpg -d'.format(filename), shell=True,
                 stdout=sp.PIPE)

    # Method 2: two processes connected explicitly via stdin/stdout
    p1 = sp.Popen(['hadoop', 'fs', '-cat', filename], stdout=sp.PIPE)
    p = sp.Popen(['gpg', '-d'], stdin=p1.stdout, stdout=sp.PIPE)
    p1.stdout.close()

    # Method 3: process substitution (requires a shell that supports it)
    p = sp.Popen('gpg -d <(hadoop fs -cat {})'.format(filename), shell=True,
                 stdout=sp.PIPE, stderr=sp.PIPE)

    for line in p.stdout:
        yield line.strip()
And here's the Spark command:
sc.parallelize(['/path/to/file.gpg']).flatMap(read_gpg_file_on_hdfs).count()
Now, I know that PySpark uses pipes to communicate with Spark, but I don't understand the details and I don't know whether that would interfere with what I'm doing here. My question is whether there is a way to make this kind of piped subprocess work on the executors.
Note that I'm using Spark 1.2.1 distributed (the latest release from MapR). I also considered using binaryFiles, but this would fail for large gpg files, which I sometimes encounter.
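For completeness, the binaryFiles alternative I'm trying to avoid would look something like the sketch below (assuming binaryFiles is available in this version); the decrypt_blob name is just for illustration, and the whole-file buffering it relies on is exactly the problem with big files:

import subprocess as sp

def decrypt_blob(filename_and_content):
    # Each record from binaryFiles is (filename, whole file contents),
    # so hand the encrypted bytes to gpg over stdin in one shot.
    filename, content = filename_and_content
    p = sp.Popen(['gpg', '-d'], stdin=sp.PIPE, stdout=sp.PIPE)
    decrypted, _ = p.communicate(content)
    for line in decrypted.splitlines():
        yield line.strip()

sc.binaryFiles('/path/to/file.gpg').flatMap(decrypt_blob).count()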
Thanks in advance!