I connected my Spring Cloud app to Kafka and ZooKeeper, both hosted in a docker-compose file. They are connected, but when I run the application and expect the producer to publish a message, I get nothing, and I don't know what the error is. I followed this example: https://github.com/ihuaylupo/manning-smia/tree/master/chapter10/licensing-service/src/main/java/com/optimagrowth/license/events
The workflow is the one from the GitHub repo example: when I call a POST endpoint on a service, I want my Kafka producer to publish a message to a topic and a Kafka consumer to consume that message. Everything else works (the endpoint executes against the DB), but no message is produced or consumed, and there is not even an error to show where I went wrong.
Docker-compose file:
zookeeper:
  image: wurstmeister/zookeeper
  container_name: zookeeper
  ports:
    - "2181:2181"
kafkaserver:
  image: wurstmeister/kafka
  container_name: kafka
  ports:
    - "9092:9092"
  environment:
    - KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 # IP rather than "kafka", because I want to access Kafka and ZooKeeper from outside the containers, i.e. from localhost
    - KAFKA_ADVERTISED_PORT=9092
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    - KAFKA_CREATE_TOPICS=dresses:1:1, ratings:1:1
    - KAFKA_SOCKET_REQUEST_MAX_BYTES=2000000000
    - KAFKA_HEAP_OPTS=-Xmx512M -Xmx5g
    - listeners=PLAINTEXT://:9092
    - advertised.listeners=PLAINTEXT://192.168.99.100:9092
  volumes:
    - "/var/run/docker.sock:/var/run/docker.sock"
  depends_on:
    - zookeeper
Kafka consumer spring cloud properties:
spring.cloud.stream.bindings.inboundOrgChanges.destination=orgChangeTopic
spring.cloud.stream.bindings.inboundOrgChanges.content-type=application/json
spring.cloud.stream.bindings.inboundOrgChanges.group=studentsGroup
spring.cloud.stream.kafka.binder.brokers=localhost #kafka
spring.cloud.stream.kafka.binder.zkNodes=localhost
Kafka producer spring cloud properties:
spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.zkNodes=http://192.168.99.100:2181
spring.cloud.stream.kafka.binder.brokers=http://192.168.99.100:9092
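One thing worth double-checking in the values above: the Kafka client documentation describes bootstrap servers as a comma-separated list of host:port pairs, without a URL scheme. Whether the http:// prefix is actually what breaks things here is not established by anything in this post. A minimal sketch of a check for it, where the class BrokerAddressCheck and its method are hypothetical helpers and not part of Spring Cloud Stream or Kafka:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper; not part of Spring Cloud Stream or Kafka. */
final class BrokerAddressCheck {

    /** Returns the entries of a comma-separated broker list that carry a URL scheme such as "http://". */
    static List<String> entriesWithScheme(String brokers) {
        List<String> flagged = new ArrayList<>();
        for (String entry : brokers.split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty() && trimmed.contains("://")) {
                flagged.add(trimmed);
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        // Value taken from the producer properties above.
        System.out.println(entriesWithScheme("http://192.168.99.100:9092")); // prints [http://192.168.99.100:9092]
        // The same address in plain host:port form is not flagged.
        System.out.println(entriesWithScheme("192.168.99.100:9092"));        // prints []
    }
}
```

An empty result only means no entry has a scheme prefix; it says nothing about whether the address is reachable.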
Kafka Producer...
@EnableBinding(Source.class) // declared on my main Spring app class
@Component
public class SimpleSourceBean {
    private Source source;
    private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source) {
        this.source = source;
    }

    public void publishOrganizationChange(String action, String organizationId) {
        logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
        OrganizationChangeModel change = new OrganizationChangeModel(
                OrganizationChangeModel.class.getTypeName(),
                action,
                organizationId,
                UserContext.getCorrelationId());
        source.output().send(MessageBuilder.withPayload(change).build());
    }
}

@Getter @Setter @ToString
public class OrganizationChangeModel {
    private String type;
    private String action;
    private String organizationId;
    private String correlationId;

    public OrganizationChangeModel(String type, String action, String organizationId, String correlationId) {
        super();
        this.type = type;
        this.action = action;
        this.organizationId = organizationId;
        this.correlationId = correlationId;
    }
}

@Service
class ServiceEx {
    @Autowired
    SimpleSourceBean simpleSourceBean;

    public Organization findById(String organizationId) {
        Optional<Organization> opt = repository.findById(organizationId);
        simpleSourceBean.publishOrganizationChange("GET", organizationId);
        return opt.orElse(null);
    } // won't do anything
}
Edited: Docker-compose file:
version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - spring-cloud-network
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=kafka
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CREATE_TOPICS=dresses:1:1, ratings:1:1
      - KAFKA_SOCKET_REQUEST_MAX_BYTES=2000000000
      - KAFKA_HEAP_OPTS=-Xmx512M -Xmx5g
    volumes:
      - "/var/run/docker.sock:/var/run/docker.sock"
    depends_on:
      - zookeeper
    networks:
      - spring-cloud-network
  facultate:
    container_name: facultate
    build: C:\Users\marius\com.balabasciuc.springmicroservicesinaction\facultateservice
    restart: on-failure
    ports:
      - "1002:1002"
    environment:
      SPRING_CLOUD_CONFIG_URI: "http://config:7070"
      EUREKA_HOST: server
      EUREKA_PORT: 9001
      DATABASE_HOST: database
      # SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS: kafka:9092
    depends_on:
      - kafka
      - zookeeper
      - server
    networks:
      - spring-cloud-network
volumes:
  simple:
    driver: local
networks:
  spring-cloud-network:
    driver: bridge
Spring Cloud producer prop:
spring.cloud.stream.bindings.output.destination=orgChangeTopic
spring.cloud.stream.bindings.output.content-type=application/json
spring.cloud.stream.kafka.binder.brokers=kafka
spring.cloud.stream.kafka.binder.zkNodes=zookeeper
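Note that the startup log further down reports bootstrap.servers = [kafka:9092] for the brokers=kafka value above. That is consistent with the binder appending its default broker port (9092) to entries that have no port. A rough sketch of that behaviour, where the class BrokerDefaults and its method are hypothetical illustrations and not the binder's actual code:

```java
import java.util.StringJoiner;

/** Hypothetical illustration; not the Spring Cloud Stream binder's implementation. */
final class BrokerDefaults {

    /** Appends defaultPort to every comma-separated entry that does not already contain a port. */
    static String withDefaultPort(String brokers, int defaultPort) {
        StringJoiner joined = new StringJoiner(",");
        for (String entry : brokers.split(",")) {
            String host = entry.trim();
            if (host.isEmpty()) {
                continue; // skip blank entries such as a trailing comma
            }
            joined.add(host.contains(":") ? host : host + ":" + defaultPort);
        }
        return joined.toString();
    }

    public static void main(String[] args) {
        // Value from the producer properties above, with the assumed default port 9092.
        System.out.println(withDefaultPort("kafka", 9092));                 // prints kafka:9092
        // An entry that already names a port is left untouched.
        System.out.println(withDefaultPort("broker1:9093, broker2", 9092)); // prints broker1:9093,broker2:9092
    }
}
```

So the short form brokers=kafka and the explicit brokers=kafka:9092 should resolve to the same address inside the compose network.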
Docker Logs:
facultate | 2022-02-01 11:15:47.246 INFO 1 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application-1.errorChannel' has 1 subscriber(s).
facultate | 2022-02-01 11:15:47.247 INFO 1 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
facultate | 2022-02-01 11:15:47.251 INFO 1 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Creating binder: kafka
facultate | 2022-02-01 11:15:47.876 INFO 1 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Caching the binder: kafka
facultate | 2022-02-01 11:15:47.876 INFO 1 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Retrieving cached binder: kafka
facultate | 2022-02-01 11:15:48.138 INFO 1 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: orgChangeTopic
facultate | 2022-02-01 11:15:48.150 INFO 1 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
facultate | bootstrap.servers = [kafka:9092]
facultate | client.dns.lookup = use_all_dns_ips
facultate | client.id =
facultate | connections.max.idle.ms = 300000
facultate | default.api.timeout.ms = 60000
facultate | metadata.max.age.ms = 300000
facultate | metric.reporters = []
facultate | metrics.num.samples = 2
facultate | metrics.recording.level = INFO
facultate | metrics.sample.window.ms = 30000
facultate | receive.buffer.bytes = 65536
facultate | reconnect.backoff.max.ms = 1000
facultate | reconnect.backoff.ms = 50
facultate | request.timeout.ms = 30000
facultate | retries = 2147483647
facultate | retry.backoff.ms = 100
facultate | sasl.client.callback.handler.class = null
facultate | sasl.jaas.config = null
facultate | sasl.kerberos.kinit.cmd = /usr/bin/kinit
facultate | sasl.kerberos.min.time.before.relogin = 60000
facultate | sasl.kerberos.service.name = null
facultate | sasl.kerberos.ticket.renew.jitter = 0.05
facultate | sasl.kerberos.ticket.renew.window.factor = 0.8
facultate | sasl.login.callback.handler.class = null
facultate | sasl.login.class = null
facultate | sasl.login.refresh.buffer.seconds = 300
facultate | sasl.login.refresh.min.period.seconds = 60
facultate | sasl.login.refresh.window.factor = 0.8
facultate | sasl.login.refresh.window.jitter = 0.05
facultate | sasl.mechanism = GSSAPI
facultate | security.protocol = PLAINTEXT
facultate | security.providers = null
facultate | send.buffer.bytes = 131072
facultate | socket.connection.setup.timeout.max.ms = 30000
facultate | socket.connection.setup.timeout.ms = 10000
facultate | ssl.cipher.suites = null
facultate | ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
facultate | ssl.endpoint.identification.algorithm = https
facultate | ssl.engine.factory.class = null
facultate | ssl.key.password = null
facultate | ssl.keymanager.algorithm = SunX509
facultate | ssl.keystore.certificate.chain = null
facultate | ssl.keystore.key = null
facultate | ssl.keystore.location = null
facultate | ssl.keystore.password = null
facultate | ssl.keystore.type = JKS
facultate | ssl.protocol = TLSv1.3
facultate | ssl.provider = null
facultate | ssl.secure.random.implementation = null
facultate | ssl.trustmanager.algorithm = PKIX
facultate | ssl.truststore.certificates = null
facultate | ssl.truststore.location = null
facultate | ssl.truststore.password = null
facultate | ssl.truststore.type = JKS
facultate |
facultate | 2022-02-01 11:15:48.614 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.0.0
facultate | 2022-02-01 11:15:48.618 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8cb0a5e9d3441962
facultate | 2022-02-01 11:15:48.619 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1643714148612
facultate | 2022-02-01 11:15:53.683 INFO 1 --- [| adminclient-1] o.a.kafka.common.utils.AppInfoParser : App info kafka.admin.client for adminclient-1 unregistered
facultate | 2022-02-01 11:15:53.767 INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
facultate | 2022-02-01 11:15:53.775 INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
facultate | 2022-02-01 11:15:53.775 INFO 1 --- [| adminclient-1] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
facultate | 2022-02-01 11:15:53.899 INFO 1 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
facultate | acks = 1
facultate | batch.size = 16384
facultate | bootstrap.servers = [kafka:9092]
facultate | buffer.memory = 33554432
facultate | client.dns.lookup = use_all_dns_ips
facultate | client.id = producer-1
facultate | compression.type = none
facultate | connections.max.idle.ms = 540000
facultate | delivery.timeout.ms = 120000
facultate | enable.idempotence = true
facultate | interceptor.classes = []
facultate | key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
facultate | linger.ms = 0
facultate | max.block.ms = 60000
facultate | max.in.flight.requests.per.connection = 5
facultate | max.request.size = 1048576
facultate | metadata.max.age.ms = 300000
facultate | metadata.max.idle.ms = 300000
facultate | metric.reporters = []
facultate | metrics.num.samples = 2
facultate | metrics.recording.level = INFO
facultate | metrics.sample.window.ms = 30000
facultate | partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
facultate | receive.buffer.bytes = 32768
facultate | reconnect.backoff.max.ms = 1000
facultate | reconnect.backoff.ms = 50
facultate | request.timeout.ms = 30000
facultate | retries = 2147483647
facultate | retry.backoff.ms = 100
facultate | sasl.client.callback.handler.class = null
facultate | sasl.jaas.config = null
facultate | sasl.kerberos.kinit.cmd = /usr/bin/kinit
facultate | sasl.kerberos.min.time.before.relogin = 60000
facultate | sasl.kerberos.service.name = null
facultate | sasl.kerberos.ticket.renew.jitter = 0.05
facultate | sasl.kerberos.ticket.renew.window.factor = 0.8
facultate | sasl.login.callback.handler.class = null
facultate | sasl.login.class = null
facultate | sasl.login.refresh.buffer.seconds = 300
facultate | sasl.login.refresh.min.period.seconds = 60
facultate | sasl.login.refresh.window.factor = 0.8
facultate | sasl.login.refresh.window.jitter = 0.05
facultate | sasl.mechanism = GSSAPI
facultate | security.protocol = PLAINTEXT
facultate | security.providers = null
facultate | send.buffer.bytes = 131072
facultate | socket.connection.setup.timeout.max.ms = 30000
facultate | socket.connection.setup.timeout.ms = 10000
facultate | ssl.cipher.suites = null
facultate | ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
facultate | ssl.endpoint.identification.algorithm = https
facultate | ssl.engine.factory.class = null
facultate | ssl.key.password = null
facultate | ssl.keymanager.algorithm = SunX509
facultate | ssl.keystore.certificate.chain = null
facultate | ssl.keystore.key = null
facultate | ssl.keystore.location = null
facultate | ssl.keystore.password = null
facultate | ssl.keystore.type = JKS
facultate | ssl.protocol = TLSv1.3
facultate | ssl.provider = null
facultate | ssl.secure.random.implementation = null
facultate | ssl.trustmanager.algorithm = PKIX
facultate | ssl.truststore.certificates = null
facultate | ssl.truststore.location = null
facultate | ssl.truststore.password = null
facultate | ssl.truststore.type = JKS
facultate | transaction.timeout.ms = 60000
facultate | transactional.id = null
facultate | value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
facultate |
facultate | 2022-02-01 11:15:54.147 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.0.0
facultate | 2022-02-01 11:15:54.148 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 8cb0a5e9d3441962
facultate | 2022-02-01 11:15:54.148 INFO 1 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1643714154147
facultate | 2022-02-01 11:15:54.293 INFO 1 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: RK0OLlwdRKK-oQ5dsWuHBw
facultate | 2022-02-01 11:15:54.431 INFO 1 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application-1.output' has 1 subscriber(s).
Kafka logs container:
kafka | [2022-02-01 10:57:12,073] INFO [Partition dresses-0 broker=1001] Log loaded for partition dresses-0 with initial high watermark 0 (kafka.cluster.Partition)
zookeeper | 2022-02-01 10:57:09,689 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x100001f44830000 type:multi cxid:0x4e zxid:0x31 txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
kafka | [2022-02-01 11:15:52,573] INFO Creating topic orgChangeTopic with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
kafka | [2022-02-01 11:15:53,447] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(orgChangeTopic-0) (kafka.server.ReplicaFetcherManager)
kafka | [2022-02-01 11:15:53,484] INFO [Log partition=orgChangeTopic-0, dir=/kafka/kafka-logs-bde9c032b736] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka | [2022-02-01 11:15:53,488] INFO Created log for partition orgChangeTopic-0 in /kafka/kafka-logs-bde9c032b736/orgChangeTopic-0 with properties {} (kafka.log.LogManager)
kafka | [2022-02-01 11:15:53,505] INFO [Partition orgChangeTopic-0 broker=1001] No checkpointed highwatermark is found for partition orgChangeTopic-0 (kafka.cluster.Partition)
If I POST something that should cause a message to be produced to Kafka, nothing appears in the application logs or in the Kafka logs.