I have a test cluster with 3 Kafka instances in KRaft mode. Each instance is both a broker and a controller. Inter-broker communication is secured with SSL certificates.

After the cluster starts, each instance knows about the others. The problems started when I created a new topic with replicas.

I create it with this command:

/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=ofd-kafka-0:9092 --create --topic containers --partitions 3 --replication-factor 3

The topic exists, but it seems that Kafka gets stuck adding the replicas. It looks like this:

Topic: containers    TopicId: ZrOENsloQXuqTn2jt4NbNA    PartitionCount: 3    ReplicationFactor: 1    Configs: segment.bytes=1073741824,retention.ms=2160000,max.message.bytes=15728640,retention.bytes=-1
    Topic: containers    Partition: 0    Leader: 1    Replicas: 0,1,2    Isr: 1    Adding Replicas: 0,2    Removing Replicas: 
    Topic: containers    Partition: 1    Leader: 2    Replicas: 0,1,2    Isr: 2    Adding Replicas: 0,1    Removing Replicas: 
    Topic: containers    Partition: 2    Leader: 0    Replicas: 2,1,0    Isr: 0    Adding Replicas: 1,2    Removing Replicas: 
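
(For reference, the state above is the output of the topic describe command:)

/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=ofd-kafka-0:9092 --describe --topic containers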

In the log of each Kafka instance I can see these warnings:

[2023-06-21 14:02:31,827] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 8458 ms. (org.apache.kafka.clients.NetworkClient)
[2023-06-21 14:02:31,827] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Client requested connection close from node 1 (org.apache.kafka.clients.NetworkClient)
[2023-06-21 14:02:31,827] INFO [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: (org.apache.kafka.clients.FetchSessionHandler)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
    at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:109)
    at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:78)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:309)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:124)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:123)
    at scala.Option.foreach(Option.scala:407)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:123)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
    at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:97)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2023-06-21 14:02:31,827] WARN [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=2, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={containers-1=PartitionData(topicId=wA7p8AUSRCmvg-Nc4qL9hA, fetchOffset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[1], lastFetchedEpoch=Optional.empty)}, isolationLevel=READ_UNCOMMITTED, removed=, replaced=, metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) (kafka.server.ReplicaFetcherThread)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
    at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:109)
    at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:78)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:309)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:124)
    at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:123)
    at scala.Option.foreach(Option.scala:407)
    at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:123)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:106)
    at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:97)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

It seems the instances try to sync with each other but get disconnected. This is the final state.

My current setup looks like this:

apiVersion: apps/v1
kind: StatefulSet
...
spec:
  template:
    spec:
      containers:
      - args:
        - -ec
        - |
          export KAFKA_CFG_NODE_ID="$(echo "$MY_POD_NAME" | grep -o -E '[0-9]+$')"
          /opt/bitnami/scripts/kafka/entrypoint.sh /opt/bitnami/scripts/kafka/run.sh
        command:
        - /bin/bash
        env:
        - name: BITNAMI_DEBUG
          value: "true"
        - name: KAFKA_KRAFT_CLUSTER_ID
          value: "Y2MyZmRlNDY3MjM0NGU4Yj"
        - name: KAFKA_CFG_PROCESS_ROLES
          value: controller,broker
        - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
          value: 0@kafka-inst-0:9093,1@kafka-inst-1:9093,2@kafka-inst-2:9093
        - name: KAFKA_CFG_LISTENERS
          value: PLAINTEXT://:9092,CONTROLLER://:9093,INTERNAL://:9094
        - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
          value: CONTROLLER
        - name: KAFKA_HEAP_OPTS
          value: -Xmx1G -Xms1G
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_CFG_MIN_INSYNC_REPLICAS
          value: "2"
        - name: KAFKA_CFG_DEFAULT_REPLICATION_FACTOR
          value: "3"
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: KAFKA_CFG_ADVERTISED_LISTENERS
          value: INTERNAL://$(MY_POD_NAME).test.svc.cluster.local:9094,PLAINTEXT://$(MY_POD_NAME).test.svc.cluster.local:9092
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: INTERNAL
        - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
          value: INTERNAL:SSL,CONTROLLER:SSL,PLAINTEXT:PLAINTEXT
        - name: KAFKA_TLS_TYPE
          value: PEM
        - name: KAFKA_CFG_INITIAL_BROKER_REGISTRATION_TIMEOUT_MS
          value: "240000"
        image: bitnami/kafka:3.3.2-debian-11-r49
        imagePullPolicy: IfNotPresent
...

I have the same problem with different Kafka versions (3.3.2, 3.4.1) and image releases. The cluster works fine until I add replicas.

Does anybody have a clue what is wrong? Any hints would be valuable as well.

Many thanks!

Edit: Making SSL certificates

I took inspiration from the blog and adapted the scripts for my needs. I generate a self-signed CA cert in the Helm chart via the genCA function if the secret with the CA cert is missing; if it exists, I leave it as is. This is enough for development purposes. Later, especially in production, I will handle the lifecycle of the CA cert via cert-manager.
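
A minimal sketch of that Helm logic, assuming the secret is named kafka-ca (the name is hypothetical here) and using Helm's built-in lookup and Sprig's genCA:

{{- /* Create the CA secret only if it does not exist yet (sketch). */}}
{{- $existing := lookup "v1" "Secret" .Release.Namespace "kafka-ca" }}
{{- if not $existing }}
{{- $ca := genCA "kafka-ca" 3650 }}
apiVersion: v1
kind: Secret
metadata:
  name: kafka-ca
type: kubernetes.io/tls
data:
  # genCA returns PEM strings; the key must stay away from the broker pods.
  tls.crt: {{ $ca.Cert | b64enc }}
  tls.key: {{ $ca.Key | b64enc }}
{{- end }}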

The CA cert and key are available only to the init script. The init script prepares the broker cert (inspired by the scripts above). The broker pod has access only to the CA cert (the CA key is excluded) and, of course, the broker cert and broker key.
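
As a rough sketch of what the init step does (the file names and the SAN below are assumptions based on my setup, not the exact script):

# Generate the broker key and a CSR, then sign the CSR with the CA.
openssl req -newkey rsa:2048 -nodes \
  -keyout kafka.keystore.key -out broker.csr \
  -subj "/CN=${MY_POD_NAME}.test.svc.cluster.local"
openssl x509 -req -in broker.csr \
  -CA tls.crt -CAkey tls.key -CAcreateserial \
  -days 365 -out kafka.keystore.pem \
  -extfile <(printf 'subjectAltName=DNS:%s.test.svc.cluster.local' "$MY_POD_NAME")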

The broker keystore is a PEM file with a corresponding key file, and the broker truststore is the CA cert PEM file. In the StatefulSet it looks like this (based on the Bitnami documentation):

volumeMounts:
- name: cacert
  mountPath: /opt/bitnami/kafka/config/certs/kafka.truststore.pem
  subPath: tls.crt
- name: storage
  mountPath: /opt/bitnami/kafka/config/certs/kafka.keystore.pem
  subPath: certs/kafka.keystore.pem
- name: storage
  mountPath: /opt/bitnami/kafka/config/certs/kafka.keystore.key
  subPath: certs/kafka.keystore.key
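
With KAFKA_TLS_TYPE=PEM this maps onto Kafka's native PEM support (KIP-651); roughly, the broker ends up with properties like the following, though the exact values the Bitnami scripts generate may differ:

ssl.keystore.type=PEM
ssl.truststore.type=PEM
# the private key from kafka.keystore.key can be supplied inline via
# ssl.keystore.key, and the cert chain via ssl.keystore.certificate.chain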

I think this approach works fine, because when I add the -Djavax.net.debug=all parameter, I can see in the broker logs that SSL communication is used without problems.
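
(One way to pass that flag is via the JVM options that Kafka's start scripts pick up, e.g. an env entry like this:)

- name: KAFKA_OPTS
  value: "-Djavax.net.debug=all"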

The same goes for creating an empty topic with 3 partitions: all brokers create it on their own storage.

If the certs or the approach were bad, this would not work, I think.

Edit: Usage

The Kafka cluster is used in our Helm chart as a load balancer for messages. We often get a huge amount of messages in a short time period, but normally the flow is stable. Sometimes we need to restart a consumer application, so in that case we use Kafka as a cache. Deploying an operator to manage Kafka would add a new dependency to the Helm chart, and by default only one Kafka cluster exists in the Kubernetes cluster. Using an operator makes sense if we need many Kafka cluster installations in one Kubernetes cluster.

Edit: Plaintext listeners

This is just the first step, in which I am securing only the communication between brokers; producers and consumers stay as they are. Once this step is finished, the next step will of course be to secure communication between the Kafka cluster and the applications, one by one. When no unsecured application remains, the ALLOW_PLAINTEXT_LISTENER=yes parameter will be removed.
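
As a sketch of that final state (the listener name CLIENT is illustrative), the listener map would then contain no plaintext entries:

- name: KAFKA_CFG_LISTENERS
  value: CLIENT://:9092,CONTROLLER://:9093,INTERNAL://:9094
- name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
  value: CLIENT:SSL,CONTROLLER:SSL,INTERNAL:SSL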

Adavan
  • If you remove the SSL cert, does the problem still exist? – Fermi-4 Jun 21 '23 at 14:29
  • @Fermi-4 Yes, the previous setup without SSL works fine. My goal is to add security to Kafka, but I got stuck :(. – Adavan Jun 21 '23 at 17:15
  • 1
    Ok then that narrows it down to the security configuration then right? What can you share about the security setup? – Fermi-4 Jun 21 '23 at 18:22
  • Strimzi Operator comes with secure options. Why build the StatefulSet on your own? – OneCricketeer Jun 21 '23 at 19:09
  • In any case, `KAFKA_CFG_ADVERTISED_LISTENERS` has no SSL configuration, so why are they part of `KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP`, and/or why have `ALLOW_PLAINTEXT_LISTENER=yes` if you don't want a plaintext connection? – OneCricketeer Jun 21 '23 at 19:11
  • I edited my original post regarding your questions. > In any case, KAFKA_CFG_ADVERTISED_LISTENERS has no SSL configuration, so why are they part of KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP @OneCricketeer What exactly are you suggesting? Skipping the `KAFKA_CFG_ADVERTISED_LISTENERS` parameter? – Adavan Jun 22 '23 at 07:39
  • That property is necessary. I am saying that if you want to use security, as your comment above mentioned, then you need to remove usages of plaintext and replace them with SSL or SASL_SSL listeners. Regarding your edit, you could still use SASL_PLAINTEXT, which is more secure than only PLAINTEXT ... Or, as I mentioned above, don't create your own resources; instead use an operator like Strimzi, which configures all of this for you in other ways. – OneCricketeer Jun 22 '23 at 22:39
  • I'm also trying to get 3 Kafka instances to work in kraft mode using the bitnami kafka image. But I got a DNS issue indicating the brokers don't know each other: `[2023-07-16 21:38:20,695] WARN [RaftManager id=2] Error connecting to node kafka-1:9093 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient) java.net.UnknownHostException: kafka-1` Wonder if you had hit similar issues during bringup and how you managed to resolve that? – cafemike Jul 16 '23 at 21:50
  • @cafemike No, I did not have that problem. But I think you do not have correctly generated certificates for the brokers. Look at the script [here](https://github.com/codingharbour/kafka-docker-compose/blob/master/kafka-ssl/security/create-pem-certificate.sh#L44) and [here](https://github.com/codingharbour/kafka-docker-compose/blob/master/kafka-ssl/security/create-pem-certificate.sh#L48). Also read the blog I mentioned in my question. I don't recommend disabling the name check for security reasons, but for testing it will be fine. – Adavan Jul 18 '23 at 12:44
  • @Adavan Thanks for the pointers. Let me look into this. I found out more about my previous issues: it was only due to other bad config, which made the pod fail constantly and hence the endpoint unavailable. – cafemike Jul 19 '23 at 03:47

1 Answer


Thank you for your effort; I solved the problem. It was in the typical place: between the chair and the keyboard :). I focused on Kafka too much :(.

In this step I introduced the new listener (INTERNAL://:9094) but forgot to add it to the Kubernetes Service object. It looks like this:

spec:
  ports:
    - port: 9092
      name: listener
    - port: 9093
      name: controller
    - port: 9094
      name: internal

The last two lines solved the problem :).
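
For anyone verifying the fix: re-running the describe command from the question should now show all three brokers in the Isr column and an empty Adding Replicas column:

/opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=ofd-kafka-0:9092 --describe --topic containers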

Adavan