
I am trying to save a DataFrame to a Hive table. The simplified code looks like this:

DataFrame df = hiveContext.sql(<some query>);
df.write().mode("overwrite").saveAsTable("schemaName.tableName");

Here is the error I am getting while running this program:

jdbc.JDBCRDD: closed connection
16/09/11 23:27:14 INFO scheduler.DAGScheduler: Job 2 failed: saveAsTable at DataAccessService.java:48, took 4.131753 s
16/09/11 23:27:14 INFO scheduler.DAGScheduler: ResultStage 5 (saveAsTable at DataAccessService.java:48) failed in 1.788 s
16/09/11 23:27:14 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@24554804)
16/09/11 23:27:14 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(2,1473650834074,JobFailed(org.apache.spark.SparkException: Job 2 cancelled because SparkContext was shut down))
16/09/11 23:27:14 ERROR datasources.InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job 2 cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658)
        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581)
        at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1751)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1750)
        at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:607)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:267)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:239)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:239)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1766)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:239)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:239)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:239)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:218)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1933)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
        at org.apache.spark.sql.hive.execution.CreateMetastoreDataSourceAsSelect.run(commands.scala:258)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:251)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:221)
        at DataAccessService.saveToHive(DataAccessService.java:48)
        at Main.main(Main.java:42)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

From a few Google searches I found that this could be because of insufficient memory on the executors. It is not possible for me to allocate more memory than I am allocating currently. Is there any configurable setting I could use as a workaround for this issue?

vatsal mevada
  • You can try to repartition your DataFrame and increase the number of partitions (a sketch of this follows the comments). – sau Sep 12 '16 at 10:19
  • Did you try using the `driver-memory` and `executor-memory` arguments to control how much memory is used? http://spark.apache.org/docs/latest/configuration.html#application-properties – Henrique Limas Oct 13 '16 at 21:17
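
A minimal sketch of the first comment's suggestion, using the same Java API as the code above. The partition count is purely illustrative and would need tuning for the real data volume; the `driver-memory`/`executor-memory` arguments from the second comment are passed to `spark-submit`, not set in this code.

// Using the df from the snippet above:
df.repartition(200)              // 200 is an illustrative value, not a recommendation
  .write()
  .mode("overwrite")
  .saveAsTable("schemaName.tableName");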

1 Answer


Divide and Conquer

  1. Create chunks of the source (pandas) DataFrame:
def chunker(seq, size_chunks=5):
    """
    Generate chunks of a dataframe, as in:
    https://stackoverflow.com/a/25701576/7127519

    Parameters
    ----------
    :param seq: an indexable sequence, e.g. a pandas DataFrame
    :param size_chunks: size of each chunk, in number of rows
    """
    return (seq[pos:pos + size_chunks] for pos in range(0, len(seq), size_chunks))
  2. Save each chunk, appending to the same Hive table. Something like this:
from tqdm import tqdm  # progress bar; optional

for data_chunked in tqdm(chunker(pandas_df_transformed, size_chunks=size_chunks)):
    print(data_chunked.shape)
    # Convert the pandas chunk into a Spark DataFrame with the target schema
    spq_spark = spark.createDataFrame(data_chunked, schema=schema)
    # Optional: expose the chunk as a temp view in the current session
    spq_spark.createOrReplaceTempView(table_to_save_name)
    # Append the chunk to the target Hive table
    spq_spark.write.mode('append').format('hive').saveAsTable("{}.{}".format(database, table_to_save_name))
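
For the original Java code in the question, a rough sketch of the same divide-and-conquer idea (not part of this answer, untested, and with illustrative split weights) would be to split the DataFrame and write it in several smaller jobs:

DataFrame df = hiveContext.sql(<some query>);
df.persist();  // avoid re-running the source query for every split

// Split into four roughly equal parts; the number of splits and the weights are illustrative.
DataFrame[] parts = df.randomSplit(new double[]{0.25, 0.25, 0.25, 0.25});

// Overwrite with the first part, then append the remaining ones,
// so each saveAsTable job moves less data at a time.
parts[0].write().mode("overwrite").saveAsTable("schemaName.tableName");
for (int i = 1; i < parts.length; i++) {
    parts[i].write().mode("append").saveAsTable("schemaName.tableName");
}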
Rafael Valero