I would like to build a Python API with the Flask framework that consumes a Kafka topic and pushes the stream to a client (an HTML page or another application).
I tried to generate a real-time flow with dummy data (see the realtime route below). The problem is that the result is only pushed to the client after the loop finishes, whereas it should be pushed on every iteration.
I also tried to generate a real-time flow with a Kafka connection (see the kafka route below). The problem is that no data is returned; instead, the request never finishes.

import json
import time

from flask import Flask, Response
from kafka import KafkaConsumer

application = Flask(__name__)


@application.route('/')
def index():
    return "Hello, World!"


@application.route('/realtime/')
def realtime():
    # Dummy stream: emit one number every 0.2 seconds.
    def createGenerator():
        for i in range(1, 10):
            yield str(i) + '\n'
            time.sleep(0.2)
    return Response(createGenerator())


@application.route('/kafka/')
def kafkaStream():
    consumer = KafkaConsumer(bootstrap_servers='serverlocation',
                             client_id='name of client',
                             auto_offset_reset='earliest',
                             value_deserializer=lambda m: json.loads(m.decode('ascii')))
    consumer.subscribe(topics=['my-topic'])

    # Collect the message values from the topic and hand them to the response.
    def events():
        result = []
        for message in consumer:
            if message is not None:
                result.append(message.value)
        yield result
    return Response(events())


if __name__ == '__main__':
    application.run(debug=True)

The only way I have effectively received data from Kafka so far is by printing the result in the console.

import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers='serverlocation',
                         client_id='name of client',
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('ascii')))
consumer.subscribe(topics=['my-topic'])

# Print every message as it arrives; this loop never ends.
for message in consumer:
    print(message)

I think the problem is that the API cannot push the data until the process has finished, and because the KafkaConsumer connection never ends, nothing is pushed to the client.
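
To illustrate what I suspect, here is a minimal sketch that uses neither Flask nor Kafka. `fake_consumer` is a hypothetical stand-in for `KafkaConsumer` (an iterator that never ends), and `events_after_loop` and `events_per_message` are names made up for this example, not part of any library. It only shows how a plain Python generator behaves depending on where the `yield` sits.

```python
import itertools


def fake_consumer():
    """Hypothetical stand-in for KafkaConsumer: an iterator that never ends."""
    for offset in itertools.count():
        yield {"offset": offset}


def events_after_loop(consumer):
    """Collect every message, then yield the whole list once.

    With an endless consumer the loop never exits, so the yield below is
    never reached and the caller receives nothing.
    """
    result = []
    for message in consumer:
        result.append(message)
    yield result


def events_per_message(consumer):
    """Yield one text chunk per message, so output is available right away."""
    for message in consumer:
        yield "{}\n".format(message["offset"])


# The per-message generator can be read lazily, even from an endless source.
first_chunks = list(itertools.islice(events_per_message(fake_consumer()), 3))
print(first_chunks)  # ['0\n', '1\n', '2\n']

# The after-loop generator only works when the source is finite.
finite_source = iter([{"offset": 0}, {"offset": 1}])
print(list(events_after_loop(finite_source)))  # [[{'offset': 0}, {'offset': 1}]]
```

Only the per-message version produces anything from the endless source. That would be consistent with the kafka route never returning data if its generator behaves like the first version.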
How can I overcome this issue?