
My problem is as follows:

I have two large dataframes: customer_data_pk with 230M rows and customer_data_pk_isb with 200M rows.

Both have a column callID on which I would like to do a left join, with customer_data_pk as the left dataframe.

What is the best possible way to achieve the join operation?

What have I tried?

The simple join, i.e.

customer_data_pk.join(customer_data_pk_isb,
                      customer_data_pk.btn == customer_data_pk_isb.btn,
                      'left')

either runs out of memory or just times out with the error: Removing executor driver with no recent heartbeats: 468990 ms exceeds timeout 120000 ms.

After all this, the join still doesn't work. I am still learning PySpark so I might have misunderstood the fundamentals. If someone could shed light on this, it would be great.

I have tried this as well, but it didn't work and the code gets stuck: customer_data_pk.persist(StorageLevel.DISK_ONLY)

Furthermore, on the configuration side I am using: --conf spark.sql.shuffle.partitions=5000
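
For reference, this is roughly how I could pass that configuration in code instead of on the command line (a sketch; local[*] and the network timeout are assumptions on my part, not settings I have confirmed):

from pyspark import SparkConf, SparkContext

# Build the configuration explicitly so the shuffle partition count is applied
# even if the script is not launched through spark-submit.
conf = (SparkConf()
        .setMaster("local[*]")                        # use all local cores (assumption)
        .setAppName("Example")
        .set("spark.sql.shuffle.partitions", "5000")
        .set("spark.network.timeout", "600s"))        # longer network/heartbeat timeout (assumption)

sc = SparkContext(conf=conf)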

My complete code is as follows:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import time
import pyspark

sc = SparkContext("local", "Example")
sqlContext = SQLContext(sc)

customer_data_pk = sqlContext.read.format('jdbc').options(
    url='jdbc:mysql://localhost/matchingqueryautomation',
    driver='com.mysql.jdbc.Driver',
    dbtable='customer_pk',
    user='XXXX',
    password='XXXX').load()

customer_data_pk.persist(pyspark.StorageLevel.DISK_ONLY)

customer_data_pk_isb = sqlContext.read.format('jdbc').options(
    url='jdbc:mysql://localhost/lookupdb',
    driver='com.mysql.jdbc.Driver',
    dbtable='customer_pk_isb',
    user='XXXX',
    password='XXXX').load()

print('###########################',
      customer_data_pk.join(customer_data_pk_isb,
                            customer_data_pk.btn == customer_data_pk_isb.btn,
                            'left').count(),
      '###########################')
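
One thing I am considering, but have not tried yet, is partitioning the JDBC read itself, since each table currently comes in as a single partition. A rough sketch of what I think that would look like (the partition column and the bounds are placeholders I would still need to check against the actual data):

customer_data_pk = sqlContext.read.format('jdbc').options(
    url='jdbc:mysql://localhost/matchingqueryautomation',
    driver='com.mysql.jdbc.Driver',
    dbtable='customer_pk',
    user='XXXX',
    password='XXXX',
    partitionColumn='callID',      # assumed numeric, indexed column
    lowerBound='1',                # placeholder minimum of callID
    upperBound='230000000',        # placeholder maximum of callID
    numPartitions='100').load()    # number of parallel reads, to be tuned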
  • Possible duplicate of [How to optimize partitioning when migrating data from JDBC source?](https://stackoverflow.com/questions/52603131/how-to-optimize-partitioning-when-migrating-data-from-jdbc-source) – 10465355 Jan 14 '19 at 20:54
  • Are you trying to process 230M + 230M records on local? – Manu Gupta Jan 14 '19 at 23:13
  • Based on your code, try persisting the other DF as well, since it has almost the same number of records. You can also use the MEMORY_AND_DISK option (see the sketch after these comments). – Manu Gupta Jan 14 '19 at 23:16
  • @Manu Gupta, I am processing this data on a VM that has around 250 GB of disk and 64 GB of RAM. – tabish tehseen Jan 15 '19 at 06:54
  • I have tried to persist the dataframe 'customer_data_pk' as shown in the code above, but I am still not sure whether it is working correctly. – tabish tehseen Jan 15 '19 at 06:56
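
A sketch of the persist-both suggestion from the comments above, assuming MEMORY_AND_DISK for both sides (untested on the full data):

import pyspark

# Persist both sides of the join so neither has to be re-read from JDBC;
# MEMORY_AND_DISK spills partitions to disk when they do not fit in memory.
customer_data_pk.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
customer_data_pk_isb.persist(pyspark.StorageLevel.MEMORY_AND_DISK)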
