
I have two tables to be cross joined:

table 1: queries, 300M rows
table 2: product descriptions, 3,000 rows

The following query does the cross join, calculates a score for each (query, product) pair, and picks the top 3 matches per query:

query_df.repartition(10000).registerTempTable('queries')

product_df.coalesce(1).registerTempTable('products')

CREATE TABLE matches AS
SELECT *
FROM
  (SELECT *,
          row_number() OVER (PARTITION BY a.query_id
                             ORDER BY 0.40 + 0.15*score_a + 0.20*score_b + 0.5*score_c DESC) AS rn
   FROM
     (SELECT /*+ MAPJOIN(b) */ a.query_id,
                               b.product_id,
                               func_a(a.qvec,b.pvec) AS score_a,
                               func_b(a.qvec,b.pvec) AS score_b,
                               func_c(a.qvec,b.pvec) AS score_c
      FROM queries a
      CROSS JOIN products b) a) a
WHERE rn <= 3

My Spark cluster looks like the following:

MASTER="yarn-client" /opt/mapr/spark/spark-1.6.1/bin/pyspark --num-executors 22 --executor-memory 30g --executor-cores 7 --driver-memory 10g --conf spark.yarn.executor.memoryOverhead=10000 --conf spark.akka.frameSize=2047

Now the issue is that, as expected, the job fails after a couple of stages because of the extremely large intermediate data the cross join produces. I'm looking for help/suggestions on optimizing the above operation in such a way that the job runs both the match and the filter step for one query_id before picking up the next query_id, in a parallel fashion - similar to a sort within a for loop over the queries table (see the sketch below). If the job is slow but successful, I'm OK with it, since I can request a bigger cluster.

The above query works fine for a smaller query table, say one with 10000 records.
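
To make the idea concrete, something along the lines of the loop below is what I have in mind (just a rough sketch - the number of batches and the modulo bucketing on query_id are made up, and the output path is a placeholder):

# Rough sketch: slice the query table into batches so each batch's cross join
# and top-3 filter completes (and is written out) before the next batch starts.
num_batches = 100                      # made-up batch count
for i in range(num_batches):
    batch_df = query_df.filter(query_df.query_id % num_batches == i)
    batch_df.registerTempTable('queries_batch')
    top3 = sqlContext.sql("""
        SELECT * FROM
          (SELECT *,
                  row_number() OVER (PARTITION BY query_id
                                     ORDER BY 0.40 + 0.15*score_a + 0.20*score_b + 0.5*score_c DESC) AS rn
           FROM
             (SELECT /*+ MAPJOIN(b) */ a.query_id,
                                       b.product_id,
                                       func_a(a.qvec,b.pvec) AS score_a,
                                       func_b(a.qvec,b.pvec) AS score_b,
                                       func_c(a.qvec,b.pvec) AS score_c
              FROM queries_batch a
              CROSS JOIN products b) s) t
        WHERE rn <= 3""")
    top3.write.mode('append').parquet('/placeholder/matches')  # placeholder output path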

Mike

2 Answers


In the scenario where you want to join table A (big) with table B (small), the best practice is to leverage a broadcast join.

A clear overview is given in https://stackoverflow.com/a/39404486/1203837.
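
For reference, the same hint can also be expressed with the DataFrame API - a minimal sketch, reusing the question's query_df and product_df (in Spark 1.6 the broadcast marker lives in pyspark.sql.functions):

from pyspark.sql.functions import broadcast

# Mark the small product table for broadcast so every executor gets a local
# copy and the 300M-row query table is never shuffled for the join.
# With no join condition this is a cartesian product, as in the question.
pairs = query_df.join(broadcast(product_df))

At 3,000 rows the products table should broadcast comfortably, provided the pvec column itself is not huge.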

Hope this helps.

erond
  • I've already utilized the broadcast join via the Hive hint syntax /*+ MAPJOIN(b) */; it's included in the query. – Mike Sep 22 '17 at 16:42

Cartesian (cross) joins in Spark are extremely expensive. I would suggest joining the tables and saving the output data first, then using that dataframe for the further aggregation.

One small suggestion: a map join or broadcast join can fail if the smaller table is not small enough. Unless you are sure about the size of the small table, refrain from using a broadcast join.
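
A rough sketch of that save-first idea, reusing the question's temp tables and UDFs (the cross join is kept since there is no join key here, and the parquet paths are placeholders):

# Step 1: materialise the scored pairs so the expensive join is not recomputed
# if the ranking stage fails.
scored = sqlContext.sql("""
    SELECT /*+ MAPJOIN(b) */ a.query_id,
                             b.product_id,
                             func_a(a.qvec,b.pvec) AS score_a,
                             func_b(a.qvec,b.pvec) AS score_b,
                             func_c(a.qvec,b.pvec) AS score_c
    FROM queries a
    CROSS JOIN products b""")
scored.write.parquet('/placeholder/scored_pairs')   # placeholder path

# Step 2: rank and filter the saved data in a separate, much cheaper job.
sqlContext.read.parquet('/placeholder/scored_pairs').registerTempTable('scored_pairs')
top3 = sqlContext.sql("""
    SELECT * FROM
      (SELECT *,
              row_number() OVER (PARTITION BY query_id
                                 ORDER BY 0.40 + 0.15*score_a + 0.20*score_b + 0.5*score_c DESC) AS rn
       FROM scored_pairs) t
    WHERE rn <= 3""")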

Avishek Bhattacharya