I have a problem with Spark SQL. I read some data from CSV files, then perform groupBy and join operations, and the final task writes the joined data to a file. My problem is a time gap, which I show in the log below with a blank line.
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1069
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1003
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 965
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1073
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1038
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 900
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 903
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 938
18/08/07 23:39:40 INFO storage.BlockManagerInfo: Removed broadcast_84_piece0 on 10.4.110.24:36423 in memory (size: 32.8 KB, free: 4.1 GB)
18/08/07 23:39:40 INFO storage.BlockManagerInfo: Removed broadcast_84_piece0 on omm104.in.nawras.com.om:43133 in memory (size: 32.8 KB, free: 4.1 GB)
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 969
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1036
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 970
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1006
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1039
18/08/07 23:39:47 WARN util.Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
18/08/07 23:39:54 INFO parquet.ParquetFileFormat: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter

18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with:
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters:
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters:
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with:
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters:
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters:
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with:
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters:
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters:
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with:
The DataFrames are small: ~5,000 records and ~800 columns. I am using the following code:
import org.apache.spark.sql.functions.{col, collect_list, struct}

val parentDF = ...
val childADF = ...
val childBDF = ...

val aggregatedAColName = "CHILD_A"
val aggregatedBColName = "CHILD_B"

val columns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3", "val_0")
val keyColumns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3")

// Child A: nest the selected columns into a struct, then collect the structs per key.
val nestedAColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedAColName)
val childADataFrame = childADF
  .select(nestedAColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedAColName).alias(aggregatedAColName))
val joinedWithA = parentDF.join(childADataFrame, keyColumns, "left")

// Child B: same aggregation, joined onto the result of the first join.
val nestedBColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedBColName)
val childBDataFrame = childBDF
  .select(nestedBColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedBColName).alias(aggregatedBColName))
val joinedWithB = joinedWithA.join(childBDataFrame, keyColumns, "left")
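For completeness, the elided reads and the final write look roughly like this (the paths are placeholders and the options are simplified; the output format is Parquet, as the ParquetOutputCommitter line in the log shows):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Roughly how the three input DataFrames above are created; the CSVs are read
// without a header option, which matches the generated column names
// (_c0, _c1, ...) in the log. Paths are placeholders.
val parentDF = spark.read.csv("/path/to/parent/*.csv")
val childADF = spark.read.csv("/path/to/child_a/*.csv")
val childBDF = spark.read.csv("/path/to/child_b/*.csv")

// The joined result is written out as Parquet (placeholder output path).
joinedWithB.write.parquet("/path/to/output")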
Processing time for 30 files (~85k records in total) is strangely high: ~38 minutes. Have you ever seen a similar problem?