
I am trying to update a few fields in each row of a large MySQL table (close to 500 million rows). The table either has no primary key or has a string primary key such as a UUID. I don't have enough executor memory to read and hold the entire data set at once. What are my options for processing such a table?

Below is the schema

CREATE TABLE Persons (
  Personid varchar(255) NOT NULL,
  LastName varchar(255) NOT NULL,
  FirstName varchar(255) DEFAULT NULL,
  Email varchar(255) DEFAULT NULL,
  Age int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

The Spark code looks like this:

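import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().master("spark://localhost:7077").appName("KMASK").getOrCreate();
// JDBC connection options (url, dbtable, ...) are omitted from this read
Dataset<Row> rawDataFrame = spark.read().format("jdbc").load();
rawDataFrame.createOrReplaceTempView("data");
// encrypt is a UDF
String sql = "select Personid, LastName, FirstName, encrypt(Email), Age from data";
Dataset<Row> newData = spark.sql(sql);
newData.write().mode(SaveMode.Overwrite).format("jdbc").options(options).save();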

This table has around 150 million records, and the data is around 6 GB in size. My executor memory is just 2 GB. Can I process this table using Spark JDBC?

Sunny Gupta
  • Can you share the update operation you are attempting to perform? Are you using Python/R/Scala/Java with Spark? – ggordon Apr 17 '21 at 02:57
  • I am using Java; I will update the question with the code. Basically, we are trying to run a UDF on one of the columns to encrypt it. – Sunny Gupta Apr 18 '21 at 20:17

1 Answer


You can tune the Spark JDBC fetchsize option to reduce or increase how many records are fetched from the database in each round trip.
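As a rough sketch of the fetchsize suggestion, the helper below builds an option map that could be handed to spark.read().format("jdbc").options(...). The class name FetchSizeOptions, the URL, the table name and the value 1000 are placeholders for illustration, not values taken from the question. Note that fetchsize only sets how many rows the driver pulls per round trip; it does not turn the job into a read-1000, update, write-1000 loop. One assumption worth verifying against your driver version: MySQL Connector/J is documented to buffer the whole result set by default, and cursor-based fetching needs useCursorFetch=true in the JDBC URL for a positive fetch size to take effect.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class FetchSizeOptions {
    // Builds Spark JDBC reader options. All values passed in are placeholders.
    static Map<String, String> build(String url, String table, int fetchSize) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("url", url);
        opts.put("dbtable", table);
        // Rows fetched per round trip between the executor and the database.
        opts.put("fetchsize", String.valueOf(fetchSize));
        return opts;
    }

    public static void main(String[] args) {
        // useCursorFetch=true is an assumption about MySQL Connector/J, see above.
        Map<String, String> opts = build(
            "jdbc:mysql://localhost:3306/test?useCursorFetch=true", "Persons", 1000);
        // With Spark on the classpath:
        //   spark.read().format("jdbc").options(opts).load();
        System.out.println(opts);
    }
}
```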

Partitioning the data can also help to reduce shuffles and additional overhead. Since Age is a numeric field, you can also process the data in partitions determined by Age: first determine the minimum and maximum age, then pass them to the Spark JDBC options.

Notably:

  • partitionColumn : Age
  • lowerBound : min age you identified
  • upperBound : max age you identified
  • numPartitions: really dependent on the number of cores and worker nodes but more hints and links are here
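The four options above could be assembled roughly as follows. This is a sketch only: the class name PartitionedReadOptions, the URL, the bounds 0 and 100, and the partition count 8 are assumptions for illustration. Per the Spark documentation, the four partitioning options must be specified together, and lowerBound/upperBound only decide the partition stride; they do not filter rows out of the table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PartitionedReadOptions {
    // Builds Spark JDBC options for a read split on a numeric column.
    static Map<String, String> build(String url, String table, String column,
                                     long lowerBound, long upperBound, int numPartitions) {
        if (numPartitions < 1) {
            // Sanity check added for this sketch, not a Spark requirement quoted from the docs.
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("url", url);
        opts.put("dbtable", table);
        opts.put("partitionColumn", column);                 // e.g. Age
        opts.put("lowerBound", String.valueOf(lowerBound));  // min age you identified
        opts.put("upperBound", String.valueOf(upperBound));  // max age you identified
        opts.put("numPartitions", String.valueOf(numPartitions));
        return opts;
    }

    public static void main(String[] args) {
        // Placeholder values: replace with the real URL and the min/max found in the table.
        Map<String, String> opts = build(
            "jdbc:mysql://localhost:3306/test", "Persons", "Age", 0, 100, 8);
        // With Spark on the classpath:
        //   spark.read().format("jdbc").options(opts).load();
        System.out.println(opts);
    }
}
```

Each partition is read by its own task, so the memory needed at any one time is closer to the size of one partition than to the size of the whole table, provided the ages are reasonably evenly distributed.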

You can also use the query option to run custom queries that select and update only as many records as fit in memory. NB: when using the query option, do not also set the dbtable option.
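One way to apply the query option is to generate one query per Age range and run the read, UDF and write once per query. The sketch below only builds the query strings; the class name AgeChunkQueries, the age range 0 to 99 and the step of 25 are assumptions for illustration. Rows where Age is NULL match none of these ranges and would need a separate "WHERE Age IS NULL" query.

```java
import java.util.ArrayList;
import java.util.List;

class AgeChunkQueries {
    // Splits [minAge, maxAge] into inclusive ranges of at most `step` ages
    // and returns one SELECT per range.
    static List<String> build(int minAge, int maxAge, int step) {
        if (step < 1) {
            throw new IllegalArgumentException("step must be positive");
        }
        List<String> queries = new ArrayList<>();
        for (int lo = minAge; lo <= maxAge; lo += step) {
            int hi = Math.min(lo + step - 1, maxAge);
            queries.add("SELECT Personid, LastName, FirstName, Email, Age FROM Persons"
                + " WHERE Age BETWEEN " + lo + " AND " + hi);
        }
        return queries;
    }

    public static void main(String[] args) {
        for (String query : build(0, 99, 25)) {
            // With Spark on the classpath, each chunk could be read with:
            //   spark.read().format("jdbc").option("url", url).option("query", query).load();
            System.out.println(query);
        }
    }
}
```

If each chunk is written back over JDBC, appending to a separate output table is probably the safer choice, since SaveMode.Overwrite would replace the target table on every chunk.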

ggordon
  • Will just setting fetchsize work? Suppose we set fetchsize to 1000: will JDBC read 1000 records, update them, and save them to the output? I don't see that behaviour. – Sunny Gupta Apr 18 '21 at 20:15