I am developing an application which can validate json payload via apicurio registry. I am running apicurio in memory. I have created a artifact in apicurio registry with topicname-value. My version of apicurio is 2.3.1.final
But when I produce message I get the below error
javax.ws.rs.WebApplicationException: Not Found
at io.apicurio.registry.client.request.RequestHandler$ResultCallback.onResponse(RequestHandler.java:38)
at retrofit2.OkHttpCall$1.onResponse(OkHttpCall.java:161)
at okhttp3.internal.connection.RealCall$AsyncCall.run(RealCall.kt:519)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
My sample java class is below
/*
* Copyright 2020 Red Hat
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.tillias.spbka.json;
import io.apicurio.registry.utils.serde.AbstractKafkaSerDe;
import io.apicurio.registry.utils.serde.AbstractKafkaSerializer;
import io.apicurio.registry.utils.serde.JsonSchemaKafkaSerializer;
import io.apicurio.registry.utils.serde.JsonSchemaSerDeConstants;
import io.apicurio.registry.utils.serde.strategy.FindLatestIdStrategy;
import io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.Properties;
public class SimpleJsonSchemaProducerApp {
private static final Logger LOGGER =
LoggerFactory.getLogger(SimpleJsonSchemaProducerApp.class);
public static void main(String [] args) throws Exception {
// Config properties!
Properties props = PropertiesUtil.properties(args);
// Configure kafka.
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.putIfAbsent(ProducerConfig.CLIENT_ID_CONFIG, "Producer-" + SimpleJsonSchemaAppConstants.TOPIC_NAME);
props.putIfAbsent(ProducerConfig.ACKS_CONFIG, "all");
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class.getName());
// Configure Service Registry location and ID strategies
props.putIfAbsent(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, "http://localhost:8082/apis/registry/v1");
props.putIfAbsent(AbstractKafkaSerializer.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, SimpleTopicIdStrategy.class.getName());
props.putIfAbsent(AbstractKafkaSerializer.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, FindLatestIdStrategy.class.getName());
props.putIfAbsent(JsonSchemaSerDeConstants.REGISTRY_JSON_SCHEMA_VALIDATION_ENABLED, Boolean.TRUE);
// Create the Kafka producer
Producer<Object, Message> producer = new KafkaProducer<>(props);
String topicName = SimpleJsonSchemaAppConstants.TOPIC_NAME;
String subjectName = SimpleJsonSchemaAppConstants.SUBJECT_NAME;
// Now start producing messages!
int producedMessages = 0;
try {
while (Boolean.TRUE) {
Date now = new Date();
// Create the message to send
Message message = new Message();
message.setMessage("Hello (" + producedMessages++ + ")!");
message.setTime(now.getTime());
// Send/produce the message on the Kafka Producer
LOGGER.info("=====> Sending message {} to topic {}", message, topicName);
ProducerRecord<Object, Message> producedRecord = new ProducerRecord<>(topicName, subjectName, message);
producer.send(producedRecord);
Thread.sleep(3000);
}
} catch (Exception e) {
LOGGER.error("Failed to PRODUCE message!", e);
} finally {
producer.flush();
producer.close();
System.exit(1);
}
}
}
my pom.xml file is
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.4</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>17</java.version>
<avro.version>1.10.0</avro.version>
<apicurio.version>1.3.2.Final</apicurio.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-serde</artifactId>
<version>${apicurio.version}</version>
</dependency>
<dependency>
<groupId>org.jboss.resteasy</groupId>
<artifactId>resteasy-client</artifactId>
<version>4.5.6.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>