I am using Scala and Spark to manage a large number of records, and each of those records has the form:
single record => (String, Row)
where every Row is composed of 45 values of different kinds (String, Integer, Long).
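To make the shape concrete, this is roughly what a single record looks like (the key and the field values below are purely illustrative):

import org.apache.spark.sql.Row

// Illustrative record: a string key plus a Row mixing String/Integer/Long values
// (the real rows carry 45 such fields).
val record: (String, Row) = ("SOMEKEY", Row("abc", 42, 123456789L))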
To aggregate them I am using:
myRecords.aggregateByKey(List[Any]())(
  (aggr, value) => aggr ::: (value :: Nil),
  (aggr1, aggr2) => aggr1 ::: aggr2
)
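For context, those two functions simply append every value for a key into a single list, so the result is the same as grouping all rows per key. A minimal sketch on a toy RDD (assuming sc is the SparkContext; keys and values are illustrative):

import org.apache.spark.sql.Row

val toy = sc.parallelize(Seq(("k1", Row("a", 1)), ("k1", Row("b", 2)), ("k2", Row("c", 3))))
val grouped = toy.aggregateByKey(List[Any]())(
  (aggr, value) => aggr ::: (value :: Nil),
  (aggr1, aggr2) => aggr1 ::: aggr2
)
// grouped.collect() returns something like: Array((k1,List([a,1], [b,2])), (k2,List([c,3])))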
The problem is that I am constantly getting this message:
15/11/21 17:54:14 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 147767 ms exceeds timeout 120000 ms
15/11/21 17:54:14 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 147767 ms
[Stage 3:====> (875 + 24) / 3252]
15/11/21 17:57:10 WARN BlockManager: Putting block rdd_14_876 failed
...and finally...
15/11/21 18:00:27 ERROR Executor: Exception in task 876.0 in stage 3.0 (TID 5465)
java.lang.OutOfMemoryError: GC overhead limit exceeded
What I can guess is that the aggregate grows so big that matching the key for a new record requires more and more time, until a task hits some timeout because it cannot find the right place to add the record's value.
I played with different spark-submit parameters (see the example invocation below), like:

spark.default.parallelism => increasing this value to reduce the size of each task
spark.executor.memory => usually I set it to much less than the driver memory
spark.driver.memory => the whole memory of the machine (it is a single machine, though)
--master local[number of cores]
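For example, a typical invocation looked roughly like this (the flag values are illustrative, not the exact numbers I used):

spark-submit \
  --master local[8] \
  --driver-memory 16g \
  --executor-memory 4g \
  --conf spark.default.parallelism=200 \
  --class MyRecords \
  my-records.jar file1.tsv file2.tsv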
Any idea how to get to the end of the process without out-of-memory errors or timeouts?
UPDATE
I am trying to merge two CSV files by:

1) joining them on a CSV column
2) merging the joined rows, based on 3 column values
3) aggregating/grouping the joined & merged rows with the key from 2)
4) doing some stuff on each single aggregate from 3)
This is the code:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object MyRecords {
  def createKey(k1: String, k2: String, k3: String): String = {
    Seq(k1, k2, k3).iterator.map(r => if (r == null) "" else r.trim.toUpperCase).mkString("")
  }
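  // For example (illustrative values): createKey(" ab ", null, "Cd") returns "ABCD":
  // nulls become empty strings, values are trimmed and upper-cased, then concatenated.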
  def main(args: Array[String]): Unit = {
    val df1FilePath = args(0)
    val df2FilePath = args(1)

    val sc = new SparkContext(new SparkConf())
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").load(df1FilePath).as("one")
    df1.registerTempTable("df1")

    val df2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").load(df2FilePath)
    // Select the 33 columns v0..v32 of the second CSV and prefix each with "y_",
    // so its column names do not clash with the first CSV's after the join.
    val df2Renamed = df2.select((0 to 32).map(i => col(s"v$i").as(s"y_v$i")): _*).as("two")
    df2Renamed.registerTempTable("df2")

    // Full outer join of the two inputs on the shared column v0.
    val dfJoined = df1.join(df2Renamed, $"one.v0" === $"two.y_v0", "fullouter").as("j")
    dfJoined.registerTempTable("joined")
    // Build the 3-column key: use the right-hand side columns when the row has a
    // match in df2 (y_v1 is not null), otherwise fall back to the left-hand side.
    val dfMerged = sqlContext.sql("SELECT * FROM joined").map(r =>
      if (r.getAs[String]("y_v1") != null) {
        (createKey(r.getAs[String]("y_v2"), r.getAs[String]("y_v3"), r.getAs[String]("y_v4")), r)
      } else {
        (createKey(r.getAs[String]("v2"), r.getAs[String]("v3"), r.getAs[String]("v4")), r)
      })
    dfMerged.groupByKey().collect().foreach(println)

    sc.stop()
  }
}