
Is it possible to use both these starters in a single application?

I want to load records from a CSV file into a database table. The Spring Batch tables are stored in a different database, so I assume I need to use JTA to handle the transaction.

Whenever I add @EnableBatchProcessing to my @Configuration class, it configures a PlatformTransactionManager, which stops the Atomikos one from being auto-configured.
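
For reference, this is roughly what my configuration looks like right now (driver class, URLs and bean names are just placeholders):

import java.util.Properties;

import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import com.atomikos.jdbc.AtomikosDataSourceBean;

@Configuration
@EnableBatchProcessing
public class BatchJtaConfig {

    // database holding the Spring Batch metadata tables
    @Bean(initMethod = "init", destroyMethod = "close")
    @Primary
    public DataSource batchDataSource() {
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setUniqueResourceName("batchDb");
        ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
        Properties props = new Properties();
        props.setProperty("url", "jdbc:mysql://localhost:3306/batch");
        props.setProperty("user", "batch");
        props.setProperty("password", "secret");
        ds.setXaProperties(props);
        return ds;
    }

    // database the CSV records get loaded into
    @Bean(initMethod = "init", destroyMethod = "close")
    public DataSource appDataSource() {
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
        ds.setUniqueResourceName("appDb");
        ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
        Properties props = new Properties();
        props.setProperty("url", "jdbc:mysql://localhost:3306/app");
        props.setProperty("user", "app");
        props.setProperty("password", "secret");
        ds.setXaProperties(props);
        return ds;
    }
}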

Are there any Spring Boot + Batch + JTA samples out there that show how to do this?

Many Thanks, James

2 Answers


I just went through this and I found something that seems to work. As you note, @EnableBatchProcessing causes a DataSourceTransactionManager to be created, which messes up everything. I'm using modular=true in @EnableBatchProcessing, so the ModularBatchConfiguration class is activated.

What I did was stop using @EnableBatchProcessing and instead copy the entire ModularBatchConfiguration class into my project. Then I commented out the transactionManager() method, since the Atomikos configuration creates the JtaTransactionManager. I also had to override the jobRepository() method, because that was hardcoded to use the DataSourceTransactionManager created inside DefaultBatchConfigurer.

I also had to explicitly import the JtaAutoConfiguration class. That wires everything up correctly (according to the Actuator's "beans" endpoint, thank god for that). But when you run it, the transaction manager throws an exception because something somewhere sets an explicit transaction isolation level. So I also wrote a BeanPostProcessor to find the transaction manager and call txnMgr.setAllowCustomIsolationLevels(true).
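
A minimal sketch of that post-processor (the class name is mine; the important part is the setAllowCustomIsolationLevels call):

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import org.springframework.transaction.jta.JtaTransactionManager;

@Component
public class JtaIsolationLevelPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof JtaTransactionManager) {
            // Spring Batch asks for an explicit isolation level when creating job executions;
            // without this flag Atomikos rejects it at runtime.
            ((JtaTransactionManager) bean).setAllowCustomIsolationLevels(true);
        }
        return bean;
    }
}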

Now everything works, but while the job is running I cannot fetch the current data from the batch_step_execution table using JdbcTemplate, even though I can see the data in SQLYog. This must have something to do with transaction isolation, but I haven't been able to figure it out yet.
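
For example, a query like the following (run with a plain JdbcTemplate against the batch DataSource; illustrative only) comes back empty while the job is running, even though SQLYog shows the rows:

List<Map<String, Object>> steps = jdbcTemplate.queryForList(
        "SELECT STEP_NAME, STATUS, READ_COUNT, WRITE_COUNT "
                + "FROM BATCH_STEP_EXECUTION WHERE JOB_EXECUTION_ID = ?",
        jobExecutionId);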

Here is my configuration class, copied from Spring and modified as noted above. PS: the DataSource that points to the database holding the batch tables is annotated @Primary. Also, I changed my DataSource beans to be instances of org.apache.tomcat.jdbc.pool.XADataSource; I'm not sure whether that's necessary.

@Configuration
@Import(ScopeConfiguration.class)
public class ModularJtaBatchConfiguration implements ImportAware 
{
    @Autowired(required = false)
    private Collection<DataSource> dataSources;

    private BatchConfigurer configurer;

    @Autowired
    private ApplicationContext context;

    @Autowired(required = false)
    private Collection<BatchConfigurer> configurers;

    private AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();

    @Bean
    public JobRepository jobRepository(DataSource batchDataSource, JtaTransactionManager jtaTransactionManager) throws Exception 
    {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(batchDataSource);
        factory.setTransactionManager(jtaTransactionManager);
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        return getConfigurer(configurers).getJobLauncher();
    }

//  @Bean
//  public PlatformTransactionManager transactionManager() throws Exception {
//      return getConfigurer(configurers).getTransactionManager();
//  }

    @Bean
    public JobExplorer jobExplorer() throws Exception {
        return getConfigurer(configurers).getJobExplorer();
    }

    @Bean
    public AutomaticJobRegistrar jobRegistrar() throws Exception {
        registrar.setJobLoader(new DefaultJobLoader(jobRegistry()));
        for (ApplicationContextFactory factory : context.getBeansOfType(ApplicationContextFactory.class).values()) {
            registrar.addApplicationContextFactory(factory);
        }
        return registrar;
    }

    @Bean
    public JobBuilderFactory jobBuilders(JobRepository jobRepository) throws Exception {
        return new JobBuilderFactory(jobRepository);
    }

    @Bean
    // hopefully this will autowire the Atomikos JTA txn manager
    public StepBuilderFactory stepBuilders(JobRepository jobRepository, JtaTransactionManager ptm) throws Exception {
        return new StepBuilderFactory(jobRepository, ptm);
    }

    @Bean
    public JobRegistry jobRegistry() throws Exception {
        return new MapJobRegistry();
    }

    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        AnnotationAttributes enabled = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(
                EnableBatchProcessing.class.getName(), false));
        Assert.notNull(enabled,
                "@EnableBatchProcessing is not present on importing class " + importMetadata.getClassName());
    }

    protected BatchConfigurer getConfigurer(Collection<BatchConfigurer> configurers) throws Exception {
        if (this.configurer != null) {
            return this.configurer;
        }
        if (configurers == null || configurers.isEmpty()) {
            if (dataSources == null || dataSources.isEmpty()) {
                throw new UnsupportedOperationException("You are screwed");
            } else if (dataSources != null && dataSources.size() == 1) {
                DataSource dataSource = dataSources.iterator().next();
                DefaultBatchConfigurer configurer = new DefaultBatchConfigurer(dataSource);
                configurer.initialize();
                this.configurer = configurer;
                return configurer;
            } else {
                throw new IllegalStateException("To use the default BatchConfigurer the context must contain no more than" +
                                                        "one DataSource, found " + dataSources.size());
            }
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException(
                    "To use a custom BatchConfigurer the context must contain precisely one, found "
                            + configurers.size());
        }
        this.configurer = configurers.iterator().next();
        return this.configurer;
    }

}

@Configuration
class ScopeConfiguration {

    private StepScope stepScope = new StepScope();

    private JobScope jobScope = new JobScope();

    @Bean
    public StepScope stepScope() {
        stepScope.setAutoProxy(false);
        return stepScope;
    }

    @Bean
    public JobScope jobScope() {
        jobScope.setAutoProxy(false);
        return jobScope;
    }

}
Ken DeLong
  • In the end, even this didn't work out for me. I could not query the db without making the Atomikos JTA Txn Mgr go insane and lock up and kill all my jobs. Then I realized my second data source was read only for a single job, so I reverted all the config to standard non-JTA config, took out Atomikos completely, and created the second read-only datasource as a Tomcat DataSource pool bean with autoCommit=true and created it only when that particular job was launched. – Ken DeLong Jan 09 '16 at 03:15
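
For completeness, a rough sketch of that non-JTA fallback (URL, driver and credentials are placeholders, not the actual code from the comment):

import javax.sql.DataSource;

import org.apache.tomcat.jdbc.pool.PoolProperties;

public final class ReadOnlySourceFactory {

    // Plain (non-XA) Tomcat pool, built only when the job that reads from the second database is launched.
    public static DataSource readOnlyDataSource() {
        PoolProperties props = new PoolProperties();
        props.setUrl("jdbc:mysql://localhost:3306/source_db");
        props.setDriverClassName("com.mysql.jdbc.Driver");
        props.setUsername("reader");
        props.setPassword("secret");
        props.setDefaultAutoCommit(true);   // autoCommit=true, so no transaction manager is involved
        props.setDefaultReadOnly(true);
        org.apache.tomcat.jdbc.pool.DataSource ds = new org.apache.tomcat.jdbc.pool.DataSource();
        ds.setPoolProperties(props);
        return ds;
    }
}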

I found a solution where I was able to keep @EnableBatchProcessing, but I had to implement BatchConfigurer and the Atomikos beans myself; see my full answer in this SO answer.
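
In outline, that approach looks something like this (a sketch only, assuming Atomikos auto-configures the JtaTransactionManager; see the linked answer for the full bean definitions):

import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.BatchConfigurer;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.explore.support.JobExplorerFactoryBean;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;

@Component
public class JtaBatchConfigurer implements BatchConfigurer {

    private final DataSource batchDataSource;
    private final JtaTransactionManager jtaTransactionManager;

    @Autowired
    public JtaBatchConfigurer(DataSource batchDataSource, JtaTransactionManager jtaTransactionManager) {
        this.batchDataSource = batchDataSource;
        this.jtaTransactionManager = jtaTransactionManager;
    }

    @Override
    public JobRepository getJobRepository() throws Exception {
        // batch metadata lives in its own database but commits through the JTA transaction manager
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(batchDataSource);
        factory.setTransactionManager(jtaTransactionManager);
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    @Override
    public PlatformTransactionManager getTransactionManager() {
        return jtaTransactionManager;
    }

    @Override
    public JobLauncher getJobLauncher() throws Exception {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(getJobRepository());
        launcher.afterPropertiesSet();
        return launcher;
    }

    @Override
    public JobExplorer getJobExplorer() throws Exception {
        JobExplorerFactoryBean factory = new JobExplorerFactoryBean();
        factory.setDataSource(batchDataSource);
        factory.afterPropertiesSet();
        return factory.getObject();
    }
}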

rhorvath