I'm using a ClassifierCompositeItemWriter to write different kinds of records, stored in a single fixed-length flat file, into a Postgres database with multiple JdbcBatchItemWriters, each one targeting a different table, all inside a step within a Spring Batch job. It works fine, but when I activate transactions they aren't rolled back when an exception occurs.

For example, I have a 32-line flat file: 1 header record, which I insert into a header table, then 30 regular records and 1 footer record, in that order. Record 29 of the regular ones fails with a database conversion exception (an error created for testing), and the job finishes with FAILED status, which is fine. But when I look at the database, I find the 1 header record and 29 of the regular records (all but the one with the error), and no footer record. I expected the transaction to roll back the 1 header record and the other 29 records, but they are still in the database after the exception.

I don't know whether I'm wrong and transactions in Spring Batch simply don't work that way, whether it's a mistake in my configuration, or something else.

Here is the code for the ClassifierCompositeItemWriter and one of the item writers (the others are similar):

public ClassifierCompositeItemWriter<DTOBase> altasOffWriterClassifier(DataSource dataSource) {

    // route each DTO type to its own JdbcBatchItemWriter
    Map<String, JdbcBatchItemWriter<? extends DTOBase>> matcherMap = new HashMap<>();
    matcherMap.put(DTOHeader.class.getTypeName(), headerWriter());
    matcherMap.put(DTOData.class.getTypeName(), dataWriter());
    matcherMap.put(DTOFooter.class.getTypeName(), footerWriterFin());

    BackToBackPatternClassifier classifier = new BackToBackPatternClassifier();
    classifier.setRouterDelegate(dtoWriterClassifier);
    classifier.setMatcherMap(matcherMap);

    ClassifierCompositeItemWriter<DTOBase> writer = new ClassifierCompositeItemWriter<>();
    writer.setClassifier(classifier);

    return writer;
}

@Bean
public JdbcBatchItemWriter<DTOAltaOFF> altaOffWriter() {
    return new JdbcBatchItemWriterBuilder<DTOAltaOFF>()
         .dataSource(dataSource)
         .sql("insert into tabla(ticket, identificador, fecha_alta_operacion, "
                + " ordenante, numero, moneda, cif, importe_emisor, "
                + " estado, telefono_destino, fecha_caducidad_hal, concepto, cabecera_num_orden_fichero) "
                + " VALUES (:ticket,:identificador,to_timestamp(:fechaAltaOperacion,'DDMMYYYYHH24MISS'), "
                + " :ordenante,:numero,:moneda,:cif,(cast(:importeEmisor as double precision)/100), "
                + " :estado,:telefonoDestino,to_timestamp(:fechaCaducidadHal,'DDMMYYYYHH24MISS'),:concepto,:idCabecera) ")
         .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
         .build();
}

My configuration classes:

Datasource:

@Configuration
@EnableTransactionManagement
public class DataSourceConfig {

...some @Value...   
    @Bean(name = "dataSource")
    public DriverManagerDataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName(driverClassName);
        dataSource.setUrl(datasourceUrl);
        dataSource.setUsername(usuario);
        dataSource.setPassword(clave);
        return dataSource;
    }
}
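
(Aside: DriverManagerDataSource opens a new physical connection on every request and is intended mainly for testing; a pooled DataSource is the usual choice for a real job. A minimal sketch, assuming the HikariCP dependency is on the classpath and reusing the same @Value fields:)

@Bean(name = "dataSource")
public DataSource dataSource() {
    // pooled alternative to DriverManagerDataSource
    HikariDataSource dataSource = new HikariDataSource();
    dataSource.setDriverClassName(driverClassName);
    dataSource.setJdbcUrl(datasourceUrl);
    dataSource.setUsername(usuario);
    dataSource.setPassword(clave);
    return dataSource;
}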

Batch configuration:

@Configuration
@EnableBatchProcessing
@Import({ DataSourceConfig.class })
@PropertySource("classpath:batch.properties")
@ComponentScan({ "..."})
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobRepository jobRepository;

    @Autowired
    public DataSource dataSource;

    @Bean
    public JdbcTemplate getJdbcTemplate() {
        return new JdbcTemplate(dataSource);
    }

    @Bean
    public TaskExecutor taskExecutor() {
        return new SimpleAsyncTaskExecutor();
    }
}

Custom BatchConfigurer:

@Component
@EnableTransactionManagement
public class CustomBatchConfigurer extends DefaultBatchConfigurer {

    private final TaskExecutor taskExecutor;

    @Autowired
    private DataSource dataSource;

    public CustomBatchConfigurer(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    @Override
    protected JobLauncher createJobLauncher() throws Exception {
        // launch jobs on the injected executor instead of the default synchronous one
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        jobLauncher.setTaskExecutor(this.taskExecutor);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Override
    public PlatformTransactionManager getTransactionManager() {
        // transaction manager bound to the business DataSource, so the
        // chunk transaction covers the JdbcBatchItemWriter inserts
        DataSourceTransactionManager tm = new DataSourceTransactionManager();
        tm.setDataSource(dataSource);
        return tm;
    }
}

Any help would be great.

Xiul
  • What is your chunk size? If it is <= 29 then the transaction will be committed before your exception happens, and hence the first 29 records will be in the database even if the job fails (see the sketch after these comments). As a side note, you need to register delegates as streams in the step (see https://docs.spring.io/spring-batch/4.1.x/reference/html/step.html#registeringItemStreams). A similar thread can be found here: https://stackoverflow.com/questions/51904498/how-does-spring-batch-compositeitemwriter-manage-transaction-for-delegate-writer – Mahmoud Ben Hassine Jun 20 '19 at 07:00
  • I tried to register the writers as streams but an error was raised: *The method stream(ItemStream) in the type AbstractTaskletStepBuilder> is not applicable for the arguments (JdbcBatchItemWriter)*, and I found this in a book: *JdbcBatchItemWriter doesn't implement the ItemStream interface because it doesn't maintain any state.* Also this about transactions: [link](https://stackoverflow.com/questions/11429197/commit-interval-in-spring-batch-and-dealing-with-rollbacks). I'm confused; I don't know whether I can roll back all items in case of an exception. – Xiul Jun 20 '19 at 13:19
  • That's a different story. This feature is called chunk scanning and is triggered when a skippable exception is thrown from the writer. Spring Batch will "scan" the chunk item by item and commit a transaction for each item (technically it will dynamically change the chunk size to 1). So if you see this, it means that you have configured a skippable exception type and that type has been thrown from your writer. – Mahmoud Ben Hassine Jun 20 '19 at 14:42
  • No, I'm not using a SkipPolicy. My problem is that I need to roll back all data inserted in the database transaction when an exception is thrown. Right now the behavior is: the job stops and the step and job get FAILED status, but all data inserted by the JdbcBatchItemWriters up to that point stays committed in their respective tables, as if no rollback was applied. – Xiul Jun 20 '19 at 15:52
  • `No I'm not using a SkipPolicy` — in that case, your chunk size should already be 1, as mentioned in my first comment. Otherwise, please share your step configuration or provide an [MCR](https://stackoverflow.com/help/minimal-reproducible-example) to reproduce the issue. – Mahmoud Ben Hassine Jun 21 '19 at 07:27
  • @MahmoudBenHassine I have changed the chunk size and rollback works flawlessly. Thanks for your help. – Xiul Jun 27 '19 at 13:33
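
For context, a minimal sketch of the commit behavior described in the first comment (the step bean and chunk size here are illustrative placeholders, not the actual configuration from the question): each chunk is written and committed in its own transaction, so with chunk(10) a failure on record 29 rolls back only the chunk in progress, and the 20 records from the two already-committed chunks stay in the database.

@Bean
public Step chunkDemoStep(ItemReader<DTOBase> reader, ItemWriter<DTOBase> writer) {
    return stepBuilderFactory.get("chunkDemoStep")
            // every 10 items form one chunk; each chunk is committed in its
            // own transaction, so an exception rolls back only the current
            // chunk, never the chunks committed before it
            .<DTOBase, DTOBase> chunk(10)
            .reader(reader)
            .writer(writer)
            .build();
}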

1 Answer


It was the chunk size, as @MahmoudBenHassine wrote in the comments. I added it this way:

@Bean
public Step step1(@Qualifier("xyzWriterClassifier") ItemWriter<DTOxyz> writer) throws Exception {
    return stepBuilderFactory.get("step1")
            // chunk size of 100 covers the whole 32-record file, so the
            // entire file is written and committed in a single transaction
            .<DTOxyz, DTOxyz> chunk(100)
            .reader(dtoXyzItemReader(null)) // null placeholder: real value injected at runtime
            .processor(xyzProcessor())
            .writer(writer)
            .build();
}
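
With chunk(100) the whole 32-record file (header, 30 data records, footer) fits in a single chunk, so it is written and committed in one transaction and the test exception rolls everything back. As the comments explain, any chunk size smaller than the file means earlier chunks are already committed when the failure happens and are not rolled back.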
Xiul