
I have a job that reads a list of documents from a SQL Server database. The documents need to be in certain statuses and sorted by the column status_updated_time. I want to read a document.id and then process it in the job's processor, following the Driving Query Based ItemReaders pattern.
The status column is changed in the writer, so I can't use JpaPagingItemReader because of this problem.
I used JdbcPagingItemReader but got an error when sorting by status_updated_time. Then I tried adding id to the sort keys as well, but that didn't help.
The query I want Spring Batch to generate is:

SELECT id 
FROM document 
WHERE status IN (0, 1, 2)
ORDER BY status_updated_time ASC, id ASC 

My reader:

@StepScope
@Bean
private ItemReader<Long> statusReader() {
  JdbcPagingItemReader<Long> reader = new JdbcPagingItemReader<>();
  ...
  reader.setRowMapper(SingleColumnRowMapper.newInstance(Long.class));
  ...

  Map<String, Order> sortKeys = new HashMap<>();
  sortKeys.put("status_updated_time", Order.ASCENDING);
  sortKeys.put("id", Order.ASCENDING);

  SqlServerPagingQueryProvider queryProvider = new SqlServerPagingQueryProvider();
  queryProvider.setSelectClause(SELECT_CLAUSE);
  queryProvider.setFromClause(FROM_CLAUSE);
  queryProvider.setWhereClause(WHERE_CLAUSE);
  queryProvider.setSortKeys(sortKeys);

  reader.setQueryProvider(queryProvider);
  ...
  return reader;
}

Where constants are:

private static final String SELECT_CLAUSE = "id";
private static final String FROM_CLAUSE = "document";
private static final String WHERE_CLAUSE = "status IN (0, 1, 2) ";

When the job is executed I get this error:

org.springframework.dao.TransientDataAccessResourceException: StatementCallback; SQL [SELECT TOP 10 id FROM document WHERE status IN (0, 1, 2) ORDER BY id ASC, status_updated_time ASC]; The column name status_updated_time is not valid.; nested exception is com.microsoft.sqlserver.jdbc.SQLServerException: The column name status_updated_time is not valid.
at org.springframework.jdbc.support.SQLStateSQLExceptionTranslator.doTranslate(SQLStateSQLExceptionTranslator.java:110)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:72)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81)
at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81)
at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1443)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:388)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:452)
at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:462)
at org.springframework.batch.item.database.JdbcPagingItemReader.doReadPage(JdbcPagingItemReader.java:210)
at org.springframework.batch.item.database.AbstractPagingItemReader.doRead(AbstractPagingItemReader.java:108)
at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:92)
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:94)
at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87)
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:119)

I saw some questions on Stack Overflow regarding "The column name XYZ is not valid" (this...) but haven't seen anything that works in my case, where I need to sort by another column.
Another problem is the order of the sort columns.
No matter whether I put status_updated_time or id into the map first, the ORDER BY in the generated script is always ORDER BY id ASC, status_updated_time ASC.
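This is what `HashMap` does: it makes no guarantee about iteration order, so the query provider sees the sort keys in whatever order the hash table yields them, not in insertion order. A `LinkedHashMap` preserves insertion order. A minimal, Spring-free sketch of the difference (plain strings stand in for Spring Batch's `Order` enum; `orderByClause` only mimics how a provider would join its sort-key map):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SortKeyOrderDemo {
    // Joins the map's keys in iteration order, the way a paging query
    // provider builds its ORDER BY clause from the sort-key map.
    static String orderByClause(Map<String, String> sortKeys) {
        List<String> parts = new ArrayList<>();
        for (Map.Entry<String, String> e : sortKeys.entrySet()) {
            parts.add(e.getKey() + " " + e.getValue());
        }
        return "ORDER BY " + String.join(", ", parts);
    }

    public static void main(String[] args) {
        // HashMap: iteration order depends on hash codes, not insertion order.
        Map<String, String> hashKeys = new HashMap<>();
        hashKeys.put("status_updated_time", "ASC");
        hashKeys.put("id", "ASC");
        System.out.println(orderByClause(hashKeys)); // order is unspecified

        // LinkedHashMap: iteration order is insertion order.
        Map<String, String> linkedKeys = new LinkedHashMap<>();
        linkedKeys.put("status_updated_time", "ASC");
        linkedKeys.put("id", "ASC");
        System.out.println(orderByClause(linkedKeys));
        // -> ORDER BY status_updated_time ASC, id ASC
    }
}
```

Passing a `LinkedHashMap` to `setSortKeys(...)` should therefore keep `status_updated_time` ahead of `id` in the generated query.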

EDIT: Reading this question, especially this line:

JdbcPagingItemReader assumes here that the sort key and the column in the select clause are called exactly the same

I realized that I need the column status_updated_time in the result set, so I refactored:

private static final String SELECT_CLAUSE = "id, status_updated_time";
...
queryProvider.setSelectClause(SELECT_CLAUSE);
...
reader.setRowMapper(
    (rs, i) -> {
      Document document = new Document();
      document.setId(rs.getLong(1));
      document.setStatusUpdatedTime(rs.getObject(2, Timestamp.class));
      return document;
    }
);

Now the application compiles and the job runs.

But the sorting problem stays the same: I can't order first by status_updated_time and then by id; id always comes first.
I tried removing id from the sort keys and ran into another problem. On the test environment I had 1600 rows to process. My job processes a row and updates its status_updated_time to now(). The job didn't stop after 1600 rows; it kept processing endlessly, because each row got a new status_updated_time and the reader treated it as a new row.
When sorting only by id, the job processed 1600 rows and then stopped.
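The runaway behaviour can be reproduced without a database. The paging reader remembers the last sort-key value it read and asks the next page for rows greater than it; if processing pushes a row's sort key past that watermark, the row re-enters a later page. A toy simulation (plain Java, no Spring; all names here are illustrative, and it assumes the rows stay within the status filter):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class MutableSortKeyDemo {
    static class Row {
        long id;
        long ts;
        Row(long id, long ts) { this.id = id; this.ts = ts; }
    }

    // Simulates keyset paging over a 5-row table, ordered by ts only.
    // Returns how many rows were handed to the "processor" before the
    // page cap was hit.
    static int runSimulation(int maxPages) {
        List<Row> table = new ArrayList<>();
        for (long i = 1; i <= 5; i++) table.add(new Row(i, i));

        long clock = 100;                 // "now()" is later than every existing ts
        long watermark = Long.MIN_VALUE;  // last sort-key value the reader saw
        int processed = 0;
        int pageSize = 2;

        for (int page = 0; page < maxPages; page++) { // cap instead of looping forever
            final long w = watermark;
            List<Row> rows = table.stream()
                    .filter(r -> r.ts > w)
                    .sorted(Comparator.comparingLong(r -> r.ts))
                    .limit(pageSize)
                    .collect(Collectors.toList());
            if (rows.isEmpty()) break;
            watermark = rows.get(rows.size() - 1).ts; // value as read, before the update
            for (Row r : rows) {
                r.ts = clock++;           // the writer bumps status_updated_time to now()
                processed++;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // 5 rows, but every processed row jumps past the watermark again,
        // so rows are re-read page after page.
        System.out.println("processed = " + runSimulation(10)); // prints 20, not 5
    }
}
```

Sorting by the immutable `id` instead makes the watermark monotone with respect to data the writer never touches, which is why the id-only run terminated at 1600 rows.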
So it seems I can't use JdbcPagingItemReader because of the sorting problem.
I also wanted a reader that can run in parallel, to speed this job up (it runs for about 20 minutes every hour of the day).
Any suggestions?

salerokada
  • From your stacktrace, here is the query generated by Spring Batch: `SELECT TOP 10 id FROM document WHERE status IN (0, 1, 2) ORDER BY id ASC, status_updated_time ASC`. Have you tried to run this query using an sql client and see if it works as expected? – Mahmoud Ben Hassine Jul 15 '20 at 09:16
  • Yes, I tried. Everything is ok in the sql client. It looks like I need column `status_updated_time` in select clause. I found workaround (see edit of the question), but face another problem because of sorting by `status_updated_time` which is mandatory for this job. – salerokada Jul 15 '20 at 15:51
  • So if I understand correctly the *same* query runs correctly from your sql client but fails from your Spring Batch job, is that correct? Do you have a unique key constraint on the sort key as mentioned in the [javadoc](https://github.com/spring-projects/spring-batch/blob/d8fc58338d3b059b67b5f777adc132d2564d7402/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/JdbcPagingItemReader.java#L54-L55) (even though this is needed only for restartability)? From your update I understand that your goal is to improve performance. I suggest you use partitioning. – Mahmoud Ben Hassine Jul 16 '20 at 08:04
  • Yes, that query runs correctly in the SQL client but fails when the Spring Batch job runs. Ordering by `status_updated_time asc, id asc` would be unique (do I need a real database constraint, or only that the sort columns are operationally unique?).
    Yes, the job already exists and runs single-threaded with JpaPagingItemReader. I need to optimize and speed up its execution.
    Good idea about partitioning, thank you. I will try to implement it and post the results here.
    – salerokada Jul 16 '20 at 09:47
  • There should be a real database unique key constraint on that column (that's what the javadoc mentions). For partitioning, this might help: https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/java/org/springframework/batch/sample/common/ColumnRangePartitioner.java. – Mahmoud Ben Hassine Jul 16 '20 at 09:51
  • Thanks Mahmoud, really interesting class. But it didn't serve my needs because I don't have uniformly distributed values. Instead I used some trick as a workaround. You can see it in my answer. – salerokada Jul 26 '20 at 21:33

1 Answer


I want to thank Mahmoud for monitoring Spring Batch questions and trying to help. His proposal didn't help in my case, though, so I took a different approach: I use a temporary (auxiliary) table to prepare the data for the main step, and in the main step the reader reads from that table.
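For context on why the suggested partitioning did not fit: ColumnRangePartitioner splits the [min, max] key range into equal-width sub-ranges, so skewed ids give badly unbalanced partitions. A Spring-free sketch of the range arithmetic it performs (the real class reads min/max from the table; the names here are just for illustration):

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitDemo {
    // Splits [min, max] into gridSize contiguous sub-ranges, the way
    // ColumnRangePartitioner derives minValue/maxValue per partition.
    static List<long[]> split(long min, long max, int gridSize) {
        List<long[]> ranges = new ArrayList<>();
        long targetSize = (max - min) / gridSize + 1;
        long start = min;
        long end = start + targetSize - 1;
        while (start <= max) {
            ranges.add(new long[] { start, Math.min(end, max) });
            start += targetSize;
            end += targetSize;
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (long[] r : split(1, 100, 4)) {
            System.out.println(r[0] + " .. " + r[1]);
        }
        // 1 .. 25, 26 .. 50, 51 .. 75, 76 .. 100
    }
}
```

With ids clustered in, say, one narrow range, most partitions end up empty while one does nearly all the work, which is why I went with the helper-table approach below instead.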

The first step drops the help table:

@Bean
private Step dropHelpTable() {
  return stepBuilderFactory
    .get(STEP_DROP_HELP_TABLE)
    .transactionManager(cronTransactionManager)
    .tasklet(dropHelpTableTasklet())
    .build();
}

private Tasklet dropHelpTableTasklet() {
  return (contribution, chunkContext) -> {
    jdbcTemplate.execute(DROP_SCRIPT);
    return RepeatStatus.FINISHED;
  };
}

private static final String STEP_DROP_HELP_TABLE = "dropHelpTable";
private static final String DROP_SCRIPT = "IF EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.TABLES "
  + "WHERE TABLE_NAME = 'query_document_helper') "
  + "BEGIN "
  + " DROP TABLE query_document_helper "
  + "END";

The second step prepares the data: it inserts the ids of the documents that will be processed later:

@Bean
private Step insertDataToHelpTable() {
  return stepBuilderFactory
    .get(STEP_INSERT_HELP_TABLE)
    .transactionManager(cronTransactionManager)
    .tasklet(insertDataToHelpTableTasklet())
    .build();
}

private Tasklet insertDataToHelpTableTasklet() {
  return (contribution, chunkContext) -> {
    jdbcTemplate.execute("SELECT TOP " + limit + " id " + INSERT_SCRIPT);
    return RepeatStatus.FINISHED;
  };
}

private static final String STEP_INSERT_HELP_TABLE = "insertHelpTable";
private static final String INSERT_SCRIPT = "INTO query_document_helper "
  + "FROM dbo.document "
  + "WHERE status IN (0, 1, 2) "
  + "ORDER BY status_updated_time ASC";

@Value("${cron.batchjob.queryDocument.limit}")
private Integer limit;
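Since the tasklet prepends the TOP clause onto INSERT_SCRIPT at runtime, the statement actually sent to SQL Server is easier to see assembled. A Spring-free sketch (100 stands in for the injected ${cron.batchjob.queryDocument.limit}); note it is a SELECT ... INTO, so SQL Server creates and fills query_document_helper in a single statement:

```java
public class InsertScriptDemo {
    // Same fragment as in the job; "SELECT TOP n id " is prepended at runtime.
    private static final String INSERT_SCRIPT = "INTO query_document_helper "
            + "FROM dbo.document "
            + "WHERE status IN (0, 1, 2) "
            + "ORDER BY status_updated_time ASC";

    // Mirrors the concatenation done inside the tasklet.
    static String buildSql(int limit) {
        return "SELECT TOP " + limit + " id " + INSERT_SCRIPT;
    }

    public static void main(String[] args) {
        System.out.println(buildSql(100));
        // SELECT TOP 100 id INTO query_document_helper FROM dbo.document
        //   WHERE status IN (0, 1, 2) ORDER BY status_updated_time ASC
    }
}
```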

After this I have all the data that will be used in one job execution, so ordering by status_updated_time is no longer needed (the requirement is that the youngest documents are not processed in this execution; they will be processed in a later execution once they are the oldest). In the next step I can use a regular reader.

@Bean
private Step queryDocumentStep() {
  return stepBuilderFactory
    .get(STEP_QUERY_NEW_DOCUMENT_STATUS)
    .transactionManager(cronTransactionManager)
    .<Long, Document>chunk(chunk)
    .reader(documentReader())
    ...
    .taskExecutor(multiThreadingTaskExecutor.threadPoolTaskExecutor())
    .build();
}

@StepScope
@Bean
private ItemReader<Long> documentReader() {
  JdbcPagingItemReader<Long> reader = new JdbcPagingItemReader<>();
  reader.setDataSource(coreBatchDataSource);
  reader.setMaxItemCount(limit);
  reader.setPageSize(chunk);
  ...
  Map<String, Order> sortKeys = new HashMap<>();
  sortKeys.put("id", Order.ASCENDING);

  SqlServerPagingQueryProvider queryProvider = new SqlServerPagingQueryProvider();
  queryProvider.setSelectClause(SELECT_CLAUSE);
  queryProvider.setFromClause(FROM_CLAUSE);
  queryProvider.setSortKeys(sortKeys);

  reader.setQueryProvider(queryProvider);
  ...
  return reader;
}

private static final String STEP_QUERY_NEW_DOCUMENT_STATUS = "queryNewDocumentStatus";
private static final String SELECT_CLAUSE = "id";
private static final String FROM_CLAUSE = "query_document_helper";

And the job looks like this:

@Bean
public Job queryDocumentJob() {
  return jobBuilderFactory
    .get(JOB_QUERY_DOCUMENT)
    .incrementer(new RunIdIncrementer())
    .start(dropHelpTable())
    .next(insertDataToHelpTable())
    .next(queryDocumentStep())
    .build();
}

private static final String JOB_QUERY_DOCUMENT = "queryDocument";
salerokada