I have a Flink job that reads events from a Kafka queue and calls another service when certain conditions are met.
I wanted to use Retrofit2 to call the REST endpoint of that service, but I get a "not serializable" exception. I have several FlatMaps chained in series, and the call to the service happens in the last FlatMap. The exception I get:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichFlatMapFunction is not serializable. The object probably contains or references non serializable fields.
...
Caused by: java.io.NotSerializableException: retrofit2.Retrofit$1
...
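As far as I understand, this is the same failure that plain Java serialization reports when a Serializable object holds a field whose type is not Serializable. Below is a minimal JDK-only sketch of that situation. It uses neither Flink nor Retrofit, and `FakeServiceProxy`, `FakeFlatMap` and `SerializationCheck` are hypothetical stand-ins, not my actual job code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the Retrofit-generated service object:
// it does NOT implement Serializable.
class FakeServiceProxy {
}

// Hypothetical stand-in for a RichFlatMapFunction: Serializable itself,
// but it keeps the non-serializable service in an ordinary field.
class FakeFlatMap implements Serializable {
    private static final long serialVersionUID = 1L;
    private final FakeServiceProxy service = new FakeServiceProxy();
}

class SerializationCheck {
    // Tries to serialize obj with standard Java serialization.
    // Returns the message of the NotSerializableException (the offending
    // class name), or null if serialization succeeded.
    static String firstNonSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstNonSerializable(new FakeFlatMap()));
        System.out.println(firstNonSerializable("plain string"));
    }
}
```

In this sketch the failure comes from the field inside the function object, not from the function class itself, which is why I suspect the Retrofit service held by my last FlatMap.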
This is how I initialize Retrofit:
RetrofitClient.getClient(BASE_URL).create(NotificationService.class);
The NotificationService interface:
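import retrofit2.Call;
import retrofit2.http.Body;
import retrofit2.http.PUT;

public interface NotificationService {
    // Sends the notification as the request body of PUT /test.
    @PUT("/test")
    Call<String> putNotification(@Body Notification notification);
}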
The RetrofitClient class:
import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;

public class RetrofitClient {
    // Lazily created instance shared by all callers.
    private static Retrofit retrofit = null;

    public static Retrofit getClient(String baseUrl) {
        if (retrofit == null) {
            retrofit = new Retrofit.Builder()
                    .baseUrl(baseUrl)
                    .addConverterFactory(GsonConverterFactory.create())
                    .build();
        }
        return retrofit;
    }
}