The solution suggested by Mahmoud finally worked :) However, there are a few caveats. ClassifierCompositeItemWriter
is designed to write into different files. i.e., if there are 3 different Record Types, there will be 3 different output files. But in my use case, I'm expecting the output in a single file, in the same order. So, I mentioned the same file name in each Writer bean and added `writer.setAppendAllowed(true);`. But after this, different Record Types were clubbed together and the sorting order changed. So, I reduced the chunk size from 50 to 3 (3 is the total number of Record Types). This will take a hit on performance, however. With some tweaks like this, I'm finally getting the desired output. Here is my implementation (just for reference, it needs more cleanup):
Configuration.java (StepBuilderFactory)
...chunk(3).reader(reader).processor(processor()).writer(writer).stream(recordType1FlatFileItemWriter()).stream(recordType2FlatFileItemWriter()).build();
Processor Step
@Bean
@StepScope
public ItemProcessor<RecordType, RecordType> processor() {
    // Routes each incoming item to a type-specific delegate processor based on
    // the item's runtime class (SubclassClassifier walks the class hierarchy).
    ClassifierCompositeItemProcessor<RecordType, RecordType> processor =
            new ClassifierCompositeItemProcessor<>();

    ItemProcessor<RecordType1, RecordType1> recordType1Processor =
            new ItemProcessor<RecordType1, RecordType1>() {
                @Nullable
                @Override
                public RecordType1 process(RecordType1 recordType1) throws Exception {
                    //Processing logic
                    return recordType1;
                }
            };

    ItemProcessor<RecordType2, RecordType2> recordType2Processor =
            new ItemProcessor<RecordType2, RecordType2>() {
                @Nullable
                @Override
                public RecordType2 process(RecordType2 recordType2) throws Exception {
                    //Processing logic
                    return recordType2;
                }
            };

    // Pass-through default: the writer routes three record types but only two
    // have dedicated processors here. Without a default, the classifier would
    // return null for RecordType3 and the composite processor would NPE.
    // NOTE(review): confirm RecordType3 really needs no processing.
    SubclassClassifier<RecordType, ItemProcessor<?, ? extends RecordType>> classifier =
            new SubclassClassifier<>(item -> (RecordType) item);

    Map<Class<? extends RecordType>, ItemProcessor<?, ? extends RecordType>> typeMap =
            new HashMap<>();
    typeMap.put(RecordType1.class, recordType1Processor);
    typeMap.put(RecordType2.class, recordType2Processor);
    classifier.setTypeMap(typeMap);

    processor.setClassifier(classifier);
    return processor;
}
Writer Step
@Bean
@StepScope
@SuppressWarnings({"unchecked", "rawtypes"})
public ClassifierCompositeItemWriter<RecordType> writer(
        @Value("#{stepExecutionContext[fileName]}") Resource file) throws Exception {
    // Routes each RecordType item to the FlatFileItemWriter registered for its
    // concrete class. Was parameterized as <String>, which did not match the
    // RecordType items actually flowing through the step — fixed to <RecordType>.
    //
    // NOTE(review): 'file' is injected but never used — each delegate writer
    // hardcodes its own output resource; confirm whether the step-scoped file
    // name should be passed down to the delegates instead.
    //
    // NOTE(review): delegate writers must also be registered on the step via
    // .stream(...) so open/update/close are called; the step shown registers
    // only recordType1/2 — verify recordType3FlatFileItemWriter is added too.
    ClassifierCompositeItemWriter<RecordType> writer = new ClassifierCompositeItemWriter<>();

    // Raw types are deliberate here: the delegates are FlatFileItemWriter<RecordType1>
    // etc., which cannot be expressed as ItemWriter<? super RecordType>.
    SubclassClassifier classifier = new SubclassClassifier<>();
    Map typeMap = new HashMap<>();
    typeMap.put(RecordType1.class, recordType1FlatFileItemWriter());
    typeMap.put(RecordType2.class, recordType2FlatFileItemWriter());
    typeMap.put(RecordType3.class, recordType3FlatFileItemWriter());
    classifier.setTypeMap(typeMap);

    writer.setClassifier(classifier);
    return writer;
}
Writer
@Bean
public FlatFileItemWriter<RecordType1> recordType1FlatFileItemWriter() throws Exception{
FlatFileItemWriter<RecordType1> writer = new FlatFileItemWriter<>();
writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<RecordType1>() {{
setDelimiter("#");
setFieldExtractor(new BeanWrapperFieldExtractor<RecordType1>() {
{
setNames(new String[] { "RecordType", "ID1", "ID2", "ID3");
}
});
}});
return writer;
}
@Bean
public FlatFileItemWriter<RecordType2> recordType2FlatFileItemWriter() throws Exception{
FlatFileItemWriter<RecordType2> writer = new FlatFileItemWriter<>();
writer.setResource( new FileSystemResource(outputFolder + "test.txt"));
writer.setAppendAllowed(true);
writer.setLineAggregator(new DelimitedLineAggregator<RecordType2>() {{
setDelimiter("#");
setFieldExtractor(new BeanWrapperFieldExtractor<RecordType2>() {
{
setNames(new String[] { "RecordType", "ID9", "ID8",);
}
});
}});
return writer;
}