I have a working Spring project (I am new to the Spring family) that consumes JSON from Kafka and persists it to MongoDB. The consumed objects now also need to be sent to Elasticsearch. I have seen related questions such as 1 and 2 and tried to separate the Elasticsearch and MongoDB repositories into different packages, which led to the setup shown here. My first question: is this the right way to do it?
Second question: how should I annotate the model class when both Elasticsearch and MongoDB have a @Document annotation? At the moment I get an error that I think is related to this question: "No id property found for class".
@Document(indexName = "gold", type = "article")
@Document
public class GoldSnapShot {
public static final String TOPIC = "goldsnapshot";
Double ons;
Double gram18;
Double irec;
Double dollar;
@org.springframework.format.annotation.DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private DateTime timestamp;
}
Finally, I would be grateful for any guidance on best practices and cleaner code.
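For the annotation clash on the model class, one possible arrangement (a minimal sketch, not a confirmed fix) is to keep a store-neutral object for the Kafka payload and map it to one class per store, each with its own id field. This assumes the "No id property found" error comes from the model having no id property. The classes `GoldSnapShotMongoDoc` and `GoldSnapShotElasticDoc` and the `toMongo`/`toElastic` helpers are hypothetical names, and the sketch is plain Java, so the store-specific annotations appear only in comments.

```java
import java.util.UUID;

// Sketch only: plain Java with no Spring dependencies, so the store-specific
// annotations are mentioned in comments rather than applied.
class ModelSplitSketch {

    // Store-neutral object as deserialized from Kafka (fields from the question).
    static class GoldSnapShot {
        final Double ons;
        final Double gram18;
        final Double irec;
        final Double dollar;

        GoldSnapShot(Double ons, Double gram18, Double irec, Double dollar) {
            this.ons = ons;
            this.gram18 = gram18;
            this.irec = irec;
            this.dollar = dollar;
        }
    }

    // Hypothetical MongoDB-side class: would carry the MongoDB @Document
    // annotation and an id field marked with @Id.
    static class GoldSnapShotMongoDoc {
        final String id;
        final GoldSnapShot data;

        GoldSnapShotMongoDoc(String id, GoldSnapShot data) {
            this.id = id;
            this.data = data;
        }
    }

    // Hypothetical Elasticsearch-side class: would carry the Elasticsearch
    // @Document(indexName = "gold") annotation and its own @Id field.
    static class GoldSnapShotElasticDoc {
        final String id;
        final GoldSnapShot data;

        GoldSnapShotElasticDoc(String id, GoldSnapShot data) {
            this.id = id;
            this.data = data;
        }
    }

    // Mapping helpers: the same id is passed to both so the stores can be correlated.
    static GoldSnapShotMongoDoc toMongo(String id, GoldSnapShot s) {
        return new GoldSnapShotMongoDoc(id, s);
    }

    static GoldSnapShotElasticDoc toElastic(String id, GoldSnapShot s) {
        return new GoldSnapShotElasticDoc(id, s);
    }

    public static void main(String[] args) {
        GoldSnapShot snapshot = new GoldSnapShot(1800.5, 55.2, 60.1, 1.0);
        // Assumption: one generated id is shared by both documents.
        String id = UUID.randomUUID().toString();
        GoldSnapShotMongoDoc mongoDoc = toMongo(id, snapshot);
        GoldSnapShotElasticDoc elasticDoc = toElastic(id, snapshot);
        System.out.println("same id in both stores: " + mongoDoc.id.equals(elasticDoc.id));
    }
}
```

The trade-off is some mapping boilerplate in exchange for never importing two annotations with the same simple name into one class. Keeping a single class and writing one of the two annotations with its fully qualified name may also work.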
SpringBootApplication:
@EnableMongoRepositories("com.sames.samesgoldconsumer.Repository")
@EnableElasticsearchRepositories(basePackages = "com.sames.samesgoldconsumer.elasticsearch.Repository")
@SpringBootApplication
public class SamesGoldConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SamesGoldConsumerApplication.class, args);
}
}
Kafka consumer service:
@Component
public class KafkaConsumerService {
private final GoldSnapShotMongoRepository goldSnapShotMongoRepository;
private final GoldSnapShotElasticRepository goldSnapShotElasticRepository;
private final Logger logger
= LoggerFactory.getLogger(KafkaConsumerService.class);
public KafkaConsumerService(GoldSnapShotMongoRepository goldSnapShotMongoRepository,
GoldSnapShotElasticRepository goldSnapShotElasticRepository) {
this.goldSnapShotMongoRepository = goldSnapShotMongoRepository;
this.goldSnapShotElasticRepository = goldSnapShotElasticRepository;
}
@KafkaListener(topics = "gold", groupId = AppConstants.GROUP_ID)
public void processMessage(GoldSnapShot goldSnapShot) {
logger.info("GoldSnapShot Received -> {}", goldSnapShot);
goldSnapShotMongoRepository.save(goldSnapShot);
goldSnapShotElasticRepository.save(goldSnapShot);
}
}
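Because the listener writes to two stores in sequence, a failure in the first save prevents the second from running. A minimal sketch of one way to decouple the two writes follows. It is plain Java with no Spring dependencies, and `Sink`, `FanOutWriter`, and `ListSink` are hypothetical names used only for illustration; in the real service each sink would wrap one of the two repositories.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DualWriteSketch {

    // Hypothetical abstraction over "a place a snapshot can be saved".
    interface Sink<T> {
        String name();
        void save(T item) throws Exception;
    }

    // Writes to every sink and collects failures instead of stopping at the first one.
    static class FanOutWriter<T> {
        private final List<Sink<T>> sinks;

        FanOutWriter(List<Sink<T>> sinks) {
            this.sinks = new ArrayList<>(sinks);
        }

        // Returns sink name -> exception for each failed sink; an empty map means all succeeded.
        Map<String, Exception> saveAll(T item) {
            Map<String, Exception> failures = new LinkedHashMap<>();
            for (Sink<T> sink : sinks) {
                try {
                    sink.save(item);
                } catch (Exception e) {
                    failures.put(sink.name(), e);
                }
            }
            return failures;
        }
    }

    // In-memory stand-in for a repository; can be told to fail on every save.
    static class ListSink<T> implements Sink<T> {
        private final String name;
        private final boolean failing;
        final List<T> saved = new ArrayList<>();

        ListSink(String name, boolean failing) {
            this.name = name;
            this.failing = failing;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public void save(T item) throws Exception {
            if (failing) {
                throw new Exception(name + " is unavailable");
            }
            saved.add(item);
        }
    }

    public static void main(String[] args) {
        ListSink<String> mongo = new ListSink<>("mongo", false);
        ListSink<String> elastic = new ListSink<>("elastic", true);
        FanOutWriter<String> writer =
                new FanOutWriter<>(Arrays.<Sink<String>>asList(mongo, elastic));

        Map<String, Exception> failures = writer.saveAll("snapshot-1");
        System.out.println("saved to mongo: " + mongo.saved);
        System.out.println("failed sinks: " + failures.keySet());
    }
}
```

Whether a failed sink should trigger a retry, a dead-letter topic, or just a log entry depends on how consistent the two stores need to be; the sketch only surfaces the failures to the caller.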
Elasticsearch config:
@Configuration
public class ElasticConfig {
@Bean
public RestHighLevelClient client() {
ClientConfiguration clientConfiguration
= ClientConfiguration.builder()
.connectedTo("localhost:9200")
.build();
return RestClients.create(clientConfiguration).rest();
}
@Bean
public ElasticsearchOperations elasticsearchTemplate() {
return new ElasticsearchRestTemplate(client());
}
}
Kafka config:
@EnableKafka
@Configuration
public class KafkaConfig {
// @Value("${spring.kafka.bootstrap-servers}")
// private String brokers;
private final String brokers = "localhost:9092";
// @Value("${spring.kafka.consumer.group-id}")
// private String groupId;
private final String groupId = "group-id";
@Bean
public ConsumerFactory<String, GoldSnapShot> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, GoldSnapShot.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GoldSnapShot>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, GoldSnapShot> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}