2

I have a challenge where I need to read "unprocessed" data from an SQL Server database, process the data, then selectively update two to six tables in a DB2 database and then mark that data as processed in the original database on SQL Server. At any point, should anything fail, I want all the updates to rollback. If I have 10 unprocessed items and 9 are good but one fails I still want the 9 good ones to complete and the tenth one to return to it's original state until we can research the problem and make a correction.

The overall architecture is that one input instance may result in inserts into at least 3 DB2 tables and as many as 7 tables. Several of the DB2 tables could end up with multiple inserts from one input. I would have to develop a different writer for each table update and figure out how to pass to each writer the specific data necessary for that table. I need also to utilize 2 data sources for updates to DB2 and SQL Server, respectively.

I am not an experienced Spring Batch developer. And I seldom have a project where I can "read 1, process 1, write 1" and repeat. Usually I need to read several files/databases, process that data, then write to one or more reports, files and/or databases. I see where support is provided for this sort of application but it is more complex and takes more research, with limited examples to be found.

In my attempt to implement a solution I took the easy road. I developed a class that implements Tasklet and wrote the code the way my real-time process works. It fetches the input data from SQL using JDBCTemplate then passes the data to code which processes the data and determines what needs to be updated. I have a Transaction Manager class that implements @Transactional with REQUIRES_NEW and rollbackFor my custom unchecked exception. The Transactional class catches all DataAccessException events and will throw the custom exception. At the moment I am only using the DB2 data source so as not to over-complicate the situation.

In my testing I added code at the end of the update process which throws an unchecked exception. I expected the updates to be rolled back. But it did not happen. If I re-run the process I get 803 errors on DB2.

One last thing. In our shop we are required to use Stored Procedures on DB2 for all access. So I am using SimpleJdbcCall to execute the SP's.

Here is my code:

The main java class for the Tasklet:

public class SynchronizeDB2WithSQL   implements Tasklet
{

private static final BatchLogger logger = BatchLogger.getLogger();    

private Db2UpdateTranManager tranMgr;
public void setTranMgr(Db2UpdateTranManager tranMgr) {
    this.tranMgr = tranMgr;
}

private AccessPaymentIntegrationDAO pmtIntDAO;
public void setPmtIntDAO(AccessPaymentIntegrationDAO pmtIntDAO) {
    this.pmtIntDAO = pmtIntDAO;
}

@Override
public RepeatStatus execute(StepContribution arg0, ChunkContext arg1) throws Exception {
    logger.logInfoMessage("=============================================");
    logger.logInfoMessage("   EB0255IA - Synchronize DB2 with SQL");
    logger.logInfoMessage("=============================================");

    List<UnprocessedPaymentDataBean> orderList = this.pmtIntDAO.fetchUnprocessedEntries();

    if(CollectionUtils.isNotEmpty(orderList)) {
        for(UnprocessedPaymentDataBean ent: orderList) {
            logger.logDebugMessage("  Processing payment ");
            logger.logDebugMessage(ent.toString());
            Map<String, List<PaymentTransactionDetailsBean>> paymentDetails = arrangePayments(this.pmtIntDAO.getDetailsByOrder(ent.getOrderNbr()));
            try {
                this.tranMgr.createNewAuthorizedPayment(ent, paymentDetails);
            } catch (DataException e) {
                logger.logErrorMessage("Encountered a Data Exception: "+e);
            }
        }
    } else {
        logger.logInfoMessage("=============================================");
        logger.logInfoMessage("No data was encountered that needed to be processed");
        logger.logInfoMessage("=============================================");
    }

    return RepeatStatus.FINISHED;
}

And the Spring Batch xml:

<job id="EB0255IA" parent="baseJob" job-repository="jobRepository"
    xmlns="http://www.springframework.org/schema/batch" restartable="true"
    incrementer="parameterIncrementer">
    <description>Job to maintain the DB2 updates for payment activity</description>         
    <step id="SynchronizeDB2WithSQL">
        <tasklet ref="synchronizeTasklet" />
    </step> 
</job>

<bean id="synchronizeTasklet" class="com.ins.pmtint.synchdb2.SynchronizeDB2WithSQL" >
    <property name="pmtIntDAO" ref="pmtIntDAO" />
    <property name="tranMgr" ref="db2TranMgr" />    
</bean>

<bean id="jdbcUpdateDB2" class="com.ins.pmtint.db.JDBCUpdateDB2">
    <property name="dataSource" ref="dataSourceBnkDB2" />
</bean>

<bean id="updateDB2DataDAO" class="com.ins.pmtint.db.dao.UpdateDB2DataDAOImpl">
    <property name="jdbcUpdateDB2" ref="jdbcUpdateDB2" />
</bean>

<bean id="db2TranMgr" class="com.ins.pmtint.db.tranmgr.Db2UpdateTranManagerImpl">
    <property name="updateDB2DataDAO" ref="updateDB2DataDAO" />
</bean>

<bean id="jdbcPaymentIntegration" class="com.ins.pmtint.db.JDBCPaymentIntegration" >
    <property name="dataSource" ref="dataSourcePmtIntegration" />
</bean>

<bean id="pmtIntDAO" class="com.ins.pmtint.db.dao.AccessPaymentIntegrationDAOImpl">
    <property name="jdbcPaymentIntegration" ref="jdbcPaymentIntegration" />
</bean>

Part of the transaction manager implementation.

public class Db2UpdateTranManagerImpl implements Db2UpdateTranManager, DB2FieldNames {

private static final BatchLogger logger = BatchLogger.getLogger();

UpdateDB2DataDAO updateDB2DataDAO;
public void setUpdateDB2DataDAO(UpdateDB2DataDAO updateDB2DataDAO) {
    this.updateDB2DataDAO = updateDB2DataDAO;
}

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = false, rollbackFor = DataException.class)
public void createNewAuthorizedPayment(UnprocessedPaymentDataBean dataBean, Map<String, List<PaymentTransactionDetailsBean>> paymentDetails) {
    logger.logDebugMessage("At Db2UpdateTranManagerImpl.createNewAuthorizedPayment(");
    logger.logDebugMessage(dataBean.toString());
    String orderNbr = String.valueOf(dataBean.getOrderNbr());
    String eventCode = TranTypeCode.fromValue(dataBean.getTransactionTypeCode()).getDB2Event();
    if(eventCode == null) {
        try {
            KFBDistBatchEMail.createAndSendMessage("There is no event code for current entry\n\nOrder: "+orderNbr+"  Tran type: "+dataBean.getTransactionTypeCode(), "EB0255IA - Database error" ,EnhancedPropertyPlaceholderConfigurer.getEmailFrom(), EnhancedPropertyPlaceholderConfigurer.getEmailTo(), null);
            throw new DataException("Update failed:  No event code to apply");
        } catch (EMailExcpetion e2) {
            logger.logErrorMessage("Generating email", e2);
        }
    }
    String orginatingSystemId;
    if (dataBean.getPaymentTypeCode().equalsIgnoreCase("EFT"))
            orginatingSystemId = "FS";
        else
            orginatingSystemId = "IN";

    try {
        if(dataBean.getTransactionTypeCode().equalsIgnoreCase("A")) {
            this.updateDB2DataDAO.updatePaymentDetails(orderNbr, DB_INITIAL_EVENT_CODE, "", dataBean.getTransactionAmt(), orginatingSystemId);
        } 

**** FOR TESTING - AT THE END I HAVE ADDED ****
    throw new DataException("I finished processing and backed out. \n\n"+dataBean);
}

And this is part of the JDBC code:

public class JDBCUpdateDB2 extends JdbcDaoSupport 
                        implements DB2FieldNames
{
private static final BatchLogger logger = KFBBatchLogger.getLogger();

public void updatePaymentDetails(String orderNbr, String eventCd, String authnbr, Double amount, String orginatingSystemId) {


    SimpleJdbcCall jdbcCall = new SimpleJdbcCall(getDataSource()).withSchemaName(EnhancedPropertyPlaceholderConfigurer.getDB2Schema()).withProcedureName(UPDATE_PAYMENT_TRANSACTION_DB2_PROC);
    MapSqlParameterSource sqlIn = new MapSqlParameterSource();
    sqlIn.addValue(SP_BNKCRD_PMT_ORD_NBR, orderNbr);
    sqlIn.addValue(SP_CLUSTERING_NBR_2, new StringBuilder(orderNbr.substring(Math.max(orderNbr.length() - 2, 0))).reverse().toString());
    sqlIn.addValue(SP_BNKCRD_EVNT_CD, eventCd);
    sqlIn.addValue(SP_CCTRAN_ERR_CD, "N");
    sqlIn.addValue(SP_BNKCRD_PROC_RET_CD, "");
    sqlIn.addValue(SP_BNKCRD_AUTH_CD, "G");
    sqlIn.addValue(SP_ORIG_SYS_ID_TXT, orginatingSystemId);
    sqlIn.addValue(SP_BNKCRD_TRAN_AMT, amount);
    try {
        jdbcCall.execute(sqlIn);
    } catch (DataAccessException e) {
        logger.logErrorMessage("Database error in updatePaymentDetails", e);
        throw e;
    }
}
GISNovis
  • 117
  • 11

2 Answers2

0

Since you need to write to multiple tables, you can use a CompositeItemWriter having a delegate item writer for each table. In this case, delegates should be registered as streams in the step. You can also create a single item writer that issues 3 (or more) insert statements to different tables (But I would not recommend that).

If I have 10 unprocessed items and 9 are good but one fails I still want the 9 good ones to complete and the tenth one to return to it's original state

If you use a fault tolerant step and a skippable exception is thrown during the writing of a chunk, Spring Batch will scan the chunk for the faulty item (because it can not know which item caused the error). Technically, Spring Batch will set the chunk size to 1 and use one transaction per item, so only the faulty item will be rolled back. This allows you to achieve the requirement above. Here is a self contained example to show you how it works:

import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ChunkScanningTest.JobConfiguration.class)
public class ChunkScanningTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Before
    public void setUp() {
        jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
    }

    @Test
    public void testChunkScanningWhenSkippableExceptionInWrite() throws Exception {
        // given
        int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        Assert.assertEquals(0, peopleCount);

        // when
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();

        // then
        peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
        int bazCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 3 and name = 'baz'");
        Assert.assertEquals(1, fooCount); // foo is inserted
        Assert.assertEquals(1, bazCount); // baz is inserted
        Assert.assertEquals(2, peopleCount); // bar is not inserted

        Assert.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode());
        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
        Assert.assertEquals(3, stepExecution.getCommitCount()); // one commit for foo + one commit for baz + one commit for the last (empty) chunk
        Assert.assertEquals(2, stepExecution.getRollbackCount()); // initial rollback for whole chunk + one rollback for bar
        Assert.assertEquals(2, stepExecution.getWriteCount()); // only foo and baz have been written
    }

    @Configuration
    @EnableBatchProcessing
    public static class JobConfiguration {

        @Bean
        public DataSource dataSource() {
            return new EmbeddedDatabaseBuilder()
                    .setType(EmbeddedDatabaseType.HSQL)
                    .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
                    .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
                    .build();
        }

        @Bean
        public JdbcTemplate jdbcTemplate(DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }

        @Bean
        public ItemReader<Person> itemReader() {
            Person foo = new Person(1, "foo");
            Person bar = new Person(2, "bar");
            Person baz = new Person(3, "baz");
            return new ListItemReader<>(Arrays.asList(foo, bar, baz));
        }

        @Bean
        public ItemWriter<Person> itemWriter() {
            return new PersonItemWriter(dataSource());
        }

        @Bean
        public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
            return jobBuilderFactory.get("job")
                    .start(stepBuilderFactory.get("step")
                            .<Person, Person>chunk(3)
                            .reader(itemReader())
                            .writer(itemWriter())
                            .faultTolerant()
                            .skip(IllegalStateException.class)
                            .skipLimit(10)
                            .build())
                    .build();
        }

        @Bean
        public JobLauncherTestUtils jobLauncherTestUtils() {
            return new JobLauncherTestUtils();
        }
    }

    public static class PersonItemWriter implements ItemWriter<Person> {

        private JdbcTemplate jdbcTemplate;

        PersonItemWriter(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public void write(List<? extends Person> items) {
            System.out.println("Writing items: "); items.forEach(System.out::println);
            for (Person person : items) {
                if ("bar".equalsIgnoreCase(person.getName())) {
                    System.out.println("Throwing exception: No bars here!");
                    throw new IllegalStateException("No bars here!");
                }
                jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
            }
        }
    }

    public static class Person {

        private long id;

        private String name;

        public Person() {
        }

        Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    '}';
        }
    }
}

This example prints:

Writing items: 
Person{id=1, name='foo'}
Person{id=2, name='bar'}
Person{id=3, name='baz'}
Throwing exception: No bars here!
Writing items: 
Person{id=1, name='foo'}
Writing items: 
Person{id=2, name='bar'}
Throwing exception: No bars here!
Writing items: 
Person{id=3, name='baz'}

As you can see, after the skippable has been thrown, each chunk contains only one item (Spring Batch is scanning items one by one to determine the faulty one), and only valid items are written.

with limited examples to be found

I hope this example makes the feature clear. If you want an example with the composite item writer, please take a look at this question/answer: How does Spring Batch CompositeItemWriter manage transaction for delegate writers?

Hope this helps.

Mahmoud Ben Hassine
  • 28,519
  • 3
  • 32
  • 50
  • Hey, Thanks for great solution, but this doesn't works when we've CompositeItemWriter – Jeff Cook Aug 07 '20 at 13:04
  • @JeffCook This question is about a regular writer and not a composite writer. For composite writer, I've added an example here: https://stackoverflow.com/a/51918357/5019386. – Mahmoud Ben Hassine Apr 19 '21 at 08:52
0

In my research I discovered the ChainedTransactionManager class. I instantiated it in my spring configuration and added it to my application using:

<tx:annotation-driven proxy-target-class="true" transaction-manager="transactionManager" />

<bean id="transactionManager" class="org.springframework.data.transaction.ChainedTransactionManager">
<constructor-arg>
<list>
  <bean  class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="SqlServerDataSource" />
  </bean>
  <bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="DB2DataSource" />
  </bean>
</list>

Then in the code I added transaction annotations.

@Transactional(propagation = Propagation.REQUIRES_NEW, readOnly = false, rollbackFor = DataException.class)
public int createOrder(PaymentTransactionBean paymentTransaction) {
    logger.logDebugMessage("PmtIntTransactionManager.createOrder");
    int orderNbr = -1;
    try {
        orderNbr = this.pmtIntSqlDao.createPaymentTransaction(paymentTransaction);
    } catch (DataAccessException e) {
        logger.logDebugMessage(LogHelper.LOG_SEPARATOR_LINE);
        logger.logDebugMessage("Caught a DataAccessException", e);
        PmtUtility.notifySysUser("Database error", "A database error was encountered and rolled back", e);
        throw new DataException("Update failed", e.getCause());
    }
    
    return orderNbr;
}

Any code that is executed below this level can throw a custom DataException which extends RunTimeException and all SQL updates that have been executed will be rolled back. Any updates that occur in that code will be automatically committed when control exits the CreateOrder method.

One thing I discovered in my testing is I can't catch the DataException then re-throw it and expect a rollback. My purpose in doing that was to produce log entries. I ended up having to throw a checked exception, create log entries then through the DataException in order to initiate a rollback.

GISNovis
  • 117
  • 11