
I am fetching data from Kafka topics and storing it in Delta Lake (Parquet) format. I want to find the number of messages fetched on a particular day.

My thought process: read the directory where the data is stored in Parquet format using Spark and apply a count on the files with the ".parquet" extension for a particular day. This returns a count, but I am not really sure whether that is the correct way.

Is this approach correct? Are there any other ways to count the number of messages fetched from a Kafka topic for a particular day (or duration)?

Pankaj Mishra

3 Answers


You can make use of the "time travel" capabilities that Delta Lake offers.

In your case you can do

// define location of delta table
val deltaPath = "file:///tmp/delta/table"

// travel back in time to the start and end of the day using the option 'timestampAsOf'
val countStart = spark.read.format("delta").option("timestampAsOf", "2021-04-19 00:00:00").load(deltaPath).count()
val countEnd = spark.read.format("delta").option("timestampAsOf", "2021-04-19 23:59:59").load(deltaPath).count()

// print out the number of messages stored in Delta Table within one day
println(countEnd - countStart)

See documentation on Query an older snapshot of a table (time travel).
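
If you are on PySpark rather than Scala, a rough equivalent of the same idea is sketched below; it assumes the same example table location and that the table only receives appends:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# same example location as above; replace with your own Delta table path
delta_path = "file:///tmp/delta/table"

# snapshot row counts at the start and at the end of the day
count_start = (spark.read.format("delta")
               .option("timestampAsOf", "2021-04-19 00:00:00")
               .load(delta_path).count())
count_end = (spark.read.format("delta")
             .option("timestampAsOf", "2021-04-19 23:59:59")
             .load(delta_path).count())

# rows added during that day (only meaningful if the table is append-only)
print(count_end - count_start)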

Michael Heil

The messages we consume from a topic not only have a key and value but also carry other information, such as a timestamp, which can be used to track the message flow.

Timestamp: the timestamp is set by either the broker or the producer, based on the topic configuration. If the topic's timestamp type is configured as CREATE_TIME, the broker keeps the timestamp from the producer record, whereas if the topic is configured with LOG_APPEND_TIME, the broker overwrites the timestamp with its local time while appending the record.
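
For illustration, when reading a topic with the Spark Kafka source, this metadata is exposed as columns next to the key and value; the broker address and topic name below are placeholders, and the spark-sql-kafka connector is assumed to be available:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# placeholder broker and topic names
raw = (spark.read.format("kafka")
       .option("kafka.bootstrap.servers", "localhost:9092")
       .option("subscribe", "my-topic")
       .load())

# besides the key and value, each record carries Kafka metadata,
# including the 'timestamp' and 'timestampType' columns
raw.select("partition", "offset", "timestamp", "timestampType").show(5)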

  1. If you keep the timestamp wherever you store the messages, you can easily track the per-day or per-hour message rate (see the sketch after this list).

  2. Alternatively, you can use a Kafka dashboard such as Confluent Control Center (licensed) or Grafana (free), or any other monitoring tool, to track the message flow.

  3. In our case, while consuming and storing or processing messages, we also route the message metadata to Elasticsearch and visualize it through Kibana.
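
As a sketch for option 1, assuming the Kafka record timestamp was preserved in a column named 'timestamp' when writing to the Delta table (the table path below is a placeholder):

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col

spark = SparkSession.builder.getOrCreate()

# placeholder Delta table location
delta_path = "file:///tmp/delta/table"

# per-day message counts, assuming the Kafka timestamp was stored
# in a column named 'timestamp' alongside the key and value
daily_counts = (spark.read.format("delta")
                .load(delta_path)
                .groupBy(to_date(col("timestamp")).alias("day"))
                .count()
                .orderBy("day"))

daily_counts.show()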

Nitin

Another way to retrieve this information, without counting the rows between two versions, is to use the Delta table history. This has several advantages: you don't read the whole dataset, and you can take updates and deletes into account as well, for example if you're doing a MERGE operation (which isn't possible by comparing .count on different versions, because an update replaces the existing value and a delete removes the row).

For example, for plain appends, the following code will count all rows inserted by normal append operations (for other operations, such as MERGE/UPDATE/DELETE, you may need to look at other metrics):

from delta.tables import DeltaTable

# read the table's commit history, keep only the commits within the desired
# period ('begin_of_day' / 'end_of_day' are placeholders for real timestamps),
# extract the 'numOutputRows' metric of each commit and sum it up
df = DeltaTable.forName(spark, "ml_versioning.airbnb").history()\
  .filter("timestamp > 'begin_of_day' and timestamp < 'end_of_day'")\
  .selectExpr("cast(nvl(element_at(operationMetrics, 'numOutputRows'), '0') as long) as rows")\
  .groupBy().sum()
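
The query above returns a one-row aggregated DataFrame rather than a plain number; a possible way to pull the value out (assuming the snippet above was run as-is) is:

# the global aggregation always yields exactly one row; the sum is null
# when no commits matched the filter, hence the fallback to 0
total_rows = df.collect()[0][0] or 0
print(total_rows)
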
Alex Ott