
I'm new to Kubernetes, and fairly new to Kafka.

My goal is to get my Kafka Connect instance to connect properly to my broker, so that it can receive messages.

So far, I've created a Zookeeper deployment and wrapped it in a service. (As far as I understand, this is necessary for the Kafka broker to be able to refer to zookeeper as simply "zookeeper:2181".)

[Image: Minikube diagram]

I had hoped that setting up the Kafka Connect instance would be just as simple. I wrapped my broker in a service, and pointed my Connect instance to it by name.

worker.properties

bootstrap.servers=kafka-broker:9092
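Note that `bootstrap.servers` only lists the addresses a client uses for its first connection. As a rough diagnostic, the Python sketch below reads a `worker.properties`-style text and checks whether each bootstrap host resolves from wherever it runs (for example, a debug pod inside the cluster). The helper names are hypothetical, and the parser is deliberately simplified: it handles plain `key=value` lines and `#`/`!` comments, not the full Java `.properties` syntax.

```python
import socket


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and #/! comments.

    Simplified: no ':' separators, escapes or line continuations.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def parse_bootstrap_servers(value: str) -> list[tuple[str, int]]:
    """Split 'host1:port1,host2:port2' into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if entry:
            host, _, port = entry.rpartition(":")
            servers.append((host, int(port)))
    return servers


def resolves(host: str, port: int) -> bool:
    """Return True if the local resolver can turn host into an address."""
    try:
        socket.getaddrinfo(host, port)
        return True
    except socket.gaierror:
        return False
```

For the file above, `parse_bootstrap_servers(parse_properties(text)["bootstrap.servers"])` returns `[("kafka-broker", 9092)]`, and inside the cluster `resolves("kafka-broker", 9092)` should agree with the successful `curl` to the Service name.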

However, Connect crashes with this exception:

java.io.IOException: Can't resolve address: kafka-broker-57b74c6766-d9w5j:9092
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:889)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1104)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.nio.channels.UnresolvedAddressException
    at java.base/sun.nio.ch.Net.checkAddress(Net.java:130)
    at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:675)
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233)
    ... 6 more

I've created an alpine-based pod as described here, and I can successfully curl -v kafka-broker:9092. But this is not the address that Connect is trying to resolve. It's trying to resolve kafka-broker-57b74c6766-d9w5j:9092, which, like Connect, I cannot resolve. I'm not sure why Connect is trying to contact a specific pod, since this is the exact problem that Kubernetes services are supposed to prevent (in my limited understanding, anyway).
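The pod name shows up because of the two-step way Kafka clients connect: the bootstrap address is only used to fetch cluster metadata, and that metadata lists every broker under the address the broker advertises. The following is a simplified, hypothetical model of that flow, not the real client code; `Broker`, `connect_flow` and the `resolvable` set are illustrative stand-ins for the client and for DNS. It shows how a reachable bootstrap name can still end in the "Can't resolve address" error from the stack trace.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Broker:
    broker_id: int
    advertised_host: str  # host the broker reports in metadata (advertised.listeners)
    port: int


def connect_flow(
    bootstrap: list[tuple[str, int]],
    metadata: list[Broker],
    resolvable: set[str],
) -> list[str]:
    """Simplified model of a Kafka client's startup.

    1. Use any resolvable bootstrap address to fetch cluster metadata.
    2. Connect to each broker at the address it advertises in that metadata.
    Returns the addresses used in step 2, or raises ConnectionError.
    """
    if not any(host in resolvable for host, _ in bootstrap):
        raise ConnectionError("no bootstrap server could be resolved")
    targets = []
    for broker in metadata:
        address = f"{broker.advertised_host}:{broker.port}"
        if broker.advertised_host not in resolvable:
            # Mirrors the "Can't resolve address" failure in the stack trace.
            raise ConnectionError(f"Can't resolve address: {address}")
        targets.append(address)
    return targets


# Only the Service name has a DNS record; the Deployment pod name does not.
dns = {"kafka-broker"}
pod_metadata = [Broker(0, "kafka-broker-57b74c6766-d9w5j", 9092)]
try:
    connect_flow([("kafka-broker", 9092)], pod_metadata, dns)
except ConnectionError as err:
    print(err)  # Can't resolve address: kafka-broker-57b74c6766-d9w5j:9092
```

If the broker advertised `kafka-broker` instead, the same call would return `["kafka-broker:9092"]`. A real client also retries and only opens connections to the brokers it needs, which this model leaves out.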

How can I get Connect to communicate with the broker correctly?


Here's the output of kubectl describe svc kafka-broker:

Name:              kafka-broker
Namespace:         default
Labels:            <none>
Annotations:       <none>
Selector:          app=kafka-broker
Type:              ClusterIP
IP:                10.108.61.90
Port:              <unset>  9092/TCP
TargetPort:        9092/TCP
Endpoints:         172.17.0.7:9092
Session Affinity:  None
Events:            <none>
Cameron Hudson
  • Can you add some details about your `kafka-broker` service? (`kubectl describe svc kafka-broker`) – Eduardo Baitello Jul 03 '19 at 18:56
  • @Eduardo Baitello Added. – Cameron Hudson Jul 03 '19 at 20:18
  • Fine. As it has an `IP`, it's a "normal" (non-[headless](https://kubernetes.io/docs/concepts/services-networking/service/#headless-services)) service. So I presume that `kafka-broker-57b74c6766-d9w5j:9092` is being returned by the broker's `advertised.listeners` configuration. Please try changing your `bootstrap.servers` to use an [FQDN](https://github.com/kubernetes/community/blob/master/contributors/design-proposals/network/pod-resolv-conf.md#existing-workarounds) to see if it works (something like `kafka-broker.your-namespace.svc.cluster.local:9092`). – Eduardo Baitello Jul 03 '19 at 20:55
  • I also recommend that you read this paper from Confluent: [Recommendations for Deploying Apache Kafka® on Kubernetes](https://www.confluent.io/wp-content/uploads/Recommendations-for-Deploying-Apache-Kafka-on-Kubernetes.pdf), especially the `Traffic: How Do Clients Communicate with Individual Brokers?` section. Please let me know how your tests with the FQDN suggested above go. – Eduardo Baitello Jul 03 '19 at 20:57
  • @Eduardo Baitello Thanks for this reference. Using the FQDN in my `worker.properties` file still results in Connect trying to use the pod name. However, based on your reference, I think the solution may be to use a headless service. I'll try that next. – Cameron Hudson Jul 03 '19 at 23:23
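The FQDN suggestion not changing anything is consistent with how pod DNS works: inside a pod, a short name such as `kafka-broker` or `zookeeper` is expanded through the search list in `/etc/resolv.conf`, so it lands on the same Service record as the full name, while the pod name the broker advertises from a Deployment has no DNS record at all. A small Python approximation of that lookup, assuming the `default` namespace, the common `cluster.local` cluster domain, the usual Kubernetes search list with `ndots:5`, and a hypothetical in-memory record set in place of real DNS:

```python
from typing import Optional


def search_candidates(
    name: str,
    namespace: str = "default",
    cluster_domain: str = "cluster.local",
    ndots: int = 5,
) -> list[str]:
    """Names a resolver would try, approximating a pod's resolv.conf search list."""
    if name.endswith("."):
        return [name.rstrip(".")]  # fully qualified: no search expansion
    search = [f"{namespace}.svc.{cluster_domain}", f"svc.{cluster_domain}", cluster_domain]
    expanded = [f"{name}.{suffix}" for suffix in search]
    # With fewer than `ndots` dots, the search list is tried before the bare name.
    return expanded + [name] if name.count(".") < ndots else [name] + expanded


def service_fqdn(service: str, namespace: str = "default", cluster_domain: str = "cluster.local") -> str:
    """DNS name of a Service: <service>.<namespace>.svc.<cluster_domain>."""
    return f"{service}.{namespace}.svc.{cluster_domain}"


def lookup(name: str, records: set[str]) -> Optional[str]:
    """Return the first candidate present in a (hypothetical) set of DNS records."""
    for candidate in search_candidates(name):
        if candidate in records:
            return candidate
    return None


records = {service_fqdn("kafka-broker"), service_fqdn("zookeeper")}
print(lookup("kafka-broker", records))                            # kafka-broker.default.svc.cluster.local
print(lookup("kafka-broker.default.svc.cluster.local", records))  # kafka-broker.default.svc.cluster.local
print(lookup("kafka-broker-57b74c6766-d9w5j", records))           # None
```

Both spellings of the Service name reach the same record, so the bootstrap step behaves identically either way; the failing address comes from the broker's metadata, not from `bootstrap.servers`.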

2 Answers


To resolve my problem, I had to:

  • Change the kafka-broker Deployment to a StatefulSet.
  • Make the kafka-broker Service headless by setting spec.clusterIP to "None".

Thank you to @Eduardo Baitello for this guidance!
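Part of why this works: a StatefulSet gives its pods stable names (`<statefulset>-<ordinal>`), and when its `spec.serviceName` points at a headless Service, each pod also gets its own DNS record, `<pod>.<service>.<namespace>.svc.<cluster-domain>`, which other pods in the cluster can resolve. The sketch below only generates those names and matching `advertised.listeners` entries. It assumes the StatefulSet and the Service are both called `kafka-broker`, the `default` namespace, the `cluster.local` domain and a `PLAINTEXT` listener on 9092; the answer itself does not say how `advertised.listeners` was configured.

```python
def statefulset_pod_dns(
    statefulset: str,
    service: str,
    replicas: int,
    namespace: str = "default",
    cluster_domain: str = "cluster.local",
) -> list[str]:
    """Per-pod DNS names for a StatefulSet governed by a headless Service.

    Pods are named <statefulset>-0 .. <statefulset>-(replicas-1).
    """
    return [
        f"{statefulset}-{ordinal}.{service}.{namespace}.svc.{cluster_domain}"
        for ordinal in range(replicas)
    ]


def advertised_listener(host: str, port: int = 9092, listener: str = "PLAINTEXT") -> str:
    """Format one advertised.listeners entry: LISTENER_NAME://host:port."""
    return f"{listener}://{host}:{port}"


for dns_name in statefulset_pod_dns("kafka-broker", "kafka-broker", replicas=2):
    print(advertised_listener(dns_name))
# PLAINTEXT://kafka-broker-0.kafka-broker.default.svc.cluster.local:9092
# PLAINTEXT://kafka-broker-1.kafka-broker.default.svc.cluster.local:9092
```

Unlike a Deployment's random suffix (`57b74c6766-d9w5j`), these names survive pod restarts, which is what lets each broker advertise a fixed, resolvable address.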

Cameron Hudson
  • What if Kafka is not running on Kubernetes but on its own server? What could be the issue in that case? The error message is still the same as the one above. – Muzi Jack Jan 20 '20 at 14:11

When you connect to the broker, it advertises itself using its hostname (here, the pod name). Taken from https://rmoff.net/2018/08/02/kafka-listeners-explained/:

tl;dr : You need to set advertised.listeners (or KAFKA_ADVERTISED_LISTENERS if you’re using Docker images) to the external address (host/IP) so that clients can correctly connect to it. Otherwise they’ll try to connect to the internal host address–and if that’s not reachable then problems ensue.

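On the Kafka broker, set the KAFKA_ADVERTISED_LISTENERS environment variable to use the name of the service (kafka-broker), for example PLAINTEXT://kafka-broker:9092. The broker will then advertise that address to clients, so they connect through the service name instead of the pod's hostname.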
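The `KAFKA_ADVERTISED_LISTENERS` (or `advertised.listeners`) value is a comma-separated list of `LISTENER_NAME://host:port` entries, and the host part is what clients are told to connect to. A small, hypothetical Python helper for sanity-checking such a value before putting it in the broker's environment; the listener names in the second test case are only illustrations. With a single broker, advertising the Service name is enough; with several brokers, each one needs to advertise an address that reaches that particular broker.

```python
def parse_listeners(value: str) -> list[tuple[str, str, int]]:
    """Split 'NAME://host:port,...' into (name, host, port) tuples."""
    listeners = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, sep, address = entry.partition("://")
        if not sep:
            raise ValueError(f"expected NAME://host:port, got {entry!r}")
        host, _, port = address.rpartition(":")
        listeners.append((name, host, int(port)))
    return listeners


# e.g. KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092
print(parse_listeners("PLAINTEXT://kafka-broker:9092"))
# [('PLAINTEXT', 'kafka-broker', 9092)]
```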

Andy Shinn
  • What if Kafka is not running on Kubernetes but on its own server? What could be the issue in that case? The error message is still the same as the one above. – Muzi Jack Jan 20 '20 at 14:11