
I am using PySpark to overwrite Parquet partitions in an S3 bucket. My partitioned folders look like this:

parent_folder
      -> year=2019
            --> month=1
                  ----> date=2019-01-01
                  ----> date=2019-01-02
            --> month=2
                  ........
      -> year=2020
            --> month=1
                  ----> date=2020-01-01
                  ----> date=2020-01-02
            --> month=2
                  ........

Now I run a Spark script that needs to overwrite only specific partitions, say year=2020, month=1 and the dates 2020-01-01 and 2020-01-02, using the line below:

df_final.write.partitionBy(["year", "month", "date"]).mode("overwrite").format("parquet").save(output_dir_path)

The line above deletes all the other partitions and writes back only the data present in the final DataFrame, df_final. I have also set the overwrite mode to dynamic as shown below, but it doesn't seem to work:

conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

My question is: is there a way to overwrite only specific partitions (more than one)? Any help will be much appreciated. Thanks in advance.

user1411837
  • what does `spark.sparkContext.getConf().get('spark.sql.sources.partitionOverwriteMode')` show? – Vamsi Prabhala Jan 24 '20 at 19:49
  • I bumped into this issue on a project I worked on. This [answer](https://stackoverflow.com/a/38576659/5089324) is useful. – Djoko Jan 24 '20 at 20:27

1 Answer


I guess you are looking for a solution where you can insert into and overwrite existing partitions of a Parquet table using Spark SQL, and I assume the Parquet data ultimately backs a partitioned Hive table.

You can create a Spark session with Hive support enabled. The steps are below; this is not exact code, more like pseudocode:

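import logging
import sys

from pyspark.sql import SparkSession

logger = logging.getLogger(__name__)

# warehouseLocation, hiveSchema and hiveTable are placeholders that the caller must define.
spark = (
    SparkSession.builder
    .appName("Spark Hive Example")
    .config("spark.sql.warehouse.dir", warehouseLocation)
    .enableHiveSupport()
    .getOrCreate()
)

try:
    df = spark.read.parquet("filename.parquet")
    df.createOrReplaceTempView("temp")

    # Dynamic partition insert: part1 and part2 must be the last columns of the SELECT.
    insertQuery = """INSERT OVERWRITE TABLE {}.{} PARTITION (part1, part2)
                     SELECT *
                     FROM temp a""".format(hiveSchema, hiveTable)
    spark.sql(insertQuery)
except Exception:
    logger.exception("Error while loading data into table..")
    sys.exit(1)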
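
Since `SELECT *` only works for a dynamic partition insert when the partition columns happen to come last in the source, the query string could also be built from explicit column lists. The following is a minimal sketch, not part of the original answer: the helper name `build_insert_overwrite` and the example names `my_schema`, `my_table`, `id` and `value` are hypothetical, and only the partition columns `year`, `month`, `date` come from the question.

```python
def build_insert_overwrite(schema, table, data_cols, partition_cols, source_view):
    """Render a dynamic-partition INSERT OVERWRITE statement as a string.

    Hypothetical helper for illustration; it only formats text and does
    not talk to Spark or Hive.
    """
    if not partition_cols:
        raise ValueError("at least one partition column is required")
    # Dynamic partition columns go last in the SELECT list, in the same
    # order as they appear in the PARTITION clause.
    select_list = ", ".join(list(data_cols) + list(partition_cols))
    partition_spec = ", ".join(partition_cols)
    return (
        "INSERT OVERWRITE TABLE {}.{} PARTITION ({}) "
        "SELECT {} FROM {}"
    ).format(schema, table, partition_spec, select_list, source_view)


# Example with made-up schema, table and data column names.
query = build_insert_overwrite(
    schema="my_schema",
    table="my_table",
    data_cols=["id", "value"],
    partition_cols=["year", "month", "date"],
    source_view="temp",
)
print(query)
```

The resulting string can be passed to `spark.sql(...)` in place of `insertQuery`. The helper does not quote or validate identifiers, so it should only be fed trusted column names.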

The Spark session code above is a sample I wrote for this. Below is the INSERT OVERWRITE functionality from Hive; choose whichever form is relevant to you:

Hive extension (multiple inserts):
FROM from_statement
INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;
FROM from_statement
INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1
[INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2]
[INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2] ...;

Hive extension (dynamic partition inserts):
INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;
INSERT INTO TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;
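
For the DataFrame writer used in the question, a possible alternative to the Hive route is Spark's dynamic partition overwrite mode (available since Spark 2.3). The sketch below assumes the setting is applied to the live session through `spark.conf.set`; setting it on a separate `SparkConf` object after the session already exists is one possible reason the setting in the question had no effect, though that is a guess. The function name `overwrite_touched_partitions` is hypothetical.

```python
PARTITION_OVERWRITE_KEY = "spark.sql.sources.partitionOverwriteMode"


def overwrite_touched_partitions(spark, df, output_dir_path, partition_cols):
    """Overwrite only the partitions that have rows in df.

    Hypothetical helper: `spark` is an active SparkSession and `df` a
    DataFrame containing the partition columns.
    """
    # Apply the mode to the running session so the writer picks it up.
    spark.conf.set(PARTITION_OVERWRITE_KEY, "dynamic")
    (
        df.write
        .partitionBy(*partition_cols)
        .mode("overwrite")
        .format("parquet")
        .save(output_dir_path)
    )


# Usage, with the names from the question:
# overwrite_touched_partitions(
#     spark, df_final, output_dir_path, ["year", "month", "date"]
# )
```

To confirm what the session actually uses, `spark.conf.get("spark.sql.sources.partitionOverwriteMode")` can be checked before the write, which is in line with the first comment on the question.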
Ajay Kharade