
I connected my Spring Cloud app to Kafka and Zookeeper hosted in a docker-compose file. They do connect, but when I run the application and expect the producer to publish a message, I get nothing... I don't know what the error is. I followed this example: https://github.com/ihuaylupo/manning-smia/tree/master/chapter10/licensing-service/src/main/java/com/optimagrowth/license/events

The workflow, following the GitHub example, is: when I call a POST endpoint on a service, I expect my Kafka producer to publish a message on a topic and the Kafka consumer to consume it. Everything else works - the endpoint executes against the DB - but no Kafka producing or consuming happens, and there is not even an error to tell me where I went wrong...

Docker-compose file:

 zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    

  kafkaserver:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 # the host IP, because I want to access Kafka and Zookeeper from outside the containers, i.e. from localhost
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS=dresses:1:1, ratings:1:1
      - KAFKA_SOCKET_REQUEST_MAX_BYTES=2000000000
      - KAFKA_HEAP_OPTS=-Xmx512M -Xmx5g
      - listeners=PLAINTEXT://:9092
      - advertised.listeners=PLAINTEXT://192.168.99.100:9092
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    depends_on:
      - zookeeper

Kafka consumer spring cloud properties:

spring.cloud.stream.bindings.inboundOrgChanges.destination=orgChangeTopic
spring.cloud.stream.bindings.inboundOrgChanges.content-type=application/json
spring.cloud.stream.bindings.inboundOrgChanges.group=studentsGroup
spring.cloud.stream.kafka.binder.brokers=localhost #kafka
spring.cloud.stream.kafka.binder.zkNodes=localhost

Kafka producer spring cloud properties:

spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.zkNodes=http://192.168.99.100:2181
spring.cloud.stream.kafka.binder.brokers=http://192.168.99.100:9092

Kafka Producer...

@EnableBinding(Source.class) // this annotation is placed on my main Spring application class
@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source){
        this.source = source;
    }

    public void publishOrganizationChange(String action, String organizationId){
       logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
        OrganizationChangeModel change =  new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                organizationId,
                UserContext.getCorrelationId());

        source.output().send(MessageBuilder.withPayload(change).build());
    }
}

@Getter @Setter @ToString
public class OrganizationChangeModel {
    private String type;
    private String action;
    private String organizationId;
    private String correlationId;

    public OrganizationChangeModel(String type, String action, String organizationId, String correlationId) {
        super();
        this.type = type;
        this.action = action;
        this.organizationId = organizationId;
        this.correlationId = correlationId;
    }
}

@Service
public class ServiceEx {
    @Autowired
    private OrganizationRepository repository; // repository field was missing from the original snippet

    @Autowired
    private SimpleSourceBean simpleSourceBean;

    public Organization findById(String organizationId) {
        Optional<Organization> opt = repository.findById(organizationId);
        simpleSourceBean.publishOrganizationChange("GET", organizationId); // this call won't do anything - no message is produced
        return opt.orElse(null);
    }
}

Edited: Docker-compose file:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - spring-cloud-network

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS=dresses:1:1, ratings:1:1
      - KAFKA_SOCKET_REQUEST_MAX_BYTES=2000000000
      - KAFKA_HEAP_OPTS=-Xmx512M -Xmx5g
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    depends_on:
      - zookeeper
    networks:
      - spring-cloud-network

  facultate:
    container_name: facultate
    build: C:\Users\marius\com.balabasciuc.springmicroservicesinaction\facultateservice
    restart: on-failure
    ports:
      - "1002:1002"
    environment:
      SPRING_CLOUD_CONFIG_URI: "http://config:7070"
      EUREKA_HOST: server
      EUREKA_PORT: 9001
      DATABASE_HOST: database
     # SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS: kafka:9092
    depends_on:
      - kafka
      - zookeeper
      - server
    networks:
      - spring-cloud-network
volumes:
  simple:
    driver: local
networks:
  spring-cloud-network:
    driver: bridge

Spring Cloud producer prop:

spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.brokers=kafka
spring.cloud.stream.kafka.binder.zkNodes=zookeeper

Docker Logs:

facultate    | 2022-02-01 11:15:47.246  INFO 1 --- [           main] o.s.i.channel.PublishSubscribeChannel    : Channel 'application-1.errorChannel' has 1 subscriber(s).
facultate    | 2022-02-01 11:15:47.247  INFO 1 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
facultate    | 2022-02-01 11:15:47.251  INFO 1 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Creating binder: kafka
facultate    | 2022-02-01 11:15:47.876  INFO 1 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: kafka
facultate    | 2022-02-01 11:15:47.876  INFO 1 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: kafka
facultate    | 2022-02-01 11:15:48.138  INFO 1 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : Using kafka topic for outbound: orgChangeTopic
facultate    | 2022-02-01 11:15:48.150  INFO 1 --- [           main] o.a.k.clients.admin.AdminClientConfig    : AdminClientConfig values:
facultate    |  bootstrap.servers = [kafka:9092]
facultate    |  client.dns.lookup = use_all_dns_ips
facultate    |  client.id =
facultate    |  connections.max.idle.ms = 300000
facultate    |  default.api.timeout.ms = 60000
facultate    |  metadata.max.age.ms = 300000
facultate    |  metric.reporters = []
facultate    |  metrics.num.samples = 2
facultate    |  metrics.recording.level = INFO
facultate    |  metrics.sample.window.ms = 30000
facultate    |  receive.buffer.bytes = 65536
facultate    |  reconnect.backoff.max.ms = 1000
facultate    |  reconnect.backoff.ms = 50
facultate    |  request.timeout.ms = 30000
facultate    |  retries = 2147483647
facultate    |  retry.backoff.ms = 100
facultate    |  sasl.client.callback.handler.class = null
facultate    |  sasl.jaas.config = null
facultate    |  sasl.kerberos.kinit.cmd = /usr/bin/kinit
facultate    |  sasl.kerberos.min.time.before.relogin = 60000
facultate    |  sasl.kerberos.service.name = null
facultate    |  sasl.kerberos.ticket.renew.jitter = 0.05
facultate    |  sasl.kerberos.ticket.renew.window.factor = 0.8
facultate    |  sasl.login.callback.handler.class = null
facultate    |  sasl.login.class = null
facultate    |  sasl.login.refresh.buffer.seconds = 300
facultate    |  sasl.login.refresh.min.period.seconds = 60
facultate    |  sasl.login.refresh.window.factor = 0.8
facultate    |  sasl.login.refresh.window.jitter = 0.05
facultate    |  sasl.mechanism = GSSAPI
facultate    |  security.protocol = PLAINTEXT
facultate    |  security.providers = null
facultate    |  send.buffer.bytes = 131072
facultate    |  socket.connection.setup.timeout.max.ms = 30000
facultate    |  socket.connection.setup.timeout.ms = 10000
facultate    |  ssl.cipher.suites = null
facultate    |  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
facultate    |  ssl.endpoint.identification.algorithm = https
facultate    |  ssl.engine.factory.class = null
facultate    |  ssl.key.password = null
facultate    |  ssl.keymanager.algorithm = SunX509
facultate    |  ssl.keystore.certificate.chain = null
facultate    |  ssl.keystore.key = null
facultate    |  ssl.keystore.location = null
facultate    |  ssl.keystore.password = null
facultate    |  ssl.keystore.type = JKS
facultate    |  ssl.protocol = TLSv1.3
facultate    |  ssl.provider = null
facultate    |  ssl.secure.random.implementation = null
facultate    |  ssl.trustmanager.algorithm = PKIX
facultate    |  ssl.truststore.certificates = null
facultate    |  ssl.truststore.location = null
facultate    |  ssl.truststore.password = null
facultate    |  ssl.truststore.type = JKS
facultate    |
facultate    | 2022-02-01 11:15:48.614  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.0.0
facultate    | 2022-02-01 11:15:48.618  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 8cb0a5e9d3441962
facultate    | 2022-02-01 11:15:48.619  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1643714148612
facultate    | 2022-02-01 11:15:53.683  INFO 1 --- [| adminclient-1] o.a.kafka.common.utils.AppInfoParser     : App info kafka.admin.client for adminclient-1 unregistered
facultate    | 2022-02-01 11:15:53.767  INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
facultate    | 2022-02-01 11:15:53.775  INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
facultate    | 2022-02-01 11:15:53.775  INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
facultate    | 2022-02-01 11:15:53.899  INFO 1 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values:
facultate    |  acks = 1
facultate    |  batch.size = 16384
facultate    |  bootstrap.servers = [kafka:9092]
facultate    |  buffer.memory = 33554432
facultate    |  client.dns.lookup = use_all_dns_ips
facultate    |  client.id = producer-1
facultate    |  compression.type = none
facultate    |  connections.max.idle.ms = 540000
facultate    |  delivery.timeout.ms = 120000
facultate    |  enable.idempotence = true
facultate    |  interceptor.classes = []
facultate    |  key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
facultate    |  linger.ms = 0
facultate    |  max.block.ms = 60000
facultate    |  max.in.flight.requests.per.connection = 5
facultate    |  max.request.size = 1048576
facultate    |  metadata.max.age.ms = 300000
facultate    |  metadata.max.idle.ms = 300000
facultate    |  metric.reporters = []
facultate    |  metrics.num.samples = 2
facultate    |  metrics.recording.level = INFO
facultate    |  metrics.sample.window.ms = 30000
facultate    |  partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
facultate    |  receive.buffer.bytes = 32768
facultate    |  reconnect.backoff.max.ms = 1000
facultate    |  reconnect.backoff.ms = 50
facultate    |  request.timeout.ms = 30000
facultate    |  retries = 2147483647
facultate    |  retry.backoff.ms = 100
facultate    |  sasl.client.callback.handler.class = null
facultate    |  sasl.jaas.config = null
facultate    |  sasl.kerberos.kinit.cmd = /usr/bin/kinit
facultate    |  sasl.kerberos.min.time.before.relogin = 60000
facultate    |  sasl.kerberos.service.name = null
facultate    |  sasl.kerberos.ticket.renew.jitter = 0.05
facultate    |  sasl.kerberos.ticket.renew.window.factor = 0.8
facultate    |  sasl.login.callback.handler.class = null
facultate    |  sasl.login.class = null
facultate    |  sasl.login.refresh.buffer.seconds = 300
facultate    |  sasl.login.refresh.min.period.seconds = 60
facultate    |  sasl.login.refresh.window.factor = 0.8
facultate    |  sasl.login.refresh.window.jitter = 0.05
facultate    |  sasl.mechanism = GSSAPI
facultate    |  security.protocol = PLAINTEXT
facultate    |  security.providers = null
facultate    |  send.buffer.bytes = 131072
facultate    |  socket.connection.setup.timeout.max.ms = 30000
facultate    |  socket.connection.setup.timeout.ms = 10000
facultate    |  ssl.cipher.suites = null
facultate    |  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
facultate    |  ssl.endpoint.identification.algorithm = https
facultate    |  ssl.engine.factory.class = null
facultate    |  ssl.key.password = null
facultate    |  ssl.keymanager.algorithm = SunX509
facultate    |  ssl.keystore.certificate.chain = null
facultate    |  ssl.keystore.key = null
facultate    |  ssl.keystore.location = null
facultate    |  ssl.keystore.password = null
facultate    |  ssl.keystore.type = JKS
facultate    |  ssl.protocol = TLSv1.3
facultate    |  ssl.provider = null
facultate    |  ssl.secure.random.implementation = null
facultate    |  ssl.trustmanager.algorithm = PKIX
facultate    |  ssl.truststore.certificates = null
facultate    |  ssl.truststore.location = null
facultate    |  ssl.truststore.password = null
facultate    |  ssl.truststore.type = JKS
facultate    |  transaction.timeout.ms = 60000
facultate    |  transactional.id = null
facultate    |  value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
facultate    |
facultate    | 2022-02-01 11:15:54.147  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.0.0
facultate    | 2022-02-01 11:15:54.148  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 8cb0a5e9d3441962
facultate    | 2022-02-01 11:15:54.148  INFO 1 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1643714154147
facultate    | 2022-02-01 11:15:54.293  INFO 1 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: RK0OLlwdRKK-oQ5dsWuHBw
facultate    | 2022-02-01 11:15:54.431  INFO 1 --- [           main] o.s.c.s.m.DirectWithAttributesChannel    : Channel 'application-1.output' has 1 subscriber(s).

Kafka logs container:

kafka        | [2022-02-01 10:57:12,073] INFO [Partition dresses-0 broker=1001] Log loaded for partition dresses-0 with initial high watermark 0 (kafka.cluster.Partition)
zookeeper    | 2022-02-01 10:57:09,689 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x100001f44830000 type:multi cxid:0x4e zxid:0x31 txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka        | [2022-02-01 11:15:52,573] INFO Creating topic orgChangeTopic with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
kafka        | [2022-02-01 11:15:53,447] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(orgChangeTopic-0) (kafka.server.ReplicaFetcherManager)
kafka        | [2022-02-01 11:15:53,484] INFO [Log partition=orgChangeTopic-0, dir=/kafka/kafka-logs-bde9c032b736] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka        | [2022-02-01 11:15:53,488] INFO Created log for partition orgChangeTopic-0 in /kafka/kafka-logs-bde9c032b736/orgChangeTopic-0 with properties {} (kafka.log.LogManager)
kafka        | [2022-02-01 11:15:53,505] INFO [Partition orgChangeTopic-0 broker=1001] No checkpointed highwatermark is found for partition orgChangeTopic-0 (kafka.cluster.Partition)

If I POST something that should cause a message to be produced to Kafka, nothing appears in the logs... or in Kafka.

Comments:
  • 1) Neither Kafka nor Zookeeper is an HTTP service. Why have you added `http://` to the properties? 2) As I [have answered to you before](https://stackoverflow.com/questions/66834379/spring-cloud-stream-kafka-with-microservices-and-docker-compose-error)... [You shouldn't be using IP addresses to connect to Docker containers](https://stackoverflow.com/questions/51630260/connect-to-kafka-running-in-docker) on the same host 3) You should have error logs in your client for connection issues – OneCricketeer Jan 24 '22 at 20:58
  • Also, `listeners` and `advertised.listeners` are not valid environment variables, so I suggest going back to what you had in your previous questions, where the compose file and spring properties were more correct – OneCricketeer Jan 24 '22 at 21:07
  • Hello @OneCricketeer, and thanks for the answer. I managed to connect the producer with the Kafka and Zookeeper Docker containers, but when I do something in my Spring Boot app to produce a message to Kafka, I get nothing, just: [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='byte[164]' to topic orgChangeTopic: org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for orgChangeTopic-0:120000 ms has passed since batch creation – MyProblems Jan 25 '22 at 18:07
  • And nothing else - not the message I expected... I haven't started my consumer yet, but I should see the producer message in the console, I guess... I can show logs from Kafka and the Spring Cloud app if needed – MyProblems Jan 25 '22 at 18:08
  • That message means you've not sent enough data for the producer batch to get filled, so it sits in memory. In other words, you need to call `.flush()` on the producer, or whatever the equivalent is for cloud-stream – OneCricketeer Jan 25 '22 at 18:50
  • Hello @OneCricketeer, and thanks for taking the time to answer my questions... I edited this post to show what my final version looks like, but it's not working one way or another... I had decreased request.timeout and batch.size so I wouldn't have to call flush explicitly, and it worked then; after that, I think I somehow "broke" the entire thing trying to add slk to the services, and it doesn't work anymore. For the last 2 days I was optimistic about getting it done... I also tried the confluentinc image to see if I did something wrong - not a clue... please, can you take a look and give me some advice? – MyProblems Feb 01 '22 at 11:18
  • Your configs look better now. I'm not sure I understand the problem anymore; what is "slk"? Changing a docker image shouldn't fix anything. – OneCricketeer Feb 01 '22 at 14:00
  • Hey, that was another component. The problem is that when I do something in my app that results in sending a message to Kafka (hitting a POST endpoint, for example), it doesn't end in the expected behaviour... I wait for the message to be published, but it isn't, and I don't know why or what I'm doing wrong. I know it worked after that batch-size change, but now, even after waiting past those 120000 ms when I'd expect an error (not enough bytes) like before - nothing... I don't know where to start or what to do – MyProblems Feb 01 '22 at 14:45
  • Hey @OneCricketeer, it works... sorry for wasting your time, and thanks a lot for your patience; I'm just stupid sometimes – MyProblems Feb 01 '22 at 19:14
  • Feel free to post an answer below that details what you've changed – OneCricketeer Feb 02 '22 at 15:23
  • Hi... it was basically just enabling DEBUG logging for the Spring Cloud app; with this I can see the messages being posted by the producer: 'logging: level: com.netflix: WARN org.springframework.web: WARN com.project.springmicroservicesinaction: DEBUG' – MyProblems Feb 03 '22 at 18:00
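The fix described in the last comment - enabling DEBUG logging so the produced messages become visible - can be sketched as an application.yml fragment (the package names come from that comment; swap in your own base package):

```yaml
logging:
  level:
    com.netflix: WARN
    org.springframework.web: WARN
    com.project.springmicroservicesinaction: DEBUG
```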

1 Answer


The annotation-based programming model (EnableBinding, Source, Sink, etc.) is deprecated in Spring Cloud Stream and will be removed entirely in the upcoming 4.0 line of the framework.

You can add a source using the functional style as below:

@Bean
public Supplier<OrganizationChangeModel> source() {
    // build and return an OrganizationChangeModel here, e.g.:
    return () -> new OrganizationChangeModel(
            OrganizationChangeModel.class.getTypeName(),
            "SAVE", "some-organization-id", "some-correlation-id");
}

The binding name becomes source-out-0 in this case, since the supplier method is named source.
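The functional binding would then be configured with properties like these (a sketch that mirrors the topic and broker names used in the question; `spring.cloud.function.definition` explicitly selects the function bean to bind):

```properties
spring.cloud.function.definition=source
spring.cloud.stream.bindings.source-out-0.destination=orgChangeTopic
spring.cloud.stream.kafka.binder.brokers=kafka:9092
```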

However, in your case, since you want to publish programmatically from a REST endpoint, I suggest you use the StreamBridge API instead. See the StreamBridge docs for more details. The basic idea is that you call the send method of StreamBridge to publish data through an output binding.
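A minimal sketch of the StreamBridge approach, reusing the OrganizationChangeModel from the question; the service class name and the binding name orgChangeTopic are illustrative assumptions:

```java
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;

@Service
public class OrganizationChangePublisher {

    private final StreamBridge streamBridge;

    public OrganizationChangePublisher(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    public void publish(String action, String organizationId, String correlationId) {
        OrganizationChangeModel change = new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action, organizationId, correlationId);
        // send(bindingName, payload) creates the output binding on first use
        streamBridge.send("orgChangeTopic", change);
    }
}
```

Your REST controller would then call publish(...) instead of going through Source.output().send(...).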

Many examples in the Spring Cloud Stream samples repository use this version of dockerized Kafka; you might want to compare how it is set up there.
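If you also need to reach the broker from the host machine (outside Docker), the usual wurstmeister pattern is two listeners - one advertised under the container name for other containers, one advertised as localhost for the host. A sketch (port 9094 for the host-side listener is an assumption):

```yaml
kafka:
  image: wurstmeister/kafka
  ports:
    - "9094:9094"
  environment:
    - KAFKA_LISTENERS=INSIDE://:9092,OUTSIDE://:9094
    - KAFKA_ADVERTISED_LISTENERS=INSIDE://kafka:9092,OUTSIDE://localhost:9094
    - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
    - KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
```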

sobychacko