4

I have 10 records in my source table, and the chunk item-count is 3.

I use 2 partitions to process these 10 records (i.e. the first 5 records are processed in the first partition and the remaining 5 in the second). While processing records in the 2nd partition I throw an exception, so the job fails at the 2nd chunk of the 2nd partition. When I restart the job, the failed partition processes all of its records again (that is, both the first chunk and the 2nd chunk). Restarting the job should only process records from the last failed chunk onward, not all the records in that partition. Can you please guide me on how to achieve this?

My JSL is like below:

    <?xml version="1.0" encoding="UTF-8"?>
<job xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
    xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
    id="readingfrom-db" restartable="true" version="1.0" >
    <!-- Job-level properties. Each one takes the job parameter of the same name,
         or falls back to the default written after "?:" when the parameter is absent. -->
    <properties >
        <property name="numRec" value="#{jobParameters['numRec']}?:5;"/>
        <property name="chunkSize" value="#{jobParameters['chunkSize']}?:3;"/>
        <property name="whereclauseFrom" value="#{jobParameters['whereclauseFrom']}?:5;"/>
        <property name="whereclauseTo" value="#{jobParameters['whereclauseTo']}?:6;"/>
        <property name="dsJNDI" value="#{jobParameters['dsJNDI']}?:jdbc/db2;"/>
        <property name="dsJNDI1" value="#{jobParameters['dsJNDI1']}?:jdbc/db2;"/>
        <property name="tableName" value="#{jobParameters['tableName']}?:CISDW.AIF1_CH;"/>
        <property name="ProcesstableName" value="#{jobParameters['ProcesstableName']}?:CISDW.PROC_AIF1_CH;"/>
    </properties>
    <!-- Step 1: batchlet that runs before the partitioned chunk step. -->
    <step id="runcache" next="readFromDB">
        <batchlet ref="com.cdc.runcache.CacheRunnerBatchlet" />
    </step>
    <!-- Step 2: partitioned chunk step that reads from the source table and writes to the target table. -->
    <step id="readFromDB">
        <listeners>
            <listener ref="com.cdc.dbreader.LogExceptionListener"/>
        </listeners>
        <!-- The chunk size now comes from the chunkSize job property (default 3) instead of a
             hard-coded literal, so the declared property and the chunk cannot drift apart. -->
        <chunk item-count="#{jobProperties['chunkSize']}" checkpoint-policy="item">
            <reader ref="com.cdc.dbreader.DBItemReader">
                <properties >
                    <property name="dsJNDI" value="#{jobProperties['dsJNDI']}"/>
                    <property name="tableName" value="#{jobProperties['tableName']}"/>
                    <!-- Each partition passes its own modrec value (see the partition plan below). -->
                    <property name="whereclauseFrom" value="#{partitionPlan['modrec']}"/>
                </properties>
            </reader>
            <processor ref="com.cdc.dbreader.DBItemProcessor" />
            <writer ref="com.cdc.dbreader.DBItemWriter">
                <properties >
                    <property name="dsJNDI" value="#{jobProperties['dsJNDI1']}"/>
                    <property name="tableName" value="#{jobProperties['ProcesstableName']}"/>
                </properties>
            </writer>
        </chunk>
        <!-- Two partitions on two threads: partition 0 receives whereclauseFrom (default 5),
             partition 1 receives whereclauseTo (default 6). -->
        <partition>
            <plan partitions="2" threads="2">
                <properties partition="0">
                    <property name="modrec" value="#{jobProperties['whereclauseFrom']}"/>
                </properties>
                <properties partition="1">
                    <property name="modrec" value="#{jobProperties['whereclauseTo']}"/>
                </properties>
            </plan>
        </partition>
    </step>
</job>

My Item Reader is like below:

 /**
  * Chunk reader used by the partitioned step.
  *
  * <p>{@code open()} runs one query against {@code tableName}, copies every non-null
  * {@code REC} value into an in-memory list, and {@code readItem()} then hands the
  * list entries out one at a time, removing each entry as it is returned.
  *
  * <p>NOTE(review): as pasted, the closing brace of {@code readItem()} is missing and
  * there is an extra closing brace after the class, so this snippet does not compile
  * as shown.
  */
 public class DBItemReader implements ItemReader {  
    // JNDI name of the data source to read from (injected batch property).
    @Inject
    @BatchProperty
    private String dsJNDI;

    // Boundary value for the WHERE clause (injected batch property); open() only
    // builds a query for the values 5 and 6.
    @Inject
    @BatchProperty
    private String whereclauseFrom;


    // Name of the table to read from (injected batch property).
    @Inject
    @BatchProperty
    private String tableName;

    // Connection opened in open() and closed in close().
    private Connection conn =null;
    // Number of rows seen by open(); it is incremented there but not read anywhere in this class.
    private int totalRecords=0;

    private DataSource ds = null;
    // Items still to be handed out by readItem(); filled once in open().
    List<RecObj> listRecObj=new ArrayList<RecObj>();    

    /**
     * Returns the next buffered record, or {@code null} when the buffer is empty.
     *
     * @throws IllegalStateException when the next record's value parses to 7; the
     *         record is not removed from the buffer in that case
     */
    @Override
    public Object readItem() throws SQLException {
         if (listRecObj.size() == 0) {             
             return null;
         } else { 
             RecObj rec =null;           
             Iterator<RecObj> iter =listRecObj.iterator();
             // The list is non-empty here, so the first iteration always either
             // throws or returns; the loop never runs a second time.
             while (iter.hasNext()) {               
                rec = iter.next();               

               // Hard-coded failure for the record whose value is 7.
               if (Integer.parseInt(rec.getRec())  == 7) {                      
                  throw new IllegalStateException("Thrown Error");
                }
                iter.remove();
                return rec;
             }
             // Not reached at runtime for the reason given above the loop.
             return rec;
         }
         // NOTE(review): the closing brace of readItem() is missing at this point in the snippet.


     /**
      * Looks up the data source, opens a connection and loads the matching rows
      * into the in-memory buffer.
      *
      * @param arg0 checkpoint value supplied by the caller; it is not used here, so
      *        the same query is built every time this method runs
      */
     @Override
    public void open(Serializable arg0) throws NamingException, SQLException {
          ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI)); 
//        System.out.println("whereclauseFrom: " + whereclauseFrom);          
          conn = ds.getConnection(); 
          String sql ="";
          // 5 selects REC <= 5, 6 selects REC >= 6; any other value leaves sql empty.
          if(Integer.parseInt(whereclauseFrom) == 5){
              sql = "SELECT * FROM " + tableName + " WHERE  CAST(REC AS INTEGER) <= "+ whereclauseFrom;
          }else if(Integer.parseInt(whereclauseFrom) == 6){
              sql = "SELECT * FROM " + tableName + " WHERE  CAST(REC AS INTEGER) >= "+ whereclauseFrom;
          }

          // NOTE(review): ps is never closed explicitly, and rs is only closed when no exception occurs.
          PreparedStatement ps = conn.prepareStatement(sql);
          ResultSet rs=ps.executeQuery();
          while(rs.next()){
             totalRecords++;
             String rec=rs.getString("REC"); 
             // Rows with a null REC are counted above but not buffered.
             if(rec != null)
                listRecObj.add(new RecObj(rec));

          }          
          rs.close();          
    }   
    /** Closes the connection opened in {@code open()}. */
    @Override
    public void close() throws SQLException {
        conn.close();       
    }   
    /**
     * Always returns {@code null}, i.e. no read position is reported.
     * NOTE(review): together with open() ignoring its argument, this means the reader
     * has no way to resume part-way through its record set.
     */
    @Override
    public Serializable checkpointInfo() {       
            return null;
    }

}
    }

My writer class is like below:

/**
 * Chunk writer that inserts the record value of every item in a chunk into the
 * {@code MOD_REC} column of the configured table, using one JDBC batch per chunk.
 */
public class DBItemWriter extends AbstractItemWriter implements ItemWriter {
    /** JNDI name of the target data source (injected batch property). */
    @Inject
    @BatchProperty
    private String dsJNDI;

    /** Name of the table to insert into (injected batch property). */
    @Inject
    @BatchProperty
    private String tableName;

    private DataSource ds = null;

    /**
     * Looks up the target data source.
     *
     * @param arg0 checkpoint value supplied by the caller; not used by this writer
     * @throws NamingException if the JNDI lookup of {@code dsJNDI} fails
     */
    @Override
    public void open(Serializable arg0) throws NamingException {
        ds = DataSource.class.cast(new InitialContext().lookup(dsJNDI));
    }

    /**
     * Inserts all items of the chunk in a single batch.
     *
     * @param items the chunk's items; each element is expected to be a {@code RecObj}
     * @throws BatchUpdateException if a statement in the batch fails
     * @throws SQLException if the connection or statement cannot be obtained or used
     */
    @Override
    public void writeItems(List<java.lang.Object> items) throws BatchUpdateException,SQLException{
        // Nothing to insert: avoid borrowing a connection for an empty batch.
        if (items == null || items.isEmpty()) {
            return;
        }
        String sql = "INSERT INTO "+tableName+ "(MOD_REC) VALUES(?) ";
        // try-with-resources closes the statement and then the connection even when
        // preparing, binding or executing the batch throws.
        try (Connection conn = ds.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            for (Object obj : items) {
                RecObj v = (RecObj)obj;
                System.out.println("=======Writer values===="+v.getRec());
                ps.setString(1, v.getRec());
                ps.addBatch();
            }
            ps.executeBatch();
        }
    }
}

Below is my Processor:

/**
 * Item processor that appends a running item number to each record value.
 */
public class DBItemProcessor implements ItemProcessor {
    /** Number of items this processor instance has handled so far. */
    Integer count=0;

    /**
     * Logs the incoming record value and returns a new {@code RecObj} whose value is
     * the incoming value followed by the updated item count.
     *
     * @param arg0 the item read by the reader; cast to {@code RecObj}
     * @return a new {@code RecObj} carrying the suffixed value
     */
    @Override
    public Object processItem(Object arg0) {
        count = count + 1;
        final String recValue = ((RecObj) arg0).getRec();
        System.out.println("=========Processer Values===" + recValue);
        return new RecObj(recValue + count);
    }
}

Below is my Bean class

/**
 * Item bean that carries a single record value as text.
 *
 * NOTE(review): the snippet is cut off after the constructor. The getRec() accessor
 * that the reader, processor and writer call, and the closing brace of the class,
 * are not shown here.
 */
public class RecObj {
   // The record value held by this item.
   private String rec;


  /** Creates an item holding the given record value. */
  public RecObj(String rec) {
    this.rec=rec;
}
Gaslan
  • 808
  • 1
  • 20
  • 27
Srinivas K
  • 112
  • 7
  • It sounds like you have the basic understanding correct, so why don't you share some code snippets along with the JSL (XML) used to define the job. In particular it would probably help to see how you are parameterizing your reader (injecting **@BatchProperty** values) for each partition, as well as the code you use to implement positioning based on the checkpoint value. – Scott Kurz May 23 '16 at 13:29
  • I have added all my artifacts to my question. Can you please verify my code and help me solve my problem? – Srinivas K May 23 '16 at 14:03
  • I don't see anything wrong yet. If you add the reader's **open()** method which uses the checkpoint value to construct the DB query as well as whatever code you have throwing the exc on the 2nd chunk I'll take another look. – Scott Kurz May 23 '16 at 14:13
  • I have added the reader class's open() method. I read the first 5 records in the 1st partition based on the batch property whereclauseFrom; likewise, I read the remaining 5 records in the 2nd partition. – Srinivas K May 23 '16 at 14:19

1 Answer

2

You need to return a checkpoint value in your reader's checkpointInfo() which will be passed into your reader's open() method on restart. This is how a reader and the batch container coordinate to provide checkpointing on restart.

So you could have something like (look for the CHECKPOINT comments):

/**
 * Sketch of the reader with checkpoint support added. Parts marked with "// ..."
 * are elided, so this is an outline rather than a complete class.
 */
public class DBItemReader implements ItemReader {  

    // ... 

    // CHECKPOINT field defined
    // Holds the value of the record most recently taken from the buffer by readItem().
    private String checkpoint = null; 

    /**
     * Builds the query, positioning it from the checkpoint when one is supplied.
     *
     * @param checkpoint value previously returned by checkpointInfo(), or null on a
     *        first start. NOTE: this parameter has the same name as the field above,
     *        so inside this method "checkpoint" refers to the parameter.
     */
    @Override
    public void open(Serializable checkpoint) throws NamingException, SQLException {

        // CHECKPOINT-based positioning through query value.
        // Initial position = whereclauseFrom, on restart set to checkpoint
        String queryVal = (String)(checkpoint == null ? whereclauseFrom : checkpoint);       

        // NOTE(review): on restart both branches keep their original comparison and only
        // swap the bound. "<= checkpoint" selects records at or below the checkpointed
        // value and ">= checkpoint" selects the checkpointed record again; confirm the
        // restart predicate resumes strictly after the last checkpointed record.
        if(Integer.parseInt(whereclauseFrom) == 5){
            sql = "SELECT * FROM " + tableName + " WHERE  CAST(REC AS INTEGER) <= "+ queryVal;
        }else if(Integer.parseInt(whereclauseFrom) == 6){
            sql = "SELECT * FROM " + tableName + " WHERE  CAST(REC AS INTEGER) >= "+ queryVal;
        }
        // ..
    }

    /**
     * Reads the next record and remembers its value as the current checkpoint.
     * The removal and return of the record are elided in this sketch.
     */
    @Override
    public Object readItem() throws SQLException {
        if (listRecObj.size() == 0) {             
            return null;
        } else { 
            RecObj rec =null;           
            Iterator<RecObj> iter =listRecObj.iterator();
            while (iter.hasNext()) {               
                rec = iter.next();               
                // CHECKPOINT updated
                // The field is assigned before the failure check below, so it already
                // holds the failing record's value when the exception is thrown.
                checkpoint = rec.getRec();
                if (Integer.parseInt(rec.getRec())  == 7) {                      
                    throw new IllegalStateException("Thrown Error");
                }
            }
        }
        // ...
    }

    /** Returns the value of the record most recently taken by readItem(), or null if none. */
    @Override
    public Serializable checkpointInfo() {      
        // CHECKPOINT returned at end of chunk
        return checkpoint;
    }
}
Gaslan
  • 808
  • 1
  • 20
  • 27
Scott Kurz
  • 4,985
  • 1
  • 18
  • 40
  • Note there is one other issue with your example. Even if you made the suggested updates, if you fail on record #7, you're still going to start from the beginning after restarting the 2nd partition. That's because with an item-count of 3, starting at 6, the first chunk would consist of records 6,7,8. So you're actually failing in the first chunk, not the second. – Scott Kurz May 23 '16 at 14:55
  • Thank you Scott it is working fine.Thank you for your quick help. – Srinivas K May 23 '16 at 15:38
  • if(Integer.parseInt(queryVal) == 5 && arg0 == null){ sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) <= "+ whereclauseFrom; }else if(Integer.parseInt(whereclauseFrom) == 6 && arg0 == null){ sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) >= "+ whereclauseFrom; }else { sql = "SELECT * FROM " + tableName + " WHERE CAST(REC AS INTEGER) > "+ queryVal; } – Srinivas K May 23 '16 at 15:43
  • Great, glad to help ! Since it looks like you are new to the site, let me just note that you can accept the answer to show others that it did answer your question. Thanks. – Scott Kurz May 23 '16 at 16:02
  • Scott, do we have any example of checkpointing for a custom writer that implements ItemWriter? – jcrshankar Oct 16 '18 at 01:57
  • @jcrshankar, since the original question asked about ItemReader, why don't you ask about the writer in a new, separate question with similar tags? I'll look for it. – Scott Kurz Oct 16 '18 at 13:10