This issue is now solved on Message Hub

I am having some trouble creating a KTable in Kafka. I am new to Kafka, which is probably the root of my problem, but I thought I could ask here anyway. I have a project where I would like to keep track of different IDs by counting how often each one occurs. I am using Message Hub on IBM Cloud to manage my topics, and it has worked splendidly so far.

I have a topic on Message Hub that produces messages like {"ID":"123","TIMESTAMP":"1525339553", "BALANCE":"100", "AMOUNT":"4"}. For now, the only field of relevance is ID.

My Kafka code, along with the Streams configuration, looks like this:

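import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;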

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, "3");    
props.put("security.protocol","SASL_SSL");
props.put("sasl.mechanism","PLAIN");
props.put("ssl.protocol","TLSv1.2");
props.put("ssl.enabled.protocols","TLSv1.2");
String saslJaasConfig = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USERNAME\" password=\"PASSWORD\";";
saslJaasConfig = saslJaasConfig.replace("USERNAME", user).replace("PASSWORD", password);
props.put("sasl.jaas.config",saslJaasConfig);

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> Kstreams = builder.stream(myTopic);

KTable<String, Long> eventCount = Kstreams
        .flatMapValues(value -> getID(value)) //function that retrieves the ID
        .groupBy((key, value) -> value)
        .count();
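
The getID helper is not shown above. Since flatMapValues expects a function that returns an Iterable, a minimal sketch of what it might look like follows. The class name IdExtractor and the regex-based parsing are assumptions made for illustration only; a real JSON parser would be more robust.

```java
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the getID helper used in the topology.
public class IdExtractor {
    // Matches "ID":"<value>", allowing whitespace around the colon.
    private static final Pattern ID_FIELD =
            Pattern.compile("\"ID\"\\s*:\\s*\"([^\"]*)\"");

    // Returns a one-element list holding the ID, or an empty list
    // if the message is null or has no ID field.
    public static List<String> getID(String value) {
        if (value == null) {
            return Collections.emptyList();
        }
        Matcher m = ID_FIELD.matcher(value);
        if (m.find()) {
            return Collections.singletonList(m.group(1));
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        String message = "{\"ID\":\"123\",\"TIMESTAMP\":\"1525339553\", "
                + "\"BALANCE\":\"100\", \"AMOUNT\":\"4\"}";
        System.out.println(getID(message)); // prints [123]
    }
}
```

Returning an empty list for messages without an ID means flatMapValues simply drops them, so they never reach groupBy.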

When I run the code, I get the following error(s):

Exception in thread "KTableTest-e2062d11-0b30-4ed0-82b0-00d83dcd9366->StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Could not create topic KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition.

Followed by:

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.PolicyViolationException: Invalid configuration: {segment.index.bytes=52428800, segment.bytes=52428800, cleanup.policy=delete, segment.ms=600000}. Only allowed configs: [retention.ms, cleanup.policy]

I have no idea why this error occurs or what could be done about it. Is the way I have built the KStream and KTable incorrect somehow? Or is the problem perhaps with Message Hub on Bluemix?

Solved:

Adding an extract from the comments below the answer I have marked as correct. It turned out that my StreamsConfig was fine and that there is (for now) an issue on Message Hub's side, but there is a workaround:

It turns out Message Hub has an issue when creating repartition topics with Kafka Streams 1.1. While we work on a fix, you'll need to create the topic KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition by hand. It needs as many partitions as your input topic (myTopic) and set the retention time to the max. I'll post another comment once it's fixed
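
As a rough illustration of the workaround, the sketch below works out the settings for the manually created topic. It assumes the Kafka Streams naming convention for internal topics, application ID plus operator name plus suffix, and uses only the two config keys the error message lists as allowed. The class name RepartitionTopicPlan, the partition count of 1, and the 30-day retention value are placeholders rather than values taken from Message Hub; check the input topic and the service limits for the real ones.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that collects the settings for the hand-made topic.
public class RepartitionTopicPlan {
    // Internal repartition topics are named <application.id>-<operator>-repartition.
    static String repartitionTopicName(String applicationId, String operatorName) {
        return applicationId + "-" + operatorName + "-repartition";
    }

    // Uses only the config keys the error message reports as allowed.
    static Map<String, String> allowedConfigs(long retentionMs) {
        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("cleanup.policy", "delete");
        configs.put("retention.ms", Long.toString(retentionMs));
        return configs;
    }

    public static void main(String[] args) {
        String name = repartitionTopicName(
                "KTableTest", "KSTREAM-AGGREGATE-STATE-STORE-0000000003");
        int partitions = 1; // placeholder: use the partition count of myTopic
        long retentionMs = 30L * 24 * 60 * 60 * 1000; // placeholder: 30 days
        System.out.println(name);
        System.out.println("partitions=" + partitions
                + " configs=" + allowedConfigs(retentionMs));
    }
}
```

The resulting name, partition count, and configs could then be entered in the Message Hub UI, or passed to a NewTopic through the Kafka AdminClient along with the replication factor of 3.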

Many thanks for the help!

jawwe

1 Answer

Message Hub has some restrictions on the configurations that can be used when creating topics.

From the PolicyViolationException you received, it looks like your Streams application tried to use a few configs we don't allow:

  • segment.index.bytes
  • segment.bytes

I'm guessing you set those somewhere in your Streams configuration and they should be removed.

Note that you also need to set StreamsConfig.REPLICATION_FACTOR_CONFIG to 3 in your config to work with Message Hub as mentioned in our docs.

Mickael Maison
  • Thank you for replying! I think you are definitely right. I added my Streams configuration, but I think everything needed to make it work with Message Hub (MH) is already in place, at least according to the docs. What I do not understand is why I get "Could not create topic" when I am not trying to create one, unless a KTable counts as one? Could it be solved by allocating a topic on MH? As I wrote at the start of my question, I would like to count the occurrences of the IDs in the topic from MH. Does this require more from MH than simply listening to the topic? – jawwe May 03 '18 at 13:39
  • Yes, your transformation logic will create "internal" topics, see http://kafka.apache.org/11/documentation/streams/developer-guide/manage-topics.html#streams-developer-guide-topics-internal. You could pre-create them by hand, but it's usually easier to let Streams do it. Otherwise, I think your logic looks fine. – Mickael Maison May 04 '18 at 08:52
  • I see. So I get the error when I try to create a KTable, since I am restricted from creating a topic this way on Message Hub. Have I understood correctly that my code retrieves the messages from an MH topic and then tries to create an internal topic (for the KTable) on MH? Is there a way to alter my code to create the KTable elsewhere with Streams? Or do I need another Kafka server to achieve what I am after? Can MH handle KTables? Sorry for all the questions, I really appreciate your help. – jawwe May 04 '18 at 11:18
  • I managed to reproduce your issue. It turns out Message Hub has an issue when creating repartition topics with Kafka Streams 1.1. While we work on a fix, you'll need to create the topic `KTableTest-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition` by hand. It needs as many partitions as your input topic (myTopic), and its retention time should be set to the max. I'll post another comment once it's fixed. – Mickael Maison May 04 '18 at 15:51
  • I completely forgot to post here, but this issue was fixed a few weeks ago. – Mickael Maison Jul 17 '18 at 11:39