
I have a Scala HashMap created from a Spark dataframe. I want to convert it to a Java HashMap that I can write to disk. Later I intend to load this Java HashMap in production and use it in a non-Spark environment.

So far I am able to convert a Spark dataframe to a Scala HashMap as follows:

val mydf1 = Seq((1, "a"), (2, "b"),(3, "c"),(4, "d"),(5, "e")).toDF("id", "col2")
mydf1.show

+---+----+
| id|col2|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
|  4|   d|
|  5|   e|
+---+----+

import org.apache.spark.sql.Row

val mydfHash = mydf1.rdd.map {
    case Row(id: Int, col2: String) => (id, col2)
}.collectAsMap()

However, when I try to convert the above Scala HashMap to a Java HashMap as follows:

import java.util._
import scala.collection.JavaConverters._


mydfHash.asJava

I am getting a java.lang.OutOfMemoryError: Java heap space error.

Below is the stack trace I am getting, for reference:

java.lang.OutOfMemoryError: Java heap space
  at java.util.Arrays.copyOf(Arrays.java:3332)
  at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
  at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
  at java.lang.StringBuilder.append(StringBuilder.java:136)
  at java.lang.StringBuilder.append(StringBuilder.java:131)
  at java.util.AbstractMap.toString(AbstractMap.java:559)
  at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)
  at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345)
  at .$print$lzycompute(<console>:10)
  at .$print(<console>:6)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
  at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
  at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
  at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
  at org.apache.zeppelin.spark.SparkScala211Interpreter.scalaInterpret(SparkScala211Interpreter.scala:143)
  at org.apache.zeppelin.spark.SparkScala211Interpreter$$anonfun$interpret$1$$anonfun$apply$2.apply(SparkScala211Interpreter.scala:122)
  at org.apache.zeppelin.spark.SparkScala211Interpreter$$anonfun$interpret$1$$anonfun$apply$2.apply(SparkScala211Interpreter.scala:116)
  at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)

Why does collectAsMap() work but asJava fail? I thought collectAsMap also collects the entire RDD onto the Spark driver node. So if collectAsMap does not fail, then ideally asJava should not fail with a heap error either.

Update 1

Do I really need to convert the Scala HashMap to a Java HashMap? Is it not possible to serialize the Scala HashMap to a file and then load it into a Java HashMap in a Java environment? After all, both Scala and Java run on the JVM.
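
For reference, the kind of round trip I have in mind looks roughly like this (a minimal sketch assuming the toy id/col2 map from above; the file path and names are just illustrative):

import java.io.{FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}

// Copy the collected Scala Map into a plain java.util.HashMap so that the
// serialized file depends only on JDK classes, not on the Scala library.
val javaMap = new java.util.HashMap[Int, String]()
mydfHash.foreach { case (k, v) => javaMap.put(k, v) }

// Write it to disk with standard Java serialization.
val oos = new ObjectOutputStream(new FileOutputStream("/tmp/mydfHash.ser"))
try oos.writeObject(javaMap) finally oos.close()

// Reading it back works the same way from plain Java, since
// java.util.HashMap is a JDK class:
val ois = new ObjectInputStream(new FileInputStream("/tmp/mydfHash.ser"))
val restored = try ois.readObject().asInstanceOf[java.util.HashMap[Int, String]] finally ois.close()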

  • Possible duplicate of: https://stackoverflow.com/questions/38307258/out-of-memory-error-when-writing-out-spark-dataframes-to-parquet-format – Sid Jan 01 '19 at 13:20
  • To convert any data structure to another, you need enough space for both the existing data structure and the new one. Thus, you need approximately twice the memory size, possibly more, depending on the data structures in question. So of course you could run out of memory when you do that. – RealSkeptic Jan 01 '19 at 13:23
  • @RealSkeptic in my Spark cluster, when I save the dataframe to a CSV file, it comes out to just 90 MB. I then converted the same dataframe to a HashMap and then to a Java HashMap and got this exception. So 90 * 3 = 270 MB is still far less than my driver node's memory of 5 GB. – user3243499 Jan 01 '19 at 13:27
  • What you are collecting is not your Dataframe; it's a PairRDD. You should use Dataset operations if you want to stay in the Dataframe world. It's really unclear why you want to serialize the result as an object anyway, rather than some readable, language-agnostic format like JSON or CSV. – RealSkeptic Jan 01 '19 at 13:35
  • Why not write to disk in some agnostic format? JSON? YAML? Or go fancy and use a compact format like ProtoBuf or Avro? – Boris the Spider Jan 01 '19 at 13:37
  • @BoristheSpider do you mean dataframe to ProtoBuf? – user3243499 Jan 01 '19 at 13:39
  • I presume you _need_ the Scala `Map` for some reason and can't just write the dataframe to disk directly. Convert your Scala `Map` directly to some on-disk format; there's good support for [proto in Scala](https://scalapb.github.io/). JSON is simpler. – Boris the Spider Jan 01 '19 at 13:41

1 Answer


Why does collectAsMap() work but asJava fail?

As per my understanding, here is the reason:

asJava itself is a cheap wrapper and is not what runs out of memory; the interpreter's attempt to print the result is. Look at the top of your stack trace: ScalaRunTime.replStringOf calls AbstractMap.toString, which appends every entry of the map to a StringBuilder. As the StringBuilder grows, its backing char array has to be repeatedly copied and enlarged (Arrays.copyOf), and for a large map this either exhausts the heap or hits the JVM's array-size limit (an array cannot have more than Integer.MAX_VALUE elements; in practice a StringBuilder tops out at roughly 1,207,959,550 chars, well below Integer.MAX_VALUE). collectAsMap() works because it only collects the data onto the driver; it never tries to render the whole map as one giant String.
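
One way to sidestep this (a sketch, assuming you are running in Zeppelin or the spark-shell) is to make sure the big map is never the value the interpreter has to echo back, for example by returning only something small from the top-level statement:

// The heavy map lives only inside the block; the interpreter only has to
// print the small Int result, so no giant String is ever built.
val entryCount = {
  val javaMap = new java.util.HashMap[Int, String](mydfHash.asJava)
  // ... persist or otherwise use javaMap here ...
  javaMap.size
}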

Hope this helps.