4

In a Spring Batch job I am trying to read a CSV file and want to assign each row to a separate thread and process it. I have tried to achieve this using a TaskExecutor, but what happens is that all the threads pick the same row at the same time. I also tried to implement the concept using a Partitioner; the same thing happens there. Please see my configuration XML below.

Step Description

    <step id="Step2">
        <tasklet task-executor="taskExecutor">
            <chunk reader="reader" processor="processor" writer="writer" commit-interval="1" skip-limit="1">
            </chunk>
        </tasklet> 
    </step>

              <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="file:cvs/user.csv" />

<property name="lineMapper">
    <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
      <!-- split it -->
      <property name="lineTokenizer">
            <bean
          class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
            <property name="names" value="userid,customerId,ssoId,flag1,flag2" />
        </bean>
      </property>
      <property name="fieldSetMapper">   

          <!-- map to an object -->
          <bean
            class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
            <property name="prototypeBeanName" value="user" />
          </bean>           
      </property>

      </bean>
  </property>

       </bean>

      <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
 <property name="concurrencyLimit" value="4"/>   

I have tried different types of task executors, but all of them behave in the same way. How can I assign each row to a separate thread?

slowhandblues
  • 87
  • 1
  • 1
  • 12
  • You can refer to this: http://stackoverflow.com/questions/20243629/how-to-increase-the-performance-of-flatfileitemreader-in-springbatch/20261835#20261835 – Karthik Prasad Jan 22 '14 at 18:52

3 Answers

6

FlatFileItemReader is not thread-safe. In your case you can try to split the CSV file into smaller CSV files and then use a MultiResourcePartitioner to process each one of them. This can be done in two steps: one for splitting the original file (into, say, 10 smaller files) and another for processing the split files (a sketch of the splitting step follows the configuration below). This way you won't have any issues, since each file will be processed by one thread.

Example:

<batch:job id="csvsplitandprocess">
     <batch:step id="step1" next="step2master">
    <batch:tasklet>
        <batch:chunk reader="largecsvreader" writer="csvwriter" commit-interval="500">
        </batch:chunk>
    </batch:tasklet>
    </batch:step>
    <batch:step id="step2master">
    <partition step="step2" partitioner="partitioner">
        <handler grid-size="10" task-executor="taskExecutor"/>
    </partition>
</batch:step>
</batch:job>

<batch:step id="step2">
    <batch:tasklet>
        <batch:chunk reader="smallcsvreader" writer="writer" commit-interval="100">
        </batch:chunk>
    </batch:tasklet>
</batch:step>


<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <property name="corePoolSize" value="10" />
            <property name="maxPoolSize" value="10" />
    </bean>

<bean id="partitioner" 
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
<property name="resources" value="file:cvs/extracted/*.csv" />
</bean>
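
The splitting step (step1) above only references largecsvreader and csvwriter without showing them. As an illustration, here is a minimal sketch of the split written as a plain Tasklet instead of a chunk step; the class name, file paths, and linesPerFile property are hypothetical, not from the original answer:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

// Hypothetical tasklet that splits cvs/user.csv into files of linesPerFile
// lines each, so MultiResourcePartitioner can later assign one file per thread.
public class CsvSplittingTasklet implements Tasklet {

    private int linesPerFile = 1000;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        try (BufferedReader in = new BufferedReader(new FileReader("cvs/user.csv"))) {
            String line;
            int lineCount = 0, fileIndex = 0;
            BufferedWriter out = null;
            while ((line = in.readLine()) != null) {
                if (lineCount % linesPerFile == 0) {
                    if (out != null) {
                        out.close();
                    }
                    // one output file per partition, matching file:cvs/extracted/*.csv
                    out = new BufferedWriter(new FileWriter("cvs/extracted/part" + (fileIndex++) + ".csv"));
                }
                out.write(line);
                out.newLine();
                lineCount++;
            }
            if (out != null) {
                out.close();
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setLinesPerFile(int linesPerFile) {
        this.linesPerFile = linesPerFile;
    }
}

It could then be wired into step1 as <batch:tasklet ref="csvSplittingTasklet" /> in place of the chunk shown above.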

The alternative to partitioning might be a custom thread-safe reader that creates a thread for each line, but partitioning is probably your best choice.
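
One common way to build such a thread-safe reader is a wrapper that synchronizes read() around a delegate, so each thread receives a distinct row. A minimal sketch of that pattern (illustrative only, not code from this answer):

import org.springframework.batch.item.ItemReader;

// Wrapper that serializes access to a non-thread-safe delegate such as
// FlatFileItemReader, so that concurrent threads get distinct rows.
public class SynchronizedItemReader<T> implements ItemReader<T> {

    private final ItemReader<T> delegate;

    public SynchronizedItemReader(ItemReader<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T read() throws Exception {
        // one thread at a time pulls the next item from the delegate
        return delegate.read();
    }
}

Note that this only serializes the reads; restart state is still shared across threads, which is one more reason partitioning is usually the better choice.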

dimzak
  • 2,511
  • 8
  • 38
  • 51
  • Yes... I realised that I have to go for either of these choices... but which one will be better performance-wise? – slowhandblues Jan 22 '14 at 19:55
  • Definitely partitioning, because a custom reader will still process line by line. On the other hand, many smaller CSV files will be processed simultaneously (partitioned step). Keep in mind that there are many other factors besides scaling techniques for optimizing performance, like playing with the commit-interval and the skip and retry policies; generally every case has its own bottlenecks. Hope that helps! – dimzak Jan 22 '14 at 20:25
  • Great, is it safe to use MultiThreadedFlatFileItemReader in remote partitioning? https://github.com/sshcherbakov/spring-batch-talk/blob/master/src/main/java/org/springframework/batch/item/file/MultiThreadedFlatFileItemReader.java – vishal Jun 13 '14 at 05:31
  • I guess, as shown [here](http://stackoverflow.com/a/20342308/1499705), you have to be very careful about where the readers' resources are (local for the slaves). With some caution I imagine it will work, and if not, a question can solve the problem :) (I didn't try the reader you mentioned, but from a quick look it is really handy.) – dimzak Jun 13 '14 at 10:37
  • If we use MultiResourcePartitioner, how should the reader configuration look? Can we use FlatFileItemReader with resource #{stepExecutionContext[fileName]}, or do we need to use MultiResourceItemReader? – vishal Jun 14 '14 at 05:09
  • @dimzak Can you provide the step for splitting the large file into small ones? Thanks – kaissun Mar 25 '17 at 19:15
1

Your problem is that your reader is not step scoped.

That means all your threads share the same input stream (resource file).

To have each thread process one row, you need to:

  1. Be sure that each thread reads the file from the start to the end of the file (each thread should open and close the stream for its own execution context).
  2. The partitioner must inject the start and end positions into each execution context.
  3. Your reader must read the file using these positions.

I wrote some code, and this is the output:

Code of the com.test.partitioner.RangePartitioner class:

package com.test.partitioner;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

public class RangePartitioner implements Partitioner {

    private static final Logger log = LoggerFactory.getLogger(RangePartitioner.class);

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {

        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();

        int range = 1;
        int fromId = 1;
        int toId = range;

        for (int i = 1; i <= gridSize; i++) {
            ExecutionContext value = new ExecutionContext();

            log.debug("\nStarting : Thread" + i);
            log.debug("fromId : " + fromId);
            log.debug("toId : " + toId);

            value.putInt("fromId", fromId);
            value.putInt("toId", toId);

            // give each thread a name: Thread1, Thread2, ...
            value.putString("name", "Thread" + i);

            result.put("partition" + i, value);

            fromId = toId + 1;
            toId += range;
        }

        return result;
    }
}

Look at the output console:

Starting : Thread1 fromId : 1 toId : 1

Starting : Thread2 fromId : 2 toId : 2

Starting : Thread3 fromId : 3 toId : 3

Starting : Thread4 fromId : 4 toId : 4

Starting : Thread5 fromId : 5 toId : 5

Starting : Thread6 fromId : 6 toId : 6

Starting : Thread7 fromId : 7 toId : 7

Starting : Thread8 fromId : 8 toId : 8

Starting : Thread9 fromId : 9 toId : 9

Starting : Thread10 fromId : 10 toId : 10

Look at the configuration below:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xsi:schemaLocation="http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">

<import resource="../config/context.xml" />
<import resource="../config/database.xml" />

<bean id="mouvement" class="com.test.model.Mouvement" scope="prototype" />

<bean id="itemProcessor" class="com.test.processor.CustomItemProcessor" scope="step">
    <property name="threadName" value="#{stepExecutionContext[name]}" />
</bean>
<bean id="xmlItemWriter" class="com.test.writer.ItemWriter" />

<batch:job id="mouvementImport" xmlns:batch="http://www.springframework.org/schema/batch">
    <batch:listeners>
        <batch:listener ref="myAppJobExecutionListener" />
    </batch:listeners>

    <batch:step id="masterStep">
        <batch:partition step="slave" partitioner="rangePartitioner">
            <batch:handler grid-size="10" task-executor="taskExecutor" />
        </batch:partition>
    </batch:step>
</batch:job>

<bean id="rangePartitioner" class="com.test.partitioner.RangePartitioner" />

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

<batch:step id="slave">
    <batch:tasklet>
        <batch:listeners>
            <batch:listener ref="stepExecutionListener" />
        </batch:listeners>

        <batch:chunk reader="mouvementReader" writer="xmlItemWriter" processor="itemProcessor" commit-interval="1">
        </batch:chunk>

    </batch:tasklet>
</batch:step>



<bean id="stepExecutionListener" class="com.test.listener.step.StepExecutionListenerCtxInjecter" scope="step" />

<bean id="myAppJobExecutionListener" class="com.test.listener.job.MyAppJobExecutionListener" />

<bean id="mouvementReaderParent" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">

    <property name="resource" value="classpath:XXXXX/XXXXXXXX.csv" />

    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="delimiter" value="|" />
                    <property name="names"
                        value="id,numen,prenom,grade,anneeScolaire,academieOrigin,academieArrivee,codeUsi,specialiteEmploiType,natureSupport,dateEffet,modaliteAffectation" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.test.mapper.MouvementFieldSetMapper" />
            </property>
        </bean>
    </property>

</bean>

<!--    <bean id="itemReader" scope="step" autowire-candidate="false" parent="mouvementReaderParent">-->
<!--        <property name="resource" value="#{stepExecutionContext[fileName]}" />-->
<!--    </bean>-->

<bean id="mouvementReader" class="com.test.reader.MouvementItemReader" scope="step">
    <property name="delegate" ref="mouvementReaderParent" />
    <property name="parameterValues">
        <map>
            <entry key="fromId" value="#{stepExecutionContext[fromId]}" />
            <entry key="toId" value="#{stepExecutionContext[toId]}" />
        </map>
    </property>
</bean>

<!--    <bean id="xmlItemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">-->
<!--        <property name="resource" value="file:xml/outputs/Mouvements.xml" />-->
<!--        <property name="marshaller" ref="reportMarshaller" />-->
<!--        <property name="rootTagName" value="Mouvement" />-->
<!--    </bean>-->

<bean id="reportMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound">
        <list>
            <value>com.test.model.Mouvement</value>
        </list>
    </property>
</bean>

</beans>

TODO: Change my reader to one that reads the file using positions (start and end position), e.g. with the Scanner class in Java.
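
The custom MouvementItemReader wired above is not shown in the answer. A rough sketch of what such a range-aware delegating reader could look like, counting lines rather than byte positions (the delegate and parameterValues properties follow the configuration above; everything else is an assumption):

package com.test.reader;

import java.util.Map;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;

import com.test.model.Mouvement;

// Sketch: delegates to the step-scoped FlatFileItemReader but only returns
// rows whose line number falls inside this partition's [fromId, toId] range.
public class MouvementItemReader implements ItemStreamReader<Mouvement> {

    private ItemStreamReader<Mouvement> delegate;
    private Map<String, Object> parameterValues;

    private int fromId;
    private int toId;
    private int current;

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // the partitioner put fromId/toId into the step execution context
        fromId = Integer.parseInt(String.valueOf(parameterValues.get("fromId")));
        toId = Integer.parseInt(String.valueOf(parameterValues.get("toId")));
        current = 0;
        delegate.open(executionContext);
    }

    @Override
    public Mouvement read() throws Exception {
        Mouvement item;
        while ((item = delegate.read()) != null) {
            current++;
            if (current > toId) {
                return null;   // past this partition's range: end of input
            }
            if (current >= fromId) {
                return item;   // row belongs to this partition
            }
            // current < fromId: skip rows owned by earlier partitions
        }
        return null;
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        delegate.update(executionContext);
    }

    @Override
    public void close() throws ItemStreamException {
        delegate.close();
    }

    public void setDelegate(ItemStreamReader<Mouvement> delegate) {
        this.delegate = delegate;
    }

    public void setParameterValues(Map<String, Object> parameterValues) {
        this.parameterValues = parameterValues;
    }
}

Each partition still scans the file from the first line and discards rows outside its range, which is what the TODO above is about: a better reader would seek straight to a start position.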

Hope this helps.

Patrick M
  • 10,547
  • 9
  • 68
  • 101
  • Interesting solution. I have edited to fix up the format and add the missing method signature, but I probably got it wrong. Can you please review and add the correct method name and wiring options to get the `gridSize` variable initialized? – Patrick M Jun 13 '16 at 20:14
0

You can split your input file into many files, then use a Partitioner to load the small files with threads, but on error you must restart the whole job after the DB has been cleaned.

<batch:job id="transformJob">
<batch:step id="deleteDir" next="cleanDB">
    <batch:tasklet ref="fileDeletingTasklet" />
</batch:step>
<batch:step id="cleanDB" next="split">
    <batch:tasklet ref="countThreadTasklet" />
</batch:step>
<batch:step id="split" next="partitionerMasterImporter">
    <batch:tasklet>
        <batch:chunk reader="largeCSVReader" writer="smallCSVWriter" commit-interval="#{jobExecutionContext['chunk.count']}" />
    </batch:tasklet>
</batch:step>
<batch:step id="partitionerMasterImporter" next="partitionerMasterExporter">
    <partition step="importChunked" partitioner="filePartitioner">
        <handler grid-size="10" task-executor="taskExecutor" />
    </partition>
</batch:step>
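
The fileDeletingTasklet and countThreadTasklet beans are not shown here; the full code is in the linked project below. As an illustration, a directory-cleaning tasklet along the lines of fileDeletingTasklet might look like this (the directory property is an assumption):

import java.io.File;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.core.io.Resource;

// Illustration: deletes previously split files so the job starts clean.
public class FileDeletingTasklet implements Tasklet {

    private Resource directory;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        File[] files = dir.listFiles();
        if (files != null) {
            for (File file : files) {
                if (!file.delete()) {
                    throw new IllegalStateException("Could not delete file " + file.getPath());
                }
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setDirectory(Resource directory) {
        this.directory = directory;
    }
}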

Full working example code (on GitHub).

Hope this helps.

M. Mohamed
  • 31
  • 5