1

I am using Scala and Spark to manage a big quantity of records and each of those records has the form:

single record => (String, Row)

and every Row is composed by 45 different kind of values (String, Integer, Long).

To aggregate them I am using:

myRecords.aggregateByKey ( List [Any]( ) ) (
      (aggr, value) => aggr ::: (value :: Nil),
      (aggr1, aggr2) => aggr1 ::: aggr2
)

The problem is that I am getting constanly the message:

15/11/21 17:54:14 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 147767 ms exceeds timeout 120000 ms

15/11/21 17:54:14 ERROR TaskSchedulerImpl: Lost executor driver on localhost: Executor heartbeat timed out after 147767 ms

[Stage 3:====>               (875 + 24) / 3252]

15/11/21 17:57:10 WARN BlockManager: Putting block rdd_14_876 failed

...and finally...

15/11/21 18:00:27 ERROR Executor: Exception in task 876.0 in stage 3.0 (TID 5465)
java.lang.OutOfMemoryError: GC overhead limit exceeded

What I can guess is that the aggregate is so big that to match the key for a new record it will require more and more time, till a task gets to some timeout because it didn't find the right place where add the record's value.

I played with different parameters from spark-submit like:

spark.default.parallelism => to reduce the size of tasks augmenting this value

spark.executor.memory => usually I put much less then driver memory

spark.driver.memory => the whole driver memory (single machine tho)

--master local[number of cores] 

Any idea how to get at the end of the process without out-of-memory/timeouts?

UPDATE

I am trying to merge two csv files based on:

1) join them based on a csv column 2) merge the join rows, based on 3 column values 3) aggregate/group this joined & merged files with the key at 2) 4) doing some stuff on the single aggregate data from 3)

This is the code:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel._
import org.apache.spark.sql.{Column, DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object MyRecords {

  def createKey(k1: String, k2: String, k3: String):String = {
    Seq(k1, k2, k3).iterator.map ( r => if (r == null) "" else r.trim.toUpperCase ).mkString ("")
  }

  def main(args: Array[String]): Unit = {

    val df1FilePath = args ( 0 )
    val df2FilePath = args ( 1 )

    val sc = new SparkContext ( new SparkConf ( ) )
    val sqlContext = new SQLContext ( sc )
    import sqlContext.implicits._

    val df1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").load(df1FilePath).as("one")

    df1.registerTempTable("df1")

    val df2 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("delimiter", "\t").load(df2FilePath)

    val df2Renamed = df2.select(
      col ( "v0" ).as ( "y_v0" ),
      col ( "v1" ).as ( "y_v1" ),
      col ( "v2" ).as ( "y_v2" ),
      col ( "v3" ).as ( "y_v3" ),
      col ( "v4" ).as ( "y_v4" ),
      col ( "v5" ).as ( "y_v5" ),
      col ( "v6" ).as ( "y_v6" ),
      col ( "v7" ).as ( "y_v7" ),
      col ( "v8" ).as ( "y_v8" ),
      col ( "v9" ).as ( "y_v9" ),
      col ( "v10" ).as ( "y_v10" ),
      col ( "v11" ).as ( "y_v11" ),
      col ( "v12" ).as ( "y_v12" ),
      col ( "v13" ).as ( "y_v13" ),
      col ( "v14" ).as ( "y_v14" ),
      col ( "v15" ).as ( "y_15" ),
      col ( "v16" ).as ( "y_16" ),
      col ( "v17" ).as ( "y_17" ),
      col ( "v18" ).as ( "y_18" ),
      col ( "v19" ).as ( "y_19" ),
      col ( "v20" ).as ( "y_20" ),
      col ( "v21" ).as ( "y_21" ),
      col ( "v22" ).as ( "y_22" ),
      col ( "v23" ).as ( "y_23" ),
      col ( "v24" ).as ( "y_24" ),
      col ( "v25" ).as ( "y_25" ),
      col ( "v26" ).as ( "y_26" ),
      col ( "v27" ).as ( "y_27" ),
      col ( "v28" ).as ( "y_28" ),
      col ( "v29" ).as ( "y_29" ),
      col ( "v30" ).as ( "y_30" ),
      col ( "v31" ).as ( "y_31" ),
      col ( "v32" ).as ( "y_32" )
    ).as("two")

    df2Renamed.registerTempTable("df2")

    val dfJoined = dfArchive.join( df2Renamed, $"one.v0" === $"two.y_v0", "fullouter" ).as("j")

    dfJoined.registerTempTable("joined")

    val dfMerged = sqlContext.sql("SELECT * FROM joined").map(r =>
      if (r.getAs("y_v1") != null) {
        (createKey (r.getAs("y_v2"), r.getAs("y_v3"), r.getAs("y_v4") ), r)
      } else {
        (createKey (r.getAs("v2"), r.getAs("v3"), r.getAs("v4") ), r)
      })

    dfMerged.groupByKey().collect().foreach(println)

    sc.stop()
  }
}
Randomize
  • 8,651
  • 18
  • 78
  • 133

1 Answers1

2

Since all you do here is grouping by key it is better to use groupByKey instead of aggregateByKey, especially a one which creates a huge number of temporary objects like value :: Nil (why not simply value :: aggr?).

Since it doesn't perform map side aggregations it should put less stress on a garbage collector (see SPARK-772).

See also: Is groupByKey ever preferred over reduceByKey

Edit:

Regarding code you've provided in the update it doesn't really make sense. If you want to use DataFrames there is no reason to group data using RDDs in the first place. Also you duplicate your data by keeping both Strings and casted values increasing memory usage and additionally stressing GC. It looks like what you need is roughly something like this (with a small help of spark-csv):

// Load data, optionally add .option("inferSchema", "true")
val df1 = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .load(file1Path)

val df2 = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .option("delimiter", "\t")
    .load(file2Path)

// Join and cache
val df = df1.join(
  df2,
  // Join condition
  df1("foo") === df2("foo") &&
    df1("bar") === df2("bar") &&
    df1("baz") === df2("baz"),
  "fullouter")
df.registerTempTable("df")
sqlContext.cacheTable("df")

// Perform all the required casting using safe cast methods
// and replace existing columns
df.withColumn("some_column", $"some_column".cast(IntegerType))

Any aggregations you need you can perform you can execute on a data frame without physically grouping the data. If you want to subset simply use where or filter.

Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935
  • 3
    I already expected to see you here ;) Your answers are high quality. – WestCoastProjects Nov 21 '15 at 19:03
  • same issue: `java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:2271)` – Randomize Nov 21 '15 at 21:07
  • `java.lang.OutOfMemoryError: GC overhead limit exceeded` and `java.lang.OutOfMemoryError: Java heap space at ...` are not the same issues. Actually why do you convert this data to rows and group in the first place? Or collect for that matter. – zero323 Nov 21 '15 at 21:19
  • @zero323 I cannot see how to do differently. I merge two rows from two files in a new array. I think that is. – Randomize Nov 21 '15 at 21:36
  • Thanks for the answer first. One file has ~ 100,000,000 and the other ~ 30,000,000 records. One machine only with memory drive = 80g and memory executor = 12g and 24 cores cpu. Following your answer, will I run the `mergedFiles` inside the `foreach` after the `groupByKey` as final transformation? – Randomize Nov 21 '15 at 22:26
  • I have semplified a lot the code now, using spark-sql/csv as you suggested but I am still getting `java.lang.OutOfMemoryError: Requested array size exceeds VM limit` during the `groupByKey` stage. This is the configuration: spark-submit --name "my records" --master local[4] --packages com.databricks:spark-csv_2.11:1.3.0 --conf "spark.local.dir=/opt/spark-tmp/" --conf spark.driver.maxResultSize=0 --conf spark.executor.memory=12g --conf spark.driver.memory=80g target/scala-2.11/er-archive_2.11-0.0.1.jar /opt/files/file1.txt /opt/files/file2.txt. The total memory of the machine is 96g. Any idea? – Randomize Nov 22 '15 at 19:26
  • Like I've already said - use `filter` / `where` and SQL aggregates not `groupBy`. Regarding error you see right now it is most likely a problem with JVM configuration and has been covered multiple times on SO. – zero323 Nov 22 '15 at 23:11
  • can you please show where/how you mean to use filter/where? I can trying to figure out a simple way to replace groupBy separating by the same indexes. Is that more memory efficient? – Randomize Nov 23 '15 at 12:26
  • If I understood what you meant, for example, with a list of names that of course start with letter A to Z, I could split the big aggregate in 26 parts as in the alphabet letters, filtering the data, so I will get 26 smaller chunk of data to group by? – Randomize Nov 23 '15 at 12:49