I found a solution to how to send data to kafka topics using Faust, but I don't really understand how it works.
There are several methods for this in Faust: send(), cast(), ask_nowait(), ask()
. In the documentation they are called RPC operations.
After creating the sending task, you need to run the Faust application in the mode Client-Only Mode. (start_client(), maybe_start_client()
)
The following code (the produce() function) demonstrates their application (pay attention to the comments):
import asyncio
import faust
class Greeting(faust.Record):
from_name: str
to_name: str
app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
result_topic = app.topic('result-topic', value_type=str)
@app.agent(topic)
async def hello(greetings):
async for greeting in greetings:
s = f'Hello from {greeting.from_name} to {greeting.to_name}'
print(s)
yield s
async def produce(to_name):
# send - universal method for sending data to a topic
await hello.send(value=Greeting(from_name='SEND', to_name=to_name), force=True)
await app.maybe_start_client()
print('SEND')
# cast - allows you to send data without waiting for a response from the agent
await hello.cast(value=Greeting(from_name='CAST', to_name=to_name))
await app.maybe_start_client()
print('CAST')
# ask_nowait - it seems to be similar to cast
p = await hello.ask_nowait(
value=Greeting(from_name='ASK_NOWAIT', to_name=to_name),
force=True,
reply_to=result_topic
)
# without this line, ask_nowait will not work; taken from the ask implementation
await app._reply_consumer.add(p.correlation_id, p)
await app.maybe_start_client()
print(f'ASK_NOWAIT: {p.correlation_id}')
# blocks the execution flow
# p = await hello.ask(value=Greeting(from_name='ASK', to_name=to_name), reply_to=result_topic)
# print(f'ASK: {p.correlation_id}')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(produce('Faust'))
Starting Fast worker with the command faust -A <example> worker
Then we can launch the client part of the application and check that everything is working: python <example.py>
<example.py> output:
SEND
CAST
ASK_NOWAIT: bbbe6795-5a99-40e5-a7ad-a9af544efd55
It is worth noting that you will also see a traceback of some error that occurred after delivery, which does not interfere with the program (it seems so)
Faust worker output:
[2022-07-19 12:06:27,959] [1140] [WARNING] Hello from SEND to Faust
[2022-07-19 12:06:27,960] [1140] [WARNING] Hello from CAST to Faust
[2022-07-19 12:06:27,962] [1140] [WARNING] Hello from ASK_NOWAIT to Faust
I don't understand why it works this way, why it's so difficult and why very little is written about in the documentation .