
I'm trying to implement a method for my custom class that produces data to Kafka using the Flink Kafka connector. The class prototype is the following:

public class StreamData implements Serializable {
    private transient StreamExecutionEnvironment env;
    private DataStream<byte[]> data;
    ...

The method for writing data to a specific Kafka topic looks like this:

public void writeDataIntoESB(String id) throws Exception {

        FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>(
                "localhost:9092",
                id,
                new KeyedSerializationSchema<byte[]>() {
                    @Override
                    public byte[] serializeKey(byte[] bytes) {
                        return bytes;
                    }

                    @Override
                    public byte[] serializeValue(byte[] bytes) {
                        return bytes;
                    }

                    @Override
                    public String getTargetTopic(byte[] bytes) {
                        return null;
                    }
                });               
        data.addSink(producer);
    }

I have another method for reading data from a Kafka topic into the data field of an object, and it works fine. Now, trying to read data from a Kafka topic and write it to another Kafka topic, I get the error:

org.apache.flink.api.common.InvalidProgramException: Object StreamData$2@1593948d is not serializable

The main code:

StreamData temp = new StreamData();
temp = temp.getDataFromESB("data", 0);
temp.writeDataIntoESB("flink_test");

It seems Java tries to serialize the whole object, not just the field data! The code for producing data to Kafka using the Flink Kafka connector has been tested and works in regular usage (I mean writing all the code in main without using classes).

How can I get rid of the error?

Soheil Pourbafrani
  • I think the issue is when it's trying to create your actual execution environment, it needs to be able to serialize everything over into the Job Manager, but you are nesting the DataStream within your object, so that isn't going to translate over very well. Would you be able to edit your example with as much as you can, sanitized as you see fit? It's a bit difficult to see exactly what you're trying to do, but I don't think it's related to issues with Kafka – Joshua DeWald Mar 31 '18 at 17:02
  • @JoshuaDeWald As I said, it's because of Java or Flink characteristics, since I tested the same code without a class in the `main` function and it works. Other methods like reading from Kafka or writing data to Cassandra work well; just the method for writing data to Kafka doesn't work! – Soheil Pourbafrani Mar 31 '18 at 18:10

2 Answers


I believe the cause of the issue is that your code is doing this:

new KeyedSerializationSchema<byte[]>() {...}

What this code does is create an anonymous subclass of KeyedSerializationSchema as an inner class of the defining class (StreamData). Every inner class holds an implicit reference to the outer class instance, so serializing it using the default Java serialization rules will also transitively try to serialize the outer object (StreamData). The nicest way of resolving this issue is to declare your subclass of KeyedSerializationSchema as either:

  - a top-level class,
  - a static nested class, or
  - an anonymous class assigned to a static field of StreamData.

I think that last approach would look like this:

public class StreamData {
    static KeyedSerializationSchema<byte[]> schema = new KeyedSerializationSchema<byte[]>() {
        ...
    };
    ...
    public void writeDataIntoESB(String id) throws Exception {

        FlinkKafkaProducer011<byte[]> producer = new FlinkKafkaProducer011<byte[]>("localhost:9092", id, schema);               
        data.addSink(producer);
    }
}
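
For comparison, a minimal sketch of the static nested class variant, keeping the same pass-through byte[] serialization as in the question (the class name ByteArraySchema and the explicit imports are illustrative additions, not part of the original code):

import java.io.Serializable;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class StreamData implements Serializable {

    private DataStream<byte[]> data;

    // A static nested class holds no implicit reference to the enclosing
    // StreamData instance, so only the schema itself gets serialized.
    static class ByteArraySchema implements KeyedSerializationSchema<byte[]> {
        @Override
        public byte[] serializeKey(byte[] bytes) {
            return bytes;
        }

        @Override
        public byte[] serializeValue(byte[] bytes) {
            return bytes;
        }

        @Override
        public String getTargetTopic(byte[] bytes) {
            return null; // fall back to the topic passed to the producer
        }
    }

    public void writeDataIntoESB(String id) throws Exception {
        FlinkKafkaProducer011<byte[]> producer =
                new FlinkKafkaProducer011<>("localhost:9092", id, new ByteArraySchema());
        data.addSink(producer);
    }
}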
Michal Borowiecki

You can also do serialization in Flink like this:

dataStream.addSink(new FlinkKafkaProducer<MyUser>(ProducerTopic,
        new CustomSerializerSchema(), properties));

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SerializationSchema;

public class CustomSerializerSchema implements SerializationSchema<MyUser> {

    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(MyUser element) {
        // Serialize the element to JSON bytes using Jackson.
        byte[] b = null;
        try {
            b = new ObjectMapper().writeValueAsBytes(element);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return b;
    }
}
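
For completeness, a minimal sketch of how the pieces above might be wired together in a job; the broker address, the topic name, and the no-argument MyUser constructor are placeholder assumptions:

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class ProducerJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka producer configuration; the broker address is a placeholder.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        String producerTopic = "my_topic"; // placeholder topic name

        // Hypothetical source of MyUser records, only to make the example complete.
        DataStream<MyUser> dataStream = env.fromElements(new MyUser());

        dataStream.addSink(new FlinkKafkaProducer<MyUser>(producerTopic,
                new CustomSerializerSchema(), properties));

        env.execute("Write MyUser records to Kafka");
    }
}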
Aarya