0

I am using Scala to write a DataFrame in Delta format to HDFS, but I am getting an error and cannot work out what is causing it. Please help me with this.

Below is the code using which I am writing a delta table in my local hdfs.

// Column names applied to the sample rows below.
val columns = Array("id", "first", "last", "year")

// Three sample (id, first, last, year) records.
val sampleRows = Seq(
  (1, "John", "Doe", 1986),
  (2, "Ive", "Fish", 1990),
  (4, "John", "Wayne", 1995)
)

// Distribute the rows through the SparkContext and name the columns.
val test_df = sc.parallelize(sampleRows).toDF(columns: _*)

// Write the DataFrame in delta format using overwrite mode.
test_df.write.format("delta").mode("overwrite").save("hdfs://localhost:9000/user/test/")

Dependencies being used are:

  1. Spark v3.4.0
  2. Hadoop v3.3.5
  3. Delta Package: io.delta:delta-core_2.12:1.2.1

Error Msg:

java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/FileFormatWriter$Empty2Null
    at org.apache.spark.sql.delta.DeltaLog.startTransaction(DeltaLog.scala:197)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:210)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:78)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:156)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:303)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
    at EventStream$.update_cta(EventStream.scala:25)
    at EventStream$.$anonfun$update_profiles_on_s3$1(EventStream.scala:68)
    at EventStream$.$anonfun$update_profiles_on_s3$1$adapted(EventStream.scala:68)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:34)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:729)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:726)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:726)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:284)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.execution.datasources.FileFormatWriter$Empty2Null
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 61 more

Spark Version

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/
         
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_362)

To reproduce this issue simply run


 ./spark-shell --packages io.delta:delta-core_2.12:1.2.1
    
 // Column names for the sample DataFrame.
 val columns = Array("id", "first", "last", "year")

 // Sample (id, first, last, year) rows used for the reproduction.
 val sampleRows = Seq(
   (1, "John", "Doe", 1986),
   (2, "Ive", "Fish", 1990),
   (4, "John", "Wayne", 1995)
 )

 // Build the DataFrame from an RDD of the sample rows.
 val test_df = sc.parallelize(sampleRows).toDF(columns: _*)

 // Write in delta format using append mode; this is the call that fails.
 test_df.write.format("delta").mode("append").save("hdfs://localhost:9000/user/test/")

OR give any path if hdfs is not configured

Alex Ott
  • 80,552
  • 8
  • 87
  • 132
Yash Tandon
  • 345
  • 5
  • 18
  • 1
    Do you really need HDFS? Can you provide file:// path? How are you running this code? Using spark-submit? And creating an assembly jar? https://spark.apache.org/docs/latest/submitting-applications.html#bundling-your-applications-dependencies – OneCricketeer Apr 21 '23 at 12:30

1 Answers1

1

At first I couldn't reproduce the NoClassDefFoundError, but I managed to reproduce it after replacing hdfs://... with file:///.... With the hdfs:// path, the following code and build.sbt throw java.net.ConnectException: Call From .../127.0.1.1 to localhost:9000 failed on connection exception: java.net.ConnectException; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused for me, because I didn't configure the Hadoop service. Please prepare a step-by-step reproduction.

build.sbt

// Scala version used for the whole build.
ThisBuild / scalaVersion := "2.12.17"

// Dependencies used to reproduce the problem: Spark SQL 3.4.0 together with
// delta-core 1.2.1 and the Hadoop 3.3.5 client artifacts.
// NOTE: the compatibility table quoted in this answer lists Delta 1.2.x against
// Spark 3.2.x, so this Spark/Delta pairing is the combination under investigation.
libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-sql"             % "3.4.0",
  "io.delta"          %% "delta-core"            % "1.2.1",
  "org.apache.hadoop" %  "hadoop-client"         % "3.3.5",
  "org.apache.hadoop" %  "hadoop-client-api"     % "3.3.5",
  "org.apache.hadoop" %  "hadoop-client-runtime" % "3.3.5"
)
import org.apache.spark.sql.SparkSession

/** Standalone reproduction: builds a three-row DataFrame on a local Spark
  * session and writes it in delta format to the HDFS path from the question.
  */
object App {
  def main(args: Array[String]): Unit = {
    // Local session named "Spark app"; reuses an existing session if one is active.
    val session = SparkSession.builder.master("local").appName("Spark app").getOrCreate()
    import session.implicits._

    // Sample (id, first, last, year) rows.
    val sampleRows = Seq(
      (1, "John", "Doe", 1986),
      (2, "Ive", "Fish", 1990),
      (4, "John", "Wayne", 1995)
    )

    // Distribute the rows as an RDD, then name the columns.
    val people = session.sparkContext
      .parallelize(sampleRows)
      .toDF("id", "first", "last", "year")

    // Write in delta format using overwrite mode.
    people.write.format("delta").mode("overwrite").save("hdfs://localhost:9000/user/test/")
  }
}

The class org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null exists in spark-sql 3.3.2 and earlier:

https://github.com/apache/spark/blob/v3.3.2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L59

In 3.4.0 it's absent

https://github.com/apache/spark/blob/v3.4.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

You should look at your classpath to figure out which dependency still expects Spark 3.3.2 or earlier, even though you think you're using Spark 3.4.0.

How do you build your project? With sbt?

  • You can do sbt dependencyTree (this doesn't show provided dependencies).

  • Or print System.getProperty("java.class.path") (this shows classpath only upon JVM start).

  • Or add scalacOptions += "-Ylog-classpath" to build.sbt

  • Or run the following script inside your actual Spark environment that you're using

// Prints every classloader in the parent chain, starting from the loader of the
// current class, together with the URLs of each URLClassLoader found on the way.
// The import is required: URLClassLoader lives in java.net and is not in scope
// by default.
import java.net.URLClassLoader

// Follow getParent until it returns null, which ends the chain.
val loaderChain =
  Iterator.iterate[ClassLoader](getClass.getClassLoader)(_.getParent).takeWhile(_ != null)

loaderChain.foreach { loader =>
  println(s"classloader: ${loader.getClass.getName}")
  loader match {
    case urlLoader: URLClassLoader =>
      println("classloader urls:")
      urlLoader.getURLs.foreach(println)
    case _ =>
      // Loaders of any other type are only reported by name above.
      println("not URLClassLoader")
  }
}

Run a scala code jar appear NoSuchMethodError:scala.Predef$.refArrayOps

Why am I getting a NoClassDefFoundError in Java?

What causes and what are the differences between NoClassDefFoundError and ClassNotFoundException?


The cause seems to be that delta-core 1.2.1 was compiled against spark-sql 3.2.0

https://repo1.maven.org/maven2/io/delta/delta-core_2.13/1.2.1/delta-core_2.13-1.2.1.pom

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.13</artifactId>
  <version>3.2.0</version>
  <scope>provided</scope>
</dependency>

And even the newest delta-core 2.3.0 is compiled with respect to spark-sql 3.3.2

https://repo1.maven.org/maven2/io/delta/delta-core_2.13/2.3.0/delta-core_2.13-2.3.0.pom

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.13</artifactId>
  <version>3.3.2</version>
  <scope>provided</scope>
</dependency>

But as I said, Empty2Null is absent in spark-sql 3.4.0. So it seems all existing versions of delta-core can be incompatible with Spark 3.4.0.

If in build.sbt I upgrade delta-core to 2.3.0 then NoClassDefFoundError changes to

com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: org.apache.spark.ErrorInfo.messageFormat()Ljava/lang/String;

Try downgrading Spark to 3.3.2 or earlier.

Then your code works for me properly. I replaced "hdfs://..." with local path "file:///..." and added configuration

// Local session with the two Delta-specific settings added (the lines marked !!!).
val spark = SparkSession.builder
  .master("local")
  .appName("Spark app")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") // !!! added: Delta session extension
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") // !!! added: DeltaCatalog as spark_catalog
  .getOrCreate()

as was requested in another exception. And now in my local path file:///... several files appeared.

Actually this is written in Delta Lake docs

https://docs.delta.io/latest/releases.html

Compatibility with Apache Spark

Delta Lake version Apache Spark version
2.3.x 3.3.x
2.2.x 3.3.x
2.1.x 3.3.x
2.0.x 3.2.x
1.2.x 3.2.x
1.1.x 3.2.x
1.0.x 3.1.x
0.7.x and 0.8.x 3.0.x
Below 0.7.0 2.4.2 - 2.4.<latest>

https://github.com/delta-io/delta/issues/1696 [Feature Request] Support Spark 3.4

the plan is to release Delta 2.4 on Spark 3.4

Dmytro Mitin
  • 48,194
  • 3
  • 28
  • 66
  • I have added some steps to simply reproduce this on shell, the problem here seems to be of format "delta" irrespective of writing it on hdfs or somewhere else the issue remains the same. – Yash Tandon Apr 24 '23 at 03:36
  • @YashTandon Are you using `spark-shell` from `spark-3.4.0-bin-hadoop3.tgz` from https://spark.apache.org/downloads.html ? For me the last line `test_df.write...` in spark-shell throws `java.net.ConnectException` because I haven't set that up (not `NoClassDefFoundError`). Can you show your classpath? For example you can run that my classloader script or start `spark-shell` like `./spark-shell -Ylog-classpath --packages ...` – Dmytro Mitin Apr 24 '23 at 04:09
  • @YashTandon Also you can enter `System.getProperty("java.class.path")` but this can show not the full classpath https://stackoverflow.com/questions/18626396/whats-the-difference-between-system-getpropertyjava-class-path-and-getclass By default, Scala repl (Spark shell) truncates output so this should be set up with `:power` `vals.isettings.maxPrintString = Int.MaxValue` https://blog.ssanj.net/posts/2016-10-16-output-in-scala-repl-is-truncated.html – Dmytro Mitin Apr 24 '23 at 04:22
  • Yes I am using spark-shell from tgz package, also currently I am not able to edit my question as there are too many pending edits in stack so its not allowing me to edit, here is link for the classpath log that got printed https://docs.google.com/document/d/1SCXmcfxmNa9yV7_qOJM3rT37NXn5EUoyJmPRu1KhWfc/edit?usp=sharing – Yash Tandon Apr 24 '23 at 04:27
  • @YashTandon Actually I managed to reproduce `NoClassDefFoundError`. I replaced hdfs with local path `file:///...`. Thanks. – Dmytro Mitin Apr 24 '23 at 04:28
  • @YashTandon `NoClassDefFoundError` is also reproducible with `App.scala` and `build.sbt` as at the beginning of my answer if I replace `"hdfs://..."` with `"file:///..."`. – Dmytro Mitin Apr 24 '23 at 04:33
  • @YashTandon I guess I figured out. See update. – Dmytro Mitin Apr 24 '23 at 05:07
  • 1
    yes, I downgraded to spark v3.2.1 and it started working now, thanks for your help. – Yash Tandon Apr 24 '23 at 05:24