
This question follows up on my previous question, aggregate multiple columns in sql table as json or array.

I am posting some updates and follow-up questions here because I ran into a new problem.

I would like to query a table in a Presto database from PySpark via Hive and create a PySpark dataframe from it. I then need to save that dataframe to S3 quickly and read it back from S3 efficiently as Parquet (or any other format, as long as it can be read and written fast).
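For reference, this is roughly how I build df (a minimal sketch; my_db.my_table and the S3 location are placeholders, and I am assuming the table is reachable through the Hive metastore my Spark session is configured with):

from pyspark.sql import SparkSession

# Minimal sketch; my_db.my_table and s3_path are placeholders, and the table is
# assumed to be visible through the Hive metastore this session points at.
spark = (
    SparkSession.builder
    .appName("presto-table-to-s3-parquet")
    .enableHiveSupport()
    .getOrCreate()
)

df = spark.sql("SELECT id, cat_name, cat_desc, obj_name, obj_desc, obj_num FROM my_db.my_table")

s3_path = "s3://my-bucket/some-prefix/"  # placeholder for the output location used below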

To keep the size as small as possible, I have aggregated some columns into a JSON object.

The original table (> 10^9 rows; some columns, e.g. obj_desc, may contain more than 30 English words):

id   cat_name    cat_desc        obj_name   obj_desc      obj_num
1    furniture   living office   desk       4 corners     1.5
1    furniture   living office   chair      4 legs        0.8
1    furniture   restroom        tub        white wide    2.7
1    cloth       fashion         T-shirt    black large   1.1

I have aggregated these columns into a JSON object as follows:

from pyspark.sql import functions as F

aggregation_cols = ['cat_name', 'cat_desc', 'obj_name', 'obj_desc', 'obj_num']  # they are all strings

df_temp = df.withColumn("cat_obj_metadata", F.to_json(F.struct([x for x in aggregation_cols]))).drop(*aggregation_cols)

df_temp_agg = df_temp.groupBy('id').agg(F.collect_list('cat_obj_metadata').alias('cat_obj_metadata'))

df_temp_agg.cache()
df_temp_agg.printSchema()
# df_temp_agg.count()  # this takes a very long time and never returns a result, so I am not sure how large the dataframe is
df_temp_agg = df_temp_agg.repartition(1024)  # not sure what the optimal number of partitions should be
df_temp_agg.write.parquet(s3_path, mode='overwrite')  # this runs for a long time (> 12 hours) and never finishes
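For what it is worth, running the same write on a small sample is how I plan to check that S3 access and the Parquet writer work independently of the data volume (a minimal sketch; s3_sample_path is just a placeholder test location):

# Minimal sketch: write only a small sample to a separate test prefix.
# limit() still triggers the aggregation, but only 1000 output rows are written.
s3_sample_path = "s3://my-bucket/test-prefix/"  # placeholder
df_temp_agg.limit(1000).write.parquet(s3_sample_path, mode='overwrite')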

I am working on an m4.4xlarge cluster with 4 nodes, and none of the cores look busy. I have also checked the S3 bucket: no folder is created at s3_path. For other, smaller dataframes, I can see s3_path being created when write.parquet() runs, but for this large dataframe no folders or files are created at s3_path at all.

Because

df_temp_agg.write.parquet()

never returns, I am not sure what errors could be happening here, on the Spark cluster or on S3.

Could anybody help me with this? Thanks.

  • The only dataframes that I can see in your code snippet are df, df_temp, df_temp_agg. Where is df_meta_agg coming from? What is the logic that defines it? – ARCrow Jan 24 '22 at 04:02
  • @ARCrow, Thanks for your reply. It is a typo, which has been fixed. Any help would be appreciated. – user3448011 Jan 25 '22 at 15:36
  • The purpose of repartitioning should be to create partitions that are close to 1GB of size. Caching wouldn't take effect unless an action (count, display, write) is initiated on the dataframe. – ARCrow Jan 31 '22 at 04:17
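If I understand the last comment correctly, the repartition/cache part would look roughly like this (a minimal sketch; total_size_bytes is an estimate of the aggregated data size that I would have to supply myself):

# Minimal sketch of the suggestion in the comments: target roughly 1 GB per
# partition, and note that cache() is lazy: the data is only cached once an
# action (count, write, ...) runs on the dataframe.
# total_size_bytes is a rough estimate of the aggregated data size (assumption).
target_partition_bytes = 1024 ** 3  # ~1 GB per partition
num_partitions = max(1, total_size_bytes // target_partition_bytes)

df_temp_agg = df_temp_agg.repartition(num_partitions)  # repartition returns a new dataframe
df_temp_agg.cache()
df_temp_agg.count()  # action: this actually materializes the cache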

0 Answers