I'm working with the Argo Events Kafka EventSource and I'm not able to set up SASL_SSL with it.

Below is the EventSource manifest:

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: kafka
  namespace: MY_NAMESPACE
spec:
  eventBusName: MY_EVENTBUS
  kafka:
    kafka-event:
      # kafka broker url
      url: MY_BROKER_URL
      # name of the kafka topic
      topic: MY_TOPIC
      # jsonBody specifies that all event body payload coming from this
      # source will be JSON
      jsonBody: true
      # partition id
      # partition: "1"
      # optional backoff time for connection retries.
      # if not provided, default connection backoff time will be used.
      connectionBackoff:
        # duration in nanoseconds, or strings like "3s", "2m". following value is 10 seconds
        duration: 10s
        # how many backoffs
        steps: 5
        # factor to increase on each step.
        # setting factor > 1 makes backoff exponential.
        factor: 2
        jitter: 0.2
      # Use a consumer group, if this is used you do not need to specify a "partition: <id>"
      consumerGroup:
        groupName: argo-test-group
        oldest: false
        rebalanceStrategy: range
        limitEventsPerSecond: 1
        version: "2.5.0"
      #    Enable SASL authentication (not to be used with TLS)
      sasl:
        mechanism: SCRAM-SHA-512
        passwordSecret:
          key: password
          name: kafka-sasl-creds
        userSecret:
          key: username
          name: kafka-sasl-creds

But I'm getting the error below:

{"level":"info","ts":1679338599.3659816,"logger":"argo-events.eventsource","caller":"kafka/start.go:81","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"kafka-event"}
{"level":"error","ts":1679338600.1314366,"logger":"argo-events.eventsource","caller":"kafka/start.go:122","msg":"Error creating consumer group client: kafka: client has run out of available brokers to talk to: unexpected EOF","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"kafka-event","stacktrace":"github.com/argoproj/argo-events/eventsources/sources/kafka.(*EventListener).consumerGroupConsumer\n\t/home/runner/work/argo-events/argo-events/eventsources/sources/kafka/start.go:122\ngithub.com/argoproj/argo-events/eventsources/sources/kafka.(*EventListener).StartListening\n\t/home/runner/work/argo-events/argo-events/eventsources/sources/kafka/start.go:87\ngithub.com/argoproj/argo-events/eventsources.(*EventSourceAdaptor).run.func3.1\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:503\ngithub.com/argoproj/argo-events/common.DoWithRetry.func1\n\t/home/runner/work/argo-events/argo-events/common/retry.go:106\nk8s.io/apimachinery/pkg/util/wait.ConditionFunc.WithContext.func1\n\t/home/runner/go/pkg/mod/k8s.io/apimachinery@v0.24.3/pkg/util/wait/wait.go:220\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtectionWithContext\n\t/home/runner/go/pkg/mod/k8s.io/apimachinery@v0.24.3/pkg/util/wait/wait.go:233\nk8s.io/apimachinery/pkg/util/wait.runConditionWithCrashProtection\n\t/home/runner/go/pkg/mod/k8s.io/apimachinery@v0.24.3/pkg/util/wait/wait.go:226\nk8s.io/apimachinery/pkg/util/wait.ExponentialBackoff\n\t/home/runner/go/pkg/mod/k8s.io/apimachinery@v0.24.3/pkg/util/wait/wait.go:421\ngithub.com/argoproj/argo-events/common.DoWithRetry\n\t/home/runner/work/argo-events/argo-events/common/retry.go:105\ngithub.com/argoproj/argo-events/eventsources.(*EventSourceAdaptor).run.func3\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:502"}

But when I add the following to the manifest to skip TLS verification, it works:

tls:
  insecureSkipVerify: true

However, I don't want to make it insecure.

Can someone please help me figure out what I'm doing wrong?

Thanks

Argo Events Version: v1.7.6

crenshaw-dev
Aman

1 Answer

I have it working now, so I'm posting the solution in case someone faces the same issue. I found both the documentation and the error message misleading, and I had to read the source code to figure it out.

Issue: The problem is with the Kafka connection. My Kafka brokers use certificates signed by a private CA, so if I neither enable insecureSkipVerify nor provide the CA cert, the EventSource fails to establish a connection to Kafka. All we need to do is pass our private CA along with the SASL creds.
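If you want to confirm this diagnosis outside of Argo Events, here is a minimal sketch in Python that attempts a verified TLS handshake against the broker. It is not part of Argo Events; the function names, host, port and CA path are hypothetical placeholders. The idea is that the handshake should fail verification with the system trust store and succeed once the private CA is supplied.

```python
import socket
import ssl


def build_context(ca_path=None):
    """Return a TLS client context that verifies the server certificate.

    ca_path is a hypothetical path to the private CA in PEM format.
    When it is None, only the system trust store is used.
    """
    return ssl.create_default_context(cafile=ca_path)


def broker_cert_is_trusted(host, port, ca_path=None, timeout=5.0):
    """Attempt a TLS handshake and report whether certificate verification passed.

    Network errors (refused connection, timeout) are not caught here and
    propagate to the caller.
    """
    ctx = build_context(ca_path)
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with ctx.wrap_socket(sock, server_hostname=host):
                return True
    except ssl.SSLCertVerificationError:
        return False
```

For example, `broker_cert_is_trusted("MY_BROKER_HOST", 9093)` compared with `broker_cert_is_trusted("MY_BROKER_HOST", 9093, ca_path="ca.crt")` shows whether the private CA is what makes the difference (the port is an assumption, use your own listener port).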

Fix: We can use the tls and sasl blocks together, since there's no validation logic preventing that, and pass our CA in the tls block alongside sasl. This way we supply our CA cert instead of enabling insecureSkipVerify. You need to create the k8s secrets beforehand:

es-cacert-secret.yaml

apiVersion: v1
kind: Secret
metadata:
  name: YOUR_TLS_SECRET
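# Opaque, because a kubernetes.io/tls Secret must contain tls.crt and tls.key
type: Opaque
data:
  # base64-encoded PEM of the private CA
  ca.crt: YOUR_CA_CERT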

es-sasl-secret.yaml

apiVersion: v1
kind: Secret
metadata:
  name: YOUR_SASL_SECRET
type: Opaque
data:
  password: YOUR_PASSWORD
  username: YOUR_USERNAME
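Note that values under `data` in a Secret have to be base64-encoded. As a small sketch (the helper name `encode_secret_data` is my own, not a Kubernetes or Argo API), this Python snippet prints the encoded entries you can paste into the manifests above:

```python
import base64


def encode_secret_data(values):
    """Base64-encode each value, as the `data` field of a Secret expects."""
    return {
        key: base64.b64encode(val.encode("utf-8")).decode("ascii")
        for key, val in values.items()
    }


if __name__ == "__main__":
    # Placeholder credentials; replace with your own values.
    creds = {"username": "YOUR_USERNAME", "password": "YOUR_PASSWORD"}
    for key, val in encode_secret_data(creds).items():
        print(f"  {key}: {val}")
```

Alternatively, putting plain-text values under `stringData` instead of `data` lets Kubernetes do the encoding for you.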

EventSource.yaml

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: "kafka"
spec:
  eventBusName: eventBusName
  kafka:
    quin-case-event:
      url: kafkaurl
      topic: kafkatopic
      jsonBody: true
      connectionBackoff:
        duration: 10s
        steps: 5
        factor: 2
        jitter: 0.2
      consumerGroup:
        groupName: kafkaconsumerGroup
        oldest: false
        rebalanceStrategy: range
        limitEventsPerSecond: 1
        version: "2.5.0"
      tls:
        caCertSecret:
          name: YOUR_TLS_SECRET
          key: ca.crt
      sasl:
        mechanism: SCRAM-SHA-512
        passwordSecret:
          key: password
          name: YOUR_SASL_SECRET
        userSecret:
          key: username
          name: YOUR_SASL_SECRET
Aman