I am trying to use robinhood/faust, but without success.

I have already created a producer that successfully writes to the source topic on my local confluent-kafka instance.

However, Faust is unable to connect to localhost.

My app.py:

import faust
import base64
import random
from datetime import datetime


SOURCE_TOPIC="input_msgs"
TARGET_TOPIC="output_msgs"

app = faust.App("messages-stream", 
    broker="kafka://"+'localhost:9092',
    topic_partitions=1,
    store="memory://")

class OriginalMessage(faust.Record):
    msg: str


class TransformedMessage(faust.Record):
    msg_id: int
    msg_data: str
    msg_base64: str
    created_at: float 
    source_topic: str
    target_topic: str
    deleted: bool

topic = app.topic(SOURCE_TOPIC, value_type=OriginalMessage)
out_topic = app.topic(TARGET_TOPIC, partitions=1)

table = app.Table(
    "output_msgs",
    default=TransformedMessage,
    partitions=1,
    changelog_topic=out_topic,
)

print('Initializing Thread Processor...')


@app.agent(topic)
async def transformedmessage(messageevents):
    # Each event is an OriginalMessage, so only its `msg` field is available.
    async for original in messageevents:
        try:
            msg_id = random.randint(1, 999999)
            # Build the full record and store it under its generated id.
            table[msg_id] = TransformedMessage(
                msg_id=msg_id,
                msg_data=original.msg,
                # b64encode works on bytes, so encode the string first.
                msg_base64=base64.b64encode(original.msg.encode()).decode(),
                created_at=datetime.now().timestamp(),
                source_topic=SOURCE_TOPIC,
                target_topic=TARGET_TOPIC,
                deleted=False,
            )

        except Exception as e:
            print(f"Error: {e}")


if __name__ == "__main__":
    app.main()

Error

[2020-01-24 18:05:36,910] [55712] [ERROR] Unable connect to node with id 1: [Errno 8] nodename nor servname provided, or not known 
[2020-01-24 18:05:36,910] [55712] [ERROR] [^Worker]: Error: ConnectionError('No connection to node with id 1') 

    "No connection to node with id {}".format(node_id))
kafka.errors.ConnectionError: ConnectionError: No connection to node with id 1

I'm running with: faust -A app worker -l debug

Matthias J. Sax
FelipeAgger
I decided to bring up the full confluent-kafka stack, and that solved it: [confluent-kafka all in one](https://github.com/confluentinc/examples/tree/5.4.0-post/cp-all-in-one) – FelipeAgger Jan 25 '20 at 19:42

1 Answer

I ran into this error too. Luckily I was half expecting it, so the cause wasn't hard to track down.

In Confluent, you have to configure the domain that is used to reach all the Kafka brokers that are bootstrapped for you. I didn't realize how important the domain would be, so I just entered something random until I got stuck.

Of course I got stuck here just as you did, so I fired up Wireshark to see what was happening between Faust and the bootstrap server. It turns out a bootstrap conversation goes something like this:

..............faust-1.10.4..PLAIN <-- client name
................PLAIN             <-- authentication protocol
....foobar.foobar.foobar2000.     <-- credentials!
b0.svs.cluster.local..#.......b1.svs.cluster.local..# <-- individual Kafka brokers

These follow the pattern of the domain names I chose and that are described in Confluent documentation: https://docs.confluent.io/current/installation/operator/co-endpoints.html

If these names do not resolve, you get the vague error shown in the question: bootstrapping succeeds, but the Kafka client then cannot connect to the broker endpoints it was told about. So either pick a domain you actually control, or add the entries you need to your local /etc/hosts or equivalent file.

123.111.222.1 b0.svs.cluster.local
123.111.222.2 b1.svs.cluster.local

After restarting Faust, the bootstrap and Kafka connection succeeded.
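To check whether the broker names resolve before restarting the worker, a small standard-library script along these lines may help. It is a minimal sketch: the function name `unresolved_hosts` is made up here, and the host names passed in are whatever your own brokers advertise. A failed lookup raises `socket.gaierror`, which as far as I can tell is the same kind of failure behind the "nodename nor servname provided, or not known" message in the question.

```python
import socket


def unresolved_hosts(hosts):
    """Return the host names from `hosts` that fail local name resolution."""
    missing = []
    for host in hosts:
        try:
            # Uses the system resolver, so /etc/hosts entries are honoured.
            socket.getaddrinfo(host, None)
        except socket.gaierror:
            missing.append(host)
    return missing
```

For example, `unresolved_hosts(["b0.svs.cluster.local", "b1.svs.cluster.local"])` should return an empty list once the hosts entries above are in place.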

deed02392