
I'm using Hive.

When I write dynamic partitions with an INSERT query and turn on the hive.optimize.sort.dynamic.partition option (SET hive.optimize.sort.dynamic.partition=true), there is always a single file in each partition.

But if I turn that option off (SET hive.optimize.sort.dynamic.partition=false), I get an out-of-memory exception like this:

TaskAttempt 3 failed, info=[Error: Error while running task ( failure ) : attempt_1534502930145_6994_1_01_000008_3:java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:194)
        at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:168)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:370)
        at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)
        at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
        at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
        at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
        at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
        at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
        at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
        at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229)
        at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131)
        at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
        at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
        at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
        at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
        at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
        at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:184)
        at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:376)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
        at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:327)
        at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288)
        at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:67)
        at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:128)
        at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:117)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:286)
        at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:271)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketForFileIdx(FileSinkOperator.java:619)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketFiles(FileSinkOperator.java:563)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createNewPaths(FileSinkOperator.java:867)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.getDynOutPaths(FileSinkOperator.java:975)
        at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:715)
        at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
        at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
        at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource$GroupIterator.next(ReduceRecordSource.java:356)
        at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:287)
        at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.run(ReduceRecordProcessor.java:317)
]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:299, Vertex vertex_1534502930145_6994_1_01 [Reducer 2] killed/failed due to:OWN_TASK_FAILURE]Vertex killed, vertexName=Map 1, vertexId=vertex_1534502930145_6994_1_00, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to OTHER_VERTEX_FAILURE, failedTasks:0 killedTasks:27, Vertex vertex_1534502930145_6994_1_00 [Map 1] killed/failed due to:OTHER_VERTEX_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1

I guess this exception is raised because each reducer writes to many partitions simultaneously, but I can't find how to control that. I also followed this article, but it didn't help.

My environment:

  • AWS EMR 5.12.1
  • Tez as the execution engine
  • Hive version is 2.3.2, and Tez version is 0.8.2
  • HDFS block size is 128MB
  • There are about 30 dynamic partitions to write with INSERT query

Here is my sample query.

SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.optimize.sort.dynamic.partition=true;
SET hive.exec.reducers.bytes.per.reducer=1048576;
SET mapred.reduce.tasks=300;
FROM raw_data
INSERT OVERWRITE TABLE idw_data
  PARTITION(event_timestamp_date)
  SELECT
    *
  WHERE 
    event_timestamp_date BETWEEN '2018-09-09' AND '2018-10-09' 
DISTRIBUTE BY event_timestamp_date
;
leftjoin
Juhong Jung
  • This: `SET mapred.reduce.tasks=300;` - it probably overrides bytes.per.reducer and forces 300 reducers. How many reducers have started? – leftjoin Oct 22 '18 at 13:00
  • And distribute by event_timestamp_date + one more additional column (not much correlated with partition) will definitely create more than one file per partition – leftjoin Oct 22 '18 at 13:20
  • @leftjoin Sorry for the late reply. I tried `distribute by event_timestamp_date + one more additional column (not much correlated with partition)` and varied `hive.exec.reducers.bytes.per.reducer` from `1024` to `104857600`, but there is always only one file per partition. For example, one partition contains a single file of more than 10GB. I cannot understand this, because there are more than 1000 reduce tasks but the result is one file. – Juhong Jung Oct 23 '18 at 07:15
  • For your information, I tried with `SET hive.optimize.sort.dynamic.partition=true;` – Juhong Jung Oct 23 '18 at 07:21
  • I tried mr as the execution engine, and the result is the same. – Juhong Jung Oct 23 '18 at 08:04
  • @leftjoin Hi! I tried with another table that has few columns, and it does not cause the exception. That was with `SET hive.optimize.sort.dynamic.partition=false;` and distribute by the partition column plus one additional column. I cannot understand why there is no exception when the destination table has few columns.... – Juhong Jung Oct 24 '18 at 02:46
  • Because much less data is being processed. Also, if the big table is ORC, try making it a text file; ORC requires more memory when writing – leftjoin Oct 24 '18 at 06:17
  • @leftjoin I'm using Parquet. How can I use a large Parquet table? I heard some people use more than a thousand columns. So why do I get an error with only about 100 columns? – Juhong Jung Oct 24 '18 at 14:51
  • @leftjoin Thank you! Your advice was really helpful!! – Juhong Jung Oct 24 '18 at 16:01

2 Answers


Adding distribute by on the partition key helps with the OOM issue, but it may cause each reducer to write a whole partition, depending on the hive.exec.reducers.bytes.per.reducer setting, which can be set to a very high value by default, such as 1 GB. distribute by on the partition key may also add a reduce stage, as does hive.optimize.sort.dynamic.partition.

So, to avoid OOM and achieve maximum performance:

  1. Add distribute by partition key at the end of your insert query. This causes rows with the same partition key to be processed by the same reducer(s). Alternatively, or in addition, you can set hive.optimize.sort.dynamic.partition=true.
  2. Set hive.exec.reducers.bytes.per.reducer to a value that triggers more reducers when there is too much data in one partition. Check the current value of hive.exec.reducers.bytes.per.reducer and lower or raise it to get the right reducer parallelism. This setting determines how much data a single reducer processes and how many files are created per partition.

Example:

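-- 33554432 bytes = 32 MB of input per reducer
set hive.exec.reducers.bytes.per.reducer=33554432;

-- target_table is a placeholder name; the original example omitted the table name
insert overwrite table target_table partition (load_date)
select * from src_table
distribute by load_date;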
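
If one reducer per partition is too coarse, a second distribution expression can spread each partition over several reducers, as suggested in the comments on the question. The following is only a sketch under assumptions: `target_table` and `some_id` are hypothetical names (`some_id` stands for any column that is not strongly correlated with `load_date`), the bucket count of 4 is arbitrary, and hive.optimize.sort.dynamic.partition is assumed to be off.

```sql
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.optimize.sort.dynamic.partition=false;

-- Rows with the same (load_date, bucket) pair go to the same reducer,
-- so a partition can end up with up to 4 files instead of 1.
-- pmod(hash(...), 4) yields a deterministic bucket number from 0 to 3.
insert overwrite table target_table partition (load_date)
select * from src_table
distribute by load_date, pmod(hash(some_id), 4);
```

With this layout a single reducer may still receive rows for several partitions and keep one record writer open for each of them, so reducer memory remains a constraint.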

See also this answer about controlling the number of mappers and reducers: https://stackoverflow.com/a/42842117/2700344

leftjoin
  • I set `hive.optimize.sort.dynamic.partition=true` and `hive.exec.reducers.bytes.per.reducer=1024` but there is still a single file (more than 10GB) in each partition. Higher values such as 1048576 and 10485760, and lower values such as 512, made no difference. – Juhong Jung Oct 22 '18 at 07:51
  • @JuhongJung Try also using distribute by together with bytes per reducer, without hive.optimize.sort.dynamic.partition. What is the use of setting bytes per reducer to such a small value? 10485760 = 10M, which is less than a block. – leftjoin Oct 22 '18 at 08:11
  • I tried `SET hive.optimize.sort.dynamic.partition=false` and `SET hive.exec.reducers.bytes.per.reducer=1048576` with `DISTRIBUTE BY event_timestamp_date`, but it caused an out-of-memory exception. I added a sample query to my question. Please check it :) Thank you very much! – Juhong Jung Oct 22 '18 at 10:20

Finally, I found what was wrong.

First of all, the execution engine was Tez, so the mapreduce.reduce.memory.mb option did not help. You should use the hive.tez.container.size option instead. When writing dynamic partitions, a reducer opens multiple record writers, so it needs enough memory to write to multiple partitions simultaneously.

If you use the hive.optimize.sort.dynamic.partition option, a global sort on the partition key is run, and sorting means there are reducers. In this case, if there are no other reducer tasks, each partition is processed by one reducer. That's why there is only one file per partition. DISTRIBUTE BY creates more reduce tasks, so it can produce more files in each partition, but the same memory problem remains.

Consequently, the container memory size is really important! Don't forget to use the hive.tez.container.size option to change the Tez container memory size!
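
As a rough sketch, the session settings could look like the following. The values 4096 and -Xmx3276m are illustrative assumptions, not the values I actually used, and hive.tez.java.opts is an additional Hive setting that is not discussed above; it is commonly sized to about 80% of the container so the JVM heap fits inside it.

```sql
-- Print the current value before changing it.
SET hive.tez.container.size;

-- Assumed value: give each Tez task container 4 GB (the unit is MB).
SET hive.tez.container.size=4096;

-- Assumed value: JVM heap at roughly 80% of the container size,
-- leaving headroom for non-heap memory.
SET hive.tez.java.opts=-Xmx3276m;
```

The right size depends on how many partitions a single reducer writes at once and on how wide the table is, so it may take a few attempts to find a value that works.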

Juhong Jung