While trying to repartition a DataFrame read from S3, I am getting a generic error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 1.0 failed 4 times, most recent failure: Lost task 33.4 in stage 1.0 (TID 88, 172.44.16.141, executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: worker lost
When I check the driver logs, I see the same generic error after a warning:
20/07/22 15:47:21 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
I don't understand why I am hitting this warning, since I believe I have tuned Spark reasonably for this workload.
I read about this warning here.
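If the cache eviction itself turns out to matter, one thing I could try (purely a guess on my side, not something I have validated) is raising spark.sql.hive.filesourcePartitionFileCacheSize above the 262144000-byte default mentioned in the warning when building the config, along these lines:

import pyspark

# Hypothetical tweak (not validated): raise the partition-metadata cache limit
# that the warning refers to. The 1 GB value is an arbitrary illustrative
# number, not a tuned recommendation for this cluster.
conf = pyspark.SparkConf().set(
    'spark.sql.hive.filesourcePartitionFileCacheSize', str(1024 * 1024 * 1024))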
My Spark configs are:
Workers (8)

Worker Id   Address   State   Cores        Memory
worker id   add       ALIVE   2 (2 Used)   502.1 GB (27.0 GB Used)
worker id   add       ALIVE   2 (2 Used)   61.8 GB (27.0 GB Used)
worker id   add       ALIVE   2 (2 Used)   61.8 GB (27.0 GB Used)
worker id   add       ALIVE   2 (2 Used)   61.8 GB (27.0 GB Used)

Running Applications (1)

Application ID   Name       Cores   Memory/Executor   Submitted Time   User      State     Duration
app-id           app-name   8       27.0 GB           t1               default   RUNNING   22 min
Number of rows: 9,367,548,942
Number of partitions in the source DataFrame: 3,046
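As a rough back-of-the-envelope check (my own arithmetic, not something from the logs), that works out to roughly three million rows per input partition:

rows = 9367548942
partitions = 3046
print(rows // partitions)  # 3075360 -> roughly 3.1 million rows per partition on average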
Code:
!pip install pyspark==2.4.3
import datetime
import os
import pyspark
import pandas
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t
import socket
os.environ['PYSPARK_PYTHON'] = '/opt/app-root/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/app-root/bin/python3'
# spark.jars.ivy={os.environ['HOME']}
SPARK_CLUSTER = 'spark://'
S3_ENDPOINT = ''
SOURCE_BUCKET = ''
SOURCE_BUCKET_AWS_ACCESS_KEY_ID = ""
SOURCE_BUCKET_AWS_SECRET_ACCESS_KEY = ""
SPARK_APP_NAME = f'repartition - {datetime.datetime.now().strftime("%Y-%m-%d %H:%M")}'
HOSTNAME = socket.gethostbyname(socket.gethostname())
print('Spark Cluster: {}'.format(SPARK_CLUSTER))
print('S3 endpoint: {}'.format(S3_ENDPOINT))
print('Source Bucket: {}'.format(SOURCE_BUCKET))
print('Spark App Name: {}'.format(SPARK_APP_NAME))
print('Hostname: {}'.format(HOSTNAME))
os.environ['AWS_ACCESS_KEY_ID'] = SOURCE_BUCKET_AWS_ACCESS_KEY_ID
os.environ['AWS_SECRET_ACCESS_KEY'] = SOURCE_BUCKET_AWS_SECRET_ACCESS_KEY
def create_spark_config(spark_cluster, executor_memory='16g', executor_cores='4', max_cores='16'):
    # Note: executor memory and cores are hardcoded below; the function
    # arguments are not actually used.
    print('Spark cluster is: {}'.format(spark_cluster))
    sc_conf = (
        pyspark.SparkConf().setMaster(spark_cluster)
        .set('spark.driver.host', HOSTNAME)
        .set('spark.driver.port', 42000)
        .set('spark.driver.bindAddress', '0.0.0.0')
        .set('spark.driver.blockManager.port', 42100)
        .set('spark.executor.memory', '27g')
        .set('spark.executor.cores', '2')
        .set('spark.sql.parquet.enableVectorizedReader', True)
        # .set('spark.sql.autoBroadcastJoinThreshold', '524288000')
    )
    return sc_conf
def setup_spark():
    spark_config = create_spark_config(SPARK_CLUSTER)
    print('spark_config is: {}'.format(spark_config))
    print("Creating Spark Session at cluster: {}".format(SPARK_CLUSTER))
    spark = SparkSession.builder.appName(SPARK_APP_NAME).enableHiveSupport().config(conf=spark_config).getOrCreate()
    hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoopConf.set('fs.s3a.endpoint', S3_ENDPOINT)
    hadoopConf.set('fs.s3a.path.style.access', 'true')
    hadoopConf.set('fs.s3a.access.key', os.environ.get('AWS_ACCESS_KEY_ID'))
    hadoopConf.set('fs.s3a.secret.key', os.environ.get('AWS_SECRET_ACCESS_KEY'))
    hadoopConf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    print("hadoop is configured!")
    return spark
try:
    spark.stop()
    spark = setup_spark()
except:
    spark = setup_spark()
df = spark.read.parquet(f'{src_path}')
df.count()
df.rdd.getNumPartitions()
df.printSchema()
df.repartition("created_year", "created_month", "created_day").write.partitionBy("created_year", "created_month", "created_day").parquet(dest_path)
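For reference, a variation I am considering (the explicit partition count of 1000 below is an arbitrary placeholder I picked for illustration, not something I have tested) is to pass a number of partitions to repartition along with the columns, so the shuffle output is bounded:

# Sketch only: same partitioned write, but with an explicit partition count
# (1000 is an illustrative placeholder) so each shuffle output partition stays smaller.
df.repartition(1000, "created_year", "created_month", "created_day") \
    .write.partitionBy("created_year", "created_month", "created_day") \
    .parquet(dest_path)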
The complete stack trace is:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-19-61d00a6c140f> in <module>
----> 1 df.repartition("created_year", "created_month", "created_day").write.partitionBy("created_year", "created_month", "created_day").parquet(dest_path)
/opt/app-root/lib/python3.6/site-packages/pyspark/sql/readwriter.py in parquet(self, path, mode, partitionBy, compression)
837 self.partitionBy(partitionBy)
838 self._set_opts(compression=compression)
--> 839 self._jwrite.parquet(path)
840
841 @since(1.6)
/opt/app-root/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/opt/app-root/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/opt/app-root/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o147.parquet.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 34 in stage 5.0 failed 4 times, most recent failure: Lost task 34.10 in stage 5.0 (TID 3562, 172.44.32.75, executor 41): ExecutorLostFailure (executor 41 exited caused by one of the running tasks) Reason: worker lost
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
... 33 more