
I have 10 segments in the folder. Each segment has 2 files in it. I have implemented a partition mapper to read, process, and write one segment at a time.

I want to get the number of records read from the files of each segment and the number of records inserted into the DB. In the next step, I want to send a status report for the job, mentioning the number of records inserted, in an email.

I am using JSR 352 and have implemented a thread for each segment using a partition mapper.

1 Answer


This is similar to this question, but that one didn't specifically ask about accessing the data in a later step.

You could start with these two steps (as mentioned in the other answer):

Passing data from each partition to the top-level job

1) Use the exit status of each partition to represent the number of records read for that partition.

2) Use the PartitionAnalyzer.analyzeStatus to aggregate these into a single object on the top-level thread.

E.g., supposing each partition's exit status was set to a stringified Integer representing the number of records processed by that partition, you could aggregate them like this (shown here using a briefly outlined custom PartitionData class):

    // Assumes the persistent user data was initialized elsewhere to a
    // PartitionData instance, and that a StepContext ("stepCtx") was injected.
    public void analyzeStatus(BatchStatus batchStatus,
            String exitStatus) throws Exception {
        if (batchStatus.equals(BatchStatus.COMPLETED)) {
            PartitionData pd = (PartitionData) stepCtx.getPersistentUserData();
            int numRecords = Integer.parseInt(exitStatus);
            pd.incrementCount(numRecords);
        }  // else maybe do something else
        // ...
    }

    // Your own class (persistent user data must be Serializable)
    public class PartitionData implements java.io.Serializable {
        int totalNumRecords;
        public void incrementCount(int numRecords) {
            totalNumRecords += numRecords;
        }
    }

    //
    // Setting the partition exit status to the number of records processed is not shown!
    //
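To fill in that gap, here is a minimal sketch of how each partition could publish its record count as its exit status. This is not from the original answer; the listener class name is hypothetical, but the APIs (`StepContext.getMetrics`, `Metric.MetricType.WRITE_COUNT`, `StepContext.setExitStatus`) are standard JSR 352. Each partition runs its own copy of the step, so each sets its own exit status:

```java
import javax.batch.api.listener.AbstractStepListener;
import javax.batch.runtime.Metric;
import javax.batch.runtime.context.StepContext;
import javax.inject.Inject;
import javax.inject.Named;

// Hypothetical listener, registered on the partitioned step; each
// partition runs its own instance on its own thread.
@Named
public class RecordCountListener extends AbstractStepListener {

    @Inject
    StepContext stepCtx;

    @Override
    public void afterStep() throws Exception {
        // Pull the container-maintained write count for this partition
        // and publish it as the partition's exit status, which the
        // PartitionAnalyzer above receives as its exitStatus argument.
        for (Metric m : stepCtx.getMetrics()) {
            if (m.getType() == Metric.MetricType.WRITE_COUNT) {
                stepCtx.setExitStatus(String.valueOf(m.getValue()));
            }
        }
    }
}
```

Using the container's own metrics avoids having to count records yourself; you could equally use `READ_COUNT` if the read count is what you want to report.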

This is thread-safe, as the spec guarantees analyzeStatus will be called serially, on a single (top-level) thread, as each partition ends.

Passing data from one step to the next (in a persistent manner)

Now, at this point you might think to simply set this aggregate object into the job's transient user data. The problem is that if the job fails on the next step and you restart it at that step, this data would not be repopulated (in the job's transient user data) on restart.

So it would be best to persist this aggregate object somehow. It is possible to leverage the batch container's persistent store (the "job repository") by using the first (partitioned) step's persistent user data. This isn't a one-liner, though.
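As a rough sketch of one way this could look (not necessarily the original author's approach; the batchlet class and step name are hypothetical, but `JobOperator.getStepExecutions` and `StepExecution.getPersistentUserData` are standard JSR 352 APIs), a batchlet in the later (mail) step could read the earlier step's persistent user data back out of the job repository:

```java
import java.io.Serializable;
import javax.batch.api.AbstractBatchlet;
import javax.batch.runtime.BatchRuntime;
import javax.batch.runtime.StepExecution;
import javax.batch.runtime.context.JobContext;
import javax.inject.Inject;
import javax.inject.Named;

// Hypothetical batchlet for the follow-on (mail/report) step.
@Named
public class ReportBatchlet extends AbstractBatchlet {

    @Inject
    JobContext jobCtx;

    @Override
    public String process() throws Exception {
        long execId = jobCtx.getExecutionId();
        // Look up the earlier (partitioned) step's execution; its
        // persistent user data lives in the job repository.
        for (StepExecution se : BatchRuntime.getJobOperator().getStepExecutions(execId)) {
            if ("loadStep".equals(se.getStepName())) {   // hypothetical step name
                Serializable pd = se.getPersistentUserData();
                // e.g. cast to PartitionData and put totalNumRecords
                // into the mail body here
            }
        }
        return "COMPLETED";
    }
}
```

One of the non-one-liner wrinkles: on a restart, `getStepExecutions` for the current execution may not include steps that completed in an earlier execution, so you may also need to walk the previous executions of the same job instance to find the step.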

Scott Kurz
  • Thanks, Scott! I have tried using the partition analyzer to get the record count (total number of records written) for each partition. But how can we collect the final counts from all partitions, so that they are available for other steps to access? – Pesala Sairam May 04 '18 at 18:45
  • @Pesala, I updated my answer outlining how you might go about aggregating the counts into a single value/object on the "top-level" thread (on which the PartitionAnalyzer runs). If you still have questions, please show more detail on what you are trying to do and I will try to reply. I wonder if your difficulty might be more with accessing this data from a later step than with aggregating it in the first place, but will see what you reply. – Scott Kurz May 07 '18 at 18:36
  • Thanks a lot, Scott. I finally got it working the way you mentioned. – Pesala Sairam May 09 '18 at 18:07
  • Great. Please accept the answer when you get a chance. – Scott Kurz May 09 '18 at 18:21