I have a working Spring project (I am new to the Spring family) that consumes JSON from Kafka and persists it to MongoDB. Now the consumed objects need to be sent to Elasticsearch as well. I have seen related questions like 1 and 2 and tried to separate the Elasticsearch and MongoDB packages, which resulted in this state: project structure. First question: is this the right way to do it?

Second question: how should I annotate the model class when both Elasticsearch and MongoDB have a @Document annotation? Right now I get an error that I think is related to this question: "No id property found for class".

@Document(indexName = "gold", type = "article") // Spring Data Elasticsearch
@Document                                       // Spring Data MongoDB
public class GoldSnapShot {
    public static final String TOPIC = "goldsnapshot";
    Double ons;
    Double gram18;
    Double irec;
    Double dollar;
    @org.springframework.format.annotation.DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private DateTime timestamp;
    // getters, setters, toString ...
}

Finally, I would be grateful for any guidance on best practices and cleaner code.

SpringBootApplication:

@EnableMongoRepositories("com.sames.samesgoldconsumer.Repository")
@EnableElasticsearchRepositories(basePackages = "com.sames.samesgoldconsumer.elasticsearch.Repository")
@SpringBootApplication
public class SamesGoldConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SamesGoldConsumerApplication.class, args);
    }

}

Kafka consumer service:

@Component
public class KafkaConsumerService {
    private final GoldSnapShotMongoRepository goldSnapShotMongoRepository;
    private final GoldSnapShotElasticRepository goldSnapShotElasticRepository;

    private final Logger logger
            = LoggerFactory.getLogger(KafkaConsumerService.class);

    public KafkaConsumerService(GoldSnapShotMongoRepository goldSnapShotMongoRepository,
                                GoldSnapShotElasticRepository goldSnapShotElasticRepository) {
        this.goldSnapShotMongoRepository = goldSnapShotMongoRepository;
        this.goldSnapShotElasticRepository = goldSnapShotElasticRepository;
    }
    @KafkaListener(topics = "gold", groupId = AppConstants.GROUP_ID)
    public void processMessage(GoldSnapShot goldSnapShot) {
        logger.info(String.format("GoldSnapShot Received -> %s", goldSnapShot.toString()));
        goldSnapShotMongoRepository.save(goldSnapShot);
        goldSnapShotElasticRepository.save(goldSnapShot);
    }
}

Elasticsearch config:

@Configuration
public class ElasticConfig {

    @Bean
    public RestHighLevelClient client() {
        ClientConfiguration clientConfiguration
                = ClientConfiguration.builder()
                .connectedTo("localhost:9200")
                .build();

        return RestClients.create(clientConfiguration).rest();
    }

    @Bean
    public ElasticsearchOperations elasticsearchTemplate() {
        return new ElasticsearchRestTemplate(client());
    }
}

Kafka config:

@EnableKafka
@Configuration
public class KafkaConfig {
//    @Value("${spring.kafka.bootstrap-servers}")
//    private String brokers;
    private final String brokers = "localhost:9092";
//    @Value("${spring.kafka.consumer.group-id}")
//    private String groupId;
    private final String groupId = "group-id";
    @Bean
    public ConsumerFactory<String, GoldSnapShot> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, GoldSnapShot.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GoldSnapShot>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GoldSnapShot> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
Bheid

1 Answer

Do you really need to store the whole object in ES? I am just wondering what searches you can do in ES that are not provided by MongoDB? There are no text fields in your data, only numeric values and a date.

I'd strongly recommend not using the same class for three purposes: reading from Kafka, storing into MongoDB, and storing into Elasticsearch. Define separate classes for the Spring Data modules and write mappers to convert the data you read from Kafka into the separate data models, as in the sketch below.
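A minimal sketch of that separation; the class names here are hypothetical, not from the question, and getters/setters are omitted. The mapper is assumed to live in the same package as GoldSnapShot, since its fields are package-private:

import org.springframework.data.annotation.Id;
import org.springframework.stereotype.Component;

// Elasticsearch-only model: no @Document annotation clash with MongoDB.
@org.springframework.data.elasticsearch.annotations.Document(indexName = "gold")
class GoldSnapShotEsDocument {
    @Id
    String id; // may stay null; Elasticsearch generates a value on save
    Double ons;
    Double gram18;
    Double irec;
    Double dollar;
}

// MongoDB-only model.
@org.springframework.data.mongodb.core.mapping.Document("goldsnapshot")
class GoldSnapShotMongoDocument {
    @Id
    String id;
    Double ons;
    Double gram18;
    Double irec;
    Double dollar;
}

// Converts the Kafka payload into the store-specific models.
@Component
class GoldSnapShotMapper {
    GoldSnapShotEsDocument toEs(GoldSnapShot src) {
        GoldSnapShotEsDocument doc = new GoldSnapShotEsDocument();
        doc.ons = src.ons;
        doc.gram18 = src.gram18;
        doc.irec = src.irec;
        doc.dollar = src.dollar;
        return doc;
    }

    GoldSnapShotMongoDocument toMongo(GoldSnapShot src) {
        GoldSnapShotMongoDocument doc = new GoldSnapShotMongoDocument();
        doc.ons = src.ons;
        doc.gram18 = src.gram18;
        doc.irec = src.irec;
        doc.dollar = src.dollar;
        return doc;
    }
}

The repositories then work with GoldSnapShotEsDocument and GoldSnapShotMongoDocument instead of the shared Kafka DTO, and the listener calls the mapper before saving; the timestamp mapping for the Elasticsearch side is shown further down.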

Otherwise you end up with two different @Document annotations on the class, and one of them would need to be fully qualified.
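Just for illustration, that fully-qualified variant (which the above advises against) would look something like this:

// Both annotations are named @Document, so at most one can be imported;
// the other has to be written with its fully qualified name.
@org.springframework.data.elasticsearch.annotations.Document(indexName = "gold")
@org.springframework.data.mongodb.core.mapping.Document
public class GoldSnapShot {
    // fields as before
}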

As for the Elasticsearch part: you are obviously using an outdated version of Spring Data Elasticsearch; the type parameter of @Document was removed in version 4.1. The annotation @org.springframework.format.annotation.DateTimeFormat does not work in Spring Data Elasticsearch; you will need to add a @Field annotation with a proper Elasticsearch-supported date format to that property.

And as the error shows, you need to provide a property that is either annotated with @Id or named id to hold the unique identifier for this object (it may be null, in which case Elasticsearch will create a unique String value for it).
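Putting the last two points together, the Elasticsearch entity could look like this. It is a sketch against the Spring Data Elasticsearch 4.x annotations; swapping Joda's DateTime for java.time.LocalDateTime is my choice here, not something from the question:

import java.time.LocalDateTime;

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

@Document(indexName = "gold") // no type attribute in 4.1+
public class GoldSnapShotEsDocument {

    @Id
    private String id; // may stay null; Elasticsearch assigns one on save

    private Double ons;
    private Double gram18;
    private Double irec;
    private Double dollar;

    // Maps the property as an Elasticsearch date with a custom pattern,
    // matching the "yyyy-MM-dd HH:mm:ss" format from the question.
    @Field(type = FieldType.Date, format = DateFormat.custom, pattern = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime timestamp;

    // getters and setters omitted
}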

P.J.Meisch
  • Thanks, the purpose of using ES is Kibana for rapid visualization. I got the idea of model separation, but don't know how to aggrege it with DRY. I will search and test and give feedback soon. – Bheid Jul 10 '21 at 07:39