
I am trying to port some code from pandas to (py)Spark. Unfortunately, I am already failing at the input part, where I want to read in binary data and put it into a Spark DataFrame.

So far I am using fromfile from numpy:

import numpy as np
import pandas as pd

dt = np.dtype([('val1', '<i4'), ('val2', '<i4'), ('val3', '<i4'), ('val4', 'f8')])
data = np.fromfile('binary_file.bin', dtype=dt)
data = data[1:]                                         # throw away header
df_bin = pd.DataFrame(data, columns=data.dtype.names)

But I couldn't find how to do the same in Spark. My workaround so far has been to use CSV files instead of the binary files, but that is not an ideal solution. I am aware that I shouldn't use numpy's fromfile with Spark. How can I read in a binary file that is already loaded into HDFS?

I tried something like

fileRDD = sc.parallelize(['hdfs:///user/bin_file1.bin', 'hdfs:///user/bin_file2.bin'])
fileRDD.map(lambda x: ???)

But it is giving me a No such file or directory error.

I have seen this question: spark in python: creating an rdd by loading binary data with numpy.fromfile, but that only works if the files are stored in the home directory of the driver node.


4 Answers


So, for anyone who, like me, is starting out with Spark and stumbles upon binary files, here is how I solved it:

import numpy as np
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

dt = np.dtype([('idx_metric', '>i4'), ('idx_resource', '>i4'), ('date', '>i4'),
               ('value', '>f8'), ('pollID', '>i2')])
schema = StructType([StructField('idx_metric', IntegerType(), False),
                     StructField('idx_resource', IntegerType(), False),
                     StructField('date', IntegerType(), False),
                     StructField('value', DoubleType(), False),
                     StructField('pollID', IntegerType(), False)])

filenameRdd = sc.binaryFiles('hdfs://nameservice1:8020/user/*.binary')

def read_array(rdd):
    # rdd is a (file path, file content) pair from binaryFiles
    # output = zlib.decompress(bytes(rdd[1]), 15 + 32)  # in case the files are also zipped
    array = np.frombuffer(bytes(rdd[1])[20:], dtype=dt)  # skip the 20-byte header
    array = array.newbyteorder().byteswap()              # convert the big-endian values to native byte order
    return array.tolist()

unzipped = filenameRdd.flatMap(read_array)
bin_df = sqlContext.createDataFrame(unzipped, schema)

And now you can do whatever fancy stuff you want in Spark with your dataframe.
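
For example, a quick sanity check on the resulting DataFrame (using the column names from the schema above) could look like this:

bin_df.printSchema()
bin_df.select('idx_metric', 'value').show(5)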


Edit: Please review the use of sc.binaryFiles as mentioned here: https://stackoverflow.com/a/28753276/5088142


try using:

hdfs://machine_host_name:8020/user/bin_file1.bin

You can find the host-name in fs.defaultFS in core-site.xml.
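
For example, a minimal sketch using sc.binaryFiles with the full URI (nameservice1:8020 is a placeholder here; use whatever fs.defaultFS says on your cluster):

# read whole binary files from HDFS as (path, content) pairs
fileRDD = sc.binaryFiles('hdfs://nameservice1:8020/user/bin_file1.bin')
fileRDD.map(lambda kv: len(kv[1])).collect()  # kv[1] holds the raw file bytes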

  • fs.defaultFS says nameservice1, but also with `hdfs://nameservice1:8020/user/bin_file1.bin` I still get a file not found error. Could it be linked with the function I put in the map? `def read_bin: with open("myfile", "rb") as f: byte = f.read(1) while byte != "": byte = f.read(1)` – WilliamEllisWebb May 24 '16 at 13:37
  • On which line do you get the "file not found" error? How do you plan to use the "read_bin" function? The open method doesn't seem to work with HDFS.... – Yaron May 24 '16 at 13:49
  • The error is on line 2 in read_bin. You are right, the open method doesn't like HDFS. I am looking for something similar to `sc.textfile(filename).map(lambda line:line.split(',')).map(lambda x: (int(x[0],int(x[1]....)` – WilliamEllisWebb May 24 '16 at 14:04
  • Please review the use of sc.binaryFiles as mentioned here: http://stackoverflow.com/a/28753276/5088142 – Yaron May 24 '16 at 14:14

Since version 3.0, Spark supports a binary file data source, which reads binary files and converts each file into a single record that contains the raw content and metadata of the file.

https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
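
A minimal sketch of how that looks (the path is a placeholder, and a SparkSession named spark is assumed):

# each file becomes one row with path, modificationTime, length and content columns
df = spark.read.format("binaryFile").load("hdfs:///user/*.binary")
df.select("path", "length").show()
# the raw bytes land in the 'content' column and still have to be parsed,
# e.g. with numpy.frombuffer or struct.unpack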


I recently did something like this:

from struct import unpack_from

# creates an RDD of binary records for a fixed record length
binary_rdd = sc.binaryRecords("hdfs://" + file_name, record_length)

# map()s each binary record to unpack() it
unpacked_rdd = binary_rdd.map(lambda record: unpack_from(unpack_format, record))

# creates a DataFrame with this schema and registerTempTable()s it as table_name
raw_df = sqlc.createDataFrame(unpacked_rdd, sparkSchema)
raw_df.registerTempTable(table_name)

where unpack_format and sparkSchema have to be kept "in sync".

I have a script that generates the unpack_format and sparkSchema variables dynamically, both at the same time. (It's part of a larger code base, so I'm not posting it here for readability.)

unpack_format and sparkSchema could be defined as follows, for example:

from pyspark.sql.types import *

unpack_format = '<'   # '<' means little-endian: https://docs.python.org/2/library/struct.html#byte-order-size-and-alignment
sparkSchema = StructType()
record_length = 0

unpack_format += '35s'    # 35 bytes that represent a character string
sparkSchema.add("FirstName", 'string', True)  # True = nullable
record_length += 35

unpack_format += 'H'    # 'H' = unsigned 2-byte integer
sparkSchema.add("ZipCode", 'integer', True)
record_length += 2

# and so on for each field..
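
For illustration, here is one hypothetical way to keep the two (plus record_length) in sync by deriving them from a single field specification; the fields list and the use of struct.calcsize are my own sketch, not the generator script mentioned above:

from struct import calcsize
from pyspark.sql.types import StructType

# hypothetical field specification: (column name, struct format code, Spark SQL type name)
fields = [('FirstName', '35s', 'string'),
          ('ZipCode',   'H',   'integer')]

unpack_format = '<'                        # little-endian, standard sizes, no padding
sparkSchema = StructType()
for name, fmt, sql_type in fields:
    unpack_format += fmt                   # extend the struct format string
    sparkSchema.add(name, sql_type, True)  # add a nullable column of the matching type
record_length = calcsize(unpack_format)    # total bytes per fixed-length record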