
I have to read a gz file from local storage / HDFS / Kafka, then decompress it and parse it. Does anyone have experience with this?

What about other types, such as bin.tar.gz?

steven
  • This may help you - http://stackoverflow.com/questions/16302385/gzip-support-in-spark – Sumit Jan 28 '16 at 09:37
  • Yes, I have read it. But it looks like it only supports text files. What if the gz file combines several bin files? – steven Jan 28 '16 at 10:55
  • I can deserialize the bin data correctly, but there is no method for the gz file. – steven Jan 28 '16 at 10:58
  • Hi @steven, got any headway with the reading of the protobuf gz files? – Tanny Jun 21 '17 at 10:25

2 Answers

0

You can use sc.binaryFiles to read binary files and do whatever you like with the content bytes.

As for tar.gz, see Read whole text files from a compression in Spark
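For the tar.gz case, one possible approach is to open each file's bytes with Python's standard tarfile module. The sketch below is an illustration only, not taken from the linked answer: the helper name untar_gz and the HDFS path in the comment are hypothetical, and it assumes each element is a (path, bytes) pair as produced by sc.binaryFiles in PySpark.

```python
import io
import tarfile


def untar_gz(pair):
    """Expand one (path, tar.gz bytes) pair into (member_path, bytes) pairs."""
    path, content = pair
    members = []
    # mode "r:gz" makes tarfile gunzip the stream before reading the archive
    with tarfile.open(fileobj=io.BytesIO(content), mode="r:gz") as archive:
        for member in archive:
            # skip directories, links and other non-file entries
            if member.isfile():
                body = archive.extractfile(member).read()
                members.append((path + "/" + member.name, body))
    return members


# Hypothetical Spark usage, assuming `sc` is an existing SparkContext:
# files = sc.binaryFiles("hdfs:///some/dir/*.bin.tar.gz").flatMap(untar_gz)
```

Each archive is read fully into memory on one executor, so this only suits archives that fit comfortably in a single task.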

Community
Niros
0

This is what I did:

  1. read the binary data

data = sc.binaryFiles(path)

  2. extract the content

data = (data
        .map(lambda x: (x[0], ungzip(x[1])))
        )


import gzip
import io


def ungzip(df):
    # wrap the raw bytes in a file-like object so GzipFile can read them
    compressed_file = io.BytesIO(df)
    decompressed_file = gzip.GzipFile(fileobj=compressed_file)
    return decompressed_file.read()
  3. parse messages

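class NotEnoughDataException(Exception):
    """Raised when the buffer ends before a varint is complete."""


def _VarintDecoder(mask):
    """Return a function that decodes one varint starting at a given position."""

    def DecodeVarint(buffer, pos):
        result = 0
        shift = 0
        while True:
            if pos > len(buffer) - 1:
                raise NotEnoughDataException("Not enough data to decode varint")
            # bytearray indexing yields an int on both Python 2 and Python 3
            b = bytearray(buffer[pos:pos + 1])[0]
            # the low 7 bits carry data; the high bit marks a continuation
            result |= ((b & 0x7f) << shift)
            pos += 1
            if not (b & 0x80):
                result &= mask
                return (result, pos)
            shift += 7
            if shift >= 64:
                raise ValueError('Too many bytes when decoding varint.')

    return DecodeVarint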


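def parse_binary(data):
    # data is a (path, decompressed_bytes) pair; the payload is a sequence
    # of messages, each prefixed with its length encoded as a varint
    path, payload = data
    decoder = _VarintDecoder((1 << 64) - 1)
    pos = 0
    messages = []
    while pos < len(payload):
        try:
            size, pos = decoder(payload, pos)
        except Exception:
            # incomplete length prefix: stop and keep what was parsed so far
            break
        messages.append((path, payload[pos:pos + size]))
        pos += size
    return messages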


data = (data
        .flatMap(lambda x: parse_binary(x))
        )

After this, you have your protobuf messages, one per row, and you can apply your protobuf_parsing function in parallel.
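To check the framing logic locally without a cluster, a small round trip can help. The following is a minimal Python 3 sketch, assuming the same layout as above (each message prefixed with its length as a varint, the whole stream gzipped). The names encode_varint, frame and split_frames are made up for this illustration, and split_frames does no error handling for truncated input.

```python
import gzip


def encode_varint(value):
    """Encode a non-negative int as a protobuf-style varint."""
    out = bytearray()
    while True:
        low = value & 0x7F
        value >>= 7
        if value:
            out.append(low | 0x80)  # more bytes follow
        else:
            out.append(low)         # last byte, high bit clear
            return bytes(out)


def frame(messages):
    """Concatenate messages, each prefixed with its varint-encoded length."""
    return b"".join(encode_varint(len(m)) + m for m in messages)


def split_frames(payload):
    """Split a length-prefixed payload back into the original messages."""
    messages, pos = [], 0
    while pos < len(payload):
        size, shift = 0, 0
        while True:
            b = payload[pos]        # indexing bytes gives an int on Python 3
            pos += 1
            size |= (b & 0x7F) << shift
            if not b & 0x80:
                break
            shift += 7
        messages.append(payload[pos:pos + size])
        pos += size
    return messages


originals = [b"first", b"", b"x" * 300]
blob = gzip.compress(frame(originals))            # stands in for the .gz file
recovered = split_frames(gzip.decompress(blob))   # same steps as 2 and 3
```

If recovered equals originals, the length-prefix handling matches; real files may of course use a different framing, in which case the splitting step has to change accordingly.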