I had a problem with a Spring Batch job that reads a large CSV file (a few million records) and saves its records to a database. The job uses FlatFileItemReader to read the CSV and JpaItemWriter to write the read and processed records to the database. The problem is that JpaItemWriter doesn't clear the persistence context after flushing each chunk of items to the database, so the job eventually fails with an OutOfMemoryError.
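To illustrate what I think is happening, here is a toy model in plain Java. It is only a sketch and not the JPA API: ToyPersistenceContext, PersistenceContextDemo and peakManaged are hypothetical names I made up, and the model assumes that the same persistence context is reused across chunks, which matches what I observed but may not hold for every transaction setup. It shows that flushing alone leaves every entity referenced by the context, while clearing after each chunk caps the number of managed entities at the chunk size.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for a persistence context. Hypothetical; NOT the JPA API. */
class ToyPersistenceContext {
    // Entities the context still references (a simplified first-level cache).
    private final Map<Object, Boolean> managed = new IdentityHashMap<>();
    // Entities persisted since the last flush.
    private final List<Object> pendingInserts = new ArrayList<>();

    void persist(Object entity) {
        managed.put(entity, Boolean.TRUE);
        pendingInserts.add(entity);
    }

    /** Pretends to send pending inserts to the database; entities stay managed. */
    void flush() {
        pendingInserts.clear();
    }

    /** Detaches everything, so the entities become eligible for garbage collection. */
    void clear() {
        managed.clear();
        pendingInserts.clear();
    }

    int managedCount() {
        return managed.size();
    }
}

class PersistenceContextDemo {

    /** Writes totalItems in chunks and returns the peak number of managed entities. */
    static int peakManaged(int totalItems, int chunkSize, boolean clearAfterChunk) {
        ToyPersistenceContext context = new ToyPersistenceContext();
        int peak = 0;
        for (int start = 0; start < totalItems; start += chunkSize) {
            int end = Math.min(start + chunkSize, totalItems);
            for (int i = start; i < end; i++) {
                context.persist(new Object());
            }
            context.flush();
            peak = Math.max(peak, context.managedCount());
            if (clearAfterChunk) {
                context.clear();
            }
        }
        return peak;
    }

    public static void main(String[] args) {
        System.out.println(peakManaged(20_000, 5_000, false)); // prints 20000
        System.out.println(peakManaged(20_000, 5_000, true));  // prints 5000
    }
}
```

With a few million records instead of 20,000, the first case is roughly where I would expect the heap to run out.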
I have solved the problem by extending JpaItemWriter and overriding the write method so that it calls EntityManager.clear() after writing each chunk, but I was wondering whether Spring Batch already addresses this issue and the root of the problem is actually in my job config. What is the right way to address this issue?
My solution:
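import java.util.List;
// javax.persistence is assumed here; it matches the List-based write signature below.
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import org.springframework.batch.item.database.JpaItemWriter;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.orm.jpa.EntityManagerFactoryUtils;

class ClearingJpaItemWriter<T> extends JpaItemWriter<T> {

    // Own reference to the factory, kept so the writer can look up the transactional EntityManager.
    private EntityManagerFactory entityManagerFactory;

    @Override
    public void write(List<? extends T> items) {
        // Let the parent write and flush the chunk first.
        super.write(items);
        EntityManager entityManager = EntityManagerFactoryUtils.getTransactionalEntityManager(entityManagerFactory);
        if (entityManager == null) {
            throw new DataAccessResourceFailureException("Unable to obtain a transactional EntityManager");
        }
        // Detach the written entities so they can be garbage collected.
        entityManager.clear();
    }

    @Override
    public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
        super.setEntityManagerFactory(entityManagerFactory);
        this.entityManagerFactory = entityManagerFactory;
    }
}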
You can see the added entityManager.clear(); call in the write method.
Job config:
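@Bean
public JpaItemWriter<Appointment> postgresWriter() {
    // Typed instead of raw, so the writer matches the step's JpaItemWriter<Appointment> parameter.
    JpaItemWriter<Appointment> writer = new ClearingJpaItemWriter<>();
    writer.setEntityManagerFactory(pgEntityManagerFactory);
    return writer;
}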
@Bean
public Step appontmentInitStep(JpaItemWriter<Appointment> writer, FlatFileItemReader<Appointment> reader) {
    return stepBuilderFactory.get("initEclinicAppointments")
            .transactionManager(platformTransactionManager)
            .<Appointment, Appointment>chunk(5000)
            .reader(reader)
            .writer(writer)
            .faultTolerant()
            .skipLimit(1000)
            .skip(FlatFileParseException.class)
            .build();
}
@Bean
public Job appointmentInitJob(@Qualifier("initEclinicAppointments") Step step) {
    return jobBuilderFactory.get(JOB_NAME)
            .incrementer(new RunIdIncrementer())
            .preventRestart()
            .start(step)
            .build();
}