Assume there are two types T1 & T2 and a topic T. Both T1 & T2 must go in topic T (for some reason). What are ways to achieve this? And which one is better?
One way (of many) is to use inheritance: define a base class that the sub-classes extend. In our case, we define a base class TB, and both T1 & T2 extend TB.
Base class (TB)
package poc.kafka.domain;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.java.Log;
/**
 * Base class of the animal hierarchy.
 *
 * <p>Implements {@link Externalizable}, so the class itself decides what is written to and read
 * from the object stream: here, only {@code name}. Lombok generates the no-args constructor
 * (which {@code Externalizable} needs for reconstruction), a constructor taking {@code name},
 * {@code toString()} and the {@code log} field.
 */
@Log
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Animal implements Externalizable {

    /** Name of the animal; the only state this class writes to its external form. */
    public String name;

    /** Logs which type this instance is. */
    public void whoAmI() {
        log.info("I am an Animal");
    }

    /**
     * Writes {@code name} to the stream.
     *
     * @param out stream to write the external form to
     * @throws IOException if the underlying write fails
     */
    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeObject(this.name);
    }

    /**
     * Restores {@code name} from the stream; mirrors {@link #writeExternal(ObjectOutput)}.
     *
     * @param in stream to read the external form from
     * @throws IOException if the underlying read fails
     * @throws ClassNotFoundException if the class of the stored object cannot be found
     */
    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        final Object restored = in.readObject();
        this.name = (String) restored;
    }
}
Derived class (T1)
package poc.kafka.domain;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.java.Log;
@Log
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Cat extends Animal implements Externalizable {
private int legs;
public void whoAmI() {
log.info("I am a Cat");
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
legs = in.readInt();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeInt(legs);
}
}
Derived class (T2)
package poc.kafka.domain;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.java.Log;
@Log
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Dog extends Animal implements Externalizable {
private int legs;
public void whoAmI() {
log.info("I am a Dog");
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
legs = in.readInt();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeInt(legs);
}
}
Deserializer
package poc.kafka.domain.serialization;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;
import poc.kafka.domain.Animal;
/**
 * Kafka {@link Deserializer} that rebuilds an {@link Animal} (or any of its subclasses) from
 * Java-serialized bytes via {@link SerializationUtils#deserialize(byte[])}.
 */
public class AnimalDeserializer implements Deserializer<Animal> {

    /**
     * Converts the record payload back into an {@link Animal}.
     *
     * @param topic topic the record came from (unused)
     * @param data serialized bytes; may be {@code null}
     * @return the deserialized animal, or {@code null} when {@code data} is {@code null}
     */
    @Override
    public Animal deserialize(String topic, byte[] data) {
        // The Deserializer contract allows null payloads and recommends returning null
        // instead of throwing; SerializationUtils.deserialize rejects a null array.
        if (data == null) {
            return null;
        }
        // NOTE(security): this is Java native deserialization. Only consume from topics whose
        // producers are trusted, or switch to a data-only format (JSON, Avro, Protobuf).
        return SerializationUtils.deserialize(data);
    }
}
Serializer
package poc.kafka.domain.serialization;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;
import poc.kafka.domain.Animal;
/**
 * Kafka {@link Serializer} that turns an {@link Animal} (or any of its subclasses) into bytes
 * using Java serialization via {@link SerializationUtils#serialize(java.io.Serializable)}.
 */
public class AnimalSerializer implements Serializer<Animal> {

    /**
     * Converts the given animal into its Java-serialized byte form.
     *
     * @param topic topic the record is destined for (unused)
     * @param data animal to serialize
     * @return the serialized bytes
     */
    @Override
    public byte[] serialize(String topic, Animal data) {
        // NOTE(review): a null `data` is presumably encoded as a serialized null (non-null
        // byte array) rather than returned as null — confirm if tombstone records are needed.
        final byte[] payload = SerializationUtils.serialize(data);
        return payload;
    }
}
We can then send T1 & T2 as shown below:
// Publish ten records with keys 0..9 to topic "T": even keys carry a Dog, odd keys a Cat.
IntStream.range(0, 10).forEach(i -> {
    // Both subtypes are sent as the base type Animal, so a single send to one topic covers both.
    Animal animal = (i % 2 == 0) ? new Dog(i) : new Cat(i);
    producer.send(new ProducerRecord<Integer, Animal>("T", i, animal));
});