
We are using Spark 2.0.2, managed by a DC/OS cluster, that fetches data from a Kafka 1.0.0 messaging service and writes Parquet files to HDFS. Everything was working fine, but when we increased the number of topics in Kafka, our Spark executors began to crash constantly with OOM errors:

    java.lang.OutOfMemoryError: Java heap space
    at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
    at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainDoubleDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:422)
    at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:139)
    at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
    at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
    at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183)
    at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:217)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:175)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:146)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:113)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:87)
    at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:62)
    at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:47)
    at npm.parquet.ParquetMeasurementWriter.ensureOpenWriter(ParquetMeasurementWriter.java:91)
    at npm.parquet.ParquetMeasurementWriter.write(ParquetMeasurementWriter.java:75)
    at npm.ingestion.spark.StagingArea$Measurements.store(StagingArea.java:100)
    at npm.ingestion.spark.StagingArea$StagingAreaStorage.store(StagingArea.java:80)
    at npm.ingestion.spark.StagingArea.add(StagingArea.java:40)
    at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.sendToStagingArea(Kafka2HDFSPM.java:207)
    at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.consumeRecords(Kafka2HDFSPM.java:193)
    at npm.ingestion.spark.Kafka2HDFSPM$SubsetProcessor.process(Kafka2HDFSPM.java:169)
    at npm.ingestion.spark.Kafka2HDFSPM$FetchSubsetsAndStore.call(Kafka2HDFSPM.java:133)
    at npm.ingestion.spark.Kafka2HDFSPM$FetchSubsetsAndStore.call(Kafka2HDFSPM.java:111)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)
18/03/20 18:41:13 ERROR [Executor task launch worker-0] SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError: Java heap space
    ... (same stack trace as above)

We tried increasing the executors' available memory and reviewing the code, but we couldn't find anything wrong.

Another piece of info: we are using RDDs in Spark.
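For reference, the write path looks roughly like the sketch below. This is a minimal reconstruction based only on the stack trace (foreachPartition → StagingArea → one Parquet writer per topic), not our actual code; the class names, schema, and HDFS path are placeholders. The point it illustrates is that each open writer buffers a full row group plus dictionary pages in the executor heap, so heap usage grows with the number of topics:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical sketch of the ingestion pattern implied by the stack trace.
    public class StagingAreaSketch {

        private final Schema schema;  // Avro schema of the measurements (placeholder)
        private final Map<String, ParquetWriter<GenericRecord>> writers = new HashMap<>();

        public StagingAreaSketch(Schema schema) {
            this.schema = schema;
        }

        // One open writer per topic: each writer keeps an in-memory row group
        // (~128 MB by default) plus dictionary buffers on the executor heap.
        private ParquetWriter<GenericRecord> ensureOpenWriter(String topic) throws IOException {
            ParquetWriter<GenericRecord> writer = writers.get(topic);
            if (writer == null) {
                writer = AvroParquetWriter.<GenericRecord>builder(
                        new Path("hdfs:///staging/" + topic + "/part-0.parquet"))  // placeholder path
                    .withSchema(schema)
                    .build();
                writers.put(topic, writer);
            }
            return writer;
        }

        public void add(String topic, GenericRecord record) throws IOException {
            ensureOpenWriter(topic).write(record);
        }
    }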

Has anyone encountered a similar problem that has already been solved?

Lucifer

1 Answer


What is the heap configuration for the executor? By default, Java auto-tunes its heap size based on the machine's memory rather than the container's limit. You need to cap the heap with the -Xmx setting so it fits inside your container.

See this sample project about running Java in a container:

https://github.com/fabianenardon/docker-java-issues-demo/tree/master/memory-sample
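In Spark, the executor JVM's -Xmx is derived from spark.executor.memory, so on DC/OS (Mesos) you would typically cap the heap via the Spark configuration rather than passing -Xmx directly. A minimal sketch, assuming placeholder values that you would tune to your own container limits:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    // Sketch: keep the executor heap below the Mesos/DC/OS container size,
    // leaving headroom for off-heap and native allocations. Values are placeholders.
    public class ExecutorMemorySketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                .setAppName("Kafka2HDFSPM")
                .set("spark.executor.memory", "4g")                  // becomes -Xmx4g on the executor JVM
                .set("spark.mesos.executor.memoryOverhead", "1024"); // extra MB added to the container size

            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                // ... build the Kafka -> HDFS ingestion job here
            }
        }
    }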

janisz