I think you can use interfaces to separate the logic of sending data to different endpoints. Take a look at the code below:
The main class sends data and receives a Response. It doesn't know anything about the email, SMS, or network senders.
package com.example.demo.service;
import com.example.demo.dto.Response;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class KafkaSender {
private final List<EndpointSender> senders;
public KafkaSender(List<EndpointSender> senders) {
this.senders = senders;
}
public Response send(Object data, String endpoint) {
return senders
.stream()
.filter(it -> it.supports(endpoint))
.findAny()
.map(it -> it.send(data))
.orElseGet(() -> new Response("error"));
}
}
Then we create an interface like this:
package com.example.demo.service;
import com.example.demo.dto.Response;
/**
 * Contract for a component that can deliver a payload to one kind of endpoint.
 */
public interface EndpointSender {

    /**
     * Tells whether this sender is responsible for the given endpoint.
     *
     * @param endpoint endpoint identifier to test
     * @return {@code true} if this sender handles {@code endpoint}
     */
    boolean supports(String endpoint);

    /**
     * Sends the given payload and returns the resulting response.
     *
     * @param obj payload to send
     * @return the response produced for the payload
     */
    Response send(Object obj);
}
And implementations:
Base class to reduce boilerplate code:
package com.example.demo.service.sender;
import com.example.demo.dto.Response;
import com.example.demo.service.EndpointSender;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.SendResult;
import java.util.concurrent.TimeUnit;
/**
 * Shared request/reply logic for Kafka-backed {@link EndpointSender}s.
 *
 * <p>Subclasses supply the record to publish and the template to publish it with;
 * this class performs the send, waits for the reply and returns it as a {@link Response}.
 */
public abstract class BaseSender implements EndpointSender {

    /** Timeout, in seconds, applied separately to the send confirmation and to the reply. */
    private static final long TIMEOUT_SECONDS = 10;

    /**
     * Builds the record that carries {@code obj} to the request topic.
     *
     * @param obj payload to wrap
     * @return the record to publish
     */
    public abstract ProducerRecord<String, Object> getRecord(Object obj);

    /**
     * @return the request/reply template used to publish the record and receive the reply
     */
    public abstract ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate();

    /**
     * Publishes the record for {@code obj}, waits for the send to complete and then for the
     * reply, and returns the reply value cast to {@link Response}.
     *
     * @param obj payload to send
     * @return the value of the reply record
     * @throws RuntimeException wrapping any failure (timeout, interruption, execution error,
     *                          unexpected reply type, ...); the original is available as the cause
     */
    @Override
    public Response send(Object obj) {
        try {
            RequestReplyFuture<String, Object, Object> replyFuture =
                    kafkaTemplate().sendAndReceive(getRecord(obj));
            // Wait for the send itself first so a failed publish surfaces before the reply wait.
            replyFuture.getSendFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            ConsumerRecord<String, Object> consumerRecord =
                    replyFuture.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
            return (Response) consumerRecord.value();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Throwable t) {
            // Kept broad so callers continue to see every failure as a RuntimeException.
            throw new RuntimeException(t);
        }
    }
}
And implementations for senders:
Email sender:
package com.example.demo.service.sender;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * {@link BaseSender} for the {@code "email"} endpoint; publishes to {@code tp-email.request}.
 */
@Service
public class EmailSender extends BaseSender {

    /** Endpoint identifier this sender answers to. */
    private static final String ENDPOINT = "email";

    /** Topic that request records are addressed to. */
    private static final String REQUEST_TOPIC = "tp-email.request";

    /** Request/reply template handed in through the constructor. */
    private final ReplyingKafkaTemplate<String, Object, Object> template;

    /**
     * @param processingTransactionEmailReplyKafkaTemplate template used for email requests
     */
    public EmailSender(ReplyingKafkaTemplate<String, Object, Object> processingTransactionEmailReplyKafkaTemplate) {
        this.template = processingTransactionEmailReplyKafkaTemplate;
    }

    /**
     * @return the template supplied at construction time
     */
    @Override
    public ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate() {
        return template;
    }

    /**
     * @param obj payload to send
     * @return a record for the email request topic with {@code obj} as its value
     */
    @Override
    public ProducerRecord<String, Object> getRecord(Object obj) {
        return new ProducerRecord<>(REQUEST_TOPIC, obj);
    }

    /**
     * @param endpoint endpoint identifier to test
     * @return {@code true} only when {@code endpoint} equals {@code "email"}
     */
    @Override
    public boolean supports(String endpoint) {
        return ENDPOINT.equals(endpoint);
    }
}
Sms sender:
package com.example.demo.service.sender;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * {@link BaseSender} for the {@code "sms"} endpoint; publishes to {@code tp-sms.request}.
 */
@Service
public class SmsSender extends BaseSender {

    /** Endpoint identifier this sender answers to. */
    private static final String ENDPOINT = "sms";

    /** Topic that request records are addressed to. */
    private static final String REQUEST_TOPIC = "tp-sms.request";

    /** Request/reply template handed in through the constructor. */
    private final ReplyingKafkaTemplate<String, Object, Object> template;

    /**
     * @param processingTransactionSmsReplyKafkaTemplate template used for SMS requests
     */
    public SmsSender(ReplyingKafkaTemplate<String, Object, Object> processingTransactionSmsReplyKafkaTemplate) {
        this.template = processingTransactionSmsReplyKafkaTemplate;
    }

    /**
     * @return the template supplied at construction time
     */
    @Override
    public ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate() {
        return template;
    }

    /**
     * @param obj payload to send
     * @return a record for the SMS request topic with {@code obj} as its value
     */
    @Override
    public ProducerRecord<String, Object> getRecord(Object obj) {
        return new ProducerRecord<>(REQUEST_TOPIC, obj);
    }

    /**
     * @param endpoint endpoint identifier to test
     * @return {@code true} only when {@code endpoint} equals {@code "sms"}
     */
    @Override
    public boolean supports(String endpoint) {
        return ENDPOINT.equals(endpoint);
    }
}
Network sender:
package com.example.demo.service.sender;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.stereotype.Service;
/**
 * {@link BaseSender} for the {@code "network"} endpoint; publishes to {@code tp-network.request}.
 */
@Service
public class NetworkSender extends BaseSender {

    /** Endpoint identifier this sender answers to. */
    private static final String ENDPOINT = "network";

    /** Topic that request records are addressed to. */
    private static final String REQUEST_TOPIC = "tp-network.request";

    /** Request/reply template handed in through the constructor. */
    private final ReplyingKafkaTemplate<String, Object, Object> template;

    /**
     * @param processingTransactionNetworkReplyKafkaTemplate template used for network requests
     */
    public NetworkSender(ReplyingKafkaTemplate<String, Object, Object> processingTransactionNetworkReplyKafkaTemplate) {
        this.template = processingTransactionNetworkReplyKafkaTemplate;
    }

    /**
     * @return the template supplied at construction time
     */
    @Override
    public ReplyingKafkaTemplate<String, Object, Object> kafkaTemplate() {
        return template;
    }

    /**
     * @param obj payload to send
     * @return a record for the network request topic with {@code obj} as its value
     */
    @Override
    public ProducerRecord<String, Object> getRecord(Object obj) {
        return new ProducerRecord<>(REQUEST_TOPIC, obj);
    }

    /**
     * @param endpoint endpoint identifier to test
     * @return {@code true} only when {@code endpoint} equals {@code "network"}
     */
    @Override
    public boolean supports(String endpoint) {
        return ENDPOINT.equals(endpoint);
    }
}