1

Assume there are two types T1 & T2 and a topic T. Both T1 & T2 must go in topic T (for some reason). What are ways to achieve this? And which one is better?

One way (of many) is to make use of inheritance, we can define a base class and then sub-classes can extends it. In our case we can define a base class TB and then T1 & T2 can extends TB.

Base class (TB)

package poc.kafka.domain;

    import java.io.Externalizable;
    import java.io.IOException;
    import java.io.ObjectInput;
    import java.io.ObjectOutput;

    import lombok.AllArgsConstructor;
    import lombok.NoArgsConstructor;
    import lombok.ToString;
    import lombok.extern.java.Log;

    @ToString
    @AllArgsConstructor
    @NoArgsConstructor
    @Log
    public class Animal implements Externalizable {
        public String name;

        public void whoAmI() {
            log.info("I am an Animal");
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            name = (String) in.readObject();
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeObject(name);
        }
    }

Derived class (T1)

package poc.kafka.domain;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.java.Log;

@Log
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Cat extends Animal implements Externalizable {
    private int legs;

    public void whoAmI() {
        log.info("I am a Cat");
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        super.readExternal(in);
        legs = in.readInt();
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);
        out.writeInt(legs);
    }
}

Derived class (T2)

package poc.kafka.domain;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.java.Log;

@Log
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Dog extends Animal implements Externalizable {
    private int legs;

    public void whoAmI() {
        log.info("I am a Dog");
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        super.readExternal(in);
        legs = in.readInt();
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        super.writeExternal(out);
        out.writeInt(legs);
    }
}

Deserializer

package poc.kafka.domain.serialization;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;

import poc.kafka.domain.Animal;

public class AnimalDeserializer implements Deserializer<Animal> {

    @Override
    public Animal deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }

}

Serializer

package poc.kafka.domain.serialization;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;

import poc.kafka.domain.Animal;

public class AnimalSerializer implements Serializer<Animal> {

    @Override
    public byte[] serialize(String topic, Animal data) {
        return SerializationUtils.serialize(data);
    }

}

Then we can send T1 & T2 like below

IntStream.iterate(0, i -> i + 1).limit(10).forEach(i -> {
            if (i % 2 == 0)
                producer.send(new ProducerRecord<Integer, Animal>("T", i, new Dog(i)));
            else
                producer.send(new ProducerRecord<Integer, Animal>("gs3", i, new Cat(i)));
        });
Ashish Bhosle
  • 609
  • 5
  • 18

4 Answers4

1

The simplest way is to use your custom org.apache.kafka.common.serialization.Serializer, which will be able to handle both type of events. Both type of events should inherit from same type/based class.

Sample code might look as follow:

public class CustomSerializer implements Serializer<T> {

    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to do
    }

    public byte[] serialize(String topic, T data) {
        // serialization
        return null;
    }

    public void close() {
        // nothing to do
    }
} 
Bartosz Wardziński
  • 6,185
  • 1
  • 19
  • 30
  • This is what I have chosen. Inheritance is the tricky part here. If I would refactor my application, I would move from `@JsonSubType` to Mixin-Annotations. – sschrass Aug 30 '19 at 13:02
1

This might be not the direct answer to the question, but rather proposition to reconsider some aspects here, which might solve the original problem.

First of all, despite Kafka's ability to support any data format, for the serializable binary format, I would advice using Apache Avro, rather than serialized Java object.

With Avro, you'd get all the benefits of a compact binary, language-agnostic data type and wide set of tools to work with. For example, there are CLI tools to read Kafka topics with contents in Avro, but I am not aware of any single one able to deserialize Java objects there.

You can read about Avro itself here

Also some good insights onto why use Avro can be found in this SO question here

Second. Your question title says about Event Types, but judging the description probably implies "how to handle different data types via single Kafka topic". If the difference between events is just, well, event type - for example, Click, Submit, LogIn, LogOut and so on - then you can keep an enum field with this type inside and otherwise use generic container object.

If there is a difference in the structure of data payloads these events should carry, then, again, using Avro you could've solved it with Union types.

And finally, if the data difference is so much that these events are basically different data structures with nothing significant in common - go with different Kafka topics.

Despite the ability to use different partitions within the same topic to send different data types, it really only going to cause maintenance headache in the future and limitations on scaling as rightfully was pointed out in other responses here. So for this case, if there is an option to go with different topics - better do it that way.

Dmitry Kankalovich
  • 553
  • 2
  • 8
  • 19
0

If there is no concept of inheritance, for example the data is not like

Animal -> Cat
Animal -> Dog

Then the other way is to use a wrapper.

public class DataWrapper
{
private Object data;
private EventType type;
       // getter and setters omitted for the sake of brevity
}

Put all your events in the wrapper object and distinguish each event with their EventType which can be an enum for example.

Then you can serialize it the normal way (as you posted in the question) and while de-serializing it you can check the EventType and then delegate it to its corresponding event processor based on the EventType

Moreover, for the sake of ensuring that your DataWrapper doesn't wrap all kinds of data i.e. should be used only for a specific type of data, then you can use a Marker interface and make all of your classes whose objects you will push to the topic to implement this interface.

For example,

interface MyCategory {
}

and then your custom classes can have for example,

class MyEvent implements MyCategory {
}

and in the DataWrapper you can have..

public class DataWrapper<T extends MyCategory> {
private T data;
private EventType type;
            // getters and setters omitted for the sake of brevity
}
JavaTechnical
  • 8,846
  • 8
  • 61
  • 97
-2

The best approach is to create custom partition.

Produce each message to different partition by partitionKey

This is the default implementation , you need to implement your partition logic.

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        } 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

Check this tutorial for further examples.

This is a paragraph from kafka the definitive guide on when to choose costume partition.

Implementing a custom partitioning strategy So far, we have discussed the traits of the default partitioner, which is the one most commonly used. However, Kafka does not limit you to just hash partitions, and sometimes there are good reasons to partition data differently. For example, suppose that you are a B2B vendor and your biggest customer is a company that manufactures handheld devices called Bananas. Suppose that you do so much business with cus‐ tomer “Banana” that over 10% of your daily transactions are with this customer. If you use default hash partitioning, the Banana records will get allocated to the same partition as other accounts, resulting in one partition being about twice as large as the rest. This can cause servers to run out of space, processing to slow down, etc. What we really want is to give Banana its own partition and then use hash partitioning to map the rest of the accounts to partitions.

MIkCode
  • 2,655
  • 5
  • 28
  • 46