I have two DataFrames: df1 has 6 million rows and df2 has 1 billion.
I have tried the standard df1.join(df2, df1("id") <=> df2("id2")), but I run out of memory.
df1 is too large to fit in a broadcast join.
I have even tried a Bloom filter, but it was also too large to broadcast and still be useful.
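For concreteness, the Bloom-filter attempt looked roughly like the sketch below. This is illustrative only: it assumes Guava's BloomFilter and a string id column, neither of which may match the actual setup, and it collects all 6 million ids to the driver to build the filter.

```scala
import com.google.common.hash.{BloomFilter, Funnels}
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.functions.udf

// Build a Bloom filter over df1's ids on the driver.
// A lower fpp means fewer false positives but a larger
// (and therefore harder to broadcast) filter.
val filter = BloomFilter.create(
  Funnels.stringFunnel(StandardCharsets.UTF_8),
  6000000L, // expected insertions
  0.01)     // false-positive probability
df1.select("id").collect().foreach(row => filter.put(row.getString(0)))

val bcFilter = sc.broadcast(filter)

// Pre-filter df2 before the real join. mightContain can return false
// positives, so the exact join is still needed afterwards.
val mightContain = udf((id: String) => bcFilter.value.mightContain(id))
val candidates = df2.filter(mightContain(df2("id2")))
val result = df1.join(candidates, df1("id") <=> candidates("id2"))
```

Even at 1% false positives, a filter over 6 million ids was too big to broadcast without losing precision.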
The only thing I have tried that doesn't error out is to break df1 into 300,000-row chunks and join each chunk with df2 in a foreach loop. But this takes an order of magnitude longer than it should, likely because df1 is too large to persist, so the lineage up to the split is recomputed on every iteration. Recombining the results also takes a while.
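The chunked workaround looks roughly like this. It is a sketch: deriving the chunk key from monotonicallyIncreasingId is my assumption, and because those ids are not consecutive across partitions the chunks are only approximately 300,000 rows.

```scala
import org.apache.spark.sql.functions._

val chunkSize = 300000L
// Tag each df1 row with a chunk number. monotonicallyIncreasingId gives
// unique but non-consecutive ids, so chunk sizes are only approximate.
val withChunk = df1.withColumn("rid", monotonicallyIncreasingId())
  .withColumn("chunk", (col("rid") / chunkSize).cast("long"))
  .drop("rid")
val numChunks = withChunk.agg(max("chunk")).first().getLong(0) + 1

// Join chunk by chunk. Without a successful persist of withChunk, each
// pass recomputes the split from scratch, which is where the time goes.
val pieces = (0L until numChunks).map { c =>
  withChunk.where(col("chunk") === c).drop("chunk")
    .join(df2, col("id") <=> df2("id2"))
}
// Recombining is itself slow at this scale.
val result = pieces.reduce(_ unionAll _)
```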
How have you solved this issue?
A few notes:
df1 is a subset of df2: df1 = df2.where("fin < 1").selectExpr("id as id2").distinct()
I am interested in all rows in df2 whose id has, at some point, had fin < 1, which means I can't do it in one step.
there are about 200 million unique ids in df2.
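Put together, the two steps are: first derive the distinct id set, then join it back against df2 (this second join is the one that fails):

```scala
// Step 1: the ~6 million distinct ids that at some point had fin < 1.
val df1 = df2.where("fin < 1").selectExpr("id as id2").distinct()

// Step 2: pull every df2 row for those ids -- this is the join that
// runs out of memory.
val result = df2.join(df1, df2("id") <=> df1("id2"))
```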
Here are some relevant Spark settings:
spark.cores.max=1000
spark.executor.memory=15G
spark.akka.frameSize=1024
spark.shuffle.consolidateFiles=false
spark.task.cpus=1
spark.driver.cores=1
spark.executor.cores=1
spark.memory.fraction=0.5
spark.memory.storageFraction=0.3
spark.sql.shuffle.partitions=10000
spark.default.parallelism=10000
The error I get is:
16/03/11 04:36:07 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(11,1,ResultTask,FetchFailed(BlockManagerId(68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199, mapr, 46487),3,176,4750,org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/mesos/work/slaves/68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199/frameworks/c754216b-bf80-4d84-97f1-2e907030365e-2545/executors/16/runs/5a5a01c5-205e-4380-94d3-7fa0f6421b85/blockmgr-ea345692-05bb-4f42-9ba1-7b93311fb9d4/0e/shuffle_3_340_0.index (No such file or directory)
and
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 465 in stage 6.3 failed 4 times, most recent failure: Lost task 465.3 in stage 6.3 (TID 114448, mapr): java.lang.OutOfMemoryError: Direct buffer memory