
I am using two separate Mac laptops to learn Kafka Streams. One laptop (10.0.0.253) runs dockerized Kafka:

3ed3f3b6989b   confluentinc/cp-enterprise-kafka:6.0.0   "/etc/confluent/dock…"   2 hours ago   Up 2 hours   9092/tcp, 0.0.0.0:29092->29092/tcp, :::29092->29092/tcp                       kafka
a8ab0ae59df7   confluentinc/cp-zookeeper:6.0.0          "/etc/confluent/dock…"   2 hours ago   Up 2 hours   2181/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:32181->32181/tcp, :::32181->32181/tcp   zookeeper
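
For context, the containers come from a Confluent-style docker-compose setup. The exact file isn't reproduced here, so the sketch below is reconstructed from the ports visible above and the stock Confluent examples; the listener names and values are assumptions rather than a copy of the real configuration:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    ports:
      - "32181:32181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181

  kafka:
    image: confluentinc/cp-enterprise-kafka:6.0.0
    ports:
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      # the advertised listeners are the addresses handed back to clients;
      # these particular values are assumed, not copied from my setup
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1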

I use my other Mac laptop to compile and run the Kafka Streams app:

package com.example;

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

class DslExample {

  public static void main(String[] args) {
    // the builder is used to construct the topology
    StreamsBuilder builder = new StreamsBuilder();

    // read from the source topic, "users"
    KStream<Void, String> stream = builder.stream("users");

    // for each record that appears in the source topic,
    // print the value
    stream.foreach(
        (key, value) -> {
          System.out.println("(DSL) Hello, " + value);
        });

    // you can also print using the `print` operator
    // stream.print(Printed.<Void, String>toSysOut().withLabel("source"));

    // set the required properties for running Kafka Streams
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev1");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.253:29092");
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Void().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // build the topology and start streaming
    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    streams.start();

    // close Kafka Streams when the JVM shuts down (e.g. SIGTERM)
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }
}

This exact same code connects to the Kafka broker just fine when it runs on the same laptop as the broker, with the bootstrap server set to localhost:

config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");

When I run this code on the other laptop, I see no errors, but the connection to the remote Kafka broker is never established and the data from the "users" topic is never read.

Is there a particular technique for connecting a Kafka Streams app to a remote Kafka broker?
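
To rule out Kafka Streams itself, a plain AdminClient check along these lines (a hypothetical snippet, not part of the app above) should be able to list the broker's topics from the second laptop if the broker is reachable and advertising a usable address:

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

class ConnectivityCheck {

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.0.253:29092");
    // give up after 10 seconds instead of the 60-second default if unreachable
    props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "10000");

    try (AdminClient admin = AdminClient.create(props)) {
      // this call round-trips to the broker; a timeout here points at
      // connectivity or advertised-listener problems, not at the topology
      System.out.println("Topics: " + admin.listTopics().names().get());
    }
  }
}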

Eugene Goldberg
  • I just built an identical Kafka environment on Azure (wide open). Still unable to connect with the Kafka Streams app, so this is not a Mac-specific issue. – Eugene Goldberg Feb 19 '22 at 22:05
  • It sounds like your dockerised Kafka isn't allowing connections from outside the host - have you checked your firewall on that system to see if those ports are unblocked? – James McPherson Feb 20 '22 at 04:20
  • Port forwards are not enough. Read the linked post about setting up the advertised listeners for remote access. – OneCricketeer Feb 20 '22 at 07:36
