
I'm curious how you are supposed to express that you want a message delivered to a Kafka topic in Faust. The example in its README doesn't seem to write to a topic:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

I would expect hello.send in the above code to publish a message to the topic, but it doesn't appear to.

There are many examples of reading from topics, and many examples of using the CLI to push an ad-hoc message. After combing through the docs, I don't see any clear examples of publishing to topics in code. Am I just being crazy, and the above code should work?

melchoir55
  • `hello.send` is part of `asyncio`, I think, not a Faust function... Faust is primarily for stream processing, not stream "producing", meaning you'd already have data in the topic. Kafka Streams works similarly. – OneCricketeer Jul 05 '19 at 03:15
  • The example_sender above really does publish data to 'hello-topic'. You can check this with kafka-console-consumer. – user152468 Aug 07 '19 at 14:38

6 Answers


send() is the correct method to call to write to a topic. You can even specify a particular partition, just like the equivalent call in the Java producer API.

Here is the reference for the send() method:

https://faust.readthedocs.io/en/latest/reference/faust.topics.html#faust.topics.Topic.send
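As a rough illustration of what the `partition` argument controls, here is a toy in-memory model in plain Python; it is not Faust or Kafka code. `ToyTopic` and its `send()` are hypothetical names. When no partition is given, the toy hashes the key with `zlib.crc32` as a stand-in for Kafka's default partitioner (which uses murmur2), so equal keys land on the same partition; an explicit partition overrides that.

```python
import zlib


class ToyTopic:
    """Hypothetical in-memory stand-in for a Kafka topic (not Faust's Topic)."""

    def __init__(self, name, partitions=3):
        self.name = name
        self.partitions = partitions
        # One append-only log per partition.
        self.log = [[] for _ in range(partitions)]

    def send(self, *, key=None, value=None, partition=None):
        """Append (key, value) and return (partition, offset)."""
        if partition is None:
            if key is None:
                partition = 0  # toy shortcut; real producers spread keyless messages
            else:
                # Stand-in hash; Kafka's default partitioner uses murmur2 instead.
                partition = zlib.crc32(str(key).encode()) % self.partitions
        elif not 0 <= partition < self.partitions:
            raise ValueError(f'{self.name} has no partition {partition}')
        self.log[partition].append((key, value))
        return partition, len(self.log[partition]) - 1


if __name__ == '__main__':
    topic = ToyTopic('hello-topic', partitions=3)
    print(topic.send(key='a', value='x', partition=2))  # (2, 0): explicit partition wins
    first, _ = topic.send(key='user-1', value=1)
    second, _ = topic.send(key='user-1', value=2)
    print(first == second)  # True: same key, same partition
```

In real Faust code the equivalent is passing `key=` and/or `partition=` to `topic.send()`, as described in the reference linked above.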

deed02392

You can use sink to tell Faust where to deliver the results of an agent function. You can also use multiple topics as sinks at once if you want.

@app.agent(topic_to_read_from, sink=[destination_topic])
async def fetch(records):
    async for record in records:
        result = do_something(record)
        yield result
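A toy asyncio model of what `sink=[...]` does with the agent's output may help: every value the agent yields is forwarded to each sink in the list. This is plain Python, not Faust internals; `run_agent_with_sinks`, `as_stream`, and the `asyncio.Queue` objects standing in for topics are hypothetical, and `do_something` is filled in with an arbitrary transformation.

```python
import asyncio


def do_something(record):
    # Hypothetical transformation standing in for the answer's do_something().
    return record.upper()


async def fetch(records):
    # Same shape as the agent above: consume, transform, yield.
    async for record in records:
        yield do_something(record)


async def as_stream(items):
    # Toy source: replays a list as an async stream.
    for item in items:
        yield item


async def run_agent_with_sinks(agent, source, sinks):
    # Toy version of sink=[...]: every yielded value goes to every sink.
    async for result in agent(source):
        for sink in sinks:
            await sink.put(result)


async def main():
    destination_topic = asyncio.Queue()  # stands in for a Kafka topic
    audit_topic = asyncio.Queue()        # a second sink
    await run_agent_with_sinks(fetch, as_stream(['a', 'b']),
                               [destination_topic, audit_topic])
    return ([destination_topic.get_nowait() for _ in range(destination_topic.qsize())],
            [audit_topic.get_nowait() for _ in range(audit_topic.qsize())])


if __name__ == '__main__':
    print(asyncio.run(main()))  # (['A', 'B'], ['A', 'B'])
```

Because `fetch` is an ordinary async generator in this toy, its yielded output can also be checked directly in a unit test without any Kafka broker.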
BWStearns
  • Hello, I like the `sink` argument in the agent decorator. But how do you test it? The only way I can unit test this is by mocking the topic, and only if I call send directly in the agent. – Thomas Mar 31 '20 at 15:10
  • Hm. Maybe make a faust App and Agent in the test that listens to the sink topic and verify the output there? The docs have some recommendations https://faust.readthedocs.io/en/latest/userguide/testing.html#testing-that-an-agent-sends-to-topic-calls-another-agent and https://faust.readthedocs.io/en/latest/userguide/livecheck.html – BWStearns Mar 31 '20 at 18:19
  • I ended up unit testing what the agent yields. In integration tests I can check what is output to the Kafka topic. – Thomas May 25 '20 at 15:23

If you want a Faust producer only (not combined with a consumer/sink), the original question actually has the right bit of code. Here's a fully functional script that publishes messages to a 'faust_test' Kafka topic, which any Kafka/Faust consumer can read.

Run the code below like this: python faust_producer.py worker

"""Simple Faust Producer"""
import faust

if __name__ == '__main__':
    """Simple Faust Producer"""

    # Create the Faust App
    app = faust.App('faust_test_app', broker='localhost:9092')
    topic = app.topic('faust_test')

    # Send messages
    @app.timer(interval=1.0)
    async def send_message(message):
        await topic.send(value='my message')

    # Start the Faust App
    app.main()
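`@app.timer(interval=1.0)` asks Faust to call the decorated coroutine repeatedly, roughly once per interval, while the worker is running. The sketch below approximates that with plain asyncio; the `every` helper, its `ticks` limit (added only so the sketch terminates), and the queue standing in for the 'faust_test' topic are all hypothetical.

```python
import asyncio


async def every(interval, ticks, callback):
    # Rough stand-in for @app.timer(interval=...): call callback once per interval.
    # `ticks` bounds the loop so the sketch ends; a real worker keeps going.
    for _ in range(ticks):
        await asyncio.sleep(interval)
        await callback()


async def main():
    topic = asyncio.Queue()  # stands in for the 'faust_test' topic

    async def send_message():
        await topic.put('my message')

    await every(0.01, 3, send_message)
    return [topic.get_nowait() for _ in range(topic.qsize())]


if __name__ == '__main__':
    print(asyncio.run(main()))  # ['my message', 'my message', 'my message']
```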
Brian Wylie

So we just ran into the need to send a message to a topic other than the sink topics.

The easiest way we found was: foo = await my_topic.send_soon(value="wtfm8").

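You can also call send directly from synchronous code by driving it with the asyncio event loop:

import asyncio

loop = asyncio.get_event_loop()
# Pass the coroutine itself to run_until_complete(); `await` is only valid inside async def.
foo = loop.run_until_complete(my_topic.send(value="wtfm8??"))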
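The underlying rule is plain asyncio rather than anything Faust-specific: `await` is only valid inside an `async def`, and `run_until_complete()` (or `asyncio.run()`) must be handed the coroutine itself, not the result of awaiting it. A minimal sketch, with a hypothetical `fake_send()` coroutine in place of `my_topic.send()`:

```python
import asyncio


async def fake_send(value):
    # Hypothetical stand-in for my_topic.send(); pretend delivery takes a moment.
    await asyncio.sleep(0)
    return f'delivered {value!r}'


async def send_from_async_code():
    # Inside a coroutine (e.g. an agent or timer), just await the call.
    return await fake_send('wtfm8')


def send_from_sync_code():
    # From synchronous code, hand the *coroutine object* to an event loop.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(fake_send('wtfm8??'))
    finally:
        loop.close()


if __name__ == '__main__':
    print(asyncio.run(send_from_async_code()))  # delivered 'wtfm8'
    print(send_from_sync_code())                # delivered 'wtfm8??'
```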
BWStearns

Don't know how relevant this is anymore, but I came across this issue when trying to learn Faust. From what I read, here is what is happening:

topic = app.topic('hello-topic', value_type=Greeting)

The misconception here is that the topic you have created is the topic you are trying to consume/read from. The topic you created currently does not do anything.

await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

This essentially creates an intermediate kstream that delivers the values to your hello(greetings) function. hello(...) is called whenever there is a new message in the stream, and it processes the message being sent.

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

This receives the Kafka stream from hello.send(...) and simply prints it to the console (nothing is sent to the 'topic' you created). This is where you can send a message to a new topic, so instead of printing you can do:

await output_topic.send(value="my message!")  # output_topic: a separate topic, defined in the full example below

To put it another way, here is what you are doing:

  1. example_sender() sends a message to hello(...) (through intermediate kstream)
  2. hello(...) picks up the message and prints it. Notice that no message is sent to the intended topic.

Here is what you can do:

  1. example_sender() sends a message to hello(...) (through intermediate kstream)

  2. hello(...) picks up the message and prints

  3. hello(...) ALSO sends a new message to the topic you created (assuming you are trying to transform the original data):

     import faust

     class Greeting(faust.Record):
         from_name: str
         to_name: str

     app = faust.App('hello-app', broker='kafka://localhost')
     topic = app.topic('hello-topic', value_type=Greeting)
     output_topic = app.topic('test_output_faust', value_type=str)
    
     @app.agent(topic)
     async def hello(greetings):
         async for greeting in greetings:
             new_message = f'Hello from {greeting.from_name} to {greeting.to_name}'
             print(new_message)
             await output_topic.send(value=new_message)
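The three steps above can be modeled without Kafka as a toy asyncio pipeline: a sender task feeds the agent's input, and the agent prints each greeting and explicitly forwards it to an output queue. The `asyncio.Queue` objects stand in for 'hello-topic' and 'test_output_faust', a dataclass replaces `faust.Record`, and the `None` end-of-stream marker exists only so the toy terminates; none of this is Faust's actual machinery.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Greeting:
    # Plain dataclass standing in for the faust.Record in the answer.
    from_name: str
    to_name: str


async def example_sender(hello_topic, count):
    # Step 1: publish greetings to the queue the agent consumes.
    for i in range(count):
        await hello_topic.put(Greeting(from_name='Faust', to_name=f'you{i}'))
    await hello_topic.put(None)  # toy end-of-stream marker; Kafka streams never end


async def hello(hello_topic, output_topic):
    # Steps 2 and 3: consume, print, and explicitly forward to the output queue.
    while (greeting := await hello_topic.get()) is not None:
        new_message = f'Hello from {greeting.from_name} to {greeting.to_name}'
        print(new_message)
        await output_topic.put(new_message)


async def main():
    hello_topic, output_topic = asyncio.Queue(), asyncio.Queue()
    await asyncio.gather(example_sender(hello_topic, 2),
                         hello(hello_topic, output_topic))
    return [output_topic.get_nowait() for _ in range(output_topic.qsize())]


if __name__ == '__main__':
    print(asyncio.run(main()))
```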
    
Ethan Kulla

I found a way to send data to Kafka topics using Faust, but I don't really understand how it works.

There are several methods for this in Faust: send(), cast(), ask_nowait(), ask(). In the documentation they are called RPC operations.

After creating the sending task, you need to run the Faust application in client-only mode (start_client(), maybe_start_client()).

The following code (the produce() function) demonstrates their application (pay attention to the comments):

import asyncio

import faust


class Greeting(faust.Record):
    from_name: str
    to_name: str


app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
result_topic = app.topic('result-topic', value_type=str)


@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        s = f'Hello from {greeting.from_name} to {greeting.to_name}'
        print(s)
        yield s


async def produce(to_name):
    # send - universal method for sending data to a topic
    await hello.send(value=Greeting(from_name='SEND', to_name=to_name), force=True)
    await app.maybe_start_client()
    print('SEND')

    # cast - allows you to send data without waiting for a response from the agent
    await hello.cast(value=Greeting(from_name='CAST', to_name=to_name))
    await app.maybe_start_client()
    print('CAST')

    # ask_nowait - it seems to be similar to cast
    p = await hello.ask_nowait(
        value=Greeting(from_name='ASK_NOWAIT', to_name=to_name),
        force=True,
        reply_to=result_topic
    )
    # without this line, ask_nowait will not work; taken from the ask implementation
    await app._reply_consumer.add(p.correlation_id, p)
    await app.maybe_start_client()
    print(f'ASK_NOWAIT: {p.correlation_id}')

    # blocks the execution flow
    # p = await hello.ask(value=Greeting(from_name='ASK', to_name=to_name), reply_to=result_topic)
    # print(f'ASK: {p.correlation_id}')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(produce('Faust'))

Start the Faust worker with the command: faust -A <example> worker

Then we can launch the client part of the application and check that everything is working: python <example.py>

<example.py> output:

SEND
CAST
ASK_NOWAIT: bbbe6795-5a99-40e5-a7ad-a9af544efd55

Note that you will also see a traceback from an error raised after delivery; as far as I can tell, it does not interfere with the program.

Faust worker output:

[2022-07-19 12:06:27,959] [1140] [WARNING] Hello from SEND to Faust 
[2022-07-19 12:06:27,960] [1140] [WARNING] Hello from CAST to Faust 
[2022-07-19 12:06:27,962] [1140] [WARNING] Hello from ASK_NOWAIT to Faust 

I don't understand why it works this way, why it's so difficult, or why so little is written about it in the documentation.
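Part of the difference between `cast()` and `ask()` is request/reply bookkeeping: `ask()` sends with a `reply_to` topic and a `correlation_id`, then waits until a reply carrying that id arrives, which is why the `ask_nowait()` call above had to register the pending reply with `app._reply_consumer` by hand. The toy model below shows that bookkeeping with plain asyncio futures; `ToyAgent`, its `pending` dict, and the queue standing in for the agent's topic are hypothetical and are not Faust's implementation.

```python
import asyncio
import uuid


class ToyAgent:
    """Toy request/reply agent: cast() is fire-and-forget, ask() awaits a reply."""

    def __init__(self, handler):
        self.handler = handler
        self.inbox = asyncio.Queue()  # stands in for the agent's topic
        self.pending = {}             # correlation_id -> Future awaiting the reply

    async def run(self):
        while True:
            value, correlation_id = await self.inbox.get()
            result = self.handler(value)
            if correlation_id is not None:
                # Toy "reply topic": resolve the future registered for this id.
                fut = self.pending.pop(correlation_id, None)
                if fut is not None and not fut.done():
                    fut.set_result(result)
            self.inbox.task_done()

    async def cast(self, value):
        # No correlation id: nobody waits for the result.
        await self.inbox.put((value, None))

    async def ask(self, value):
        correlation_id = str(uuid.uuid4())
        fut = asyncio.get_running_loop().create_future()
        self.pending[correlation_id] = fut  # register before sending
        await self.inbox.put((value, correlation_id))
        return await fut


async def main():
    seen = []

    def greet(from_name):
        s = f'Hello from {from_name} to Faust'
        seen.append(s)
        return s

    agent = ToyAgent(greet)
    worker = asyncio.create_task(agent.run())
    await agent.cast('CAST')
    reply = await agent.ask('ASK')
    await agent.inbox.join()
    worker.cancel()
    return seen, reply


if __name__ == '__main__':
    print(asyncio.run(main()))
```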

belo4ya