I have a job with one step. The step has the usual setup:
- reader: reads from the stock DB table "instrument"
- processor: retrieves the latest price from an external service
- writer: writes the latest price to the price DB table "instrumentprice"
There is an ItemWriteListener that performs some calculations on the latest price in the DB table "instrumentprice", so it is important that the writer persists the latest price to the DB table "instrumentprice" immediately.
In the writer, the methods persist and flush are called on the EntityManager for the entity "instrumentprice". However, Hibernate does not write to the table "instrumentprice" immediately. It does write to the table "instrumentprice" at a later stage, but I have not yet figured out the mechanism.
/**
 * Spring configuration that declares the "JobAlphaVantage" batch job and
 * component-scans the {@code com.chartinvestbatch.alphaVantageHistory} package.
 */
@Configuration
@ComponentScan(basePackages = {"com.chartinvestbatch.alphaVantageHistory"})
public class JobConfig {

    /** Job-level listener registered on the job built below. */
    @Autowired
    private ChartInvestJobListener chartInvestJobListener;

    /** Factory used to obtain the builder for the job. */
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /**
     * Builds the "JobAlphaVantage" job, which consists of a single step.
     *
     * @param stepProcessingPrices the step bean qualified as "stepProcessingPrices";
     *                             it is the first (and only) step of the job
     * @return the assembled job
     * @throws IOException declared by the signature; nothing in this method body throws it directly
     */
    @Bean
    public Job jobAlphaVantage(@Qualifier("stepProcessingPrices") Step stepProcessingPrices)
            throws IOException {
        final Job alphaVantageJob = this.jobBuilderFactory
                .get("JobAlphaVantage")
                .listener(this.chartInvestJobListener)
                // A RunIdIncrementer is set as the job's parameters incrementer.
                .incrementer(new RunIdIncrementer())
                .start(stepProcessingPrices)
                .build();
        return alphaVantageJob;
    }
}
/**
 * Spring configuration that declares the "stepProcessingPrices" step:
 * reader, processor and writer wired together with a chunk size of 1.
 */
@Configuration
public class StepConfig {

    /** Listener registered on the step as an {@code ItemWriteListener}. */
    @Autowired
    private TaCalcListener taCalcListener;

    /** Factory used to obtain the builder for the step. */
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Builds the chunk-oriented step that turns {@code Instrument} items into
     * {@code InstrumentAndPricesDto} items and hands them to the writer.
     *
     * @param hibernateCursorItemReader reader supplying {@code Instrument} items
     * @param itemProcessor             processor converting an {@code Instrument} into an
     *                                  {@code InstrumentAndPricesDto}
     * @param itemWriter                writer receiving the processed DTOs
     * @return the assembled step
     * @throws IOException declared by the signature; nothing in this method body throws it directly
     */
    @Bean
    @Qualifier("stepProcessingPrices")
    public Step stepProcessingPrices(
            HibernateCursorItemReader<Instrument> hibernateCursorItemReader,
            ItemProcessor<Instrument, InstrumentAndPricesDto> itemProcessor,
            ItemWriter<InstrumentAndPricesDto> itemWriter) throws IOException {
        // The explicit cast selects the ItemWriteListener overload of listener(...).
        final ItemWriteListener<InstrumentAndPricesDto> writeListener =
                (ItemWriteListener<InstrumentAndPricesDto>) this.taCalcListener;

        final Step pricesStep = this.stepBuilderFactory
                .get("stepProcessingPrices")
                .<Instrument, InstrumentAndPricesDto>chunk(1)
                .listener(writeListener)
                .reader(hibernateCursorItemReader)
                .processor(itemProcessor)
                .writer(itemWriter)
                .build();
        return pricesStep;
    }
}
/**
 * Step-scoped item writer that creates one {@code InstrumentPrice} per incoming
 * {@code InstrumentAndPricesDto}, persists it through {@code IntrumentPriceDao}
 * and calls flush right after each persist.
 *
 * NOTE(review): flush() is called inside whatever transaction is active for this
 * writer; whether the rows are visible to other readers before that transaction
 * commits depends on the transaction manager the step actually uses. This listing
 * does not show that configuration - confirm that the step runs with a
 * JPA-aware transaction manager.
 */
@Scope(value = "step")
@Component
@Transactional
public class StockItemWriter implements ItemWriter<InstrumentAndPricesDto> {
// Class-wide logger (not used in the visible part of this listing).
static Logger log = LogManager.getLogger(StockItemWriter.class);
// DAO used to persist and flush InstrumentPrice entities.
@Autowired
private IntrumentPriceDao intrumentPriceDao;
/**
 * Persists and flushes one {@code InstrumentPrice} for every DTO in the list.
 *
 * @param instrumentAndPricesDtoList the items handed to this writer
 * @throws Exception declared by the signature; propagated from the DAO calls
 */
@Override
public void write(List<? extends InstrumentAndPricesDto> instrumentAndPricesDtoList) throws Exception {
for (InstrumentAndPricesDto dto : instrumentAndPricesDtoList) {
// check some stuff etc ....
// Build a fresh entity and copy the date over from the DTO.
InstrumentPrice instrumentPrice = new InstrumentPrice();
instrumentPrice.setDate(dto.getDate());
...
// Persist, then flush per item rather than once per list.
intrumentPriceDao.persist(instrumentPrice);
intrumentPriceDao.flush();
}
}
}
/**
 * Base class for DAOs that work on a single entity type through a
 * container-injected {@link EntityManager}.
 *
 * <p>The class is annotated with {@code Propagation.MANDATORY}, so its methods
 * are meant to be called from within an already running transaction.
 *
 * @param <T> the entity type handled by the concrete DAO
 */
@Transactional(propagation = Propagation.MANDATORY)
public abstract class GenericDao<T> {

    /** Entity manager injected via {@code @PersistenceContext}. */
    @PersistenceContext
    protected EntityManager entityManager;

    /**
     * Hands the given entity to {@link EntityManager#persist(Object)}.
     *
     * @param t the entity to persist
     * @return the same instance that was passed in
     */
    public T persist(final T t) {
        this.entityManager.persist(t);
        return t;
    }

    /** Delegates to {@link EntityManager#flush()}. */
    public void flush() {
        this.entityManager.flush();
    }

    /**
     * @return the entity manager used by this DAO
     */
    public EntityManager getEntityManager() {
        return this.entityManager;
    }
}
/**
 * Repository for {@code InstrumentPrice} entities. It adds no members of its
 * own; all operations come from {@code GenericDao}.
 */
@Repository
@Transactional(propagation = Propagation.MANDATORY)
public class IntrumentPriceDao extends GenericDao<InstrumentPrice> {
}