Use case: I have a small table (~1000 rows) available in Spark, and a huge Hive table (20 billion records). Let's call the small table base and the huge table main. The base table has a column 'id', and I need to fetch all records from main where main.external_id equals base.id. Both external_id and id contain only unique values.
Problem
The obvious way is to register the base table as a temp table in Spark and use something like
sparkSession.sql("select * from base_table JOIN main_table ON base_table.id = main_table.external_id")
However, this would mean that Spark fetches all rows from the huge Hive table and brings them into memory, which feels very expensive given that we only need around 1000 of them. I am looking for a way to minimize this network data transfer.
What I have tried
Partitioning/Bucketing: This was the first option we thought of, but both turned out to be infeasible. Partitioning works best when a column has a small set of discrete values (like city or country), whereas 'id' is a unique key column. For bucketing, the issue is that we would need a huge number of buckets, which means a very high number of files, and that can create problems of its own.
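Just to make the bucketing concern concrete, here is a rough sketch using Spark's bucketBy API (spark is the session from the first sketch; the bucket count of 100000 is a number I made up purely for illustration, and since main actually lives in Hive the real DDL would be on the Hive side, but the scale problem is the same).

// Illustration only: bucketing main on the join key.
// Even with an unusually large bucket count, each bucket still holds
// millions of rows, and every bucket adds at least one more file.
spark.table("main_table")
  .write
  .bucketBy(100000, "external_id")   // made-up bucket count, for scale only
  .sortBy("external_id")
  .saveAsTable("main_table_bucketed")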
JDBC query via HiveServer2: As of now, we are able to run read queries against the Hive engine via the JDBC driver. I was wondering whether there is a way to send the base table from Spark to the Hive engine and execute a broadcast join there, so that the network shuffle only involves the smaller table and we don't need to bring the bigger table into Spark memory. However, I haven't been able to find anything that would help implement this.
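The JDBC read we have working today looks roughly like the sketch below (spark and baseDf as in the first sketch; host and port are placeholders). Building an IN list from the collected base ids is only my guess at how a filter could be pushed to the Hive side, not something we have validated.

// Collecting ~1000 ids to the driver is cheap; embedding them in the
// pushed-down query is just an assumed way of filtering on the Hive side.
val ids = baseDf.select("id").collect().map(_.getString(0))
val inList = ids.map(id => s"'$id'").mkString(", ")

val filteredMain = spark.read
  .format("jdbc")
  .option("url", "jdbc:hive2://hive-host:10000/default")   // placeholder host/port
  .option("driver", "org.apache.hive.jdbc.HiveDriver")
  .option("dbtable", s"(select * from main_table where external_id in ($inList)) t")
  .load()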
(Obviously we could write the base table to Hive first and then do the join there, but as per the info I got from the team, Hive writes are not very performant and have caused a few issues in the past.)
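(For completeness, that fallback would be roughly the following, with base_table_hive as a made-up target table name.)

// Sketch of the write-then-join fallback we'd prefer to avoid:
// persist the small table into Hive, then run the join on the Hive side,
// for example through the JDBC path sketched above, so that only the
// ~1000 matching rows ever come back to Spark.
baseDf.write.mode("overwrite").saveAsTable("base_table_hive")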
Does anyone have a solution to the problem described above, or another way to achieve the same result?
P.S.: I'm using Spark 2.3.2 and have the same version of the spark-sql, spark-hive, and hive-jdbc jars.