My service:
@Service
public interface EntryTerminalService {
Page<EntryTerminal> getEntryTerminalEventsLog(Pageable pageable);
void saveEntryTerminalEventToLog(EntryTerminal entryTerminal);
}
Service implementation:
@Service
public class EntryTerminalServiceImpl implements EntryTerminalService {
private final EntryTerminalRepository entryTerminalRepository;
@Autowired
public EntryTerminalServiceImpl(EntryTerminalRepository entryTerminalRepository) {
this.entryTerminalRepository = entryTerminalRepository;
}
@Override
public Page<EntryTerminal> getEntryTerminalEventsLog(Pageable pageable) {
return entryTerminalRepository.findAllByOrderByIdDesc(pageable);
}
@Override
public void saveEntryTerminalEventToLog(EntryTerminal entryTerminal) {
entryTerminalRepository.save(entryTerminal);
}
}
My repository:
public interface EntryTerminalRepository extends JpaRepository<EntryTerminal, Long> {
Page<EntryTerminal> findAllByOrderByIdDesc(Pageable pageable);
}
Record processor:
This is where I use EntryTerminalService to write to the database. I inject it with field injection, but the field is always null here.
public class RecordProcessor implements ShardRecordProcessor {
private static final Logger log = LoggerFactory.getLogger(RecordProcessor.class);
@Autowired
private EntryTerminalService entryTerminalService;
/**
* Invoked by the KCL before data records are delivered to the ShardRecordProcessor instance (via
* processRecords).
*
* @param initializationInput Provides information related to initialization.
*/
@Override
public void initialize(InitializationInput initializationInput) {
log.info("Initializing @ Sequence: {}", initializationInput.extendedSequenceNumber());
}
/**
* Handles record processing logic. The Amazon Kinesis Client Library will invoke this method to deliver
* data records to the application.
*
* @param processRecordsInput Provides the records to be processed as well as information and capabilities
* related to them (e.g. checkpointing).
*/
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
log.info("Processing {} record(s)", processRecordsInput.records().size());
// Data is read here from the Kinesis data stream
for (KinesisClientRecord record : processRecordsInput.records()) {
log.info("Processing Record For Partition Key : {}", record.partitionKey());
log.info("Processing Record With Sequence Number : {}", record.sequenceNumber());
String originalData;
try {
byte[] b = new byte[record.data().remaining()];
record.data().get(b);
log.info(new String(b, StandardCharsets.UTF_8).split("#")[0]);
originalData = new String(b, StandardCharsets.UTF_8).split("#")[0];
log.info("Data from kinesis stream : {}", originalData);
ObjectMapper objectMapper = new ObjectMapper();
EntryTerminalDTO entryTerminalDTO = objectMapper.readValue(originalData, EntryTerminalDTO.class);
EntryTerminal entryTerminal = new EntryTerminal(entryTerminalDTO.getEntryTerminalID(), entryTerminalDTO.getEntryTerminalHardwareVersion(), entryTerminalDTO.getEntryTerminalSoftwareVersion());
entryTerminalService.saveEntryTerminalEventToLog(entryTerminal); // <------- entryTerminalService is null here
log.info(entryTerminalDTO.toString());
} catch (Exception e) {
log.error("Caught throwable while processing records. Aborting.", e);
Runtime.getRuntime().halt(1);
}
try {
/*
* KCL assumes that the call to checkpoint means that all records have been
* processed, records which are passed to the record processor.
*/
processRecordsInput.checkpointer().checkpoint();
} catch (Exception e) {
log.error("Error during Processing of records", e);
}
}
}
/**
* Called when the lease tied to this record processor has been lost. Once the lease has been lost,
* the record processor can no longer checkpoint.
*
* @param leaseLostInput Provides access to functions and data related to the loss of the lease.
*/
@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
log.error("LeaseLostInput {}", leaseLostInput);
}
/**
* Called when all data on this shard has been processed. Checkpointing must occur in the method for record
* processing to be considered complete; an exception will be thrown otherwise.
*
* @param shardEndedInput Provides access to a checkpointer method for completing processing of the shard.
*/
@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
try {
log.info("Reached shard end checkpointing.");
shardEndedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
log.error("Exception while checkpointing at shard end. Giving up.", e);
}
}
/**
* Invoked when Scheduler has been requested to shut down (i.e. we decide to stop running the app by pressing
* Enter). Checkpoints and logs the data a final time.
*
* @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint
* before the shutdown is completed.
*/
@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
try {
log.info("Scheduler is shutting down, checkpointing.");
shutdownRequestedInput.checkpointer().checkpoint();
} catch (ShutdownException | InvalidStateException e) {
log.error("Exception while checkpointing at requested shutdown. Giving up.", e);
}
}
}
Record processor factory:
@Component
public class RecordProcessorFactory implements ShardRecordProcessorFactory {
@Override
public ShardRecordProcessor shardRecordProcessor() {
return new RecordProcessor();
}
}
Application:
@SpringBootApplication
public class KinesisConsumerApplication implements CommandLineRunner {
private static final Logger log = LoggerFactory.getLogger(KinesisConsumerApplication.class);
@Value(value = "${aws.stream_name}")
private String streamName;
@Value(value = "${consumer.type}")
private String consumerType;
@Autowired
private ApplicationContext context;
public static void main(String[] args) {
SpringApplication.run(KinesisConsumerApplication.class, args);
}
@Override
public void run(String... args) {
log.info("Running consumer application!");
ConsumerConfig consumerConfig = context.getBean(ConsumerConfig.class);
ConfigsBuilder configsBuilder = consumerConfig.getConfigBuilder();
/**
* The Scheduler is the entry point to the KCL. This instance is configured with defaults
* provided by the ConfigsBuilder.
*/
Scheduler scheduler;
switch (consumerType) {
case "2": {
scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig()
.cleanupLeasesUponShardCompletion(true)
.maxLeasesForWorker(25)
.maxLeasesToStealAtOneTime(1)
.consistentReads(false),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig()
.callProcessRecordsEvenForEmptyRecordList(false),
configsBuilder.retrievalConfig()
.maxListShardsRetryAttempts(5)
.initialPositionInStreamExtended(InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)));
break;
}
default: {
scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig()
.cleanupLeasesUponShardCompletion(true)
.maxLeasesForWorker(25)
.maxLeasesToStealAtOneTime(1)
.consistentReads(false),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig()
.callProcessRecordsEvenForEmptyRecordList(false),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(new PollingConfig(streamName, consumerConfig.getConfigBuilder().kinesisClient()))
.maxListShardsRetryAttempts(5)
.initialPositionInStreamExtended(InitialPositionInStreamExtended
.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)));
break;
}
}
Thread schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
}
}
I get a NullPointerException on my EntryTerminalService. Why?
The service is an interface, and its implementation has the repository autowired so that I can write to the database.
Thanks for your time.
UPDATE:
The same service works fine when I use it in my controller.
My controller:
@RestController
@RequestMapping(value = "/api")
public class ConsumerController {
private final EntryTerminalService entryTerminalService;
@Autowired
public ConsumerController(EntryTerminalService entryTerminalService) {
this.entryTerminalService = entryTerminalService;
}
@GetMapping(value = "/getEntryTerminalEventsLog")
public ResponseEntity<Page<EntryTerminal>> getEntryTerminalEventsLog(Pageable pageable) {
return ResponseEntity.ok(entryTerminalService.getEntryTerminalEventsLog(pageable));
}
}
Is the problem that the service is injected through an annotated field?
How can I solve this problem?
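A likely cause, judging from the code above: RecordProcessorFactory creates the processor with new RecordProcessor(), so Spring never manages that instance and its @Autowired field is never populated. The controller works because Spring itself creates it. One possible approach is to inject the service into the factory (which is a Spring bean) and pass it to the processor through a constructor. Below is a minimal, Spring-free sketch of that wiring. All type names in it (EventLogService, Processor, InjectedRecordProcessor, InjectingProcessorFactory, InMemoryEventLogService) are hypothetical stand-ins for illustration, not the real classes or the KCL API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EntryTerminalService (a Spring bean in the real app).
interface EventLogService {
    void save(String event);
}

// Hypothetical stand-in for the KCL ShardRecordProcessor interface.
interface Processor {
    void process(String record);
}

// The processor takes its dependency through the constructor instead of an
// @Autowired field, so it is fully wired even though it is created with `new`.
class InjectedRecordProcessor implements Processor {
    private final EventLogService eventLogService;

    InjectedRecordProcessor(EventLogService eventLogService) {
        this.eventLogService = eventLogService;
    }

    @Override
    public void process(String record) {
        eventLogService.save(record);
    }
}

// Assumption: in the real application this factory would be the @Component,
// and Spring would supply the service bean to this constructor.
class InjectingProcessorFactory {
    private final EventLogService eventLogService;

    InjectingProcessorFactory(EventLogService eventLogService) {
        this.eventLogService = eventLogService;
    }

    // Every processor handed out shares the service the factory received.
    Processor newProcessor() {
        return new InjectedRecordProcessor(eventLogService);
    }
}

// In-memory service used only to demonstrate the wiring.
class InMemoryEventLogService implements EventLogService {
    final List<String> saved = new ArrayList<>();

    @Override
    public void save(String event) {
        saved.add(event);
    }
}
```

Applied to the code in the question, this would mean giving RecordProcessor a constructor that accepts EntryTerminalService, removing the @Autowired field, and having RecordProcessorFactory receive the service by constructor injection and return new RecordProcessor(entryTerminalService). This assumes the Scheduler is configured with the Spring-managed factory bean, which the ConsumerConfig shown above does not confirm.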