
While trying to repartition a DataFrame read from S3, I am getting a generic error:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 1.0 failed 4 times, most recent failure: Lost task 33.4 in stage 1.0 (TID 88, 172.44.16.141, executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: worker lost

When I check the driver logs, I see the same generic error after a warning:

20/07/22 15:47:21 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.

I am unable to understand why I am hitting this warning even though I believe I have tuned Spark reasonably.

I read about this warning here.
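
For reference, the cache the warning refers to is controlled by spark.sql.hive.filesourcePartitionFileCacheSize, so presumably it can be raised when the SparkConf is built (the same .set(...) call could go next to the others in the code below). This is only an untested sketch, and the 1 GB figure is arbitrary:

# Untested sketch: raise the partition-metadata cache named in the warning.
# 1 GB (1073741824 bytes) is an arbitrary value chosen purely for illustration.
import pyspark

bigger_cache_conf = (
    pyspark.SparkConf()
    .set('spark.sql.hive.filesourcePartitionFileCacheSize', str(1024 * 1024 * 1024))
)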

My Spark configs are:

Workers (8)
Worker Id   Address   State   Cores        Memory
worker id   add       ALIVE   2 (2 Used)   502.1 GB (27.0 GB Used)
worker id   add       ALIVE   2 (2 Used)   61.8 GB (27.0 GB Used)
worker id   add       ALIVE   2 (2 Used)   61.8 GB (27.0 GB Used)
worker id   add       ALIVE   2 (2 Used)   61.8 GB (27.0 GB Used)

Running Applications (1)
Application ID  Name    Cores   Memory/Executor Submitted Time  User    State   Duration
app-id          app-name 8          27.0 GB     t1              default RUNNING 22 min

Number of rows: 9,367,548,942

Number of DataFrame partitions (df.rdd.getNumPartitions()): 3046
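
For scale, that averages out to roughly 3 million rows per input partition; this is just back-of-the-envelope arithmetic on the two numbers above, not a Spark metric:

rows = 9367548942
partitions = 3046
print(rows / partitions)  # ~3.08 million rows per input partition on average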

Code:

!pip install pyspark==2.4.3

import datetime
import os
import pyspark
import pandas
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t
import socket

os.environ['PYSPARK_PYTHON'] = '/opt/app-root/bin/python3'
# os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/app-root/bin/python3'
# spark.jars.ivy={os.environ['HOME']}
SPARK_CLUSTER = 'spark://'
S3_ENDPOINT = ''
SOURCE_BUCKET = ''
SOURCE_BUCKET_AWS_ACCESS_KEY_ID = ""
SOURCE_BUCKET_AWS_SECRET_ACCESS_KEY = ""
SPARK_APP_NAME = f'repartition - {datetime.datetime.now().strftime("%Y-%m-%d %H:%M")}'
HOSTNAME = socket.gethostbyname(socket.gethostname())
print('Spark Cluster: {}'.format(SPARK_CLUSTER))
print('S3 endpoint: {}'.format(S3_ENDPOINT))
print('Source Bucket: {}'.format(SOURCE_BUCKET))
print('Spark App Name: {}'.format(SPARK_APP_NAME))
print('Hostname: {}'.format(HOSTNAME))

os.environ['AWS_ACCESS_KEY_ID'] = SOURCE_BUCKET_AWS_ACCESS_KEY_ID
os.environ['AWS_SECRET_ACCESS_KEY'] = SOURCE_BUCKET_AWS_SECRET_ACCESS_KEY

def create_spark_config(spark_cluster, executor_memory='16g', executor_cores='4', max_cores='16'):
    # Note: executor_memory, executor_cores and max_cores are currently unused;
    # the executor settings below are hard-coded.
    print('Spark cluster is: {}'.format(spark_cluster))
    sc_conf = (
        pyspark.SparkConf()
        .setMaster(spark_cluster)
        .set('spark.driver.host', HOSTNAME)
        .set('spark.driver.port', 42000)
        .set('spark.driver.bindAddress', '0.0.0.0')
        .set('spark.driver.blockManager.port', 42100)
        .set('spark.executor.memory', '27g')
        .set('spark.executor.cores', '2')
        .set('spark.sql.parquet.enableVectorizedReader', True)
        # .set('spark.sql.autoBroadcastJoinThreshold', '524288000')
    )
    return sc_conf

def setup_spark():
    spark_config = create_spark_config(SPARK_CLUSTER)
    print('spark_config is: {}'.format(spark_config))
    print("Creating Spark Session at cluster: {}".format(SPARK_CLUSTER))
    spark = SparkSession.builder.appName(SPARK_APP_NAME).enableHiveSupport().config(conf=spark_config).getOrCreate()
    hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoopConf.set('fs.s3a.endpoint', S3_ENDPOINT)
    hadoopConf.set('fs.s3a.path.style.access', 'true')
    hadoopConf.set('fs.s3a.access.key', os.environ.get('AWS_ACCESS_KEY_ID'))
    hadoopConf.set('fs.s3a.secret.key', os.environ.get('AWS_SECRET_ACCESS_KEY'))
    hadoopConf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    print("hadoop is configured!")
    return spark

try:
    spark.stop()    # stop any session left over from a previous run
except Exception:
    pass
spark = setup_spark()


# src_path and dest_path are the S3A source and destination prefixes (redacted here)
df = spark.read.parquet(src_path)

df.count()                  # returns 9367548942

df.rdd.getNumPartitions()   # returns 3046

df.printSchema()


df.repartition("created_year", "created_month", "created_day").write.partitionBy("created_year", "created_month", "created_day").parquet(dest_path)
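
For comparison, a variant that gives the shuffle an explicit partition count (instead of relying on the default spark.sql.shuffle.partitions) would look roughly like the sketch below; it is untested, and the figure of 3000 is arbitrary, chosen only to roughly match the current input partition count:

# Untested sketch: same write, but with an explicit number of shuffle partitions.
df.repartition(3000, "created_year", "created_month", "created_day") \
  .write \
  .partitionBy("created_year", "created_month", "created_day") \
  .parquet(dest_path)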

The complete stack trace is:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-19-61d00a6c140f> in <module>
----> 1 df.repartition("created_year", "created_month", "created_day").write.partitionBy("created_year", "created_month", "created_day").parquet(dest_path)

/opt/app-root/lib/python3.6/site-packages/pyspark/sql/readwriter.py in parquet(self, path, mode, partitionBy, compression)
    837             self.partitionBy(partitionBy)
    838         self._set_opts(compression=compression)
--> 839         self._jwrite.parquet(path)
    840 
    841     @since(1.6)

/opt/app-root/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/app-root/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/app-root/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o147.parquet.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 34 in stage 5.0 failed 4 times, most recent failure: Lost task 34.10 in stage 5.0 (TID 3562, 172.44.32.75, executor 41): ExecutorLostFailure (executor 41 exited caused by one of the running tasks) Reason: worker lost
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
    ... 33 more