
I have monthly Revenue data for the last 5 years, and I store the DataFrame for each month in parquet format in append mode, partitioned by the month column. Here is the pseudo-code below -

def Revenue(filename):
    # the monthly input files are CSVs (see the calls below)
    df = spark.read.format('csv').load(filename)
    # ... monthly transformations ...
    df.write.format('parquet').mode('append').partitionBy('month').save('/path/Revenue')

Revenue('Revenue_201501.csv')
Revenue('Revenue_201502.csv')
Revenue('Revenue_201503.csv')
Revenue('Revenue_201504.csv')
Revenue('Revenue_201505.csv')

The df gets stored in parquet format on a monthly basis, as can be seen below -

[screenshot of the saved partition folders: month=2015-01-01, month=2015-02-01, ...]

Question: How can I delete the parquet folder corresponding to a particular month?

One way would be to load all these parquet files into one big df, use a .where() clause to filter out that particular month, and then save it back in parquet format, partitioned by month, in overwrite mode, like this -

from pyspark.sql.functions import col, lit

# If we want to remove the data for Feb 2015
df = spark.read.format('parquet').load('Revenue.parquet')
df = df.where(col('month') != lit('2015-02-01'))
df.write.format('parquet').mode('overwrite').partitionBy('month').save('/path/Revenue')

But this approach is quite cumbersome.

The other way would be to directly delete the folder for that particular month, but I am not sure if that is the right approach, lest we alter the metadata in some unforeseeable way.

What would be the right way to delete the parquet data for a particular month?

cph_sto
  • Here's a link with a good discussion in case you go that route later; it's not an answer to your original question, I'm posting it just for reference: https://stackoverflow.com/questions/38318513/does-drop-partition-delete-data-from-external-table-in-hive – vikrant rana Aug 16 '19 at 07:15
  • @vikrantrana Many thanks Vikrant for referring me to the link. Let me try to make sense of it. – cph_sto Aug 19 '19 at 12:20
  • Please see the answer below. It may work as a pointer to your original problem; you will have to make small changes for the parquet format and your partition column. Also, let me know in case you find a way to do it with Spark functions. – vikrant rana Aug 22 '19 at 19:11
  • It seems that this issue has been debated for a long time. Not too sure, but maybe: https://stackoverflow.com/questions/48090352/how-can-we-convert-an-external-table-to-managed-table-in-spark-2-2-0 – vikrant rana Aug 22 '19 at 19:48

3 Answers


Spark supports deleting a partition, both data and metadata.
Quoting the Scala code comment:

/**
 * Drop Partition in ALTER TABLE: to drop a particular partition for a table.
 *
 * This removes the data and metadata for this partition.
 * The data is actually moved to the .Trash/Current directory if Trash is configured,
 * unless 'purge' is true, but the metadata is completely lost.
 * An error message will be issued if the partition does not exist, unless 'ifExists' is true.
 * Note: purge is always false when the target is a view.
 *
 * The syntax of this command is:
 * {{{
 *   ALTER TABLE table DROP [IF EXISTS] PARTITION spec1[, PARTITION spec2, ...] [PURGE];
 * }}}
 */

In your case, there is no backing table. We could register the DataFrame as a temp table and use the above syntax (temp table documentation).

From PySpark, we could run the SQL using the syntax in this link. Sample:

df = spark.read.format('parquet').load('Revenue.parquet')
df.registerTempTable("tmp")
spark.sql("ALTER TABLE tmp DROP IF EXISTS PARTITION (month='2015-02-01') PURGE")
DaRkMaN
  • Do you need to do the first part, df = ...? I would tend to do that in scripting. Interesting if so. – thebluephantom Aug 16 '19 at 13:36
  • We could write SQL directly, as mentioned in https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#run-sql-on-files-directly. I am not sure how `ALTER TABLE` fits with the syntax mentioned in the link. – DaRkMaN Aug 16 '19 at 13:54
  • @DaRkMaN Hi! Taking a cue from your answer, I found out that this `ALTER TABLE` with the `PURGE` option will only delete the data lying on `HDFS` when the table is an `internal table`, not when it's an `external table`. Mine is an `external` one. How can I delete the corresponding data from HDFS? – cph_sto Aug 20 '19 at 07:56
  • As you also alluded to in your comment, this `ALTER TABLE...` code doesn't fit into the PySpark framework. It will probably work in HIVE, but I have to do it in `PySpark`. Thanks for your efforts. – cph_sto Aug 20 '19 at 11:04

The statement below will only delete the metadata related to the partition:

ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date="2019-08-22");

If you want to delete the data as well, you need to set the EXTERNAL table property of your Hive external table to FALSE. This will turn your Hive table into a managed table:

alter table db.yourtable set TBLPROPERTIES('EXTERNAL'='FALSE');

You can then set it back to an external table:

alter table db.yourtable set TBLPROPERTIES('EXTERNAL'='TRUE');

I tried setting these properties through the Spark session, but ran into an issue:

 spark.sql("""alter table db.test_external set tblproperties ("EXTERNAL"="TRUE")""")
pyspark.sql.utils.AnalysisException: u"Cannot set or change the preserved property key: 'EXTERNAL';"

I am sure there must be some way to do this. I ended up using Python: I defined the function below in PySpark and it did the job.

query=""" hive -e 'alter table db.yourtable set tblproperties ("EXTERNAL"="FALSE");ALTER TABLE db.yourtable DROP IF EXISTS PARTITION(loaded_date="2019-08-22");' """

def delete_partition():
        print("I am here")
        import subprocess
        import sys
        p=subprocess.Popen(query,shell=True,stderr=subprocess.PIPE)
        stdout,stderr = p.communicate()
        if p.returncode != 0:
            print stderr
            sys.exit(1) 

>>> delete_partition()

This will delete both the metadata and the data. Note: I have tested this with a Hive ORC external partitioned table, which is partitioned on loaded_date:

# Partition Information
# col_name              data_type               comment

loaded_date             string

Update: Basically, your data is lying at the HDFS location in subdirectories named

/Revenue/month=2015-02-01
/Revenue/month=2015-03-01
/Revenue/month=2015-04-01

and so on

import subprocess

def delete_partition(month_delete):
    # Build the HDFS path of the partition folder and remove it recursively
    hdfs_path = "/some_hdfs_location/Revenue/month="
    final_path = hdfs_path + month_delete
    subprocess.call(["hadoop", "fs", "-rm", "-r", final_path])
    print("deleted " + final_path)

delete_partition("2015-02-01")
vikrant rana
  • Well, I will try it on Monday and let you know. – cph_sto Aug 24 '19 at 19:15
  • Well, I tried to look into it, but since I do not have the code in HIVE but directly in `PySpark` `Jupyter`, I don't know what `db` would be in my case. – cph_sto Aug 26 '19 at 07:31
  • Ohh ok.. you are saving the dataframe directly to some HDFS location. I will check this. Thanks – vikrant rana Aug 26 '19 at 10:02
  • Hi, yes, you are totally correct. I have saved my `df` on HDFS in `parquet` format, partitioned by `month`, just as shown in the question. I am loading my `df` from there directly. – cph_sto Aug 26 '19 at 10:25
  • Pleasure, Vikrant :) – cph_sto Aug 26 '19 at 14:36

If you want to do it in PySpark itself and not via Hive tables, you can do it in these steps:

1- Get the partitions of your new data

2- Check whether their corresponding parquet partitions already exist, and delete them if they do

3- Write the new data in append mode

so, here I assume 'month' is the partition column in your dataframe:

new_data_partitions = df.select('month').distinct().collect()
new_data_partitions = [v['month'] for v in new_data_partitions]  # list of all new partitions

# Check if they already exist and delete them:
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
for partition in new_data_partitions:
    partition_path = "/path/Revenue/month={}".format(partition)
    if fs.exists(sc._jvm.org.apache.hadoop.fs.Path(partition_path)):
        dbutils.fs.rm(partition_path, True)

df.write.format('parquet').mode('append').partitionBy('month').save('/path/Revenue')

I use Databricks notebooks, hence I used "dbutils.fs.rm" to delete the folder (partition).
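
If you are not on Databricks, the same Hadoop FileSystem handle used above for the exists() check can also perform the deletion, so dbutils isn't strictly required. A minimal sketch, assuming sc is the active SparkContext and the same /path/Revenue layout as in the question:

# Alternative to dbutils.fs.rm: delete the partition folder via the JVM Hadoop FileSystem API
jvm = sc._jvm
fs = jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
partition_path = jvm.org.apache.hadoop.fs.Path("/path/Revenue/month=2015-02-01")
if fs.exists(partition_path):
    fs.delete(partition_path, True)  # True = recursive delete of the folder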

amir.rafieian