
(Note this issue might be connected to this question, but it has a much smaller scope.)

I have the simplest of jobs defined like this:


@Configuration
@EnableBatchProcessing
public class FileTransformerConfiguration {

  private JobBuilderFactory jobBuilderFactory;
  private StepBuilderFactory stepBuilderFactory;

  @Autowired
  public FileTransformerConfiguration(JobBuilderFactory jobBuilderFactory,
      StepBuilderFactory stepBuilderFactory) {
    this.jobBuilderFactory = jobBuilderFactory;
    this.stepBuilderFactory = stepBuilderFactory;
  }

  @Bean
  public Job transformJob() {
    return this.jobBuilderFactory.get("transformJob").incrementer(new RunIdIncrementer())
        .flow(transformStep()).end().build();
  }

  @Bean
  public Step transformStep() {
    return this.stepBuilderFactory.get("transformStep")
        .<String, String>chunk(1).reader(new ItemReader())
        .processor(processor())
        .writer(new ItemWriter()).build();
  }

  @Bean
  public ItemProcessor<String, String> processor() {
    return item -> {
      System.out.println("Converting item (" + item + ")...");
      return item;
    };
  }
}



public class ItemReader implements ItemStreamReader<String> {

  private Iterator<String> it;

  @Override
  public void open(ExecutionContext executionContext) throws ItemStreamException {
    this.it = Arrays.asList("A", "B", "C", "D", "E").iterator();
  }

  @Override
  public String read() throws Exception {
    return this.it.hasNext() ? this.it.next() : null;
  }

  @Override
  public void close() throws ItemStreamException  { }

  @Override
  public void update(ExecutionContext executionContext) throws ItemStreamException {}
}



@JobScope
public class ItemWriter implements ItemStreamWriter<String> {

  @Override
  public void open(ExecutionContext executionContext) throws ItemStreamException { }

  @Override
  public void write(List<? extends String> items) throws Exception {
    items.forEach(item -> System.out.println("Writing item: " + item));
  }

  @Override
  public void update(ExecutionContext executionContext) throws ItemStreamException { }

  @Override
  public void close() throws ItemStreamException { }

}

There is no fancy logic, just strings being moved through the pipeline.

The code is called like this:

@SpringBootApplication
public class TestCmpsApplication {

}

@SpringBootTest(classes = {TestCmpsApplication.class})
public class FileTransformerImplIT {

  @Autowired
  private JobLauncher jobLauncher;
  @Autowired
  private Job transformJob;

  @Test
  void test1() throws Exception {
    String id = UUID.randomUUID().toString();
    JobParametersBuilder jobParameters = new JobParametersBuilder();
    jobParameters.addLong("PARAM_START_TIME", System.currentTimeMillis());
    jobParameters.addString("PARAM_MAPPING_RULE_DEFINITION_ID", id, true);
    this.jobLauncher.run(this.transformJob, jobParameters.toJobParameters());
  }

  @Test
  void test2() throws Exception {
    String id = UUID.randomUUID().toString();
    JobParametersBuilder jobParameters = new JobParametersBuilder();
    jobParameters.addLong("PARAM_START_TIME", System.currentTimeMillis());
    jobParameters.addString("PARAM_MAPPING_RULE_DEFINITION_ID", id, true);
    this.jobLauncher.run(this.transformJob, jobParameters.toJobParameters());
  }

}

(Note there need to be two tests, even though they are identical. The first one will always work.)

So this works fine. However, once I add this:

  @Bean
  public Step transformStep() {
    return this.stepBuilderFactory.get("transformStep")
        .<String, String>chunk(1).reader(new ItemReader())
        .processor(processor())
        .writer(new ItemWriter())
        .transactionAttribute(transactionAttribute()).build();
  }

  private TransactionAttribute transactionAttribute() {
    DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
    attribute.setPropagationBehavior(Propagation.NEVER.value());
    return attribute;
  }

Now the second test fails. The test itself says

TransactionSuspensionNotSupportedException: Transaction manager [org.springframework.batch.support.transaction.ResourcelessTransactionManager] does not support transaction suspension

While the log helpfully provides this error:

IllegalTransactionStateException: Existing transaction found for transaction marked with propagation 'never'

Okay. I explicitly told the step never to use a transaction, but somehow somebody creates one anyway. So let's try MANDATORY. Now the test fails with the same exception as above, but the log says:

IllegalTransactionStateException: No existing transaction found for transaction marked with propagation 'mandatory'

So somebody creates a transaction, but not for both job runs? Surely SUPPORTS will work then. No: the test fails with the same exception, and the log contains this:

OptimisticLockingFailureException: Attempt to update step execution id=1 with wrong version (2), where current version is 3
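To make that last message easier to read, here is a minimal plain-Java sketch of a version-checked update. It is only a model of what the wording of the message suggests, not Spring Batch's actual repository code; the class `VersionedStoreSketch` and its methods are hypothetical, and `IllegalStateException` stands in for `OptimisticLockingFailureException`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of optimistic locking; NOT Spring Batch's implementation.
final class VersionedStoreSketch {

  // Maps an entity id to the version currently held by the store.
  private final Map<Long, Integer> versions = new HashMap<>();

  // Registers a new entity at version 0.
  void insert(long id) {
    versions.put(id, 0);
  }

  // Returns the stored version (assumes the id was inserted before).
  int currentVersion(long id) {
    return versions.get(id);
  }

  // The update succeeds only if the caller's copy carries the stored version.
  // On success the stored version is incremented and returned.
  int update(long id, int callerVersion) {
    int current = versions.get(id);
    if (current != callerVersion) {
      // Stand-in for OptimisticLockingFailureException.
      throw new IllegalStateException("Attempt to update id=" + id
          + " with wrong version (" + callerVersion
          + "), where current version is " + current);
    }
    versions.put(id, current + 1);
    return current + 1;
  }
}
```

In this model, a caller that still holds version 2 after something else has moved the stored version to 3 gets exactly this kind of failure, which would fit two parties updating the same step execution.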

I have no idea what is happening. Clearly someone creates transactions outside the step, but I don't know how to stop them, because I'd rather have no transactions at all. Or at least working transaction management, where transactions behave the same when the job is run twice in a row.
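For reference, the documented meaning of the three propagation settings I tried can be modeled in a few lines of plain Java. This is a simplified sketch, not Spring's implementation: the class `PropagationSketch`, its thread-local flag, and the use of `IllegalStateException` in place of `IllegalTransactionStateException` are all assumptions made for illustration.

```java
// Simplified model of three propagation rules; NOT Spring's implementation.
final class PropagationSketch {

  enum Propagation { NEVER, MANDATORY, SUPPORTS }

  // Hypothetical stand-in for the thread-bound "a transaction is active" state.
  private static final ThreadLocal<Boolean> ACTIVE =
      ThreadLocal.withInitial(() -> false);

  static void setTransactionActive(boolean active) {
    ACTIVE.set(active);
  }

  // Describes what happens when work starts under the given propagation,
  // or throws if the rule is violated.
  static String begin(Propagation propagation) {
    boolean existing = ACTIVE.get();
    switch (propagation) {
      case NEVER:
        if (existing) {
          throw new IllegalStateException("Existing transaction found for "
              + "transaction marked with propagation 'never'");
        }
        return "runs without a transaction";
      case MANDATORY:
        if (!existing) {
          throw new IllegalStateException("No existing transaction found for "
              + "transaction marked with propagation 'mandatory'");
        }
        return "joins the existing transaction";
      case SUPPORTS:
        return existing
            ? "joins the existing transaction"
            : "runs without a transaction";
      default:
        throw new AssertionError(propagation);
    }
  }

  public static void main(String[] args) {
    setTransactionActive(false);
    System.out.println("NEVER, no tx: " + begin(Propagation.NEVER));
    setTransactionActive(true);
    System.out.println("MANDATORY, tx: " + begin(Propagation.MANDATORY));
  }
}
```

Read against this model, the NEVER error means a transaction was already active on the thread when the step started, while the MANDATORY error means none was. Getting both across two otherwise identical runs is what makes me think the state differs between the first and the second run.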

I tried Spring Batch 4.2, 4.2.5, 4.3 and 4.3.1.

What did I do wrong? How can I make this work?

Stefan S.
  • why are you overriding methods with empty implementations? – silentsudo Mar 18 '21 at 12:31
  • @silentsudo Aren't these interfaces? – Stefan S. Mar 18 '21 at 12:53
  • `Note there need to be two tests, even though they are identical`: Why is that? Any reason you use `@JobScope` on the writer? A `TaskletStep` requires a transaction manager. By default, a `ResourcelessTransactionManager` is used, which does not support transactions anyway (as you see from the log: `Transaction manager [org.springframework.batch.support.transaction.ResourcelessTransactionManager] does not support transaction suspension`). So why are you trying to supply a `TransactionAttribute` with `Propagation.NEVER`? – Mahmoud Ben Hassine Mar 19 '21 at 11:10

1 Answer


The problem appears to be the default job repository: its transaction handling seems to be buggy. To work around this, replace it with a JDBC job repository backed by an in-memory database. Just add this class to the Spring context:

@Configuration
@EnableBatchProcessing
public class InMemoryBatchContextConfigurer extends DefaultBatchConfigurer {

  @Override
  protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDatabaseType(DatabaseType.H2.getProductName());
    factory.setDataSource(dataSource());
    factory.setTransactionManager(getTransactionManager());
    return factory.getObject();
  }

  public DataSource dataSource() {
    EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
    return embeddedDatabaseBuilder
        .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
        .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
        .setType(EmbeddedDatabaseType.H2).build();
  }
}
Stefan S.
  • When I tried this solution, I got a BadSqlGrammarException: org.springframework.jdbc.BadSqlGrammarException: PreparedStatementCallback; bad SQL grammar [SELECT JOB_INSTANCE_ID, ....... Are we required to add "spring:batch:jdbc:initialize-schema: always" to application.yml with the above solution? – Sam Jun 02 '22 at 02:35