I want to write my Kafka messages to jsonl files, each of which should contain a fixed number of lines (let's say 2). My producer currently writes 3 messages at a time, so I should get two jsonl files: one with 2 events and a second one with 1 event.

My producer only runs for 3 events (it's just an example project) while my consumer should run as long as it finds messages.

Right now I lose the third event, and my consumer only writes if I interrupt the program manually, because execution never reaches the "if records" check where self.running is set to False. How can I fix the while loop and the batch logic?

import os
import json
from datetime import datetime
from kafka import KafkaConsumer


class WikiConsumer:
    def __init__(self, topic: str, server: str, group_id: str, output_dir: str) -> None:
        self.consumer = KafkaConsumer(
            topic, bootstrap_servers=server, group_id=group_id
        )
        self.running = True
        self.output_dir = output_dir

    def consume_messages(self, batch_size: int):
        records = []

        while self.running:
            for message in self.consumer:
                if message.value != b'""':
                    json_str = message.value.decode("utf-8")
                    json_obj = json.loads(json.loads(json_str))

                    records.append(json_obj)
                    print(f"records before write: {records}")

                    if len(records) >= batch_size:
                        self.write_to_jsonl(records)
                        records = []

            print(f"records after write: {records}")
            if records:
                self.write_to_jsonl(records)
                print("remainder written")

                self.running = False

    def write_to_jsonl(self, records):
        # datetime.now() replaces the undefined name "now"
        timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        filename = f"data_{timestamp}.jsonl"

        with open(
            os.path.join(self.output_dir, filename), "a"
        ) as f:
            for record in records:
                json.dump(record, f)
                f.write("\n")

    def run(self) -> None:
        self.consume_messages(2)


if __name__ == "__main__":
    output_directory = "my_dir/output/"
    consumer = WikiConsumer(
        "my_project", "localhost:9092", "project-group", output_directory
    )
    consumer.run()
OneCricketeer
Omega

1 Answer

If I understand the question correctly, you stop the script while the records list still holds leftover data, which has not been written to any file because it is smaller than your batch size.

Your options are:

  1. Ignore it, and disable the Kafka consumer's automatic offset commit. Commit manually after you write a file. When the consumer restarts, it will resume from the records that have not yet been written (assuming you restart it before that data falls out of the Kafka retention window).
  2. Add a shutdown handler that properly closes your consumer and writes the pending records list to a partial file.
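
The two options can be combined. Below is a minimal sketch, not a drop-in replacement for the class in the question. The names write_jsonl, consume_in_batches, stop_event and poll_timeout_ms are hypothetical. It assumes a kafka-python consumer created with enable_auto_commit=False, and it assumes each message value is a single JSON document (the question decodes twice, so adjust the decode step if your producer double-encodes). It uses poll() with max_records instead of iterating over the consumer, so the loop can notice a stop request and never fetches more messages than the current batch has room for.

```python
import json
import os
from datetime import datetime


def write_jsonl(records, output_dir, index):
    """Write one batch to its own JSONL file and return the file path."""
    timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    # The index keeps two batches written within the same second apart.
    path = os.path.join(output_dir, f"data_{timestamp}_{index:04d}.jsonl")
    with open(path, "w") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")
    return path


def consume_in_batches(consumer, output_dir, batch_size, stop_event,
                       poll_timeout_ms=1000):
    """Poll until stop_event is set; commit offsets only after a file is written."""
    pending = []   # records received but not yet written
    written = []   # paths of the files written so far

    def flush():
        if pending:
            written.append(write_jsonl(pending, output_dir, len(written)))
            pending.clear()
            # Option 1: commit manually, and only once the file exists.
            consumer.commit()

    try:
        while not stop_event.is_set():
            # Fetch at most what the current batch still has room for, so the
            # committed position never runs ahead of what has been written.
            polled = consumer.poll(timeout_ms=poll_timeout_ms,
                                   max_records=batch_size - len(pending))
            for messages in polled.values():
                for message in messages:
                    pending.append(json.loads(message.value.decode("utf-8")))
            if len(pending) >= batch_size:
                flush()
    finally:
        # Option 2: on shutdown, write the partial batch and close cleanly.
        flush()
        consumer.close()
    return written
```

To wire this up, one would create the consumer with KafkaConsumer(topic, bootstrap_servers=server, group_id=group_id, enable_auto_commit=False), create a threading.Event, and register signal.signal(signal.SIGINT, lambda *_: stop_event.set()) (and the same for signal.SIGTERM) before calling consume_in_batches. With 3 messages and a batch size of 2, the first file is written as soon as two messages arrive, and the third message is written when the stop event is set.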

If you don't really need timestamped files, I'd recommend using the Kafka Connect FileSink connector for this instead, since it comes with Kafka and you don't need to write any code.

OneCricketeer