
I am trying to update my code to the new spark-bigquery connector, version 0.15.{0,1}-beta, and I found that the delta format no longer works.

I can no longer read or write using the delta format.

Here you can find a minimal example for writing a dataframe using delta format:

Scala code:

import org.apache.spark.sql.SparkSession

object Delta extends App {

  val spark = SparkSession.builder.master("local[*]").getOrCreate()

  import spark.implicits._

  val df = Seq(("hi",1),("bye",2)).toDF("first","second")

  val output = "/tmp/test"

  val format = "delta"

  df.write.format(format).save(output)

}

If I use the following configuration, the code runs without problems

build.sbt

name := "delta-gcs"

version := "0.1"

scalaVersion := "2.12.10"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "io.delta" %% "delta-core" % "0.6.0"
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery" % "0.14.0-beta"

But if I change the version of spark-bigquery to the new one:

libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery" % "0.15.1-beta"

I get this error:

Exception in thread "main" java.lang.NoSuchMethodError: com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper.$init$(Lcom/fasterxml/jackson/module/scala/experimental/ScalaObjectMapper;)V
    at org.apache.spark.sql.delta.util.JsonUtils$$anon$1.<init>(JsonUtils.scala:27)
    at org.apache.spark.sql.delta.util.JsonUtils$.<init>(JsonUtils.scala:27)
    at org.apache.spark.sql.delta.util.JsonUtils$.<clinit>(JsonUtils.scala)
    at org.apache.spark.sql.delta.DeltaOperations$Write.$anonfun$parameters$1(DeltaOperations.scala:58)
    at scala.Option.map(Option.scala:230)
    at org.apache.spark.sql.delta.DeltaOperations$Write.<init>(DeltaOperations.scala:58)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:66)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:134)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at Delta$.delayedEndpoint$Delta$1(Delta.scala:15)
    at Delta$delayedInit$body.apply(Delta.scala:3)
    at scala.Function0.apply$mcV$sp(Function0.scala:39)
    at scala.Function0.apply$mcV$sp$(Function0.scala:39)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
    at scala.App.$anonfun$main$1$adapted(App.scala:80)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.App.main(App.scala:80)
    at scala.App.main$(App.scala:78)
    at Delta$.main(Delta.scala:3)
    at Delta.main(Delta.scala)

Something similar happens when you try to read from the delta format.
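
For reference, a minimal read in the same style (reusing the format and output values from the example above) fails in a similar way:

val readBack = spark.read.format(format).load(output) // fails with a similar Jackson-related error on 0.15.x
readBack.show()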

2 Answers


Both libraries depend on the Jackson library, and one of them pulls in an older version.

Try adding this to build.sbt:

libraryDependencies += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.0"

Later edit:

I see the BigQuery connector is using:

"com.fasterxml.jackson.core" % "jackson-databind" % "2.10.3",
"com.fasterxml.jackson.module" % "jackson-module-paranamer" % "2.10.3",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.10.3",

And spark-sql uses:

<fasterxml.jackson.version>2.6.7</fasterxml.jackson.version>
<fasterxml.jackson-module-scala.version>2.6.7.1</fasterxml.jackson-module-scala.version>
<fasterxml.jackson.databind.version>2.6.7.3</fasterxml.jackson.databind.version>

Try to force them to use the same library version; see How to force a specific version of dependency?
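
For example, a minimal sketch of pinning everything to one Jackson version in build.sbt via dependencyOverrides (2.10.3 here matches the connector; the exact version you need may differ):

// Force sbt to resolve a single Jackson version across all transitive dependencies
dependencyOverrides ++= Seq(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.10.3",
  "com.fasterxml.jackson.module" % "jackson-module-paranamer" % "2.10.3",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.10.3"
)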

– M. Alexandru

Due to dependency updates in version 0.15.x, the Jackson version has been updated. The easiest way to satisfy the version requirements of both spark-sql and the BigQuery connector is to move to the shaded version of the connector, where all the dependencies are re-packaged and provided as part of the connector's jar. This way there are no version collisions. The shaded dependency is "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.15.1-beta".
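
In build.sbt that would look roughly like this (only the connector artifact changes; the shaded jar bundles its own Jackson):

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.5"
libraryDependencies += "io.delta" %% "delta-core" % "0.6.0"
// shaded connector: transitive dependencies are re-packaged inside the jar
libraryDependencies += "com.google.cloud.spark" %% "spark-bigquery-with-dependencies" % "0.15.1-beta"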

– David Rabinowitz