0

I have a Spring Boot application which sends some message to Kafka. We use Apicurio registry provided by our customer to manage AVRO schema for message serialization. The application uses Apicurio registry client, namely the Maven artifact io.apicurio:apicurio-registry-utils-converter:1.3.2.Final and its dependency apicurio-registry-utils-serde. The client communicates with registry using REST API.

Recently the Apicurio was migrated to higher version and there are some issues with REST communication. (They are beyond scope of this question - in a nutshell, we play with changing global id strategy to avoid some unwanted REST calls, more info in comment under this question.)

The Apicurio migration seems somewhat turbulent and I observe now that any REST call can fail with various reason (missing configuration, missing artifacts, availability...). Actually the only thing in log is javax.ws.rs.WebApplicationException without further cause description. To understand what's happening there, I need to enable logging of all REST API calls from Apicurio client.

The Apicurio client internally uses OkHttpClient. I am aware the OkHttpClient is configured using builder pattern and the standard way is to add HttpLoggingInterceptor. However, the process of creation of OkHttpClient seems to be too strictly closed in internals of Apicurio client (private static OkHttpClient createHttpClientWithConfig in RegistryRestClientImpl). Hence the Apicurio client itself does not let user to set up custom configuration of underlying OkHttpClient.

Is there some way to do it in a simple way or am I missing something?

(Note some of these application.properties settings which were recommended to us are helpful too but they does not relate to core topic of question:

logging.level.okhttp3=TRACE
logging.level.org.apache.http=TRACE
logging.level.io.apicurio=TRACE
logging.level.org.springframework.kafka=TRACE
logging.level.org.apache.kafka=TRACE

)

Tomáš Záluský
  • 10,735
  • 2
  • 36
  • 64

1 Answers1

1

After browsing Apicurio client source, the only way I know is following. Caution: it is very dirty but works.

  1. get access to OkHttpClient
    • start with autowired Spring bean DefaultKafkaProducerFactory
    • create auxiliary KafkaProducer which instantiates AvroKafkaSerializer as valueSerializer
    • go through reference chain AvroKafkaSerializer.client -> CompatibleClient.delegate -> RegistryRestClientImpl.httpClient finally to OkHttpClient
  2. tweak OkHttpClient
    • The builder is gone and OkHttpClient's interceptor list is final and immutable. Fortunately the list is just Collections.unmodifiableList hence we can invade into underlying list and modify it.

Whole solution:

# application.properties
spring.kafka.producer.value-serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
spring.kafka.producer.properties.apicurio.registry.url=${apicurio.registry.url}
...
@Component
public class Service {

    private final DefaultKafkaProducerFactory defaultKafkaProducerFactory;

    Service(DefaultKafkaProducerFactory defaultKafkaProducerFactory) {
        this.defaultKafkaProducerFactory = defaultKafkaProducerFactory;

        Producer producer = defaultKafkaProducerFactory.createProducer();
        KafkaProducer delegate = field(producer,"delegate");
        AvroKafkaSerializer valueSerializer = field(delegate, "valueSerializer");
        CompatibleClient client = field(valueSerializer, "client");
        RegistryRestClientImpl rest = field(client, "delegate");
        OkHttpClient okHttpClient = field(rest, "httpClient");
        List unmodifiableInterceptorList = field(okHttpClient, "interceptors");
        List modifiable = field(unmodifiableInterceptorList, "list");
        HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
        loggingInterceptor.setLevel(Level.BASIC);
        modifiable.add(loggingInterceptor);
    }

    static <T> T field(Object instance, String fieldName) {
        Field field = ReflectionUtils.findField(instance.getClass(), fieldName);
        ReflectionUtils.makeAccessible(field);
        return (T)ReflectionUtils.getField(field, instance);
    }

}

Yes it's awful. I will be happy to reaccept to better answer.

Tomáš Záluský
  • 10,735
  • 2
  • 36
  • 64
  • You should only need reflection for the private http client, not your producer factory. Also, with the Confluent Avro serializer, I know it has a public constructor to accept a RegistryClient, so Apicurio does not? In any case, the "better solution" would be to fork or create a feature request PR – OneCricketeer Aug 23 '23 at 13:40
  • Yes, formally there exists `AvroKafkaSerializer(RegistryService client)` constructor but it is never called. The library actually calls no-arg `AvroKafkaSerializer()` constructor and some super constructor *instantiates* `RegistryService` based on configuration properties. The field is protected so - again - not accessible without hacks. Alternatively I could define `MyAvroKafkaSerializer extends AvroKafkaSerializer` and in constructor assign the client instance to some global place but it would spare only single reflection call. Anyway thanks for your comment. – Tomáš Záluský Aug 23 '23 at 14:08