
I am trying to publish messages to a topic in GCP's Pub/Sub using Spark (Scala) in IntelliJ. Here is the code: GcpPublish.scala

import java.io.FileInputStream

import com.google.api.gax.core.FixedCredentialsProvider
import com.google.auth.oauth2.ServiceAccountCredentials
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage

val publisher = Publisher.newBuilder("projects/projectid/topics/test")
                .setCredentialsProvider(FixedCredentialsProvider
                .create(ServiceAccountCredentials
                .fromStream(new FileInputStream("gs://credsfiles/projectid.json"))))
                .build()

publisher.publish(PubsubMessage
         .newBuilder
         .setData(ByteString.copyFromUtf8(JSONData.toString()))
         .build())
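As a side note, publish is asynchronous and returns an ApiFuture[String] carrying the server-assigned message ID. A minimal sketch of blocking on it and shutting the publisher down cleanly (reusing the publisher and JSONData from above) looks like this:

import java.util.concurrent.TimeUnit

// publish() returns immediately; get() blocks until the message ID arrives
val messageId = publisher.publish(PubsubMessage
  .newBuilder
  .setData(ByteString.copyFromUtf8(JSONData.toString()))
  .build()).get()

// Flush any pending messages and release gRPC resources
publisher.shutdown()
publisher.awaitTermination(1, TimeUnit.MINUTES)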

And this is the build.sbt:

name := "TryingSomething"

version := "1.0"

scalaVersion := "2.11.12"

val sparkVersion = "2.3.2"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "com.google.cloud" % "google-cloud-bigquery" % "1.106.0",
  "org.apache.beam" % "beam-sdks-java-core" % "2.19.0",
  "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % "2.19.0",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
  "org.apache.beam" % "beam-sdks-java-extensions-google-cloud-platform-core" % "2.19.0",
  "org.apache.beam" % "beam-sdks-java-io-google-cloud-platform" % "2.19.0",
  "com.google.apis" % "google-api-services-bigquery" % "v2-rev456-1.25.0",
  "com.google.cloud" % "google-cloud-pubsub" % "1.102.1",
  "com.google.guava" % "guava" % "28.2-jre",
  "org.apache.httpcomponents" % "httpclient" % "4.5.11"
)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _ => MergeStrategy.first
}
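The assembly settings above come from the sbt-assembly plugin; with an sbt build of this vintage that means something like the following in project/plugins.sbt (the exact version is an assumption on my part; any 0.14.x release uses this syntax):

// project/plugins.sbt -- sbt-assembly provides the assembly task and the
// assemblyMergeStrategy / assemblyShadeRules keys used in this build
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")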

But when I create the fat jar and run it on the Dataproc cluster, I get the following error:

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;I)V
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$Builder.setPoolSize(InstantiatingGrpcChannelProvider.java:527)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$Builder.setChannelsPerCpu(InstantiatingGrpcChannelProvider.java:546)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$Builder.setChannelsPerCpu(InstantiatingGrpcChannelProvider.java:535)
    at com.google.cloud.pubsub.v1.Publisher$Builder.<init>(Publisher.java:633)
    at com.google.cloud.pubsub.v1.Publisher$Builder.<init>(Publisher.java:588)
    at com.google.cloud.pubsub.v1.Publisher.newBuilder(Publisher.java:584)

I followed the solutions stated here and added the guava and httpcomponents dependencies, but I still get the same exception.

I even changed the code that instantiates the Publisher to:

val publisher = Publisher.newBuilder(s"projects/projectid/topics/test").build()

But this gives the same error.

Any suggestions as to what could cause this error?

Carol

1 Answer


The problem was that both Spark and Hadoop inject their own (older) versions of Guava onto the classpath, clashing with the version the Google Pub/Sub client depends on. I got around this by adding shade rules to the build.sbt file:

assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "repackaged.com.google.common.@1").inAll,
  ShadeRule.rename("com.google.protobuf.**" -> "repackaged.com.google.protobuf.@1").inAll,
  ShadeRule.rename("io.netty.**" -> "repackaged.io.netty.@1").inAll
)

The shade rules for com.google.common and com.google.protobuf are the ones that resolve the Guava conflict. I added the others for dependency conflicts I encountered later along the way.
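A quick way to check which copy of Guava the driver actually resolves (and, after relocation, that your code no longer binds to the cluster's copy) is to print where the class was loaded from. A small debugging sketch, where classLocation is a hypothetical helper of my own:

// Hypothetical helper: report the jar a class was loaded from
def classLocation(className: String): String =
  Class.forName(className)
    .getProtectionDomain.getCodeSource.getLocation.toString

// On an unshaded fat jar this typically points at a Spark/Hadoop Guava jar,
// an older release missing the checkArgument overload from the stack trace
println(classLocation("com.google.common.base.Preconditions"))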

Carol