
I am using `WatchServiceDirectoryScanner`/`RecursiveLeafOnlyDirectoryScanner` to process files from the file system. File events are generated and messages are received at the defined endpoint, but sometimes not all files in the directory get processed.

For example, if there are 15 files, sometimes only 10 are processed and sometimes only 5, even though the metadata store has entries for all 15 files in "metadata-store.properties".
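For reference, `FileSystemPersistentAcceptOnceFileListFilter` is what writes those entries: it accepts a file only the first time it sees a given name and last-modified timestamp, and persists each accepted entry through the metadata store. A stand-alone sketch of that accept-once idea, using a plain `java.util.Properties` file in place of the real `PropertiesPersistingMetadataStore` (the class and file names here are illustrative, not Spring's):

```java
import java.io.*;
import java.util.Properties;

// Illustrative accept-once filter: a file passes only if its
// name/lastModified pair is not already recorded in the backing
// properties file (standing in for metadata-store.properties).
public class AcceptOnceSketch {

    private final File store;
    private final Properties seen = new Properties();

    public AcceptOnceSketch(File store) throws IOException {
        this.store = store;
        if (store.exists()) {
            try (InputStream in = new FileInputStream(store)) {
                seen.load(in);
            }
        }
    }

    // Returns true only the first time this name/lastModified pair is offered.
    public synchronized boolean accept(String fileName, long lastModified) throws IOException {
        String value = Long.toString(lastModified);
        if (value.equals(seen.getProperty(fileName))) {
            return false; // already processed
        }
        seen.setProperty(fileName, value);
        try (OutputStream out = new FileOutputStream(store)) {
            seen.store(out, null); // write through immediately, like flushOnUpdate=true
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        File store = File.createTempFile("metadata-store", ".properties");
        AcceptOnceSketch filter = new AcceptOnceSketch(store);
        System.out.println(filter.accept("VT-SZA.csv", 100L)); // true: first sighting
        System.out.println(filter.accept("VT-SZA.csv", 100L)); // false: already in store
        System.out.println(filter.accept("VT-SZA.csv", 200L)); // true: file was modified
        store.delete();
    }
}
```

The point is that once a name is in the store, the filter will never emit it again, so files present in `metadata-store.properties` were at least seen by the scanner, even if the downstream endpoint never logged them.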

spring-integration-configuration.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:int-file="http://www.springframework.org/schema/integration/file"
        xmlns:int-mail="http://www.springframework.org/schema/integration/mail"
        xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/integration/mail http://www.springframework.org/schema/integration/mail/spring-integration-mail.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

        <int:annotation-config />

        <int:channel id="cfpFileIn" />

        <int-file:inbound-channel-adapter id="cfpFileIn"
            directory="${cfp.flight.data.dir}" auto-startup="true" scanner="csvDirScanner">
            <int:poller fixed-delay="${cfp.flight.data.dir.polling.delay}" />
        </int-file:inbound-channel-adapter>

        <bean id="csvDirScanner"
            class="org.springframework.integration.file.WatchServiceDirectoryScanner">
            <constructor-arg index="0" value="${cfp.flight.data.dir}" />
            <property name="filter" ref="csvCompositeFilter" />
            <property name="autoStartup" value="true" />
        </bean>

        <bean id="csvCompositeFilter"
            class="org.springframework.integration.file.filters.CompositeFileListFilter">
            <constructor-arg>
                <list>
                    <bean
                        class="org.springframework.integration.file.filters.SimplePatternFileListFilter">
                        <constructor-arg value="*.csv" />
                    </bean>
                    <ref bean="persistentFilter" />
                </list>
            </constructor-arg>
        </bean>

        <bean id="persistentFilter"
            class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
            <constructor-arg index="0" ref="metadataStore" />
            <constructor-arg index="1" name="prefix" value="" />
            <property name="flushOnUpdate" value="true" />
        </bean>

        <bean name="metadataStore"
            class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore">
            <property name="baseDirectory" value="${metadata.dir}" />
        </bean>
    </beans>

BatchJobScheduler:

import java.io.File;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Component;

@Component
public class BatchJobScheduler {

    private static final Logger logger = LoggerFactory.getLogger(BatchJobScheduler.class);

    @Autowired
    protected JobLauncher jobLauncher;

    @Autowired
    @Qualifier(value = "job")
    private Job job;

    @ServiceActivator(inputChannel = "cfpFileIn")
    public void run(File file) {
        String fileName = file.getAbsolutePath();
        logger.info("BatchJobScheduler running ################# {}", fileName);
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("input.file", fileName)
                .toJobParameters();

        try {
            JobExecution execution = jobLauncher.run(job, jobParameters);
            logger.info("BatchJobScheduler exit status: {} :: {}",
                    execution.getStatus(), execution.getAllFailureExceptions());
        } catch (JobExecutionAlreadyRunningException | JobRestartException
                | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            logger.error("BatchJobScheduler exit status: exception", e);
        }
    }
}

It seems I am missing some configuration or code.

Update, based on the suggestions in the comments:

I have placed the complete spring-integration.xml and the log file in a git repo: https://github.com/chandaku/spring-integration-issue.

Five channels are created here and run simultaneously. When I run only one channel at a time, it successfully processes all files in the directory, but when I open all the file channels at once, the problem appears.
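While debugging this, turning on DEBUG logging for Spring Integration's file support shows each poll and each filter decision, which makes it clear whether a file was skipped by the filter or never delivered downstream. For example, with Logback as the backend (assuming Logback is what this project uses):

```xml
<!-- logback.xml fragment: trace file polling and filter decisions -->
<logger name="org.springframework.integration.file" level="DEBUG" />
<logger name="org.springframework.integration.endpoint" level="DEBUG" />
```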

  • I suggest you turn on DEBUG logging; it should show what's happening. If the files are in the metadata store, it means the events were processed. – Gary Russell May 07 '16 at 13:21
  • Thanks Gary Russell for your comments. Looking at the logs I can see only a few files printed, not all, inside the given @ServiceActivator(inputChannel = "cfpFileIn"), and only the files that are printed are getting processed. Could you please suggest what I may have missed. – Chandan Gawri May 09 '16 at 02:07
  • Please post the log and contents of the properties file somewhere like a github gist or pastebin. – Gary Russell May 09 '16 at 13:56
  • @GaryRussell I have posted logs and complete integration config on git hub could you please check. – Chandan Gawri May 12 '16 at 05:15
  • There is some mess in your information. For example, I don't see logs for the `VT-SZA-SG14-DXB-BOM-03Jan16.csv` file, but there is something about `VT-SZA-SG14-DXB-BOM-04Jan16.csv`. Looks like different time frames, although that doesn't shed any light. I suggest you have a separate `MetadataStore` for each directory and dig further yourself. For me your concern does not make sense: if the issue were in the framework, we would have been suffering from it for a long time already. On the other hand, the logs look good to me. – Artem Bilan May 12 '16 at 15:59
  • This was my bad, I was doing something silly: setting one channel's path as the parent of another channel's path, which was causing the loss of messages at some endpoints. Creating separate, independent file paths for each inbound-channel-adapter fixed my issue. Thanks guys, for your support and help. – Chandan Gawri May 14 '16 at 06:41
  • After getting through the above issue I ran into the one described at http://stackoverflow.com/questions/33630208/deadlock-when-creating-job-instances, which I could fix by re-launching the job when it fails with that error. – Chandan Gawri May 14 '16 at 06:45
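To summarize the resolution for future readers: the message loss came from one adapter's directory being the parent of another adapter's directory, so two scanners competed for the same files and each file was claimed by whichever filter saw it first. The fix is to give every inbound adapter its own non-overlapping directory; a sketch of what that looks like (the second adapter and its property names are illustrative, not from the original config):

```xml
<!-- Two independent adapters: neither directory is a parent of the
     other, so each scanner sees only its own files. -->
<int-file:inbound-channel-adapter id="cfpFileIn"
        directory="${cfp.flight.data.dir}" scanner="csvDirScanner">
    <int:poller fixed-delay="${cfp.flight.data.dir.polling.delay}" />
</int-file:inbound-channel-adapter>

<int-file:inbound-channel-adapter id="otherFileIn"
        directory="${other.flight.data.dir}" scanner="otherDirScanner">
    <int:poller fixed-delay="${other.flight.data.dir.polling.delay}" />
</int-file:inbound-channel-adapter>
```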
