My problem is as follows:
I have a large DataFrame called customer_data_pk containing 230M rows, and another one containing 200M rows named customer_data_pk_isb.
Both have a column btn (the join key used in the code below) on which I would like to do a left join, with customer_data_pk as the left DataFrame.
What is the best possible way to perform this join?
What have I tried?
The simple join, i.e.

customer_data_pk.join(customer_data_pk_isb,
                      customer_data_pk.btn == customer_data_pk_isb.btn,
                      'left')

either runs out of memory or just times out with:

Error: Removing executor driver with no recent heartbeats: 468990 ms exceeds timeout 120000 ms
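One thing I am unsure about is how many partitions the JDBC read actually produces (I suspect it may be a single partition, since I am not passing any partitioning options to the reader). I believe this can be checked with:

customer_data_pk.rdd.getNumPartitions()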
I am still learning PySpark, so I may have misunderstood something fundamental; if someone could shed light on this, it would be great. Beyond the plain join, here is what else I have tried (the join still doesn't work after all of this):
I persisted the left DataFrame to disk, but the code still gets stuck:

customer_data_pk.persist(StorageLevel.DISK_ONLY)

(with StorageLevel imported from pyspark)
Furthermore, on the configuration side I am passing:

--conf spark.sql.shuffle.partitions=5000
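If it makes a difference, I believe the same setting can also be applied from inside the script via SQLContext (a sketch of what I mean):

sqlContext.setConf("spark.sql.shuffle.partitions", "5000")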
My complete code is as follows:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark

sc = SparkContext("local", "Example")
sqlContext = SQLContext(sc)
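
# Read the left table (230M rows) from MySQL over JDBC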
customer_data_pk = sqlContext.read.format('jdbc').options(
    url='jdbc:mysql://localhost/matchingqueryautomation',
    driver='com.mysql.jdbc.Driver',
    dbtable='customer_pk',
    user='XXXX',
    password='XXXX').load()
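
# Persist the left DataFrame to disk so it is not recomputed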
customer_data_pk.persist(pyspark.StorageLevel.DISK_ONLY)
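
# Read the right table (200M rows) from MySQL over JDBC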
customer_data_pk_isb = sqlContext.read.format('jdbc').options(
    url='jdbc:mysql://localhost/lookupdb',
    driver='com.mysql.jdbc.Driver',
    dbtable='customer_pk_isb',
    user='XXXX',
    password='XXXX').load()
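
# Left join on btn; the count() action below is what triggers the failure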
print('###########################',
      customer_data_pk.join(customer_data_pk_isb,
                            customer_data_pk.btn == customer_data_pk_isb.btn,
                            'left').count(),
      '###########################')
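Finally, would pre-partitioning both DataFrames on the join key before joining be the right direction? A minimal sketch of what I have in mind (the partition count of 2000 is just a guess on my part, and I assume a Spark version that supports column-based repartition):

left = customer_data_pk.repartition(2000, 'btn')
right = customer_data_pk_isb.repartition(2000, 'btn')
joined = left.join(right, on='btn', how='left')

As I understand it, joining with on='btn' would also avoid the duplicate btn column that my equality condition produces.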