Questions tagged [faust]

Faust is a stream processing library, porting the ideas from Kafka Streams to Python.

Faust is used to build high performance distributed systems and real-time data pipelines that process billions of events every day.

Faust provides both stream processing and event processing, sharing similarity with tools such as Kafka Streams, Apache Spark/Storm/Samza/Flink,

It does not use a DSL, it’s just Python! This means you can use all your favorite Python libraries when stream processing: NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy, ++

Faust requires Python 3.6 or later for the new async/await syntax, and variable type annotations.

Repository: https://github.com/robinhood/faust

96 questions
22
votes
1 answer

How to integrate Faust with Django?

I am trying to integrate Faust with Django to publish the messages to Kafka. Here is the example in Faust repo: https://github.com/robinhood/faust/tree/master/examples/django I modified it a bit, and created views to push data to Kafka via…
Maverick
  • 2,738
  • 24
  • 91
  • 157
12
votes
3 answers

How to connect kafka topic with web endpoint using Faust Python package?

I have a simple app, with two functions, one for listening to topic and other for web endpoint. I want to create server side event streaming (SSE) i.e text/event-stream, so that on client end I could listen to it using EventSource. I have the…
Maverick
  • 2,738
  • 24
  • 91
  • 157
12
votes
6 answers

Faust example of publishing to a kafka topic

I'm curious about how you are supposed to express that you want a message delivered to a Kafka topic in faust. The example in their readme doesn't seem to write to a topic: import faust class Greeting(faust.Record): from_name: str to_name:…
melchoir55
  • 6,842
  • 7
  • 60
  • 106
6
votes
2 answers

How to test a Faust agent that sends data to a sink?

I am trying to write unit tests using pytest for my Faust application. I have referred to the documentation here but it does not mention what to do when my Faust agent is sending data to a sink. Without a sink, my tests are working fine but when I…
5
votes
1 answer

Difference between Faust vs Kafka-python

I could not find any answer to this: what is the difference between Faust and kafka-python? Is there any pros/cons on preferring any one of them? As I understand it: Kafka is written in Java, and Kafka-python is a Python client to communicate with…
Pavol
  • 57
  • 1
  • 3
5
votes
1 answer

Unable connect to node with id 1: [Worker]: Error: ConnectionError('No connection to node with id')

I am trying to use robinhood / faust but without success! I have already created a producer that inserts in the original topic, in my confluent-kafka localhost instance, successfully! but the faust is unable to connect to localhost. My…
FelipeAgger
  • 51
  • 1
  • 5
5
votes
2 answers

How to share faust table between multiple agents or faust timers?

I'm trying to publish faust table's data(count) to a kafka topic after some time interval. timer is working when I publish some simple string, but it is not able to access table's data somehow. Following is the code for…
5
votes
2 answers

Integrate Flask with Faust

I'm trying to get a faust agent to cast a message inside a flask view/endpoint, I can't find any example of it and i'm really struggling. Has anyone tried this successfully? the docs say to use gevent or eventlet as a bridge to asyncio but can't…
shipperizer
  • 1,641
  • 1
  • 13
  • 19
4
votes
1 answer

Running Faust with kafka crashes with ConsumerStoppedError

I freshly installed Faust and ran a basic program to send and receive messages over Kafka.I used the sample code mentioned in (Faust example of publishing to a kafka topic) While running the program initially it connects to Kafka(which is also…
user2611300
  • 133
  • 2
  • 9
4
votes
0 answers

Faust Table Lookup Performance

I am trying to perform Lookup for messages arriving in a Kafka topic (say "datatopic"). Lookup source is another Kafka Topic (say "lookuptopic"). To do this using faust, I created a Table using lookuptopic and created an agent to update this table…
some_user
  • 315
  • 2
  • 14
4
votes
2 answers

how to set the consumer in Faust to a specific offset

From the Faust documentation I can't find out how to set the consumer to a specific offset. With confluent-kafka I use consumer.offsets_for_times to find a start_offset and then assign the TopicPartition to that specific offset, something…
galinden
  • 610
  • 8
  • 13
3
votes
1 answer

Debugging Faust Stream Processing - Restart App from Beginning of Topic

I am debugging a simple application: import faust app = faust.App('app08') # want to start from the beginning of the # topic every time the application restarts @app.agent(topic) async def process(stream): async for event in stream: …
qbzenker
  • 4,412
  • 3
  • 16
  • 23
3
votes
0 answers

Python Faust await agent.ask() doesn't return reply and hangs function calling it

I am new to Python, playing around with stuff, trying to communicate python services via Kafka using Faust. So I have small PoC project. Faust app definition: # app.py import faust as f from models import ReadRequest, ReadResponse app =…
2
votes
1 answer

Faust Streaming retry topic mechanism

I have two topics main_topic retry_topic I want if the logic fails in main_topic it should call retry_topic and if that fails so there should be max 3 retries that it should do. I've tried using sink in faust streaming what it does is that it…
2
votes
0 answers

Python Faust, how to use take to run on multiple values

I'm trying to implement a faust agent using take to process multiple messages at the same time. app = faust.App('vectors-stream') vector_topic = app.topic('vector', value_type=VectorRecord) @app.agent(vector_topic) async def…
yovel cohen
  • 267
  • 3
  • 12
1
2 3 4 5 6 7