0

I have standard Step with reader, one processor and writer. Reader is reading A class objects from database table (say table1) using JpaPagingItemReaderBuilder:

@Bean
public ItemReader<A> aItemReader(EntityManagerFactory entityManagerFactory) {
    return new JpaPagingItemReaderBuilder<A>().name("Areader")
            ...
            .build();
}

then I have processor that maps A class objects to B class objects

@Bean
public ItemProcessor<A, B> itemProcessor() {
    return item -> {
        ...
        // returns B object
    };
}

and writer that writes B class objects to another db table (say table2)

@Component
@RequiredArgsConstructor
@StepScope
public class BEntityWriter implements ItemWriter<B> {
    private final Brepository brepository;

    @Override
    public void write(List<? extends B> items) {
        brepository.persistAll(items);
    }
}

I have defined a step:

stepFactory.get("name")
    .<A, B>chunk(SIZE)
    .reader(aItemReader)
    .processor(itemProcessor)
    .writer(bEntityWriter)
    .faultTolerant()
    .skip(Throwable.class)
    .skipLimit(LIMIT)
    .build();

In case of any exception during processing/writing of a record, I want to skip that record and mark it in A table (table1) as failing - lets say A class has status field with corresponding table1 column and I want to set it to FAILED.

I think it can be achieved by using SkipListener, I am able to access A class object in

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void onSkipInWrite(A item, Throwable t) {
    A a = aRepository.findById(item.getId())
            .orElseThrow(RuntimeException::new);

    a.setStatus("FAILED");
}

but I am not able to access when there is a failure on write level, I only can access B object:

public void onSkipInWrite(B item, Throwable t) 

The solution I have found is to pass A object in some kind of wrapper class to Writer along with B object, then I can operate on wrapper class both on Writer and SkipListener, but this does not look nice. I was thinking about using StepContext, but I only want to put there failing records.

Is there any other way I can access reader objects (in my case A class objects) in SkipListener?

myschu
  • 91
  • 1
  • 8

1 Answers1

0

For a chunk-oriented step defined as .<A, B>chunk(SIZE), the expected skip listener should have the following methods:

void onSkipInRead(Throwable t);
void onSkipInProcess(A item, Throwable t);
void onSkipInWrite(B item, Throwable t);

If I understand correctly, you want to access original items of type A in the onSkipInWrite method. This callback is only aware of transformed items. If you want to keep a reference to the original type from which items have been transformed, you can add a reference to them in your output type, something like:

class B {
   A a; // reference to the source from which this object has been transformed
}

That said, I see that you are intending to modify read items with a paging item reader (mark them as skipped) here:

I want to skip that record and mark it in A table (table1) as failing

This can cause issue when reading subsequent pages, see Spring batch jpaPagingItemReader why some rows are not read?.

Mahmoud Ben Hassine
  • 28,519
  • 3
  • 32
  • 50
  • thanks both for an answer and catching possible problem with jpaPagingItemReader, really appreciate! I was also wondering if I understand how does SkipListener work correctly - changes made on SkipListener (like setting a status in my case) will not be affected by a rollback and remain persisted? I am refering to spring batch doc fragment: "The SkipListener is always called just before the transaction is committed. This is to ensure that any transactional resources call by the listener are not rolled back by a failure within the ItemWriter" – myschu Mar 01 '23 at 13:36
  • 1
    You are requiring a new transaction in your skip listener with `@Transactional(propagation = Propagation.REQUIRES_NEW)`, so that method of your listener will run in a separate transaction an will not be affected by a rollback of the main transaction driven by Spring Batch. – Mahmoud Ben Hassine Mar 01 '23 at 14:16
  • ok.. does it mean that if I change propagation level to default (REQUIRED), the changes done under SkipListener will be rollbacked as well? – myschu Mar 01 '23 at 14:49
  • 1
    Yes, as is mentioned in the docs: `The SkipListener is always called just before the transaction is committed`, which implies the listener is called inside the transaction. Hence, with the default propagation level, any change done by the skip listener will be rolled back as well if the outer transaction is rolled back. – Mahmoud Ben Hassine Mar 01 '23 at 15:55
  • ok, got it, assuming that I set A record (table1) status to success for example in ItemWrite, then I got fail because of some exception, I am entering SkipListener with transactional REQUIRES_NEW, I am trying to set status of A record (table1) to FAILED, is there any chance it can result in deadlock? – myschu Mar 02 '23 at 08:31
  • 1
    I see no reason for a deadlock with two separate transactions. – Mahmoud Ben Hassine Mar 02 '23 at 08:59