I have two tables to cross join:
table 1: queries, ~300M rows
table 2: product descriptions, ~3000 rows
The following query does the cross join, calculates a score for each (query, product) pair, and picks the top 3 matches per query:
query_df.repartition(10000).registerTempTable('queries')
product_df.coalesce(1).registerTempTable('products')
CREATE TABLE matches AS
SELECT *
FROM
  (SELECT *,
          row_number() OVER (PARTITION BY a.query_id
                             ORDER BY 0.40 + 0.15*score_a + 0.20*score_b + 0.5*score_c DESC) AS rn
   FROM
     (SELECT /*+ MAPJOIN(b) */ a.query_id,
             b.product_id,
             func_a(a.qvec, b.pvec) AS score_a,
             func_b(a.qvec, b.pvec) AS score_b,
             func_c(a.qvec, b.pvec) AS score_c
      FROM queries a CROSS JOIN products b) a) a
WHERE rn <= 3
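For reference, the window function plus the rn <= 3 filter reduce to "keep the 3 highest-scoring products per query". A minimal pure-Python sketch of that semantics (the tuple layout here is an assumption, not the actual table schema):

```python
from itertools import groupby

def top3_per_query(rows):
    # rows: iterable of (query_id, product_id, combined_score) tuples.
    # Same semantics as row_number() over (partition by query_id
    # order by combined_score desc) filtered to rn <= 3.
    ordered = sorted(rows, key=lambda r: (r[0], -r[2]))
    out = []
    for _, grp in groupby(ordered, key=lambda r: r[0]):
        out.extend(list(grp)[:3])
    return out
```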
My Spark cluster is launched as follows:
MASTER="yarn-client" /opt/mapr/spark/spark-1.6.1/bin/pyspark --num-executors 22 --executor-memory 30g --executor-cores 7 --driver-memory 10g --conf spark.yarn.executor.memoryOverhead=10000 --conf spark.akka.frameSize=2047
Now the issue is, as expected, the job fails after a couple of stages, running out of memory because of the extremely large temporary data produced (roughly 300M x 3,000, about 900 billion rows, before the filter). I'm looking for help/suggestions on optimizing the above operation so that the job runs both the match and the filter steps for one query_id before picking the next query_id, in a parallel fashion, similar to a sort inside a for loop over the queries table. If the job is slow but successful, I'm OK with it, since I can request a bigger cluster.
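One way to get that per-query behaviour is to broadcast the ~3,000 products to every executor and compute each query's top 3 locally, so the huge intermediate result never materializes. A minimal sketch of the per-query scoring step: func_a/func_b/func_c here are stand-in dot products, not the real UDFs, and the Spark wiring in the trailing comments assumes the RDD API available in 1.6:

```python
import heapq

# Stand-ins for the real scoring UDFs (assumption: they take two vectors).
def func_a(q, p):
    return sum(x * y for x, y in zip(q, p))
func_b = func_c = func_a

def top3_for_query(query_id, qvec, products):
    # products: the full (product_id, pvec) list; at 3,000 rows it is
    # small enough to broadcast once instead of shuffling the 300M-row side.
    scored = ((pid, 0.40 + 0.15 * func_a(qvec, pvec)
                         + 0.20 * func_b(qvec, pvec)
                         + 0.5  * func_c(qvec, pvec))
              for pid, pvec in products)
    # heapq.nlargest keeps only 3 candidates in memory per query.
    return [(query_id, pid, s)
            for pid, s in heapq.nlargest(3, scored, key=lambda t: t[1])]

# Spark wiring (sketch):
# products_b = sc.broadcast([(r.product_id, r.pvec) for r in product_df.collect()])
# matches = query_df.rdd.flatMap(
#     lambda r: top3_for_query(r.query_id, r.qvec, products_b.value))
```

Assuming the vectors are of modest size, the broadcast is a few MB and fits easily in the 30g executors, and each task only ever holds 3 candidate rows per query.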
The above query works fine for a smaller queries table, say one with 10,000 records.