I just went through this and found something that seems to work. As you note, @EnableBatchProcessing causes a DataSourceTransactionManager to be created, which messes up everything. I'm using modular=true in @EnableBatchProcessing, so the ModularBatchConfiguration class is activated.
What I did was stop using @EnableBatchProcessing and instead copy the entire ModularBatchConfiguration class into my project. Then I commented out the transactionManager() method, since the Atomikos configuration creates the JtaTransactionManager. I also had to override the jobRepository() method, because it was hardcoded to use the DataSourceTransactionManager created inside DefaultBatchConfigurer.
I also had to explicitly import the JtaAutoConfiguration class. This wires everything up correctly (according to the Actuator's "beans" endpoint, thank god for that). But when you run it, the transaction manager throws an exception because something somewhere sets an explicit transaction isolation level. So I also wrote a BeanPostProcessor to find the transaction manager and call txnMgr.setAllowCustomIsolationLevels(true).
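For anyone who wants to see the shape of that post-processor, here is a minimal sketch rather than my exact code. The class name JtaIsolationLevelPostProcessor is made up for illustration, and it assumes the class is picked up by component scanning.

```java
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.jta.JtaTransactionManager;

// Hypothetical class name, for illustration only.
@Component
public class JtaIsolationLevelPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        // Nothing to do before initialization; pass the bean through unchanged.
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // Only touch the JTA transaction manager; every other bean is returned as-is.
        if (bean instanceof JtaTransactionManager) {
            ((JtaTransactionManager) bean).setAllowCustomIsolationLevels(true);
        }
        return bean;
    }
}
```

Setting the flag in a post-processor means you do not have to redefine the transaction manager bean that the JTA auto-configuration already creates.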
Now everything works, but while the job is running I cannot fetch the current data from the batch_step_execution table using JdbcTemplate, even though I can see the data in SQLyog. This must have something to do with transaction isolation, but I haven't been able to understand it yet.
Here is what I have for my configuration class, copied from Spring and modified as noted above. PS: the DataSource that points to the database with the batch tables is annotated as @Primary. Also, I changed my DataSource beans to be instances of org.apache.tomcat.jdbc.pool.XADataSource; I'm not sure if that's necessary.
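import java.util.Collection;

import javax.sql.DataSource;

import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.ApplicationContextFactory;
import org.springframework.batch.core.configuration.support.AutomaticJobRegistrar;
import org.springframework.batch.core.configuration.support.DefaultJobLoader;
import org.springframework.batch.core.configuration.support.MapJobRegistry;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.core.scope.JobScope;
import org.springframework.batch.core.scope.StepScope;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.ImportAware;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.transaction.jta.JtaTransactionManager;
import org.springframework.util.Assert;

@Configuration
@Import(ScopeConfiguration.class)
public class ModularJtaBatchConfiguration implements ImportAware
{
    @Autowired(required = false)
    private Collection<DataSource> dataSources;

    private BatchConfigurer configurer;

    @Autowired
    private ApplicationContext context;

    @Autowired(required = false)
    private Collection<BatchConfigurer> configurers;

    private AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();

    // Overridden: build the repository with the JTA transaction manager
    // instead of the DataSourceTransactionManager from DefaultBatchConfigurer.
    @Bean
    public JobRepository jobRepository(DataSource batchDataSource, JtaTransactionManager jtaTransactionManager) throws Exception
    {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(batchDataSource);
        factory.setTransactionManager(jtaTransactionManager);
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        return getConfigurer(configurers).getJobLauncher();
    }

    // Commented out: the Atomikos configuration creates the JtaTransactionManager.
    // @Bean
    // public PlatformTransactionManager transactionManager() throws Exception {
    //     return getConfigurer(configurers).getTransactionManager();
    // }

    @Bean
    public JobExplorer jobExplorer() throws Exception {
        return getConfigurer(configurers).getJobExplorer();
    }

    @Bean
    public AutomaticJobRegistrar jobRegistrar() throws Exception {
        registrar.setJobLoader(new DefaultJobLoader(jobRegistry()));
        for (ApplicationContextFactory factory : context.getBeansOfType(ApplicationContextFactory.class).values()) {
            registrar.addApplicationContextFactory(factory);
        }
        return registrar;
    }

    @Bean
    public JobBuilderFactory jobBuilders(JobRepository jobRepository) throws Exception {
        return new JobBuilderFactory(jobRepository);
    }

    @Bean
    // hopefully this will autowire the Atomikos JTA txn manager
    public StepBuilderFactory stepBuilders(JobRepository jobRepository, JtaTransactionManager ptm) throws Exception {
        return new StepBuilderFactory(jobRepository, ptm);
    }

    @Bean
    public JobRegistry jobRegistry() throws Exception {
        return new MapJobRegistry();
    }

    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        AnnotationAttributes enabled = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(
                EnableBatchProcessing.class.getName(), false));
        Assert.notNull(enabled,
                "@EnableBatchProcessing is not present on importing class " + importMetadata.getClassName());
    }

    protected BatchConfigurer getConfigurer(Collection<BatchConfigurer> configurers) throws Exception {
        if (this.configurer != null) {
            return this.configurer;
        }
        if (configurers == null || configurers.isEmpty()) {
            if (dataSources == null || dataSources.isEmpty()) {
                throw new UnsupportedOperationException("You are screwed");
            } else if (dataSources.size() == 1) {
                DataSource dataSource = dataSources.iterator().next();
                DefaultBatchConfigurer configurer = new DefaultBatchConfigurer(dataSource);
                configurer.initialize();
                this.configurer = configurer;
                return configurer;
            } else {
                throw new IllegalStateException("To use the default BatchConfigurer the context must contain no more than " +
                        "one DataSource, found " + dataSources.size());
            }
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException(
                    "To use a custom BatchConfigurer the context must contain precisely one, found "
                            + configurers.size());
        }
        this.configurer = configurers.iterator().next();
        return this.configurer;
    }
}

@Configuration
class ScopeConfiguration {
    private StepScope stepScope = new StepScope();

    private JobScope jobScope = new JobScope();

    @Bean
    public StepScope stepScope() {
        stepScope.setAutoProxy(false);
        return stepScope;
    }

    @Bean
    public JobScope jobScope() {
        jobScope.setAutoProxy(false);
        return jobScope;
    }
}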
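For completeness, the two wiring details mentioned in the text (explicitly importing JtaAutoConfiguration, and marking the batch DataSource as @Primary) could look roughly like the sketch below. This is not my actual configuration: the class name BatchDataSourceConfiguration, the JDBC URL, and the credentials are placeholders. The package of JtaAutoConfiguration depends on your Spring Boot version, so adjust that import if it does not resolve.

```java
import javax.sql.DataSource;

import org.springframework.boot.autoconfigure.transaction.jta.JtaAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;

// Hypothetical class name; URL and credentials below are placeholders.
@Configuration
@Import(JtaAutoConfiguration.class)
public class BatchDataSourceConfiguration {

    // @Primary makes this the DataSource chosen when a single DataSource is injected;
    // the bean name matches the batchDataSource parameter of jobRepository(...).
    @Bean
    @Primary
    public DataSource batchDataSource() {
        org.apache.tomcat.jdbc.pool.XADataSource ds = new org.apache.tomcat.jdbc.pool.XADataSource();
        ds.setUrl("jdbc:mysql://localhost:3306/batch"); // placeholder URL
        ds.setUsername("batch");                        // placeholder user
        ds.setPassword("secret");                       // placeholder password
        return ds;
    }
}
```

As noted in the text, I'm not sure the XADataSource type is actually required, so treat that part of the sketch as optional.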