0

I have a Flink Job reading events from a Kafka queue then calling another service if certain conditions are met.
I wanted to use Retrofit2 to call the REST endpoint of that service but I get a is not Serializable Exception. I have several Flat Maps connected to each other (in series) then calling the service happens in the last FlatMap. The exception I get:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichFlatMapFunction is not serializable. The object probably contains or references non serializable fields.
...
Caused by: java.io.NotSerializableException: retrofit2.Retrofit$1
...

The way I am initializing retrofit:

RetrofitClient.getClient(BASE_URL).create(NotificationService.class);

And the NotificationService interface

public interface NotificationService {

    @PUT("/test")
    Call<String> putNotification(@Body Notification notification);
}

The RetrofitClient class

public class RetrofitClient {

    private static Retrofit retrofit = null;

    public static Retrofit getClient(String baseUrl) {
        if (retrofit == null) {
            retrofit = new Retrofit.Builder().baseUrl(baseUrl).addConverterFactory(GsonConverterFactory.create())
                    .build();
        }
        return retrofit;
    }
razvan
  • 559
  • 7
  • 23

1 Answers1

1

Put your Notification class code for more details, but looks like this answer helps java.io.NotSerializableException with "$1" after class

Community
  • 1
  • 1
Sergey Z.
  • 11
  • 2
  • But I did, it's an interface. – razvan May 12 '17 at 09:53
  • You added NotificationService interface but looks like problem in this line Call putNotification(@Body Notification notification); you add in body of request Notification class, looks like problem in it. So could you add Notification class source – Sergey Z. May 12 '17 at 09:59
  • public class Notification implements Serializable { private static final long serialVersionUID = 1L; private String id; private String text; getters & setters here } – razvan May 12 '17 at 10:24
  • In fact I replaced the Notification with String and adapted the code now, still getting the same Exception. – razvan May 12 '17 at 10:33
  • Oh clear. You try to serialise Retrofit class. But first item pf retrofit class: public final class Retrofit { private final Map> serviceMethodCache = new ConcurrentHashMap<>(); java.lang.reflect.Method can't be serialiized. – Sergey Z. May 12 '17 at 11:22