44

I have a sample application that reads CSV files into a DataFrame. The DataFrame can be stored in a Hive table in Parquet format using df.saveAsTable(tablename, mode).

This works fine, but I have so much data for each day that I want to dynamically partition the Hive table on creationdate (a column in the table).

Is there any way to dynamically partition the DataFrame and store it in the Hive warehouse? I want to avoid hard-coding the insert statement with hivesqlcontext.sql(insert into table partition by(date)....).

This question can be considered an extension of: How to save DataFrame directly to Hive?

Any help is much appreciated.

ZygD
Chetandalal

7 Answers

48

I believe it works something like this:

df is a dataframe with year, month and other columns

df.write.partitionBy('year', 'month').saveAsTable(...)

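or, to insert into a table that already exists:

df.write.partitionBy('year', 'month').insertInto(...)

Note that in Spark 2.0 and later, insertInto() cannot be combined with partitionBy(): the existing table's partitioning is used, so the call becomes df.write.insertInto(...).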
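To make concrete what partitionBy produces on disk, here is a minimal pure-Python sketch of the Hive-style directory layout, with one column=value folder level per partition column. It does not call Spark; the function names partition_path and group_by_partition and the sample rows are hypothetical, and it only models the naming convention, not Spark's writer.

```python
from collections import defaultdict


def partition_path(row, partition_cols):
    """Build the Hive-style relative directory for one row, e.g. 'year=2015/month=7'."""
    return "/".join(f"{col}={row[col]}" for col in partition_cols)


def group_by_partition(rows, partition_cols):
    """Group rows by target directory; partition values live in the path, not in the stored rows."""
    groups = defaultdict(list)
    for row in rows:
        data = {k: v for k, v in row.items() if k not in partition_cols}
        groups[partition_path(row, partition_cols)].append(data)
    return dict(groups)


# Hypothetical sample rows standing in for the DataFrame.
rows = [
    {"id": 1, "year": 2015, "month": 7},
    {"id": 2, "year": 2015, "month": 7},
    {"id": 3, "year": 2015, "month": 8},
]
layout = group_by_partition(rows, ["year", "month"])
for path in sorted(layout):
    print(path, layout[path])
```

Each distinct (year, month) pair ends up in its own folder, which is why only the partition columns need to be named in the write call.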
mdurant
  • Tried this Partitionby method. It only works on RDD level, once dataframe is created most of the methods are DBMS styled e.g. groupby, orderby but they don't serve the purpose of writing in different partitions folders on Hive. – Chetandalal Jul 13 '15 at 13:01
  • 8
    Ok, so was able to work it out with 1.4 version. df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename"); . This however changes my date field to integer value and remove the actual date. e.g. there are 9 unique dates in the column but they are now stored as 1,2,3.... and folder name is date=1,2,3,... instead of date=20141121. Let me know if there is a way to do this. – Chetandalal Jul 16 '15 at 18:21
  • 2
    @subramaniam-ramasubramanian: please reply to the OP's question with an answer instead of editing the existing answer – Ram Ghadiyaram May 01 '17 at 10:59
  • Does this work for overwriting multiple dynamic partitions without losing other partitions in the base directory? – nir Aug 06 '18 at 20:36
  • @mdurant : I am getting following exception: org.apache.hadoop.hive.ql.metadata.Table.ValidationFailureSemanticException: Partition spec {rivalname=, rivalName=_________} contains non-partition columns; Actually my partition column is also present in df. – Harshvardhan Solanki May 03 '20 at 17:14
  • 1
    This answer is five years old - would be happy to see it updated with whatever new syntax spark might have. – mdurant May 05 '20 at 13:42
  • `insertInto(...)` expects an existing table, so the table's schema is already defined and you are not allowed to use `.partitionBy()`. If your table is partitioned, Spark uses that information for partitioning. – Bahman Oct 19 '22 at 19:55
42

I was able to write to partitioned hive table using df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table")

I had to enable the following properties to make it work.

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
Rao
Jins George
  • Where should I set the above two parameters? I tried logging in to the Hive shell and running the above commands, but it failed. I am sure I am doing it wrong. Could you please tell me where I can set these properties? – Vrushank Doshi Jun 23 '16 at 20:58
  • 2
    @VrushankDoshi You would set it in the spark program, right after you create your hiveContext. val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) hiveContext.setConf("hive.exec.dynamic.partition","true") hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict") – MV23 Jun 29 '16 at 20:16
  • On my side this code overwrites but does not append any data. Why? – enneppi Mar 20 '18 at 17:08
  • This gives an error: with append, it is necessary to use insertInto for RDD-based tables, and insertInto requires an already existing table in Hive. – Tutu Kumari Mar 12 '19 at 10:20
  • @TutuKumari RDD based tables? With hive tables it works fine. – pavel_orekhov Jul 24 '23 at 20:05
9

I faced the same problem and resolved it by observing two rules.

  1. Once a table is partitioned, the partition column name becomes case sensitive.

  2. The partition column must be present in the DataFrame under the same name (case sensitive). Code:

    import org.apache.spark.sql.SaveMode

    val dbName = "your database name"
    val finaltable = "your table name"

    // Enable dynamic partitioning for this session before any insert.
    sparkSession.sql("SET hive.exec.dynamic.partition = true")
    sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
    sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")

    // Create the partitioned table only if it does not exist yet.
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" + finaltable + "'").collect().length == 0) {
         println("Table Not Present \n  Creating table " + finaltable)
         sparkSession.sql("create table " + dbName + "." + finaltable + "(EMP_ID string, EMP_Name string, EMP_Address string, EMP_Salary bigint) PARTITIONED BY (EMP_DEP STRING)")
    }

    // The table now exists, so insert the DataFrame in append mode.
    // insertInto matches columns by position, so EMP_DEP must be the last column of df.
    df.write.mode(SaveMode.Append).insertInto(dbName + "." + finaltable)
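Because insertInto resolves columns by position rather than by name, it can help to reorder the DataFrame columns so that the partition columns come last before inserting. The following is a small pure-Python sketch of that reordering, including the case-sensitive name check from the two rules above; the helper name order_for_insert and the column list are hypothetical.

```python
def order_for_insert(df_columns, partition_cols):
    """Return df_columns reordered so that the partition columns come last.

    Raises ValueError if a partition column is missing under its exact
    (case-sensitive) name.
    """
    missing = [c for c in partition_cols if c not in df_columns]
    if missing:
        raise ValueError(f"partition columns not in DataFrame: {missing}")
    data_cols = [c for c in df_columns if c not in partition_cols]
    return data_cols + list(partition_cols)


# Hypothetical column list; EMP_DEP is the partition column of the table above.
cols = order_for_insert(
    ["EMP_DEP", "EMP_ID", "EMP_Name", "EMP_Address", "EMP_Salary"], ["EMP_DEP"]
)
print(cols)
```

With a PySpark DataFrame this could be applied as df.select(*order_for_insert(df.columns, ["EMP_DEP"])) before calling insertInto; the Scala equivalent would select the same ordered column list.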
    
Nathan Tuggy
Nilesh Shinde
5

It can be configured on the SparkSession like this:

spark = SparkSession \
    .builder \
    ...
    .config("spark.hadoop.hive.exec.dynamic.partition", "true") \
    .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

or you can add them to a .properties file.

The spark.hadoop prefix is needed by the Spark config (at least in 2.4); here is how Spark applies this config:

  /**
   * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
   * configuration without the spark.hadoop. prefix.
   */
  def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
    SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
  }
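The effect of that prefix handling can be sketched in plain Python: every key that starts with spark.hadoop. is copied into the Hadoop configuration with the prefix removed, and other keys are left out. This is only a model using dictionaries, not Spark's actual implementation; the function name mirrors the Scala one purely for readability.

```python
PREFIX = "spark.hadoop."


def append_spark_hadoop_configs(spark_conf, hadoop_conf):
    """Copy spark.hadoop.* entries into hadoop_conf with the prefix removed."""
    for key, value in spark_conf.items():
        if key.startswith(PREFIX):
            hadoop_conf[key[len(PREFIX):]] = value
    return hadoop_conf


spark_conf = {
    "spark.hadoop.hive.exec.dynamic.partition": "true",
    "spark.hadoop.hive.exec.dynamic.partition.mode": "nonstrict",
    "spark.master": "local[*]",  # not a spark.hadoop.* key, so it is not copied
}
hadoop_conf = append_spark_hadoop_configs(spark_conf, {})
print(hadoop_conf)
```

So the two settings reach Hive under their usual names, hive.exec.dynamic.partition and hive.exec.dynamic.partition.mode.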
wiesiu_p
3

This is what works for me. I set these settings and then put the data in partitioned tables.

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
1

This worked for me using python and spark 2.1.0.

Not sure if it's the best way to do this but it works...

# WRITE DATA INTO A HIVE TABLE
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .config("hive.exec.dynamic.partition", "true") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

### CREATE HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'hive_df'
""")
spark.sql("""
INSERT INTO hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###

### CREATE NON HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION 'non_hive_df'
""")
spark.sql("""
INSERT INTO non_hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###

### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
spark.sql("""
INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")

spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite
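The difference between the two results can be modelled in a few lines of plain Python, treating a table as a dictionary from partition value to rows. This is only a sketch of the semantics observed above, not of Spark's implementation, and the function name insert_overwrite is hypothetical. As far as I know, on Spark 2.3 and later the spark.sql.sources.partitionOverwriteMode setting (static by default, or dynamic) selects between these two behaviours for data source tables.

```python
def insert_overwrite(table, new_rows, partition_col, dynamic):
    """Model INSERT OVERWRITE on a table stored as {partition_value: [rows]}.

    dynamic=True  -> replace only the partitions present in new_rows.
    dynamic=False -> drop every existing partition, then write new_rows.
    """
    result = {k: list(v) for k, v in table.items()} if dynamic else {}
    touched = set()
    for row in new_rows:
        key = row[partition_col]
        if key not in touched:
            result[key] = []  # the first write to a partition clears its old rows
            touched.add(key)
        result[key].append(row)
    return result


existing = {0: [{"col1": 0, "col2": "init_record", "partition_bin": 0}]}
incoming = [{"col1": 0, "col2": "new_record", "partition_bin": 1}]

dynamic_result = insert_overwrite(existing, incoming, "partition_bin", dynamic=True)
static_result = insert_overwrite(existing, incoming, "partition_bin", dynamic=False)
print(sum(len(v) for v in dynamic_result.values()))  # row count after dynamic overwrite
print(sum(len(v) for v in static_result.values()))   # row count after full overwrite
```

The dynamic case keeps partition 0 and adds partition 1, matching the two rows seen in hive_df, while the full overwrite leaves only the new row, matching non_hive_df.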
isichei
1
# Parentheses let the method chain span several lines in Python.
(df1.write
    .mode("append")
    .format("orc")
    .partitionBy("date")
    .option("path", "/hdfs_path")
    .saveAsTable("DB.Partition_tablename"))

This creates the partitions from the values of the "date" column and, because a path is given, writes the DataFrame from Spark as a Hive external table.

Dyno Fu
Moin Uddin