
I am new to Hive and am trying to load a large (1.25 TB uncompressed) HDFS file into a Hive managed table. The file is already on HDFS in CSV format (imported with Sqoop) with an arbitrary partitioning, and I want to put it into a more organized format for querying and joining. I'm on HDP 3.0 using Tez. Here is my HQL:

USE MYDB;

DROP TABLE IF EXISTS new_table;

CREATE TABLE IF NOT EXISTS new_table (
 svcpt_id VARCHAR(20),
 usage_value FLOAT,
 read_time SMALLINT)
PARTITIONED BY (read_date INT)
CLUSTERED BY (svcpt_id) INTO 9600 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS ORC
TBLPROPERTIES("orc.compress"="snappy");

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions.pernode=2000;
SET hive.exec.max.dynamic.partitions=10000;
SET hive.vectorized.execution.enabled = true;
SET hive.vectorized.execution.reduce.enabled = true;
SET hive.enforce.bucketing = true;
SET mapred.reduce.tasks = 10000;

INSERT OVERWRITE TABLE new_table
PARTITION (read_date)
SELECT svcpt_id, usage, read_time, read_date
FROM raw_table;

The way Tez sets this up is (from my most recent failure):

--------------------------------------------------------------------------------
VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1      SUCCEEDED   1043       1043        0        0       0       0
Reducer 2    RUNNING   9600        735       19     8846       0       0
Reducer 3     INITED  10000          0        0    10000       0       0
--------------------------------------------------------------------------------
VERTICES: 01/03  [==>>------------------------] 8%    ELAPSED TIME: 45152.59 s
--------------------------------------------------------------------------------

I've been working on this for a while. At first I could not get the Map 1 vertex to run, so I added buckets. With 96 buckets the mapper ran, but Reducer 2 failed, citing disk space issues that did not make sense. I then raised the number of buckets to 9600 and the reduce tasks to 10000, and the Reducer 2 vertex started running, albeit slowly. This morning I found that it had errored out because my NameNode had shut down with a Java heap space error from the garbage collector.

Does anyone have any guiding advice for me? I feel like I'm shooting in the dark with the number of reduce tasks, number of buckets, and all the configs shown below.

hive.tez.container.size = 5120MB
hive.exec.reducers.bytes.per.reducer = 1GB
hive.exec.max.dynamic.partitions = 5000
hive.optimize.sort.dynamic.partition = FALSE
hive.vectorized.execution.enabled = TRUE
hive.vectorized.execution.reduce.enabled = TRUE
yarn.scheduler.minimum-allocation-mb = 2G
yarn.scheduler.maximum-allocation-mb = 8G
mapred.min.split.size=?
mapred.max.split.size=?
hive.input.format=?

I have not set up LLAP.

My cluster has 4 nodes, 32 cores, and 120 GB of memory. I have not used more than 1/3 of the cluster's storage.

Zafar
  • Increase the number of reducers by reducing hive.exec.reducers.bytes.per.reducer. But the cluster is small, so only a small number of reducers will run simultaneously; the others will be pending. – leftjoin Sep 10 '18 at 17:01
  • Thanks so much for the comment. Is there a formula I can use to figure out how to set these? Any resources? It's about 1.5 hours per iteration so may be difficult for me to test out different values. – Zafar Sep 10 '18 at 17:34
  • And you have 9600 reducers because of bucketing: in this case the number of reducers equals the number of buckets. Try removing the buckets and setting hive.enforce.bucketing=false first. Then hive.exec.reducers.bytes.per.reducer will take effect. See https://stackoverflow.com/a/42842117/2700344 and https://stackoverflow.com/a/51061613/2700344 – leftjoin Sep 10 '18 at 17:59
  • Thanks @leftjoin. What I meant to say is that I want bucketing. I think buckets will make my queries and joins faster, since I will commonly be querying on `svcpt_id`, but I don't know how to set the number of buckets. Any ideas there? Typical sizes per bucket? – Zafar Sep 10 '18 at 19:11
  • The number of buckets is a table property: CLUSTERED BY (svcpt_id) INTO 9600 BUCKETS – leftjoin Sep 10 '18 at 19:13
  • right- I chose 9600 pretty randomly. Do you know how I would make a better informed decision on calculating the best # of buckets? I have ~1M distinct values in that column. – Zafar Sep 10 '18 at 19:19
  • 3-5 M rows per bucket, IMHO, because that is a good amount for one container. But it depends on too many factors, and too many files is also not good. BTW, ORC has indexes and bloom filters; could they be beneficial instead of buckets in your case? – leftjoin Sep 10 '18 at 19:26
  • Wow, bloom filters sound great for that column! I will get rid of my buckets and add in `'orc.create.index'='true', 'orc.bloom.filter.columns'='svcpt_id'` and a `SORTED BY svcpt_id`. Sound right? I'm going to keep stripe size and row index stride at the defaults. – Zafar Sep 10 '18 at 19:45
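
The plan in the last comment could look roughly like the sketch below. It is one possible layout, not a tested fix. One caveat: in Hive DDL, `SORTED BY` is only accepted as part of a `CLUSTERED BY ... INTO n BUCKETS` clause, so once the buckets are gone the sorting has to happen in the INSERT, here with `DISTRIBUTE BY` and `SORT BY`. Distributing by `read_date` is an assumption: it sends each date to a single reducer, which may be too coarse if individual dates are very large. `ROW FORMAT DELIMITED` is left out because ORC files do not use a field delimiter.

```sql
USE MYDB;

DROP TABLE IF EXISTS new_table;

-- No buckets: rely on ORC row-group indexes and a bloom filter on svcpt_id.
CREATE TABLE IF NOT EXISTS new_table (
  svcpt_id VARCHAR(20),
  usage_value FLOAT,
  read_time SMALLINT)
PARTITIONED BY (read_date INT)
STORED AS ORC
TBLPROPERTIES (
  "orc.compress"="snappy",
  "orc.create.index"="true",
  "orc.bloom.filter.columns"="svcpt_id");

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

-- Assumption: DISTRIBUTE BY read_date routes each date to one reducer,
-- and SORT BY orders rows by svcpt_id within that reducer, so the ORC
-- min/max indexes and bloom filters can skip row groups on svcpt_id lookups.
INSERT OVERWRITE TABLE new_table
PARTITION (read_date)
SELECT svcpt_id, usage, read_time, read_date
FROM raw_table
DISTRIBUTE BY read_date
SORT BY svcpt_id;
```

With bucketing removed, the reducer count is no longer pinned to the bucket count, so hive.exec.reducers.bytes.per.reducer should apply again, as noted in the comments above.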

1 Answer

SET hive.execution.engine = tez;
SET hive.vectorized.execution.enabled = false;
SET hive.vectorized.execution.reduce.enabled = false;
SET hive.enforce.bucketing = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
SET hive.stats.autogather = true;
SET hive.exec.parallel = true;
SET hive.exec.parallel.thread.number = 60;
SET mapreduce.job.skiprecords = true;
SET mapreduce.map.maxattempts = 10;
SET mapreduce.reduce.maxattempts = 10;
SET mapreduce.map.skip.maxrecords = 300;
SET mapreduce.task.skip.start.attempts = 1;
SET mapreduce.output.fileoutputformat.compress = false;
SET mapreduce.job.reduces = 1000;

You can try some of the above settings!
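
If bucketing is kept, the 3-5 M rows per bucket rule of thumb from the comments can be turned into a quick sizing query instead of picking a number at random. This is a minimal sketch: the 4,000,000 figure is an assumed midpoint of that range, not a Hive default. Buckets are created per partition, so each `read_date` partition gets its own set of bucket files, and the estimate is therefore computed per partition. A very large bucket count multiplied by many partitions produces a very large number of files, which may put pressure on the NameNode.

```sql
-- Rows per partition and a suggested bucket count for the largest partitions.
-- Assumption: target of about 4,000,000 rows per bucket (midpoint of 3-5 M).
SELECT read_date,
       COUNT(*) AS row_count,
       CAST(CEIL(COUNT(*) / 4000000) AS INT) AS suggested_buckets
FROM raw_table
GROUP BY read_date
ORDER BY row_count DESC
LIMIT 10;
```

The bucket count is fixed for the whole table, so the value from the largest partitions is the one to look at; smaller partitions will simply have smaller bucket files.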