
I have a zip-compressed CSV stored on S3. I would like to convert this file to Parquet format, partitioned on a specific column in the CSV. When I try the following (using Python 3.6.5 and PySpark 2.7.14):

from pyspark.sql import SQLContext, SparkSession

spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .config('spark.hadoop.fs.s3a.access.key', '<my access key>') \
    .config('spark.hadoop.fs.s3a.secret.key', '<my secret key>') \
    .getOrCreate()

df = spark.read.csv("s3a://mybucket/path/myfile.zip")
df.show(n=10)

This is the output:

+--------------------+                                                          
|                 _c0|
+--------------------+
|PK-    4*PM<ȏ...|
|����W����lq��...|
|jk�ulE����
           Uձ�...|
|U횵Сc�=t�kd�0z...|
|T�;t��gն>t�:�y...|
|ݵK!뼠PT���DЉ*�...|
|�}�B��h)t����H!k?...|
|              ��y�B֧|
|��� �1�NTȞB(�...|
+--------------------+
only showing top 10 rows

When I convert to parquet using:

df.write.partitionBy("column").parquet("s3a://otherbucket/path/myfile_partitioned",mode="overwrite")

The results in S3 do not match the actual column values in the source files.

I have also tried using:

sqlctx = SQLContext(spark)
df = sqlctx.read.csv("s3a://cta-ridership/seeds/nm45dayall_2.zip")

But the results are the same. Is something wrong with my csv? I'm new to Pyspark so I might be missing something basic.

UPDATE: Per @Prazy's help, I've updated my code to:

spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .config('spark.hadoop.fs.s3a.access.key', '<my key>') \
    .config('spark.hadoop.fs.s3a.secret.key', '<my secret key>') \
    .getOrCreate()
sc = spark.sparkContext

rdd = sc.textFile("s3a://mybucket/mypath/myfile.zip")
print(rdd.take(10))

But this still returns:

['PK\x03\x04-\x00\x00\x00\t\x004*PM<ȏ\x1f��������\x13\x00\x14\x00nm45dayall_2017.csv\x01\x00\x10\x00� �\x07\x00\x00\x00�_J�\x00\x00\x00\x00����㺳���;�7Q�;�"%R�a�{;ܶg��3��9���\x0b\x17I��<Y��of��ڿU��\x19�\x01A\x10\x04\x12�?��\x7f�\x1f������/����������\x7f��\x7f�����?���������?��\x7f�������\x7f����?�����\x7f���������������������\x1f��\x7f����_����\x7f�\x7f��n\x7f������?�����?��������_�\x7f\x1b����\x7f���������g�\\�i�]���\x7f�����3���������ǟ��<_����}���_�������?�\x7f�n�1h��t��.5�Z\x05ͺk��1Zߎ��\x16���ڿE�A��\x1fE�<UAs\x11���z�\\�n��s��\x1ei�XiĐ\x1ej\x0cݪ������`�yH�5��8Rh-+�K�\x11z�N�)[�v̪}', "���\x10�W�\x07���\x12l\x10q��� �qq,i�6ni'��\x10\x14�h\x08��V\x04]��[P�!h�ڢ���GwF\x04=�.���@��>����h", 'jk�\x1culE\x15����\x0cUձ\x7f���#\x1d��\x10Tu���o����\x0eݎ\x16�E\x0f\x11r�q\x08Ce[�\x0c\x0e�s\x10z�?Th\x1aj��O\x1f�\x0f�\x10A��X�<�HC�Y�=~;���!', 'U횵Сc�=t�k\x15d�0\x14z\x16\x1d��R\x05M��', 'T�;t��\x10\x11gն>t�\x01:�y:�c�U��\x1d\x7ff�Т�a', 'ݵ\x19K!뼠PT�\x11��DЉ*\x10\u2d2e�<d� Й��\x08AQ\x03\x04AQ�� {��P����\x1e��Z\x7f���AG�3�b\x19T�E�%;�"ޡ�El�rס�}��qg���qg|������7�8�k\x1e:j�\x7f���c�Bv���\\t�[�ܚ�nz��PU���(\x14��\x08�����CϢc�=|\x14���Ⱥ', ')d]�\x10Z�o\x0e:�v����\x0er�oѣj��\x06DA%b�>', '�}�B��h)t����H!k?R�zf)���5k�B��h?�h���Ao}�S��\x17i\x14�\x1eU', '��y�B֧', '��\x16� �1�NT\x1b1ȞB(�\x16�k\x7f�B!�d��m\x0c:�\x03��˵\x1f�����ޥa�\x16@� ���V"Ա�k']
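
The PK\x03\x04 prefix on the first record is the ZIP local-file-header signature, which suggests textFile is returning the raw archive bytes rather than decompressed CSV text. A quick sanity check (just a sketch, reusing the rdd above):

first = rdd.first()
# 'PK\x03\x04' is the magic number that starts every ZIP archive entry
print(first.startswith('PK\x03\x04'))  # True -> Spark is reading the compressed archive as-is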

UPDATE: Again, thanks to Prazy for their help. I'm trying to convert the RDD to a DataFrame using:

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

spark = SparkSession.builder \
    .appName("Python Spark SQL basic example") \
    .config('spark.hadoop.fs.s3a.access.key', '<mykey>') \
    .config('spark.hadoop.fs.s3a.secret.key', '<myotherkey>') \
    .getOrCreate()

sc = spark.sparkContext
sqlctx = SQLContext(spark)

schema = StructType([
    StructField("YYYYMMDD", IntegerType(), True),
    StructField("ENTRANCE_ID", IntegerType(), True),
    StructField("FARE_MEDIA_TYPE", IntegerType(), True),
    StructField("TRANS_EVENT", IntegerType(), True),
    StructField("HALFHOUR", FloatType(), True),
    StructField("RIDES", IntegerType(), True)])

rdd = sc.textFile("s3a://mybucket/path/myfile.zip")
df = sqlctx.createDataFrame(rdd, schema)

df.show(n=10)
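
(Side note: even once the file is readable, createDataFrame with this schema expects each record to be a tuple of typed values rather than a raw text line, so a mapping step is needed first. A minimal sketch, assuming comma-separated fields and no header row:)

def to_row(line):
    # split one CSV line and cast each field to match the schema above
    f = line.split(",")
    return (int(f[0]), int(f[1]), int(f[2]), int(f[3]), float(f[4]), int(f[5]))

df = sqlctx.createDataFrame(rdd.map(to_row), schema)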
user2752159
  • @Prazy: Python 3.6.5. I'll edit it into the original question! – user2752159 Oct 18 '18 at 17:45
  • @Prazy I see! I'll research how to unzip the file into a PySpark DataFrame – user2752159 Oct 18 '18 at 17:50
  • https://stackoverflow.com/questions/27477730/how-to-read-multiple-gzipped-files-from-s3-into-a-single-rdd – pvy4917 Oct 18 '18 at 17:53
  • @Prazy PySpark 2.7.14. I'll also edit that into the original question – user2752159 Oct 18 '18 at 17:55
  • Look at that answer. Also, you said it is `gzip`, but you are reading it as `.zip`. – pvy4917 Oct 18 '18 at 17:57
  • @Prazy, thanks for the help, it's much appreciated. I've read the file in per the link above, but the data still looks unusual, even as an RDD – user2752159 Oct 18 '18 at 18:16
  • How are you sure the RDD is unusual? Are you converting it into a DataFrame? – pvy4917 Oct 18 '18 at 18:23
  • @Prazy: I used rdd.take(10) and still got the strange output. I tried to create a DataFrame using: `sqlctx = SQLContext(spark)` `schema = StructType([StructField("YYYYMMDD", IntegerType(), True), StructField("ENTRANCE_ID", IntegerType(), True)])` `df = sqlctx.createDataFrame(rdd, schema)` `df.show(n=10)` And got: `TypeError: StructType can not accept object 'PK\x03\x04-\x00\x00\x00\t\x000kQM;k����������\x17\x00.....` – user2752159 Oct 18 '18 at 18:29
  • After discussion with @Prazy, we arrived at a similar conclusion to Tanveer's solution below. ZIP compression is not supported in PySpark, and I needed to unzip and re-compress into another format. I used the following workaround: https://stackoverflow.com/a/36511190/2752159 In my case the zip files were ~30 GB uncompressed, so I recreated the base files using gzip, avoiding zip compression entirely (a rough sketch of that step follows these comments). – user2752159 Oct 19 '18 at 15:49
  • But TrigonaMinima, in his answer https://stackoverflow.com/a/36511190/2752159, clearly said he used a `.zip` file – pvy4917 Oct 19 '18 at 15:51
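
The re-compression workaround described in the comment above could look roughly like this (a sketch only; local paths are placeholders, and the inner file name is the one visible in the ZIP header in the question):

import gzip
import shutil
import zipfile

# extract the CSV from the archive downloaded from S3
with zipfile.ZipFile("myfile.zip") as zf:
    zf.extract("nm45dayall_2017.csv")

# re-compress it as gzip, which Spark can decompress transparently
with open("nm45dayall_2017.csv", "rb") as src, gzip.open("nm45dayall_2017.csv.gz", "wb") as dst:
    shutil.copyfileobj(src, dst)

# after uploading the .gz back to S3:
# df = spark.read.csv("s3a://mybucket/path/nm45dayall_2017.csv.gz")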

1 Answer


ZIP files are not supported as an input compression codec, so Spark reads the raw archive bytes instead of the CSV inside them. You can follow the links here and here to try the workaround. Use gzip or another supported format if possible.
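
One possible workaround (a sketch of my own, not taken from the linked answers; the bucket path comes from the question, everything else is illustrative) is to read the archive as binary with sc.binaryFiles and unzip it with Python's zipfile module before handing the lines to Spark:

import io
import zipfile

def unzip_lines(pair):
    # pair is a (path, file_bytes) tuple as returned by sc.binaryFiles
    _path, content = pair
    with zipfile.ZipFile(io.BytesIO(content)) as zf:
        for name in zf.namelist():
            # decode each archived file and emit it line by line
            for line in zf.read(name).decode("utf-8").splitlines():
                yield line

# sc is the SparkContext from the question
lines = sc.binaryFiles("s3a://mybucket/path/myfile.zip").flatMap(unzip_lines)
print(lines.take(3))

Note that this materializes each whole uncompressed file in memory on a single executor, so for archives that are ~30 GB uncompressed (as mentioned in the comments) re-compressing the source to gzip and letting spark.read.csv decompress it is the more practical route.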

Tanveer Uddin