20

I need to test a message that contains headers, so I need to use MessageBuilder, but I cannot serialize the message.

I tried adding the serialization settings on the producer props but it did not work.

Can someone help me?

This is the error:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

My test class:

public class TransactionMastercardAdapterTest extends AbstractTest{

@Autowired
private KafkaTemplate<String, Message<String>> template;

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

@BeforeClass
public static void setUp() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

@Test
public void sendTransactionCommandTest(){

    String payload = "{\"o2oTransactionId\" : \"" + UUID.randomUUID().toString().toUpperCase() + "\","
            + "\"cardId\" : \"11\","
            + "\"transactionId\" : \"20110405123456\","
            + "\"amount\" : 200.59,"
            + "\"partnerId\" : \"11\"}";

    Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, Message<String>> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, Message<String>> ("notification_topic", MessageBuilder.withPayload(payload)
            .setHeader("status", "RECEIVED")
            .setHeader("service", "MASTERCARD")
            .build()));

    Map<String, Object> configs = KafkaTestUtils.consumerProps("test1", "false", embeddedKafka);

    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);

    Consumer<byte[], byte[]> consumer = cf.createConsumer();
    consumer.subscribe(Collections.singleton("transaction_topic"));
    ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
    consumer.commitSync();

    assertThat(records.count()).isEqualTo(1);
}

}

Tiago Costa

6 Answers

5

I'd say the error is obvious:

Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

Your value is a GenericMessage, but StringSerializer works only with strings.

What you need is something like a JavaSerializer, which does not exist out of the box but is not difficult to write:

public class JavaSerializer implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        try {
            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
            ObjectOutputStream objectStream = new ObjectOutputStream(byteStream);
            objectStream.writeObject(data);
            objectStream.flush();
            objectStream.close();
            return byteStream.toByteArray();
        }
        catch (IOException e) {
            throw new IllegalStateException("Can't serialize object: " + data, e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public void close() {

    }

}

Then configure it as the value.serializer property.
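One thing worth keeping in mind: bytes produced this way are a Java serialization stream, not text, so the consuming side needs a matching deserializer. The following is a minimal plain-JDK sketch of that round trip, without the Kafka interfaces. The class name JavaSerializationRoundTrip and its methods are hypothetical and are not part of Kafka or Spring.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

// Hypothetical helper for illustration only; not a Kafka or Spring class.
final class JavaSerializationRoundTrip {

    // Same idea as the serialize(...) method above, minus the Kafka interface.
    static byte[] toBytes(Object data) {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
            objectStream.writeObject(data);
        }
        catch (IOException e) {
            throw new IllegalStateException("Can't serialize object: " + data, e);
        }
        // The stream is closed (and therefore flushed) before the bytes are read.
        return byteStream.toByteArray();
    }

    // The counterpart a consumer would need in order to read those bytes back.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
        catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Can't deserialize object", e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, String> headers = new HashMap<>();
        headers.put("status", "RECEIVED");

        byte[] bytes = toBytes(headers);
        // A Java serialization stream starts with the magic bytes 0xAC 0xED.
        System.out.printf("%02X%02X%n", bytes[0] & 0xFF, bytes[1] & 0xFF);
        System.out.println(fromBytes(bytes));
    }
}
```

A consumer that tries to parse such bytes as a string or as JSON will fail on the very first byte, since 0xAC is not the start of any valid JSON value.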

Artem Bilan
  • Thanks for the answer, but it doesn't work: 2017-04-25 11:11:12,133 ERROR -kafka-listener-1 o.s.c.s.b.k.KafkaMessageChannelBinder:287 - Could not convert message: ACED000573720 java.lang.StringIndexOutOfBoundsException: String index out of range: -19 – Tiago Costa Apr 25 '17 at 14:12
  • org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unexpected character ('¬' (code 172)): expected a valid value (number, String, array, object, 'true', 'false' or 'null') at [Source: [B@4e076253; line: 1, column: 2]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unexpected character ('¬' (code 172)): expected a valid value (number, String, array, object, 'true', 'false' or 'null') at [Source: [B@4e076253; line: 1, column: 2] – Tiago Costa Apr 25 '17 at 14:13
  • I believe the problem is the two attributes of the GenericMessage class: it produces two JSON objects, headers = {id = 067-879b0, service = MASTER, status = RECEIVED, timestamp = 1493130032409} and payload = "{..}". Maybe this is not the correct way to send messages with headers. – Tiago Costa Apr 25 '17 at 14:24
  • 1
    Well, that is an entirely different story. You talked about sending a message, but now it is about reading one. You deserve a "-1" in SO terms. If you want to send JSON, you should consider converting the message to JSON manually before calling `producer.send()` – Artem Bilan Apr 25 '17 at 14:29
  • The error happens in the act of serializing, in the return of the serialize method. But thanks for the help. – Tiago Costa Apr 25 '17 at 14:39
  • It worked perfectly, Artem Bilan; it was a mistake on my part. Thank you very much. – Tiago Costa Apr 25 '17 at 17:39
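
The suggestion in the comments above, converting the message to JSON before calling `producer.send()`, could look roughly like the sketch below. It assumes a hypothetical envelope with "headers" and "payload" fields that the consumer would also have to agree on. The JsonEnvelope class is made up for illustration, and its escaping handles only quotes and backslashes, so a real project would more likely use a JSON library.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration only; the envelope layout is an assumption.
final class JsonEnvelope {

    // Minimal JSON string quoting: escapes backslashes and double quotes only.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Wraps string headers and an already-serialized JSON payload in one JSON object.
    static String wrap(Map<String, String> headers, String jsonPayload) {
        StringJoiner fields = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            fields.add(quote(header.getKey()) + ":" + quote(header.getValue()));
        }
        return "{\"headers\":" + fields + ",\"payload\":" + jsonPayload + "}";
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("status", "RECEIVED");
        headers.put("service", "MASTERCARD");

        // The result is a plain String, so StringSerializer can handle it.
        String value = wrap(headers, "{\"cardId\":\"11\",\"amount\":200.59}");
        System.out.println(value);
    }
}
```

Because the value is now an ordinary String, the original value.serializer setting (StringSerializer) no longer conflicts with the value type.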
4
private void configureProducer() {
    Properties props = new Properties();
    props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");

    // ByteArraySerializer expects byte[] values, so the value type is byte[], not String
    producer = new KafkaProducer<String, byte[]>(props);
}

This will do the job.
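
Note that ByteArraySerializer passes a byte[] through unchanged, so the caller has to turn the message into bytes before sending. A minimal sketch of that step for a String payload, assuming UTF-8 is the encoding both sides agree on (the Utf8Bytes class is hypothetical):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only.
final class Utf8Bytes {

    // Producer side: turn the text into the byte[] that would be handed to the producer.
    static byte[] encode(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: turn the received bytes back into text.
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String payload = "{\"cardId\":\"11\",\"amount\":200.59}";
        byte[] value = encode(payload);
        System.out.println(value.length + " bytes");
        System.out.println(decode(value));
    }
}
```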

Bharath
2

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

CollinsKe
1

This is what I used, and it worked for me:

Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class);

From the JsonSerializer documentation:

Generic Serializer for sending Java objects to Kafka as JSON.

Ilya Serbis
SatyaG
0

In my case I am using Spring Cloud and had not added the property below to the properties file:

spring.cloud.stream.kafka.binder.configuration.value.serializer=org.apache.kafka.common.serialization.StringSerializer
nitin737
-9

Annotate the JSON class with @XmlRootElement.

Sterling Archer