I am trying to connect one server(111.111.111.11) where I have Kafka configurated, to another server(222.222.222.22) where I am using python to connect with Kafka.
If I use my application in the same host where Kafka is installed I have no problems to make a successful connection. But I am trying the connection from the external host and I can't connect.
This is my docker-compose.yml
where Kafka is running:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.2
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.3.2
container_name: broker
ports:
# To learn about configuring Kafka for access across networks see
# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
- "9092:9092"
- "9093:9093"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT,PLAINTEXT_EXTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092,PLAINTEXT_EXTERNAL://222.222.222.22:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
And this is my app.py python code from the external host to start listening Kafka:
import json, time, requests
from flask import Flask, render_template
from kafka import KafkaConsumer
from kafka import KafkaProducer
import threading, queue
from src.messages import processMessage
ORDER_KAFKA_TOPIC = "senderTopic"
ORDER_CONFIRMED_KAFKA_TOPIC = "receiverTopic"
app = Flask(__name__)
q = queue.Queue()
producer = KafkaProducer(
bootstrap_servers="111.111.111.11:9093"
)
consumer = KafkaConsumer(
ORDER_KAFKA_TOPIC,
bootstrap_servers="111.111.111.11:9093"
)
def worker():
while True:
message = q.get()
consumed_message = json.loads(message.value.decode())
message_id = consumed_message['data_id']
ts = int(time.time())*1000
print(f"{ts}:[SUCCESS]:[{message_id}]:Successful message received...")
processMessage(consumed_message)
q.task_done()
def load_consumer():
print("Gonna start listening...")
for message in consumer:
try:
q.put(message)
except Exception as e:
pass
@app.before_first_request
def launch_consumer():
t = threading.Thread(target=worker)
t.start()
@app.route('/', methods=["GET","POST"])
def index():
return render_template('index.html')
launch_consumer()
load_consumer()
q.join()
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
So, when I try to run this python code in my external host I receive NoBrokersAvailable
but if I run it in the server where Kafka is hosted it works.
Can you help me please?