1

I have a piece of code that creates a DataFrame and persists it to S3. Below creates a DataFrame of 1000 rows and 100 columns, populated by math.Random. I'm running this on a cluster with 4 x r3.8xlarge worker nodes, and configuring plenty of memory. I've tried with the maximum number of executors, and one executor per node.

// create some random data for performance and scalability testing
val df = sqlContext.range(0,1000).map(x => 
             Row.fromSeq((1 to 100).map(y => math.Random)))

df.saveAsParquetFile("s3://kirk/my_file.parquet")

My problem is that I can create a much larger DataFrame in memory than I can save to S3.

For example, 1 billion rows and 1000 columns can be constructed and queried, but 100 million rows and 100 columns fails when I write to S3 in this manner. I don't get great messages from the Spark context, but the job will fail because too many of the tasks failed.

Is there some configuration to save a file more efficiently? Should I configure Spark differently in order to saveAsParquetFile?

This is the stacktrace from an executor:

15/09/09 18:10:26 ERROR sources.InsertIntoHadoopFsRelation: Aborting task.
java.lang.OutOfMemoryError: Java heap space
    at parquet.column.values.dictionary.IntList.initSlab(IntList.java:87)
    at parquet.column.values.dictionary.IntList.<init>(IntList.java:83)
    at parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:85)
    at parquet.column.values.dictionary.DictionaryValuesWriter$PlainIntegerDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:549)
    at parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:88)
    at parquet.column.impl.ColumnWriterImpl.<init>(ColumnWriterImpl.java:74)
    at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:68)
    at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
    at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
    at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
    at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
    at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
    at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
    at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
    at org.apache.spark.sql.parquet.ParquetOutputWriter.<init>(newParquet.scala:83)
    at org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
    at org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470)
    at org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
    at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
zero323
  • 322,348
  • 103
  • 959
  • 935
Kirk Broadhurst
  • 27,836
  • 16
  • 104
  • 169
  • 1
    What does the executor/container log say for errors? If there is no errors in the log is the nodemanager killing off the container for the executor due to it exceeding memory? – ChristopherB Sep 09 '15 at 21:03
  • @ChristopherB updated the question with the executor log. Java heap out of memory. But how do I configure that? – Kirk Broadhurst Sep 09 '15 at 22:12

1 Answers1

0

I'm thinking that you need to repartition your dataframe (you should have at least numberOfWorkerInstances * numberOfCoresOnEachInstance number of partitions) to allow for parallel writes to S3.

Glennie Helles Sindholt
  • 12,816
  • 5
  • 44
  • 50