I have a single node kafka broker and simple streams application. I created 2 topics (topic1 and topic2).
Produced on topic1 - processed message - write to topic2
Note: For each message produced only one message is written to destination topic
I produced a single message. After it was written to topic2, I stopped the kafka broker. After sometime I restarted the broker and produced another message on topic1. Now streams app processed that message 3 times. Now without stopping the broker I produced messages to topic1 and waited for streams app to write to topic2 before producing again.
Streams app is behaving strangely. Sometimes for one produced message there are 2 messages written to destination topic and sometimes 3. I don't understand why is this happening. I mean even the messages produced after broker restart are being duplicated.
Update 1:
I am using Kafka version 1.0.0 and Kafka-Streams version 1.1.0
Below is the code.
Main.java
String credentials = env.get("CREDENTIALS");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "activity-collection");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> activityStream = builder.stream("activity_contenturl");
KStream<String, String> activityResultStream = AppUtil.hitContentUrls(credentials , activityStream);
activityResultStream.to("o365_user_activity");
AppUtil.java
public static KStream<String, String> hitContentUrls(String credentials, KStream<String, String> activityStream) {
KStream<String, String> activityResultStream = activityStream
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
ArrayList<String> log = new ArrayList<String>();
JSONObject received = new JSONObject(value);
String url = received.get("url").toString();
String accessToken = ServiceUtil.getAccessToken(credentials);
JSONObject activityLog = ServiceUtil.getActivityLogs(url, accessToken);
log.add(activityLog.toString());
}
return log;
}
});
return activityResultStream;
}
Update 2:
In a single broker and single partition environment with the above config, I started the Kafka broker and streams app. Produced 6 messages on source topic and when I started a consumer on destination topic there are 36 messages and counting. They keep on coming.
So I ran this to see consumer-groups
:
kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
Output:
streams-collection-app-0
Next I ran this:
kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-collection-app-0
Output:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 1 0 streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer-3a2940c2-47ab-49a0-ba72-4e49d341daee /127.0.0.1 streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer
After a while the output showed this:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 6 5 - - -
And then:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
o365_activity_contenturl 0 1 7 6 - - -