0

I am developing an application which can validate json payload via apicurio registry. I am running apicurio in memory. I have created a artifact in apicurio registry with topicname-value. My version of apicurio is 2.3.1.final

But when I produce message I get the below error

javax.ws.rs.WebApplicationException: Not Found
    at io.apicurio.registry.client.request.RequestHandler$ResultCallback.onResponse(RequestHandler.java:38)
    at retrofit2.OkHttpCall$1.onResponse(OkHttpCall.java:161)
    at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

My sample java class is below

/*
* Copyright 2020 Red Hat
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.tillias.spbka.json;

import io.apicurio.registry.utils.serde.AbstractKafkaSerDe;
import io.apicurio.registry.utils.serde.AbstractKafkaSerializer;
import io.apicurio.registry.utils.serde.JsonSchemaKafkaSerializer;
import io.apicurio.registry.utils.serde.JsonSchemaSerDeConstants;
import io.apicurio.registry.utils.serde.strategy.FindLatestIdStrategy;
import io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;
import java.util.Properties;


public class SimpleJsonSchemaProducerApp {

   private static final Logger LOGGER = 
 LoggerFactory.getLogger(SimpleJsonSchemaProducerApp.class);

public static void main(String [] args) throws Exception {
    // Config properties!
    Properties props = PropertiesUtil.properties(args);

    // Configure kafka.
    props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + SimpleJsonSchemaAppConstants.TOPIC_NAME);
    props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
    props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class.getName());

    // Configure Service Registry location and ID strategies
    props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, "http://localhost:8082/apis/registry/v1");
    props.putIfAbsent(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, SimpleTopicIdStrategy.class.getName());
    props.putIfAbsent(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, FindLatestIdStrategy.class.getName());
    props.putIfAbsent(JsonSchemaSerDeConstants.REGISTRY_JSON_SCHEMA_VALIDATION_ENABLED, Boolean.TRUE);
    
    // Create the Kafka producer
    Producer<Object, Message> producer = new KafkaProducer<>(props);

    String topicName = SimpleJsonSchemaAppConstants.TOPIC_NAME;
    String subjectName = SimpleJsonSchemaAppConstants.SUBJECT_NAME;
    
    // Now start producing messages!
    int producedMessages = 0;
    try {
        while (Boolean.TRUE) {
            Date now = new Date();
            
            // Create the message to send
            Message message = new Message();
            message.setMessage("Hello (" + producedMessages++ + ")!");
            message.setTime(now.getTime());
            
            // Send/produce the message on the Kafka Producer
            LOGGER.info("=====> Sending message {} to topic {}", message, topicName);
            ProducerRecord<Object, Message> producedRecord = new ProducerRecord<>(topicName, subjectName, message);
            producer.send(producedRecord);
            
            Thread.sleep(3000);
        }
    } catch (Exception e) {
        LOGGER.error("Failed to PRODUCE message!", e);
    } finally {
        producer.flush();
        producer.close();
        System.exit(1);
    }
}
}

my pom.xml file is

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.4</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
    <java.version>17</java.version>
     <avro.version>1.10.0</avro.version>
<apicurio.version>1.3.2.Final</apicurio.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

<dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-utils-serde</artifactId>
    <version>${apicurio.version}</version>
</dependency>
<dependency>
    <groupId>org.jboss.resteasy</groupId>
    <artifactId>resteasy-client</artifactId>
    <version>4.5.6.Final</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <excludes>
                    <exclude>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                    </exclude>
                </excludes>
            </configuration>
        </plugin>
         
    </plugins>
</build>
OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
user1834664
  • 443
  • 3
  • 12
  • 22
  • I ran into similar problem - exception message was Unauthorized instead of Not Found. My problem was that the spring boot app had wrong value of `spring.kafka.producer.properties.apicurio.registry.global-id` (namely CachedIdStrategy) and strategy implementation tried to invoke POST method on Apicurio registry, which was forbidden. Your problem might be similar - debugging of REST call (or attempt of) might help. – Tomáš Záluský Jun 03 '23 at 23:16

0 Answers0