1

We are migrating our system of events to the functional programming model. We followed the next "guide" and it worked quite well for the consumers but the producers with StreamBridge are not creating the messages properly.

We have the next error:

java.lang.ClassCastException: class com.streamdemo.domain.Event cannot be cast to class java.lang.String (com.streamdemo.domain.Event is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

We are working with 2021.0.0 spring-cloud version.

with a simple configuration:

spring.cloud.stream:
  bindings:
    demo-channel-out-0:
      producer:
        use-native-encoding: true
      content-type: application/json

This is our custom MessageConverter which is used in all our microservices and for this reason, it is important to keep the format of the message:

public class StreamMessageConverter extends AbstractMessageConverter {

  private static final Logger log = LoggerFactory.getLogger(StreamMessageConverter.class);
  private static final MimeType JSON_MIME_TYPE = MimeType.valueOf("application/json");
  private final Class<?> clazz;
  private final ObjectMapper objectMapper;

  public StreamMessageConverter(ObjectMapper objectMapper) {
    this(objectMapper, Object.class);
  }

  public StreamMessageConverter(ObjectMapper objectMapper, Class<?> clazz) {
    super(JSON_MIME_TYPE);
    this.objectMapper = objectMapper;
    this.clazz = clazz;
  }

  protected boolean supports(Class<?> clazz) {
    return clazz.equals(clazz);
  }

  @Nullable
  protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
    Object payload = message.getPayload();

    try {
      return payload instanceof byte[] ? this.objectMapper.readValue((byte[]) payload, targetClass)
          : this.objectMapper.readValue((String) payload, targetClass);
    } catch (IOException var6) {
      log.info("Unable to read json payload as object", var6);
      return null;
    }
  }

  @Nullable
  protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint) {
    try {
      return this.objectMapper.writeValueAsString(payload).getBytes("UTF-8");
    } catch (IOException var5) {
      log.info("Unable to write payload as json", var5);
      return null;
    }
  }
}

and also we are using StreamBridge to produce the events:

@Service
public class MessageBroker {

  private final StreamBridge source;

  public MessageBroker(StreamBridge source) {
    this.source = source;
  }

  public void send(Event event) {
    source.send("demo-channel-out-0", event);
  }
}

When the source.send method is called, internally it calls to StreamMessageConverter.convertFromInternal when we think it should not do it. In the previous model (@EnableBinding, @StreamListener), only StreamMessageConverter.convertToInternal was called in order to serialize and send the message to the Kafka topic.

We created this demo project where you can reproduce the error.

Miguel
  • 536
  • 6
  • 20

1 Answers1

0

So what is happening is correct. The reason why StreamBridge is falling back on MessageConverter is because we define a pass-thru function on-the-fly (Function<Object, Object>) that is attached to the StreamBridge. That is to ensure that send/receive goes through the same exact process as if there was a a real function involved (like with real binding) to ensure consistency.

Your MessageConverter appears to be written with certain expectation in mind which is not correct nor it has ever been correct. You assume that if payload is not byte[] it is String. Even with the older programming model there are situations where payload could have been of type other than those two types. So, what I suggest is to fix the following block of code to a proper IF statement

return payload instanceof byte[] ? this.objectMapper.readValue((byte[]) payload, targetClass)
          : this.objectMapper.readValue((String) payload, targetClass);

. . .to ensure it handles the other scenario.

That said, your converter doesn't do anything special especially with the current programming model So technically to solve your problem all you need is to remove it all together.

Oleg Zhurakousky
  • 5,820
  • 16
  • 17