
I am currently developing data loaders: reading a file and writing to a database. I am using a partition handler to process multiple comma-separated files in 30 threads. I want to scale and improve throughput. Daily I receive 15,000 files (each having 1 million records). How do I scale this using Spring Batch so the job completes within a day? Is there any open-source grid computing framework that can do this fairly, or are there any simple fine-tuning steps?

The Spring Batch data loader runs standalone; there is no web container involved. It runs on a single Solaris machine with 24 CPUs. The data is written to a single database. Default isolation and propagation are used. The XML config is given below:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch" 
    xmlns:p="http://www.springframework.org/schema/p" 
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd">

    <!--  IMPORT DB CONFIG -->
    <import resource="classpath:bom/bom/bomloader/job/DataSourcePoolConfig.xml" />

     <!-- USE ANNOTATIONS TO CONFIGURE SPRING BEANS -->
    <context:component-scan base-package="bom.bom.bom" />

    <!--  INJECT THE PROCESS PARAMS HASHMAP BEFORE CONTEXT IS INITIALISED -->
    <bean id="holder" class="bom.bom.bom.loader.util.PlaceHolderBean" >
        <property name="beanName" value="holder"/>
    </bean>

    <bean id="logger" class="bom.bom.bom.loader.util.PlaceHolderBean"  >
        <property name="beanName" value="logger"/>
    </bean>

    <bean id="dataMap" class="java.util.concurrent.ConcurrentHashMap" />

    <!--  JOB REPOSITORY - WE USE DATABASE REPOSITORY  -->

<!--      <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" >-->
<!--            <property name="transactionManager" ref="frdtransactionManager" />-->
<!--            <property name="dataSource" ref="frddataSource" />-->
<!--            <property name="databaseType" value="oracle" />-->
<!--            <property name="tablePrefix" value="batch_"/>           -->
<!--      </bean>-->

   <!--  JOB REPOSITORY - WE USE AN IN-MEMORY REPOSITORY  -->

    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
            <property name="transactionManager" ref="frdtransactionManager" />
    </bean>

<!--    <bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">-->
<!--        <property name="dataSource" ref="frddataSource" />-->
<!--        <property name="tablePrefix" value="batch_"/>  -->
<!--    </bean>-->

   <!--  LAUNCH JOBS FROM A REPOSITORY -->

      <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
         <property name="jobRepository" ref="jobRepository" />
           <property name="taskExecutor">
                <bean class="org.springframework.core.task.SyncTaskExecutor" />
           </property>      
      </bean>

   <!--  CONFIGURE SCHEDULING IN QUARTZ -->
<!--     <bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">-->
<!--        <property name="jobClass" value="bom.bom.bom.assurance.core.JobLauncherDetails" />-->
<!--        <property name="group" value="quartz-batch" />-->
<!--        <property name="jobDataAsMap">-->
<!--            <map>-->
<!--                <entry key="jobName" value="${jobname}"/>-->
<!--                <entry key="jobLocator" value-ref="jobRegistry"/>-->
<!--                <entry key="jobLauncher" value-ref="jobLauncher"/>-->
<!--            </map>-->
<!--        </property>-->
<!--     </bean>-->

      <!--  RUN EVERY 2 HOURS -->
<!--    <bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">-->
<!--      <property name="triggers">-->
<!--        <bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">-->
<!--          <property name="jobDetail" ref="jobDetail" />-->
<!--          <property name="cronExpression" value="2/0 * * * * ?" />-->
<!--        </bean>-->
<!--      </property>-->
<!--    </bean>-->
<!--    -->

     <!--  RUN STANDALONE -->
    <bean id="jobRunner" class="bom.bom.bom.loader.core.DataLoaderJobRunner">
        <constructor-arg value="${LOADER_NAME}" />  
    </bean>

    <!-- Get all the files for the exchanges and feed as resource to the MultiResourcePartitioner -->

    <bean id="fileresource" class="bom.bom.bom.loader.util.FiltersFoldersResourceFactory" p:dataMap-ref="dataMap">
        <property name="filePath" value="${PARENT_PATH}" />
        <property name="acceptedFolders"  value="${EXCH_CODE}" />
        <property name="logger" ref="logger" />
    </bean>

    <!--  The network  Data Loading Configuration goes here  -->      

    <job id="CDR_network_PARALLEL"     xmlns="http://www.springframework.org/schema/batch" restartable="false" >
        <step id="PREPARE_CLEAN" >
            <flow parent="prepareCleanFlow" />
            <next on="COMPLETED" to="LOAD_EXCHANGE_DATA" />
            <fail on="FAILED"  exit-code="Failed on cleaning error records."/>                    
       </step>
        <step id="LOAD_EXCHANGE_DATA"  >
            <tasklet ref="businessData" transaction-manager="ratransactionManager"  />
            <next on="COMPLETED" to="LOAD_CDR_FILES" />
            <fail on="FAILED" exit-code="FAILED ON LOADING EXCHANGE INFORMATION FROM DB." />
        </step>
        <step id="LOAD_CDR_FILES"  >
            <tasklet ref="fileresource" transaction-manager="frdtransactionManager" />
            <next on="COMPLETED" to="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" />
            <fail on="FAILED" exit-code="FAILED ON LOADING CDR FILES." />
        </step>        
        <step id="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" next="limitDecision" >
            <partition step="filestep" partitioner="filepartitioner" >
                <handler grid-size="100" task-executor="executorWithCallerRunsPolicy" />
            </partition>
        </step>
        <decision id="limitDecision" decider="limitDecider"> 
            <next on="COMPLETED" to="MOVE_RECS_STAGING_TO_MAIN_TABLE" />
            <next on="CONTINUE" to="PROCESS_FILE_TO_STAGING_TABLE_PARALLEL" />
        </decision>     
        <step id="MOVE_RECS_STAGING_TO_MAIN_TABLE" >
            <tasklet ref="moveRecords" transaction-manager="ratransactionManager" >
                <transaction-attributes isolation="SERIALIZABLE"/>
            </tasklet>          
            <fail on="FAILED" exit-code="FAILED ON MOVING DATA TO THE MAIN TABLE." />           
            <next on="*"  to="PREPARE_ARCHIVE"/>                        
        </step>     
        <step id="PREPARE_ARCHIVE" >
            <flow parent="prepareArchiveFlow" />
            <fail on="FAILED" exit-code="FAILED ON Archiving files" />  
            <end on="*" />                    
       </step>
    </job>

    <flow id="prepareCleanFlow" xmlns="http://www.springframework.org/schema/batch">
        <step id="CLEAN_ERROR_RECORDS" next="archivefileExistsDecisionInFlow" >
            <tasklet ref="houseKeeping"  transaction-manager="ratransactionManager" />
        </step>   
        <decision id="archivefileExistsDecisionInFlow" decider="archivefileExistsDecider">
           <end on="NO_ARCHIVE_FILE" />
           <next on="ARCHIVE_FILE_EXISTS" to="runprepareArchiveFlow" />
        </decision>     
        <step id="runprepareArchiveFlow"  >
            <flow parent="prepareArchiveFlow" />    
        </step>  
    </flow>

    <flow id="prepareArchiveFlow" xmlns="http://www.springframework.org/schema/batch" >
          <step id="ARCHIVE_CDR_FILES" >
            <tasklet ref="archiveFiles" transaction-manager="frdtransactionManager" />          
         </step>
    </flow>

    <bean id="archivefileExistsDecider" class="bom.bom.bom.loader.util.ArchiveFileExistsDecider" >
        <property name="logger" ref="logger" />
        <property name="frdjdbcTemplate" ref="frdjdbcTemplate" />   
    </bean>

    <bean id="filepartitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner" scope="step" >
        <property name="resources" value="#{dataMap[processFiles]}"/>
    </bean>

    <task:executor id="executorWithCallerRunsPolicy"
                   pool-size="90-95"
                   queue-capacity="6"
                   rejection-policy="CALLER_RUNS"/>

<!-- <bean id="dynamicJobParameters" class="bom.bom.bom.assurance.core.DynamicJobParameters" />-->


    <bean id="houseKeeping" class="bom.bom.bom.loader.core.HousekeepingOperation">
        <property name="logger" ref="logger" />
        <property name="jdbcTemplate" ref="rajdbcTemplate" />
        <property name="frdjdbcTemplate" ref="frdjdbcTemplate" />   
    </bean>

    <bean id="businessData" class="bom.bom.bom.loader.core.BusinessValidatorData">
        <property name="logger" ref="logger" />
        <property name="jdbcTemplate" ref="NrajdbcTemplate" />  
        <property name="param" value="${EXCH_CODE}" />
        <property name="sql" value="${LOOKUP_QUERY}" /> 
    </bean>

    <step id="filestep" xmlns="http://www.springframework.org/schema/batch">
        <tasklet transaction-manager="ratransactionManager" allow-start-if-complete="true"  >
            <chunk writer="jdbcItenWriter" reader="fileItemReader" processor="itemProcessor" commit-interval="500" retry-limit="2">
             <retryable-exception-classes>
                <include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
             </retryable-exception-classes>
            </chunk>
            <listeners>
                <listener ref="customStepExecutionListener">
                </listener>
            </listeners>
        </tasklet>
    </step> 

    <bean id="moveRecords"  class="bom.bom.bom.loader.core.MoveDataFromStaging">
        <property name="logger" ref="logger" />
        <property name="jdbcTemplate" ref="rajdbcTemplate" />
    </bean>

    <bean id="archiveFiles" class="bom.bom.bom.loader.core.ArchiveCDRFile" >
        <property name="logger" ref="logger" />
        <property name="jdbcTemplate" ref="frdjdbcTemplate" />  
        <property name="archiveFlag" value="${ARCHIVE_FILE}" />
        <property name="archiveDir" value="${ARCHIVE_LOCATION}" />
    </bean>

    <bean id="limitDecider" class="bom.bom.bom.loader.util.LimitDecider" p:dataMap-ref="dataMap">
        <property name="logger" ref="logger" />
    </bean>

<!--    <bean id="multifileReader" class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step" >-->
<!--        <property name="resources" value="#{stepExecutionContext[fileName]}" />-->
<!--        <property name="delegate" ref="fileItemReader" />-->
<!--    </bean>-->

    <!--  READ EACH FILE IN PARALLEL   -->    
    <bean id="fileItemReader" scope="step" autowire-candidate="false" parent="itemReaderParent">
            <property name="resource" value="#{stepExecutionContext[fileName]}" />
            <property name="saveState" value="false" />         
    </bean>

    <!--  LISTEN AT THE END OF EACH FILE TO DO POST PROCESSING  -->
    <bean id="customStepExecutionListener" class="bom.bom.bom.loader.core.StagingStepExecutionListener" scope="step">
        <property name="logger" ref="logger" />
        <property name="frdjdbcTemplate" ref="frdjdbcTemplate" />
        <property name="jdbcTemplate" ref="rajdbcTemplate" />   
        <property name="sql" value="${INSERT_IA_QUERY_COLUMNS}" />

    </bean>

    <!--  CONFIGURE THE ITEM PROCESSOR TO DO  BUSINESS LOGIC ON EACH ITEM -->
    <bean id="itemProcessor" class="bom.bom.bom.loader.core.StagingLogicProcessor"  scope="step">
        <property name="logger" ref="logger" />
        <property name="params" ref="businessData" />
    </bean>

    <!--  CONFIGURE THE JDBC ITEM WRITER TO WRITE IN TO DB -->
    <bean id="jdbcItenWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter"  scope="step">
        <property name="dataSource" ref="radataSource"/>
        <property name="sql"> 
            <value>
                <![CDATA[
                    ${SQL1A}
                ]]>
            </value>
        </property>
        <property name="itemSqlParameterSourceProvider"> 
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider">
            </bean>
        </property>
    </bean>

<!--    <bean id="itemWriter" class="bom.bom.bom.assurance.core.LoaderDBWriter" scope="step">-->
<!--        <property name="sQL" value="${loader.sql}" />-->
<!--        <property name="jdbcTemplate" ref="NrajdbcTemplate" />-->
<!--    </bean>-->


    <!--  CONFIGURE THE FLAT FILE ITEM READER TO READ INDIVIDUAL BATCH -->
    <bean id="itemReaderParent" class="org.springframework.batch.item.file.FlatFileItemReader" abstract="true">
        <property name="strict" value="false"/>
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean class="org.springframework.batch.item.file.transform.FixedLengthTokenizer">
                        <property name="names" value="${COLUMNS}" />
                        <property name="columns" value="${RANGE}" />
                    </bean>
                </property>
                <property name="fieldSetMapper">
                    <bean class="bom.bom.bom.loader.util.DataLoaderMapper">
                        <property name="params" value="${BEANPROPERTIES}"/>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Tried:

  • I could see that the ThreadPoolExecutor hangs after 3 hours. prstat on Solaris says it is processing, but there is no progress in the log.

  • Tried with a smaller chunk size of 500 due to the memory footprint; no progress.

  • Since it inserts into a single database (30 pooled connections), is there anything I can do here?

Instances from VisualVM (screenshot omitted).

Stack trace of the threads; all are locked at the connection level:

Full thread dump Java HotSpot(TM) Server VM (11.3-b02 mixed mode):

"Attach Listener" daemon prio=3 tid=0x00bbf800 nid=0x26 waiting on condition [0x00000000..0x00000000]
   java.lang.Thread.State: RUNNABLE

"executorWithCallerRunsPolicy-1" prio=3 tid=0x008a7000 nid=0x25 runnable [0xd5a7d000..0xd5a7fb70]
   java.lang.Thread.State: RUNNABLE
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.read(SocketInputStream.java:129)
        at oracle.net.ns.Packet.receive(Packet.java:240)
        at oracle.net.ns.DataPacket.receive(DataPacket.java:92)
        at oracle.net.ns.NetInputStream.getNextPacket(NetInputStream.java:172)
        at oracle.net.ns.NetInputStream.read(NetInputStream.java:117)
        at oracle.net.ns.NetInputStream.read(NetInputStream.java:92)
        at oracle.net.ns.NetInputStream.read(NetInputStream.java:77)
        at oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java:1034)
        at oracle.jdbc.driver.T4CMAREngine.unmarshalSB1(T4CMAREngine.java:1010)
        at oracle.jdbc.driver.T4C8Oall.receive(T4C8Oall.java:588)
        at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:194)
        at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:953)
        at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1222)
        at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3387)
        at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3468)
        - locked <0xdbdafa30> (a oracle.jdbc.driver.T4CConnection)
        at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1350)
        at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:818)
        at org.springframework.jdbc.core.JdbcTemplate$2.doInPreparedStatement(JdbcTemplate.java:1)
        at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:587)
        at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:812)
        at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:868)
        at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:876)
        at 
Dead Programmer
  • Can you provide more details of your system architecture? Single database (read: bottleneck), servlet container, J2EE container, and so on :-) – Michael Pralow Nov 17 '11 at 08:13
  • So... what's the problem you are facing? It is really hard to tell what you can improve without more information, like which part of the logic is using up the most time. Have you tried bulk inserts in the DB? Tuning the chunk size? That is usually the first thing I would look into – Adrian Shum Nov 17 '11 at 10:26
  • @Adrian: bulk insert in the DB? If you look at the XML file, I specified a chunk size of 500, so I am already doing batch inserts using the JdbcItemWriter. – Dead Programmer Nov 17 '11 at 10:37
  • So you have a memory problem? Do you clean up the dataMap (for the files) after successfully processing a file? – Michael Pralow Nov 20 '11 at 12:01
  • @lange: when you say clean up, how do I do it in Spring? Are there any destroy methods for the thread pool executor, reader and writer? I have 50,000 files; is it possible to scale (meaning completing in 8 hrs) using Spring Batch? I am currently trying to scale this using GridGain. Will this work? Can you point out from the config file pasted above where there is a possibility of clean up? – Dead Programmer Nov 20 '11 at 12:51
  • I guess you would have to implement it yourself. Right now I am still not sure what the source of the "problem" is: memory allocation? connection deadlocks? – Michael Pralow Nov 20 '11 at 15:02
  • @Michael: thanks for the reply. I think using the Spring task executor is not good enough to load 150 million records. Maybe I need to reduce the number of threads (5) and the chunk size. I will try and let you know. – Dead Programmer Nov 20 '11 at 16:10
  • @SureshSankar: If your threads are running (you see the high CPU load in e.g. `top`), then what prevents you from connecting to your JVM in debug mode and seeing what exactly each thread is doing? You can always pause a thread and see the complete stack trace. – dma_k Nov 24 '11 at 16:29
  • @dma_k: kindly look at the thread stack trace. All are waiting at the connection. I am using ojdbc6.jar, Oracle's latest version. – Dead Programmer Nov 25 '11 at 05:07
  • OK, from the information you've provided: the first two threads in the thread pool are waiting for new jobs – ignore them. The next three threads are waiting for a response from Oracle. If you are sure that they are frozen, then you need to view the [running queries](http://stackoverflow.com/questions/622289) and analyse the table rows they have locked. If you have 3 threads updating the same tables, you've got a race condition and a deadlock! – dma_k Nov 25 '11 at 10:32
  • @dma_k: I tried with only one thread writing to the database; it still hangs at the Oracle connection. Updated the thread dump in the question. – Dead Programmer Nov 25 '11 at 11:19
  • It looks like it is not Spring Batch problem. Please, close this question and start a new one with `oracle` tag. Oracle fans will help you. You need to have an advanced monitor of what is happening in Oracle DB (list of all running requests + list of all table locks). Having that you find what is the problem. There is nothing else I can add, as it looks like you have several transactions writing to the same table which are not in a hurry to commit the data. – dma_k Nov 25 '11 at 22:06
  • @dma_k: thanks, I am now looking at Oracle's running queries and the tables that acquire locks. – Dead Programmer Nov 29 '11 at 03:31
  • @dma_k: http://stackoverflow.com/questions/8309975/table-lock-happens-for-spring-batch-chunk-item-reader-and-writer – Dead Programmer Nov 29 '11 at 11:31

4 Answers


I would suggest you lower the chunk size to 50.

500 seems to be too big: you wait too long while talking to the DB.

At the same time, lower the TaskExecutor's pool size or increase your DB pool size. You can choose which one by watching your DB host: if its CPU and IO are not maxed out, increase your DB pool size to increase the DB load. If your DB CPU is already at its maximum, lower the TaskExecutor's pool size. The objective is to have a fluid process.

I think the DB will be your main limiting element, so begin by adjusting the DB pool size according to the DB host's capacities. When that's done, adjust your TaskExecutor's pool size according to the DB pool size (TE pool size = DB pool size * 1.5), plus the batch host's capacities (CPU, memory and IO).

Splitting your incoming files across multiple hard drives may help too (if possible).
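
For illustration only, here is a minimal sketch of what that tuning could look like against the configuration in the question, assuming the 30-connection DB pool mentioned there (30 * 1.5 = 45 executor threads) and reusing the question's bean ids; the exact numbers would need to be validated against the DB host:

<!-- Sketch: executor sized from the DB pool (30 connections * 1.5 = 45 threads) -->
<task:executor id="executorWithCallerRunsPolicy"
               pool-size="45"
               queue-capacity="6"
               rejection-policy="CALLER_RUNS"/>

<step id="filestep" xmlns="http://www.springframework.org/schema/batch">
    <tasklet transaction-manager="ratransactionManager" allow-start-if-complete="true">
        <!-- commit-interval lowered from 500 to 50 to shorten each wait on the DB -->
        <chunk writer="jdbcItenWriter" reader="fileItemReader" processor="itemProcessor"
               commit-interval="50" retry-limit="2"/>
    </tasklet>
</step>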

Jean-Philippe Briend
  • I am currently working with the GridGain Cloud Platform to run parallel steps in another JVM and aggregate results. – Dead Programmer Dec 15 '11 at 10:15
  • I tried lowering the chunk size, increasing the DB pool and also the task executor. There is no throughput. I have to complete processing 15k files in 6 hours. – Dead Programmer Dec 15 '11 at 10:24
  • Plug in JProfiler to check where time is consumed during the process (do not launch with so many files: use 30 files, for example). – Jean-Philippe Briend Dec 15 '11 at 13:09
  • I am trying to run each file one by one. As time goes on, Spring Batch gets slower. – Dead Programmer Dec 15 '11 at 13:27
  • It smells like DB pool problems: watch the evolution of your database connection pool (connections used vs. free connections). Maybe some of them are not released and thus fewer and fewer connections are available. Also, check the memory of your JVM. – Jean-Philippe Briend Dec 15 '11 at 16:03
  • I tried with BoneCP and the ojdbc6 connection pool. The performance of Spring Batch with GridGain is not that fast; throughput is not optimal. They need to re-engineer the chunk processor or how they extract fields from a line. – Dead Programmer Dec 28 '11 at 13:48
  • If you're seeing locking, sometimes a bug is just a bug... At least rule this out: http://bugs.sun.com/view_bug.do?bug_id=6822370 OS = Solaris 10 (does not happen on Linux), JDK = JDK 6.0 upd 10 and 12 – Trever Shick Feb 26 '12 at 16:31

I think the problem here is the million records per file. Since you already reduced the chunk size, you should process smaller files. For testing's sake, reduce the number of records in each file to 10k. My guess is you are creating objects, doing some processing, and doing this for 1M records in a loop. Each thread will hold its objects in memory until processing is completed. My guess is that, because of the volume of data, there are too many objects in memory that are not garbage collected. If reducing the size helps, then you can try to use lightweight objects in your code and try setting each object to null at the end of processing.
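
As a rough way to try a smaller volume without editing the million-record files themselves, the partitioner from the question could be pointed at a reduced test set first (a sketch; the test folder path is hypothetical):

<!-- Sketch: feed the partitioner a small test folder instead of #{dataMap[processFiles]} -->
<bean id="filepartitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner" scope="step">
    <property name="resources" value="file:/data/test-subset/*.csv"/>
</bean>

Spring converts the location pattern into a Resource array, so a handful of 10k-record sample files dropped into that folder would exercise the same step at a fraction of the volume.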

vsingh

Just a fix for your cron expression. This is the correct one to run every 2 hours:

0 0 0/2 1/1 * ? *
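
Dropped into the commented-out trigger from the question, that would look like this (a sketch; jobDetail is the bean commented out there):

<bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
    <property name="jobDetail" ref="jobDetail" />
    <!-- fires at minute 0 of every second hour; the original "2/0 * * * * ?" is malformed -->
    <property name="cronExpression" value="0 0 0/2 1/1 * ? *" />
</bean>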

HJUNIOR
  1. Is your batch job relying on reflection (e.g., BeanPropertyRowMapper)? That can hamper performance; see the sketch after this list.
  2. If your database is causing problems, you may want to profile it. I don't have much concrete to offer here.
  3. As already mentioned, drop that chunk size.
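
On point 1, a minimal sketch of replacing the reflection-based parameter source in the question's writer with explicit binding (CdrPreparedStatementSetter is a hypothetical class that would implement Spring Batch's ItemPreparedStatementSetter, and the SQL would then use ? placeholders instead of named parameters):

<!-- Sketch: explicit per-item parameter binding instead of BeanPropertyItemSqlParameterSourceProvider -->
<bean id="jdbcItenWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter" scope="step">
    <property name="dataSource" ref="radataSource"/>
    <property name="sql" value="${SQL1A}"/>
    <property name="itemPreparedStatementSetter">
        <bean class="bom.bom.bom.loader.core.CdrPreparedStatementSetter"/>
    </property>
</bean>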
Andy Sampson