
I'm trying to enable EXACTLY_ONCE semantics in my Flink Kafka streaming job, along with checkpointing.

However, I am not getting it to work, so I tried downloading the sample test code from GitHub: https://github.com/apache/flink/blob/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test/src/main/java/org/apache/flink/streaming/kafka/test/KafkaExample.java

Running that works fine. However, when I enable checkpointing, I get errors. If I change EXACTLY_ONCE to AT_LEAST_ONCE semantics and enable checkpointing, it works fine. But as soon as I change it back to EXACTLY_ONCE, I get the error again.

The exception I'm getting:

org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1099)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1036)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:430)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:76)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$2(FlinkKafkaProducer.java:1107)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:304)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:360)
    ... 12 more

I've made slight changes to the code to make it work in my environment. I'm running it inside the Flink operations playground in Docker (this one: https://ci.apache.org/projects/flink/flink-docs-stable/getting-started/docker-playgrounds/flink-operations-playground.html). The Flink version is the latest, 1.10, and the Kafka provided inside it is version 2.2.1.

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;

    // KafkaEvent, KafkaEventSchema, CustomWatermarkExtractor and RollingAdditionMapper
    // come from the example's base module, linked below.
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(1_000);

        String inputTopic = "my-input";
        String outputTopic = "my-output";
        String kafkaHost = "kafka:9092";

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");


        DataStream<KafkaEvent> input = env
                .addSource(new FlinkKafkaConsumer<>(inputTopic, new KafkaEventSchema(), kafkaProps)
                        .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
                .keyBy("word")
                .map(new RollingAdditionMapper());

        input.addSink(
                new FlinkKafkaProducer<>(
                        outputTopic,
                        new KafkaEventSerializationSchema(outputTopic),
                        kafkaProps,
                        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("Modern Kafka Example");
    }

The other classes from the example can be found here: https://github.com/apache/flink/tree/c025407e8a11dff344b587324ed73bdba2024dff/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base

I did try changing the serialization to use KafkaSerializationSchema rather than the SerializationSchema the example uses. However, that code (below) did not help either. Same error.

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaEventSerializationSchema implements KafkaSerializationSchema<KafkaEvent> {

    private static final long serialVersionUID = 1L;

    private final String topic;

    public KafkaEventSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(KafkaEvent element, Long timestamp) {
        return new ProducerRecord<>(topic, element.toString().getBytes());
    }
}

All help appreciated. I haven't been able to find any working online code with an EXACTLY_ONCE guarantee between Flink and Kafka, only loads of articles talking about it but no substantial working code. That's all I'm trying to achieve here.

Olindholm
  • Here (https://stackoverflow.com/a/58644689/2096986) they implemented their own `KafkaSerializationSchema` in the class `ObjSerializationSchema`. I guess it can help solve your issue, since it is a serialization error. – Felipe Jun 19 '20 at 08:29
  • The KafkaEventSerializationSchema is the one I use from the example, which I can't link right now; GitHub seems to be down. Anyway, it also implements KafkaSerializationSchema, just like you're suggesting. I can't understand what the problem is. If the serialization were a problem, why does it work when not using EXACTLY_ONCE, and not work when using it? – Olindholm Jun 19 '20 at 08:47
  • So `KafkaEventSerializationSchema` is the class that you created that implements `KafkaSerializationSchema`. The Javadoc mentions this caveat: `Please also implement KafkaContextAware if your serialization schema needs information about the available partitions and the number of parallel subtasks along with the subtask ID on which the Kafka Producer is running.` https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.html . Maybe if you post your class it will be easier to identify the error. – Felipe Jun 19 '20 at 08:57
  • I will update the post. I cannot post code here. – Olindholm Jun 19 '20 at 09:05
  • I guess it is related to a timeout when you are using `EXACTLY_ONCE`, according to this answer: https://stackoverflow.com/a/58648019/2096986 – Felipe Jun 19 '20 at 09:13
  • But that is not what the exception is saying. If `transaction.max.timeout.ms` is the problem, what should it be set to? If it is 15 minutes by default, and I get this exception immediately when trying to run the job, I don't see how it could have timed out in a few seconds. – Olindholm Jun 19 '20 at 10:37
  • I am seeing the same error, but the behaviour is inconsistent for me; the same application may fail with this error or work fine (EXACTLY_ONCE and checkpointing enabled in both cases). In my test env, when I enabled exactly-once I first had a transaction.max.timeout.ms related error, which I fixed by reducing the Kafka producer's transaction.timeout.ms (but that was a different error). – xrcsblue Jun 19 '20 at 12:06
  • @WiggyLindholm were you able to resolve this? – vikash dat Jul 29 '20 at 22:05
  • @vikash-dat Not really. I made many tweaks back and forth, and nothing made it work. The exception seems to come less and less, but it still occurs sometimes; I don't know why. I got some code from a colleague (which I cannot share publicly), but as far as I can see it is practically the same, so I don't know why it works now or why it didn't work then. Sorry. Maybe if I have time at some point I can try to get a working example and post it on GitHub. – Olindholm Jul 30 '20 at 16:11

2 Answers


I ran into the same problem, and explicitly setting a timeout for the producer helped:

    properties.setProperty("transaction.timeout.ms", "900000");
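
For context, here is a minimal sketch of where that property plugs into the sink, assuming the Flink 1.10 FlinkKafkaProducer API and the KafkaEventSerializationSchema class posted in the question; the broker address and topic name are placeholders:

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker address
    // Flink's Kafka producer defaults transaction.timeout.ms to 1 hour, while the broker's
    // transaction.max.timeout.ms defaults to 15 minutes; the producer value must not exceed
    // the broker's maximum, so 900000 ms (15 minutes) stays within the default limit.
    producerProps.setProperty("transaction.timeout.ms", "900000");

    FlinkKafkaProducer<KafkaEvent> producer = new FlinkKafkaProducer<>(
            "my-output", // placeholder topic
            new KafkaEventSerializationSchema("my-output"),
            producerProps,
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);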


The following parameters are required to enable EXACTLY_ONCE processing:

  • transaction.max.timeout.ms on your Kafka broker should be beyond the maximum expected Flink and/or Kafka downtime.
  • transaction.timeout.ms on the producer must not exceed the broker's transaction.max.timeout.ms.

For example, on the producer side:

    Properties producerProperties = new Properties();
    // Must not exceed the broker's transaction.max.timeout.ms (15 minutes by default).
    producerProperties.setProperty("transaction.timeout.ms", "60000");

In addition to this, you need to set a value for setTransactionalIdPrefix. This value has to be unique for each Flink application that you run against the same Kafka cluster (see the sketch below).
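
A minimal sketch of where that call lives, assuming the newer KafkaSink API (Flink 1.14+), since setTransactionalIdPrefix is not exposed on the FlinkKafkaProducer used in the question; the broker address, topic, and prefix are placeholders:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("kafka:9092")       // placeholder broker address
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("my-output")           // placeholder topic
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            // Keep the producer transaction timeout within the broker's maximum.
            .setProperty("transaction.timeout.ms", "60000")
            // Unique per Flink application sharing the Kafka cluster.
            .setTransactionalIdPrefix("my-app-txn")
            .build();

The sink is then attached with stream.sinkTo(sink), and checkpointing still has to be enabled for EXACTLY_ONCE to take effect.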

Swapnil Khante