I want to write my Kafka messages to jsonl files, each containing a fixed number of lines (let's say 2). My producer currently writes 3 messages at a time, so I should end up with two jsonl files: one with 2 events and a second one with 1 event.
My producer only runs for 3 events (it's just an example project), while my consumer should keep running as long as it finds messages.
Right now I lose the third event, and my consumer only writes if I interrupt the program manually, because execution never reaches the if records: check or the self.running = False line. How can I fix the while loop and batch logic?
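To make the expected output concrete (the file names and payloads below are only placeholders for illustration), with events A, B and C and a batch size of 2 I would expect something like:

data_2024-01-01-12-00-00.jsonl   containing {"event": "A"} and {"event": "B"}, one JSON object per line
data_2024-01-01-12-00-02.jsonl   containing {"event": "C"}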
import os
import json
from datetime import datetime
from kafka import KafkaConsumer
class WikiConsumer:
    def __init__(self, topic: str, server: str, group_id: str, output_dir: str) -> None:
        self.consumer = KafkaConsumer(
            topic, bootstrap_servers=server, group_id=group_id
        )
        self.running = True
        self.output_dir = output_dir

    def consume_messages(self, batch_size: int):
        records = []
        while self.running:
            for message in self.consumer:
                if message.value != b'""':
                    json_str = message.value.decode("utf-8")
                    json_obj = json.loads(json.loads(json_str))
                    records.append(json_obj)
                    print(f"records before write: {records}")
                if len(records) >= batch_size:
                    self.write_to_jsonl(records)
                    records = []
                    print(f"records after write: {records}")
            if records:
                self.write_to_jsonl(records)
                print("remainder written")
            self.running = False
    def write_to_jsonl(self, records):
        # one file per batch, named after the current timestamp
        timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        filename = f"data_{timestamp}.jsonl"
        with open(
            os.path.join(self.output_dir, filename), "a"
        ) as f:
            for record in records:
                json.dump(record, f)
                f.write("\n")
    def run(self) -> None:
        self.consume_messages(2)


if __name__ == "__main__":
    output_directory = "my_dir/output/"
    consumer = WikiConsumer(
        "my_project", "localhost:9092", "project-group", output_directory
    )
    consumer.run()
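For reference, here is a minimal sketch of one direction I can think of, assuming kafka-python's consumer_timeout_ms fits this use case (the 10-second timeout is just a guess, and the fragments below would replace the corresponding parts of __init__ and consume_messages):

# sketch only: create the consumer with a timeout so iteration can actually end
self.consumer = KafkaConsumer(
    topic,
    bootstrap_servers=server,
    group_id=group_id,
    consumer_timeout_ms=10000,  # stop iterating after 10 s without new messages (assumed value)
)

def consume_messages(self, batch_size: int):
    records = []
    # with consumer_timeout_ms set, this loop ends on its own once the topic goes quiet
    for message in self.consumer:
        if message.value != b'""':
            records.append(json.loads(json.loads(message.value.decode("utf-8"))))
        if len(records) >= batch_size:
            self.write_to_jsonl(records)
            records = []
    # flush the partial batch (the third event) after the loop exits
    if records:
        self.write_to_jsonl(records)
    self.running = False

Is something along these lines the right way to restructure the loop, or is there a better pattern for draining the last partial batch?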