Your problem is that your reader is not in step scope.
That means all your threads share the same input stream (the resource file).
To give each thread its own row to process, you need to:
- Make sure every thread reads the file from start to end (each thread
should open the stream and close it for each execution context).
- Have the partitioner inject the start and end positions into each
execution context.
- Have your reader read the file using these positions.
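The three points above can be sketched in plain Java, without Spring Batch, to show the idea. This is only a minimal illustration: `PartitionedLineReading`, `readRange` and `readAllPartitions` are hypothetical names, and I assume positions are 1-based line numbers.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper, not part of Spring Batch.
class PartitionedLineReading {

    /** Reads lines fromId..toId (1-based, inclusive) with a stream owned by this call. */
    static List<String> readRange(Path file, int fromId, int toId) throws IOException {
        List<String> rows = new ArrayList<>();
        // try-with-resources: the stream is opened and closed for each execution
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            int lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                lineNumber++;
                if (lineNumber > toId) {
                    break; // past this partition's range
                }
                if (lineNumber >= fromId) {
                    rows.add(line);
                }
            }
        }
        return rows;
    }

    /** Runs one task per partition; partition i reads exactly line i (range = 1). */
    static List<List<String>> readAllPartitions(Path file, int gridSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(gridSize);
        try {
            List<Future<List<String>>> futures = new ArrayList<>();
            for (int i = 1; i <= gridSize; i++) {
                final int id = i;
                Callable<List<String>> task = () -> readRange(file, id, id);
                futures.add(pool.submit(task));
            }
            List<List<String>> results = new ArrayList<>();
            for (Future<List<String>> future : futures) {
                results.add(future.get()); // results stay in partition order
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

Because no stream is shared, the threads cannot steal rows from each other. The price is that every thread scans the file from the beginning up to its own range.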
I wrote some code; here it is, followed by its output.
Code of the com.test.partitioner.RangePartitioner class:
public Map<String, ExecutionContext> partition(int gridSize) {
    Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
    // one row per partition
    int range = 1;
    int fromId = 1;
    int toId = range;
    for (int i = 1; i <= gridSize; i++) {
        ExecutionContext value = new ExecutionContext();
        log.debug("\nStarting : Thread" + i);
        log.debug("fromId : " + fromId);
        log.debug("toId : " + toId);
        value.putInt("fromId", fromId);
        value.putInt("toId", toId);
        // give each thread a name, thread 1,2,3
        value.putString("name", "Thread" + i);
        result.put("partition" + i, value);
        fromId = toId + 1;
        toId += range;
    }
    return result;
}
Console output:
Starting : Thread1
fromId : 1
toId : 1
Starting : Thread2
fromId : 2
toId : 2
Starting : Thread3
fromId : 3
toId : 3
Starting : Thread4
fromId : 4
toId : 4
Starting : Thread5
fromId : 5
toId : 5
Starting : Thread6
fromId : 6
toId : 6
Starting : Thread7
fromId : 7
toId : 7
Starting : Thread8
fromId : 8
toId : 8
Starting : Thread9
fromId : 9
toId : 9
Starting : Thread10
fromId : 10
toId : 10
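If the file has more rows than partitions, the same fromId/toId idea can be extended. Here is a small sketch, assuming the total row count is known up front. `RangeCalculator` and `computeRanges` are hypothetical names, and plain `int[]` pairs stand in for the `ExecutionContext` entries. With 10 rows and a grid size of 10 it gives the same one-row ranges as the output above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: int[] {fromId, toId} stands in for ExecutionContext entries.
class RangeCalculator {

    /**
     * Splits rows 1..totalRows into gridSize contiguous ranges.
     * Returns partition name -> {fromId, toId}, both inclusive.
     * If totalRows < gridSize, trailing partitions get an empty range (toId < fromId).
     */
    static Map<String, int[]> computeRanges(int totalRows, int gridSize) {
        Map<String, int[]> ranges = new LinkedHashMap<>();
        int base = totalRows / gridSize;      // rows every partition gets
        int remainder = totalRows % gridSize; // the first 'remainder' partitions get one extra row
        int fromId = 1;
        for (int i = 1; i <= gridSize; i++) {
            int size = base + (i <= remainder ? 1 : 0);
            int toId = fromId + size - 1;
            ranges.put("partition" + i, new int[] {fromId, toId});
            fromId = toId + 1;
        }
        return ranges;
    }
}
```

For example, `computeRanges(10, 3)` yields the ranges 1-4, 5-7 and 8-10.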
Here is the configuration:
http://www.springframework.org/schema/batch/spring-batch-2.2.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
<import resource="../config/context.xml" />
<import resource="../config/database.xml" />
<bean id="mouvement" class="com.test.model.Mouvement" scope="prototype" />
<bean id="itemProcessor" class="com.test.processor.CustomItemProcessor" scope="step">
    <property name="threadName" value="#{stepExecutionContext[name]}" />
</bean>
<bean id="xmlItemWriter" class="com.test.writer.ItemWriter" />
<batch:job id="mouvementImport" xmlns:batch="http://www.springframework.org/schema/batch">
    <batch:listeners>
        <batch:listener ref="myAppJobExecutionListener" />
    </batch:listeners>
    <batch:step id="masterStep">
        <batch:partition step="slave" partitioner="rangePartitioner">
            <batch:handler grid-size="10" task-executor="taskExecutor" />
        </batch:partition>
    </batch:step>
</batch:job>
<bean id="rangePartitioner" class="com.test.partitioner.RangePartitioner" />
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<batch:step id="slave">
    <batch:tasklet>
        <batch:listeners>
            <batch:listener ref="stepExecutionListener" />
        </batch:listeners>
        <batch:chunk reader="mouvementReader" writer="xmlItemWriter" processor="itemProcessor" commit-interval="1">
        </batch:chunk>
    </batch:tasklet>
</batch:step>
<bean id="stepExecutionListener" class="com.test.listener.step.StepExecutionListenerCtxInjecter" scope="step" />
<bean id="myAppJobExecutionListener" class="com.test.listener.job.MyAppJobExecutionListener" />
<bean id="mouvementReaderParent" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="resource" value="classpath:XXXXX/XXXXXXXX.csv" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="delimiter" value="|" />
                    <property name="names"
                        value="id,numen,prenom,grade,anneeScolaire,academieOrigin,academieArrivee,codeUsi,specialiteEmploiType,natureSupport,dateEffet,modaliteAffectation" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.test.mapper.MouvementFieldSetMapper" />
            </property>
        </bean>
    </property>
</bean>
<!-- <bean id="itemReader" scope="step" autowire-candidate="false" parent="mouvementReaderParent">-->
<!-- <property name="resource" value="#{stepExecutionContext[fileName]}" />-->
<!-- </bean>-->
<bean id="mouvementReader" class="com.test.reader.MouvementItemReader" scope="step">
    <property name="delegate" ref="mouvementReaderParent" />
    <property name="parameterValues">
        <map>
            <entry key="fromId" value="#{stepExecutionContext[fromId]}" />
            <entry key="toId" value="#{stepExecutionContext[toId]}" />
        </map>
    </property>
</bean>
<!-- <bean id="xmlItemWriter" class="org.springframework.batch.item.xml.StaxEventItemWriter">-->
<!-- <property name="resource" value="file:xml/outputs/Mouvements.xml" />-->
<!-- <property name="marshaller" ref="reportMarshaller" />-->
<!-- <property name="rootTagName" value="Mouvement" />-->
<!-- </bean>-->
<bean id="reportMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller">
    <property name="classesToBeBound">
        <list>
            <value>com.test.model.Mouvement</value>
        </list>
    </property>
</bean>
TODO: Replace my reader with one that reads by position (start and end positions), for example using the Scanner class in Java.
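A rough idea of what such a position-based reader could look like with `java.util.Scanner`. This is only a sketch under my own assumptions: `ScannerRangeReader` is a hypothetical class, positions are 1-based line numbers, and it only imitates the `ItemReader` convention of returning `null` when there is nothing left to read. It does not implement the Spring Batch interfaces.

```java
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Scanner;

// Hypothetical class: imitates the "read() returns null when exhausted"
// convention without depending on Spring Batch.
class ScannerRangeReader implements Closeable {
    private final Scanner scanner;
    private final int toId;
    private int nextLine = 1; // 1-based number of the next line the scanner will return

    ScannerRangeReader(Path file, int fromId, int toId) throws IOException {
        this.scanner = new Scanner(file, "UTF-8");
        this.toId = toId;
        // Skip the lines that belong to earlier partitions.
        while (nextLine < fromId && scanner.hasNextLine()) {
            scanner.nextLine();
            nextLine++;
        }
    }

    /** Returns the next line in the range, or null once the range or the file is exhausted. */
    String read() {
        if (nextLine > toId || !scanner.hasNextLine()) {
            return null;
        }
        nextLine++;
        return scanner.nextLine();
    }

    @Override
    public void close() {
        scanner.close();
    }
}
```

Each step execution would create its own instance with the fromId and toId taken from its execution context, and close it when the step finishes.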
Hope this helps.