
I have an API endpoint written with Spark SQL (PySpark), with the following sample code. Every time the API accepts a request, it runs sparkSession.sql(sql_to_hive), which creates a single file in HDFS. Is there any way to do the insert by appending data to an existing file in HDFS? Thanks.

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sparkSession.sparkContext)
    # build a dataframe from the request payload and expose it to SQL
    df = sqlContext.createDataFrame(ziped_tuple_list, schema=schema)
    df.registerTempTable('TMP_TABLE')
    sql_to_hive = 'insert into log.%(table_name)s partition%(partition)s select %(title_str)s from TMP_TABLE' % {
        'table_name': table_name,
        'partition': partition_day,
        'title_str': title_str
    }
    # each request ends up writing a new file under the target partition in HDFS
    sparkSession.sql(sql_to_hive)
Chandler.Huang

2 Answers


I don't think it is possible to append data to an existing file in this case.

But you can work around this by using one of the following approaches.

Approach 1:

Using Spark, write to an intermediate temporary table and then insert overwrite into the final table:

    existing_df = spark.table("existing_hive_table")  # get the current data from Hive
    # current_df is the new dataframe produced by the incoming request
    union_df = existing_df.union(current_df)
    union_df.write.mode("overwrite").saveAsTable("temp_table")  # write the combined data to a temp table
    temp_df = spark.table("temp_table")  # read it back from the temp table
    temp_df.repartition(<number>).write.mode("overwrite").saveAsTable("existing_hive_table")  # overwrite the final table
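
(The detour through temp_table is needed because Spark will not let you overwrite a table that the same job is still reading from, so the unioned data has to be staged elsewhere first.)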

Approach 2:

Hive (not Spark) allows overwriting and selecting from the same table, i.e.

    insert overwrite table default.t1 partition(partition_column)
    select * from default.t1;  -- overwrite and select from the same t1 table

If you follow this approach, a Hive job needs to be triggered once your Spark job finishes.

Hive acquires a lock while running the overwrite/select on the same table, so any other job writing to that table will have to wait.
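
As a minimal sketch of how such a follow-up job could be triggered from Python (the hive CLI, the default.t1 example and the dynamic-partition setting are assumptions, not something from the question):

    import subprocess

    # Sketch only: assumes the hive CLI is on the PATH and reuses the
    # default.t1 example from above; nonstrict dynamic partitioning lets
    # select * supply the partition column.
    subprocess.check_call([
        "hive",
        "--hiveconf", "hive.exec.dynamic.partition.mode=nonstrict",
        "-e",
        "insert overwrite table default.t1 partition(partition_column) "
        "select * from default.t1",
    ])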

In addition: the ORC format offers alter table ... concatenate, which merges small ORC files into a new, larger file.

    alter table <db_name>.<orc_table_name> [partition (partition_column="val")] concatenate;

We can also use the distribute by and sort by clauses to control the number of files; refer to this and this link for more details.
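
As a rough sketch of the distribute by idea (the table, partition and column names are placeholders, not taken from the question):

    # Sketch only: log.my_table, day and the column list are placeholders.
    # distribute by floor(rand() * 4) spreads the rows across at most 4
    # reducers, so the insert writes at most ~4 files for this partition.
    sparkSession.sql("""
        insert overwrite table log.my_table partition(day='2019-01-01')
        select col1, col2
        from TMP_TABLE
        distribute by floor(rand() * 4)
    """)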

Approach 3: use hadoop fs -getmerge to merge all the small files into one (this method works for text files; I haven't tried it for ORC, Avro, etc. formats).
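
If you go this route, a minimal sketch from Python could look like the following (both paths are placeholders; note that -getmerge writes the combined file to the local filesystem, so it has to be pushed back into HDFS and the original small files cleaned up separately):

    import subprocess

    # Sketch only: the partition directory and local path are placeholders.
    src_dir = "/user/hive/warehouse/log.db/my_table/day=2019-01-01"
    local_merged = "/tmp/day_2019-01-01_merged"

    # concatenate every file under the partition into one local file
    subprocess.check_call(["hadoop", "fs", "-getmerge", src_dir, local_merged])
    # push the merged file back into HDFS
    subprocess.check_call(["hadoop", "fs", "-put", "-f", local_merged,
                           src_dir + "/merged_0"])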

notNull

When you write the resulting dataframe:

    result_df = sparkSession.sql(sql_to_hive)

set its mode to append:

    result_df.write.mode("append").
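
A minimal PySpark sketch of the idea (note that the dataframe you append would normally be the one built from the request, since the INSERT statement itself returns an empty result; df and table_name are reused from the question, and insertInto assumes the partition column is the last column of df):

    # Sketch only: appends the request's dataframe into the existing Hive
    # table; insertInto resolves columns by position, so for a partitioned
    # table the partition value must be the last column of df.
    df.write.mode("append").insertInto("log." + table_name)

Either way, each append still creates new files under the table's HDFS location rather than adding rows to an existing file.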