0

I am trying to connect one server(111.111.111.11) where I have Kafka configurated, to another server(222.222.222.22) where I am using python to connect with Kafka.

If I use my application in the same host where Kafka is installed I have no problems to make a successful connection. But I am trying the connection from the external host and I can't connect.

This is my docker-compose.yml where Kafka is running:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.2
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.2
    container_name: broker
    ports:
    # To learn about configuring Kafka for access across networks see
    # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
      - "9092:9092"
      - "9093:9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT,PLAINTEXT_EXTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092,PLAINTEXT_EXTERNAL://222.222.222.22:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

And this is my app.py python code from the external host to start listening Kafka:

import json, time, requests
from flask import Flask, render_template

from kafka import KafkaConsumer
from kafka import KafkaProducer

import threading, queue

from src.messages import processMessage

ORDER_KAFKA_TOPIC = "senderTopic"
ORDER_CONFIRMED_KAFKA_TOPIC = "receiverTopic"

app = Flask(__name__)

q = queue.Queue()

producer = KafkaProducer(
    bootstrap_servers="111.111.111.11:9093"
)

consumer = KafkaConsumer(
    ORDER_KAFKA_TOPIC,
    bootstrap_servers="111.111.111.11:9093"
)


def worker():
    while True:
        message = q.get()
        consumed_message = json.loads(message.value.decode())
        message_id = consumed_message['data_id']
        ts = int(time.time())*1000

        print(f"{ts}:[SUCCESS]:[{message_id}]:Successful message received...")

        processMessage(consumed_message)
        q.task_done()



def load_consumer():
    print("Gonna start listening...")
    for message in consumer:
        try:
            q.put(message)
            
        except Exception as e:
            pass

@app.before_first_request
def launch_consumer():
    t = threading.Thread(target=worker)
    t.start()


@app.route('/', methods=["GET","POST"])
def index():
    return render_template('index.html')

launch_consumer()
load_consumer()
q.join()

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

So, when I try to run this python code in my external host I receive NoBrokersAvailable but if I run it in the server where Kafka is hosted it works.

Can you help me please?

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • Does this answer your question? [Connect to Kafka running in Docker](https://stackoverflow.com/questions/51630260/connect-to-kafka-running-in-docker) – OneCricketeer Mar 30 '23 at 15:53
  • Sounds to be like you have a firewall that is blocking the requests. You should check `nc -vz 222.222.222.222 9093` can connect from the external machine. (This your advertised address, so why are you using `111.111.111.111` in the code if that is not the server that is running Kafka?) – OneCricketeer Mar 30 '23 at 15:55

0 Answers0