I have a client sending me Snappy-compressed Hadoop sequence files for analysis. Ultimately I want to load this data into a pandas DataFrame. The raw bytes look like this:

>>> body_read

b'SEQ\x06!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable\x01\x01)org.apache.hadoop.io.compress.SnappyCodec\x00\x00\x00\x00\x0b\xabZ\x92f\xceuAf\xa1\x9a\xf0-\x1d2D\xff\xff\xff\xff\x0b\xabZ\x92f\xceuAf\xa1\x9a\xf0-\x1d2D\x8e\x05^N\x00\x00\x05^\x00\x00\x00F\xde\n\x00\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00r\x01\x00\x04\x00\x00\x00\x00\x8e\x08\x92\x00\x00\x10\x1a\x00\x00\x08\x8a\x9a h\x8e\x02\xd6\x8e\x02\xc6\x8e\x02T\x8e\x02\xd4\x8e\x02\xdb\x8e\x02\xd8\x8e\x02\xdf\x8e\x02\xd9\x8e\x02\xd3\x05\x0c0\xd9\x8e\x02\xcc\x8e\x02\xfc\x8e\x02\xe8\x8e\x02\xd0\x05!\x00\xdb\x05\x06\x0c\xd1\x8e\x02\xd7\x05\'\x04\xde\x8e\x01\x03\x18\xce\x8e\x02\xe7\x8e\x02\xd2\x05<\x00\xd4\x05\x1b\x04\xdc\x8e

I think I first need to decompress the file using python-snappy and then read the sequence file. I'm not sure what the best method is for reading Hadoop sequence files in Python. I am also getting an error when trying to decompress this file:

>>> body_decomp = snappy.uncompress(body_read)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ec2-user/anaconda3/lib/python3.5/site-packages/snappy/snappy.py", line 91, in uncompress
    return _uncompress(data)
snappy.UncompressError: Error while decompressing: invalid input

What do I need to do in order to read these files?

OneCricketeer
user3456269
  • Try Pyspark...? – OneCricketeer Feb 06 '18 at 01:50
  • Can I use Pyspark even if I don't use Apache Spark? I don't want to put these files into a Hadoop cluster, all I want to do is convert them to a pandas df and possibly load into MySQL – user3456269 Feb 06 '18 at 20:21
  • You don't need a Hadoop cluster. Spark has nothing to do with HDFS or YARN. It just includes libraries to read sequence files (and convert them to a Pandas dataframe, as well as insert into a database). PySpark requires the Apache Spark libraries, yes. See https://stackoverflow.com/a/29498104/2308683 – OneCricketeer Feb 06 '18 at 21:28
  • Thanks! I was able to follow that guidance and get pyspark working. One weird discrepancy that is still stumping me is that in my spark-shell, my file is automatically decompressed: – user3456269 Feb 06 '18 at 23:03

1 Answer

Thanks to @cricket_007's helpful comments and some more digging, I was able to solve this. PySpark will accomplish the tasks that I need, and can read Hadoop Sequence Files directly from S3 locations, which is great. The tricky part was setting up PySpark, and I found this guide really helpful once I had downloaded Apache Spark - https://markobigdata.com/2017/04/23/manipulating-files-from-s3-with-apache-spark/.

One discrepancy still puzzles me, though. My spark-shell appears to decompress the file automatically:

scala> val fRDD = sc.textFile("s3a://bucket/file_path")
fRDD: org.apache.spark.rdd.RDD[String] = s3a://bucket/file_path MapPartitionsRDD[5] at textFile at <console>:24

scala> fRDD.first()
res4: String = SEQ?!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable??)org.apache.hadoop.io.compress.SnappyCodec???? �Z�f�uAf���- 2D���� �Z�f�uAf���- 2D�?^N???^???F�

but PySpark does not:

>>> from pyspark import SparkContext, SparkConf
>>> sc = SparkContext()
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/02/06 23:00:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

>>> fRDD = sc.textFile("s3a://bucket/file_path")
>>> fRDD.first()
'SEQ\x06!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable\x01\x01)org.apache.hadoop.io.compress.SnappyCodec\x00\x00\x00\x00\x0b�Z�f�uAf���-\x1d2D����\x0b�Z�f�uAf���-\x1d2D�\x05^N\x00\x00\x05^\x00\x00\x00F�'

Any ideas how I get PySpark to do this?
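A side note on the two outputs above, in case it helps someone: the leading bytes in both are the SequenceFile header, not compressed payload, which is presumably also why calling snappy.uncompress on the whole file reports invalid input. Below is a rough sketch of decoding that header by hand. It assumes the version 6 header layout and only handles class names shorter than 128 bytes (single-byte length prefix); the function names are my own and not part of any library.

```python
import struct


def _read_text(buf, pos):
    """Read a length-prefixed UTF-8 string; return (string, next position)."""
    length = buf[pos]
    if length > 127:
        # Longer strings use a multi-byte Hadoop VInt, which this sketch skips.
        raise ValueError("multi-byte length prefixes are not handled here")
    start = pos + 1
    return buf[start:start + length].decode("utf-8"), start + length


def read_sequence_file_header(buf):
    """Decode the header fields at the start of a SequenceFile byte string."""
    if buf[:3] != b"SEQ":
        raise ValueError("not a SequenceFile")
    header = {"version": buf[3]}
    pos = 4
    header["key_class"], pos = _read_text(buf, pos)
    header["value_class"], pos = _read_text(buf, pos)
    header["compressed"] = bool(buf[pos])
    header["block_compressed"] = bool(buf[pos + 1])
    pos += 2
    header["codec"] = None
    if header["compressed"]:
        header["codec"], pos = _read_text(buf, pos)
    # Metadata: a big-endian int32 pair count, then key/value strings.
    (n_meta,) = struct.unpack_from(">i", buf, pos)
    pos += 4
    metadata = {}
    for _ in range(n_meta):
        key, pos = _read_text(buf, pos)
        value, pos = _read_text(buf, pos)
        metadata[key] = value
    header["metadata"] = metadata
    # A 16-byte sync marker closes the header; records start after it.
    header["sync"] = bytes(buf[pos:pos + 16])
    header["header_length"] = pos + 16
    return header
```

Run against the bytes shown in the question, this should report SnappyCodec as the codec with block compression enabled. Only the blocks after the header are compressed, and as far as I can tell Hadoop adds its own length framing around each Snappy chunk, so slicing off the header and passing the rest to python-snappy is unlikely to work either.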

EDIT: Thanks again to cricket_007, I started using .sequenceFile() instead. This initially gave me the following error:

>>> textFile = sc.sequenceFile("s3a://bucket/file_path")
18/02/07 18:13:12 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.

I was able to solve that issue by following this guide - https://community.hortonworks.com/questions/18903/this-version-of-libhadoop-was-built-without-snappy.html. I am now able to read the sequence file and parse the protobuf message:

>>> seqs = sc.sequenceFile("s3a://bucket/file_path").values()
>>> feed = protobuf_message_pb2.feed()
>>> row = bytes(seqs.first())
>>> feed.ParseFromString(row)
>>> feed.user_id_64
3909139888943208259

This is exactly what I needed. What I want to do now is find an efficient way to parse every record in the sequence file and turn the results into a DataFrame, rather than doing it one record at a time as I have done above.
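One possible direction, as an untested sketch rather than a finished solution: parse the records inside mapPartitions so the work happens on the executors, then collect the resulting tuples into pandas. The helper names parse_partition and sequence_file_to_pandas are hypothetical, and the sketch assumes the protobuf module is importable on the workers, that the selected fields are plain scalars, and that the parsed rows fit in driver memory.

```python
def parse_partition(records, make_message, field_names):
    """Parse each serialized record in a partition into a tuple of field values."""
    for raw in records:
        msg = make_message()              # e.g. protobuf_message_pb2.feed
        msg.ParseFromString(bytes(raw))   # values arrive as bytearray
        yield tuple(getattr(msg, name) for name in field_names)


def sequence_file_to_pandas(sc, path, make_message, field_names):
    """Read a SequenceFile, parse all values in parallel, return a pandas DataFrame."""
    import pandas as pd  # imported lazily so parse_partition has no dependencies

    values = sc.sequenceFile(path).values()
    rows = values.mapPartitions(
        lambda part: parse_partition(part, make_message, field_names)
    )
    # collect() pulls every row to the driver; assumes the data fits in memory.
    return pd.DataFrame(rows.collect(), columns=field_names)
```

With the session above, the call would look something like sequence_file_to_pandas(sc, "s3a://bucket/file_path", protobuf_message_pb2.feed, ["user_id_64"]). Repeated or nested protobuf fields would need their own flattening step before they fit into columns.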

user3456269
  • I think that output is just the difference in the encoding of Scala vs Python REPLs. The string itself is not "decompressed" – OneCricketeer Feb 07 '18 at 00:41
  • Also, don't use `textFile`. There is a method specifically for SequenceFiles http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext.sequenceFile – OneCricketeer Feb 07 '18 at 00:43
  • Thanks cricket_007, that was a good idea. I had to follow this solution to get snappy decompression working https://community.hortonworks.com/questions/18903/this-version-of-libhadoop-was-built-without-snappy.html, but I'm all good now. My last task is to find an efficient way to decompile the protobuf messages and turn the results into a dataframe – user3456269 Feb 07 '18 at 20:26