My use case: I need to read data from an Oracle database and load it into multiple SQL Server DBs located on different servers. To keep the design and code generic and scalable, the list of tables to read and the list of servers they should be loaded to are kept in a DB.
Note that there are multiple servers involved, not just multiple DBs on a single server, so the datasource would be different for each writer.
My strategy: Create a job with one step per table. Each step reads one table's data in chunks and writes each chunk to all servers, so when a step completes, that table's data has been loaded to every server.
Step1: read table1's chunks -> write each chunk to multiple servers
Step2: read table2's chunks -> write each chunk to multiple servers
.
.
.
StepN: read tableN's chunks -> write each chunk to multiple servers
How to implement the above strategy: For this I could use a CompositeItemWriter with delegate writers, each having its own datasource instance pointing to an individual server. I was hoping this would let me write a single chunk to multiple servers from a single step. But that does not work!
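For illustration, a stripped-down sketch of the wiring I had in mind (getCompositeItemWriter and serverDataSources are placeholder names; the real code builds the per-server datasources from the server list stored in the DB):
//Sketch: one delegate JdbcBatchItemWriter per target server, composed via
//CompositeItemWriter so that a single chunk read from Oracle is written everywhere.
private ItemWriter<Map<String, Object>> getCompositeItemWriter(String table, List<DataSource> serverDataSources) {
    List<ItemWriter<? super Map<String, Object>>> delegates = new ArrayList<>();
    String sql = String.format("insert into %1$s(col_1, col_2, col_3, col_4, col_5) values(:col1, :col2, :col3, :col4, :col5)", table);
    for (DataSource ds : serverDataSources) {
        JdbcBatchItemWriter<Map<String, Object>> writer = new JdbcBatchItemWriterBuilder<Map<String, Object>>()
                .dataSource(ds) //each delegate points to a different server
                .sql(sql)
                .itemSqlParameterSourceProvider(MapSqlParameterSource::new)
                .build();
        writer.afterPropertiesSet();
        delegates.add(writer);
    }
    CompositeItemWriter<Map<String, Object>> compositeWriter = new CompositeItemWriter<>();
    compositeWriter.setDelegates(delegates);
    return compositeWriter;
}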
The problem with the above strategy: A step can have only one transaction manager, and a DataSourceTransactionManager can have only one datasource. That means no matter how many writers I have per step, they all have to work with the same datasource, i.e. connect to the same single server!
From what I have observed so far, if the datasource "instance" in the step's transaction manager is different from the datasource "instance" of the writer, rollback does not work, even with just a single writer. With multiple writers having different datasources, rollback only works for the writer whose datasource "instance" is the same as the one assigned to the step's transaction manager. All the other writers don't get rolled back, leaving the DBs in an inconsistent state.
Just using the same configuration and creating a new datasource instance for the manager and the writer does not work; the same instance must be used for both.
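To make that concrete, this is the pattern I observe, as a simplified sketch (table, insertSql and stepBuilder are placeholders for the real values used in the step code further below):
//Rollback of a failed chunk only works when the writer and the step's transaction
//manager share the SAME DataSource instance; a second DataSource built from the
//identical configuration is not enough.
DataSource targetDs = dbConfig.getDestinationDataSource(table);
JdbcBatchItemWriter<Map<String, Object>> writer = new JdbcBatchItemWriterBuilder<Map<String, Object>>()
        .dataSource(targetDs) //writer bound to this instance
        .sql(insertSql)
        .itemSqlParameterSourceProvider(MapSqlParameterSource::new)
        .build();
writer.afterPropertiesSet();
stepBuilder.transactionManager(new DataSourceTransactionManager(targetDs)); //same instance -> failed chunk rolls back
//passing a different instance of the same server's datasource here -> no rollback for that writer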
Because of this issue, I am unable to use the above strategy.
What options do I have for this use case? I came across ChainedTransactionManager, but that can also leave the DBs in an inconsistent state, and it is deprecated anyway, so I cannot use it.
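For reference, this is roughly what that rejected option would look like (server1DataSource and server2DataSource are placeholders for two of the target servers):
//Sketch only: ChainedTransactionManager (from spring-data-commons) starts a transaction per
//delegate and commits them one after another, so a failure part-way through the commits can
//still leave some servers committed and others rolled back. It is also deprecated.
PlatformTransactionManager server1Tm = new DataSourceTransactionManager(server1DataSource);
PlatformTransactionManager server2Tm = new DataSourceTransactionManager(server2DataSource);
PlatformTransactionManager chainedTm = new ChainedTransactionManager(server1Tm, server2Tm);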
What else can I do to ensure all the data is read only once and gets written to all servers?
Constraints:
- Data can be read only once from the Oracle source DB
- Please note I CANNOT use beans to initialize any datasources/jobs/steps etc. since I don't know in advance how many tables and servers will be involved. All of that is created dynamically for scalability, reading the needed information from the DB (a rough sketch of what I mean is below).
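To illustrate the kind of dynamic wiring I mean (ServerInfo and loadServerInfoFor are placeholder names; the real lookup reads the server/table details from the config DB):
//Rough sketch of dbConfig.getDestinationDataSource(...): target datasources are built at
//runtime from server details stored in the DB, never declared as beans. In reality a pooled
//datasource per server would be cached and reused; this only shows the idea.
public DataSource getDestinationDataSource(String table) {
    ServerInfo server = loadServerInfoFor(table); //reads the target server row for this table
    DriverManagerDataSource ds = new DriverManagerDataSource();
    ds.setDriverClassName(server.getDriverClassName());
    ds.setUrl(server.getJdbcUrl());
    ds.setUsername(server.getUsername());
    ds.setPassword(server.getPassword());
    return ds;
}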
Sample code explanation: For the POC, I am reading only 15 records from a single table in the source DB with a chunk size of 5. In SourceRowMapper, for one specific row, I set col3 to null. This column is not-null in the target SQL Server DB, so the insert fails as expected. The write failure should roll back the entire chunk so that none of the records from that chunk are inserted. I would then enable skipping so that only the failed records are not written, the rest of the records get written to all the servers, and the failed records are available for reports.
As I explained above, that only works when the datasource instance (and not just the configuration) of the step's transaction manager is the same as the one assigned to the writer. If they are not the same for a writer, all records from the failing chunk except the problem record get inserted into that server's table; only the writer with the matching datasource gets rolled back correctly.
Now, if I add a skip policy, the above issue causes everything except the problem record to be inserted twice, because of the chunk scanning that happens after the chunk fails on the first attempt.
This only happens when the DBMS itself throws the exception. If I simulate the failure in code (search for "Simulating write error" in SampleJob), it all works fine!
Below are the code details. I have simplified the code, removing the loops etc. that load the data to multiple servers.
Batch configurer:
@Slf4j
@Configuration
@EnableBatchProcessing
public class SpringBatchConfig implements BatchConfigurer{
@Value("${spring.datasource.dest.url}")
private String destDbUrl;
@Value("${spring.datasource.dest.driverClassName}")
private String destDbDriverClassName;
@Value("${spring.profiles.active:default}")
private String activeProfile;
@Value("${spring.batch.use.in-mem.db:false}")
private Boolean useInMemDb;
@Autowired
private DataSource dbDataSource;
private JobRepository jobRepo;
private PlatformTransactionManager transactionMgr;
private JobLauncher jobLauncher;
private JobExplorer jobExplorer;
@SuppressWarnings("deprecation")
@Bean
@Override
public JobRepository getJobRepository() throws Exception {
log.info("Active profile: {}", activeProfile);
PlatformTransactionManager transMgr = getTransactionManager();
if(jobRepo != null)
return jobRepo;
if(!(activeProfile.equals("prod") || activeProfile.equals("uat")) && useInMemDb)
{
log.info("Creating in-mem jobRepository bean for SpringBatchConfig with destDbUrl: {}", destDbUrl);
MapJobRepositoryFactoryBean jobRepoFactory = new MapJobRepositoryFactoryBean(transMgr);
jobRepoFactory.afterPropertiesSet();
return jobRepo = (JobRepository) jobRepoFactory.getObject();
}
else
{
log.info("Creating SQL jobRepository bean for SpringBatchConfig with db url: {}", dbDataSource.getConnection().getMetaData().getURL());
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(dbDataSource);
factory.setTransactionManager(transMgr);
factory.setDatabaseType("SQLSERVER");
factory.setTablePrefix("dbo.BATCH_");
factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
factory.afterPropertiesSet(); //called after all setters so the settings actually take effect
return jobRepo = (JobRepository) factory.getObject();
}
}
@Override
public PlatformTransactionManager getTransactionManager() throws SQLException {
if(transactionMgr != null)
return transactionMgr;
if(!(activeProfile.equals("prod") || activeProfile.equals("uat")) && useInMemDb)
{
log.info("Getting resourceless transaction manager for in-mem jobRepository");
return transactionMgr = new ResourcelessTransactionManager();
}
else
{
log.info("Getting transaction manager for SpringBatchConfig with db url: {}", dbDataSource.getConnection().getMetaData().getURL());
return transactionMgr = new DataSourceTransactionManager(dbDataSource);
}
}
@Override
@Bean(name = "jobLauncher")
public JobLauncher getJobLauncher() throws Exception {
log.info("Creating jobLauncher bean for SpringBatchConfig with db url: {}", dbDataSource.getConnection().getMetaData().getURL());
if(this.jobLauncher != null)
return this.jobLauncher;
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.afterPropertiesSet();
return this.jobLauncher = jobLauncher;
}
@SuppressWarnings("deprecation")
@Override
@Bean(name = "jobExplorer")
public JobExplorer getJobExplorer() throws Exception {
if(jobExplorer != null)
return jobExplorer;
if(!(activeProfile.equals("prod") || activeProfile.equals("uat")) && useInMemDb)
{
log.info("Creating jobExplorer bean for in-mem repo");
MapJobRepositoryFactoryBean jobRepoFactory = new MapJobRepositoryFactoryBean(getTransactionManager());
jobRepoFactory.afterPropertiesSet();
MapJobExplorerFactoryBean explorerFactory = new MapJobExplorerFactoryBean(jobRepoFactory);
explorerFactory.afterPropertiesSet();
return jobExplorer = explorerFactory.getObject();
}
else
{
log.info("Creating jobExplorer bean for SpringBatchConfig with db url: {}", dbDataSource.getConnection().getMetaData().getURL());
JobExplorerFactoryBean explorerFactory = new JobExplorerFactoryBean();
explorerFactory.setDataSource(dbDataSource);
explorerFactory.afterPropertiesSet();
return jobExplorer = explorerFactory.getObject();
}
}
}
Job/Step creation for the strategy I explained above:
@Slf4j
@Configuration
public class SampleJob {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("sourceDBTemplate")
private JdbcTemplate sourceDBTemplate;
@Value("${spring.datasource.src.schema}")
private String schema;
@Autowired
@Qualifier("jobLauncher")
private JobLauncher jobLauncher;
@Autowired
private DatabaseConfigurations dbConfig;
@Autowired
private DBJobExecutionListener jobExecutionListener;
public void startJob(List<Table> tables) throws Exception {
Map<String, JobParameter> jobConfigMap = new HashMap<>();
jobConfigMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters parameters = new JobParameters(jobConfigMap);
try
{
jobLauncher.run(getJob(tables), parameters);
}catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
e.printStackTrace();
}
}
public Job getJob(List<Table> tables) throws Exception {
Clock currentClock = Clock.systemUTC();
OffsetDateTime odt = OffsetDateTime.ofInstant(Instant.now(currentClock), currentClock.getZone());
String jobName = "DataRep_"+odt.format(DateTimeFormatter.ofPattern("yyyyMMddhhmmss"));
log.info("Creating job {}; timezone: {}", jobName, currentClock.getZone().toString() + "--" + currentClock.getZone().getId() + "--" + currentClock.getZone().getDisplayName(TextStyle.FULL, Locale.getDefault()));
JobBuilder builder = jobBuilderFactory.get(jobName).listener(jobExecutionListener);
SimpleJobBuilder sjb = builder.start(getStep("sourceTable"));
//loop through the list of tables to add one step per table; this is why I need to do things dynamically without beans, and that part is non-negotiable. I have simplified the code here, removing the complex loops etc. for clarity; the code presented here would simply run in that loop, together with a few other pieces that fetch meta information dynamically to create the steps.
/*for(int index = 1; index < tables.size(); index++)
{
sjb.next(getStep(tables.get(index)));
}*/
return sjb.build();
}
private Step getStep(String table) throws Exception {
log.info("Getting step for table {}", table);
//needed to ensure same datasource instance from writer is assigned to transaction manager. This is still a problem as multiple writers would have different datasources for different servers! So what options do I have?
DataSourceWrapper dsw = new DataSourceWrapper();
SimpleStepBuilder<Map<String, Object>, Map<String, Object>> sb = stepBuilderFactory.get("Copy "+table)
.<Map<String, Object>, Map<String, Object>>chunk(5)
.reader(getItemReader(table))
.writer(getItemWriter(table, dsw)); //this should write the same chunk to multiple servers; If not then what options do I have?
// .faultTolerant()
// .noRetry(Exception.class);
// .retryPolicy( new NeverRetryPolicy());
// .skip(Exception.class)
// .skipLimit(Integer.MAX_VALUE)
// .skipPolicy(new AlwaysSkipItemSkipPolicy())
// .listener(new DBReplicationSkipListener<Map<String, Object>, Map<String, Object>>());
// sb.listener(new DBStepExecutionListener());
// sb.listener(new DBItemReadListener<Map<String, Object>>())
// .listener(new DBItemProcessListener<Map<String, Object>, Map<String, Object>>())
// .listener(new DBItemWriteListener<Map<String, Object>>())
// .listener(new DBChunkListener());
//use same datasource from writer
sb.transactionManager(new DataSourceTransactionManager(dsw.ds));
return sb.build();
}
public JdbcCursorItemReader<Map<String, Object>> getItemReader(String table) {
log.info("Creating item reader for table {}", table);
JdbcCursorItemReader<Map<String, Object>> jdbcCIR = new JdbcCursorItemReader<Map<String, Object>>();
jdbcCIR.setDataSource(sourceDBTemplate.getDataSource());
jdbcCIR.setSql(String.format("select col1, col2, col3, col4, col5 from %1$s order by col1 asc", table));
jdbcCIR.setRowMapper(new SourceRowMapper());
jdbcCIR.setMaxItemCount(15); //with chunk size of 5, 3 chunks would be created
return jdbcCIR;
}
//This method would loop through the list of servers to create a CompositeItemWriter; the code below is simplified for clarity.
public JdbcBatchItemWriter<Map<String, Object>> getItemWriter(String table, DataSourceWrapper dsw) {
log.info("Creating item writer for table {}", table);
JdbcBatchItemWriterBuilder<Map<String, Object>> jdbcBatchItemWriter = new JdbcBatchItemWriterBuilder<Map<String, Object>>();
DataSource destinationDataSource = dbConfig.getDestinationDataSource(table);
String query = String.format("insert into %1$s(col_1, col_2, col_3, col_4, col_5) values(:col1, :col2, :col3, :col4, :col5)", table);
log.info("Insert query: {}",query);
//here would be the loop for multiple servers; here I have removed it for clarity
jdbcBatchItemWriter
.dataSource(dsw.ds = destinationDataSource)
.sql(query)
.itemSqlParameterSourceProvider(item -> {
MapSqlParameterSource paramSource = new MapSqlParameterSource();
paramSource.addValues(item);
int id = (int)item.get("col1");
log.info("Getting sql param source for writing item {}", id);
// if(id == 45678)
// {
// log.info("Simulating write error for item {}", id);
// Object tempObj = null;
// log.info("Id is {}", tempObj.toString())
// }
return paramSource;
});
JdbcBatchItemWriter<Map<String, Object>> writer = jdbcBatchItemWriter.build();
writer.afterPropertiesSet();
return writer;
}
private static class DataSourceWrapper
{
public DataSource ds;
public DataSourceWrapper()
{}
public DataSourceWrapper(DataSource ds)
{
this.ds = ds;
}
}
}
@Slf4j
public class SourceRowMapper implements RowMapper<Map<String, Object>>{
@Override
public Map<String, Object> mapRow(ResultSet rs, int rowNum) throws SQLException {
log.info(String.format("Reading row# %1$s", rowNum));
Map<String, Object> map = new HashMap<String, Object>();
String column;
Object id = null, val = null;
column = "col1";
val = rs.getObject(column);
id = val;
map.put(column, val);
column = "col2";
val = rs.getObject(column);
map.put(column, val);
column = "col3";
val = rs.getObject(column);
if(id.toString().equals("78787"))
{
log.info("setting val to null for non-null column col3 for id {}", id.toString());
val = null;
}
map.put(column, val);
column = "col4";
val = rs.getObject(column);
map.put(column, val);
column = "col5";
val = rs.getObject(column);
map.put(column, val);
return map;
}
}
What I have tried: I have tried various combinations of the lines that are commented out while creating the step in the sample above. I even debugged the Spring Batch code itself, but I can't get to the root of it, nor can I find a solution. I also checked the FaultTolerantChunkProcessor as explained in one of the links below, but my problem happens even when I have not called faultTolerant() on the step!
A few of the many links I went through while trying to figure out why the transaction was not getting rolled back even with just one server (the different-datasource-instance issue), and how to handle multiple servers: