We are migrating our event system to the functional programming model. We followed this "guide" and it worked quite well for the consumers, but the producers that use StreamBridge are not creating the messages properly.
We get the following error:
java.lang.ClassCastException: class com.streamdemo.domain.Event cannot be cast to class java.lang.String (com.streamdemo.domain.Event is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
We are working with Spring Cloud version 2021.0.0 and a simple configuration:
spring.cloud.stream:
  bindings:
    demo-channel-out-0:
      producer:
        use-native-encoding: true
      content-type: application/json
This is our custom MessageConverter. It is used in all our microservices, so it is important to keep the message format unchanged:
public class StreamMessageConverter extends AbstractMessageConverter {

    private static final Logger log = LoggerFactory.getLogger(StreamMessageConverter.class);
    private static final MimeType JSON_MIME_TYPE = MimeType.valueOf("application/json");

    private final Class<?> clazz;
    private final ObjectMapper objectMapper;

    public StreamMessageConverter(ObjectMapper objectMapper) {
        this(objectMapper, Object.class);
    }

    public StreamMessageConverter(ObjectMapper objectMapper, Class<?> clazz) {
        super(JSON_MIME_TYPE);
        this.objectMapper = objectMapper;
        this.clazz = clazz;
    }

    protected boolean supports(Class<?> clazz) {
        // Note: the parameter shadows the field, so this compares the argument
        // with itself and always returns true.
        return clazz.equals(clazz);
    }

    @Nullable
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
        Object payload = message.getPayload();
        try {
            // Any payload that is not a byte[] is cast to String here.
            return payload instanceof byte[] ? this.objectMapper.readValue((byte[]) payload, targetClass)
                    : this.objectMapper.readValue((String) payload, targetClass);
        } catch (IOException var6) {
            log.info("Unable to read json payload as object", var6);
            return null;
        }
    }

    @Nullable
    protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
        try {
            // Serialize the payload to UTF-8 encoded JSON bytes.
            return this.objectMapper.writeValueAsString(payload).getBytes("UTF-8");
        } catch (IOException var5) {
            log.info("Unable to write payload as json", var5);
            return null;
        }
    }
}
We also use StreamBridge to produce the events:
@Service
public class MessageBroker {

    private final StreamBridge source;

    public MessageBroker(StreamBridge source) {
        this.source = source;
    }

    public void send(Event event) {
        source.send("demo-channel-out-0", event);
    }
}
When source.send is called, it internally calls StreamMessageConverter.convertFromInternal, which we think it should not do. In the previous model (@EnableBinding, @StreamListener), only StreamMessageConverter.convertToInternal was called, to serialize the message and send it to the Kafka topic.
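As far as we can tell, the only cast to String in our converter is the fallback branch of convertFromInternal, so the exception seems to be raised there while the payload is still an Event. Below is a minimal sketch without Spring or Jackson, assuming that this is the failing line. CastDemo, readPayloadAsText and the nested Event class are hypothetical stand-ins and are not part of our project:

```java
import java.nio.charset.StandardCharsets;

// Plain-Java stand-in for the branch logic in convertFromInternal (no Spring, no Jackson).
class CastDemo {

    // Hypothetical stand-in for com.streamdemo.domain.Event.
    static final class Event {
        final String name;

        Event(String name) {
            this.name = name;
        }
    }

    // Mirrors the ternary in convertFromInternal: anything that is not a byte[]
    // is cast to String. Returning the raw text replaces the
    // objectMapper.readValue(...) call of the real converter.
    static String readPayloadAsText(Object payload) {
        return payload instanceof byte[]
                ? new String((byte[]) payload, StandardCharsets.UTF_8)
                : (String) payload; // throws ClassCastException for an Event payload
    }

    public static void main(String[] args) {
        // byte[] and String payloads pass through both branches without error.
        System.out.println(readPayloadAsText("{\"name\":\"a\"}".getBytes(StandardCharsets.UTF_8)));
        System.out.println(readPayloadAsText("{\"name\":\"b\"}"));
        try {
            // An already-typed payload, like the one we pass to StreamBridge.send,
            // fails on the String cast.
            readPayloadAsText(new Event("c"));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

This only shows where the cast can fail. It does not explain why convertFromInternal is invoked on the producer side in the first place, which is the part we do not understand.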
We created this demo project where you can reproduce the error.