I have a topic with different messages, but I need to consume only certain ones, irrespective of the order in which the messages were published. I need to apply a filter condition on a specific field/attribute of the message, for example where the `file-name` element of the JSON message is "XXXX.txt". Put simply, I want to consume messages with a where condition, as in a traditional DB.
- Check this: http://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KStream.html#filter-org.apache.kafka.streams.kstream.Predicate- – Bartosz Wardziński Sep 16 '19 at 19:29
- `kafka-streams` is a Java library... Are you still [using c#](https://stackoverflow.com/questions/57681875/getting-serialization-error-when-publish-message-to-kafka-topic)? – OneCricketeer Sep 16 '19 at 20:44
- Yes..I need to use C# client. – Arun Sep 16 '19 at 22:12
- Then your only option would be to subscribe and poll like normal. Afterwards, deserialize each record, and perform a simple if statement on your search criteria... Kafka Streams or KSQL do exactly that, as well – OneCricketeer Sep 17 '19 at 00:25
- Use Reactive Kafka and use a filter. – Prashant Pandey Sep 17 '19 at 11:26
1 Answer
> I need to use C# client

Then your only option is to subscribe and poll as normal, adding an `if` statement for your condition.

Borrowing a snippet from the Confluent consumer example:
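```csharp
// TODO: build a consumer that reads string messages
consumer.Subscribe(topics);
// ...
while (true)
{
    var consumeResult = consumer.Consume(cancellationToken);
    string value = consumeResult.Value;

    // Parse the string value with your favorite JSON library.
    // Assumption: Json.NET (Newtonsoft.Json) and a flat JSON object of string values.
    var jsonObj = JsonConvert.DeserializeObject<Dictionary<string, string>>(value);

    // Add your condition; TryGetValue avoids an exception when the key is missing
    if (jsonObj != null && jsonObj.TryGetValue("file-name", out var fileName) && fileName == "XXXX.txt")
    {
        Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {value}");
    }
}
```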
Regarding the JSON parser, you can deserialize into either a Dictionary or a strongly typed class; this example assumes a dictionary. See "How can I deserialize JSON to a simple Dictionary<string,string> in ASP.NET?"
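If you would rather go the strongly typed route, here is a minimal sketch of the same check. It assumes `System.Text.Json` (built into .NET Core 3.0 and later) instead of a dictionary; `FileMessage` and `MessageFilter` are hypothetical names made up for illustration, and only the `file-name` field from the question is mapped.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical message shape: only the field used by the filter is mapped.
public class FileMessage
{
    [JsonPropertyName("file-name")]
    public string FileName { get; set; }
}

public static class MessageFilter
{
    // Returns true when the JSON payload has "file-name" equal to expectedFileName.
    // Empty, malformed, or non-matching payloads return false instead of throwing.
    public static bool Matches(string json, string expectedFileName)
    {
        if (string.IsNullOrWhiteSpace(json))
        {
            return false;
        }

        try
        {
            var message = JsonSerializer.Deserialize<FileMessage>(json);
            return message != null
                && string.Equals(message.FileName, expectedFileName, StringComparison.Ordinal);
        }
        catch (JsonException)
        {
            // Not valid JSON, or "file-name" is not a string: treat as no match.
            return false;
        }
    }
}
```

Inside the poll loop, the condition would then become `if (MessageFilter.Matches(consumeResult.Value, "XXXX.txt"))`. Swallowing `JsonException` is a design choice for this sketch so that one bad record does not stop the consumer; you may prefer to log it instead.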
> irrespective of the order in which the messages were published.

As with any consumer, order is guaranteed only within a partition, not across the partitions of a topic.

OneCricketeer