
I have a large CSV file and I want to write its rows to a Kafka topic.

import csv
from kafka import KafkaProducer

def producer():
    producer = KafkaProducer(bootstrap_servers='mykafka-broker')
    with open('/home/antonis/repos/testfile.csv') as file:
        reader = csv.DictReader(file, delimiter=";")
        for row in reader:
            producer.send(topic='stable_topic', value=row)
            producer.flush()

if __name__ == '__main__':
    producer()

This code produces an error:

AssertionError: value must be bytes

The file looks like:

"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22

Can anyone help me with this?

Giorgos Myrianthous
e7lT2P

2 Answers


Rather than re-inventing the wheel, use the very good one that exists already :) It's Kafka Connect, which is part of Apache Kafka.

There are several connectors that can read from CSV including Kafka Connect spooldir (see example) and Filepulse.
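As a sketch of what this looks like in practice, a minimal configuration for the spooldir CSV source connector might resemble the following. The connector class and property names are taken from the kafka-connect-spooldir project and should be checked against its documentation; the paths and topic name here are purely illustrative:

```
name=csv-source
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
topic=stable_topic
input.path=/data/unprocessed
finished.path=/data/processed
error.path=/data/error
input.file.pattern=.*\.csv
csv.first.row.as.header=true
```

The connector then watches `input.path`, parses each CSV file it finds, and produces one record per row to the topic, moving processed files to `finished.path`.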

Learn more about Kafka Connect in this talk.

Robin Moffatt

You need to properly serialise your values.

The following should do the trick:

import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='mykafka-broker',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
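A quick way to see why this fixes the `AssertionError` is to apply the serializer to one CSV row dict by hand and confirm the result is `bytes`. This snippet needs no broker; the row values are illustrative, taken from the sample file in the question:

```python
import json

# The same serializer passed to KafkaProducer above:
# dict -> JSON string -> UTF-8 bytes
value_serializer = lambda v: json.dumps(v).encode('utf-8')

row = {"timestamp": "2020-03-01 00:00:01", "name": "John", "age": "36"}
payload = value_serializer(row)

print(isinstance(payload, bytes))   # True — this is what producer.send() requires
print(json.loads(payload) == row)   # True — the row round-trips losslessly
```

With the `value_serializer` configured, `producer.send(topic='stable_topic', value=row)` accepts the `DictReader` rows unchanged, because the producer converts each dict to bytes before handing it to the broker.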
Giorgos Myrianthous