1

I have the following FlatFileItemWriter defined in a multi-threaded step.

public FlatFileItemWriter<School> writer() throws Exception {
    FlatFileItemWriter<School> flatFileWriter = new FlatFileItemWriter<School>();
    flatFileWriter.setResource(new FileSystemResource("C:\\u01\\SchoolDetails.txt"));
    flatFileWriter.setName("School-File-Writer");
    flatFileWriter.setAppendAllowed(true);
    flatFileWriter.setLineSeparator("\n");
    flatFileWriter.setHeaderCallback(writer -> writer.write(columnHeaders()));
    flatFileWriter.setLineAggregator(new DelimitedLineAggregator<School>() {
        {
            setDelimiter("^");
            setFieldExtractor((FieldExtractor<School>) schoolFieldExtractor());
        }
    });
    return flatFileWriter;
}
    
private BeanWrapperFieldExtractor<School> schoolFieldExtractor() {
    return new BeanWrapperFieldExtractor<School>() {
        {
            String[] columnValuesMapper = new String[] { 
                    "schoolName", "schoolAddress"
            };
            setNames(columnValuesMapper);
        }
    };
}

The ItemWriter generates the files on most days. But once a while it throws the following error:

2022-02-14 22:07:46.652 [SimpleAsyncTaskExecutor-25] INFO  SpringBatchConfiguration:703 - Item Reader
2022-02-14 22:07:46.653 [SimpleAsyncTaskExecutor-25] INFO  PagingItemReader:80 - reading records 1 to 10
2022-02-14 22:07:46.657 [SimpleAsyncTaskExecutor-28] INFO  PagingItemReader:80 - reading records 11 to 20
2022-02-14 22:07:46.661 [SimpleAsyncTaskExecutor-27] INFO  PagingItemReader:80 - reading records 21 to 30
2022-02-14 22:07:46.665 [SimpleAsyncTaskExecutor-26] INFO  PagingItemReader:80 - reading records 31 to 40
2022-02-14 22:07:46.998 [SimpleAsyncTaskExecutor-25] INFO  o.s.batch.core.step.AbstractStep:272     - Step: [childStep:partition1] executed in 350ms
2022-02-14 22:07:47.005 [SimpleAsyncTaskExecutor-28] INFO  o.s.batch.core.step.AbstractStep:272     - Step: [childStep:partition3] executed in 357ms
2022-02-14 22:07:47.033 [SimpleAsyncTaskExecutor-27] ERROR o.s.batch.core.step.AbstractStep:237     - Encountered an error executing step childStep in School-Job-Process
org.springframework.batch.item.ItemStreamException: Output file was not created: [/u01/TotalRecordsFound-20220214.txt]
        at org.springframework.batch.item.util.FileUtils.setUpOutputFile(FileUtils.java:76)
        at org.springframework.batch.item.support.AbstractFileItemWriter$OutputState.initializeBufferedWriter(AbstractFileItemWriter.java:553)
        at org.springframework.batch.item.support.AbstractFileItemWriter$OutputState.access$000(AbstractFileItemWriter.java:385)
        at org.springframework.batch.item.support.AbstractFileItemWriter.doOpen(AbstractFileItemWriter.java:319)
        at org.springframework.batch.item.support.AbstractFileItemWriter.open(AbstractFileItemWriter.java:309)
        at org.springframework.batch.item.support.AbstractFileItemWriter$$FastClassBySpringCGLIB$$f2d35c3.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
        at org.springframework.batch.item.file.FlatFileItemWriter$$EnhancerBySpringCGLIB$$294bdfee.open(<generated>)
        at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:103)
        at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:311)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:205)
        at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:138)
        at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:135)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)
    

The error occurs intermittenly. The error occurs when two or more threads collides to create and write the data to the file. I can avoid it by delegating my FlatFileItemWriter to a SynchronizedItemStreamWriter. But the Spring docs suggest otherwise. The docs suggest that using a FlatFileItemWriter in a multi-threaded step does NOT require synchronizing writes.

So, I am not sure on how I can avoid these errors and also according to the logs the first two partitions successfully completed running which means the file is created successfully and data is written to it (if exists). So, how is the third partition telling that the file is not created when its already created by the first two paritions.

Any help would be appreciated. Thanks in advance.

Vamsi
  • 619
  • 3
  • 9
  • 22
  • From the documentation for [FlatFileItemWriter](https://docs.spring.io/spring-batch/docs/current/api/org/springframework/batch/item/file/FlatFileItemWriter.html), it says "The implementation is **not** thread-safe." So I think you do need the delegate wrapper. – Sam F Feb 16 '22 at 04:17
  • Thanks for the comment. But From java docs on: SynchronizedItemStreamWriter `This decorator is useful when using a non thread-safe item writer in a multi-threaded step. Typical delegate examples are the JsonFileItemWriter and StaxEventItemWriter. It should be noted that synchronizing writes might introduce some performance degradation, so this decorator should be used wisely and only when necessary. For example, using a FlatFileItemWriter in a multi-threaded step does NOT require synchronizing writes, so using this decorator in such use case might be counter productive.` – Vamsi Feb 16 '22 at 15:19

0 Answers0