We are using Spring Batch to ETL a pipe-delimited file into a DB. Every record in the file has many fields and is identified by a ClaimNumber:

ClaimNumber|AdjustmentVersion|.....
0038017282|3|....
0071517729|3|....
0081517745|3|....

Inside the batch step a regular read-process-write flow is used:

<step id="stagingDataDump" next="gatherStats">
            <tasklet>
                <!-- <chunk reader="genericBatchItemReader" writer="genericBatchItemWriter" -->
                <chunk reader="genericBatchItemReader" writer="compositeWriter" processor="validationProcessor"
                    commit-interval="1000" skip-limit="100000" > 
                    <skippable-exception-classes>
                        <batch:include  class="org.springframework.batch.item.file.FlatFileParseException" />
                        <batch:include  class="org.beanio.BeanIOException" />
                    </skippable-exception-classes>
                </chunk>
                <listeners>
                    <listener ref="genericItemSkipListener"/>
                </listeners>
            </tasklet>
        </step>

The reader uses the BeanIOFlatFileItemReader:

<bean id="genericBatchItemReader"   class="org.beanio.spring.BeanIOFlatFileItemReader"  scope="step"
        p:streamMapping="classpath:beanio-mapping.xml" 
        p:streamName="#{jobParameters[feedProcessorLauncherImpl.BEANIO_STREAM_MAPPING]}" 
        p:resource="file://#{jobParameters[feedProcessorLauncherImpl.RESOURCE_FILE_NAME_UNENCRYPTED]}" 
        p:errorHandler-ref="beanIoRecordErrorHandler"/>

The processor phase encapsulates item validation:

<util:map id="handlerRegistryContents">
        <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).MEDI}" value-ref="medicalClaimsValidator"/>
        <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).LAB}" value-ref="labClaimsValidator"/>
        <entry key="#{T(org.fuwt.iws.claims.service.filemanagement.ClaimsEnums$ContentSubType).RXPD}" value-ref="pharmaClaimsValidator"/>
</util:map>

<bean id="validationProcessor"  class="org.fuwt.iws.claims.validation.springbatch.ValidationProcessor"  scope="step">

    <property name="handlerRegistry" ref="handlerRegistryContents"/>

</bean> 

The writer is composite:

    <bean id="genericBatchItemWriter"   class="org.fuwt.iws.claims.springbatch.GenericBatchItemWriter"  scope="step"
        p:metadataId="#{jobParameters[feedProcessorLauncherImpl.METADATA_ID]}"/>

    <bean id="softValidationsItemWriter"    class="org.fuwt.iws.claims.springbatch.SoftValidationsItemWriter"   scope="step"
        p:metadataId="#{jobParameters[feedProcessorLauncherImpl.METADATA_ID]}"/>

<bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter" scope="step">
        <property name="delegates">
            <list>
                <!-- Order here is significant as ID's, which are generated by the first writer - genericBatchItemWriter - need to be passed around -->
                <ref bean="genericBatchItemWriter"/>
                <ref bean="softValidationsItemWriter"/>
            </list>
        </property>
    </bean> 

During the processing/validation phase above, the ValidationProcessor determines the type of the record and, based on it, selects the appropriate composite validator (MedicalClaimsValidator in this case). All the individual validations for that type are configured inside the composite validator (a composite pattern), for example HCPCSCodeLength in the logs below.

While each item (claim) is being validated via this infrastructure, the errors found are accumulated in the item's errors field, a Map<String, Collection<String>>, where the failure of a particular validation is keyed by that validation's name and described in the map's value.
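
To make the accumulation concrete, here is a minimal sketch of such an errors field. The class name `ClaimErrorsSketch` and the method `addError` are hypothetical stand-ins, not the project's real item class; only the `Map<String, Collection<String>>` shape comes from the description above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the item's errors field; not the project's real class.
class ClaimErrorsSketch {

    // Validation name -> messages recorded by that validation.
    private final Map<String, Collection<String>> errors = new LinkedHashMap<>();

    // Appends a message under the validation's key, creating the collection on first use.
    void addError(String validationName, String message) {
        errors.computeIfAbsent(validationName, key -> new ArrayList<>()).add(message);
    }

    Map<String, Collection<String>> getErrors() {
        return errors;
    }
}
```

With a structure like this, running the same validation twice on one item leaves two copies of the message under the same key.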

Which brings me to the description of the erroneous behavior we are seeing:

After carefully examining the logs from loading the above test file with 3 claim records, we found strange behavior: every record undergoes validation as many times as its ordinal number in the file. As seen below, the 1st record (claim) is validated once, resulting in a single message in the Errors map; the 2nd record is validated twice, and its Errors map holds the message twice; the 3rd record is validated 3 times, and its Errors map entry holds the same message 3 times.

Each record in the file is pretty much identical in terms of its invalidity, so an expected result is that each record should have an identical Errors collection.

The actual result is that the number of error messages keeps growing with each subsequent record:

1st Record:

INFO  2016-06-23 10:16:24,214 [main] org.fuwt.iws.claims.validation.springbatch.medical.MedicalClaimsValidator: Service date to: Thu Dec 10 00:00:00 EST 2015
INFO  2016-06-23 10:16:24,216 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0038017282
INFO  2016-06-23 10:16:24,223 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]} 

2nd Record:

INFO  2016-06-23 10:16:24,227 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0071517729
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0071517729
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}

3rd Record:

INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}
INFO  2016-06-23 10:16:24,228 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Started HCPCSCodeLength validation on org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength@1a9ddcb7 for claimNumber = 0081517745
INFO  2016-06-23 10:16:24,229 [main] org.fuwt.iws.claims.validation.springbatch.medical.HCPCSCodeLength: Errors: {HCPCSCode=[supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value, supplied HCPCS code is blank or null, but should be 3-5 characters in length, anything greater than or less than is invalid value]}

Versions used:

spring-batch-core: 2.2.0.RELEASE
beanio: 2.1.0

Question:

What, if anything, makes Spring Batch issue those repeated calls on the processor? Is that Spring Batch's normal behavior, and how can one stop it to achieve the desired behavior described above?

Updates:

This validation component exhibits the erroneous behavior:

@Component("medicalClaimsValidator")
public class MedicalClaimsValidator implements ClaimValidation {

    private final static Logger logger = LoggerFactory.getLogger(MedicalClaimsValidator.class);

    @Autowired private AbstractMedicalClaimValidation  HCPCSCodeLength;


    List<ClaimValidation> medicalClaimValidations = new ArrayList<>();

    @Override
    public boolean supports(Class<?> clazz) {
        return QualcareMedicalClaimWeeklyNDT.class.equals(clazz);
    }

    @Override

    public Map<String, Collection<String>> validate(Object item,  MessageSource messageSource) {

        logger.info("\nSoft-validating the bean...");
        QualcareMedicalClaimWeeklyNDT medicalClaim = (QualcareMedicalClaimWeeklyNDT)item;

        logger.info("Claim #: {}", medicalClaim.getClaimNumber());
        logger.info("Service date from: {}", medicalClaim.getServiceDateFrom());
        logger.info("Service date to: {}", medicalClaim.getServiceDateTo());

        //TODO: A candidate for externalization into a config file once we have all the known rules
        //medicalClaimValidations.add(new ServiceDateFromGreaterThanTo());
        //medicalClaimValidations.add(new ProcedureCodeLength());
        medicalClaimValidations.add(HCPCSCodeLength/*new HCPCSCodeLength()*/);
        //medicalClaimValidations.add(new TypeOfBillPresenseAndLengthForInstitutionalClaims());
        //medicalClaimValidations.add(new DischargeStatusPresenseAndLengthForInpatientClaims());
        //medicalClaimValidations.add(new DiagnosisCodeFormat());


        for(ClaimValidation validation:medicalClaimValidations) {
            logger.info("validation type: {}",validation.getClass());
             validation.validate(medicalClaim, messageSource);
        }

        return medicalClaim.getErrors();
    }

}

The following workaround hides the erroneous behavior:

@Component("medicalClaimsValidator")
public class MedicalClaimsValidator implements ClaimValidation {

    private final static Logger logger = LoggerFactory.getLogger(MedicalClaimsValidator.class);

    @Autowired @Qualifier("HCPCSCodeLength")private AbstractMedicalClaimValidation  HCPCSCodeLength;
    @Autowired @Qualifier("serviceDateFromGreaterThanTo")private AbstractMedicalClaimValidation  serviceDateFromGreaterThanTo;
    @Autowired @Qualifier("procedureCodeLength")private AbstractMedicalClaimValidation  procedureCodeLength;
    @Autowired @Qualifier("typeOfBillPresenseAndLengthForInstitutionalClaims")private AbstractMedicalClaimValidation  typeOfBillPresenseAndLengthForInstitutionalClaims;
    @Autowired @Qualifier("dischargeStatusPresenseAndLengthForInpatientClaims")private AbstractMedicalClaimValidation  dischargeStatusPresenseAndLengthForInpatientClaims;
    @Autowired @Qualifier("diagnosisCodeFormat")private AbstractMedicalClaimValidation  diagnosisCodeFormat;


    List<ValidationProcessTuple> medicalClaimValidations = new ArrayList<>();

    @Override
    public boolean supports(Class<?> clazz) {
        return QualcareMedicalClaimWeeklyNDT.class.equals(clazz);
    }

    @Override

    public Map<String, Collection<String>> validate(Object item,  MessageSource messageSource) {

        logger.info("\nSoft-validating the bean...");
        QualcareMedicalClaimWeeklyNDT medicalClaim = (QualcareMedicalClaimWeeklyNDT)item;

        logger.info("Claim #: {}", medicalClaim.getClaimNumber());
        logger.info("Service date from: {}", medicalClaim.getServiceDateFrom());
        logger.info("Service date to: {}", medicalClaim.getServiceDateTo());

        //TODO: A candidate for externalization into a config file once we have all the known rules
        medicalClaimValidations.add(new ValidationProcessTuple(serviceDateFromGreaterThanTo, false));
        medicalClaimValidations.add(new ValidationProcessTuple(procedureCodeLength, false));
        medicalClaimValidations.add(new ValidationProcessTuple(HCPCSCodeLength, false));
        medicalClaimValidations.add(new ValidationProcessTuple(typeOfBillPresenseAndLengthForInstitutionalClaims, false));
        medicalClaimValidations.add(new ValidationProcessTuple(dischargeStatusPresenseAndLengthForInpatientClaims, false));
        medicalClaimValidations.add(new ValidationProcessTuple(diagnosisCodeFormat, false));

        for (ValidationProcessTuple tuple : medicalClaimValidations) {
            if (!tuple.processed) {//to counteract the erroneous behavior whereby validation calls get repeated as many times as there are records
                tuple.validation.validate(item, messageSource);
                tuple.processed = true;
            }
        }

        return medicalClaim.getErrors();
    }

}

I am still in the dark as to why this behavior happens in the first place - any explanation of that is certainly welcome.

Simeon Leyzerzon
  • It sounds like a flaw in your beanio reader – Dean Clark Jun 23 '16 at 17:52
  • Is your input file a delimited flat file? You may find the `FlatFileItemReader`/`BeanWrapperFieldSetMapper`/`DelimitedLineTokenizer` pattern works well in your case. ([Example here](http://www.mkyong.com/spring-batch/how-to-convert-date-in-beanwrapperfieldsetmapper/)) – Dean Clark Jun 23 '16 at 18:51
  • Yes, the input file is a pipe-delimited file. Unfortunately, BeanIO is too deeply entrenched in this code base (it's legacy code), so changing it would be a very last resort. How would I prove that BeanIO is really the culprit? I'm more suspicious of the structure of the processors, where the main processor delegates to the validation components. I added a processor listener to the picture, and it doesn't show that the processor gets invoked multiple times; however, I can clearly see it in the results. Does anything else come to mind to detect where the problem might lie? – Simeon Leyzerzon Jun 23 '16 at 23:30
  • Your `ValidationProcessor` could _possibly_ be the issue if it has some sort of loop where the loop count is associated to the item count. Some additional logging in that class would let you know whether `process(T item)` is being called multiple times or just once. If it's multiple times, your reader is to blame. If once, it's your processor. The other way to check would be if duplicate records wind up in your `ItemWriter`. If so, it's the reader. If not, it's the processor. – Dean Clark Jun 24 '16 at 13:17
  • As I mentioned, the added ProcessorListener doesn't indicate that the processor gets invoked multiple times. The processor, however, internally loops over a list of polymorphic validators, and within that loop each validator gets invoked, it looks like, as many times as there are records in a batch. The signature of the validators is as follows: `Map<String, Collection<String>> validation.validate(T item, org.springframework.context.MessageSource)`. I now suspect that MessageSource, having a wider scope than the processor (step), may be what brings that kind of behavior into the picture. – Simeon Leyzerzon Jun 24 '16 at 14:05
  • I updated my question with the code of validation component that's called from within the processor which I mentioned above and a patch I found to hide the erroneous behavior. I still don't understand what causes that behavior - anything that sheds light on the issue is very welcome. – Simeon Leyzerzon Jun 24 '16 at 16:00

1 Answer

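In the MedicalClaimsValidator class, why are you adding the validation rules to this list inside the validate() method? I don't see the reason for this. It just keeps adding tuples to the list every time you process a new row. Since the validator is a @Component (a singleton by default), the list persists across items, so the same rule is in the list once after the first row, twice after the second, and so on. You can define the processing rules outside the method, in an init method or the constructor.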

   //TODO: A candidate for externalization into a config file once we have all the known rules
    medicalClaimValidations.add(new ValidationProcessTuple(serviceDateFromGreaterThanTo, false));
    medicalClaimValidations.add(new ValidationProcessTuple(procedureCodeLength, false));
    medicalClaimValidations.add(new ValidationProcessTuple(HCPCSCodeLength, false));
    medicalClaimValidations.add(new ValidationProcessTuple(typeOfBillPresenseAndLengthForInstitutionalClaims, false));
    medicalClaimValidations.add(new ValidationProcessTuple(dischargeStatusPresenseAndLengthForInpatientClaims, false));
    medicalClaimValidations.add(new ValidationProcessTuple(diagnosisCodeFormat, false));
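
A minimal sketch of that suggestion, assuming simplified stand-in types (`SketchClaim`, `SketchRule` and `RulesRegisteredOnceValidator` are all hypothetical, not the classes from the question): the rule list is built once in the constructor, and validate() only iterates over it. In the real Spring bean, where the rules are field-injected, an init method annotated with `@PostConstruct` would likely play the role of the constructor here, since injected fields are not yet set when the constructor runs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified claim: just a number and an errors map.
class SketchClaim {
    final String claimNumber;
    final Map<String, Collection<String>> errors = new LinkedHashMap<>();

    SketchClaim(String claimNumber) {
        this.claimNumber = claimNumber;
    }
}

// Hypothetical single-method rule, standing in for ClaimValidation.
interface SketchRule {
    void validate(SketchClaim claim);
}

class RulesRegisteredOnceValidator {

    private final List<SketchRule> rules;

    // The rule list is fixed here, once, and never grows afterwards.
    RulesRegisteredOnceValidator(List<SketchRule> rules) {
        this.rules = Collections.unmodifiableList(new ArrayList<>(rules));
    }

    // Called once per item; it only reads the rule list.
    Map<String, Collection<String>> validate(SketchClaim claim) {
        for (SketchRule rule : rules) {
            rule.validate(claim);
        }
        return claim.errors;
    }
}
```

With the list populated only once, each claim runs every rule exactly once, and the `processed` flag from the workaround is no longer needed.
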
Shankar
  • Shankar PS: sorry, I don't understand what you are claiming to be happening. Does what you say attempt to answer my original question? – Simeon Leyzerzon Jun 24 '16 at 16:58
  • Yes. What I mean is that the items are not getting repeated. The validation rules are getting repeated in the medicalClaimValidations list. In the code it looks like the same rules are added again for each item. So, won't the rules run repeatedly and accumulate? This accumulation happens in the fixed code too, but there you are skipping the old rules. – Shankar Jun 24 '16 at 17:21
  • Again, I don't understand what you claim is happening, or why you've chosen to critique the code that actually makes it work. To explain: the excerpt you are referring to just adds the validation rules to the list; later there's a call on each validation in that list. I don't see where you find that "the same rules are added repeatedly in a loop for each item". Perhaps you've spotted something I couldn't notice; would you mind showing exactly what you mean? – Simeon Leyzerzon Jun 24 '16 at 17:34
  • In the MedicalClaimsValidator.java class, the validate method is called once for each item. So, if there are 5 rows, it is called 5 times. I think it is not correct that in this method, the rules are added to medicalClaimValidations list. So, after the second row, the same rule will be in the list twice. After processing the third row, the list will have 3 rules. The list of rules keeps growing, because it is populated in the validate() method. – Shankar Jun 24 '16 at 17:40
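
The growth described in this last comment can be reproduced without Spring Batch. The sketch below uses hypothetical names (`GrowingRulesDemo`, `ruleNames`) and plain strings in place of the real claim and rule types; the only thing it borrows from the question is the pattern of adding to a field-level list inside validate() on a long-lived object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reproduction; one instance is shared by all items, like a singleton bean.
class GrowingRulesDemo {

    // Field-level list: it outlives each validate() call.
    private final List<String> ruleNames = new ArrayList<>();

    // Mirrors the question's pattern: register the rule, then run every registered rule.
    List<String> validate(String claimNumber) {
        ruleNames.add("HCPCSCodeLength");          // added again on every call
        List<String> errors = new ArrayList<>();   // fresh per item
        for (String ruleName : ruleNames) {
            errors.add(ruleName + " failed for claim " + claimNumber);
        }
        return errors;
    }
}
```

Calling validate() on the same instance for three claims in a row returns 1, 2 and 3 errors respectively, matching the pattern in the question's logs.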