I would like to build a Python API with the Flask framework that consumes a Kafka topic and pushes the stream to a client (an HTML page or another application).

I tried generating a real-time stream with dummy data (see the realtime route below). The problem is that the output is only pushed after the loop finishes, whereas it should be pushed on every iteration.

I also tried generating a real-time stream from a Kafka connection (see the kafka route below). The problem is that no data is returned; instead, the request never finishes.

import json  # needed by value_deserializer below
import time

from flask import Response, Flask
from kafka import KafkaConsumer

application = Flask(__name__)

@application.route('/')
def index():
    return "Hello, World!"


@application.route('/realtime/')
def realtime():

    def createGenerator():

        for i in range(1,10):
            yield str(i) + '\n'
            time.sleep(0.2)

    return Response(createGenerator())


@application.route('/kafka/')
def kafkaStream():
    consumer = KafkaConsumer(bootstrap_servers = 'serverlocation',
                     client_id = 'name of client',
                     auto_offset_reset = 'earliest',
                     value_deserializer = lambda m: json.loads(m.decode('ascii')))

    consumer.subscribe(topics=['my-topic'])

    def events():
        result = []
        for message in consumer:
           if message is not None:
               result.append(message.value)
           yield result
    return Response(events())

if __name__ == '__main__':
    application.run(debug = True)

The only way I have effectively received data from Kafka so far is by printing the result in the console.

import json  # needed by value_deserializer below

from kafka import KafkaConsumer
consumer = KafkaConsumer(bootstrap_servers = 'serverlocation',
                     client_id = 'name of client',
                     auto_offset_reset = 'earliest',
                     value_deserializer = lambda m: json.loads(m.decode('ascii')))

consumer.subscribe(topics=['my-topic'])

for message in consumer:
    print(message)

I think that the problem is that the API cannot push the data until the process has finished, and because the KafkaConsumer connection is infinite, nothing is pushed to the client.

How can I overcome this issue?

taras
stefan

1 Answer

For anyone else looking for a solution to this problem: a socket-based solution would work here, built on top of a Kafka consumer that constantly listens for messages on the topic and pushes each one to the client.
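The idea can be sketched roughly as follows: a background thread iterates over the consumer and hands every record to a push callback. This is only a minimal sketch using the standard library. `start_forwarder`, the `emit` callback, and the event name `kafka_message` are hypothetical names chosen for illustration; in a real application `records` would be the `KafkaConsumer` and `emit` would be whatever the socket library provides for pushing to connected clients (for example, `socketio.emit` in Flask-SocketIO).

```python
import threading


def start_forwarder(records, emit, event="kafka_message"):
    """Forward each record from `records` to `emit` on a background thread.

    `records` is any iterable (a KafkaConsumer in a real application);
    `emit` is a hypothetical callback taking (event_name, payload).
    """
    def run():
        for record in records:
            emit(event, record)

    # daemon=True so an endless consumer loop does not block interpreter exit
    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    return worker


# Demo with an in-memory list standing in for the Kafka consumer.
received = []
worker = start_forwarder(["a", "b"], lambda event, data: received.append((event, data)))
worker.join()
```

Because the consumer loop runs outside the request handler, the HTTP request that serves the page can return immediately while messages keep arriving over the socket.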

Check this link for more info. kafka SocketIo
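If you would rather keep the plain streaming response from the question, note that a WSGI response body has to consist of strings or bytes, so each chunk should be a serialized message rather than a growing Python list. A minimal sketch, assuming every record exposes a JSON-serializable `.value`; `stream_records` and `FakeRecord` are hypothetical names used only for this example:

```python
import json
from collections import namedtuple

# Hypothetical stand-in for a Kafka record; only the `.value` attribute is used.
FakeRecord = namedtuple("FakeRecord", ["value"])


def stream_records(records):
    """Yield one newline-terminated JSON string per record as it arrives."""
    for record in records:
        if record is None:
            continue
        yield json.dumps(record.value) + "\n"


# Demo with in-memory records; in the route, `records` would be the consumer.
chunks = list(stream_records([FakeRecord({"id": 1}), None, FakeRecord({"id": 2})]))
```

In the route this generator could be returned as `Response(stream_records(consumer), mimetype='application/x-ndjson')`. Some browsers and proxies buffer small chunks, so output may still appear delayed on the client side even when the server sends it per message.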