I am trying to use Flink together with Spring Boot and I am running into problems. Say I have this flow:

  1. Receive a JSON string with a single field, date, that contains a date string.
  2. Use a map function and ObjectMapper to parse it into an object holding a LocalDateTime.
  3. Print the result.

This is a simple use case that illustrates my problem.

I have a Word class that contains a LocalDateTime field.

@Data
public class Word {
    @JsonDeserialize(using = LocalDateTimeSerde.class)
    LocalDateTime date;
}

The LocalDateTimeSerde deserializer looks like this (I want to autowire the application configuration):

@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@JsonComponent
public class LocalDateTimeSerde extends JsonDeserializer<LocalDateTime> {
    private final AppConf conf;

    @Override
    public LocalDateTime deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(this.conf.getDateFormatter());
        return LocalDateTime.parse(jsonParser.getText(), formatter);
    }
}

AppConf.java, which represents the application configuration:

@Data
@Configuration
@ConfigurationProperties(value = "app")
public class AppConf {
    private String dateFormatter;

}

DemoApplication.java:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
String example = "{\"date\":\"2019-01-29 00:00\"}";
var stream = env
        .fromElements(example)
        .map(x->new ObjectMapper().readValue(x,Word.class))
        .returns(Word.class);
stream.print();

env.execute("Demo App");

The exception I am getting is:

Caused by: java.lang.IllegalArgumentException: Class com.example.demo.LocalDateTimeSerde has no default (no arg) constructor

The main problem here is that the deserialization code runs on the TaskManager, where Spring Boot takes no part, so AppConf is never injected into the class.

Adding @NoArgsConstructor does not solve the problem.

I think I know why it is happening: the Flink master serializes the classes and sends them to the workers, where Spring Boot never runs its component scan and never takes control.

Is there any solution for that? I really want to use Spring together with Flink inside the workers' functions as well.

Thanks.

ShemTov

1 Answer

In general, I personally don't think it's a good idea to mix those concepts. The easiest solution is to use @Autowired only on the job manager and to switch to explicit dependency injection once you go into Flink-land.

For example, you could read the date pattern in DemoApplication, where Spring is available, and set it on the ObjectMapper. (Don't forget to initialize the ObjectMapper only once in your real code!)
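
A minimal, dependency-free sketch of that idea, to show the shape rather than a finished implementation. DateParser and shipToWorker are hypothetical names, java.util.function.Function stands in for a Flink MapFunction, and plain Java serialization stands in for Flink shipping the function to the task manager. Only the pattern String travels with the function; the non-serializable DateTimeFormatter is rebuilt once on the receiving side. An ObjectMapper could be handled the same way.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;

// Hypothetical stand-in for a Flink map function: serializable, carries only plain config.
class DateParser implements Function<String, LocalDateTime>, Serializable {
    private static final long serialVersionUID = 1L;

    // Plain String, read from the Spring configuration before the job is built.
    private final String pattern;

    // DateTimeFormatter is not Serializable, so it is rebuilt lazily after shipping.
    private transient DateTimeFormatter formatter;

    DateParser(String pattern) {
        this.pattern = pattern;
    }

    @Override
    public LocalDateTime apply(String text) {
        if (formatter == null) {
            // Built once per deserialized copy, not once per record.
            formatter = DateTimeFormatter.ofPattern(pattern);
        }
        return LocalDateTime.parse(text, formatter);
    }
}

class DateParserDemo {
    // Simulates sending the function to a worker via Java serialization.
    static DateParser shipToWorker(DateParser parser) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(parser);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DateParser) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // In the question's setup this value would come from AppConf.getDateFormatter().
        DateParser onWorker = shipToWorker(new DateParser("yyyy-MM-dd HH:mm"));
        System.out.println(onWorker.apply("2019-01-29 00:00")); // prints 2019-01-29T00:00
    }
}
```

With this shape, Spring is only needed where the job is assembled: the autowired AppConf supplies the pattern there, and nothing on the task manager depends on a Spring context.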

If you really want to use autowiring, I guess you need to trigger it manually on the task manager. There is a related post specifically for ObjectMapper.

Arvid Heise