I want to overwrite a partition in a given table. The SQL command is already prepared and works just fine when executed in Hive directly:
INSERT OVERWRITE TABLE mytable PARTITION (dt, event_name) SELECT * FROM mytable2
To manage the file sizes, I set some Hive properties in advance like this:
SET hive.merge.smallfiles.avgsize=268435456;
SET mapreduce.map.memory.mb=20000;
SET hive.exec.max.dynamic.partitions=50000;
SET mapreduce.reduce.memory.mb=20000;
SET hive.exec.dynamic.partition=true;
SET mapreduce.map.java.opts=-Xmx18000m;
SET hive.merge.size.per.task=268435456;
SET mapred.max.split.size=70000000;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET mapreduce.reduce.java.opts=-Xmx18000m;
SET mapred.min.split.size=35000000;
This ensures that all files written are >256 MB, which is what I want. Now I need to execute this statement from PySpark, because I am looping over different source tables, and I use the following code to set the properties accordingly on my SparkContext and also on the HiveContext/SQLContext:
from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = SparkConf().setAppName("my script").setMaster(master)

config_settings = {
    'mapreduce.map.memory.mb': '20000',
    'mapreduce.map.java.opts': '-Xmx18000m',
    'mapreduce.reduce.memory.mb': '20000',
    'mapreduce.reduce.java.opts': '-Xmx18000m',
    'hive.exec.dynamic.partition': 'true',
    'hive.exec.dynamic.partition.mode': 'nonstrict',
    'hive.merge.smallfiles.avgsize': '268435456',
    'hive.merge.size.per.task': '268435456',
    'mapred.max.split.size': '70000000',
    'mapred.min.split.size': '35000000',
    'hive.exec.max.dynamic.partitions': '50000',
    # 'hive.exec.compress.output': 'true',
    # 'parquet.compression': 'GZIP',
}

# apply every property to the Spark configuration before the context is created
for key, value in config_settings.items():
    conf.set(key, value)

sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

# set the same properties on the HiveContext as well
for key, value in config_settings.items():
    sqlContext.setConf(key, value)

sqlContext.sql("INSERT OVERWRITE TABLE mytable PARTITION (dt, event_name) SELECT * FROM mytable2")
However, this does not seem to work: it only generates files of the default size (64 MB). I tried this with Spark 1.6 and also with 2.3, and with different variations on how to set those properties, but none of them seem to work.
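For illustration, one of the variations was along these lines (a sketch of the idea, not necessarily the exact code I ran): issuing the properties as Hive-style SET statements through sqlContext.sql before the INSERT:

# sketch: push the same properties as SET commands through the SQL interface
for key, value in config_settings.items():
    sqlContext.sql("SET {0}={1}".format(key, value))

sqlContext.sql("INSERT OVERWRITE TABLE mytable PARTITION (dt, event_name) SELECT * FROM mytable2")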
When I call sc._conf.getAll() or sqlContext.getConf(...), it looks like all properties are set correctly.
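For reference, the checks look roughly like this (using one of the properties above as an example key):

print(sc._conf.getAll())  # lists the properties set on the SparkConf
print(sqlContext.getConf('hive.merge.smallfiles.avgsize'))  # returns the value I set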
What would be the right syntax to set these configurations so that they are also obeyed when using sqlContext.sql("INSERT OVERWRITE ...")?