
I'm quite new to Spark and am currently running Spark 2.1.2 on a Hadoop 2.6.5 setup as a single node on a t3.xlarge (16 GB memory). I've been increasing spark.executor.memory -> 12g, spark.driver.maxResultSize -> 12g and spark.driver.memory -> 6g, yet I repeatedly get a GC overhead limit error. What could be the issue, and is there any advice?

Secondary question: in this single-node setup, is it better to assign more memory to the executor or to the driver?
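For context, a minimal sketch of how these settings might be applied from PySpark (the app name is hypothetical; note that in yarn-client mode spark.driver.memory only takes effect if it is supplied before the driver JVM starts, e.g. via spark-submit or spark-defaults.conf, so setting it inside an already running session has no effect):

```python
from pyspark.sql import SparkSession

# Illustrative only -- mirrors the settings described above.
spark = (SparkSession.builder
         .appName("graph_to_neo4j")                    # hypothetical app name
         .config("spark.executor.memory", "12g")
         .config("spark.driver.maxResultSize", "12g")
         .config("spark.driver.memory", "6g")          # must be set before the driver JVM starts
         .getOrCreate())
```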

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 112, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 117, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/airflow/dags/fncore/tasks/graph_to_neo4j.py", line 431, in graph_to_neo4j
    for edges in edges_result:
  File "/airflow/dags/fncore/tasks/graph_to_neo4j.py", line 343, in get_transformed_edges
    for dataframe in to_pandas_iterator(transformed, max_result_size=max_result_size):
  File "/airflow/dags/fncore/tasks/graph_to_neo4j.py", line 111, in to_pandas_iterator
    yield cur_dataframe.toPandas()
  File "/opt/spark-2.1.2/python/pyspark/sql/dataframe.py", line 1585, in toPandas
    return pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "/opt/spark-2.1.2/python/pyspark/sql/dataframe.py", line 391, in collect
    port = self._jdf.collectToPython()
  File "/opt/spark-2.1.2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark-2.1.2/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/spark-2.1.2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o2163.collectToPython.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.zip.DeflaterOutputStream.<init>(DeflaterOutputStream.java:89)
    at java.util.zip.GZIPOutputStream.<init>(GZIPOutputStream.java:90)
    at java.util.zip.GZIPOutputStream.<init>(GZIPOutputStream.java:109)
    at org.apache.hadoop.io.WritableUtils.writeCompressedByteArray(WritableUtils.java:64)
    at org.apache.hadoop.io.WritableUtils.writeCompressedString(WritableUtils.java:94)
    at org.apache.hadoop.io.WritableUtils.writeCompressedStringArray(WritableUtils.java:155)
    at org.apache.hadoop.conf.Configuration.write(Configuration.java:2836)
    at org.apache.spark.util.SerializableConfiguration$$anonfun$writeObject$1.apply$mcV$sp(SerializableConfiguration.scala:27)
    at org.apache.spark.util.SerializableConfiguration$$anonfun$writeObject$1.apply(SerializableConfiguration.scala:25)
    at org.apache.spark.util.SerializableConfiguration$$anonfun$writeObject$1.apply(SerializableConfiguration.scala:25)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1281)
    at org.apache.spark.util.SerializableConfiguration.writeObject(SerializableConfiguration.scala:25)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:272)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:272)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1315)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:273)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1410)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReader(ParquetFileFormat.scala:343)
kent
  • This looks like an error on the driver, and you're hitting it while running an Airflow task. Does the Airflow executor have sufficient resources? – EDG956 Nov 22 '21 at 13:27
  • There's only 1 executor, and the resources are as described above. Would it improve if there were more executors? The instance has only 4 vCPUs. – kent Nov 22 '21 at 14:37
  • Where are you running Airflow: on the same t3.xlarge instance or on a managed instance? I would think the OutOfMemory occurs on the host running the Airflow task (acting as the Spark driver), not on the Hadoop cluster. – EDG956 Nov 22 '21 at 14:42
  • Airflow runs as a Docker container on a separate t3.medium instance. Oh hmm, that's interesting. I'm submitting the Spark job in --master yarn-client mode, so would the driver reside in the container instead of on YARN, and if so, is spark.driver.memory bounded by the container's memory? There are other smaller tasks before this one in the Airflow job that execute fine. – kent Nov 22 '21 at 14:49
  • [this](https://stackoverflow.com/questions/20793694/what-is-yarn-client-mode-in-spark) confirms what you're saying: the driver resides in the Airflow task, hence in the Airflow worker/scheduler (with the sequential or local executor). – EDG956 Nov 22 '21 at 15:16

1 Answer


You are exceeding the driver's capacity (6 GB) when calling collectToPython. That makes sense, since your executor has a much larger memory limit than the driver (12 GB vs 6 GB). The problem I see in your case is that increasing driver memory may not be a good solution, because you are already near the virtual machine's limit (16 GB): you also need room for the JVM's own memory requirements and for the rest of the operating system, on top of what the Spark environment requires.

That said, my suggestions would be:

First, check the size of the DataFrame you are trying to collect from the executors to the driver. Maybe you can apply some filtering or drop columns you don't need, reducing the amount of data that has to be moved.
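A minimal sketch of that idea, applied to the transformed DataFrame seen in the traceback (the column names and filter are hypothetical, since the real schema isn't shown):

```python
from pyspark.sql import functions as F

# Hypothetical example: keep only the columns the downstream load actually
# needs and drop rows that won't be used, *before* the data is collected.
trimmed = (transformed
           .select("src", "dst", "weight")            # hypothetical column names
           .filter(F.col("weight").isNotNull()))      # hypothetical filter

pdf = trimmed.toPandas()   # far less data now has to fit on the driver
```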

If the first point does not work, increase the driver memory and see what happens. If you still get memory errors, increase the VM memory or consider a distributed setup. I would favour that last option, as it is where Spark really shines: multiple machines with several executors per machine significantly increase processing power (but be aware of the cost too).
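As a further sketch (not something the answer above prescribes), if the result simply cannot fit on the driver in one piece, it can also be streamed over partition by partition with DataFrame.toLocalIterator(), available since Spark 2.0:

```python
import pandas as pd

# Sketch: pull rows to the driver one partition at a time and emit small
# pandas chunks, so the full result never has to be held in driver memory.
def to_pandas_in_chunks(df, chunk_size=100000):
    rows = []
    for row in df.toLocalIterator():       # fetches one partition at a time
        rows.append(row.asDict())
        if len(rows) >= chunk_size:
            yield pd.DataFrame(rows, columns=df.columns)
            rows = []
    if rows:
        yield pd.DataFrame(rows, columns=df.columns)
```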

BSP
  • JVM doc https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/memleaks002.html: _The detail message "GC overhead limit exceeded" indicates that the garbage collector is running all the time and Java program is making very slow progress. After a garbage collection, if the Java process is spending more than approximately 98% of its time doing garbage collection and if it is recovering less than 2% of the heap and has been doing so for the last 5 (compile time constant) consecutive garbage collections, then a java.lang.OutOfMemoryError is thrown..._ – mazaneicha Nov 22 '21 at 21:02