We are using Spring Batch to ETL a pipe-delimited file into a DB. Every record in the file has many fields and is identified by a ClaimNumber:
ClaimNumber|AdjustmentVersion|.....
0038017282|3|....
0071517729|3|....
0081517745|3|....
Inside the batch step a regular read-process-write flow is used:
<step id="stagingDataDump" next="gatherStats">
    <tasklet>
        <!-- <chunk reader="genericBatchItemReader" writer="genericBatchItemWriter" -->
        <chunk reader="genericBatchItemReader" writer="compositeWriter" processor="validationProcessor"
               commit-interval="1000" skip-limit="100000" >
            <skippable-exception-classes>
                <batch:include class="org.springframework.batch.item.file.FlatFileParseException" />
                <batch:include class="org.beanio.BeanIOException" />
            </skippable-exception-classes>
        </chunk>
        <listeners>
            <listener ref="genericItemSkipListener"/>
        </listeners>
    </tasklet>
</step>
The reader uses the BeanIOFlatFileItemReader:
<bean id="genericBatchItemReader" class="org.beanio.spring.BeanIOFlatFileItemReader" scope="step"
      p:streamMapping="classpath:beanio-mapping.xml"
      p:streamName="#{jobParameters[feedProcessorLauncherImpl.BEANIO_STREAM_MAPPING]}"
      p:resource="file://#{jobParameters[feedProcessorLauncherImpl.RESOURCE_FILE_NAME_UNENCRYPTED]}"
      p:errorHandler-ref="beanIoRecordErrorHandler"/>
The processor phase encapsulates item validation:
<util:map id="handlerRegistryContents">
    <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).MEDI}" value-ref="medicalClaimsValidator"/>
    <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).LAB}" value-ref="labClaimsValidator"/>
    <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).RXPD}" value-ref="pharmaClaimsValidator"/>
</util:map>
<bean id="validationProcessor" class="org.fuwt.iws.claims.validation.springbatch.ValidationProcessor" scope="step">
    <property name="handlerRegistry" ref="handlerRegistryContents"/>
</bean>
The writer is composite:
<bean id="genericBatchItemWriter" class="org.fuwt.iws.claims.springbatch.GenericBatchItemWriter" scope="step"
      p:metadataId="#{jobParameters[feedProcessorLauncherImpl.METADATA_ID]}"/>
<bean id="softValidationsItemWriter" class="org.fuwt.iws.claims.springbatch.SoftValidationsItemWriter" scope="step"
      p:metadataId="#{jobParameters[feedProcessorLauncherImpl.METADATA_ID]}"/>
<bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter" scope="step">
    <property name="delegates">
        <list>
            <!-- Order here is significant as ID's, which are generated by the first writer - genericBatchItemWriter - need to be passed around -->
            <ref bean="genericBatchItemWriter"/>
            <ref bean="softValidationsItemWriter"/>
        </list>
    </property>
</bean>
During the processing/validation step above, the ValidationProcessor determines the type of the record and, based on it, selects the appropriate composite validator (MedicalClaimsValidator in this case). All the individual validations for that record type are configured inside the composite validator (a composite pattern); HCPCSCodeLength, seen in the logs below, is one of them.
While each item (claim) is being validated via this infrastructure, the errors found are accumulated in the item's errors field, a Map<String, Collection<String>>, where a failure of a particular validation is keyed by that validation's name and described in the map's value.
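To make the shape of that errors field concrete, here is a minimal sketch of how messages might be accumulated per key. The Claim class and the addError helper are hypothetical stand-ins, not our actual bean; only the Map<String, Collection<String>> type comes from the real code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

public class ClaimErrorsSketch {

    // Hypothetical stand-in for the claim bean; only the errors field matters here.
    static class Claim {
        private final Map<String, Collection<String>> errors = new LinkedHashMap<>();

        Map<String, Collection<String>> getErrors() {
            return errors;
        }

        // Hypothetical helper: append a message under the given validation key,
        // creating the collection for that key on first use.
        void addError(String validationKey, String message) {
            errors.computeIfAbsent(validationKey, k -> new ArrayList<>()).add(message);
        }
    }

    public static void main(String[] args) {
        Claim claim = new Claim();
        claim.addError("HCPCSCode", "supplied HCPCS code is blank or null");
        // prints {HCPCSCode=[supplied HCPCS code is blank or null]}
        System.out.println(claim.getErrors());
    }
}
```

With this shape, running the same validation twice on one claim appends the same message twice under the same key, which is the pattern visible in the logs below.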
Which brings me to the description of the erroneous behavior we are seeing:
After carefully examining the log file from loading the above test file with its 3 claim records, we found the following strange behavior: every record undergoes validation as many times as its ordinal number in the file. As seen below, the 1st record (claim) is validated once, resulting in a single message in the Errors map; the 2nd record is validated twice, and its Errors map holds the same message twice; the 3rd record is validated 3 times, and its Errors map entry holds the same message 3 times.
Each record in the file is essentially identical in terms of its invalidity, so the expected result is that each record has an identical Errors collection.
The actual result is that the number of repeated error messages keeps increasing with each subsequent record:
1st Record:
INFO 2016-06-23 10:16:24,214 [main] org.fuwt.iws.claims.validation.springbatch.medical.MedicalClaimsValidator: Service date to: Thu Dec 10 00:00:00 EST 2015
INFO 2016-06-23 10:16:24,216 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0038017282
INFO 2016-06-23 10:16:24,223 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
2nd Record:
INFO 2016-06-23 10:16:24,227 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0071517729
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0071517729
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
3rd Record:
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO 2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO 2016-06-23 10:16:24,229 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
Versions used:
spring-batch-core: 2.2.0.RELEASE
beanio: 2.1.0
Question:
What, if anything, makes Spring Batch issue those repeated calls on the processor? Is that Spring Batch's normal behavior, and how can one stop it and achieve the desired behavior described above?
Updates:
This validation component exhibits the erroneous behavior:
@Component("medicalClaimsValidator")
public class MedicalClaimsValidator implements ClaimValidation {
    private final static Logger logger = LoggerFactory.getLogger(MedicalClaimsValidator.class);

    @Autowired private AbstractMedicalClaimValidation HCPCSCodeLength;

    List<ClaimValidation> medicalClaimValidations = new ArrayList<>();

    @Override
    public boolean supports(Class<?> clazz) {
        return QualcareMedicalClaimWeeklyNDT.class.equals(clazz);
    }

    @Override
    public Map<String, Collection<String>> validate(Object item, MessageSource messageSource) {
        logger.info("\nSoft-validating the bean...");
        QualcareMedicalClaimWeeklyNDT medicalClaim = (QualcareMedicalClaimWeeklyNDT) item;
        logger.info("Claim #: {}", medicalClaim.getClaimNumber());
        logger.info("Service date from: {}", medicalClaim.getServiceDateFrom());
        logger.info("Service date to: {}", medicalClaim.getServiceDateTo());

        //TODO: A candidate for externalization into a config file once we have all the known rules
        //medicalClaimValidations.add(new ServiceDateFromGreaterThanTo());
        //medicalClaimValidations.add(new ProcedureCodeLength());
        medicalClaimValidations.add(HCPCSCodeLength/*new HCPCSCodeLength()*/);
        //medicalClaimValidations.add(new TypeOfBillPresenseAndLengthForInstitutionalClaims());
        //medicalClaimValidations.add(new DischargeStatusPresenseAndLengthForInpatientClaims());
        //medicalClaimValidations.add(new DiagnosisCodeFormat());

        for (ClaimValidation validation : medicalClaimValidations) {
            logger.info("validation type: {}", validation.getClass());
            validation.validate(medicalClaim, messageSource);
        }
        return medicalClaim.getErrors();
    }
}
The following workaround hides the erroneous behavior:
@Component("medicalClaimsValidator")
public class MedicalClaimsValidator implements ClaimValidation {
    private final static Logger logger = LoggerFactory.getLogger(MedicalClaimsValidator.class);

    @Autowired @Qualifier("HCPCSCodeLength") private AbstractMedicalClaimValidation HCPCSCodeLength;
    @Autowired @Qualifier("serviceDateFromGreaterThanTo") private AbstractMedicalClaimValidation serviceDateFromGreaterThanTo;
    @Autowired @Qualifier("procedureCodeLength") private AbstractMedicalClaimValidation procedureCodeLength;
    @Autowired @Qualifier("typeOfBillPresenseAndLengthForInstitutionalClaims") private AbstractMedicalClaimValidation typeOfBillPresenseAndLengthForInstitutionalClaims;
    @Autowired @Qualifier("dischargeStatusPresenseAndLengthForInpatientClaims") private AbstractMedicalClaimValidation dischargeStatusPresenseAndLengthForInpatientClaims;
    @Autowired @Qualifier("diagnosisCodeFormat") private AbstractMedicalClaimValidation diagnosisCodeFormat;

    List<ValidationProcessTuple> medicalClaimValidations = new ArrayList<>();

    @Override
    public boolean supports(Class<?> clazz) {
        return QualcareMedicalClaimWeeklyNDT.class.equals(clazz);
    }

    @Override
    public Map<String, Collection<String>> validate(Object item, MessageSource messageSource) {
        logger.info("\nSoft-validating the bean...");
        QualcareMedicalClaimWeeklyNDT medicalClaim = (QualcareMedicalClaimWeeklyNDT) item;
        logger.info("Claim #: {}", medicalClaim.getClaimNumber());
        logger.info("Service date from: {}", medicalClaim.getServiceDateFrom());
        logger.info("Service date to: {}", medicalClaim.getServiceDateTo());

        //TODO: A candidate for externalization into a config file once we have all the known rules
        medicalClaimValidations.add(new ValidationProcessTuple(serviceDateFromGreaterThanTo, false));
        medicalClaimValidations.add(new ValidationProcessTuple(procedureCodeLength, false));
        medicalClaimValidations.add(new ValidationProcessTuple(HCPCSCodeLength, false));
        medicalClaimValidations.add(new ValidationProcessTuple(typeOfBillPresenseAndLengthForInstitutionalClaims, false));
        medicalClaimValidations.add(new ValidationProcessTuple(dischargeStatusPresenseAndLengthForInpatientClaims, false));
        medicalClaimValidations.add(new ValidationProcessTuple(diagnosisCodeFormat, false));

        for (ValidationProcessTuple tuple : medicalClaimValidations) {
            if (!tuple.processed) {//to counteract the erroneous behavior whereby validation calls get repeated as many times as there are records
                tuple.validation.validate(item, messageSource);
                tuple.processed = true;
            }
        }
        return medicalClaim.getErrors();
    }
}
I am still in the dark as to why this behavior happens in the first place - any explanation of that is certainly welcome.
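One possibility that may be worth ruling out before looking at Spring Batch itself: medicalClaimValidations is an instance field, validate() appends to it on every call, and the logs show the same HCPCSCodeLength@1a9ddcb7 instance for every record. The framework-free sketch below reproduces the 1, 2, 3 pattern under the assumption that the composite validator is one shared instance (singleton is Spring's default scope for @Component beans). The class and field names in it are hypothetical stand-ins, not our real classes.

```java
import java.util.ArrayList;
import java.util.List;

public class GrowingListSketch {

    // Stand-in for a singleton-scoped composite validator shared by all items.
    static class CompositeValidator {
        // Instance field: it lives as long as the validator, i.e. across all items.
        private final List<Runnable> validations = new ArrayList<>();
        int invocations = 0;

        void validate() {
            // Mirrors medicalClaimValidations.add(HCPCSCodeLength) inside validate():
            // one more entry is appended on every call and never removed.
            validations.add(() -> invocations++);
            for (Runnable validation : validations) {
                validation.run();
            }
        }
    }

    public static void main(String[] args) {
        CompositeValidator validator = new CompositeValidator(); // one shared instance
        for (int record = 1; record <= 3; record++) {
            int before = validator.invocations;
            validator.validate(); // called once per record, as a processor would
            System.out.println("record " + record + ": "
                    + (validator.invocations - before) + " validation run(s)");
        }
        // prints:
        // record 1: 1 validation run(s)
        // record 2: 2 validation run(s)
        // record 3: 3 validation run(s)
    }
}
```

If that assumption holds for our setup, the processor is called only once per item and the repetition comes from the ever-growing list; populating the list once (for example in a constructor or a @PostConstruct method) or using a local list inside validate() would then be the thing to try. It would also be consistent with the processed flag above masking the symptom, since entries added by earlier calls are skipped but still stay in the list.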