-1

I have a Spring Batch job in which I skip all duplicate items and write them to a flat file.

However, the FlatFileItemWriter throws the error below whenever there's a duplicate:

Writer must be open before it can be written to

Below is the writer and SkipListener configuration:

    @Bean(name = "duplicateItemWriter")
    public FlatFileItemWriter<InventoryFileItem> dupItemWriter(){

        return new FlatFileItemWriterBuilder<InventoryFileItem>()
                .name("duplicateItemWriter")
                .resource(new FileSystemResource("duplicateItem.txt"))
                .lineAggregator(new PassThroughLineAggregator<>())
                .append(true)
                .shouldDeleteIfExists(true)
                .build();
    }


public class StepSkipListener implements SkipListener<InventoryFileItem, InventoryItem> {

    private FlatFileItemWriter<InventoryFileItem> skippedItemsWriter;
    
    public StepSkipListener(FlatFileItemWriter<InventoryFileItem> skippedItemsWriter) {
        this.skippedItemsWriter = skippedItemsWriter;
    }
    @Override
    public void onSkipInProcess(InventoryFileItem item, Throwable t) {
        System.out.println(item.getBibNum() + " Process - " + t.getMessage());
        
        try {
            skippedItemsWriter.write(Collections.singletonList(item));
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }

    // Remaining SkipListener callbacks (onSkipInRead, onSkipInWrite) omitted here.
}


The overall job is defined below. The duplicateItemWriter is used from the SkipListener.

    @Bean(name = "fileLoadJob")
    @Autowired
    public Job fileLoadJob(JobBuilderFactory jobs, StepBuilderFactory steps,
            FlatFileItemReader<InventoryFileItem> fileItemReader,
            CompositeItemProcessor compositeItemProcessor,
            @Qualifier(value = "itemWriter") ItemWriter<InventoryItem> itemWriter,
            StepSkipListener skipListener) {

        return jobs.get("libraryFileLoadJob")
                .start(steps.get("step").<InventoryFileItem, InventoryItem>chunk(chunkSize)
                        .reader(fileItemReader)
                        .processor(compositeItemProcessor)
                        .writer(itemWriter)
                        .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(Integer.parseInt(skipLimit))
                        .listener(skipListener)
                        .build())
                .build();
    }

I've also tried writing all of the data with a FlatFileItemWriter, and that doesn't work either. However, if I write to a DB, there's no issue.

I'm using Spring Batch 4.3.3. I've also referred to the threads below:

Saum

3 Answers

1

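This was just a gross oversight: I missed that the FlatFileItemWriter needs to be registered as a stream. Because the writer is only used from the SkipListener and is not the step's own writer, the step does not open it automatically. I'm somewhat disappointed to have put up this question, but I'm posting the answer in case it helps someone.

The solution was as simple as adding .stream(dupItemWriter) to the step in the job definition.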

FlatfileItemWriter with Compositewriter example

    @Bean(name = "fileLoadJob")
    @Autowired
    public Job fileLoadJob(JobBuilderFactory jobs, StepBuilderFactory steps,
            FlatFileItemReader<InventoryFileItem> fileItemReader,
            CompositeItemProcessor compositeItemProcessor,
            @Qualifier(value = "itemWriter") ItemWriter<InventoryItem> itemWriter,
            @Qualifier(value = "duplicateItemWriter") FlatFileItemWriter<InventoryFileItem> dupItemWriter,
            StepSkipListener skipListener) {

        return jobs.get("libraryFileLoadJob")
                .start(steps.get("step").<InventoryFileItem, InventoryItem>chunk(chunkSize)
                        .reader(fileItemReader)
                        .processor(compositeItemProcessor)
                        .writer(itemWriter)
                        .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(Integer.parseInt(skipLimit))
                        .listener(skipListener)
                        // Register the skip writer so the step opens and closes it.
                        .stream(dupItemWriter)
                        .build())
                .build();
    }
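
To illustrate why the registration matters, here is a minimal plain-Java sketch of the open/write/close lifecycle. It is a simplified model and not Spring Batch's own code: `SimpleStream`, `SimpleWriter`, `SimpleStep` and `StreamLifecycleDemo` are hypothetical names made up for this example, and the real framework does considerably more (restart state, transactions, and so on). The only point is that a step opens the streams it knows about, so a writer used on the side from a listener stays closed unless it is registered.

```java
import java.util.ArrayList;
import java.util.List;

/** Demo entry point; all classes here are hypothetical stand-ins, not Spring Batch types. */
class StreamLifecycleDemo {

    /** Returns "ok" if the write succeeded, otherwise the error message. */
    static String tryWrite(boolean registerWriter) {
        SimpleWriter skipWriter = new SimpleWriter();
        SimpleStep step = new SimpleStep();
        if (registerWriter) {
            step.stream(skipWriter); // analogous to .stream(dupItemWriter)
        }
        String[] result = {"ok"};
        step.run(() -> {
            try {
                // Simulates the skip listener writing a skipped item.
                skipWriter.write("duplicate-item");
            } catch (IllegalStateException e) {
                result[0] = e.getMessage();
            }
        });
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(tryWrite(false));
        System.out.println(tryWrite(true));
    }
}

/** Something with an open/close lifecycle. */
interface SimpleStream {
    void open();
    void close();
}

/** A writer that refuses writes until it has been opened. */
class SimpleWriter implements SimpleStream {
    private final List<String> lines = new ArrayList<>();
    private boolean opened = false;

    @Override public void open() { opened = true; }
    @Override public void close() { opened = false; }

    void write(String line) {
        if (!opened) {
            throw new IllegalStateException("Writer must be open before it can be written to");
        }
        lines.add(line);
    }

    List<String> lines() { return lines; }
}

/** A step that opens and closes only the streams registered with it. */
class SimpleStep {
    private final List<SimpleStream> streams = new ArrayList<>();

    SimpleStep stream(SimpleStream s) {
        streams.add(s);
        return this;
    }

    void run(Runnable body) {
        streams.forEach(SimpleStream::open);
        try {
            body.run();
        } finally {
            streams.forEach(SimpleStream::close);
        }
    }
}
```

In this model, `tryWrite(false)` hits the same kind of "must be open" failure as in the question, while `tryWrite(true)` succeeds because the step opened the writer before the listener used it.
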
Saum
  • If this solved your problem, then please accept it as the answer so that the question can be closed and others can benefit from that information as well. Thanks! – João Dias Sep 05 '21 at 08:52
  • Sure @JoãoDias - there's a 'cooling period' since I answered my own question - will do that after it has elapsed. – Saum Sep 05 '21 at 09:49
0

It's not absolutely necessary to include .stream(dupItemWriter); you can also call the writer's .open() method yourself. In that case you are also responsible for calling .close() once you are done with the writer.

In my case I was creating dynamic/programmatic ItemWriters so adding them to the streams was not feasible.

.stream(writer-1)
.stream(writer-2)
.stream(writer-N)

Instead, I called the .open() method myself:

FlatFileItemWriter<OutputContact> itemWriter = new FlatFileItemWriter<>();
itemWriter.setResource(outPutResource);
itemWriter.setAppendAllowed(true);
itemWriter.setLineAggregator(lineAggregator);
itemWriter.setHeaderCallback(writer -> writer.write("ACCT,MEMBER,SOURCE"));
itemWriter.open(new ExecutionContext());
Jet_C
0

I had the same issue. I created two writers by extending FlatFileItemWriter. That worked until I added the @StepScope annotation. After that, the first writer threw an exception with the message "Writer must be open before it can be written to", while the second one worked without any problem. I solved it by calling open(new ExecutionContext()), but I still don't understand why the second one works and the first one doesn't.