18

I'm using Spring Batch (3.0.1.RELEASE) / JPA and an HSQLDB server database. I need to browse an entire table (using paging) and update items one by one, so I used a JpaPagingItemReader. But when I run the job I can see that some rows are skipped, and the number of skipped rows is equal to the page size. For example, if my table has 12 rows and jpaPagingItemReader.pageSize = 3, the job will read lines 1, 2, 3, then lines 7, 8, 9 (so it skips lines 4, 5, 6)… Could you tell me what is wrong in my code/configuration, or is it maybe an issue with HSQLDB paging? Below is my code:

[EDIT]: The problem is with my ItemProcessor, which modifies the POJO entities. Since JpaPagingItemReader flushes between each read, the entities are updated (this is what I want). But it seems that the paging cursor is also advanced (as can be seen in the log: rows with IDs 4, 5 and 6 are skipped). How can I manage this issue?

@Configuration
@EnableBatchProcessing(modular=true)
public class AppBatchConfig {
  @Inject
  private InfrastructureConfiguration infrastructureConfiguration;  
  @Inject private JobBuilderFactory jobs;
  @Inject private StepBuilderFactory steps;

  @Bean  public Job job() {
     return jobs.get("Myjob1").start(step1()).build();
  }
  @Bean  public Step step1() {  
      return steps.get("step1")
                .<SNUserPerCampaign, SNUserPerCampaign> chunk(0)
                .reader(reader()).processor(processor()).build();   
  }
  @Bean(destroyMethod = "")
  @JobScope
  public ItemStreamReader<SNUserPerCampaign> reader() {
      JpaPagingItemReader<SNUserPerCampaign> reader = new JpaPagingItemReader<>();
      reader.setEntityManagerFactory(infrastructureConfiguration.getEntityManagerFactory());
      reader.setQueryString("select t from SNUserPerCampaign t where t.isactive=true");
      reader.setPageSize(3);
      return reader;
  }
 @Bean @JobScope
 public ItemProcessor<SNUserPerCampaign, SNUserPerCampaign> processor() {   
     return new MyItemProcessor();
 }
}

@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {
 @Inject private EntityManagerFactory emf;  
 @Override
public EntityManagerFactory getEntityManagerFactory() {
    return emf;
}
}  

from my ItemProcessor:

@Override
public SNUserPerCampaign process(SNUserPerCampaign item) throws Exception {
    // do some stuff…
    // then if (condition) update the entity POJO:
    item.setModificationDate(new Timestamp(System.currentTimeMillis()));
    item.setIsactive(false);
    return item;
}

from Spring xml config file:

<tx:annotation-driven transaction-manager="transactionManager" />     
<bean id="transactionManager" class="org.springframework.orm.jpa.JpaTransactionManager">
    <property name="entityManagerFactory" ref="entityManagerFactory" />
</bean>

<bean id="entityManagerFactory" class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean">
    <property name="dataSource" ref="dataSource" />
</bean>

<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <property name="driverClassName" value="org.hsqldb.jdbcDriver" />
    <property name="url" value="jdbc:hsqldb:hsql://localhost:9001/MYAppDB" />
    <property name="username" value="sa" />
    <property name="password" value="" />
</bean>

trace/log summarized :

11:16:05.728 TRACE MyItemProcessor - item processed: snUserInternalId=1]
11:16:06.038 TRACE MyItemProcessor - item processed: snUserInternalId=2]
11:16:06.350 TRACE MyItemProcessor - item processed: snUserInternalId=3]

11:16:06.674 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.677 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...
11:16:06.679 DEBUG SQL- update SNUSER_CAMPAIGN  set ...etc...

11:16:06.681 DEBUG SQL- select ...etc... from  SNUSER_CAMPAIGN snuserperc0_ 

11:16:06.687 TRACE MyItemProcessor - item processed: snUserInternalId=7]
11:16:06.998 TRACE MyItemProcessor - item processed: snUserInternalId=8]
11:16:07.314 TRACE MyItemProcessor - item processed: snUserInternalId=9]
ThierryC
  • How have you confirmed what rows are being read? Is there any stack trace/log information you can provide? – Michael Minella Oct 22 '14 at 16:27
  • I realized that the problem was my ItemProcessor that changes the entity pojo, I edited my question by adding traces – ThierryC Oct 23 '14 at 09:50
  • I'm a bit confused. The items returned from the `JpaPagingItemReader` are detached so they should not be updated without an explicit write. Can you provide the entire job configuration? – Michael Minella Oct 23 '14 at 15:54
  • I will try to add more details tomorrow. Anyway, what I can see in the Spring JpaPagingItemReader's source code is that the items seem to be detached when reading the next page (by doing a flush and clear on the context). That's why I don't need an ItemWriter to persist my update. An important detail may be that in my ItemProcessor I update a boolean that is in my SELECT where clause; I don't know if this can modify the paging cursor. – ThierryC Oct 23 '14 at 17:47
  • The flush and clear is just a clean up at the beginning. At the end of `JpaPagingItemReader` we loop through the elements and explicitly detach them, or commit the transaction so that they are detached. In either way, the entities returned from the `JpaPagingItemReader` should be in the detached state. Can you post the configuration for your full job? – Michael Minella Oct 23 '14 at 21:05
  • I posted the job configuration code, but here is what I fear: whether the entities are detached or not, the problem might be that I update some table fields that are in the SELECT request's criteria, creating a lag in the JpaPagingItemReader's cursor. So what is the best way to browse a table by page and update some records? – ThierryC Oct 24 '14 at 09:30

4 Answers

20

org.springframework.batch.item.database.JpaPagingItemReader creates its own EntityManager instance

(from org.springframework.batch.item.database.JpaPagingItemReader#doOpen) :

entityManager = entityManagerFactory.createEntityManager(jpaPropertyMap);

If you are within a transaction, as seems to be the case, the reader's entities are not detached (from org.springframework.batch.item.database.JpaPagingItemReader#doReadPage):

    if (!transacted) {
        List<T> queryResult = query.getResultList();
        for (T entity : queryResult) {
            entityManager.detach(entity);
            results.add(entity);
        }//end if
    } else {
        results.addAll(query.getResultList());
        tx.commit();
    }

For this reason, when you update an item in the processor or writer, that item is still managed by the reader's EntityManager.

When the item reader reads the next chunk of data, it flushes the context to the database.

So, if we look at your case, after the first page of data is processed, we have in the database:

| id | active |
|----|--------|
| 1  | false  |
| 2  | false  |
| 3  | false  |

org.springframework.batch.item.database.JpaPagingItemReader uses limit & offset to retrieve paginated data. So the next select created by the reader looks like:

    select * from table where active = true limit 3 offset 3

The reader will miss the items with ids 4, 5 and 6, because they are now the first rows retrieved by the database.
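The drift can be reproduced outside Spring Batch with a few lines of plain Java. This is a standalone simulation, not the actual reader: each "page" is a limit/offset window over the rows still matching `active = true`, and each processed row is deactivated, exactly as described above (12 rows, page size 3, as in the question):

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetDriftDemo {

    // mimics "select id from table where active = true limit pageSize offset page*pageSize"
    static List<Integer> readPage(List<Integer> activeRows, int page, int pageSize) {
        int from = Math.min(page * pageSize, activeRows.size());
        int to = Math.min(from + pageSize, activeRows.size());
        return new ArrayList<>(activeRows.subList(from, to));
    }

    // paginate over 'total' active rows while every processed row is deactivated
    static List<Integer> simulate(int total, int pageSize) {
        List<Integer> activeIds = new ArrayList<>();
        for (int id = 1; id <= total; id++) activeIds.add(id);

        List<Integer> seen = new ArrayList<>();
        int page = 0;
        List<Integer> chunk;
        while (!(chunk = readPage(activeIds, page, pageSize)).isEmpty()) {
            seen.addAll(chunk);
            activeIds.removeAll(chunk); // processor flips isactive: rows leave the result set
            page++;                     // but the reader still advances its offset
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(simulate(12, 3)); // [1, 2, 3, 7, 8, 9]
    }
}
```

Rows 4, 5 and 6 (and, further on, 10, 11 and 12) slide into the offset window that has already been consumed and are never read.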

What you can do, as a workaround, is to use the JDBC implementation (org.springframework.batch.item.database.JdbcPagingItemReader), as it does not use limit & offset. It is based on a sorted column (typically the id column), so you will not miss any data. Of course, you will have to update your data in the writer (using either a JPA or a pure JDBC implementation).

The reader will be more verbose:

@Bean
public ItemReader<? extends Entity> reader() {
    JdbcPagingItemReader<Entity> reader = new JdbcPagingItemReader<Entity>();
    final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
    sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
    sqlPagingQueryProviderFactoryBean.setSelectClause("select *");
    sqlPagingQueryProviderFactoryBean.setFromClause("from <your table name>");
    sqlPagingQueryProviderFactoryBean.setWhereClause("where active = true");
    sqlPagingQueryProviderFactoryBean.setSortKey("id");
    try {
        reader.setQueryProvider(sqlPagingQueryProviderFactoryBean.getObject());
    } catch (Exception e) {
        e.printStackTrace();
    }
    reader.setDataSource(dataSource);
    reader.setPageSize(3);
    reader.setRowMapper(new BeanPropertyRowMapper<Entity>(Entity.class));
    return reader;
}
Manuel Verriez
  • That's Sound good. I will need to do more tests in order to valid this solution. Because my final SQL will be more complex (composite primary key, where clause including a "select in" …) I will also have to inherit from JdbcPagingItemReader to add some extra operations. Thanks anyway… – ThierryC Oct 24 '14 at 12:35
  • For a composite key, you may use `org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean#setSortKeys` . It works with alias since Michael Minella's fix (https://jira.spring.io/browse/BATCH-2079 , thanx Michael!). – Manuel Verriez Oct 24 '14 at 12:51
  • The statement above about not being detached is incorrect. The tx.commit() should leave all entities associated with the transaction (which was created at the beginning of the doReadPage method) detached. – Michael Minella Oct 24 '14 at 14:06
  • I expected that too, but using debugger, i can see than reader's entityManager contains entities updated in processor/writer. In my case, entities are not detached. – Manuel Verriez Oct 24 '14 at 14:15
  • I agree it seems that even after the JpaPagingItemReader commit, the items are still in the entityManager and are updated when reading the next page, in my log file I don't see some SQL request that could correspond to a "re-attached" request. – ThierryC Oct 24 '14 at 14:40
  • According to JPA 2 spec : `The managed entities of a transaction-scoped persiste nce context become detached when the transaction commits; the managed entities of an extended persistence context remain managed.`. In my case, i use a resource local transaction (with extended context). Jpa provider does not clear persistence context in this case. Anyway, it should work using JTA. – Manuel Verriez Oct 24 '14 at 15:52
  • Hi Manuel, your solution with JdbcPagingItemReader works for me. I would have preferred a pure JPA style, but it seems too difficult to create a kind of "JPA Paging Read/write Item processor". Thank you to you and Michael for your contributions. – ThierryC Oct 27 '14 at 14:50
  • Glad to hear that! If you do want to use JPA style, maybe you could override `org.springframework.batch.item.database.JpaPagingItemReader`, add sortColumn(s) property, and override createQuery() to use this new property (without using setFirstResult). – Manuel Verriez Oct 27 '14 at 15:44
8

I faced the same case: my reader was a JpaPagingItemReader that queried on a field that was updated in the writer. Consequently it skipped half of the items that needed to be updated, because the page window kept progressing while the items already read were no longer in the reader's scope.

The simplest workaround for me was to override the getPage() method on the JpaPagingItemReader to always return the first page:

JpaPagingItemReader<XXXXX> jpaPagingItemReader = new JpaPagingItemReader<XXXXX>() {
    @Override
    public int getPage() {
        return 0;
    }
};
Yves-Marie L.
6

A couple things to note:

  1. All entities that are returned from the JpaPagingItemReader are detached. We accomplish this in one of two ways. We either create a transaction before querying for the page, then commit the transaction (which detaches all entities associated with the EntityManager for that transaction) or we explicitly call entityManager.detach. We do this so that features like retry and skip can be correctly performed.
  2. While you didn't post all the code in your processor, my hunch is that in the //do some stuff section, your item is getting re-attached which is why the update is occurring. However, without being able to see that code, I can't be sure.
  3. In either case, using an explicit ItemWriter should be done. In fact, I consider it a bug that we don't require an ItemWriter when using java config (we do for XML).
  4. For your specific issue of missing records, you need to keep in mind that a cursor isn't used by any of the *PagingItemReaders. They all execute independent queries for each page of data. So if you update the underlying data in between pages, it can have an impact on the items returned in future pages. For example, if my paging query specifies where val1 > 4 and I update a record's val1 from 1 to 5, that item may be returned in chunk 2, since it now meets the criteria. If you need to update values that are in your where clause (thereby impacting what falls into the set of data you'd be processing), it's best to add a processed flag of some kind that you can query by instead.
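Point 3 above (use an explicit ItemWriter) could be wired in roughly like this. This is only a sketch reusing the question's entity and bean names; the chunk size of 3 and the EntityManagerFactory lookup are assumptions, not the asker's actual values:

```java
// Sketch (assumptions: the question's SNUserPerCampaign entity and step beans).
// Persist the processor's changes explicitly via a JpaItemWriter instead of
// relying on the reader's EntityManager flushing them as a side effect.
@Bean
public JpaItemWriter<SNUserPerCampaign> writer() {
    JpaItemWriter<SNUserPerCampaign> writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(infrastructureConfiguration.getEntityManagerFactory());
    return writer;
}

@Bean
public Step step1() {
    return steps.get("step1")
            .<SNUserPerCampaign, SNUserPerCampaign>chunk(3) // assumed commit interval
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}
```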
Michael Minella
  • Thanks for your posts. In my processor I handle the item and change some values (including some that are in the select criteria), so this explains the issue. But I don't see any action or log trace that could "re-attach" the items; it seems they stay in the transaction during processing until the next page is read. – ThierryC Oct 24 '14 at 14:46
  • 1
    It's too big, but yesterday I could reproduce the issue even when I put almost all in comment. The item processor is using the getter and setter of the Pojo entity; and the problem occurred because, as mentioned, I modified some field that was in the SELECT request criteria. – ThierryC Oct 24 '14 at 15:50
  • 2
    I'm confused. Wouldn't it be a common pattern with any database reader that you'd use a status column to mark processed items and use that in the reader's where clause? What's the preferred way to do this? And if the *PagingrItemReaders execute independent queries for each page, why track the page at all? @Yves-Marie L's hack to override getPage() seems to work - but it also seems like there should be a better way to handle this. What am I missing? – crig May 08 '20 at 22:12
1

I had the same problem with rows being skipped based on the pageSize. If I had pageSize set to 2, for example, it would read 2, ignore 2, read 2, ignore 2, etc.

I was building a daemon processor to poll a 'Request' database table for records at a 'Waiting To Be Processed' status. The daemon is designed to run forever in the background.

I had a 'status' field which was defined in the @NamedQuery and would select records whose status was '10':Waiting to be processed. After the record was processed, the status field would be updated to '20':Error or '30':Success. This turned out to be the cause of the problem - I was updating a field which was defined in the query. If I introduced a 'processedField' and updated that instead of the 'status' field then no problem - all the records would be read.

As a possible solution to updating the status field, I set maxItemCount to be the same as the pageSize; this updated the records correctly before step completion. I then keep executing the step until a request is made to stop the daemon. OK, probably not the most efficient way to do it (but I'm still benefiting from the ease of use that JPA provides), and I think it would probably be better to use JdbcPagingItemReader (described above – thanks!). Opinions on the best approach to this batch database polling problem would be welcome :)
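The maxItemCount trick described above might look like this. A sketch only: the Request entity, its status codes and the JPQL string follow the description above, but the exact field and variable names are assumptions:

```java
// Sketch, assuming a Request entity with a numeric status column as described
// ('10' = waiting to be processed). Capping maxItemCount at the page size ends
// the step after one page, so the offset never advances past rows whose
// status the processor has already changed.
JpaPagingItemReader<Request> reader = new JpaPagingItemReader<>();
reader.setEntityManagerFactory(entityManagerFactory);
reader.setQueryString("select r from Request r where r.status = 10");
reader.setPageSize(2);
reader.setMaxItemCount(2); // equal to pageSize: only the first page is read per step execution
```

Each step execution then re-runs the query from offset 0, and the daemon loop restarts the step until asked to stop.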