I have to read gz files from local / HDFS / Kafka, then decompress and parse them. Does anyone have experience with this?
Or with other formats like bin.tar.gz?
You can use sc.binaryFiles to read binary files and do whatever you like with the content bytes.
As for tar.gz, see Read whole text files from a compression in Spark
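If you do need to handle tar.gz archives directly, here is a minimal sketch using Python's tarfile module, assuming each archive fits in executor memory (sc is your SparkContext and the path is just an example):

import io
import tarfile

def untar_gz(raw_bytes):
    # Open the tar.gz archive from in-memory bytes and yield
    # (member_name, member_bytes) for every regular file inside.
    with tarfile.open(fileobj=io.BytesIO(raw_bytes), mode="r:gz") as tar:
        for member in tar.getmembers():
            if member.isfile():
                yield member.name, tar.extractfile(member).read()

archives = sc.binaryFiles("/path/to/archives/*.tar.gz")
files = archives.flatMap(lambda kv: untar_gz(kv[1]))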
This is what I did:
1. Read the binary data: data = sc.binaryFiles(path)
2. Extract the content:
import gzip
import io

def ungzip(raw_bytes):
    # Wrap the raw gzip bytes in a file-like object and decompress them.
    compressed_file = io.BytesIO(raw_bytes)
    decompressed_file = gzip.GzipFile(fileobj=compressed_file)
    return decompressed_file.read()

data = (data
    .map(lambda x: (x[0], ungzip(x[1])))  # x = (file_path, raw_gzip_bytes)
)
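As a quick local sanity check outside Spark, the helper can be exercised against bytes compressed in memory (the payload here is just an illustration):

sample = gzip.compress(b"hello world")
assert ungzip(sample) == b"hello world"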
def _VarintDecoder(mask):
    def DecodeVarint(buffer, pos):
        result = 0
        shift = 0
        while 1:
            if pos > len(buffer) - 1:
                raise IndexError("Not enough data to decode varint")
            b = buffer[pos]  # indexing bytes yields an int in Python 3
            result |= ((b & 0x7f) << shift)
            pos += 1
            if not (b & 0x80):
                result &= mask
                return (result, pos)
            shift += 7
            if shift >= 64:
                raise ValueError('Too many bytes when decoding varint.')
    return DecodeVarint
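For a quick sanity check, the decoder can be run on a known varint encoding; in protobuf's varint format the bytes 0xAC 0x02 decode to 300:

decode = _VarintDecoder((1 << 64) - 1)
value, new_pos = decode(b"\xac\x02", 0)
assert (value, new_pos) == (300, 2)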
def parse_binary(data):
    # data is a (file_path, decompressed_bytes) pair; the payload is assumed
    # to be a stream of varint-length-delimited protobuf messages.
    decoder = _VarintDecoder((1 << 64) - 1)
    next_pos, pos = 0, 0
    messages = []
    try:
        while 1:
            # Read the length prefix, then slice out that many bytes as one message.
            next_pos, pos = decoder(data[1], pos)
            messages.append((data[0], data[1][pos:pos + next_pos]))
            pos += next_pos
    except IndexError:
        # Raised by the decoder when the buffer is exhausted.
        return messages
data = (data
    .flatMap(lambda x: parse_binary(x))
)
After this you have your protobuf messages, one per row, and you can apply your protobuf parsing function in parallel.
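A sketch of that last step, assuming MyMessage is your generated protobuf class (both the module and the class name below are placeholders):

# Hypothetical generated protobuf module; replace with your own message type.
from my_proto_pb2 import MyMessage

def parse_message(record):
    # record is (file_path, message_bytes)
    msg = MyMessage()
    msg.ParseFromString(record[1])
    return record[0], msg

parsed = data.map(parse_message)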