I'm using Spark 3.1 in Databricks (Databricks Runtime 8) with a very large cluster (25 workers with 112 Gb of memory and 16 cores each) to replicate several SAP tables in an Azure Data Lake Storage (ADLS gen2). For doing this, a tool is writting the deltas of all these tables into an intermediate system (SQL Server) and then, if I have new data for a certain table, I execute a Databricks job to merge the new data with the existing data available in ADLS.
This process is working fine for most of the tables, but some of them (the biggest ones) take a lot of time to be merged (I merge the data using the PK of each table) and the biggest one has started failing since a week ago (When a big delta of the table was generated). Trace of the error that I can see in the job:
Py4JJavaError: An error occurred while calling o233.sql. : org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:234) at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.$anonfun$writeFiles$5(TransactionalWriteEdge.scala:246) ... .. ............................................................................................................................................................................................................................................................................................................................................................................ Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:428) at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.awaitShuffleMapStage$1(DeltaOptimizedWriterExec.scala:153) at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.getShuffleStats(DeltaOptimizedWriterExec.scala:158) at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.computeBins(DeltaOptimizedWriterExec.scala:106) at com.databricks.sql.transaction.tahoe.perf.DeltaOptimizedWriterExec.doExecute(DeltaOptimizedWriterExec.scala:174) at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196) at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:180) ... 141 more Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 68 (execute at DeltaOptimizedWriterExec.scala:97) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Connection from /XXX.XX.XX.XX:4048 closed at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:769) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:684) at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:69) at .................................................................................................................................................................................................................................................................................................................................... ... java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: Connection from /XXX.XX.XX.XX:4048 closed at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146) at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818) at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ... 1 more
As the error is non descriptive, I have taken a look to each executor log and I have seen following message:
21/04/07 09:11:24 ERROR OneForOneBlockFetcher: Failed while starting block fetches java.io.IOException: Connection from /XXX.XX.XX.XX:4048 closed
And in the executor that seems to be unable to connect, I see the following error message:
21/04/06 09:30:46 ERROR SparkThreadLocalCapturingRunnable: Exception in thread Task reaper-7 org.apache.spark.SparkException: Killing executor JVM because killed task 5912 could not be stopped within 60000 ms. at org.apache.spark.executor.Executor$TaskReaper.run(Executor.scala:1119) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.$anonfun$run$1(SparkThreadLocalForwardingThreadPoolExecutor.scala:104) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68) at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:54) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:101) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:104) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(
I have tried increasing the default shuffle parallelism (From 200 to 1200 as It's suggested here Spark application kills executor) and it seems that the job is more time in execution, but it fails again.
I have tried to monitor the SparkUI meanwhile the job is in execution:
But as you can see, the problem is the same: Some stages are failing because an executor its unreachable because a task has failed more than X times.
The big delta that I mentioned above has more or less 4-5 billion rows and the big dump that I want to merge has, more or less, 100 million rows. The table is not partitioned (yet) so the process is very work-intensive. What is failing is the merge part, not the process to copy the data from SQL Server to ADLS, so the merge is being done once the data to be merge is already in Parquet format.
Any idea of what is happening or what can I do in order to finish this merge?
Thanks in advance.