
We receive daily files from an external system and store them in Hive. We want to enable versioning on the data.

col1, col2 form a composite key, so if we receive the same key combination from a file, it should be stored in Hive with a new version. The latest data that arrives from the file should get the biggest version number. How can we do this in Spark?

file df

+----+----+-----+-------------------+-------+
|col1|col2|value|                 ts|version|
+----+----+-----+-------------------+-------+
|   A|   B|  777|2019-01-01 00:00:00|      1|
|   K|   D|  228|2019-01-01 00:00:00|      1|
|   G|   G|  241|2019-01-01 00:00:00|      1|
+----+----+-----+-------------------+-------+
We don't receive a version from the external system, but if one is needed for comparison it will always be 1.

hive df

+----+----+-----+-------------------+-------+
|col1|col2|value|                 ts|version|
+----+----+-----+-------------------+-------+
|   A|   B|  999|2018-01-01 00:00:00|      1|
|   A|   B|  888|2018-01-02 00:00:00|      2|
|   B|   C|  133|2018-01-03 00:00:00|      1|
|   G|   G|  231|2018-01-01 00:00:00|      1|
+----+----+-----+-------------------+-------+

After merge:

+----+----+-----+-------------------+-----------+
|col1|col2|value|                 ts|new_version|
+----+----+-----+-------------------+-----------+
|   B|   C|  133|2018-01-03 00:00:00|          1|
|   K|   D|  228|2019-01-01 00:00:00|          1|
|   A|   B|  999|2018-01-01 00:00:00|          1|
|   A|   B|  888|2018-01-02 00:00:00|          2|
|   A|   B|  777|2019-01-01 00:00:00|          3|
|   G|   G|  231|2018-01-01 00:00:00|          1|
|   G|   G|  241|2019-01-01 00:00:00|          2|
+----+----+-----+-------------------+-----------+
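The merge rule above can be sketched in plain Python (no Spark needed), assuming rows within each (col1, col2) key are numbered in ascending `ts` order so the newest row gets the biggest version:

```python
from itertools import groupby
from operator import itemgetter

# all rows from the question: the existing Hive rows plus the new file rows
rows = [
    ("A", "B", 999, "2018-01-01 00:00:00"),
    ("A", "B", 888, "2018-01-02 00:00:00"),
    ("B", "C", 133, "2018-01-03 00:00:00"),
    ("G", "G", 231, "2018-01-01 00:00:00"),
    ("A", "B", 777, "2019-01-01 00:00:00"),
    ("K", "D", 228, "2019-01-01 00:00:00"),
    ("G", "G", 241, "2019-01-01 00:00:00"),
]

def assign_versions(rows):
    """Group by the composite key (col1, col2), sort each group by ts,
    and number the rows 1..n so the latest row gets the biggest version."""
    key = itemgetter(0, 1)
    out = []
    for _, group in groupby(sorted(rows, key=key), key=key):
        # ISO timestamps sort correctly as strings
        for version, row in enumerate(sorted(group, key=itemgetter(3)), start=1):
            out.append(row + (version,))
    return out

merged = assign_versions(rows)
```

This reproduces the "After merge" table: ('A', 'B', 777) from the 2019 file ends up with version 3, while ('K', 'D', 228) gets version 1.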

GPopat

1 Answer

Existing main Hive table:

INSERT INTO TABLE test_dev_db.test_1 VALUES
    ('A','B',124,1),
    ('A','B',123,2),
    ('B','C',133,1),
    ('G','G',231,1);

Suppose you have loaded the below data from the file:

INSERT INTO TABLE test_dev_db.test_2 VALUES
    ('A','B',222,1),
    ('K','D',228,1),
    ('G','G',241,1);

Here is your query. The window's `order by` decides which row within each key gets the highest version; these sample tables have no `ts` column, so we order by `value` here, but with a real `ts` column you would order by it so the latest file data always gets the biggest version:

WITH CTE AS (
    SELECT col1, col2, value, version FROM test_dev_db.test_1
    UNION
    SELECT col1, col2, value, version FROM test_dev_db.test_2
)
INSERT OVERWRITE TABLE test_dev_db.test_1
SELECT a.col1, a.col2, a.value,
       row_number() over (partition by a.col1, a.col2 order by a.value) as new_version
FROM CTE a;


hive> select * from test_dev_db.test_1;
OK
A       B       123     1
A       B       124     2
A       B       222     3
B       C       133     1
G       G       231     1
G       G       241     2
K       D       228     1

for Spark:

Create your dataframes by reading from the file and from the Hive table, then union them:

uniondf=df1.unionAll(df2)

from pyspark.sql.functions import row_number, lit
from pyspark.sql.window import Window

# lit('A') is a constant, so the ordering within each (col1, col2) partition
# is arbitrary; order by a timestamp column instead if the newest row must
# always get the highest version
w = Window().partitionBy('col1', 'col2').orderBy(lit('A'))
newdf = uniondf.withColumn("new_version", row_number().over(w)).drop('version')

>>> newdf.show();
+----+----+-----+-----------+
|col1|col2|value|new_version|
+----+----+-----+-----------+
|   B|   C|  133|          1|
|   K|   D|  228|          1|
|   A|   B|  124|          1|
|   A|   B|  123|          2|
|   A|   B|  222|          3|
|   G|   G|  231|          1|
|   G|   G|  241|          2|
+----+----+-----+-----------+

Saving it to Hive (the "header" option only applies to CSV, so it is dropped here):

newdf.write.format("orc").mode("overwrite").saveAsTable('test_dev_db.new_test_1')
vikrant rana
  • unionAll is deprecated, but you can use union and it works for me! – GPopat Aug 15 '19 at 10:37
  • How can we make sure that the version number is incremented for new data which we receive from the file, and not for data which is already stored in the table? – GPopat Aug 15 '19 at 11:17
  • Ok. You mean that you will not have a version number in the file? We can use a full outer join also – vikrant rana Aug 15 '19 at 11:27
  • 1
    Here's a good link showing how to do it. Let me know if you need help on this: https://stackoverflow.com/questions/37709411/hive-best-way-to-do-incremetal-updates-on-a-main-table/37744071#37744071 – vikrant rana Aug 15 '19 at 11:54
  • What I mean is, the latest record that we receive from the file should always get the greatest version number – GPopat Aug 15 '19 at 13:30
  • 1
    Based on the latest change in the question, I am suggesting a slight modification to your code: uniondf.show(20); WindowSpec w = Window.partitionBy("col1", "col2").orderBy(new Column("ts").asc()); Dataset newdf = uniondf.withColumn("new_version", row_number().over(w)).drop("version"); – GPopat Aug 19 '19 at 09:13
  • 1
    Thanks, I will take a look and will also try to answer your first question. Stuck with time – vikrant rana Aug 19 '19 at 09:15
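Following up on the full-outer-join idea from the comments, an incremental variant would leave the existing Hive versions untouched and only assign new versions to the file rows, starting from max(existing version) + 1 per key. A minimal pure-Python sketch of that logic (column layout as in the question; `merge_incremental` is a hypothetical helper, not part of any library):

```python
def merge_incremental(hive_rows, file_rows):
    """hive_rows / file_rows are (col1, col2, value, ts, version) tuples.
    Existing versions are preserved; each file row gets
    max(existing version for its key) + 1, so new data always
    ends up with the biggest version number."""
    max_version = {}
    for col1, col2, _value, _ts, version in hive_rows:
        key = (col1, col2)
        max_version[key] = max(max_version.get(key, 0), version)

    merged = list(hive_rows)
    # process file rows in ts order so later file rows get later versions
    for col1, col2, value, ts, _ in sorted(file_rows, key=lambda r: r[3]):
        key = (col1, col2)
        new_version = max_version.get(key, 0) + 1
        max_version[key] = new_version
        merged.append((col1, col2, value, ts, new_version))
    return merged

hive_rows = [
    ("A", "B", 999, "2018-01-01 00:00:00", 1),
    ("A", "B", 888, "2018-01-02 00:00:00", 2),
    ("B", "C", 133, "2018-01-03 00:00:00", 1),
    ("G", "G", 231, "2018-01-01 00:00:00", 1),
]
file_rows = [
    ("A", "B", 777, "2019-01-01 00:00:00", 1),
    ("K", "D", 228, "2019-01-01 00:00:00", 1),
    ("G", "G", 241, "2019-01-01 00:00:00", 1),
]
merged = merge_incremental(hive_rows, file_rows)
```

Unlike the insert-overwrite query in the answer, this never rewrites versions already stored in the table, which addresses the concern raised in the comments.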