
Issue

I have a Spark application written in Scala running on Amazon EMR (release emr-6.10.0). It tries to write data to Kafka on Amazon MSK (Kafka version 3.4.0) using IAM authentication.

I have created a topic called hm.motor.avro in Amazon MSK by running:

bin/kafka-topics.sh \
    --bootstrap-server=b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098 \
    --command-config=config/client.properties \
    --create \
    --topic=hm.motor.avro \
    --partitions=3 \
    --replication-factor=2

Here is the relevant Spark code that writes to MSK using IAM authentication:

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098",
  )
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
  .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
  .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
  .option("topic", "hm.motor.avro")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
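
For context, df is a streaming DataFrame whose rows become the Kafka message values. A minimal sketch of how such a frame might be built (the S3 path and schema are hypothetical, and spark-avro's to_avro stands in here for the ABRiS serialization implied by build.sbt below):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.functions.to_avro
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("IngestFromS3ToKafka").getOrCreate()

// Hypothetical schema of the motor data files on S3
val motorSchema = StructType(Seq(
  StructField("timestamp", DoubleType),
  StructField("current", DoubleType),
  StructField("voltage", DoubleType)
))

val df = spark.readStream
  .schema(motorSchema)
  .format("parquet")
  .load("s3a://my-bucket/motor-data/") // placeholder path
  // The Kafka sink requires a binary "value" column (and optionally "key")
  .select(to_avro(struct(col("*"))).as("value"))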

build.sbt (I am using Spark 3.3.1, which is the Spark version shipped with Amazon EMR 6.10.0):

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
resolvers += "confluent" at "https://packages.confluent.io/maven/"

// https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-6100-release.html
val sparkVersion = "3.3.1"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.3" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.3" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.397" % "provided",

  "org.apache.spark" %% "spark-avro" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "io.delta" %% "delta-core" % "2.2.0",
  "za.co.absa" %% "abris" % "6.3.0",
  "software.amazon.msk" % "aws-msk-iam-auth" % "1.1.6"
)

ThisBuild / assemblyMergeStrategy := {
  // https://stackoverflow.com/a/67937671/2000548
  case PathList("module-info.class") => MergeStrategy.discard
  case x if x.endsWith("/module-info.class") => MergeStrategy.discard
  // https://stackoverflow.com/a/76129963/2000548
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  // https://stackoverflow.com/a/54634225/2000548
  case x if x.contains("io.netty.versions.properties") => MergeStrategy.discard
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}

The app is assembled by sbt assembly with Java 1.8 (the default Java version used in EMR).
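
For reference, the job is submitted roughly like this (the jar path and main class are illustrative):

spark-submit \
    --deploy-mode cluster \
    --class IngestFromS3ToKafka \
    s3://my-bucket/ingest-from-s3-to-kafka-assembly-1.0.jar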

However, when I run spark-submit in both YARN cluster and client modes, I get this error:

Caused by: org.apache.spark.SparkException: Writing job aborted
    at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:767)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:302)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:313)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3932)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3161)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3922)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:554)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3920)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3920)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:3161)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:669)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    ... 1 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 106) (ip-xxx-xxx-xxx-xxx.xxx.com executor 1): org.apache.kafka.common.errors.TimeoutException: Topic hm.motor.avro not present in metadata after 60000 ms.

Note the root cause in the last line:

org.apache.kafka.common.errors.TimeoutException: Topic hm.motor.avro not present in metadata after 60000 ms.

Experiment 1

I installed the Kafka CLI on the Amazon EMR master node. I am able to list the topics in Amazon MSK with:

[hadoop@ip-xxx-xxx-xxx-xxx kafka_2.13-3.4.0]$ bin/kafka-topics.sh \
    --bootstrap-server=b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098 \
    --command-config=config/client.properties \
    --list

__amazon_msk_canary
__consumer_offsets
_schema_encoders
_schemas
hm.motor.avro

config/client.properties:

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

You can see hm.motor.avro is there. It also means that, using the same IAM authentication, the EMR master node is able to access the topics in Amazon MSK.
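
The same client configuration can also be used to describe the topic and confirm its partition metadata, e.g.:

bin/kafka-topics.sh \
    --bootstrap-server=b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098 \
    --command-config=config/client.properties \
    --describe \
    --topic=hm.motor.avro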

Also, when I created the EMR cluster, the console showed:

The instance profile assigns a role to every EC2 instance in a cluster.


So I expect both the Spark master and worker nodes in EMR to share the same access to MSK.

Any guidance would be appreciated, thanks!

Hongbo Miao
  • I suggest contacting EMR support and verifying MSK auth is expected to work. Otherwise, you can use a different port for MSK bootstrap servers that doesn't use IAM – OneCricketeer Jun 04 '23 at 13:07

0 Answers