I am trying to combine Flink and Spring Boot and I'm running into some problems. Say I have this flow:
- Receive a JSON string with a single field, date, that contains a date string.
- Use a map function and ObjectMapper to parse it into an object with a LocalDateTime field.
This is a simple use case that illustrates my problem.
So, I have a Word class that contains a LocalDateTime field.
@Data
public class Word {
    @JsonDeserialize(using = LocalDateTimeSerde.class)
    LocalDateTime date;
}
The deserializer, LocalDateTimeSerde, looks like this (I want to autowire the app configuration):
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@JsonComponent
public class LocalDateTimeSerde extends JsonDeserializer<LocalDateTime> {
    private final AppConf conf;

    @Override
    public LocalDateTime deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(this.conf.getDateFormatter());
        return LocalDateTime.parse(jsonParser.getText(), formatter);
    }
}
AppConf.java, which represents the configuration of the application, is:
@Data
@Configuration
@ConfigurationProperties(value = "app")
public class AppConf {
    private String dateFormatter;
}
DemoApplication.java:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
String example = "{\"date\":\"2019-01-29 00:00\"}";
var stream = env
    .fromElements(example)
    .map(x -> new ObjectMapper().readValue(x, Word.class))
    .returns(Word.class);
stream.print();
env.execute("Demo App");
The exception I'm getting is:
Caused by: java.lang.IllegalArgumentException: Class com.example.demo.LocalDateTimeSerde has no default (no arg) constructor
The main problem here is that the deserialization code runs on the TaskManager, where Spring Boot plays no part, so AppConf is never injected into the class.
Adding @NoArgsConstructor will not solve the problem, because AppConf would still not be injected.
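To illustrate what I mean, here is a minimal plain-Java sketch (no Flink, Spring, or Jackson) of reflective no-arg instantiation, which is roughly how I assume the deserializer gets created on the worker. Conf, InjectedOnly, and WithNoArgs are hypothetical stand-ins for AppConf and LocalDateTimeSerde, and instantiate is a helper I made up for the demo; Jackson's real code path is more involved.

```java
import java.lang.reflect.Constructor;

public class NoArgInstantiationDemo {

    // Hypothetical stand-in for AppConf.
    static class Conf {
        final String dateFormatter;
        Conf(String dateFormatter) { this.dateFormatter = dateFormatter; }
    }

    // Mirrors LocalDateTimeSerde as posted: the only constructor takes the configuration.
    static class InjectedOnly {
        final Conf conf;
        InjectedOnly(Conf conf) { this.conf = conf; }
    }

    // Mirrors LocalDateTimeSerde with a no-arg constructor added.
    static class WithNoArgs {
        Conf conf; // nothing sets this when the object is created reflectively
        WithNoArgs() { }
    }

    // Creates an instance through the no-arg constructor, as a caller without
    // dependency injection would (assumed to resemble what happens on the worker).
    static <T> T instantiate(Class<T> type) {
        try {
            Constructor<T> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                "Class " + type.getName() + " could not be created via a no-arg constructor", e);
        }
    }

    public static void main(String[] args) {
        try {
            instantiate(InjectedOnly.class);
        } catch (IllegalArgumentException e) {
            System.out.println("failed: " + e.getMessage());
        }
        WithNoArgs serde = instantiate(WithNoArgs.class);
        System.out.println("conf after no-arg construction: " + serde.conf);
    }
}
```

The first call fails because there is no no-arg constructor. The second succeeds, but conf is null, so a call like conf.getDateFormatter() would throw a NullPointerException instead.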
I think I know why it is happening: the Flink master serializes the classes and ships them to the workers, and Spring Boot never runs its component scan or takes control there.
Is there any solution for that? I really want to combine Spring with Flink inside the workers' functions as well.
Thanks.