
I have a problem with Spark SQL. I read data from CSV files, then do groupBy and join operations, and the final task is writing the joined data to a file. My problem is the time gap (shown in the log below with a blank line).

18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1069
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1003
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 965
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1073
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1038
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 900
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 903
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 938
18/08/07 23:39:40 INFO storage.BlockManagerInfo: Removed broadcast_84_piece0 on 10.4.110.24:36423 in memory (size: 32.8 KB, free: 4.1 GB)
18/08/07 23:39:40 INFO storage.BlockManagerInfo: Removed broadcast_84_piece0 on omm104.in.nawras.com.om:43133 in memory (size: 32.8 KB, free: 4.1 GB)
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 969
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1036
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 970
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1006
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1039
18/08/07 23:39:47 WARN util.Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
18/08/07 23:39:54 INFO parquet.ParquetFileFormat: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter

18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 

The DataFrames are small, ~5000 records and ~800 columns each. I am using the following code:

import org.apache.spark.sql.functions._  // col, struct, collect_list

val parentDF = ...
val childADF = ...
val childBDF = ...

val aggregatedAColName = "CHILD_A"
val aggregatedBColName = "CHILD_B"

val columns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3", "val_0")
val keyColumns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3")

// Nest child A's columns into a struct, collect one list of structs per key, then left join onto the parent
val nestedAColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedAColName)
val childADataFrame = childADF
  .select(nestedAColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedAColName).alias(aggregatedAColName))
val joinedWithA = parentDF.join(childADataFrame, keyColumns, "left")

// Same aggregation for child B, left joined onto the result of the first join
val nestedBColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedBColName)
val childBDataFrame = childBDF
  .select(nestedBColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedBColName).alias(aggregatedBColName))
val joinedWithB = joinedWithA.join(childBDataFrame, keyColumns, "left")

The processing time for 30 files (~85k records in total) is strangely high, ~38 minutes. Have you ever seen a similar problem?

1 Answer


Try to avoid the repartition() call, as it causes unnecessary data movement across the nodes.

According to Learning Spark:

Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding data movement, but only if you are decreasing the number of RDD partitions.

Put simply, coalesce() only decreases the number of partitions; it does not shuffle data, it just merges existing partitions.
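
As an illustration (not from the original post), here is a minimal sketch contrasting the two; the DataFrame and partition counts are made up for the example:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-demo").master("local[*]").getOrCreate()
val df = spark.range(0, 1000000).toDF("id")  // hypothetical example data

// repartition() always triggers a full shuffle; it can increase or decrease the partition count
val wide = df.repartition(200)

// coalesce() only merges existing partitions, so it avoids a shuffle,
// but it can only decrease the partition count
val narrow = wide.coalesce(10)

println(wide.rdd.getNumPartitions)   // 200
println(narrow.rdd.getNumPartitions) // 10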

khushbu kanojia
  • This is not entirely accurate. ``coalesce`` minimizes shuffling of data because it is used to reduce the number of partitions, while ``repartition`` can reduce or increase the number of partitions. Still, it does not completely eliminate shuffling, and it also depends on the ``shuffle`` flag being set to ``true`` or ``false``. I refer to these questions: https://stackoverflow.com/questions/31610971/spark-repartition-vs-coalesce and https://stackoverflow.com/questions/42034314/does-coalescenumpartitions-in-spark-undergo-shuffling-or-not – Jeremy Aug 10 '18 at 14:26
  • Please paste your physical plan – Chandan Ray Aug 10 '18 at 15:11
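
For reference, the physical plan asked for above can be printed with Dataset.explain(); a short sketch against the question's joinedWithB:

// Print the parsed, analyzed, optimized and physical plans for the final DataFrame
// (joinedWithB is the DataFrame built in the question)
joinedWithB.explain(true)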