1

I am new to kafka. I want to use kafka producer/consumer to replace activeMQ(jms) in my spring project. What I need is a kafka producer publish my message object to a topic and a consumer subscribe it from the topic.

first is my custom encoder,same thing for decoder(for my message class ConfigurationActionMsg):

@Component
public class ConfigActionMessageEncoder implements Encoder<ConfigurationActionMsg> {

    public ConfigActionMessageEncoder() {
    /* This constructor must be present for successful compile. */
    }
    public ConfigActionMessageEncoder(VerifiableProperties verifiableProperties) {
    /* This constructor must be present for successful compile. */
    }

    @Override
    public byte[] toBytes(ConfigurationActionMsg actionMsg){
        return SerializationUtils.serialize(actionMsg);
    }}

Below is my configuration for processor and consumer

@Configuration
@ComponentScan(basePackages = {"XXX"})
public class KafkaConfig {
@Bean
public KafkaProducer<String,ConfigurationActionMsg> kafkaProducer(){
    Properties props = new Properties();
    props.put("zk.connect", "127.0.0.1:2181");
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "com.atlas.configengine2.XXX.ConfigActionMessageEncoder");

    return new KafkaProducer<>(props);
}

@Bean
public KafkaConsumer<String, ConfigurationActionMsg> kafkaConsumer(){

    Properties props = new Properties();
    props.put("zk.connect", "127.0.0.1:2181");
    props.put("bootstrap.servers", "localhost:9092");
    //We should only have one process running for consumer
    props.put("group.id", "resolverActionTrigger");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "com.atlas.configengine2.XXX.ConfigActionMessageDecoder");
    KafkaConsumer<String, ConfigurationActionMsg> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("configAction"));
    return consumer;
}}

I am not sure if this is the proper way to instantiate a producer/consumer. But this way is not working. Because my kafkaProducer couldn't be instantiated.

some debug info:

Caused by: org.apache.kafka.common.KafkaException: com.atlas.configengine2.jms.ConfigActionMessageEncoder is not an instance of org.apache.kafka.common.serialization.Serializer

But I am not sure whether that is the only problem? And how to write the custom encoder then?

Acton
  • 255
  • 4
  • 13

3 Answers3

0

You should implement org.apache.kafka.common.serialization.Serializer not Encoder.

See CustomSerializer example.

Kamal Chandraprakash
  • 1,872
  • 18
  • 28
  • hey I am following the link http://stackoverflow.com/questions/23755976/kafka-writing-custom-serializer actually, as far as I understand, they just user the Encoder class as the serializer ? – Acton May 31 '16 at 14:02
  • You're referring to the old Producer implemented in scala ( < 0.8.1). In the new version (0.8.2 and above), KafkaProducer is purely written in java and it accepts only `Serializer` – Kamal Chandraprakash Jun 01 '16 at 07:47
0

First, you should use a subclass of org.apache.kafka.common.serialization.Serializer.

Second, I think you can send a string representation of your object, such as a JSON, and keep up with StringSerializer in producer to avoid overhead of implementing a custom org.apache.kafka.common.serialization.Serializer.

WesternGun
  • 11,303
  • 6
  • 88
  • 157
0
 //create properties
        //https://kafka.apache.org/documentation/#producerconfigs
        Properties prop = new Properties();
        prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9094,127.0.0.1:9098");
        prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Employee.class.getName());

        //Create Producer
        //For String
        //KafkaProducer<String, String> producer= new KafkaProducer(prop);
        //For Object
        KafkaProducer<String, Employee> producer = new KafkaProducer(prop);
        Address add = new Address("India");
        Employee emp = new Employee(1, "Arun", add);
        //For String
    // ProducerRecord prodRecord = new ProducerRecord("aryan_topic", "Any String Value");
        ProducerRecord prodRecord = new ProducerRecord("aryan_topic", emp);
    //Send Record

        producer.send(prodRecord);
        System.out.println("Data Send");

}
}
import org.apache.kafka.common.header.Headers;

import org.apache.kafka.common.serialization.Serializer;

import java.io.Serializable; import java.util.Map;

//Developed by Arun Singh

public class Employee implements Serializable, Serializer {
//add all unImplemented methods here
}
 and it will work as
//
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 77a89fcf8d7fa018
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1583515298948
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: h_EDcwYmThiz-iBTc0AsGw
Data Send
Arun
  • 1,011
  • 11
  • 13