
I need to read, scan, and write files to/from HDFS from within a PySpark worker.

Note that the following APIs are not applicable, since they run on the driver:

sc.textFile()
sc.saveAsParquetFile()

etc.

It would be strongly preferable not to involve additional third-party libraries (e.g. pyhadoop).

One option is to shell out, e.g.:

    os.system('hdfs dfs -ls %(hdfsPath)s' % locals())

But is there a more native PySpark way to achieve this?

UPDATE This is not a case of broadcasting data, because each worker will read different data from HDFS. One use case is reading a few large binary files in each worker (this is clearly not a case for broadcast). Another is reading a "command" file containing instructions. I have used this pattern successfully in native Hadoop and in Scala Spark.

WestCoastProjects

2 Answers


The solution appears to be to subprocess out (there is no direct Python access). Piecing together the accepted answer and one of the comments from: Python read file as stream from HDFS

import subprocess

# Stream the file from HDFS through the hadoop CLI
cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/myfile"], stdout=subprocess.PIPE)
for line in iter(cat.stdout.readline, ''):
    print line,   # the trailing comma suppresses the extra newline (Python 2)
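For whole binary files (as in the question's use case), a small variation of the same approach might be to read the entire stream at once. This is only a sketch, and the path is made up:

    import subprocess

    # Pull an entire binary file from HDFS into memory on the worker
    cat = subprocess.Popen(["hadoop", "fs", "-cat", "/path/to/image.bin"],
                           stdout=subprocess.PIPE)
    data, _ = cat.communicate()   # data holds the raw bytes of the file
    if cat.returncode != 0:
        raise IOError("hadoop fs -cat failed for /path/to/image.bin")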
WestCoastProjects

A more native PySpark way to do this is to read the data in the driver using sc.textFile() or another read method, and pass it to the workers either as an RDD or as a broadcast variable if it is small enough to fit into the memory of each executor.
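For example, for a small "command" file this pattern might look like the following minimal sketch (the path and data are made up):

    from pyspark import SparkContext

    sc = SparkContext(appName="broadcast-example")

    # Read the small instruction file once, in the driver
    commands = sc.textFile("hdfs:///data/commands.txt").collect()

    # Broadcast it so every executor gets a read-only copy
    commands_bc = sc.broadcast(commands)

    def process(record):
        # Workers use the broadcast value instead of touching HDFS directly
        return (record, len(commands_bc.value))

    result = sc.parallelize(["a", "b", "c"]).map(process).collect()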

Could you describe your case? I doubt you really need to read the files in the workers.

UPDATE:

Short summary:

  1. Reading a set of files directly from the workers on a big cluster might kill the namenode
  2. Reading separate files directly from the workers is not necessary in most cases. You can just pass a wildcard for the set of files to the textFile() method, or use the wholeTextFiles() or binaryFiles() methods to read a set of files together with their names (see the sketch after this list)
  3. In the specific case of image processing with gigabytes of images, just put them into a sequence file and read it with the sequenceFile() method
  4. Reading directly from HDFS with Python without additional libraries can be implemented by querying the WebHDFS REST API directly, which is overkill given that this is exactly what those libraries implement. Another option might be to use Spark's pipe() method to call a Java program that reads the HDFS files and returns them in serialized form to stdout. Yet another option is to copy a file from HDFS to temporary space by escaping to the shell, and then read that file with standard file-reading functionality. Personally, I would fire my developer for implementing any of the approaches I proposed here
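To make points 2 and 3 concrete, here is a minimal driver-side sketch; the HDFS paths are placeholders:

    from pyspark import SparkContext

    sc = SparkContext(appName="hdfs-read-example")

    # Point 2: wildcard a set of files into one RDD of lines
    lines = sc.textFile("hdfs:///data/input/part-*")

    # Point 2: read whole files as (filename, content) pairs
    named_texts = sc.wholeTextFiles("hdfs:///data/commands/")

    # Point 2: read binary files (e.g. images) as (filename, bytes) pairs
    images = sc.binaryFiles("hdfs:///data/images/")

    # Point 3: read a SequenceFile that the images were packed into
    packed = sc.sequenceFile("hdfs:///data/images.seq")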
0x0FFF
  • Updated OP: each worker has different data so broadcast is not appropriate. – WestCoastProjects Mar 12 '15 at 17:11
  • Then what is the case? Why not use `textFile()` with wildcard or `wholeTextFiles()`? – 0x0FFF Mar 12 '15 at 17:15
  • Yes, it is not designed for this. This is why I say you should use `textFile()` or `wholeTextFiles()` in the driver. Please clarify the case where you need this functionality; as of now it looks like you want to mess up the Spark concept – 0x0FFF Mar 12 '15 at 17:35
  • Same for me - certified in both and working with both. Reading a series of files from workers running under YARN on a big cluster would kill your namenode – 0x0FFF Mar 12 '15 at 17:57
  • And reading a few files will not. The files are gigabytes worth of image files per worker. Good luck with broadcasting those! – WestCoastProjects Mar 12 '15 at 18:04
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/72865/discussion-between-javadba-and-0x0fff). – WestCoastProjects Mar 12 '15 at 18:07
  • You can store them in a sequence file and read them with `sequenceFile()`. For really big images, increase the HDFS block size for this sequence file. Putting them into the sequence file is a simple MapReduce job, as you know if you're certified. – 0x0FFF Mar 12 '15 at 18:11
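A rough, untested sketch of the suggestion in the last comment, done entirely from PySpark. It assumes the default writable converters handle (filename, bytearray) pairs, and the paths are made up:

    from pyspark import SparkContext

    sc = SparkContext(appName="pack-images")

    # Read raw image files as (filename, content) pairs
    images = sc.binaryFiles("hdfs:///data/images/")

    # Pack them into one SequenceFile so later jobs avoid many small reads
    images.mapValues(bytearray).saveAsSequenceFile("hdfs:///data/images.seq")

    # Read the packed file back as (filename, bytes) pairs
    packed = sc.sequenceFile("hdfs:///data/images.seq")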