0

I am upgrading from Spark 2.2 to 3.4.0. My application also uses Kafka streams, and with Spark 3.4.0 I had to update kafka-clients as well. I read somewhere that Kafka 3.4.0 does not need to depend on ZooKeeper, so I updated my code to create the topic as follows:

    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}

    val properties = new Properties
    properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9094")
    val admin = Admin.create(properties)
    // Create the topic only if the broker does not already list it
    if (!admin.listTopics().names().get().contains(topicName)) {
      // 1 partition, replication factor 1
      val newTopic = new NewTopic(topicName, 1, 1.toShort)
      val result = admin.createTopics(Collections.singleton(newTopic))
      val future = result.values.get(topicName)
      // Block until the creation request completes
      future.get()
    }

This successfully creates the topic, but when the same broker is accessed to read a stream from the topic, I get an exception:

val kafkaStream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9094")
  .option("kafka.max.partition.fetch.bytes", settings.kafka.maxRequestSize)
  .option("startingOffsets", settings.kafka.startingOffsets)
  .option("maxOffsetsPerTrigger", settings.kafka.maxOffsetsPerTrigger.getOrElse(1000000L))
  .option("failOnDataLoss", "false")
  .option("subscribe", topicName)
  .load()

The exception looks like this:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:165)
    at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions(ConsumerStrategy.scala:66)
    at org.apache.spark.sql.kafka010.ConsumerStrategy.retrieveAllPartitions$(ConsumerStrategy.scala:65)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.retrieveAllPartitions(ConsumerStrategy.scala:102)
    at org.apache.spark.sql.kafka010.SubscribeStrategy.assignedTopicPartitions(ConsumerStrategy.scala:113)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.$anonfun$partitionsAssignedToAdmin$1(KafkaOffsetReaderAdmin.scala:499)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.withRetries(KafkaOffsetReaderAdmin.scala:518)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.partitionsAssignedToAdmin(KafkaOffsetReaderAdmin.scala:498)
    at org.apache.spark.sql.kafka010.KafkaOffsetReaderAdmin.fetchLatestOffsets(KafkaOffsetReaderAdmin.scala:297)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:246)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:98)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$getStartOffset$2(MicroBatchExecution.scala:455)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.getStartOffset(MicroBatchExecution.scala:455)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:489)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:488)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:932)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:477)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:12)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:802)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:473)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:266)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:411)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:409)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:247)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:237)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:306)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:284)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:207)

I would appreciate any help in this regard. Thank you in advance.

Chandan Gawri
  • `kafka-clients:2.2` didn't depend on Zookeeper either... You shouldn't upgrade `kafka-clients` outside of `spark-sql-kafka-0-10` without good reason. But why are you using port 9094? Does this require some authentication settings, perhaps? Or what is `settings.kafka.brokers`? – OneCricketeer Apr 25 '23 at 19:26
  • Port 9094 is the port on which the Kafka broker is reachable from outside the Docker container; settings.kafka.brokers is a configurable value for the Kafka broker, set to localhost:9094 – Chandan Gawri Apr 26 '23 at 07:58
  • Having the Kafka topic already created gets rid of this error, but if the topic does not exist I still get the same error – Chandan Gawri Apr 26 '23 at 08:00
  • You mentioned Docker. Did you configure the broker correctly? Is your Spark code also running in a container? If so, then localhost will never connect. Otherwise, I still don't know why you'd need to change Kafka default port... And your error has nothing to do with the AdminClient. It's the consumer that's failing, which means it's your `settings.kafka.broker` value that you've not shared, which seems to be incorrect – OneCricketeer Apr 26 '23 at 12:54
  • @OneCricketeer The Spark code runs from IntelliJ and Kafka runs inside Docker; to connect, the Kafka port is mapped to localhost. If I keep the Kafka topic pre-created, my code works well, but if I delete the Kafka topics and run the code above, it does not work. I need to create the topic manually, which was not the case with Spark 2.2 – Chandan Gawri Apr 27 '23 at 07:29
  • Please show your docker compose file / run commands for Kafka. A simple port mapping [will not work](https://stackoverflow.com/a/51634499/2308683) – OneCricketeer Apr 27 '23 at 14:41

2 Answers

1

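Try setting the following option. It makes Spark fall back to the older KafkaConsumer-based offset fetching instead of the AdminClient-based path that appears in your stack trace (KafkaOffsetReaderAdmin):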

spark.sql.streaming.kafka.useDeprecatedOffsetFetching = true

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#offset-fetching
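
As a minimal sketch, the option could be set when the session is built. The object name, application name, and `local[*]` master below are placeholders and not taken from the question; whether this setting resolves the error likely depends on how the broker is configured.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical entry point; the object and app names are placeholders.
object DeprecatedOffsetFetchingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-offset-fetching-example") // placeholder name
      .master("local[*]")                       // assumption: local run, e.g. from an IDE
      // Use the older KafkaConsumer-based offset fetching instead of the AdminClient-based one.
      .config("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "true")
      .getOrCreate()

    // Print the effective value to confirm the setting was picked up.
    println(spark.conf.get("spark.sql.streaming.kafka.useDeprecatedOffsetFetching"))

    spark.stop()
  }
}
```

The same key can also be passed on the command line with `--conf` when submitting the job, which avoids changing application code.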

-3

The exception you are encountering, UnknownTopicOrPartitionException: This server does not host this topic-partition, typically occurs when there is a mismatch between the configuration of the Kafka broker and the client. In your case, you have upgraded your Kafka version to 3.4.0 and removed the dependency on ZooKeeper.

However, it seems that your Spark application is still using the old Kafka client version that relies on ZooKeeper. To resolve this issue, you need to update the Kafka client version used by Spark to match the version you are using in your Kafka broker (3.4.0 in your case).

You can update the Kafka client version by modifying the dependencies in your project's build configuration file (e.g., pom.xml for Maven or build.gradle for Gradle). Find the dependency for Kafka and update it to version 3.4.0. For example, if you are using Maven, update the Kafka dependency in your pom.xml file as follows:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

After updating the Kafka client version, rebuild and redeploy your Spark application. This should ensure compatibility between the Kafka broker and the client and resolve the UnknownTopicOrPartitionException.

batman
  • Welcome back to Stack Overflow. It looks like it's been a while since you've posted and may not be aware of the current policies since most or all of your **5 answers in the last 20 minutes** appear likely to have been entirely or partially written by AI (e.g., ChatGPT). Please be aware that [posting of AI-generated content is banned here](//meta.stackoverflow.com/q/421831). If you used an AI tool to assist with any answer, I would encourage you to delete it. – NotTheDr01ds Jun 12 '23 at 20:22
  • **Readers should review this answer carefully and critically, as AI-generated information often contains fundamental errors and misinformation.** If you observe quality issues and/or have reason to believe that this answer was generated by AI, please leave feedback accordingly. The moderation team can use your help to identify quality issues. – NotTheDr01ds Jun 12 '23 at 20:22