I have a Spark SQL job where I am doing:

Dataset<Row> currentDataSet = hdfsHandler.loadDataSet(sparkSession, "somelocation");
currentDataSet.createOrReplaceTempView(CURRENT_USER_VIEW);

Dataset<Row> previousDataSet = hdfsHandler.loadDataSet(sparkSession, "somelocation2");
previousDataSet.createOrReplaceTempView(PREVIOUS_USER_VIEW);

String currentRunColumn = "c.".concat("userid");
String previousRunColumn = "p.".concat("userid");

Dataset<Row> addedRecordDataSets = sparkSession.sql("SELECT " + currentRunColumn + " FROM " +
        CURRENT_USER_VIEW + " AS c " +
        " LEFT JOIN " + PREVIOUS_USER_VIEW + " AS p " +
        " ON " + currentRunColumn + " == " + previousRunColumn +
        " WHERE " + previousRunColumn + " IS NULL ");

addedRecordDataSets.coalesce(1).persist(StorageLevel.DISK_ONLY()).foreachPartition(persist());

For this job, 3 Spark jobs are created:

  1. read currentDataSet from HDFS
  2. read previousDataSet from HDFS
  3. join the two datasets, coalesce, and call persist()

The 3rd job is executed on only 1 executor because foreachPartition(persist()) is the only terminal operation. If the datasets are huge, it will take a lot of time. Is there a way I can do a distributed join and only then coalesce the result?

One way I can think of is adding one more terminal operation like count after computing addedRecordDataSets, but that alone will not solve the issue; I will have to cache as well so that the data is not rejoined. But caching is a costly operation. Is it possible to just force materialise the dataset?
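For reference, a rough sketch of that count-plus-cache workaround, reusing addedRecordDataSets and the custom persist() writer from the snippet above (this is only the idea, not something I have settled on):

addedRecordDataSets.persist(StorageLevel.DISK_ONLY());
addedRecordDataSets.count();                                  // terminal operation: forces the distributed join and fills the cache
addedRecordDataSets.coalesce(1).foreachPartition(persist());  // reuses the cached join result on a single partition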

This question seems to be relevant to this SO post.

1 Answer

You need to create what is called a stage barrier: something that will force Spark not to push the coalesce and the join into the same stage.

If you can manage holding the whole result on a single partition, the join result probably isn't too large, so consider replacing coalesce with repartition: it will do a full shuffle, but it will also create a stage barrier and perform the join in parallel. This was fast enough for my use cases, but it might not be best practice, as repartition is an expensive operation.
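A minimal sketch of that approach, assuming the addedRecordDataSets and custom persist() writer from your snippet:

addedRecordDataSets
        .repartition(1)                        // full shuffle: the join runs in parallel in the previous stage
        .persist(StorageLevel.DISK_ONLY())
        .foreachPartition(persist());          // only the already-shuffled single partition is written here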

It might be faster to call df.localCheckpoint, which materialises the intermediate result on the executors (and, unlike cache, creates a stage barrier), and then coalesce, but I haven't personally tried it.
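Untried, as said, but the idea would look roughly like this (again assuming the names from your question):

Dataset<Row> materialised = addedRecordDataSets.localCheckpoint(true);  // eager: runs the join distributed and truncates the lineage
materialised.coalesce(1).foreachPartition(persist());                   // only moves the already-computed rows onto one partition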

More elaboration here
