
Use case: I have a small table (~1000 rows) available in Spark, and a huge Hive table (20 billion records). Let's call the small table base and the huge table main. The base table has a column 'id', and I need to fetch all records from the main table where main.external_id equals base.id. Both the external_id and id columns contain only unique values.

Problem: The obvious way is to register the base table as a temp table in Spark and use something like

sparkSession.sql("select * from base_table JOIN main_table ON base_table.id = main_table.external_id")

However, this would mean that Spark fetches all rows from the huge Hive table and brings them into memory, which feels very expensive considering we only need around 1000 rows. I am looking for a way to minimize this network data transfer.
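One way to gauge how expensive the straightforward join really is, before trying alternatives, is to look at the physical plan without executing the query. This is only an inspection step, assuming the temp view and table names used above:

// Print the extended plan; it shows whether the small table would be broadcast
// and which predicates (if any) are pushed down into the Hive table scan.
sparkSession.sql("select * from base_table JOIN main_table ON base_table.id = main_table.external_id").explain(true)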

What I have tried

  1. Partitioning/Bucketing: This was the first option we thought of, but both turned out to be infeasible. Partitioning works better when a column has discrete values (like city or country), whereas the 'id' column is a unique key. For bucketing, the issue is that we would need a huge number of buckets, which means a high number of files and can create its own issues.

  2. JDBC query via HiveServer2: As of now, we are able to run a read query on the Hive engine via the JDBC driver. I was wondering if there is a way to send the base table from Spark to the Hive engine and execute a broadcast join there, so that the network shuffle only involves the smaller table and we don't need to bring the bigger table into Spark memory. However, I haven't been able to find anything that helps implement this.

(Obviously we could write the base table to Hive first and then do the join, but according to the info I got from the team, the Hive write is not very performant and has caused a few issues in the past.)
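For reference, that backup option would look roughly like the sketch below. The DataFrame name baseDf and the table name tmp_base are illustrative, and the Hive-side MAPJOIN hint is an assumption about how the Hive engine would be told to broadcast the small side; this is only a sketch, not something validated against this cluster.

// Persist the small table to Hive (this is the write the team is wary of).
baseDf.write.mode("overwrite").saveAsTable("tmp_base")

// Then, via the Hive JDBC connection, run the join on the Hive engine with a
// map-side join hint so only the small table is distributed:
//   SELECT /*+ MAPJOIN(b) */ m.*
//   FROM main_table m JOIN tmp_base b ON m.external_id = b.id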

Does anyone have a solution to the problem described above? Or is there another way to achieve the same result?

P.S.: I'm using Spark 2.3.2 and have the same version of the spark-sql, spark-hive, and hive-jdbc jars.

Anshul
  • You can save the base table as a Hive table, since it has only 1000 records that won't take much time, and then perform a map-side join or broadcast join as you mentioned in your second option. – Nikunj Kakadiya Aug 06 '21 at 13:47
  • Hi @NikunjKakadiya, as I mentioned in the post, we have that as a backup option. Our only concern is some performance issues we have witnessed in the past due to the Hive write, so we wanted to know if there are other approaches that can avoid the write in such cases. – Anshul Aug 06 '21 at 14:25
  • Spark uses [predicate pushdown](https://docs.datastax.com/en/dse/6.0/dse-dev/datastax_enterprise/spark/sparkPredicatePushdown.html), so not everything gets loaded into memory; it will filter out rows while reading. – Rushabh Gujarathi Aug 07 '21 at 07:06

1 Answer


If only values from the main table are required, an IN clause can be used:

import org.apache.spark.sql.Encoders
// Inline the ~1000 collected ids so that only matching rows are read from the huge table.
val ids = base_table.select("id").as(Encoders.INT).collect().mkString(",")
sparkSession.sql(s"select * from main_table where external_id in ($ids)")
pasha701
  • This idea looks good. I will try this and update how it went – Anshul Aug 06 '21 at 14:28
  • IN is an expensive operation: for each row it will iterate over as many elements as there are in $ids if it does not find a match, and until it finds the matching element if it does. – Rushabh Gujarathi Aug 07 '21 at 07:01
  • It would work, but if the number of values in the IN clause goes above some limit (around 1000 in some SQL engines) it throws an exception, and the same might happen here. Apart from the expense, this is the other thing you should consider. – Nikunj Kakadiya Aug 07 '21 at 08:08
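Given the concerns raised in the comments above about large IN lists, one possible alternative is a broadcast left-semi join, which expresses the same filter without inlining the ids into a SQL string. This is a hedged sketch, not part of the original answer, and it assumes base_table is the small DataFrame:

import org.apache.spark.sql.functions.broadcast

// Keep only main_table rows whose external_id appears in the broadcast base table.
val main = sparkSession.table("main_table")
val filtered = main.join(broadcast(base_table), main("external_id") === base_table("id"), "leftsemi")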