I have a Spark SQL job where I am doing:
Dataset<Row> currentDataSet = hdfsHandler.loadDataSet(sparkSession, "somelocation");
currentDataSet.createOrReplaceTempView(CURRENT_USER_VIEW);
Dataset<Row> previousDataSet = hdfsHandler.loadDataSet(sparkSession, "somelocation2");
previousDataSet.createOrReplaceTempView(PREVIOUS_USER_VIEW);
String currentRunColumn = "c.".concat("userid");
String previousRunColumn = "p.".concat("userid");
Dataset<Row> addedRecordDataSets = sparkSession.sql("SELECT " + currentRunColumn + " FROM " +
CURRENT_USER_VIEW + " AS c " +
" LEFT JOIN " + PREVIOUS_USER_VIEW + " AS p " +
" ON " + currentRunColumn + " == " + previousRunColumn +
" WHERE " + previousRunColumn + " IS NULL ");
addedRecordDataSets.coalesce(1).persist(StorageLevel.DISK_ONLY()).foreachPartition(persist());
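
For reference, the query above is just a left anti join (keep rows of c that have no match in p); in the DataFrame API the equivalent would look roughly like this (a sketch, assuming userid is the only join key):

import static org.apache.spark.sql.functions.col;

Dataset<Row> addedViaAntiJoin = currentDataSet.alias("c")
        .join(previousDataSet.alias("p"),
              col("c.userid").equalTo(col("p.userid")),
              "left_anti")   // keeps only rows of c with no match in p
        .select("userid");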
For this code, 3 Spark jobs are created:
- read currentDataSet from HDFS
- read previousDataSet from HDFS
- join the two datasets, coalesce, and call persist()
The 3rd job is executed on only 1 executor: coalesce(1) does not introduce a shuffle, so the whole join stage is collapsed into a single task, and foreachPartition(persist()) is the only action that triggers it. If the datasets are huge it will take a long time. Is there a way I can do a distributed join and only then coalesce it?
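
To make the ask concrete, I want a plan with roughly this shape, where the join runs across many tasks and only the final write is single-partition (a sketch; I am not sure repartition(1) is the right tool, since it adds an extra full shuffle):

// repartition(1), unlike coalesce(1), inserts a shuffle boundary,
// so the join upstream of it keeps its parallelism
addedRecordDataSets.repartition(1).foreachPartition(persist());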
One way I can think of is adding one more action, like count(), after computing addedRecordDataSets. But that alone will not solve the issue: I would also have to cache the dataset so that the join is not recomputed, and caching is a costly operation. Is it possible to just force-materialise the dataset?
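
Concretely, the count()-based workaround would look something like this (a sketch; persist() is my own ForeachPartitionFunction helper from the snippet above):

addedRecordDataSets.persist(StorageLevel.DISK_ONLY()); // cache so the join result is reused
addedRecordDataSets.count();                           // extra action: runs the join with full parallelism
addedRecordDataSets.coalesce(1).foreachPartition(persist()); // single-partition write reads the cache
addedRecordDataSets.unpersist();                       // cleanup once written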
This question seems to be relevant to this SO post.