I am using Spring Boot starter 2.2.0.RELEASE for my application. I have one Kafka consumer, and I want to inject Spring-managed beans into it (the Kafka consumer itself is not managed by the Spring container).
I tried ApplicationContextAware, but it didn't help. The applicationContext is null in my Kafka consumer, so I cannot get a bean from the Spring container. The first time the context loads, applicationContext is set properly, but when the context loads a second time it is null. Below are the relevant parts of my application:
@SpringBootApplication
@ComponentScan({"com.xyz.config_assign.service"})
public class ConfigAssignServer {
    private static Logger log = LoggerFactory.getLogger(ConfigAssignServer.class);

    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext = SpringApplication.run(ConfigAssignServer.class, args);
        log.info("Started ConfigAssign Server!!! AppName= {} ", applicationContext.getApplicationName());
        QkafkaClient.loadConfiguration();
    }
}
All my application classes are in com.xyz.config_assign.service, so component scanning should not be the problem. Everything worked well before I added the Kafka consumer.
My ApplicationContextProvider, which uses the well-known ApplicationContextAware approach:
@Component
public class ApplicationContextProvider implements ApplicationContextAware {
    public static ApplicationContext applicationContext;

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        ApplicationContextProvider.applicationContext = applicationContext;
    }

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}
And now my Kafka consumer:
public class ConfigAssignmentAssetTagChangeKafkaTopicProcessor implements BatchTopicProcessor<String, String> {
    private Logger log = LoggerFactory.getLogger(ConfigAssignmentAssetTagChangeKafkaTopicProcessor.class);
    private AssignConfigServiceImpl assignConfigServiceImpl;

    public ConfigAssignmentAssetTagChangeKafkaTopicProcessor() {
        ApplicationContext applicationContext = ApplicationContextProvider.getApplicationContext();
        assignConfigServiceImpl = applicationContext.getBean(AssignConfigServiceImpl.class);
    }

    @Override
    public void handleError(ConsumerRecords<String, String> arg0, Exception arg1) {
        // TODO Auto-generated method stub
    }

    @Override
    public void process(ConsumerRecords<String, String> records, long arg1) {}
}
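To make the symptom concrete, here is a minimal plain-Java sketch (no Spring involved) of the static-holder lookup that the constructor above relies on. Everything in it is an assumption made for illustration: `ContextHolderSketch`, `setContext`, `getContext` and `getBean` are hypothetical names, and a `Map` stands in for the real ApplicationContext. It only shows that a static holder returns null when it is read before anything has populated it, and how a guarded lookup would at least fail with a clearer message. I have not confirmed that this ordering is what actually happens in my application.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for a static context holder; none of these names exist in the real application.
final class ContextHolderSketch {
    // Stand-in for ApplicationContext: a simple bean registry (assumption, not Spring).
    private static volatile Map<Class<?>, Object> context;

    // Stand-in for setApplicationContext(...), which the container would call during startup.
    static void setContext(Map<Class<?>, Object> ctx) {
        context = ctx;
    }

    // Unguarded lookup: returns null if nothing has called setContext yet.
    static Map<Class<?>, Object> getContext() {
        return context;
    }

    // Guarded lookup: throws a descriptive exception instead of a later NullPointerException.
    static <T> T getBean(Class<T> type) {
        Map<Class<?>, Object> ctx = context;
        if (ctx == null) {
            throw new IllegalStateException(
                    "Context not initialised yet; requested " + type.getSimpleName());
        }
        return type.cast(ctx.get(type));
    }

    public static void main(String[] args) {
        // Read before the holder is populated: the unguarded getter yields null.
        System.out.println("context is null before init: " + (getContext() == null));

        // Simulate the container populating the holder, then look a bean up.
        Map<Class<?>, Object> beans = new HashMap<>();
        beans.put(String.class, "assignConfigService");
        setContext(beans);
        System.out.println("bean after init: " + getBean(String.class));
    }
}
```

If the consumer is constructed before the holder is filled (or by code that sees a different copy of the holder class), the unguarded getter would behave like the first line printed by `main`.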
And the service I want to inject into the Kafka consumer is:
@Service
public class AssignConfigServiceImpl implements AssignConfigService {
    private Logger log = LoggerFactory.getLogger(AssignConfigServiceImpl.class);

    @Autowired
    private ConfigProfileDBService dbService;

    public boolean assignConfigToAgents(List<UUID> agentList, long customerId) {
        List<AgentConfigurationProfile> configProfiles =
                dbService.getConfigurationProfilesForCustomer(customerId);
        boolean isAssignSuccessful = assignConfigToAgents(agentList, customerId, configProfiles);
        log.info("Config Assignment status ={}", isAssignSuccessful);
        return isAssignSuccessful;
    }
    // ... remaining methods omitted ...
}
The other service I use is:
@Service
public class ConfigProfileDBService implements DBService {
    private static Logger log = LoggerFactory.getLogger(ConfigProfileDBService.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public List<AgentConfigurationProfile> getConfigurationProfilesForCustomer(Long customerId) {
        List<AgentConfigurationProfile> agentConfigList;
        // ... rest of method omitted ...
    }
}
Can someone please tell me what went wrong? I tried multiple solutions found online, but none of them worked for me. Thanks in advance. Note: I haven't instantiated any of these classes using the new operator.